SDS-2.2, Scalable Data Science
Archived YouTube video of this live unedited lab-lecture:
//imports
import org.apache.spark.sql.types.{StructType, StructField, StringType};
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame,Row}
import org.apache.spark.sql.types._
import sqlContext.implicits._
import org.apache.spark.graphx._
import org.apache.spark._
import math._
import org.apache.spark.mllib.rdd.RDDFunctions._
Load data
// just read from dbfs
val TTTsDF = sqlContext.read.parquet("/datasets/MEP/GB/TTTsDFAsParquetDF")
val miniTTTsDF = spark.createDataFrame(TTTsDF.rdd, TTTsDF.schema).select("CurrentTweetDate","CurrentTwID","CPostUserID","CPostUserSN","OPostUserIDinRT","OPostUserSNinRT", "OPostUserIDinQT", "OPostUserSNinQT", "OPostUserIDinReply", "OPostUserSNinReply", "URLs", "hashTags", "UMentionAsID", "UMentionAsSN", "TweetType", "CurrentTweet", "Weight")
TTTsDF: org.apache.spark.sql.DataFrame = [CurrentTweetDate: timestamp, CurrentTwID: bigint ... 34 more fields] miniTTTsDF: org.apache.spark.sql.DataFrame = [CurrentTweetDate: timestamp, CurrentTwID: bigint ... 15 more fields]
val augmentedTweetsTTTDF = sqlContext.read.parquet("dbfs:/datasets/MEP/GB/AugmentedTTTsDFAsParquet/")
val augmentedTweetsTTTfiltered = augmentedTweetsTTTDF.filter($"CurrentTweetDate" >= "2017-05-01T00" && $"CurrentTweetDate" <= "2017-06-30T00")
augmentedTweetsTTTDF: org.apache.spark.sql.DataFrame = [CurrentTweetDate: timestamp, CurrentTwID: bigint ... 34 more fields] augmentedTweetsTTTfiltered: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [CurrentTweetDate: timestamp, CurrentTwID: bigint ... 34 more fields]
augmentedTweetsTTTfiltered.count()
res1: Long = 5248765
val unionTTTsDF = TTTsDF.union(augmentedTweetsTTTDF) // merge retrospective augmented data with the original tweets
unionTTTsDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [CurrentTweetDate: timestamp, CurrentTwID: bigint ... 34 more fields]
val TTTDF = spark.createDataFrame(unionTTTsDF.rdd, unionTTTsDF.schema).select("CurrentTweetDate","CurrentTwID","CPostUserID","CPostUserSN","OPostUserIDinRT","OPostUserSNinRT", "OPostUserIDinQT", "OPostUserSNinQT", "OPostUserIDinReply", "OPostUserSNinReply", "URLs", "hashTags", "UMentionAsID", "UMentionAsSN", "TweetType", "CurrentTweet", "Weight").filter($"CurrentTweetDate" >= "2017-05-01T00" && $"CurrentTweetDate" <= "2017-06-30T00")
TTTDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [CurrentTweetDate: timestamp, CurrentTwID: bigint ... 15 more fields]
TTTDF.coalesce(40) // note: coalesce returns a new DataFrame; assign the result for the repartitioning to take effect
TTTDF.count()
res2: Long = 11084903
Extract the retweet network
import org.apache.spark.sql.functions.explode_outer
// Retweet Network containing URLs
val retweetNetwork = TTTDF.withColumn("URL", explode_outer($"URLs"))
.na.fill("",Seq("URL"))
.filter($"TweetType"==="ReTweet")
.select("OPostUserIDinRT", "OPostUserSNinRT", "CPostUserID", "CPostUserSN", "URL", "Weight")
import org.apache.spark.sql.functions.explode_outer retweetNetwork: org.apache.spark.sql.DataFrame = [OPostUserIDinRT: bigint, OPostUserSNinRT: string ... 4 more fields]
/*
 * Map distances: keep shortest-path hop counts up to 5 as-is and bucket anything longer as 6.
 */
def mapDistances(x: Integer): Integer = {
  if (x <= 5) {
    return x
  }
  else {
    return 6
  }
}
/*
 * Makes the weighted network of retweets, and computes the geometric probabilities.
 */
def makeWeightedNetwork(allRetweetsSrcIdDstId: DataFrame, srcColName: String, dstColName: String, weightColName: String): DataFrame = {
allRetweetsSrcIdDstId.withColumn("w",lit(1.0))
.groupBy(srcColName,dstColName)
.agg(sum("w").as(weightColName))
.select(srcColName,dstColName,weightColName)
.withColumn("one",lit(1.0))
.withColumn("GeomProb",$"one"/($"one"+col(weightColName))).drop("one")
}
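To see what makeWeightedNetwork produces: it collapses repeated (source, retweeter) pairs into a weight w and a geometric probability 1/(1+w). A minimal sketch on a made-up toy edge list (assuming the implicits imported above are in scope):
val toyEdgesDF = Seq((1L, 2L), (1L, 2L), (1L, 2L), (2L, 3L)).toDF("src", "dst")
makeWeightedNetwork(toyEdgesDF, "src", "dst", "Weight").show()
// +---+---+------+--------+
// |src|dst|Weight|GeomProb|
// +---+---+------+--------+
// |  1|  2|   3.0|    0.25|   <- retweeted 3 times: 1/(1+3)
// |  2|  3|   1.0|     0.5|   <- retweeted once:    1/(1+1)
// +---+---+------+--------+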
import scala.reflect.ClassTag
import scala.util.Random
import org.apache.spark.graphx._
/*
 * Shortest-path sequence from a source vertex to every other vertex.
 * By: Ivan Sadikov
 */
object Dijkstra extends Serializable {
import org.apache.spark.graphx._
type VertexId = Long
// vertex type contains tracking vertex and state (path, parent)
type VertexType = (Double, List[(VertexId, Double)])
// null value for vertex that does not have any parent (start vertex)
private val NULL_PARENT = -1L
// initial and infinity values, use to relax edges
private val INITIAL = 0.0
private val INFINITY = Int.MaxValue.toDouble
def run(graph: Graph[VertexId, Double], start: VertexId): Graph[VertexType, Double] = {
val spGraph = graph.mapVertices { (vid, attr) =>
if (start == vid) (INITIAL, List[(VertexId, Double)]()) else (INFINITY, List[(VertexId, Double)]())
}
val initialMessage = (INFINITY, List[(VertexId,Double)]())
def vertexProgram(id: VertexId, dst0: VertexType, dst1: VertexType): VertexType = {
select(dst0, dst1)
}
def sendMessage(edge: EdgeTriplet[VertexType, Double]): Iterator[(VertexId, VertexType)] = {
val weight = edge.attr
if (edge.srcAttr._1 + weight < edge.dstAttr._1) {
//Iterator((edge.dstId, (edge.srcAttr._1 + weight, (edge.srcId, edge.srcAttr._1) +: edge.srcAttr._2)))
Iterator((edge.dstId, (edge.srcAttr._1 + weight, (edge.srcId, weight) +: edge.srcAttr._2)))
} else {
Iterator.empty
}
}
def select(dst0: VertexType, dst1: VertexType): VertexType = {
if (dst0._1 > dst1._1) dst1 else dst0
}
Pregel(spGraph, initialMessage)(vertexProgram, sendMessage, select)
}
}
def runDijkstraOnce(nodeIdColName: String, g: Graph[Long, Double], srcNodeId: Long, srcNodeName: String): DataFrame = {
val result = Dijkstra.run(g, srcNodeId)
// note that for each unreachable vertex with shortest distance to source vertex = 2.147483647E9,
// the path is an empty list and x._2._2.map(d => (1.0 - d._2)/d._2 ).sum = the sum of this empty list is 0.0,
// and 0.0 is the right expectation of the number of geometrically distributed retweet events between the source vertex and the unreachable vertex
//val df = result.vertices.map(x => (x._1, x._2._1, x._2._2.map(d => (1.0 - d._2)/d._2 ).sum )).toDF(nodeIdColName,srcNodeName+"GeoProbsSum",srcNodeName+"GeomMeansSum")
return result.vertices.map(x => {val maxRTPath = x._2._2.map(d => (1.0 - d._2)/d._2 )
val meanMaxPath = if (maxRTPath.isEmpty) 0.0 else maxRTPath.sum/maxRTPath.length
(x._1, meanMaxPath, mapDistances(maxRTPath.length)) // NOTE! here maxRTPath.length = 0 means there is NO path!
}).toDF(nodeIdColName,srcNodeName+"ESumGeom", srcNodeName+"PathLen")
}
mapDistances: (x: Integer)Integer makeWeightedNetwork: (allRetweetsSrcIdDstId: org.apache.spark.sql.DataFrame, srcColName: String, dstColName: String, weightColName: String)org.apache.spark.sql.DataFrame import scala.reflect.ClassTag import scala.util.Random import org.apache.spark.graphx._ defined object Dijkstra runDijkstraOnce: (nodeIdColName: String, g: org.apache.spark.graphx.Graph[Long,Double], srcNodeId: Long, srcNodeName: String)org.apache.spark.sql.DataFrame
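As a sanity check, Dijkstra.run can be exercised on a made-up three-vertex chain (a sketch, assuming the definitions above have been evaluated; the ids and weights are arbitrary):
val toyVertRDD = sc.parallelize(Seq((1L, 1L), (2L, 2L), (3L, 3L)))
val toyEdgeRDD = sc.parallelize(Seq(Edge(1L, 2L, 0.5), Edge(2L, 3L, 0.25)))
val toyGraph = Graph(toyVertRDD, toyEdgeRDD)
Dijkstra.run(toyGraph, 1L).vertices.collect().foreach(println)
// (1,(0.0,List()))                    -- the start vertex
// (2,(0.5,List((1,0.5))))             -- one hop, distance 0.5
// (3,(0.75,List((2,0.25), (1,0.5))))  -- two hops; the path list stores (vertexId, edge weight) pairs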
Make the graph
val gF = makeWeightedNetwork(retweetNetwork,"OPostUserIDinRT","CPostUserID","Weight")
// Need to add srcVid, dstVid and dist
// directed - we need to switch src and dst because, in the shortest-path algorithm, we want the distance from each vertex to each landmark vertex, the landmarks being the original/source tweeters of the retweets
val eDF = gF.select($"OPostUserIDinRT".as("srcVid"), $"CPostUserID".as("dstVid"), $"GeomProb")
val ex = eDF.rdd.map{ case Row(id1: Long, id2: Long, w: Double) => Edge(id1, id2, w) } //.rdd.map(row => Edge(row.getLong(0), row.getLong(1), row.getDouble(2))) // convert DF into rdd
val exr = eDF.rdd.map{ case Row(id1: Long, id2: Long, w: Double) => Edge(id2, id1, w) }
val vDF = eDF.select("srcVid").union(eDF.select("dstVid")).distinct().withColumnRenamed("srcVid","id")
val vx = vDF.rdd.map(row => (row.getLong(0),row.getLong(0)))
// A graph with edge attributes containing distances
val graph = Graph(vx, ex).cache()
val graphr = Graph(vx, exr) // reversed edges
gF: org.apache.spark.sql.DataFrame = [OPostUserIDinRT: bigint, CPostUserID: bigint ... 2 more fields] eDF: org.apache.spark.sql.DataFrame = [srcVid: bigint, dstVid: bigint ... 1 more field] ex: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Double]] = MapPartitionsRDD[48785] at map at <console>:67 exr: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Double]] = MapPartitionsRDD[48786] at map at <console>:68 vDF: org.apache.spark.sql.DataFrame = [id: bigint] vx: org.apache.spark.rdd.RDD[(Long, Long)] = MapPartitionsRDD[48798] at map at <console>:70 graph: org.apache.spark.graphx.Graph[Long,Double] = org.apache.spark.graphx.impl.GraphImpl@7d39c1a0 graphr: org.apache.spark.graphx.Graph[Long,Double] = org.apache.spark.graphx.impl.GraphImpl@54e5bb09
Check that the network is mainly one connected component
val ccV = graph.connectedComponents().vertices
val ccIDs = ccV.map {
  case (_, ccId) => ccId
}.toDF("CCId").withColumn("w",lit(1.0)).groupBy("CCId").sum("w")
ccV: org.apache.spark.graphx.VertexRDD[org.apache.spark.graphx.VertexId] = VertexRDDImpl[48947] at RDD at VertexRDD.scala:57 ccIDs: org.apache.spark.sql.DataFrame = [CCId: bigint, sum(w): double]
display(ccIDs.orderBy($"sum(w)".desc)) // just one main component
CCId | sum(w) |
---|---|
12.0 | 2408064.0 |
2.4539804e7 | 11.0 |
5.58985624e8 | 10.0 |
1.4684809e7 | 6.0 |
3.248055651e9 | 4.0 |
4.3825543e7 | 4.0 |
2.80909035e8 | 4.0 |
4.887288238e9 | 4.0 |
1.4996293e7 | 4.0 |
3.0995308e7 | 4.0 |
2.43057875e8 | 4.0 |
9.03181812e8 | 4.0 |
3.2634036e7 | 4.0 |
1.92591747e8 | 4.0 |
2.80481025e8 | 3.0 |
4.6928976e7 | 3.0 |
5.81466861e8 | 3.0 |
1.32871344e8 | 3.0 |
3.064832195e9 | 3.0 |
1.8373162e7 | 3.0 |
5.38790007e8 | 3.0 |
2.860616215e9 | 3.0 |
5.3000535e7 | 3.0 |
1.178506776e9 | 3.0 |
1.13820551e8 | 3.0 |
5.0740237e7 | 3.0 |
3.085141858e9 | 3.0 |
1.8116791e7 | 3.0 |
1.1685838e8 | 3.0 |
4.09958044e8 | 3.0 |
Truncated to 30 rows
Define landmarks
Find a way to define a good set of landmarks. One way is data-driven: choose the top nodes by some centrality measure, e.g. out-degree, out-neighbourhood, or PageRank. Alternatively, the landmarks can be chosen manually using domain expertise.
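As a sketch of the data-driven route, candidate landmarks could be ranked by PageRank on the graph built above (the tolerance 0.001 is an assumption; cf. the commented-out landmarkPageRanks join further down):
val landmarkPageRanks = graph.pageRank(0.001).vertices // (id, rank) pairs
val topCandidates = landmarkPageRanks.sortBy(_._2, ascending = false).take(20)
Here, the landmarks are fixed manually instead: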
val landmarkVertices = Array((117777690L,"jeremycorbyn"), (747807250819981312L, "theresa_may"), (222748037L,"AngelaRayner"), (3131144855L,"BorisJohnson"), (16973333L, "Independent"), (16343974L, "Telegraph"), (14157134L, "Peston"), (216299334L,"piersmorgan"), (60886384L,"Kevin_Maguire"), (65045121L,"OwenJones84"), (19811190L, "paulmasonnews"), (19346439L, "LouiseMensch"), (465973L,"GuidoFawkes"))
landmarkVertices: Array[(Long, String)] = Array((117777690,jeremycorbyn), (747807250819981312,theresa_may), (222748037,AngelaRayner), (3131144855,BorisJohnson), (16973333,Independent), (16343974,Telegraph), (14157134,Peston), (216299334,piersmorgan), (60886384,Kevin_Maguire), (65045121,OwenJones84), (19811190,paulmasonnews), (19346439,LouiseMensch), (465973,GuidoFawkes))
val landmarkOutNghbd = retweetNetwork.select("OPostUSerIDinRT", "OPostUserSNinRT", "CPostUSerID", "Weight").distinct().groupBy("OPostUSerIDinRT", "OPostUserSNinRT").agg(sum($"Weight").as("OutNghbd")).toDF("Id", "SN", "OutNghbd").alias("landmarkOutNghbd")
val landmarkInNghbd = retweetNetwork.select("OPostUSerIDinRT", "CPostUserSN", "CPostUSerID", "Weight").distinct().groupBy("CPostUserID", "CPostUserSN").agg(sum($"Weight").as("InNghbd")).toDF("Id", "SN", "InNghbd").alias("landmarkInNghbd")
val landmarkOutDegree = retweetNetwork.groupBy("OPostUSerIDinRT", "OPostUserSNinRT").agg(sum($"Weight").as("OutDegree")).toDF("Id", "SN", "OutDegree").alias("landmarkOutDegree")
val landmarkInDegree = retweetNetwork.groupBy("CPostUserId", "CPostUserSN").agg(sum($"Weight").as("InDegree")).toDF("Id", "SN", "InDegree").alias("landmarkInDegree")
landmarkOutNghbd: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Id: bigint, SN: string ... 1 more field] landmarkInNghbd: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Id: bigint, SN: string ... 1 more field] landmarkOutDegree: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Id: bigint, SN: string ... 1 more field] landmarkInDegree: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Id: bigint, SN: string ... 1 more field]
Display popular vertices
display(landmarkOutDegree.join(landmarkOutNghbd.drop("SN"), Seq("ID")).join(landmarkInDegree.drop("SN"), Seq("ID"), joinType="outer").join(landmarkInNghbd.drop("SN"), Seq("ID"), joinType="outer").orderBy($"OutDegree".desc))
Id | SN | OutDegree | OutNghbd | InDegree | InNghbd |
---|---|---|---|---|---|
1.1777769e8 | jeremycorbyn | 516833.0 | 184236.0 | 21.0 | 20.0 |
6.5045121e7 | OwenJones84 | 202084.0 | 78548.0 | 261.0 | 192.0 |
1.6973333e7 | Independent | 195573.0 | 67341.0 | 681.0 | 22.0 |
1.541364193e9 | britainelects | 130161.0 | 46921.0 | 15.0 | 14.0 |
2.16299334e8 | piersmorgan | 118588.0 | 79514.0 | 157.0 | 128.0 |
1.28216887e8 | jonsnowC4 | 90555.0 | 53637.0 | 94.0 | 28.0 |
1.981119e7 | paulmasonnews | 74207.0 | 27358.0 | 309.0 | 222.0 |
1.6343974e7 | Telegraph | 60732.0 | 29500.0 | 95.0 | 15.0 |
1.9346439e7 | LouiseMensch | 53739.0 | 16916.0 | 3287.0 | 916.0 |
1.4157134e7 | Peston | 48052.0 | 29552.0 | 25.0 | 8.0 |
7.47807250819981312e17 | theresa_may | 47791.0 | 31075.0 | null | null |
2.2812734e7 | faisalislam | 46715.0 | 21148.0 | 101.0 | 75.0 |
2.22748037e8 | AngelaRayner | 45272.0 | 15751.0 | 101.0 | 68.0 |
1.8020612e7 | DavidLammy | 43043.0 | 27350.0 | 29.0 | 21.0 |
3.331501e7 | davidallengreen | 39141.0 | 15527.0 | 183.0 | 95.0 |
6.1183568e7 | bbclaurak | 37683.0 | 18288.0 | 85.0 | 29.0 |
2.1202851e7 | IanDunt | 36600.0 | 16069.0 | 203.0 | 157.0 |
8.62264836306214913e17 | LordBuckethead | 36436.0 | 28899.0 | 13.0 | 10.0 |
6.0886384e7 | Kevin_Maguire | 36378.0 | 17015.0 | 5.0 | 1.0 |
1.5439395e7 | stephenfry | 32521.0 | 26379.0 | 2.0 | 2.0 |
6.178126e7 | Ed_Miliband | 32264.0 | 23832.0 | 9.0 | 9.0 |
1.5438913e7 | MailOnline | 31988.0 | 15781.0 | 594.0 | 10.0 |
1.9335378e7 | johnprescott | 31906.0 | 23329.0 | 51.0 | 29.0 |
465973.0 | GuidoFawkes | 29033.0 | 10410.0 | 78.0 | 37.0 |
1.4700117e7 | MayorofLondon | 27816.0 | 20162.0 | 44.0 | 17.0 |
2.5275453e7 | jimwaterson | 27480.0 | 18512.0 | 20.0 | 19.0 |
7.7234984e7 | johnmcdonnellMP | 26587.0 | 13841.0 | 40.0 | 22.0 |
3.4655603e7 | TheSun | 26458.0 | 13462.0 | 476.0 | 23.0 |
1.53810216e8 | HackneyAbbott | 25869.0 | 16886.0 | 151.0 | 56.0 |
1.5143478e7 | RichardDawkins | 25059.0 | 16605.0 | 3.0 | 3.0 |
Truncated to 30 rows
Display popular vertices for some category of accounts, e.g. newspapers or politicians.
val allTracked = sc.textFile("/FileStore/tables/gj2ee8j11500983679360/ids_with_labels2-7c031.csv")
.map(x => x.split(","))
.map(x => (x(0).toLong, x(1).toString, x(2).toString, x(3).toString))
.toDF(Seq("UserId", "UserSN", "RealName", "Affiliation"): _*)
allTracked: org.apache.spark.sql.DataFrame = [UserId: bigint, UserSN: string ... 2 more fields]
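For reference, a sketch of the same load via the Spark 2.x DataFrame reader instead of sc.textFile, assuming the file has exactly the four columns parsed above:
val allTrackedAlt = spark.read
  .option("inferSchema", "true")
  .csv("/FileStore/tables/gj2ee8j11500983679360/ids_with_labels2-7c031.csv")
  .toDF(Seq("UserId", "UserSN", "RealName", "Affiliation"): _*)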
display(retweetNetwork.select("OPostUserSNinRT", "OPostUserIDinRT").distinct().toDF("SN", "Id")
.join(allTracked.filter($"Affiliation" === "Newspaper"), $"UserId" === $"Id")
//.join(landmarkPageRanks.toDF(Seq("Id" ,"PageRank"): _*), "Id")
.join(landmarkOutNghbd.drop("SN"), "Id")
//.join(landmarkInNghbd.drop("SN"), "Id")
//.join(landmarkOutDegree.drop("SN"), "Id")
//.join(landmarkInDegree.drop("SN"), "Id")
.orderBy($"OutNghbd".desc))
Id | SN | UserId | UserSN | RealName | Affiliation | OutNghbd |
---|---|---|---|---|---|---|
1.6973333e7 | Independent | 1.6973333e7 | Ind | The Ind | Newspaper | 67341.0 |
1.6343974e7 | Telegraph | 1.6343974e7 | Telegraph | The Telegraph | Newspaper | 29500.0 |
1.5438913e7 | MailOnline | 1.5438913e7 | MailOnline | Daily Mail Online | Newspaper | 15781.0 |
3.4655603e7 | TheSun | 3.4655603e7 | TheSun | The Sun | Newspaper | 13462.0 |
1.8949452e7 | FT | 1.8949452e7 | FT | Financial Times | Newspaper | 12885.0 |
1.6887175e7 | DailyMirror | 1.6887175e7 | DailyMirror | Daily Mirror | Newspaper | 12335.0 |
3.814238e7 | standardnews | 3.814238e7 | standardnews | Evening Standard | Newspaper | 9017.0 |
1.789582e7 | Daily_Express | 1.789582e7 | Daily_Express | Daily Express | Newspaper | 6381.0 |
1.4138785e7 | TelegraphNews | 1.4138785e7 | TelegraphNews | Telegraph News | Newspaper | 6336.0 |
6107422.0 | thetimes | 6107422.0 | thetimes | The Times of London | Newspaper | 5877.0 |
4898091.0 | FinancialTimes | 4898091.0 | FinancialTimes | Financial Times | Newspaper | 5626.0 |
2.05770556e8 | theipaper | 2.05770556e8 | theipaper | i newspaper | Newspaper | 4395.0 |
2.044293e7 | Daily_Star | 2.044293e7 | Daily_Star | Daily Star | Newspaper | 4154.0 |
788524.0 | guardiannews | 788524.0 | guardiannews | Guardian news | Newspaper | 2673.0 |
1.30778462e8 | BrookesTimes | 1.30778462e8 | BrookesTimes | Peter Brookes | Newspaper | 728.0 |
3.4904355e7 | ftweekend | 3.4904355e7 | ftweekend | FT Weekend | Newspaper | 1.0 |
7.2811888e7 | dailyexpressuk | 7.2811888e7 | dailyexpressuk | Daily Express UK | Newspaper | 1.0 |
display(retweetNetwork.select("OPostUserSNinRT", "OPostUserIDinRT").distinct().toDF("SN", "Id")
.join(allTracked.filter($"Affiliation" =!= "Newspaper").filter($"Affiliation" =!= "Other"), $"UserId" === $"Id")
//.join(landmarkPageRanks.toDF(Seq("Id" ,"PageRank"): _*), "Id")
.join(landmarkOutNghbd, "Id")
//.join(landmarkInNghbd, "Id")
//.join(landmarkOutDegree, "Id")
//.join(landmarkInDegree, "Id")
.orderBy($"OutNghbd".desc))
Id | SN | UserId | UserSN | RealName | Affiliation | SN | OutNghbd |
---|---|---|---|---|---|---|---|
1.1777769e8 | jeremycorbyn | 1.1777769e8 | jeremycorbyn | Jeremy Corbyn | LP | jeremycorbyn | 184236.0 |
7.47807250819981312e17 | theresa_may | 7.47807250819981312e17 | theresa_may | Theresa May | CP | theresa_may | 31075.0 |
8.62264836306214913e17 | LordBuckethead | 8.62264836306214913e17 | LordBuckethead | Lord Buckethead | Ind | LordBuckethead | 28899.0 |
1.8020612e7 | DavidLammy | 1.8020612e7 | DavidLammy | David Lammy | LP | DavidLammy | 27350.0 |
6.178126e7 | Ed_Miliband | 6.178126e7 | Ed_Miliband | Ed Miliband | LP | Ed_Miliband | 23832.0 |
1.53810216e8 | HackneyAbbott | 1.53810216e8 | HackneyAbbott | Diane Abbott | LP | HackneyAbbott | 16886.0 |
2.22748037e8 | AngelaRayner | 2.22748037e8 | AngelaRayner | Angela Rayner | LP | AngelaRayner | 15751.0 |
7.7234984e7 | johnmcdonnellMP | 7.7234984e7 | johnmcdonnellMP | John McDonnell | LP | johnmcdonnellMP | 13841.0 |
8.08029e7 | CarolineLucas | 8.08029e7 | CarolineLucas | Caroline Lucas | GP | CarolineLucas | 11349.0 |
2.36786367e8 | AlexSalmond | 2.36786367e8 | AlexSalmond | Alex Salmond | SNP | AlexSalmond | 11229.0 |
5.45081356e8 | RichardBurgon | 5.45081356e8 | RichardBurgon | Richard Burgon | LP | RichardBurgon | 8692.0 |
3.3300246e7 | ChukaUmunna | 3.3300246e7 | ChukaUmunna | Chuka Umunna | LP | ChukaUmunna | 8191.0 |
2.23539098e8 | nw_nicholas | 2.23539098e8 | nw_nicholas | Nicholas Wilson | Ind | nw_nicholas | 7609.0 |
3.131144855e9 | BorisJohnson | 3.131144855e9 | BorisJohnson | Boris Johnson | CP | BorisJohnson | 7333.0 |
3.28634628e8 | YvetteCooperMP | 3.28634628e8 | YvetteCooperMP | Yvette Cooper | LP | YvetteCooperMP | 6695.0 |
2.425571623e9 | Keir_Starmer | 2.425571623e9 | Keir_Starmer | Keir Starmer | LP | Keir_Starmer | 6040.0 |
8.0021045e7 | timfarron | 8.0021045e7 | timfarron | Tim Farron | LD | timfarron | 5512.0 |
1.5484198e7 | georgegalloway | 1.5484198e7 | georgegalloway | George Galloway | Ind | georgegalloway | 5379.0 |
1.4321261e8 | JonAshworth | 1.4321261e8 | JonAshworth | Jon Ashworth | Lab Co-op | JonAshworth | 4745.0 |
1.07722321e8 | BarryGardiner | 1.07722321e8 | BarryGardiner | Barry Gardiner | LP | BarryGardiner | 4605.0 |
1.64226176e8 | EmilyThornberry | 1.64226176e8 | EmilyThornberry | Emily Thornberry | LP | EmilyThornberry | 4468.0 |
3.6924726e7 | labourlewis | 3.6924726e7 | labourlewis | Clive Lewis | LP | labourlewis | 4288.0 |
7.21026242e8 | JCHannah77 | 7.21026242e8 | JCHannah77 | Jon Hannah | LD | JCHannah77 | 3783.0 |
1.4190551e7 | tom_watson | 1.4190551e7 | tom_watson | Tom Watson | LP | tom_watson | 3375.0 |
4.26116125e8 | heidiallen75 | 4.26116125e8 | heidiallen75 | Heidi Allen | CP | heidiallen75 | 3059.0 |
1.5010349e7 | nick_clegg | 1.5010349e7 | nick_clegg | Nick Clegg | LD | nick_clegg | 2909.0 |
1.91807697e8 | jon_trickett | 1.91807697e8 | jon_trickett | Jon Trickett | LP | jon_trickett | 2805.0 |
2.0000725e7 | jessphillips | 2.0000725e7 | jessphillips | Jess Phillips | LP | jessphillips | 2522.0 |
9.48015937e8 | SarahChampionMP | 9.48015937e8 | SarahChampionMP | Sarah Champion | LP | SarahChampionMP | 2457.0 |
1.4077382e7 | JamesCleverly | 1.4077382e7 | JamesCleverly | James Cleverly | CP | JamesCleverly | 2009.0 |
Truncated to 30 rows
val totalNumberOfRetweeters = retweetNetwork.select("CPostUserId").distinct().count() // 776801
totalNumberOfRetweeters: Long = 808388
Find the shortest paths from each landmark to every other node
This is done using Dijkstra's algorithm, where the weight of each edge is inversely proportional to the number of retweets: an edge with w retweets has weight 1/(1+w). Reading 1/(1+w) as the success probability p of a geometric distribution, (1-p)/p = w recovers the expected number of retweet events along the edge, which is what runDijkstraOnce sums along each shortest path.
// Run Dijkstra on the graph for all the landmarks
var df = vDF //.drop("srcVid")
for (landmark <- landmarkVertices) {
//for (landmark <- Array(landmarkVertices(0),landmarkVertices(6)) ){
val temp = runDijkstraOnce("id", graph, landmark._1, landmark._2)
df = df.join(temp, "id")
df.cache()
df.count()
}
df: org.apache.spark.sql.DataFrame = [id: bigint, jeremycorbynESumGeom: double ... 25 more fields]
df.cache()
df.count() // 790811 //1038382
res4: Long = 1083714
import org.apache.spark.sql.functions.col
import scala.collection.mutable.ArrayBuffer
val temp = ArrayBuffer[String]()
for (landmark <- landmarkVertices) {
temp += landmark._2+"PathLen"
}
val pathLens = temp.toSeq // the column names corresponding to path lengths
val temp2 = ArrayBuffer[String]()
for (landmark <- landmarkVertices) {
temp2 += landmark._2+"ESumGeom"
}
val sumGeoms = temp2.toSeq // the column names corresponding to sum geoms
import org.apache.spark.sql.functions.col import scala.collection.mutable.ArrayBuffer temp: scala.collection.mutable.ArrayBuffer[String] = ArrayBuffer(jeremycorbynPathLen, theresa_mayPathLen, AngelaRaynerPathLen, BorisJohnsonPathLen, IndependentPathLen, TelegraphPathLen, PestonPathLen, piersmorganPathLen, Kevin_MaguirePathLen, OwenJones84PathLen, paulmasonnewsPathLen, LouiseMenschPathLen, GuidoFawkesPathLen) pathLens: Seq[String] = ArrayBuffer(jeremycorbynPathLen, theresa_mayPathLen, AngelaRaynerPathLen, BorisJohnsonPathLen, IndependentPathLen, TelegraphPathLen, PestonPathLen, piersmorganPathLen, Kevin_MaguirePathLen, OwenJones84PathLen, paulmasonnewsPathLen, LouiseMenschPathLen, GuidoFawkesPathLen) temp2: scala.collection.mutable.ArrayBuffer[String] = ArrayBuffer(jeremycorbynESumGeom, theresa_mayESumGeom, AngelaRaynerESumGeom, BorisJohnsonESumGeom, IndependentESumGeom, TelegraphESumGeom, PestonESumGeom, piersmorganESumGeom, Kevin_MaguireESumGeom, OwenJones84ESumGeom, paulmasonnewsESumGeom, LouiseMenschESumGeom, GuidoFawkesESumGeom) sumGeoms: Seq[String] = ArrayBuffer(jeremycorbynESumGeom, theresa_mayESumGeom, AngelaRaynerESumGeom, BorisJohnsonESumGeom, IndependentESumGeom, TelegraphESumGeom, PestonESumGeom, piersmorganESumGeom, Kevin_MaguireESumGeom, OwenJones84ESumGeom, paulmasonnewsESumGeom, LouiseMenschESumGeom, GuidoFawkesESumGeom)
// Filter out a bunch of stuff:
val minRT = 1/(1+1.0) // only keep users who have retweeted at least one of the landmarks 2 or more times.
val df1 = df // couldn't come up with a smarter way to do this... (see the folded alternative sketched below)
.filter((col(sumGeoms(0))>minRT||col(sumGeoms(1))>minRT||col(sumGeoms(2))>minRT||col(sumGeoms(3))>minRT||col(sumGeoms(4))>minRT||col(sumGeoms(5))>minRT||col(sumGeoms(6))>minRT||col(sumGeoms(7))>minRT||col(sumGeoms(8))>minRT||col(sumGeoms(9))>minRT)||col(sumGeoms(10))>minRT||col(sumGeoms(11))>minRT||col(sumGeoms(12))>minRT)//||col(sumGeoms(13))>minRT||col(sumGeoms(14))>minRT||col(sumGeoms(15))>minRT||col(sumGeoms(16))>minRT))//||col(sumGeoms(17))>minRT||col(sumGeoms(18))>minRT))
//&&
minRT: Double = 0.5 df1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, jeremycorbynESumGeom: double ... 25 more fields]
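A sketch of the folded alternative mentioned above, building the same disjunction programmatically over the sumGeoms column names:
val atLeastOneLandmark = sumGeoms.map(col(_) > minRT).reduce(_ || _) // OR together the per-landmark predicates
val df1Alt = df.filter(atLeastOneLandmark)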
val df2 = df1.select(pathLens.head, pathLens.tail: _*).withColumn("count",lit(1.0)).groupBy(pathLens.map(col(_)): _*).agg(sum($"count")).orderBy($"sum(count)".desc) // Aggregate users with the same profiles.
df2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [jeremycorbynPathLen: int, theresa_mayPathLen: int ... 12 more fields]
// We want to groupBy and apply different agg. operations to different columns: take the mean of the "...ESumGeom" columns and the sum of "count".
// We don't need this anymore, but it is clever, so it stays here in case it is needed some other time.
val exprs = (sumGeoms.map((_ -> "mean")) ++ Seq("count").map((_ -> "sum"))).toMap
// val df2 = df1.withColumn("count",lit(1.0)).groupBy(pathLens.map(col(_)): _*).agg(exprs)
exprs: scala.collection.immutable.Map[String,String] = Map(count -> sum, GuidoFawkesESumGeom -> mean, paulmasonnewsESumGeom -> mean, theresa_mayESumGeom -> mean, PestonESumGeom -> mean, LouiseMenschESumGeom -> mean, Kevin_MaguireESumGeom -> mean, TelegraphESumGeom -> mean, AngelaRaynerESumGeom -> mean, jeremycorbynESumGeom -> mean, OwenJones84ESumGeom -> mean, IndependentESumGeom -> mean, BorisJohnsonESumGeom -> mean, piersmorganESumGeom -> mean)
// Dataframe-ified zipWithIndex
def dfZipWithIndex(
df: DataFrame,
offset: Int = 1,
colName: String = "id",
inFront: Boolean = true
) : DataFrame = {
df.sqlContext.createDataFrame(
df.rdd.zipWithIndex.map(ln =>
Row.fromSeq(
(if (inFront) Seq(ln._2 + offset) else Seq())
++ ln._1.toSeq ++
(if (inFront) Seq() else Seq(ln._2 + offset))
)
),
StructType(
(if (inFront) Array(StructField(colName,LongType,false)) else Array[StructField]())
++ df.schema.fields ++
(if (inFront) Array[StructField]() else Array(StructField(colName,LongType,false)))
)
)
}
dfZipWithIndex: (df: org.apache.spark.sql.DataFrame, offset: Int, colName: String, inFront: Boolean)org.apache.spark.sql.DataFrame
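For reference, a window-function sketch of the same 1-based index; unlike zipWithIndex it requires an explicit total order, so orderCol here is a placeholder for whatever column defines that order:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number
def dfWithRowNumber(df: DataFrame, orderCol: String): DataFrame =
  df.withColumn("id", row_number().over(Window.orderBy(col(orderCol).desc))) // 1-based, like dfZipWithIndex's default offset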
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val columnNames = pathLens ++ Seq("sum(count)")
val sumFreqs = df2.agg(sum("sum(count)")).first.getDouble(0)
val pathLengthFeatures = dfZipWithIndex(
df2.select(columnNames.head, columnNames.tail: _*)
.withColumn("percentage", $"sum(count)"/sumFreqs * 100.0)
.withColumn("cs", sum($"percentage").over(Window.orderBy($"percentage".desc).rowsBetween(Long.MinValue, 0)))
.select((Seq("percentage", "cs") ++ columnNames).head, (Seq("percentage", "cs") ++ columnNames).tail: _*))
// with .cache() this takes about 37 minutes
import org.apache.spark.sql.functions._ import org.apache.spark.sql.expressions.Window columnNames: Seq[String] = ArrayBuffer(jeremycorbynPathLen, theresa_mayPathLen, AngelaRaynerPathLen, BorisJohnsonPathLen, IndependentPathLen, TelegraphPathLen, PestonPathLen, piersmorganPathLen, Kevin_MaguirePathLen, OwenJones84PathLen, paulmasonnewsPathLen, LouiseMenschPathLen, GuidoFawkesPathLen, sum(count)) sumFreqs: Double = 736034.0 pathLengthFeatures: org.apache.spark.sql.DataFrame = [id: bigint, percentage: double ... 15 more fields]
pathLengthFeatures.count()
res5: Long = 132295
display(pathLengthFeatures.filter($"theresa_mayPathLen"===1).agg(sum("sum(count)")))
sum(sum(count)) |
---|
30051.0 |
display(pathLengthFeatures) // this is the df which is used to compute the neighbour-joining tree
id | percentage | cs | jeremycorbynPathLen | theresa_mayPathLen | AngelaRaynerPathLen | BorisJohnsonPathLen | IndependentPathLen | TelegraphPathLen | PestonPathLen | piersmorganPathLen | Kevin_MaguirePathLen |
---|---|---|---|---|---|---|---|---|---|---|---|
1.0 | 10.111897004757932 | 10.111897004757932 | 1.0 | 3.0 | 2.0 | 5.0 | 3.0 | 5.0 | 4.0 | 4.0 | 4.0 |
2.0 | 7.631576802158596 | 17.743473806916526 | 3.0 | 2.0 | 3.0 | 3.0 | 3.0 | 2.0 | 3.0 | 1.0 | 3.0 |
3.0 | 3.2983530652116615 | 21.041826872128187 | 4.0 | 6.0 | 6.0 | 4.0 | 1.0 | 4.0 | 4.0 | 4.0 | 6.0 |
4.0 | 2.630856726727298 | 23.672683598855485 | 2.0 | 3.0 | 3.0 | 3.0 | 2.0 | 3.0 | 2.0 | 3.0 | 3.0 |
5.0 | 1.864451913906151 | 25.537135512761637 | 3.0 | 2.0 | 3.0 | 3.0 | 3.0 | 3.0 | 3.0 | 3.0 | 3.0 |
6.0 | 1.8249156968292226 | 27.36205120959086 | 4.0 | 5.0 | 3.0 | 5.0 | 4.0 | 3.0 | 3.0 | 3.0 | 3.0 |
7.0 | 1.56867753391827 | 28.93072874350913 | 0.0 | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
8.0 | 1.533081352220142 | 30.463810095729272 | 4.0 | 6.0 | 4.0 | 6.0 | 6.0 | 1.0 | 4.0 | 4.0 | 5.0 |
9.0 | 1.5140604917707607 | 31.977870587500032 | 4.0 | 3.0 | 5.0 | 3.0 | 2.0 | 4.0 | 4.0 | 4.0 | 2.0 |
10.0 | 1.435395647483676 | 33.41326623498371 | 5.0 | 5.0 | 6.0 | 6.0 | 5.0 | 6.0 | 5.0 | 6.0 | 6.0 |
11.0 | 1.3597197955529228 | 34.77298603053663 | 5.0 | 6.0 | 5.0 | 5.0 | 5.0 | 5.0 | 5.0 | 5.0 | 5.0 |
12.0 | 1.3315960947456231 | 36.10458212528225 | 3.0 | 5.0 | 4.0 | 5.0 | 4.0 | 3.0 | 5.0 | 3.0 | 5.0 |
13.0 | 1.2518443441471454 | 37.356426469429394 | 3.0 | 3.0 | 6.0 | 3.0 | 3.0 | 3.0 | 4.0 | 4.0 | 6.0 |
14.0 | 1.0627226459647243 | 38.41914911539412 | 3.0 | 3.0 | 4.0 | 3.0 | 3.0 | 3.0 | 3.0 | 3.0 | 5.0 |
15.0 | 0.9384077365991246 | 39.357556851993245 | 5.0 | 6.0 | 5.0 | 5.0 | 5.0 | 4.0 | 3.0 | 4.0 | 5.0 |
16.0 | 0.8386840825287962 | 40.19624093452204 | 4.0 | 3.0 | 4.0 | 3.0 | 4.0 | 6.0 | 3.0 | 3.0 | 4.0 |
17.0 | 0.699016621514767 | 40.8952575560368 | 4.0 | 3.0 | 5.0 | 6.0 | 4.0 | 6.0 | 3.0 | 6.0 | 4.0 |
18.0 | 0.6435843996337126 | 41.53884195567051 | 3.0 | 5.0 | 5.0 | 5.0 | 4.0 | 5.0 | 5.0 | 4.0 | 3.0 |
19.0 | 0.6380140047878223 | 42.17685596045833 | 3.0 | 5.0 | 4.0 | 5.0 | 4.0 | 5.0 | 5.0 | 3.0 | 4.0 |
20.0 | 0.48408089843675706 | 42.66093685889509 | 2.0 | 3.0 | 3.0 | 4.0 | 2.0 | 3.0 | 2.0 | 3.0 | 3.0 |
21.0 | 0.44590331424907 | 43.106840173144164 | 5.0 | 5.0 | 5.0 | 5.0 | 4.0 | 4.0 | 1.0 | 4.0 | 6.0 |
22.0 | 0.4340832081126687 | 43.54092338125683 | 1.0 | 3.0 | 3.0 | 3.0 | 2.0 | 3.0 | 2.0 | 3.0 | 3.0 |
23.0 | 0.42824108668893013 | 43.96916446794576 | 2.0 | 4.0 | 4.0 | 4.0 | 3.0 | 3.0 | 4.0 | 3.0 | 3.0 |
24.0 | 0.3847648342332012 | 44.35392930217896 | 5.0 | 6.0 | 6.0 | 6.0 | 5.0 | 3.0 | 4.0 | 4.0 | 5.0 |
25.0 | 0.37104264205186177 | 44.72497194423082 | 3.0 | 3.0 | 3.0 | 3.0 | 3.0 | 4.0 | 2.0 | 6.0 | 5.0 |
26.0 | 0.3694122825847719 | 45.09438422681559 | 3.0 | 2.0 | 5.0 | 2.0 | 4.0 | 2.0 | 3.0 | 2.0 | 2.0 |
27.0 | 0.34903278924614894 | 45.443417016061744 | 4.0 | 3.0 | 4.0 | 4.0 | 4.0 | 4.0 | 4.0 | 4.0 | 4.0 |
28.0 | 0.34060926533285146 | 45.784026281394596 | 4.0 | 4.0 | 3.0 | 4.0 | 3.0 | 5.0 | 2.0 | 4.0 | 3.0 |
29.0 | 0.3278381161739811 | 46.111864397568574 | 3.0 | 5.0 | 5.0 | 5.0 | 4.0 | 3.0 | 4.0 | 5.0 | 5.0 |
30.0 | 0.3275663895961328 | 46.43943078716471 | 3.0 | 3.0 | 3.0 | 3.0 | 3.0 | 3.0 | 4.0 | 5.0 | 3.0 |
Truncated to 30 rows
Truncated to 12 cols
// Copy-Paste into Matlab!
print("[")
df2.select(pathLens.head, pathLens.tail: _*).take(10).foreach(x => println(x.toString + ";..."))
print("];")
[[1,3,2,5,3,5,4,4,4,3,3,5,4];... [3,2,3,3,3,2,3,1,3,2,3,4,3];... [4,6,6,4,1,4,4,4,6,3,4,6,3];... [2,3,3,3,2,3,2,3,3,1,3,3,4];... [3,2,3,3,3,3,3,3,3,3,3,1,3];... [4,5,3,5,4,3,3,3,3,3,5,4,4];... [0,1,0,0,0,0,0,0,0,0,0,0,0];... [4,6,4,6,6,1,4,4,5,6,4,5,5];... [4,3,5,3,2,4,4,4,2,3,3,2,3];... [5,5,6,6,5,6,5,6,6,5,6,6,6];... ];
Communication in the neighbour-joining tree
Now we find the frequency of retweets across different branches in the tree.
import spark.implicits._
val profiles = pathLengthFeatures.select(pathLens.head, pathLens.tail: _*).rdd.map{r =>
val array = r.toSeq.toArray
array.map(_.asInstanceOf[Integer]).map(_.toDouble) }.take(100) // first we take the 100 most common profiles we want to have in our tree.
/*
 * Find all ids with the given profile. Return them in a DataFrame.
 */
def filterIdsWithProfile(df: DataFrame, pathLens: Seq[String], profile: Array[Integer], label: Integer): DataFrame = {
val columnNames = pathLens ++ Seq("id")
// note: only the first six path-length columns are matched here
return df.select(columnNames.head, columnNames.tail: _*)
.filter(col(pathLens(0))===profile(0)&&col(pathLens(1))===profile(1)&&col(pathLens(2))===profile(2)&&col(pathLens(3))===profile(3)&&col(pathLens(4))===profile(4)&&col(pathLens(5))===profile(5))
.select("id").withColumn("label", lit(label))
}
import spark.implicits._ profiles: Array[Array[Double]] = Array(Array(1.0, 3.0, 2.0, 5.0, 3.0, 5.0, 4.0, 4.0, 4.0, 3.0, 3.0, 5.0, 4.0), Array(3.0, 2.0, 3.0, 3.0, 3.0, 2.0, 3.0, 1.0, 3.0, 2.0, 3.0, 4.0, 3.0), Array(4.0, 6.0, 6.0, 4.0, 1.0, 4.0, 4.0, 4.0, 6.0, 3.0, 4.0, 6.0, 3.0), Array(2.0, 3.0, 3.0, 3.0, 2.0, 3.0, 2.0, 3.0, 3.0, 1.0, 3.0, 3.0, 4.0), Array(3.0, 2.0, 3.0, 3.0, 3.0, 3.0, 3.0, 3.0, 3.0, 3.0, 3.0, 1.0, 3.0), Array(4.0, 5.0, 3.0, 5.0, 4.0, 3.0, 3.0, 3.0, 3.0, 3.0, 5.0, 4.0, 4.0), Array(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0), Array(4.0, 6.0, 4.0, 6.0, 6.0, 1.0, 4.0, 4.0, 5.0, 6.0, 4.0, 5.0, 5.0), Array(4.0, 3.0, 5.0, 3.0, 2.0, 4.0, 4.0, 4.0, 2.0, 3.0, 3.0, 2.0, 3.0), Array(5.0, 5.0, 6.0, 6.0, 5.0, 6.0, 5.0, 6.0, 6.0, 5.0, 6.0, 6.0, 6.0), Array(5.0, 6.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 4.0, 5.0, 5.0, 6.0), Array(3.0, 5.0, 4.0, 5.0, 4.0, 3.0, 5.0, 3.0, 5.0, 3.0, 4.0, 5.0, 3.0), Array(3.0, 3.0, 6.0, 3.0, 3.0, 3.0, 4.0, 4.0, 6.0, 6.0, 4.0, 4.0, 4.0), Array(3.0, 3.0, 4.0, 3.0, 3.0, 3.0, 3.0, 3.0, 5.0, 3.0, 5.0, 3.0, 3.0), Array(5.0, 6.0, 5.0, 5.0, 5.0, 4.0, 3.0, 4.0, 5.0, 5.0, 6.0, 6.0, 5.0), Array(4.0, 3.0, 4.0, 3.0, 4.0, 6.0, 3.0, 3.0, 4.0, 5.0, 3.0, 4.0, 6.0), Array(4.0, 3.0, 5.0, 6.0, 4.0, 6.0, 3.0, 6.0, 4.0, 4.0, 5.0, 4.0, 6.0), Array(3.0, 5.0, 5.0, 5.0, 4.0, 5.0, 5.0, 4.0, 3.0, 5.0, 5.0, 6.0, 6.0), Array(3.0, 5.0, 4.0, 5.0, 4.0, 5.0, 5.0, 3.0, 4.0, 3.0, 4.0, 4.0, 4.0), Array(2.0, 3.0, 3.0, 4.0, 2.0, 3.0, 2.0, 3.0, 3.0, 3.0, 1.0, 4.0, 4.0), Array(5.0, 5.0, 5.0, 5.0, 4.0, 4.0, 1.0, 4.0, 6.0, 4.0, 6.0, 5.0, 6.0), Array(1.0, 3.0, 3.0, 3.0, 2.0, 3.0, 2.0, 3.0, 3.0, 1.0, 3.0, 3.0, 4.0), Array(2.0, 4.0, 4.0, 4.0, 3.0, 3.0, 4.0, 3.0, 3.0, 2.0, 3.0, 4.0, 4.0), Array(5.0, 6.0, 6.0, 6.0, 5.0, 3.0, 4.0, 4.0, 5.0, 4.0, 6.0, 6.0, 5.0), Array(3.0, 3.0, 3.0, 3.0, 3.0, 4.0, 2.0, 6.0, 5.0, 4.0, 4.0, 5.0, 6.0), Array(3.0, 2.0, 5.0, 2.0, 4.0, 2.0, 3.0, 2.0, 2.0, 5.0, 3.0, 6.0, 2.0), Array(4.0, 3.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 4.0, 2.0, 4.0), Array(4.0, 4.0, 3.0, 4.0, 3.0, 5.0, 2.0, 4.0, 3.0, 3.0, 4.0, 4.0, 4.0), Array(3.0, 5.0, 5.0, 5.0, 4.0, 3.0, 4.0, 5.0, 5.0, 5.0, 3.0, 6.0, 6.0), Array(3.0, 3.0, 3.0, 3.0, 3.0, 3.0, 4.0, 5.0, 3.0, 3.0, 3.0, 4.0, 6.0), Array(5.0, 4.0, 5.0, 5.0, 5.0, 4.0, 5.0, 3.0, 5.0, 4.0, 5.0, 6.0, 5.0), Array(4.0, 3.0, 5.0, 6.0, 4.0, 3.0, 3.0, 4.0, 5.0, 3.0, 5.0, 6.0, 3.0), Array(6.0, 5.0, 6.0, 5.0, 6.0, 4.0, 4.0, 5.0, 4.0, 6.0, 5.0, 5.0, 6.0), Array(3.0, 4.0, 5.0, 4.0, 4.0, 5.0, 3.0, 3.0, 5.0, 3.0, 3.0, 4.0, 4.0), Array(1.0, 3.0, 2.0, 3.0, 2.0, 4.0, 4.0, 4.0, 2.0, 3.0, 3.0, 2.0, 3.0), Array(3.0, 4.0, 4.0, 3.0, 3.0, 3.0, 4.0, 4.0, 3.0, 4.0, 4.0, 4.0, 4.0), Array(3.0, 3.0, 4.0, 4.0, 2.0, 3.0, 3.0, 3.0, 3.0, 2.0, 3.0, 4.0, 4.0), Array(2.0, 4.0, 3.0, 5.0, 5.0, 6.0, 4.0, 5.0, 6.0, 5.0, 5.0, 6.0, 5.0), Array(5.0, 6.0, 4.0, 6.0, 6.0, 2.0, 5.0, 5.0, 6.0, 6.0, 5.0, 6.0, 6.0), Array(1.0, 3.0, 2.0, 4.0, 1.0, 4.0, 4.0, 4.0, 4.0, 3.0, 4.0, 5.0, 3.0), Array(2.0, 2.0, 5.0, 2.0, 5.0, 3.0, 4.0, 5.0, 5.0, 3.0, 4.0, 3.0, 5.0), Array(2.0, 3.0, 2.0, 3.0, 3.0, 2.0, 2.0, 3.0, 3.0, 2.0, 3.0, 3.0, 3.0), Array(3.0, 5.0, 4.0, 3.0, 5.0, 5.0, 3.0, 4.0, 3.0, 5.0, 6.0, 6.0, 4.0), Array(6.0, 3.0, 6.0, 1.0, 5.0, 4.0, 5.0, 5.0, 5.0, 6.0, 5.0, 6.0, 4.0), Array(4.0, 4.0, 5.0, 3.0, 4.0, 5.0, 3.0, 4.0, 4.0, 3.0, 3.0, 4.0, 1.0), Array(3.0, 5.0, 5.0, 5.0, 3.0, 4.0, 3.0, 5.0, 5.0, 3.0, 3.0, 3.0, 4.0), Array(1.0, 2.0, 2.0, 3.0, 3.0, 2.0, 3.0, 1.0, 3.0, 2.0, 3.0, 5.0, 3.0), Array(1.0, 3.0, 2.0, 5.0, 3.0, 3.0, 4.0, 4.0, 4.0, 3.0, 3.0, 5.0, 4.0), Array(1.0, 3.0, 2.0, 3.0, 2.0, 
3.0, 2.0, 4.0, 3.0, 1.0, 3.0, 3.0, 4.0), Array(4.0, 5.0, 5.0, 5.0, 4.0, 3.0, 3.0, 6.0, 4.0, 3.0, 6.0, 6.0, 5.0), Array(5.0, 3.0, 3.0, 3.0, 3.0, 4.0, 3.0, 3.0, 3.0, 4.0, 3.0, 4.0, 3.0), Array(3.0, 2.0, 3.0, 3.0, 3.0, 3.0, 3.0, 3.0, 2.0, 3.0, 2.0, 4.0, 5.0), Array(4.0, 6.0, 6.0, 4.0, 1.0, 1.0, 4.0, 4.0, 6.0, 3.0, 4.0, 6.0, 3.0), Array(1.0, 3.0, 2.0, 3.0, 3.0, 3.0, 4.0, 4.0, 4.0, 3.0, 4.0, 5.0, 4.0), Array(4.0, 5.0, 4.0, 3.0, 3.0, 3.0, 3.0, 4.0, 4.0, 4.0, 4.0, 3.0, 4.0), Array(3.0, 4.0, 2.0, 5.0, 3.0, 3.0, 3.0, 3.0, 4.0, 3.0, 4.0, 5.0, 4.0), Array(2.0, 4.0, 3.0, 6.0, 4.0, 6.0, 5.0, 5.0, 5.0, 4.0, 4.0, 6.0, 5.0), Array(3.0, 5.0, 5.0, 5.0, 4.0, 5.0, 5.0, 4.0, 1.0, 5.0, 5.0, 6.0, 6.0), Array(6.0, 6.0, 6.0, 4.0, 6.0, 5.0, 6.0, 5.0, 6.0, 6.0, 6.0, 6.0, 5.0), Array(6.0, 3.0, 6.0, 3.0, 5.0, 4.0, 5.0, 5.0, 5.0, 6.0, 5.0, 6.0, 4.0), Array(3.0, 3.0, 3.0, 3.0, 3.0, 3.0, 3.0, 5.0, 3.0, 3.0, 5.0, 4.0, 5.0), Array(4.0, 3.0, 5.0, 4.0, 4.0, 4.0, 3.0, 3.0, 4.0, 3.0, 3.0, 4.0, 4.0), Array(5.0, 2.0, 6.0, 2.0, 4.0, 3.0, 4.0, 4.0, 4.0, 6.0, 4.0, 5.0, 3.0), Array(1.0, 3.0, 3.0, 4.0, 2.0, 3.0, 2.0, 3.0, 3.0, 3.0, 1.0, 4.0, 4.0), Array(3.0, 3.0, 3.0, 4.0, 3.0, 3.0, 3.0, 3.0, 3.0, 3.0, 3.0, 5.0, 3.0), Array(3.0, 2.0, 6.0, 6.0, 5.0, 6.0, 3.0, 6.0, 4.0, 5.0, 6.0, 4.0, 6.0), Array(2.0, 3.0, 1.0, 3.0, 2.0, 5.0, 5.0, 3.0, 3.0, 3.0, 2.0, 3.0, 5.0), Array(2.0, 3.0, 3.0, 6.0, 3.0, 5.0, 4.0, 5.0, 4.0, 3.0, 4.0, 4.0, 5.0), Array(3.0, 6.0, 6.0, 6.0, 3.0, 6.0, 3.0, 6.0, 4.0, 5.0, 4.0, 4.0, 5.0), Array(4.0, 6.0, 5.0, 4.0, 4.0, 4.0, 3.0, 5.0, 5.0, 3.0, 4.0, 4.0, 5.0), Array(2.0, 3.0, 3.0, 3.0, 2.0, 2.0, 3.0, 4.0, 3.0, 2.0, 2.0, 4.0, 5.0), Array(2.0, 3.0, 3.0, 3.0, 2.0, 4.0, 2.0, 4.0, 2.0, 1.0, 3.0, 2.0, 3.0), Array(3.0, 1.0, 3.0, 3.0, 3.0, 2.0, 3.0, 1.0, 3.0, 2.0, 3.0, 4.0, 3.0), Array(3.0, 4.0, 4.0, 3.0, 3.0, 3.0, 2.0, 3.0, 4.0, 4.0, 2.0, 6.0, 3.0), Array(3.0, 5.0, 4.0, 6.0, 5.0, 6.0, 4.0, 6.0, 4.0, 5.0, 5.0, 4.0, 5.0), Array(1.0, 1.0, 2.0, 5.0, 3.0, 5.0, 4.0, 4.0, 4.0, 3.0, 3.0, 5.0, 4.0), Array(6.0, 6.0, 6.0, 6.0, 6.0, 6.0, 6.0, 6.0, 6.0, 5.0, 6.0, 6.0, 6.0), Array(3.0, 4.0, 4.0, 4.0, 4.0, 5.0, 3.0, 6.0, 5.0, 3.0, 5.0, 6.0, 6.0), Array(2.0, 3.0, 3.0, 3.0, 2.0, 3.0, 2.0, 3.0, 3.0, 1.0, 3.0, 2.0, 4.0), Array(3.0, 4.0, 4.0, 6.0, 3.0, 4.0, 6.0, 3.0, 3.0, 5.0, 6.0, 5.0, 5.0), Array(3.0, 3.0, 4.0, 4.0, 2.0, 3.0, 3.0, 4.0, 4.0, 2.0, 2.0, 3.0, 3.0), Array(4.0, 3.0, 5.0, 3.0, 4.0, 3.0, 3.0, 5.0, 5.0, 3.0, 5.0, 5.0, 5.0), Array(4.0, 5.0, 5.0, 5.0, 4.0, 4.0, 3.0, 5.0, 5.0, 3.0, 5.0, 5.0, 5.0), Array(4.0, 4.0, 5.0, 4.0, 5.0, 4.0, 6.0, 6.0, 6.0, 4.0, 3.0, 4.0, 5.0), Array(4.0, 5.0, 5.0, 5.0, 5.0, 3.0, 3.0, 3.0, 5.0, 5.0, 3.0, 4.0, 3.0), Array(4.0, 3.0, 3.0, 5.0, 2.0, 3.0, 4.0, 3.0, 5.0, 3.0, 4.0, 4.0, 5.0), Array(3.0, 2.0, 3.0, 3.0, 3.0, 1.0, 3.0, 1.0, 3.0, 2.0, 3.0, 4.0, 3.0), Array(2.0, 4.0, 3.0, 4.0, 3.0, 4.0, 4.0, 5.0, 3.0, 2.0, 5.0, 6.0, 5.0), Array(4.0, 5.0, 5.0, 5.0, 4.0, 5.0, 4.0, 5.0, 5.0, 3.0, 5.0, 5.0, 6.0), Array(4.0, 3.0, 4.0, 3.0, 4.0, 6.0, 3.0, 3.0, 4.0, 5.0, 3.0, 4.0, 4.0), Array(1.0, 3.0, 2.0, 5.0, 3.0, 3.0, 3.0, 4.0, 3.0, 3.0, 3.0, 5.0, 4.0), Array(3.0, 3.0, 4.0, 3.0, 3.0, 4.0, 3.0, 4.0, 4.0, 2.0, 4.0, 5.0, 4.0), Array(5.0, 4.0, 5.0, 5.0, 5.0, 4.0, 5.0, 3.0, 5.0, 4.0, 5.0, 5.0, 5.0), Array(5.0, 3.0, 6.0, 3.0, 5.0, 3.0, 3.0, 3.0, 2.0, 6.0, 4.0, 6.0, 3.0), Array(3.0, 5.0, 4.0, 3.0, 5.0, 5.0, 5.0, 4.0, 5.0, 5.0, 6.0, 6.0, 4.0), Array(4.0, 4.0, 5.0, 4.0, 4.0, 4.0, 4.0, 4.0, 6.0, 4.0, 6.0, 4.0, 4.0), Array(5.0, 6.0, 6.0, 5.0, 2.0, 5.0, 5.0, 5.0, 6.0, 4.0, 5.0, 6.0, 4.0), Array(3.0, 4.0, 3.0, 4.0, 3.0, 4.0, 4.0, 5.0, 3.0, 3.0, 
3.0, 5.0, 4.0), Array(3.0, 3.0, 5.0, 3.0, 3.0, 3.0, 3.0, 4.0, 5.0, 3.0, 3.0, 4.0, 5.0), Array(2.0, 3.0, 3.0, 4.0, 1.0, 3.0, 2.0, 3.0, 3.0, 1.0, 3.0, 3.0, 3.0)) filterIdsWithProfile: (df: org.apache.spark.sql.DataFrame, pathLens: Seq[String], profile: Array[Integer], label: Integer)org.apache.spark.sql.DataFrame
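A usage sketch: label every user whose path-length profile matches the most common profile (the label 0 is arbitrary, and df1 carries the user ids alongside the path-length columns):
val profile0 = profiles(0).map(d => Int.box(d.toInt)) // Array[java.lang.Integer], as the signature expects
val usersWithProfile0 = filterIdsWithProfile(df1, pathLens, profile0, 0)
usersWithProfile0.count()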
// TODO: assign all users to a profile. Say we take the 100 most common profiles; then each user with a profile different from those 100 must be assigned to one of the 100. This can be done using k-means, for example.
profiles.map(Vectors.dense(_))
res41: Array[org.apache.spark.mllib.linalg.Vector] = Array([1.0,3.0,2.0,5.0,3.0,5.0,4.0,4.0,4.0,3.0,3.0,5.0,4.0], [3.0,2.0,3.0,3.0,3.0,2.0,3.0,1.0,3.0,2.0,3.0,4.0,3.0])
Array("[0.6, 0.6]", "[8.0, 8.0]").map(Vectors.parse(_))
res37: Array[org.apache.spark.mllib.linalg.Vector] = Array([0.6,0.6], [8.0,8.0])
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
//import org.apache.spark.ml.clustering.KMeans
//import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.mllib.linalg.Vectors
val initialModel = new KMeansModel(
profiles.map(Vectors.dense(_))
//.map(Vectors.parse(_))
)
val kmeans = new KMeans()
.setK(10)
.setInitialModel(initialModel)
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel} import org.apache.spark.mllib.linalg.Vectors initialModel: org.apache.spark.mllib.clustering.KMeansModel = org.apache.spark.mllib.clustering.KMeansModel@16053b15 kmeans: org.apache.spark.mllib.clustering.KMeans = org.apache.spark.mllib.clustering.KMeans@61881d02
df1.count()
res120: Long = 736034
val idPointRDD = df1.select(columnNames.head, columnNames.tail: _*).rdd.map(s => (s.getLong(0), Vectors.dense(s.getInt(1).toDouble,s.getInt(2).toDouble,s.getInt(3).toDouble,s.getInt(4).toDouble,s.getInt(5).toDouble,s.getInt(6).toDouble,s.getInt(7).toDouble,s.getInt(8).toDouble,s.getInt(9).toDouble,s.getInt(10).toDouble,s.getInt(11).toDouble,s.getInt(12).toDouble,s.getInt(13).toDouble))).cache()
val clusters = kmeans.run(idPointRDD.map(_._2))
val clustersRDD = clusters.predict(idPointRDD.map(_._2))
val idClusterRDD = idPointRDD.map(_._1).zip(clustersRDD)
idPointRDD: org.apache.spark.rdd.RDD[(Long, org.apache.spark.mllib.linalg.Vector)] = MapPartitionsRDD[51605] at map at <console>:83 clusters: org.apache.spark.mllib.clustering.KMeansModel = org.apache.spark.mllib.clustering.KMeansModel@3a146e5c clustersRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[51651] at map at KMeansModel.scala:71 idClusterRDD: org.apache.spark.rdd.RDD[(Long, Int)] = ZippedPartitionsRDD2[51653] at zip at <console>:86
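For reference, the thirteen explicit getInt calls above can be avoided; a sketch, assuming column 0 of columnNames is the Long id and the remaining 13 columns are Int path lengths:
val idPointRDDAlt = df1.select(columnNames.head, columnNames.tail: _*).rdd
  .map(r => (r.getLong(0), Vectors.dense(r.toSeq.tail.map(_.asInstanceOf[Int].toDouble).toArray)))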
val idCluster = idClusterRDD.toDF("id", "cluster").cache()
idCluster: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, cluster: int]
val tm = df1.filter(col(pathLens(0))===0&&col(pathLens(1))===1&&col(pathLens(2))===0&&col(pathLens(3))===0&&col(pathLens(4))===0&&col(pathLens(5))===0&&col(pathLens(6))===0&&col(pathLens(7))===0&&col(pathLens(8))===0&&col(pathLens(9))===0&&col(pathLens(10))===0&&col(pathLens(11))===0&&col(pathLens(12))===0)
tm: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, jeremycorbynESumGeom: double ... 25 more fields]
tm.count() //11572
res17: Long = 11546
val tmNetwork = retweetNetwork.join(tm, $"CPostUserID"===$"id")
tmNetwork: org.apache.spark.sql.DataFrame = [OPostUserIDinRT: bigint, OPostUserSNinRT: string ... 31 more fields]
tmNetwork.count()
res20: Long = 20732
display(tmNetwork.groupBy("OPostUserSNinRT").agg(sum("Weight").as("sum")).orderBy($"sum".desc))
OPostUserSNinRT | sum |
---|---|
theresa_may | 14140.0 |
MailOnline | 158.0 |
realDonaldTrump | 147.0 |
RichardDawkins | 128.0 |
JaimeRicardoRam | 95.0 |
dbdevletbahceli | 76.0 |
MattyBRaps | 48.0 |
narendramodi | 47.0 |
giphz | 44.0 |
BishopJakes | 44.0 |
Daily_Express | 43.0 |
DailyMailUK | 43.0 |
pilard2017 | 42.0 |
AlArabiya_Brk | 42.0 |
Reyhan_News | 40.0 |
KBRILondon | 32.0 |
BillGates | 31.0 |
FinancialTimes | 29.0 |
TRobinsonNewEra | 26.0 |
youthparty_ng | 25.0 |
shanedawson | 24.0 |
rioferdy5 | 23.0 |
Cristiano | 22.0 |
CNBCArabia | 22.0 |
akshaykumar | 22.0 |
Pontifex | 20.0 |
naosejatrouxa | 20.0 |
kartalanalizcom | 18.0 |
Cooper4SAE | 18.0 |
SenusiTekkesi | 18.0 |
Truncated to 30 rows
println(df1.count()) //1083714
println(idCluster.count())
//736034
//736034
736034 736034
val clusterTotal = idCluster.select("cluster").withColumn("w", lit(1)).groupBy("cluster").agg(sum("w").as("total"))
clusterTotal: org.apache.spark.sql.DataFrame = [cluster: int, total: bigint]
df1.count()
res160: Long = 736034
display(clusterTotal.agg(sum("total")))
sum(total) |
---|
736034.0 |
import org.apache.spark.ml.feature.Interaction
import org.apache.spark.ml.feature.VectorAssembler
val columnNames = Seq("Id") ++ pathLens
val assembler1 = new VectorAssembler().
setInputCols(pathLens.toArray).
setOutputCol("features")
val assembled1 = assembler1.transform(df1.select(columnNames.head, columnNames.tail: _*)).select("id", "features")
display(assembled1)
display(eDF.join(idCluster, $"Id" === $"SrcVid"))
srcVid | dstVid | GeomProb | id | cluster |
---|---|---|---|---|
1.541364193e9 | 8.5871359392320717e17 | 5.8823529411764705e-2 | 1.541364193e9 | 13.0 |
1.541364193e9 | 2.172538184e9 | 0.1111111111111111 | 1.541364193e9 | 13.0 |
3.30717019e8 | 7.44856700969193472e17 | 2.631578947368421e-2 | 3.30717019e8 | 8.0 |
1.541364193e9 | 1.5245722e7 | 0.5 | 1.541364193e9 | 13.0 |
1.4085096e7 | 1.4623866e7 | 0.1111111111111111 | 1.4085096e7 | 4.0 |
2.2021978e7 | 3.161348518e9 | 7.692307692307693e-2 | 2.2021978e7 | 21.0 |
1.9902709e7 | 8.44126543819476992e17 | 3.4482758620689655e-2 | 1.9902709e7 | 21.0 |
2.5275453e7 | 7.1741696e7 | 0.3333333333333333 | 2.5275453e7 | 13.0 |
1.1777769e8 | 1.71061628e9 | 0.5 | 1.1777769e8 | 21.0 |
1.7061815e7 | 8.16610692648357888e17 | 0.3333333333333333 | 1.7061815e7 | 3.0 |
3.331501e7 | 6.34912541e8 | 5.405405405405406e-3 | 3.331501e7 | 3.0 |
3.751450582e9 | 2.40871024e8 | 0.3333333333333333 | 3.751450582e9 | 21.0 |
8.63729074103078912e17 | 8.84734946e8 | 0.3333333333333333 | 8.63729074103078912e17 | 21.0 |
2.653613168e9 | 1.49984507e8 | 0.16666666666666666 | 2.653613168e9 | 12.0 |
1.5010349e7 | 7.53447445611307008e17 | 0.2 | 1.5010349e7 | 18.0 |
5.45081356e8 | 9.06412351e8 | 0.5 | 5.45081356e8 | 21.0 |
2.91372292e9 | 7.6436317e7 | 0.25 | 2.91372292e9 | 5.0 |
3.331501e7 | 2.0116884e8 | 0.25 | 3.331501e7 | 3.0 |
1.6973333e7 | 1.0275162e7 | 0.1 | 1.6973333e7 | 2.0 |
1.1239873e8 | 3.311830742e9 | 0.5 | 1.1239873e8 | 24.0 |
6.5045121e7 | 3.87899173e8 | 8.333333333333333e-2 | 6.5045121e7 | 21.0 |
6.1183568e7 | 4.459625361e9 | 0.3333333333333333 | 6.1183568e7 | 4.0 |
6.2123765e7 | 2.41904099e8 | 0.5 | 6.2123765e7 | 1.0 |
1.4515799e7 | 7.96686724885807104e17 | 0.5 | 1.4515799e7 | 4.0 |
6.5045121e7 | 2.777899492e9 | 0.5 | 6.5045121e7 | 21.0 |
1.6139649e7 | 7.76728465231978496e17 | 0.25 | 1.6139649e7 | 13.0 |
5.24235256e8 | 2.4036615e7 | 0.5 | 5.24235256e8 | 2.0 |
1.1777769e8 | 2.78142484e9 | 9.523809523809525e-3 | 1.1777769e8 | 21.0 |
1.6973333e7 | 2.214619884e9 | 0.125 | 1.6973333e7 | 2.0 |
5.3674515e8 | 2.22631703e8 | 0.5 | 5.3674515e8 | 19.0 |
Truncated to 30 rows
object TupleUDFs {
import org.apache.spark.sql.functions.udf
// type tag is required, as we have a generic udf
import scala.reflect.runtime.universe.{TypeTag, typeTag}
def toTuple2[S: TypeTag, T: TypeTag] =
udf[(S, T), S, T]((x: S, y: T) => (x, y))
}
defined object TupleUDFs
val edges = eDF.select("srcVid", "dstVid").join(idCluster.toDF("id","srcCluster"), $"srcVid"===$"id").drop("id").join(idCluster.toDF("id", "dstCluster"), $"dstVid"===$"id").drop("id").withColumn(
"attr", TupleUDFs.toTuple2[Int, Int].apply($"srcCluster", $"dstCluster")
)
edges: org.apache.spark.sql.DataFrame = [srcVid: bigint, dstVid: bigint ... 3 more fields]
display(edges)
val res = ArrayBuffer[(Int,Int,Long)]()
for (i <- Range(0, 25); j <- Range(i, 25)) {
val count = edges.filter($"srcCluster"===i).filter($"dstCluster"===j).count()
println((i,j)+","+count);
res += ((i,j, count))
}
(0,0),7371 (0,1),3727 (0,2),218 (0,3),17154 (0,4),3248 (0,5),868 (0,6),164 (0,7),41 (0,8),633 (0,9),7 (0,10),22 (0,11),320 (0,12),196 (0,13),1299 (0,14),18 (0,15),134 (0,16),353 (0,17),4292 (0,18),2313 (0,19),14386 (0,20),102 (0,21),31880 (0,22),7535 (0,23),121 (0,24),2273 (1,1),113474 (1,2),238 (1,3),18615 (1,4),10124 (1,5),2367 (1,6),160 (1,7),16 (1,8),2636 (1,9),6 (1,10),23 (1,11),1531 (1,12),4584 (1,13),25047 (1,14),70 (1,15),250 (1,16),49 (1,17),108 (1,18),2153 (1,19),9363 (1,20),174 (1,21),32431 (1,22),6530 (1,23),549 (1,24),1871 (2,2),31418 (2,3),9530 (2,4),3135 (2,5),673 (2,6),19 (2,7),4 (2,8),2052 (2,9),47 (2,10),30 (2,11),111 (2,12),484 (2,13),2657 (2,14),10 (2,15),99 (2,16),55 (2,17),98 (2,18),1479 (2,19),5207 (2,20),61 (2,21),11799 (2,22),6156 (2,23),163 (2,24),939 (3,3),48104 (3,4),9617 (3,5),22609 (3,6),484 (3,7),18 (3,8),3735 (3,9),4 (3,10),75 (3,11),537 (3,12),362 (3,13),9298 (3,14),33 (3,15),449 (3,16),299 (3,17),199 (3,18),16555 (3,19),32148 (3,20),445 (3,21),72902 (3,22),17320 (3,23),433 (3,24),9529 (4,4),56089 (4,5),3203 (4,6),224 (4,7),15 (4,8),21621 (4,9),3 (4,10),68 (4,11),355 (4,12),1375 (4,13),37974 (4,14),15 (4,15),1423 (4,16),284 (4,17),118 (4,18),6513 (4,19),23076 (4,20),368 (4,21),64318 (4,22),10097 (4,23),363 (4,24),8721 (5,5),1919 (5,6),128 (5,7),141 (5,8),764 (5,9),2 (5,10),106 (5,11),206 (5,12),545 (5,13),2566 (5,14),8019 (5,15),127 (5,16),318 (5,17),2479 (5,18),1300 (5,19),4650 (5,20),239 (5,21),10322 (5,22),2603 (5,23),473 (5,24),998 (6,6),129 (6,7),5 (6,8),53 (6,9),17 (6,10),0 (6,11),25 (6,12),228 (6,13),422 (6,14),0 (6,15),18 (6,16),15 (6,17),14 (6,18),38 (6,19),459 (6,20),5 (6,21),1006 (6,22),237 (6,23),83 (6,24),174 (7,7),14743 (7,8),445 (7,9),15 (7,10),30 (7,11),459 (7,12),1130 (7,13),2654 (7,14),486 (7,15),3 (7,16),0 (7,17),83 (7,18),177 (7,19),1016 (7,20),92 (7,21),2937 (7,22),2571 (7,23),397 (7,24),989 (8,8),2782 (8,9),2 (8,10),92 (8,11),67 (8,12),1038 (8,13),2460 (8,14),7 (8,15),218 (8,16),577 (8,17),66 (8,18),2142 (8,19),7024 (8,20),120 (8,21),15079 (8,22),2215 (8,23),304 (8,24),1120 (9,9),1056 (9,10),135 (9,11),77 (9,12),193 (9,13),595 (9,14),28 (9,15),36 (9,16),17 (9,17),55 (9,18),248 (9,19),746 (9,20),31 (9,21),1213 (9,22),523 (9,23),162 (9,24),211 (10,10),487 (10,11),138 (10,12),318 (10,13),1185 (10,14),78 (10,15),81 (10,16),80 (10,17),141 (10,18),759 (10,19),2033 (10,20),61 (10,21),3839 (10,22),1135 (10,23),344 (10,24),574 (11,11),458 (11,12),440 (11,13),1815 (11,14),3080 (11,15),64 (11,16),50 (11,17),140 (11,18),938 (11,19),4189 (11,20),101 (11,21),9321 (11,22),3069 (11,23),662 (11,24),996 (12,12),10057 (12,13),6212 (12,14),186 (12,15),127 (12,16),62 (12,17),213 (12,18),716 (12,19),2728 (12,20),100 (12,21),6047 (12,22),1679 (12,23),915 (12,24),1489 (13,13),20811 (13,14),351 (13,15),590 (13,16),469 (13,17),1085 (13,18),4599 (13,19),17273 (13,20),665 (13,21),39454 (13,22),13080 (13,23),8016 (13,24),5179 (14,14),209 (14,15),10 (14,16),19 (14,17),106 (14,18),197 (14,19),521 (14,20),23 (14,21),989 (14,22),439 (14,23),102 (14,24),205 (15,15),7203 (15,16),435 (15,17),104 (15,18),684 (15,19),1665 (15,20),83 (15,21),2969 (15,22),990 (15,23),308 (15,24),1138 (16,16),6964 (16,17),41 (16,18),1026 (16,19),3187 (16,20),112 (16,21),5325 (16,22),1459 (16,23),323 (16,24),939 (17,17),6152 (17,18),690 (17,19),2245 (17,20),48 (17,21),4696 (17,22),1527 (17,23),136 (17,24),588 (18,18),7507 (18,19),13163 (18,20),345 (18,21),24793 (18,22),6124 (18,23),3036 (18,24),2741 (19,19),24247 (19,20),101 (19,21),50396 (19,22),7392 (19,23),285 (19,24),3380 (20,20),4778 
(20,21),9191 (20,22),603 (20,23),65 (20,24),759 (21,21),202621 (21,22),58087 (21,23),178 (21,24),12625 (22,22),13929 (22,23),2318 (22,24),4548 (23,23),1332 (23,24),1806 (24,24),7828 res: scala.collection.mutable.ArrayBuffer[(Int, Int, Long)] = ArrayBuffer((0,0,7371), (0,1,3727), (0,2,218), (0,3,17154), (0,4,3248), (0,5,868), (0,6,164), (0,7,41), (0,8,633), (0,9,7), (0,10,22), (0,11,320), (0,12,196), (0,13,1299), (0,14,18), (0,15,134), (0,16,353), (0,17,4292), (0,18,2313), (0,19,14386), (0,20,102), (0,21,31880), (0,22,7535), (0,23,121), (0,24,2273), (1,1,113474), (1,2,238), (1,3,18615), (1,4,10124), (1,5,2367), (1,6,160), (1,7,16), (1,8,2636), (1,9,6), (1,10,23), (1,11,1531), (1,12,4584), (1,13,25047), (1,14,70), (1,15,250), (1,16,49), (1,17,108), (1,18,2153), (1,19,9363), (1,20,174), (1,21,32431), (1,22,6530), (1,23,549), (1,24,1871), (2,2,31418), (2,3,9530), (2,4,3135), (2,5,673), (2,6,19), (2,7,4), (2,8,2052), (2,9,47), (2,10,30), (2,11,111), (2,12,484), (2,13,2657), (2,14,10), (2,15,99), (2,16,55), (2,17,98), (2,18,1479), (2,19,5207), (2,20,61), (2,21,11799), (2,22,6156), (2,23,163), (2,24,939), (3,3,48104), (3,4,9617), (3,5,22609), (3,6,484), (3,7,18), (3,8,3735), (3,9,4), (3,10,75), (3,11,537), (3,12,362), (3,13,9298), (3,14,33), (3,15,449), (3,16,299), (3,17,199), (3,18,16555), (3,19,32148), (3,20,445), (3,21,72902), (3,22,17320), (3,23,433), (3,24,9529), (4,4,56089), (4,5,3203), (4,6,224), (4,7,15), (4,8,21621), (4,9,3), (4,10,68), (4,11,355), (4,12,1375), (4,13,37974), (4,14,15), (4,15,1423), (4,16,284), (4,17,118), (4,18,6513), (4,19,23076), (4,20,368), (4,21,64318), (4,22,10097), (4,23,363), (4,24,8721), (5,5,1919), (5,6,128), (5,7,141), (5,8,764), (5,9,2), (5,10,106), (5,11,206), (5,12,545), (5,13,2566), (5,14,8019), (5,15,127), (5,16,318), (5,17,2479), (5,18,1300), (5,19,4650), (5,20,239), (5,21,10322), (5,22,2603), (5,23,473), (5,24,998), (6,6,129), (6,7,5), (6,8,53), (6,9,17), (6,10,0), (6,11,25), (6,12,228), (6,13,422), (6,14,0), (6,15,18), (6,16,15), (6,17,14), (6,18,38), (6,19,459), (6,20,5), (6,21,1006), (6,22,237), (6,23,83), (6,24,174), (7,7,14743), (7,8,445), (7,9,15), (7,10,30), (7,11,459), (7,12,1130), (7,13,2654), (7,14,486), (7,15,3), (7,16,0), (7,17,83), (7,18,177), (7,19,1016), (7,20,92), (7,21,2937), (7,22,2571), (7,23,397), (7,24,989), (8,8,2782), (8,9,2), (8,10,92), (8,11,67), (8,12,1038), (8,13,2460), (8,14,7), (8,15,218), (8,16,577), (8,17,66), (8,18,2142), (8,19,7024), (8,20,120), (8,21,15079), (8,22,2215), (8,23,304), (8,24,1120), (9,9,1056), (9,10,135), (9,11,77), (9,12,193), (9,13,595), (9,14,28), (9,15,36), (9,16,17), (9,17,55), (9,18,248), (9,19,746), (9,20,31), (9,21,1213), (9,22,523), (9,23,162), (9,24,211), (10,10,487), (10,11,138), (10,12,318), (10,13,1185), (10,14,78), (10,15,81), (10,16,80), (10,17,141), (10,18,759), (10,19,2033), (10,20,61), (10,21,3839), (10,22,1135), (10,23,344), (10,24,574), (11,11,458), (11,12,440), (11,13,1815), (11,14,3080), (11,15,64), (11,16,50), (11,17,140), (11,18,938), (11,19,4189), (11,20,101), (11,21,9321), (11,22,3069), (11,23,662), (11,24,996), (12,12,10057), (12,13,6212), (12,14,186), (12,15,127), (12,16,62), (12,17,213), (12,18,716), (12,19,2728), (12,20,100), (12,21,6047), (12,22,1679), (12,23,915), (12,24,1489), (13,13,20811), (13,14,351), (13,15,590), (13,16,469), (13,17,1085), (13,18,4599), (13,19,17273), (13,20,665), (13,21,39454), (13,22,13080), (13,23,8016), (13,24,5179), (14,14,209), (14,15,10), (14,16,19), (14,17,106), (14,18,197), (14,19,521), (14,20,23), (14,21,989), (14,22,439), (14,23,102), 
(14,24,205), (15,15,7203), (15,16,435), (15,17,104), (15,18,684), (15,19,1665), (15,20,83), (15,21,2969), (15,22,990), (15,23,308), (15,24,1138), (16,16,6964), (16,17,41), (16,18,1026), (16,19,3187), (16,20,112), (16,21,5325), (16,22,1459), (16,23,323), (16,24,939), (17,17,6152), (17,18,690), (17,19,2245), (17,20,48), (17,21,4696), (17,22,1527), (17,23,136), (17,24,588), (18,18,7507), (18,19,13163), (18,20,345), (18,21,24793), (18,22,6124), (18,23,3036), (18,24,2741), (19,19,24247), (19,20,101), (19,21,50396), (19,22,7392), (19,23,285), (19,24,3380), (20,20,4778), (20,21,9191), (20,22,603), (20,23,65), (20,24,759), (21,21,202621), (21,22,58087), (21,23,178), (21,24,12625), (22,22,13929), (22,23,2318), (22,24,4548), (23,23,1332), (23,24,1806), (24,24,7828))
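For reference, the pairwise counting above (325 separate filter-and-count jobs) collapses into a single aggregation; a sketch over the same edges frame (note the loop enumerates only pairs with j >= i, while this counts every ordered (srcCluster, dstCluster) pair):
val clusterPairCounts = edges
  .groupBy("srcCluster", "dstCluster")
  .count()
  .orderBy($"srcCluster", $"dstCluster")
display(clusterPairCounts)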
// Now we loop through the subgraph induced by each cluster index (here just the first three clusters) and count the edges inside each one.
val connectionsInSubgraph = collection.mutable.Map[Int, Long]()
for (i <- Range(0,3)) {
  val subgraph = graph.subgraph(vpred = (id, attr) => attr._2 == i)
  connectionsInSubgraph(i) = subgraph.edges.count()
}
connectionsInSubgraph: scala.collection.mutable.Map[Int,Long] = Map(2 -> 31418, 1 -> 113474, 0 -> 7371)
URLs
val allUrlAndDomains = (sc.textFile("/FileStore/tables/7yaczwjd1501068230338/checkedUrlWithNetLocFinal.csv")
.map(x => x.split(","))
.map(x => (x(0).toString, x(1).toString))
.toDF(Seq("URL", "Domain"): _*))
allUrlAndDomains: org.apache.spark.sql.DataFrame = [URL: string, Domain: string]
allUrlAndDomains.count
res169: Long = 30562
val urlNetwork = retweetNetwork.join(allUrlAndDomains, Seq("URL"), "outer")
urlNetwork: org.apache.spark.sql.DataFrame = [URL: string, OPostUserIDinRT: bigint ... 5 more fields]
display(urlNetwork)
URL | OPostUserIDinRT | OPostUserSNinRT | CPostUserID | CPostUserSN | Weight | Domain |
---|---|---|---|---|---|---|
http://53eig.ht/2rp4MtB | 2.62872637e8 | MoisesNaim | 3.21589427e8 | stephenWalt | 1.0 | null |
http://Cambridge-United.co.uk | 1.69046895e8 | CambridgeUtdFC | 2.0317326e7 | Annkell | 1.0 | www.cambridge-united.co.uk |
http://Cambridge-United.co.uk | 1.69046895e8 | CambridgeUtdFC | 4.0623515e7 | AGraizevsky | 1.0 | www.cambridge-united.co.uk |
http://Cambridge-United.co.uk | 1.69046895e8 | CambridgeUtdFC | 1.439542537e9 | DSole1 | 1.0 | www.cambridge-united.co.uk |
http://FilmFreeway.com | 6.9775165e7 | Jeff_Hansen | 1.7156e7 | seashepherd | 1.0 | null |
http://FilmFreeway.com | 4.5830001e7 | Wilygoose | 6.3141362e7 | corinstuart | 1.0 | null |
http://FilmFreeway.com | 4.5830001e7 | Wilygoose | 6.3141362e7 | corinstuart | 1.0 | null |
http://FilmFreeway.com | 4.5830001e7 | Wilygoose | 6.3141362e7 | corinstuart | 1.0 | null |
http://FilmFreeway.com | 4.5830001e7 | Wilygoose | 6.3141362e7 | corinstuart | 1.0 | null |
http://FilmFreeway.com | 1.405527242e9 | PFMediaUK | 2.190444236e9 | DannersJameson | 1.0 | null |
http://FilmFreeway.com | 2.791892884e9 | LiTphils | 3.31294754e8 | KellConnery | 1.0 | null |
http://FilmFreeway.com | 7.04984124952436736e17 | punks4westpapua | 2.3586483e7 | evaa31 | 1.0 | null |
http://FilmFreeway.com | 2.177985414e9 | GenesiusPicture | 3.130056581e9 | DebbieGfilm | 1.0 | null |
http://FilmFreeway.com | 4.22048034e8 | JackGrewar | 5.6108861e7 | bmattyb323 | 1.0 | null |
http://Gov.uk/register-to-vote | 1.544495239e9 | ShefHallamLab | 4.904071421e9 | Stannington_Lab | 1.0 | null |
http://Wonderful.org | 3.909202761e9 | wonderful_org | 2.40869183e8 | al_ritchie | 1.0 | null |
http://Www.kendalpoetryfestival.co.uk | 7.05434144910741504e17 | KendalPoetry | 5.97696492e8 | hannahlowepoet | 1.0 | null |
http://a.msn.com/01/en-gb/BBCnjbg?ocid=st | 2.04864531e8 | EtonOldBoys | 1.82342346e8 | HerefordLabour | 1.0 | www.independent.co.uk |
http://a.msn.com/01/en-gb/BBCnjbg?ocid=st | 2.04864531e8 | EtonOldBoys | 1.82342346e8 | HerefordLabour | 1.0 | www.independent.co.uk |
http://a.msn.com/r/2/BBByhyv?a=1&m=EN-GB | 5.26621053e8 | FountainsCourt | 8.111994504881152e17 | amicon_13 | 1.0 | null |
http://abc7.la/2s9Fov8 | 1.6374678e7 | ABC7 | 7.4837469e7 | sicvic24 | 1.0 | null |
http://absrad.io/2rB8GD7 | 1.6085557e7 | absoluteradio | 1.1361392e8 | NoelGallagher | 1.0 | null |
http://agbr.me/2tiss6K | 1.123082485e9 | arianagrandebr | 8.49084665394864128e17 | ArianaG04076749 | 1.0 | null |
http://aje.io/mjjs | 4970411.0 | AJEnglish | 4.85254523e8 | HamdunH | 1.0 | null |
http://aje.io/y9ql | 4970411.0 | AJEnglish | 7.26485502141014016e17 | TurdWorldWar | 1.0 | null |
http://allafrica.com/stories/201705020090.html?utm\_campaign=allafrica%3Ainternal&utm\_medium=social&utm\_source=twitter&utm\_content=promote%3Aaans%3Aabafbt | 1.6683014e7 | allafrica | 1.288430442e9 | eyesopenershaw | 1.0 | null |
http://amzn.to/2aTHU2e | 1.412560921e9 | myrddinsheir | 3.18686456e8 | mystery1165 | 1.0 | null |
http://apne.ws/2qL6k0x | 5.1241574e7 | AP | 6.22938139e8 | HUBBE_PAKISTAN | 1.0 | null |
http://apne.ws/2qL6k0x | 5.1241574e7 | AP | 2.531129465e9 | ZoraGouhary | 1.0 | null |
http://apne.ws/2syD0y1 | 5.1241574e7 | AP | 8.49027382090416128e17 | bluenewsnow | 1.0 | null |
Truncated to 30 rows
urlNetwork.count()
res167: Long = 6382207
urlNetwork.filter($"URL"==="").count()
res175: Long = 5344405
6382207-5344405 // number of tweets with URLs
res176: Int = 1037802
urlNetwork.select("URL").distinct().count()
res168: Long = 271684
urlNetwork.select("URL","Domain").distinct().count()
dbutils.fs.rm("/datasets/MEP/GB/RetweetUrlNetworkAsParquetDF",true)
res172: Boolean = true
// write this urlNetwork to parquet for processing later
urlNetwork.write.parquet("/datasets/MEP/GB/RetweetUrlNetworkAsParquetDF")