SDS-2.2, Scalable Data Science
Archived YouTube video of this live unedited lab-lecture:
Twitter Hashtag Count
Using Twitter Streaming is a great way to learn Spark Streaming if you don't have your streaming datasource and want a great rich input dataset to try Spark Streaming transformations on.
In this example, we show how to calculate the top hashtags seen in the last X window of time every Y time unit.
Extracting knowledge from tweets is "easy" using techniques shown here, but one has to take responsibility for the use of this knowledge and conform to the rules and policies linked below.
Remeber that the use of twitter itself comes with various strings attached. Read:
Crucially, the use of the content from twitter by you (as done in this worksheet) comes with some strings. Read:
"scalable-data-science/sds-2-2/025_a_extendedTwitterUtils2run"
import org.apache.spark._
import org.apache.spark.storage._
import org.apache.spark.streaming._
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
import org.apache.spark._ import org.apache.spark.storage._ import org.apache.spark.streaming._ import org.apache.spark.streaming.twitter.TwitterUtils import scala.math.Ordering import twitter4j.auth.OAuthAuthorization import twitter4j.conf.ConfigurationBuilder
Step 1: Enter your Twitter API Credentials.
- Go to https://apps.twitter.com and look up your Twitter API Credentials, or create an app to create them.
- Run the code in a cell to Enter your own credentials.
// put your own twitter developer credentials below instead of xxx
// instead of the '%run "scalable-data-science/secrets/026_secret_MyTwitterOAuthCredentials"' above
// this notebook we just ran contains the following commented code block
/*
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
def MyconsumerKey = "xxx"
def MyconsumerSecret = "xxx"
def Mytoken = "xxx"
def MytokenSecret = "xxx"
System.setProperty("twitter4j.oauth.consumerKey", MyconsumerKey)
System.setProperty("twitter4j.oauth.consumerSecret", MyconsumerSecret)
System.setProperty("twitter4j.oauth.accessToken", Mytoken)
System.setProperty("twitter4j.oauth.accessTokenSecret", MytokenSecret)
*/
The cell-below is hidden to not expose my Twitter API Credentials: consumerKey
, consumerSecret
, accessToken
and accessTokenSecret
. Use the code above to enter your own credentials!
"scalable-data-science/secrets/026_secret_MyTwitterOAuthCredentials"
If you see warnings then ignore for now: https://forums.databricks.com/questions/6941/change-in-getargument-for-notebook-input.html.
Step 2: Configure where to output the top hashtags and how often to compute them.
- Run this cell for the input cells to appear.
- Enter your credentials.
- Run the cell again to pick up your defaults.
val outputDirectory = "/datasets/tweetsStreamTmp" // output directory
//Recompute the top hashtags every N seconds. N=1
val slideInterval = new Duration(10 * 1000) // 1000 milliseconds is 1 second!
//Compute the top hashtags for the last M seconds. M=5
val windowLength = new Duration(30 * 1000)
// Wait W seconds before stopping the streaming job. W=100
val timeoutJobLength = 20 * 1000
outputDirectory: String = /datasets/tweetsStreamTmp slideInterval: org.apache.spark.streaming.Duration = 10000 ms windowLength: org.apache.spark.streaming.Duration = 30000 ms timeoutJobLength: Int = 20000
Step 3: Run the Twitter Streaming job.
Go to SparkUI and see if a streaming job is already running. If so you need to terminate it before starting a new streaming job. Only one streaming job can be run on the DB CE.
// this will make sure all streaming job in the cluster are stopped
StreamingContext.getActive.foreach{ _.stop(stopSparkContext = false) }
Clean up any old files.
dbutils.fs.rm(outputDirectory, true)
res17: Boolean = true
Let us write the function that creates the Streaming Context and sets up the streaming job.
var newContextCreated = false
var num = 0
// This is a helper class used for ordering by the second value in a (String, Int) tuple
import scala.math.Ordering
object SecondValueOrdering extends Ordering[(String, Int)] {
def compare(a: (String, Int), b: (String, Int)) = {
a._2 compare b._2
}
}
// This is the function that creates the SteamingContext and sets up the Spark Streaming job.
def creatingFunc(): StreamingContext = {
// Create a Spark Streaming Context.
val ssc = new StreamingContext(sc, slideInterval)
// Create a Twitter Stream for the input source.
val auth = Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
val twitterStream = ExtendedTwitterUtils.createStream(ssc, auth)
// Parse the tweets and gather the hashTags.
val hashTagStream = twitterStream.map(_.getText).flatMap(_.split(" ")).filter(_.startsWith("#"))
// Compute the counts of each hashtag by window.
// reduceByKey on a window of length windowLength
// Once this is computed, slide the window by slideInterval and calculate reduceByKey again for the second window
val windowedhashTagCountStream = hashTagStream.map((_, 1)).reduceByKeyAndWindow((x: Int, y: Int) => x + y, windowLength, slideInterval)
// For each window, calculate the top hashtags for that time period.
windowedhashTagCountStream.foreachRDD(hashTagCountRDD => {
val topEndpoints = hashTagCountRDD.top(20)(SecondValueOrdering)
dbutils.fs.put(s"${outputDirectory}/top_hashtags_${num}", topEndpoints.mkString("\n"), true)
println(s"------ TOP HASHTAGS For window ${num}")
println(topEndpoints.mkString("\n"))
num = num + 1
})
newContextCreated = true
ssc
}
newContextCreated: Boolean = false num: Int = 0 import scala.math.Ordering defined object SecondValueOrdering creatingFunc: ()org.apache.spark.streaming.StreamingContext
Create the StreamingContext using getActiveOrCreate, as required when starting a streaming job in Databricks.
val ssc = StreamingContext.getActiveOrCreate(creatingFunc)
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@3f9f7eb1
Start the Spark Streaming Context and return when the Streaming job exits or return with the specified timeout.
ssc.start()
ssc.awaitTerminationOrTimeout(timeoutJobLength)
ssc.stop(stopSparkContext = false)
Wrote 477 bytes. ------ TOP HASHTAGS For window 0 (#izmirescort,2) (#YouAre,2) (#갓세븐_YouAre_정오공개,1) (#TFB,1) (#Doctors,1) (#CCTVDiskominfoMKS,1) (#longsor,1) (#MakeLifeBetter,1) (#갓세븐 #7for7,1) (#ㅅㅏ설토ㅌㅗ추천사ㅇㅣ트 #ㅅㅏ설ㅌㅗ토추천사ㅇㅣ트 ❕☽❇M☣️⛹ A ☄ ➡️들어가기➡️,1) (#ターナーアワード,1) (#Zaragoza,1) (#박우진,1) (#DiaInternacionaldelaNena,1) (#윤지성,1) (#방탄소년단,1) (#정국,1) (#워너원…,1) (#ﷺ,1) (#JeuConcours,1) Wrote 511 bytes. ------ TOP HASHTAGS For window 1 (#YouAre,4) (#izmirescort,4) (#MPN,3) (#シナモンメルツ,3) (#방탄소년단,3) (#LittleMix,2) (#KCAArgentina,2) (#恋愛,1) (#안전공원 #ㅅㅏ설토토ㅅㅏㅇㅣ트추천 안-전-놀-이-터 ✩안전-놀-이-터✩ ㊙,1) (#tspo,1) (#BackTheBlue,1) (#odaibako 帰りたいですね,1) (#amwriting,1) (#Resistencia,1) (#拡散希望,1) (#セクシー,1) (#Passion https://t.co/CBKI0z2rSR,1) (#ゴーストホスピタル 療養病棟担当 オーレリー,1) (#Dosogas,1) (#BiggBoss11…,1)
Check out the Clusters 'Streaming` UI as the job is running.
It should automatically stop the streaming job after timeoutJobLength
.
If not, then stop any active Streaming Contexts, but don't stop the spark contexts they are attached to using the following command.
StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) }
Step 4: View the Results.
display(dbutils.fs.ls(outputDirectory))
path | name | size |
---|---|---|
dbfs:/datasets/tweetsStreamTmp/top_hashtags_0 | top_hashtags_0 | 0.0 |
dbfs:/datasets/tweetsStreamTmp/top_hashtags_1 | top_hashtags_1 | 30.0 |
dbfs:/datasets/tweetsStreamTmp/top_hashtags_10 | top_hashtags_10 | 163.0 |
dbfs:/datasets/tweetsStreamTmp/top_hashtags_11 | top_hashtags_11 | 277.0 |
dbfs:/datasets/tweetsStreamTmp/top_hashtags_12 | top_hashtags_12 | 317.0 |
dbfs:/datasets/tweetsStreamTmp/top_hashtags_13 | top_hashtags_13 | 345.0 |
dbfs:/datasets/tweetsStreamTmp/top_hashtags_14 | top_hashtags_14 | 290.0 |
dbfs:/datasets/tweetsStreamTmp/top_hashtags_15 | top_hashtags_15 | 256.0 |
dbfs:/datasets/tweetsStreamTmp/top_hashtags_16 | top_hashtags_16 | 227.0 |
dbfs:/datasets/tweetsStreamTmp/top_hashtags_17 | top_hashtags_17 | 184.0 |
dbfs:/datasets/tweetsStreamTmp/top_hashtags_18 | top_hashtags_18 | 141.0 |
dbfs:/datasets/tweetsStreamTmp/top_hashtags_19 | top_hashtags_19 | 170.0 |
dbfs:/datasets/tweetsStreamTmp/top_hashtags_2 | top_hashtags_2 | 106.0 |
dbfs:/datasets/tweetsStreamTmp/top_hashtags_20 | top_hashtags_20 | 183.0 |
dbfs:/datasets/tweetsStreamTmp/top_hashtags_21 | top_hashtags_21 | 165.0 |
dbfs:/datasets/tweetsStreamTmp/top_hashtags_22 | top_hashtags_22 | 203.0 |
dbfs:/datasets/tweetsStreamTmp/top_hashtags_23 | top_hashtags_23 | 239.0 |
dbfs:/datasets/tweetsStreamTmp/top_hashtags_24 | top_hashtags_24 | 200.0 |
dbfs:/datasets/tweetsStreamTmp/top_hashtags_25 | top_hashtags_25 | 175.0 |
dbfs:/datasets/tweetsStreamTmp/top_hashtags_26 | top_hashtags_26 | 211.0 |
dbfs:/datasets/tweetsStreamTmp/top_hashtags_27 | top_hashtags_27 | 192.0 |
dbfs:/datasets/tweetsStreamTmp/top_hashtags_28 | top_hashtags_28 | 217.0 |
dbfs:/datasets/tweetsStreamTmp/top_hashtags_29 | top_hashtags_29 | 216.0 |
dbfs:/datasets/tweetsStreamTmp/top_hashtags_3 | top_hashtags_3 | 256.0 |
dbfs:/datasets/tweetsStreamTmp/top_hashtags_30 | top_hashtags_30 | 349.0 |
dbfs:/datasets/tweetsStreamTmp/top_hashtags_31 | top_hashtags_31 | 291.0 |
dbfs:/datasets/tweetsStreamTmp/top_hashtags_32 | top_hashtags_32 | 168.0 |
dbfs:/datasets/tweetsStreamTmp/top_hashtags_33 | top_hashtags_33 | 205.0 |
dbfs:/datasets/tweetsStreamTmp/top_hashtags_34 | top_hashtags_34 | 191.0 |
dbfs:/datasets/tweetsStreamTmp/top_hashtags_35 | top_hashtags_35 | 239.0 |
Truncated to 30 rows
There should be 100 intervals for each second and the top hashtags for each of them should be in the file top_hashtags_N
for N
in 0,1,2,...,99 and the top hashtags should be based on the past 5 seconds window.
dbutils.fs.head(s"${outputDirectory}/top_hashtags_11")
res15: String = (#Nami,1) (#WorldOctopusDay,1) (#Terrebonne,1) (#ヒグチユウコ,1) (#シナモンメルツ,1) (#مملكة_قلم_للدعم #المقابيل_للدعم #ذئب_سلمان_للدعم #مملكه_شيخه_للدعم,1) (#CCH32,1) (#beBee,1) (#WeeklyIdol,1) (#SAMURAIBLUE,1) twitter OAuth Credentials loaded MyconsumerKey: String MyconsumerSecret: String Mytoken: String MytokenSecret: String import twitter4j.auth.OAuthAuthorization import twitter4j.conf.ConfigurationBuilder
Let's brainstorm a bit now
What could you do with this type of streaming capability?
- marketing?
- pharmaceutical vigilance?
- linking twitter activity to mass media activity?
- ...
Note that there are various Spark Streaming ML algorithms that one could easily throw at such reduceByKeyAndWindow
tweet streams:
Student Project or Volunteer for next Meetup - let's check it out now:
HOME-WORK: