// Databricks notebook source exported at Tue, 28 Jun 2016 11:12:33 UTC
Scalable Data Science
Change Detection in Degree Corrected Stochastic BlockModel Graphs
Scalable data science project by Shakira Suwan
The html source url of this databricks notebook and its recorded Uji :
Network Analysis
Stochastic Blockmodel
Limitations:
- SBM treats all nodes within a community as stochastically equivalent.
- Thus does not allow for the existence of “hubs,” high-degree nodes at the center of many communities observed in real data.
- To address this issue, Karrer and Newman [20] proposed the degree-corrected stochastic block model, which can accommodate hubs.
Degree Corrected Stochastic Blockmodel
Change Detection
Ide and Kashima (2004) compare the current activity vector with the past average vector (profile vector) by computing the change score for time instant, as
The change scores of the nodes, at time is
Simulation Studies
Parameter Setting:
- K =3,
- n= 900
- , for
- , for
- 30 graph realizations
- G =21 is generated from one of the following change scenarios
Change scenarios:
Split
Form
Shift
Source Code
First let's import various libraries.
import breeze.linalg._
import breeze.numerics._
import breeze.stats.distributions._
Powerlaw Distribution
- This function returns random numbers generated from a powerlaw distribution.
def powerlawRnd(n:Int,xmin:Double, alpha:Double ):DenseVector[Double]={
val u=Range(0,n).map(v=>{
var r= new scala.util.Random
r.nextDouble()
}).toArray
val plx=u.map(a=>xmin*pow((1-a),(-1/alpha-1)))
new DenseVector(plx)
}
DCSBM Graphs
- This function returns an degree corrected stochastic blockmodel graph
def constructGraph(K:Int,p:DenseVector[Double], r:Double, theta1:DenseVector[Double],theta2:DenseVector[Double],lambda:Double,rho:DenseVector[Double],N1:Int,N2:Int):DenseMatrix[Double]={
var n1_block:DenseVector[Double]= rho :* N1.toDouble
n1_block = floor(n1_block)
var n1_ind = DenseMatrix.zeros[Double](K,2)
val tmp = accumulate(n1_block)
n1_ind(::,1) := tmp -1.toDouble
n1_ind(0,0) = 0;
n1_ind(1 to -1,0) := tmp(0 to -1-1)
var n2_block:DenseVector[Double]= rho :* N2.toDouble
n2_block = floor(n2_block)
var n2_ind = DenseMatrix.zeros[Double](K,2)
val tmp2 = accumulate(n2_block)
n2_ind(::,1) := tmp2 - 1.toDouble
n2_ind(0,0) = 0;
n2_ind(1 to -1,0) := tmp2(0 to -1-1)
var temp1 = DenseMatrix.fill(K,K){n1_block(1)}
var temp2 = DenseMatrix.fill(K,K){n2_block(1)}
var expectedNum = temp1 :* temp2
var p2:DenseVector[Double]=null
if(p.length==1){
var p1=p(0)
p2= DenseVector.fill(K){p1}
}else{
p2 = p
}
var p1 = DenseVector.rangeD(1,K+1,1):* p2//clear structure (p1>p2)
var B = (lambda*DenseMatrix.eye[Double](K):* diag(p1)) :* expectedNum :+ ((1-lambda) * DenseMatrix.fill(K,K)(r) :*expectedNum)
// generate node probabilities
var Z1 = DenseMatrix.zeros[Double](N1,K)
var Z2 = DenseMatrix.zeros[Double](N2,K)
var blockDeg1 = DenseVector.zeros[Double](K)
var blockDeg2 = DenseVector.zeros[Double](K)
var rowDeg = DenseVector.zeros[Double](N1)
var colDeg = DenseVector.zeros[Double](N2)
val n1Ind = n1_ind.mapValues(_.toInt)
val n2Ind = n2_ind.mapValues(_.toInt)
for(k <- 0 to K-1){
blockDeg1(k) = sum(theta1.slice(n1Ind(k,0),n1Ind(k,1)))
blockDeg2(k) = sum(theta2.slice(n2Ind(k,0),n2Ind(k,1)))
rowDeg.slice(n1Ind(k,0),n1Ind(k,1)) := theta1.slice(n1Ind(k,0),n1Ind(k,1)) :/ blockDeg1(k)
colDeg.slice(n2Ind(k,0),n1Ind(k,1)) := theta2.slice(n2Ind(k,0),n2Ind(k,1)) :/ blockDeg2(k)
Z1(n1Ind(k,0) to n1Ind(k,1), k) := 1.0;
Z2(n2Ind(k,0) to n2Ind(k,1), k) := 1.0;
}
var nodeProb = diag(rowDeg) * Z1 * B * Z2.t * diag(colDeg)
// create graph realizations
var W = DenseMatrix.zeros[Double](N1,N2)
for(i<-0 to N1-1){
for(j<- i+1 to N2-1){
var poi= Poisson(nodeProb(i,j))
W(i ,j) = poi.draw
}
}
var A = W
for(i<-0 to N1-1){
for(j<- i+1 to N2-1){
if(W(i,j)!=0.0)
A(i ,j) = 1.0
}
}
var A1 = upperTriangular(A) + upperTriangular(A).t
A1
}
Change Scenarios
- This function returns an changed dcsbm graph.
// create an anomaly graph, form, split, shift
case class graphAnomalyClass(W:DenseMatrix[Double],A1:DenseVector[Double], A2:DenseVector[Double])
def createA_anomaly(K:Int,p:Double, r:Double, theta1:DenseVector[Double],theta2:DenseVector[Double],lambda:Double,rho:DenseVector[Double],changeType:String, graphType:String):graphAnomalyClass={
var N1:Int = theta1.length
var N2:Int = theta2.length
val s1 = rho(0)*N1.toDouble
val s2 = rho(1)*N2.toDouble
var s1_temp = DenseVector.fill(K){s1}
var s2_temp = DenseVector.fill(K){s2}
val s1_cumsum = accumulate(s1_temp)
var s2_cumsum = accumulate(s2_temp)
val constD = DenseVector{0.0}
val constI = DenseVector{0}
var W_poi:DenseMatrix[Double]=null
var anomalies1:DenseVector[Double]=null
var anomalies2:DenseVector[Double]=null
changeType match{
case "form" =>
var pnew = DenseVector.fill(K)(p)
pnew(0) = pnew(0)*3.0
W_poi = constructGraph(K,pnew,r,theta1,theta2,lambda,rho,N1,N2)
anomalies1 = DenseVector.rangeD(1, s1_cumsum(0)+1,1)
anomalies2 = DenseVector.rangeD(1, s2_cumsum(0)+1,1)
case "split" =>
var stmp1 = DenseVector.vertcat(constD,s1_temp)
var stmp2 = DenseVector.vertcat(constD,s2_temp)
val k2 = K+1
val temp1 = stmp1(1)*0.5
val temp2 = stmp2(1)*0.5
stmp1(0)= temp1
stmp1(1)= stmp1(1) - temp1
stmp2(0)= temp2
stmp2(1)= stmp2(1) - temp2
val rho_tmp = stmp1 :/ N1.toDouble
anomalies1 = DenseVector.rangeD(1,s1_temp(0)+1.0,1)
anomalies2 = DenseVector.rangeD(1,s2_temp(0)+1.0,1)
var pnew = DenseVector.fill(k2){p}
for(i <- 1 to pnew.length-1){
pnew(i) = pnew(i)*(1.0/(i+1.0))
}
var rowV = DenseVector.range(1,k2,1)
rowV = DenseVector.vertcat(constI+1,rowV)
var pnew1 = pnew:*rowV.mapValues(_.toDouble)
W_poi = constructGraph(k2,pnew1,r,theta1,theta2,lambda,rho_tmp,N1,N2)
case "shift" =>
var stmp1 = s1_temp
val temp1 = stmp1(0)*0.5
stmp1(0) = stmp1(0) - temp1
stmp1(1) = stmp1(1) + temp1
var stmp2 = s2_temp
val temp2 = stmp2(0)*0.5
stmp2(0) =stmp2(0) - temp2
stmp2(1) =stmp2(1) + temp2
var rho_tmp = stmp1:/N1.toDouble
var pnew = DenseVector.fill(K){p}
anomalies1 = DenseVector.rangeD(stmp1(0),(stmp1(0)+temp1)+1.0)
anomalies2 = DenseVector.rangeD(stmp2(0),(stmp2(0)+temp2)+1.0)
W_poi = constructGraph(K,pnew,r,theta1,theta2,lambda,rho_tmp,N1,N2)
}
val result=graphAnomalyClass(W_poi,anomalies1,anomalies2)
result
}
def preprocessGraph(adj:DenseMatrix[Double],N1:Int,N2:Int):DenseMatrix[Double]={
val totalUniqueNode=N1.toDouble+N2.toDouble
val totalConnection = sum(adj)
val avgCon=(2.0*totalConnection.toDouble)/totalUniqueNode
val perturbationP = totalConnection / pow(totalUniqueNode,2)
var W = adj + perturbationP // perturbation
var degree = sum(W(*, ::))
var invDeg = pow(degree,-0.5)
var invD = diag(invDeg)
var normW = invD*W*invD
normW
}
Activity Vector
- compute the singular value decomposition (SVD) and extract the principle singular vector (left singular vector corresponding to the largest singular value)
def getActivity(edge:DenseMatrix[Double]):DenseVector[Double]={
val leftSingular= svd(edge)
var U =leftSingular.U
var Ac1 = abs(U(::,0))
Ac1
}
Activity Vectors in the first window
// perform SVD and get activity vectors for the first 5 graphs
def getFirstActivityVectors(N1:Int,N2:Int,edgeArray:Array[DenseMatrix[Double]],tw:Int):DenseMatrix[Double]={
var winActivity1= DenseMatrix.zeros[Double](N1,tw)
for(i<- 0 to (tw-1)){
var edge_i=edgeArray(i)
var W = preprocessGraph(edge_i,N1,N2)
winActivity1(::,i) := getActivity(W)
}
winActivity1
}
Let's use MLlib svd rountine next.
// MLlib svd
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.linalg.SingularValueDecomposition
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}
def getSvdSpark(winActivity:DenseMatrix[Double]):DenseVector[Double] = {
val tWinAc1_0 = winActivity.t
val arr = new Array[(Int, Array[Double])](tWinAc1_0.cols)
for(i <- 0 until tWinAc1_0.cols) {
arr(i) = (i, tWinAc1_0(::,i).toArray)
}
val rdd = sc.parallelize(arr)
rdd.cache()
val svd = new IndexedRowMatrix(rdd.map{case(row: Int, colVector: Array[Double]) => {
new IndexedRow(row, new org.apache.spark.mllib.linalg.DenseVector(colVector))
}}).computeSVD(5, true, 1e-9)
rdd.unpersist()
val uMatrix = svd.U.rows
val firstCol = DenseVector.zeros[Double](uMatrix.count.toInt)
uMatrix.collect.foreach{row => {
firstCol(row.index.toInt) = row.vector(0)
}}
abs(firstCol)
}
Generate DCSBM Graphs
// Parameter settings
var graphType:String = "unipartite"
var N1:Int = 900
var N2:Int = N1
var K1:Int = 3
var K2:Int = 3
var s1:Double = N1/K1
var s2:Double = N2/K2
var rho1 = DenseVector.fill(K1){s1/N1}
var rho2 = DenseVector.fill(K2){s2/N2}
var p =DenseVector[Double]{0.05} // within block probability
var r:Double = 0.008 // between block probabilty
var lambda:Double = 0.8 //signal in the graph
val nd: Int = 29 // graph realization sets
var beforeChange:String = "simple"
var changeType:String = "shift" // form, split, shift
val tw =5
// parameter for the powerLaw degree distribution
val beta:Double = 2.5
val xmin:Double = 1
var theta1 = DenseVector.zeros[Double](N1)
var theta2 = DenseVector.zeros[Double](N2)
if(graphType == "unipartite"){
N2 = N1
K2 = K1
theta1 = powerlawRnd(N1,xmin,beta) // generate powerlaw random variable
theta2 = theta1
}else{
theta1 = powerlawRnd(N1,xmin,beta) // type 1 node
theta2 = powerlawRnd(N2,xmin,beta) // type 2 node
}
// generate a dynamic network
val tChange = 20 // at day 21 is the change
var sparseA: Array[DenseMatrix[Double]]=new Array[DenseMatrix[Double]](nd+1)
var anomaly1:DenseVector[Double]=null
var anomaly2:DenseVector[Double]=null
for (m <- 0 to nd){
if(m<tChange && m >tChange){
sparseA(m) = constructGraph(K1,p,r, theta1,theta2,lambda,rho1,N1,N2)
}else{
val G = createA_anomaly(K2, p(0), r, theta1,theta2,lambda,rho2,changeType,graphType)
sparseA(m) = G.W
anomaly1 =G.A1
anomaly2 =G.A2
}
// create graph 1:20 from Dist A
// create graph 21 from Dist B
// create graph 22:30 graph Dist A
}
Computing node and time scores
// computing node scores and time scores from day 6 onwards
var winAc1_0 = getFirstActivityVectors(N1,N2,sparseA,tw)
var nt = sparseA.length-tw
var timeScore = DenseVector.zeros[Double](nt)
var nodeScore = DenseMatrix.zeros[Double](N1,nt)
for(i <- 0 to nt-1){
var edge_i= sparseA(i+tw)
var W_i = preprocessGraph(edge_i,N1,N2)
var winAc1 = winAc1_0
var refAc1 = getSvdSpark(winAc1) // get reference embedding
var Ac1 = getActivity(W_i) // get current embedding
timeScore(i) = 1 - norm(refAc1.t * Ac1)
var tmp =(Ac1.t * refAc1)* refAc1.t
for(j <- 0 to N1-1){
nodeScore(j,i) = abs((Ac1(j) - tmp.toDenseVector(j)))
}
var winAc_tmp = winAc1_0(::,1 to 4)
winAc1_0 = DenseMatrix.horzcat(winAc_tmp,Ac1.toDenseMatrix.t)
}
Results
//
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame
def convertScore(matrix:DenseMatrix[Double]):DataFrame={
//val matrix = timeScore.toDenseMatrix
val n = matrix.rows
val m = matrix.cols
val tSb = sc.broadcast(matrix)
// creating schema for DataFrame matrix
val columns = for (i <- 0 until m) yield StructField(s"$i", DoubleType, false)
val indexColumn = Seq(StructField("index", IntegerType, false))
val schema = StructType(indexColumn ++ columns)
val df = sqlContext.createDataFrame(sc.parallelize(0 until n).map { v =>
val values = v +: tSb.value(v,::).t.toArray
Row(values: _*)
}, schema)
df
}
val tScoreDF = convertScore(timeScore.toDenseMatrix)
val nodeScoreDF = convertScore(nodeScore.toDenseMatrix)
tScoreDF.registerTempTable("timeScore_table")
nodeScoreDF.registerTempTable("nodeScore_table")
%r
timedf <- sql(sqlContext, "FROM timeScore_table SELECT *")
timeScore <- collect(timedf)
nodedf <- sql(sqlContext, "FROM nodeScore_table SELECT *")
nodeScore <- collect(nodedf)
%r
display(timeScore)
display(nodeScore)
//%r
/*data.matrix(timeScore, rownames.force = NA)
require(grDevices) # for colours
image(nodeScore)
x<- c(1:25)
timeScore[,2:5]
plot(x,timeScore[,2:26], type="o", xlab="day",
ylab="score" )*/
Split
NB: Changed day = 21, changed nodes are from 150-300
Form
NB: Changed day = 21, changed nodes are from 1-300
Shift
NB: Changed day = 21, changed nodes are from 150-300