SDS-2.2, Scalable Data Science
Archived YouTube video of this live unedited lab-lecture:
Tweet Collector - capture live tweets
Here are the main steps in this notebook:
- Let's collect from the public Twitter stream and write to DBFS as JSON strings in a boiler-plate manner to understand the components better.
- Then we will turn the collector into a function and use it.
- Finally we will use some DataFrame-based pipelines to convert the raw tweets into other structured content.
We will call the extendedTwitterUtils notebook from here.
But first install the following libraries:
- gson
- twitter4j-examples
"scalable-data-science/sds-2-2/025_a_extendedTwitterUtils2run"
Go to the Spark UI and check whether a streaming job is already running. If so, you need to terminate it before starting a new streaming job. Only one streaming job can run at a time on Databricks Community Edition (DB CE), and in Spark only one StreamingContext can be active per JVM.
// stop any active StreamingContext, but keep the SparkContext alive
StreamingContext.getActive.foreach{ _.stop(stopSparkContext = false) }
Let's choose a directory in DBFS, the cluster's distributed file system, for storing tweets.
val outputDirectoryRoot = "/datasets/tweetsStreamTmp" // output directory
outputDirectoryRoot: String = /datasets/tweetsStreamTmp
// to remove a pre-existing directory and start from scratch, evaluate the next line (comment it out to keep existing data)
dbutils.fs.rm(outputDirectoryRoot, true)
res40: Boolean = true
Capture tweets in batches, one every slideInterval milliseconds. This becomes the batch interval of the StreamingContext.
val slideInterval = new Duration(1 * 1000) // 1 * 1000 = 1000 milli-seconds = 1 sec
slideInterval: org.apache.spark.streaming.Duration = 1000 ms
Recall that a Discretized Stream, or DStream, is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data. This is either the input data stream received from a source, or the processed data stream generated by transforming the input stream. Internally, a DStream is represented by a continuous series of RDDs, which are Spark's abstraction of an immutable, distributed dataset (see the Spark Programming Guide for more details). Each RDD in a DStream contains data from a certain interval, as shown in the following figure.
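Here is a toy sketch in plain Scala (no Spark) that makes the "series of RDDs, one per interval" picture concrete. A Scala Seq stands in for each RDD. The `Event` type and the `toBatches` helper are hypothetical names for illustration only; they are not part of Spark's API.

```scala
// Toy model of a DStream: timestamped events are grouped into consecutive
// batches of `batchMillis` each, like the RDDs of a DStream.
// Plain Scala collections stand in for RDDs (an assumption for illustration).
final case class Event(timeMillis: Long, payload: String)

def toBatches(events: Seq[Event], batchMillis: Long): Seq[(Long, Seq[String])] = {
  require(batchMillis > 0, "batch interval must be positive")
  events
    .groupBy(e => (e.timeMillis / batchMillis) * batchMillis) // key = batch start time
    .toSeq
    .sortBy(_._1)                                             // batches in time order
    .map { case (start, es) => (start, es.sortBy(_.timeMillis).map(_.payload)) }
}

val events = Seq(
  Event(1507184798100L, "tweet A"),
  Event(1507184798900L, "tweet B"),
  Event(1507184800050L, "tweet C")
)
val batches = toBatches(events, 1000L) // 1000 ms, like slideInterval
batches.foreach { case (start, tweets) => println(s"$start -> $tweets") }
```

This toy simply leaves out intervals with no events. A real DStream behaves differently: it produces an RDD for every interval, even when no data arrived. In this example, the 1507184799000 interval would be an empty RDD. That is why the collector below checks `count > 0` before writing.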
Let's import Google's JSON library, Gson, next.
import com.google.gson.Gson
import com.google.gson.Gson
Our goal is to take each RDD in the Twitter DStream and write it as a JSON file in our DBFS.
// Create a Spark Streaming Context.
val ssc = new StreamingContext(sc, slideInterval)
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@ced654d
CAUTION
Extracting knowledge from tweets is "easy" using techniques shown here, but one has to take legal responsibility for the use of this knowledge and conform to the rules and policies linked below.
Remember that the use of Twitter itself comes with various strings attached. Read:
Crucially, the use of the content from twitter by you (as done in this worksheet) comes with some strings. Read:
Enter your own Twitter API Credentials.
- Go to https://apps.twitter.com and look up your Twitter API Credentials, or create an app to create them.
- Get your own Twitter API Credentials (consumerKey, consumerSecret, accessToken and accessTokenSecret) and enter them in the cell below.
Ethical/Legal Aspects
See Background Readings/Viewings in Project MEP:
"scalable-data-science/secrets/026_secret_MyTwitterOAuthCredentials"
// If you cannot run the secrets notebook above, uncomment the block below and put your own
// Twitter developer credentials in place of xxx (instead of the '%run "scalable-data-science/secrets/026_secret_MyTwitterOAuthCredentials"' above).
// The notebook we just ran contains exactly the following commented code block.
/*
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
def MyconsumerKey = "xxx"
def MyconsumerSecret = "xxx"
def Mytoken = "xxx"
def MytokenSecret = "xxx"
System.setProperty("twitter4j.oauth.consumerKey", MyconsumerKey)
System.setProperty("twitter4j.oauth.consumerSecret", MyconsumerSecret)
System.setProperty("twitter4j.oauth.accessToken", Mytoken)
System.setProperty("twitter4j.oauth.accessTokenSecret", MytokenSecret)
*/
// Create a Twitter Stream for the input source.
val auth = Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
val twitterStream = ExtendedTwitterUtils.createStream(ssc, auth)
auth: Some[twitter4j.auth.OAuthAuthorization] = Some(OAuthAuthorization{consumerKey='fo0EEh1tnH8WVJdgJPrZ47wD0', consumerSecret='******************************************', oauthToken=AccessToken{screenName='null', userId=4173723312}}) twitterStream: org.apache.spark.streaming.dstream.ReceiverInputDStream[twitter4j.Status] = ExtendedTwitterInputDStream@571b3fd4
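You may prefer not to type credentials into a notebook cell. In that case, you could keep them in environment variables and copy them into the same `twitter4j.oauth.*` system properties that the commented block above sets. This is a sketch under assumptions: the `TWITTER_*` variable names and the `setTwitterProperties` helper are hypothetical. Call it before evaluating `new ConfigurationBuilder().build()`, just as the commented block sets the properties first.

```scala
// Hypothetical mapping from environment variable names (assumed) to the
// twitter4j.oauth.* system property names used in this notebook.
val propertyForVariable: Map[String, String] = Map(
  "TWITTER_CONSUMER_KEY"        -> "twitter4j.oauth.consumerKey",
  "TWITTER_CONSUMER_SECRET"     -> "twitter4j.oauth.consumerSecret",
  "TWITTER_ACCESS_TOKEN"        -> "twitter4j.oauth.accessToken",
  "TWITTER_ACCESS_TOKEN_SECRET" -> "twitter4j.oauth.accessTokenSecret"
)

/** Sets each property whose variable is present in `env`;
  * returns the names of the missing variables, sorted. */
def setTwitterProperties(env: Map[String, String]): Seq[String] = {
  val missing = propertyForVariable.keys.toSeq.sorted.filterNot(env.contains)
  for ((variable, property) <- propertyForVariable; value <- env.get(variable))
    System.setProperty(property, value)
  missing
}

// Usage with the real process environment:
val missing = setTwitterProperties(sys.env)
if (missing.nonEmpty) println(s"Missing credentials: ${missing.mkString(", ")}")
```

The helper takes a plain `Map`, not `sys.env` directly, so that it can be tested without touching the real environment.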
Let's map the tweets into JSON-formatted strings (one tweet per line).
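val twitterStreamJson = twitterStream.map(
  x => {
    val gson = new Gson()      // note: this creates one Gson instance per tweet
    val xJson = gson.toJson(x) // serialize the twitter4j Status into a compact, single-line JSON string
    xJson
  }
)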
twitterStreamJson: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@6817a6ee
twitter OAuth Credentials loaded
MyconsumerKey: String
MyconsumerSecret: String
Mytoken: String
MytokenSecret: String
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
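Why does "one tweet per line" hold? A JSON serializer such as Gson escapes newline characters inside string values, and Gson's default output is not pretty-printed. As a result, each serialized tweet stays on a single line. The sketch below uses a minimal hand-rolled stand-in to illustrate this. `Tweet`, `escapeJson` and `toJsonLine` are hypothetical names for a two-field record; they are not Gson's API.

```scala
// Hypothetical two-field tweet record (the real stream carries twitter4j.Status objects).
final case class Tweet(id: Long, text: String)

// Escape the characters JSON requires inside a string value.
def escapeJson(s: String): String = {
  val sb = new StringBuilder
  s.foreach {
    case '"'  => sb.append("\\\"")
    case '\\' => sb.append("\\\\")
    case '\n' => sb.append("\\n")  // embedded newline becomes the two characters \ and n
    case '\r' => sb.append("\\r")
    case '\t' => sb.append("\\t")
    case c if c < ' ' => sb.append('\\').append('u').append(f"${c.toInt}%04x")
    case c    => sb.append(c)
  }
  sb.toString
}

// One JSON object per line ("JSON Lines"), like the output of gson.toJson above.
def toJsonLine(t: Tweet): String = s"""{"id":${t.id},"text":"${escapeJson(t.text)}"}"""

val lines = Seq(Tweet(1L, "Felt sick all day :("), Tweet(2L, "line one\nline two \"quoted\""))
  .map(toJsonLine)
lines.foreach(println)
```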
var numTweetsCollected = 0L // track number of tweets collected
val partitionsEachInterval = 1 // This tells the number of partitions in each RDD of tweets in the DStream.
twitterStreamJson.foreachRDD(
  (rdd, time) => { // for each RDD in the DStream; this function runs on the driver once per batch
    val count = rdd.count()
    if (count > 0) {
      val outputRDD = rdd.repartition(partitionsEachInterval) // repartition as desired
      outputRDD.saveAsTextFile(outputDirectoryRoot + "/tweets_" + time.milliseconds.toString) // save as textfile
      numTweetsCollected += count // update with the latest count
    }
  }
)
numTweetsCollected: Long = 0 partitionsEachInterval: Int = 1
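The function passed to `foreachRDD` runs on the driver once per batch interval. Inside it, `rdd.count()` and `saveAsTextFile` launch distributed jobs. This is why updating the local `var numTweetsCollected` works. The plain-Scala simulation below mimics that per-batch logic. `handleBatch` and the in-memory `saved` map are hypothetical stand-ins for the RDD and for `saveAsTextFile`.

```scala
import scala.collection.mutable

val root = "/datasets/tweetsStreamTmp"
var tweetsSeen = 0L                                       // plays the role of numTweetsCollected
val saved = mutable.LinkedHashMap.empty[String, Seq[String]] // stand-in for files written to DBFS

// Mimics the body of foreachRDD for one batch.
def handleBatch(batch: Seq[String], batchTimeMillis: Long): Unit = {
  val count = batch.size.toLong
  if (count > 0) {                                        // empty batches write nothing
    saved(root + "/tweets_" + batchTimeMillis.toString) = batch // "save as text file"
    tweetsSeen += count
  }
}

handleBatch(Seq("""{"id":1}""", """{"id":2}"""), 1507184798000L)
handleBatch(Seq.empty, 1507184799000L)                    // empty interval: skipped
handleBatch(Seq("""{"id":3}"""), 1507184800000L)
println(saved.keys.mkString("\n"))
println(s"tweets collected: $tweetsSeen")
```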
Nothing has actually happened yet: so far we have only defined the streaming computation.
Let's start the Spark StreamingContext we created next.
ssc.start()
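Below is a toy model of this behaviour. `ToyStreamingContext` is a hypothetical class, not Spark's API. It only mimics two rules described in the Spark Streaming guide:

- Output operations are merely set up before `start()` and begin processing after it.
- Once a context has started, no new computations can be added to it.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model, NOT Spark's API.
final class ToyStreamingContext {
  private val outputOps = ArrayBuffer.empty[Seq[String] => Unit]
  private var started = false

  // Like foreachRDD: registers work to run on every future batch.
  def foreachBatch(op: Seq[String] => Unit): Unit = {
    require(!started, "cannot add output operations after start()")
    outputOps += op
  }

  def start(): Unit = { started = true }

  // Simulates the receiver handing over one batch; ignored before start().
  def deliver(batch: Seq[String]): Unit =
    if (started) outputOps.foreach(op => op(batch))
}

val ctx = new ToyStreamingContext
var processed = 0
ctx.foreachBatch(batch => processed += batch.size) // registered, not run
ctx.deliver(Seq("a", "b"))                         // before start(): nothing happens
ctx.start()
ctx.deliver(Seq("c"))                              // now the registered operation runs
println(s"processed $processed tweet(s)")
```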
Let's look at the Spark UI now and monitor the streaming job in action! Go to Clusters on the left, click on UI and then Streaming.
numTweetsCollected // number of tweets collected so far
res48: Long = 0
Let's check again in a few seconds how many tweets have been collected so far.
numTweetsCollected // number of tweets collected so far
res11: Long = 187
Note that you could easily fill up disk space!
So let's stop the streaming job next.
ssc.stop(stopSparkContext = false) // got to stop soon!
Let's make sure that the Streaming UI is no longer active in the Clusters UI.
StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) } // extra cautious stopping of all active streaming contexts
Let's examine what was saved in DBFS.
display(dbutils.fs.ls(outputDirectoryRoot))
path | name | size |
---|---|---|
dbfs:/datasets/tweetsStreamTmp/tweets_1507184798000/ | tweets_1507184798000/ | 0.0 |
dbfs:/datasets/tweetsStreamTmp/tweets_1507184799000/ | tweets_1507184799000/ | 0.0 |
dbfs:/datasets/tweetsStreamTmp/tweets_1507184800000/ | tweets_1507184800000/ | 0.0 |
dbfs:/datasets/tweetsStreamTmp/tweets_1507184801000/ | tweets_1507184801000/ | 0.0 |
dbfs:/datasets/tweetsStreamTmp/tweets_1507184802000/ | tweets_1507184802000/ | 0.0 |
dbfs:/datasets/tweetsStreamTmp/tweets_1507184803000/ | tweets_1507184803000/ | 0.0 |
dbfs:/datasets/tweetsStreamTmp/tweets_1507184804000/ | tweets_1507184804000/ | 0.0 |
dbfs:/datasets/tweetsStreamTmp/tweets_1507184805000/ | tweets_1507184805000/ | 0.0 |
dbfs:/datasets/tweetsStreamTmp/tweets_1507184806000/ | tweets_1507184806000/ | 0.0 |
dbfs:/datasets/tweetsStreamTmp/tweets_1507184807000/ | tweets_1507184807000/ | 0.0 |
dbfs:/datasets/tweetsStreamTmp/tweets_1507184808000/ | tweets_1507184808000/ | 0.0 |
dbfs:/datasets/tweetsStreamTmp/tweets_1507184809000/ | tweets_1507184809000/ | 0.0 |
dbfs:/datasets/tweetsStreamTmp/tweets_1507184810000/ | tweets_1507184810000/ | 0.0 |
dbfs:/datasets/tweetsStreamTmp/tweets_1507184811000/ | tweets_1507184811000/ | 0.0 |
val tweetsDir = outputDirectoryRoot+"/tweets_1507184802000/" // use an existing directory; you may have to change the folder name based on the listing above!
tweetsDir: String = /datasets/tweetsStreamTmp/tweets_1507184802000/
display(dbutils.fs.ls(tweetsDir))
path | name | size |
---|---|---|
dbfs:/datasets/tweetsStreamTmp/tweets_1507184802000/_SUCCESS | _SUCCESS | 0.0 |
dbfs:/datasets/tweetsStreamTmp/tweets_1507184802000/part-00000 | part-00000 | 125602.0 |
sc.textFile(tweetsDir+"part-00000").count()
res16: Long = 36
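These numbers make the earlier warning about filling up disk space concrete. A back-of-the-envelope estimate uses two values printed above: the `part-00000` file of one 1-second batch is 125602 bytes, and it holds 36 tweets (one per line). The key assumption, labeled in the code, is that the stream keeps delivering about that many tweets per second.

```scala
// Values taken from the outputs above.
val bytesInPartFile = 125602L // size of part-00000 for one batch
val tweetsInPartFile = 36L    // line count of that file = number of tweets
// Assumption: one batch = 1 second (slideInterval) and the rate stays roughly constant.
val bytesPerTweet = bytesInPartFile.toDouble / tweetsInPartFile
val tweetsPerSecond = tweetsInPartFile.toDouble
val bytesPerHour = bytesPerTweet * tweetsPerSecond * 3600
println(f"$bytesPerTweet%.0f bytes/tweet, ${bytesPerHour / 1e6}%.0f MB/hour")
```

Under that assumption, the collector writes roughly 3.5 KB per tweet and about 450 MB per hour of uncompressed JSON.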
val outJson = sqlContext.read.json(tweetsDir+"part-00000")
outJson: org.apache.spark.sql.DataFrame = [contributorsIDs: array<string>, createdAt: string ... 25 more fields]
outJson.printSchema()
root
 |-- contributorsIDs: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- createdAt: string (nullable = true)
 |-- currentUserRetweetId: long (nullable = true)
 |-- displayTextRangeEnd: long (nullable = true)
 |-- displayTextRangeStart: long (nullable = true)
 |-- favoriteCount: long (nullable = true)
 |-- hashtagEntities: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- end: long (nullable = true)
 |    |    |-- start: long (nullable = true)
 |    |    |-- text: string (nullable = true)
 |-- id: long (nullable = true)
 |-- inReplyToScreenName: string (nullable = true)
 |-- inReplyToStatusId: long (nullable = true)
 |-- inReplyToUserId: long (nullable = true)
 |-- isFavorited: boolean (nullable = true)
 |-- isPossiblySensitive: boolean (nullable = true)
 |-- isRetweeted: boolean (nullable = true)
 |-- isTruncated: boolean (nullable = true)
 |-- lang: string (nullable = true)
 |-- mediaEntities: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- displayURL: string (nullable = true)
 |    |    |-- end: long (nullable = true)
 |    |    |-- expandedURL: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- mediaURL: string (nullable = true)
 |    |    |-- mediaURLHttps: string (nullable = true)
 |    |    |-- sizes: struct (nullable = true)
 |    |    |    |-- 0: struct (nullable = true)
 |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |-- 1: struct (nullable = true)
 |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |-- 2: struct (nullable = true)
 |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |-- 3: struct (nullable = true)
 |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |-- width: long (nullable = true)
 |    |    |-- start: long (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- videoAspectRatioHeight: long (nullable = true)
 |    |    |-- videoAspectRatioWidth: long (nullable = true)
 |    |    |-- videoDurationMillis: long (nullable = true)
 |    |    |-- videoVariants: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- bitrate: long (nullable = true)
 |    |    |    |    |-- contentType: string (nullable = true)
 |    |    |    |    |-- url: string (nullable = true)
 |-- quotedStatus: struct (nullable = true)
 |    |-- contributorsIDs: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- createdAt: string (nullable = true)
 |    |-- currentUserRetweetId: long (nullable = true)
 |    |-- displayTextRangeEnd: long (nullable = true)
 |    |-- displayTextRangeStart: long (nullable = true)
 |    |-- favoriteCount: long (nullable = true)
 |    |-- hashtagEntities: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- end: long (nullable = true)
 |    |    |    |-- start: long (nullable = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- inReplyToScreenName: string (nullable = true)
 |    |-- inReplyToStatusId: long (nullable = true)
 |    |-- inReplyToUserId: long (nullable = true)
 |    |-- isFavorited: boolean (nullable = true)
 |    |-- isPossiblySensitive: boolean (nullable = true)
 |    |-- isRetweeted: boolean (nullable = true)
 |    |-- isTruncated: boolean (nullable = true)
 |    |-- lang: string (nullable = true)
 |    |-- mediaEntities: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- quotedStatusId: long (nullable = true)
 |    |-- retweetCount: long (nullable = true)
 |    |-- source: string (nullable = true)
 |    |-- symbolEntities: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- text: string (nullable = true)
 |    |-- urlEntities: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- displayURL: string (nullable = true)
 |    |    |    |-- end: long (nullable = true)
 |    |    |    |-- expandedURL: string (nullable = true)
 |    |    |    |-- start: long (nullable = true)
 |    |    |    |-- url: string (nullable = true)
 |    |-- user: struct (nullable = true)
 |    |    |-- createdAt: string (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- descriptionURLEntities: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- favouritesCount: long (nullable = true)
 |    |    |-- followersCount: long (nullable = true)
 |    |    |-- friendsCount: long (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- isContributorsEnabled: boolean (nullable = true)
 |    |    |-- isDefaultProfile: boolean (nullable = true)
 |    |    |-- isDefaultProfileImage: boolean (nullable = true)
 |    |    |-- isFollowRequestSent: boolean (nullable = true)
 |    |    |-- isGeoEnabled: boolean (nullable = true)
 |    |    |-- isProtected: boolean (nullable = true)
 |    |    |-- isVerified: boolean (nullable = true)
 |    |    |-- lang: string (nullable = true)
 |    |    |-- listedCount: long (nullable = true)
 |    |    |-- location: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- profileBackgroundColor: string (nullable = true)
 |    |    |-- profileBackgroundImageUrl: string (nullable = true)
 |    |    |-- profileBackgroundImageUrlHttps: string (nullable = true)
 |    |    |-- profileBackgroundTiled: boolean (nullable = true)
 |    |    |-- profileBannerImageUrl: string (nullable = true)
 |    |    |-- profileImageUrl: string (nullable = true)
 |    |    |-- profileImageUrlHttps: string (nullable = true)
 |    |    |-- profileLinkColor: string (nullable = true)
 |    |    |-- profileSidebarBorderColor: string (nullable = true)
 |    |    |-- profileSidebarFillColor: string (nullable = true)
 |    |    |-- profileTextColor: string (nullable = true)
 |    |    |-- profileUseBackgroundImage: boolean (nullable = true)
 |    |    |-- screenName: string (nullable = true)
 |    |    |-- showAllInlineMedia: boolean (nullable = true)
 |    |    |-- statusesCount: long (nullable = true)
 |    |    |-- timeZone: string (nullable = true)
 |    |    |-- translator: boolean (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- utcOffset: long (nullable = true)
 |    |-- userMentionEntities: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- end: long (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- screenName: string (nullable = true)
 |    |    |    |-- start: long (nullable = true)
 |-- quotedStatusId: long (nullable = true)
 |-- retweetCount: long (nullable = true)
 |-- retweetedStatus: struct (nullable = true)
 |    |-- contributorsIDs: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- createdAt: string (nullable = true)
 |    |-- currentUserRetweetId: long (nullable = true)
 |    |-- displayTextRangeEnd: long (nullable = true)
 |    |-- displayTextRangeStart: long (nullable = true)
 |    |-- favoriteCount: long (nullable = true)
 |    |-- hashtagEntities: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- end: long (nullable = true)
 |    |    |    |-- start: long (nullable = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- inReplyToScreenName: string (nullable = true)
 |    |-- inReplyToStatusId: long (nullable = true)
 |    |-- inReplyToUserId: long (nullable = true)
 |    |-- isFavorited: boolean (nullable = true)
 |    |-- isPossiblySensitive: boolean (nullable = true)
 |    |-- isRetweeted: boolean (nullable = true)
 |    |-- isTruncated: boolean (nullable = true)
 |    |-- lang: string (nullable = true)
 |    |-- mediaEntities: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- displayURL: string (nullable = true)
 |    |    |    |-- end: long (nullable = true)
 |    |    |    |-- expandedURL: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- mediaURL: string (nullable = true)
 |    |    |    |-- mediaURLHttps: string (nullable = true)
 |    |    |    |-- sizes: struct (nullable = true)
 |    |    |    |    |-- 0: struct (nullable = true)
 |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |-- 1: struct (nullable = true)
 |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |-- 2: struct (nullable = true)
 |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |-- 3: struct (nullable = true)
 |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |-- start: long (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    |    |-- url: string (nullable = true)
 |    |    |    |-- videoAspectRatioHeight: long (nullable = true)
 |    |    |    |-- videoAspectRatioWidth: long (nullable = true)
 |    |    |    |-- videoDurationMillis: long (nullable = true)
 |    |    |    |-- videoVariants: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- bitrate: long (nullable = true)
 |    |    |    |    |    |-- contentType: string (nullable = true)
 |    |    |    |    |    |-- url: string (nullable = true)
 |    |-- place: struct (nullable = true)
 |    |    |-- boundingBoxCoordinates: array (nullable = true)
 |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- latitude: double (nullable = true)
 |    |    |    |    |    |-- longitude: double (nullable = true)
 |    |    |-- boundingBoxType: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- countryCode: string (nullable = true)
 |    |    |-- fullName: string (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- placeType: string (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |-- quotedStatus: struct (nullable = true)
 |    |    |-- contributorsIDs: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- createdAt: string (nullable = true)
 |    |    |-- currentUserRetweetId: long (nullable = true)
 |    |    |-- displayTextRangeEnd: long (nullable = true)
 |    |    |-- displayTextRangeStart: long (nullable = true)
 |    |    |-- favoriteCount: long (nullable = true)
 |    |    |-- hashtagEntities: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- end: long (nullable = true)
 |    |    |    |    |-- start: long (nullable = true)
 |    |    |    |    |-- text: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- inReplyToScreenName: string (nullable = true)
 |    |    |-- inReplyToStatusId: long (nullable = true)
 |    |    |-- inReplyToUserId: long (nullable = true)
 |    |    |-- isFavorited: boolean (nullable = true)
 |    |    |-- isPossiblySensitive: boolean (nullable = true)
 |    |    |-- isRetweeted: boolean (nullable = true)
 |    |    |-- isTruncated: boolean (nullable = true)
 |    |    |-- lang: string (nullable = true)
 |    |    |-- mediaEntities: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- quotedStatusId: long (nullable = true)
 |    |    |-- retweetCount: long (nullable = true)
 |    |    |-- source: string (nullable = true)
 |    |    |-- symbolEntities: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- text: string (nullable = true)
 |    |    |-- urlEntities: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- displayURL: string (nullable = true)
 |    |    |    |    |-- end: long (nullable = true)
 |    |    |    |    |-- expandedURL: string (nullable = true)
 |    |    |    |    |-- start: long (nullable = true)
 |    |    |    |    |-- url: string (nullable = true)
 |    |    |-- user: struct (nullable = true)
 |    |    |    |-- createdAt: string (nullable = true)
 |    |    |    |-- description: string (nullable = true)
 |    |    |    |-- descriptionURLEntities: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- favouritesCount: long (nullable = true)
 |    |    |    |-- followersCount: long (nullable = true)
 |    |    |    |-- friendsCount: long (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- isContributorsEnabled: boolean (nullable = true)
 |    |    |    |-- isDefaultProfile: boolean (nullable = true)
 |    |    |    |-- isDefaultProfileImage: boolean (nullable = true)
 |    |    |    |-- isFollowRequestSent: boolean (nullable = true)
 |    |    |    |-- isGeoEnabled: boolean (nullable = true)
 |    |    |    |-- isProtected: boolean (nullable = true)
 |    |    |    |-- isVerified: boolean (nullable = true)
 |    |    |    |-- lang: string (nullable = true)
 |    |    |    |-- listedCount: long (nullable = true)
 |    |    |    |-- location: string (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- profileBackgroundColor: string (nullable = true)
 |    |    |    |-- profileBackgroundImageUrl: string (nullable = true)
 |    |    |    |-- profileBackgroundImageUrlHttps: string (nullable = true)
 |    |    |    |-- profileBackgroundTiled: boolean (nullable = true)
 |    |    |    |-- profileBannerImageUrl: string (nullable = true)
 |    |    |    |-- profileImageUrl: string (nullable = true)
 |    |    |    |-- profileImageUrlHttps: string (nullable = true)
 |    |    |    |-- profileLinkColor: string (nullable = true)
 |    |    |    |-- profileSidebarBorderColor: string (nullable = true)
 |    |    |    |-- profileSidebarFillColor: string (nullable = true)
 |    |    |    |-- profileTextColor: string (nullable = true)
 |    |    |    |-- profileUseBackgroundImage: boolean (nullable = true)
 |    |    |    |-- screenName: string (nullable = true)
 |    |    |    |-- showAllInlineMedia: boolean (nullable = true)
 |    |    |    |-- statusesCount: long (nullable = true)
 |    |    |    |-- timeZone: string (nullable = true)
 |    |    |    |-- translator: boolean (nullable = true)
 |    |    |    |-- url: string (nullable = true)
 |    |    |    |-- utcOffset: long (nullable = true)
 |    |    |-- userMentionEntities: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- end: long (nullable = true)
 |    |    |    |    |-- id: long (nullable = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |-- screenName: string (nullable = true)
 |    |    |    |    |-- start: long (nullable = true)
 |    |-- quotedStatusId: long (nullable = true)
 |    |-- retweetCount: long (nullable = true)
 |    |-- source: string (nullable = true)
 |    |-- symbolEntities: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- text: string (nullable = true)
 |    |-- urlEntities: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- displayURL: string (nullable = true)
 |    |    |    |-- end: long (nullable = true)
 |    |    |    |-- expandedURL: string (nullable = true)
 |    |    |    |-- start: long (nullable = true)
 |    |    |    |-- url: string (nullable = true)
 |    |-- user: struct (nullable = true)
 |    |    |-- createdAt: string (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- descriptionURLEntities: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- favouritesCount: long (nullable = true)
 |    |    |-- followersCount: long (nullable = true)
 |    |    |-- friendsCount: long (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- isContributorsEnabled: boolean (nullable = true)
 |    |    |-- isDefaultProfile: boolean (nullable = true)
 |    |    |-- isDefaultProfileImage: boolean (nullable = true)
 |    |    |-- isFollowRequestSent: boolean (nullable = true)
 |    |    |-- isGeoEnabled: boolean (nullable = true)
 |    |    |-- isProtected: boolean (nullable = true)
 |    |    |-- isVerified: boolean (nullable = true)
 |    |    |-- lang: string (nullable = true)
 |    |    |-- listedCount: long (nullable = true)
 |    |    |-- location: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- profileBackgroundColor: string (nullable = true)
 |    |    |-- profileBackgroundImageUrl: string (nullable = true)
 |    |    |-- profileBackgroundImageUrlHttps: string (nullable = true)
 |    |    |-- profileBackgroundTiled: boolean (nullable = true)
 |    |    |-- profileBannerImageUrl: string (nullable = true)
 |    |    |-- profileImageUrl: string (nullable = true)
 |    |    |-- profileImageUrlHttps: string (nullable = true)
 |    |    |-- profileLinkColor: string (nullable = true)
 |    |    |-- profileSidebarBorderColor: string (nullable = true)
 |    |    |-- profileSidebarFillColor: string (nullable = true)
 |    |    |-- profileTextColor: string (nullable = true)
 |    |    |-- profileUseBackgroundImage: boolean (nullable = true)
 |    |    |-- screenName: string (nullable = true)
 |    |    |-- showAllInlineMedia: boolean (nullable = true)
 |    |    |-- statusesCount: long (nullable = true)
 |    |    |-- timeZone: string (nullable = true)
 |    |    |-- translator: boolean (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- utcOffset: long (nullable = true)
 |    |-- userMentionEntities: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- end: long (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- screenName: string (nullable = true)
 |    |    |    |-- start: long (nullable = true)
 |-- source: string (nullable = true)
 |-- symbolEntities: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- text: string (nullable = true)
 |-- urlEntities: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- displayURL: string (nullable = true)
 |    |    |-- end: long (nullable = true)
 |    |    |-- expandedURL: string (nullable = true)
 |    |    |-- start: long (nullable = true)
 |    |    |-- url: string (nullable = true)
 |-- user: struct (nullable = true)
 |    |-- createdAt: string (nullable = true)
 |    |-- description: string (nullable = true)
 |    |-- descriptionURLEntities: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- favouritesCount: long (nullable = true)
 |    |-- followersCount: long (nullable = true)
 |    |-- friendsCount: long (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- isContributorsEnabled: boolean (nullable = true)
 |    |-- isDefaultProfile: boolean (nullable = true)
 |    |-- isDefaultProfileImage: boolean (nullable = true)
 |    |-- isFollowRequestSent: boolean (nullable = true)
 |    |-- isGeoEnabled: boolean (nullable = true)
 |    |-- isProtected: boolean (nullable = true)
 |    |-- isVerified: boolean (nullable = true)
 |    |-- lang: string (nullable = true)
 |    |-- listedCount: long (nullable = true)
 |    |-- location: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- profileBackgroundColor: string (nullable = true)
 |    |-- profileBackgroundImageUrl: string (nullable = true)
 |    |-- profileBackgroundImageUrlHttps: string (nullable = true)
 |    |-- profileBackgroundTiled: boolean (nullable = true)
 |    |-- profileBannerImageUrl: string (nullable = true)
 |    |-- profileImageUrl: string (nullable = true)
 |    |-- profileImageUrlHttps: string (nullable = true)
 |    |-- profileLinkColor: string (nullable = true)
 |    |-- profileSidebarBorderColor: string (nullable = true)
 |    |-- profileSidebarFillColor: string (nullable = true)
 |    |-- profileTextColor: string (nullable = true)
 |    |-- profileUseBackgroundImage: boolean (nullable = true)
 |    |-- screenName: string (nullable = true)
 |    |-- showAllInlineMedia: boolean (nullable = true)
 |    |-- statusesCount: long (nullable = true)
 |    |-- timeZone: string (nullable = true)
 |    |-- translator: boolean (nullable = true)
 |    |-- url: string (nullable = true)
 |    |-- utcOffset: long (nullable = true)
 |-- userMentionEntities: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- end: long (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- screenName: string (nullable = true)
 |    |    |-- start: long (nullable = true)
outJson.select("id","text").show(false)
+------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id                |text |
+------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|915825548626145280|one person followed me and 2 people unfollowed me // automatically checked by https://t.co/M7i6tIzxMk |
|915825548609208320|Penawaran terbatas! Dptkan #PromoTiket ke Tokyo 4.2Jt,hanya sampai 11Okt'17 ini Langsung hubungi 24hrs Hotline Ticketing kami 021-2963 1999. https://t.co/FKR4lz1KPJ|
|915825548600979456|Let's tweet #MonChevy pics/gifs this Friday at 8 PM Paris time (7 PM UK) to send healing thoughts to our #Versailles sister @MadameLisabi https://t.co/muZenzineP |
|915825552807706624|RT @travis999998: ใครก็ได้ช่วยน้องเค้าด้วย ว่าวไปมือประคองไข่ นิ้วแยงก้น กดแรงๆ ตอนแตกจะเสียวมาก . น่ารักอะ. https://t.co/a1CPDgHVOa |
|915825548605009922|Felt sick all day :( |
|915825548621864961|【無料】「キン肉マン」を読んでるよ!LINEマンガなら今すぐ無料で第1話が読める!#LINEマンガ https://t.co/6DrFlGGqfs |
|915825548605009921|RT @ar14design: วันไปไหว้ร.๙ฝนตกหนัก เลยมีคนเอาเชือกผ้าใบไปผูกกับรถเพื่อน 5555555555 https://t.co/FfCaPWAoH2 |
|915825548605001728|바고안하겟지...지킬이나합시다... |
|915825548609359872|RT @RamIsRising: Follow everyone who retweets this 🛍 |
|915825548600991744|RT @ZZayadh: الآن ✨⭕️لزيادة عدد متابعينك✨⭕️ ⑴ تابعني أتابعك ✨ ⑵ ♻️ رتويت ♻️✨ ⑶ تابع من عمل رتويت ✨⑷ إلتزم تستفيد ༻✩October 05, 2017 at 08:3… |
|915825548626202624|Design a new logo by ProjektMate: We are a non-profit organisation from India. We would… https://t.co/8xfL3QcleF |
|915825548617592832|____ /____\ / /⊂ニ⊃ ⊂ニ⊃ やれやれだぞ || L i⌒  ̄  ̄ ヽ ヽ_ | (ヽ >、___0_ノ |V / ヽレ<ヲ |_/ | |/ |――――| |
|915825548626100225|Build website with easy API installation by iircaz55: I need a website very likely the same… https://t.co/WbNC5g8L0z |
|915825548592537601|身の程をわきまえないものは自滅する https://t.co/hqfTqBcRSp |
|915825548592545798|@aimeelizzette @HornyGlF I don't think I've ever retweeted it |
|915825548617797632|أذكار الأذان:اللهم رب هذه الدعوة التامة والصلاة القائم https://t.co/EmwkpQBKtf |
|915825548609314816|RT @lespros_tetsuya: はーい!🙋10/10にうれしいお知らせしますよー😇😇😇 |
|915825548613402624|RT @imagineforestcr: ドシンドシン |
|915825548605104128|ここにてLADA NIVAの新車が購入できます。 https://t.co/me8fB3qA2o |
|915825548613570560|RT @EsseoNWE: Previews w/ @RealKlash https://t.co/497ZYhpUSM |
+------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
only showing top 20 rows
display(outJson)
Now, let's be good at housekeeping and clean up the unnecessary data in DBFS, our distributed file system (in Databricks).
// remove the output directory and everything in it
dbutils.fs.rm(outputDirectoryRoot, true)
res50: Boolean = false
Clearly there is a lot one can do with tweets!
Especially after you get a few more primitives under your belt from the following areas:
- Natural Language Processing (MLlib, beyond word counts of course),
- Distributed vertex programming (GraphFrames, which you already know), and
- Scalable geospatial computing with location data on open street maps (roughly a third of tweets are geo-enabled with Latitude and Longitude of the tweet location) - we will get into this.
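Making a function for the Spark Streaming job
Let's throw the bits and bobs of code above into a function called streamFunc for simplicity and modularity. Unlike the code above, this version uses a one-minute batch interval. It also appends each batch as Parquet files under a year/month/day/hour directory hierarchy.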
import com.google.gson.Gson
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.SaveMode
import org.apache.spark.streaming.{Minutes, StreamingContext}
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
// note: rdd.toDF below relies on spark.implicits._, which Databricks notebooks import automatically

val outputDirectoryRoot = "/datasets/tweetsStreamTmp" // output directory
val batchInterval = 1 // in minutes
val timeoutJobLength = batchInterval * 5
var newContextCreated = false
var numTweetsCollected = 0L // track number of tweets collected
//val conf = new SparkConf().setAppName("TrackedTweetCollector").setMaster("local")

// This is the function that creates the StreamingContext and sets up the Spark Streaming job.
def streamFunc(): StreamingContext = {
  // Create a Spark Streaming Context.
  val ssc = new StreamingContext(sc, Minutes(batchInterval))
  // Create the OAuth Twitter credentials
  val auth = Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
  // Create a Twitter Stream for the input source.
  val twitterStream = ExtendedTwitterUtils.createStream(ssc, auth)
  // Transform the discrete RDDs into JSON
  val twitterStreamJson = twitterStream.map(x => {
    val gson = new Gson()
    val xJson = gson.toJson(x)
    xJson
  })
  // number of partitions (and hence output files) in each RDD of tweets in the DStream
  val partitionsEachInterval = 1
  // what we want done with each discrete RDD tuple: (rdd, time)
  twitterStreamJson.foreachRDD((rdd, time) => { // for each RDD in the DStream
    val count = rdd.count()
    if (count > 0) {
      val outputRDD = rdd.repartition(partitionsEachInterval) // repartition as desired
      // to write to parquet directly in append mode in one directory per 'time'------------
      val outputDF = outputRDD.toDF("tweetAsJsonString")
      // get all time fields from a single current `java.util.Date()`, so that
      // year, month, day and hour cannot straddle an hour or day boundary
      val now = new java.util.Date()
      val year = (new java.text.SimpleDateFormat("yyyy")).format(now)
      val month = (new java.text.SimpleDateFormat("MM")).format(now)
      val day = (new java.text.SimpleDateFormat("dd")).format(now)
      val hour = (new java.text.SimpleDateFormat("HH")).format(now)
      // write to a file with a clear time-based hierarchical directory structure for example
      outputDF.write.mode(SaveMode.Append)
        .parquet(outputDirectoryRoot + "/" + year + "/" + month + "/" + day + "/" + hour + "/" + time.milliseconds)
      // end of writing as parquet file-------------------------------------
      numTweetsCollected += count // update with the latest count
    }
  })
  newContextCreated = true
  ssc
}
import com.google.gson.Gson
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
outputDirectoryRoot: String = /datasets/tweetsStreamTmp
batchInterval: Int = 1
timeoutJobLength: Int = 5
newContextCreated: Boolean = false
numTweetsCollected: Long = 0
streamFunc: ()org.apache.spark.streaming.StreamingContext
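The time-based directory layout written by streamFunc can be isolated into a small pure function. This is a minimal sketch, not the notebook's code. It assumes you want the year/month/day/hour folders derived from the batch time in UTC, and the names TweetPaths and batchOutputPath are hypothetical.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

object TweetPaths {
  // Formats an instant as yyyy/MM/dd/HH; UTC is an assumption about the cluster's time zone
  private val hourDirFormat: DateTimeFormatter =
    DateTimeFormatter.ofPattern("yyyy/MM/dd/HH").withZone(ZoneOffset.UTC)

  // Hypothetical helper: root/yyyy/MM/dd/HH/<batch time in ms>
  def batchOutputPath(root: String, batchTimeMs: Long): String = {
    val hourDirs = hourDirFormat.format(Instant.ofEpochMilli(batchTimeMs))
    s"$root/$hourDirs/$batchTimeMs"
  }
}
```

For example, TweetPaths.batchOutputPath("/datasets/tweetsStreamTmp", 1507196400000L) yields "/datasets/tweetsStreamTmp/2017/10/05/09/1507196400000". Inside foreachRDD it would be called with outputDirectoryRoot and time.milliseconds.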
// Now just use the function to create a Spark Streaming Context
val ssc = StreamingContext.getActiveOrCreate(streamFunc)
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@3c75550d
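Why wrap the setup in streamFunc? StreamingContext.getActiveOrCreate returns the active context (started and not yet stopped) if there is one, and calls the factory only otherwise. Re-running the cell while the job is live therefore does not build a second job. The Spark-free toy below only illustrates that contract. ContextRegistry and ToyContext are hypothetical names and not Spark's implementation.

```scala
object ContextRegistry {
  // Toy stand-in for a streaming context (hypothetical, not Spark's class)
  final class ToyContext(val id: Int) {
    // Starting a context makes it the active one, as ssc.start() does
    def start(): Unit = ContextRegistry.setActive(Some(this))
    // Stopping clears the active context
    def stop(): Unit = ContextRegistry.setActive(None)
  }

  private var active: Option[ToyContext] = None

  private def setActive(ctx: Option[ToyContext]): Unit = synchronized { active = ctx }

  // Return the active (started, not stopped) context, else build one via the factory
  def getActiveOrCreate(factory: () => ToyContext): ToyContext = synchronized {
    active.getOrElse(factory())
  }
}
```

As with the real API, the factory runs again only after the active context has been stopped.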
// start the streaming job (call start() only once per context)
ssc.start()
//ssc.awaitTerminationOrTimeout(timeoutJobLength * 60 * 1000L) // the timeout is in milliseconds
// this will make sure all streaming jobs in the cluster are stopped,
// but let it run for a few minutes before stopping it
StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) }
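timeoutJobLength is defined in minutes, whereas awaitTerminationOrTimeout takes its timeout in milliseconds. Passing timeoutJobLength unconverted would therefore wait only 5 ms. Here is a minimal sketch of the conversion using the standard scala.concurrent.duration DSL. The object name StreamTimeout is just for illustration.

```scala
import scala.concurrent.duration._

object StreamTimeout {
  // Same values as in the notebook, both in minutes
  val batchInterval = 1
  val timeoutJobLength = batchInterval * 5

  // awaitTerminationOrTimeout takes milliseconds, so convert explicitly
  val timeoutMillis: Long = timeoutJobLength.minutes.toMillis
}
```

In the notebook this would read ssc.awaitTerminationOrTimeout(timeoutJobLength.minutes.toMillis).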
display(dbutils.fs.ls(outputDirectoryRoot))
path | name | size |
---|---|---|
dbfs:/datasets/tweetsStreamTmp/2017/ | 2017/ | 0.0 |
display(dbutils.fs.ls(outputDirectoryRoot+"/2017/10/05/09/")) // keep adding sub-dirs and descend into the time-tree'd directory hierarchy
path | name | size |
---|---|---|
dbfs:/datasets/tweetsStreamTmp/2017/10/05/09/1507196400000/ | 1507196400000/ | 0.0 |
dbfs:/datasets/tweetsStreamTmp/2017/10/05/09/1507196460000/ | 1507196460000/ | 0.0 |
dbfs:/datasets/tweetsStreamTmp/2017/10/05/09/1507196520000/ | 1507196520000/ | 0.0 |
dbfs:/datasets/tweetsStreamTmp/2017/10/05/09/1507196580000/ | 1507196580000/ | 0.0 |
dbfs:/datasets/tweetsStreamTmp/2017/10/05/09/1507196640000/ | 1507196640000/ | 0.0 |
dbfs:/datasets/tweetsStreamTmp/2017/10/05/09/1507196700000/ | 1507196700000/ | 0.0 |
dbfs:/datasets/tweetsStreamTmp/2017/10/05/09/1507196760000/ | 1507196760000/ | 0.0 |
dbfs:/datasets/tweetsStreamTmp/2017/10/05/09/1507196820000/ | 1507196820000/ | 0.0 |
dbfs:/datasets/tweetsStreamTmp/2017/10/05/09/1507196880000/ | 1507196880000/ | 0.0 |
dbfs:/datasets/tweetsStreamTmp/2017/10/05/09/1507196940000/ | 1507196940000/ | 0.0 |
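Each leaf directory is named by the batch time.milliseconds, so the names themselves record when each batch fired. The plain-Scala sketch below turns the ten names listed above back into timestamps and checks that they are one batchInterval (one minute) apart. It assumes the names are epoch milliseconds, which Instant.toString renders in UTC.

```scala
import java.time.Instant

object BatchDirs {
  // Leaf directory names listed above; each is a batch time in epoch milliseconds
  val names: List[String] = List(
    "1507196400000", "1507196460000", "1507196520000", "1507196580000", "1507196640000",
    "1507196700000", "1507196760000", "1507196820000", "1507196880000", "1507196940000"
  )

  // Interpret each name as an instant
  def instants(ns: List[String]): List[Instant] = ns.map(n => Instant.ofEpochMilli(n.toLong))

  // Gaps between consecutive batch times, in milliseconds
  def gapsMillis(ns: List[String]): List[Long] =
    ns.map(_.toLong).sliding(2).collect { case List(a, b) => b - a }.toList
}
```

The batch times run from 2017-10-05T09:40:00Z to 2017-10-05T09:49:00Z. Ten batches one minute apart means this listing covers about ten minutes of collection.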
Next, let us take a quick peek at the notebook scalable-data-science/sds-2-2/025_b_TTTDFfunctions
to see how we have pipelined the JSON tweets into DataFrames.
Please see http://lamastex.org/lmse/mep/src/TweetAnatomyAndTransmissionTree.html to understand more deeply.
"scalable-data-science/sds-2-2/025_b_TTTDFfunctions"
val rawDF = fromParquetFile2DF("/datasets/tweetsStreamTmp/2017/10/*/*/*/*") //.cache()
val TTTsDF = tweetsDF2TTTDF(tweetsJsonStringDF2TweetsDF(rawDF)).cache()
rawDF: org.apache.spark.sql.DataFrame = [tweetAsJsonString: string]
TTTsDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [CurrentTweetDate: timestamp, CurrentTwID: bigint ... 32 more fields]
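The TTTsDF rows shown below mark missing retweet and quote fields as null, and non-replies with OriginalTwIDinReply = -1. The following is a hypothetical sketch of how a tweetType label could be derived from those three fields. The labels match the tweetType values counted later in this notebook, but the rule is a guess for illustration and not the implementation in 025_b_TTTDFfunctions. Ids and TweetTypeSketch are made-up names.

```scala
object TweetTypeSketch {
  // Hypothetical slice of one TTTsDF row: None stands for a null cell,
  // and replyToId == -1 means "not a reply", as in the display below
  final case class Ids(rtOfId: Option[Long], quotedId: Option[Long], replyToId: Long)

  def tweetType(ids: Ids): String = {
    val isRetweet = ids.rtOfId.isDefined
    val isQuote = ids.quotedId.isDefined
    val isReply = ids.replyToId != -1L
    (isRetweet, isQuote, isReply) match {
      case (true, true, _)       => "Retweet of Quoted Tweet"
      case (true, false, _)      => "ReTweet"
      case (false, true, true)   => "Reply of Quoted Tweet"
      case (false, true, false)  => "Quoted Tweet"
      case (false, false, true)  => "Reply Tweet"
      case (false, false, false) => "Original Tweet"
    }
  }
}
```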
TTTsDF.count()
res63: Long = 19505
display(TTTsDF)
CurrentTweetDate | CurrentTwID | CreationDateOfOrgTwInRT | OriginalTwIDinRT | CreationDateOfOrgTwInQT | OriginalTwIDinQT | OriginalTwIDinReply | CPostUserId | userCreatedAtDate | OPostUserIdinRT | OPostUserIdinQT | OPostUserIdinReply |
---|---|---|---|---|---|---|---|---|---|---|---|
2017-10-05T09:45:00.000+0000 | 9.15875460826906624e17 | 2017-10-05T09:23:13.000+0000 | 9.15869979395989504e17 | null | null | -1.0 | 8.51144345864425476e17 | 2017-04-09T18:46:40.000+0000 | 2.687293212e9 | null | -1.0 |
2017-10-05T09:45:00.000+0000 | 9.1587546082260992e17 | null | null | null | null | -1.0 | 1.919692951e9 | 2013-09-30T10:46:43.000+0000 | null | null | -1.0 |
2017-10-05T09:45:00.000+0000 | 9.15875460847767552e17 | 2017-09-30T09:37:51.000+0000 | 9.1406172267925504e17 | null | null | -1.0 | 8.55046958951448576e17 | 2017-04-20T13:14:15.000+0000 | 2.52475597e8 | null | -1.0 |
2017-10-05T09:45:00.000+0000 | 9.15875460843573248e17 | 2017-10-05T01:00:53.000+0000 | 9.1574356003233792e17 | null | null | -1.0 | 4.113203892e9 | 2015-11-03T13:30:22.000+0000 | 2.47772567e8 | null | -1.0 |
2017-10-05T09:45:00.000+0000 | 9.15875460814274565e17 | 2017-10-04T15:52:43.000+0000 | 9.15605609210187777e17 | null | null | -1.0 | 2.474134897e9 | 2014-05-02T14:45:43.000+0000 | 1.75058448e8 | null | -1.0 |
2017-10-05T09:45:00.000+0000 | 9.15875460835295232e17 | null | null | null | null | 9.15873015858028544e17 | 1.647621744e9 | 2013-08-05T12:08:53.000+0000 | null | null | 7.06385933877182464e17 |
2017-10-05T09:45:00.000+0000 | 9.15875460835241984e17 | 2017-10-05T09:27:02.000+0000 | 9.15870937454567429e17 | null | null | -1.0 | 6.6252706e7 | 2009-08-17T02:23:44.000+0000 | 2.533787994e9 | null | -1.0 |
2017-10-05T09:45:00.000+0000 | 9.15875460839436288e17 | 2017-10-05T02:06:02.000+0000 | 9.15759958150365185e17 | null | null | -1.0 | 7.5133962e8 | 2012-08-11T14:23:23.000+0000 | 2.67537691e8 | null | -1.0 |
2017-10-05T09:45:00.000+0000 | 9.15875460810022912e17 | null | null | null | null | -1.0 | 1.246637089e9 | 2013-03-06T17:45:34.000+0000 | null | null | -1.0 |
2017-10-05T09:45:00.000+0000 | 9.15875460826906627e17 | 2017-10-04T17:38:53.000+0000 | 9.15632327220273153e17 | null | null | -1.0 | 1.939655432e9 | 2013-10-06T03:30:43.000+0000 | 9.06137972176650242e17 | null | -1.0 |
2017-10-05T09:45:00.000+0000 | 9.15875460847886336e17 | null | null | null | null | 9.15875308179304448e17 | 2.605335913e9 | 2014-07-05T11:24:22.000+0000 | null | null | 7.34329199486345217e17 |
2017-10-05T09:45:00.000+0000 | 9.1587546084358144e17 | 2015-09-06T11:13:08.000+0000 | 6.4048286132631552e17 | null | null | -1.0 | 2.868684997e9 | 2014-10-21T08:13:51.000+0000 | 2.232816559e9 | null | -1.0 |
2017-10-05T09:45:00.000+0000 | 9.15875460831219713e17 | null | null | null | null | -1.0 | 3.106466466e9 | 2015-03-24T09:41:47.000+0000 | null | null | -1.0 |
2017-10-05T09:45:00.000+0000 | 9.15875460818628608e17 | 2017-10-01T21:17:46.000+0000 | 9.14600247036399617e17 | null | null | -1.0 | 1.453768837e9 | 2013-05-24T09:40:40.000+0000 | 2.538853697e9 | null | -1.0 |
2017-10-05T09:45:00.000+0000 | 9.15875460818419713e17 | null | null | null | null | 9.158742085922816e17 | 1.396111124e9 | 2013-05-02T02:50:13.000+0000 | null | null | 3.194959158e9 |
2017-10-05T09:45:00.000+0000 | 9.1587546083946496e17 | 2017-10-05T06:07:56.000+0000 | 9.15820832651227136e17 | 2017-10-04T13:56:22.000+0000 | 9.15576331026423808e17 | -1.0 | 2.48374816e9 | 2014-04-14T09:18:13.000+0000 | 1.358917686e9 | 2.451476942e9 | -1.0 |
2017-10-05T09:45:00.000+0000 | 9.15875460827029504e17 | null | null | null | null | -1.0 | 1.93879296e9 | 2013-10-05T20:57:12.000+0000 | null | null | -1.0 |
2017-10-05T09:45:00.000+0000 | 9.15875460814385152e17 | 2017-10-02T22:40:35.000+0000 | 9.14983479112237056e17 | null | null | -1.0 | 1.614107269e9 | 2013-07-23T01:40:51.000+0000 | 2.984363662e9 | null | -1.0 |
2017-10-05T09:45:00.000+0000 | 9.1587546081421312e17 | null | null | null | null | 9.1587530143491277e17 | 3.670594153e9 | 2015-09-24T13:04:03.000+0000 | null | null | 2.33524175e8 |
2017-10-05T09:45:00.000+0000 | 9.15875460827009025e17 | 2017-10-04T22:22:50.000+0000 | 9.1570378856822784e17 | null | null | -1.0 | 6.26912455e8 | 2012-07-05T00:25:42.000+0000 | 1.39261959e8 | null | -1.0 |
2017-10-05T09:45:00.000+0000 | 9.15875460835192833e17 | 2017-10-05T09:27:44.000+0000 | 9.15871114819072e17 | null | null | -1.0 | 8.32807704473145345e17 | 2017-02-18T04:23:24.000+0000 | 4.81101105e9 | null | -1.0 |
2017-10-05T09:45:00.000+0000 | 9.15875460818591744e17 | null | null | null | null | -1.0 | 1.641507391e9 | 2013-08-02T22:39:44.000+0000 | null | null | -1.0 |
2017-10-05T09:45:00.000+0000 | 9.15875460835311616e17 | null | null | null | null | -1.0 | 3.048544857e9 | 2015-02-21T03:26:23.000+0000 | null | null | -1.0 |
2017-10-05T09:45:00.000+0000 | 9.1587546084374528e17 | 2017-10-05T08:06:42.000+0000 | 9.15850721643384832e17 | null | null | -1.0 | 9.07681288689250305e17 | 2017-09-12T19:04:17.000+0000 | 2.379003705e9 | null | -1.0 |
2017-10-05T09:45:00.000+0000 | 9.15875460814340096e17 | 2017-10-05T09:44:30.000+0000 | 9.15875334649729024e17 | null | null | -1.0 | 1.0719639e8 | 2010-01-21T20:34:05.000+0000 | 2.905373914e9 | null | -1.0 |
2017-10-05T09:45:00.000+0000 | 9.15875460826800128e17 | 2017-10-02T19:54:05.000+0000 | 9.14941576908017664e17 | null | null | -1.0 | 7.4454505989111808e17 | 2016-06-19T14:59:10.000+0000 | 8.61722650929840129e17 | null | -1.0 |
2017-10-05T09:45:00.000+0000 | 9.15875460826906625e17 | null | null | null | null | -1.0 | 1.226568745e9 | 2013-02-28T03:06:12.000+0000 | null | null | -1.0 |
2017-10-05T09:45:01.000+0000 | 9.15875465037889536e17 | 2017-10-05T08:37:48.000+0000 | 9.1585855070175232e17 | null | null | -1.0 | 2.35842718e8 | 2011-01-09T05:54:03.000+0000 | 8.84629505758842881e17 | null | -1.0 |
2017-10-05T09:45:01.000+0000 | 9.158754650128384e17 | 2017-09-26T23:01:04.000+0000 | 9.12814307523551232e17 | 2017-07-15T19:18:05.000+0000 | 8.86303874675605504e17 | -1.0 | 1.455451148e9 | 2013-05-24T22:51:04.000+0000 | 2.171491136e9 | 2.372122717e9 | -1.0 |
2017-10-05T09:45:01.000+0000 | 9.15875465021272064e17 | null | null | null | null | -1.0 | 1.4444461e7 | 2008-04-19T16:23:20.000+0000 | null | null | -1.0 |
Truncated to 30 rows
Truncated to 12 cols
display(TTTsDF.groupBy($"tweetType").count().orderBy($"count".desc))
tweetType | count |
---|---|
ReTweet | 8401.0 |
Original Tweet | 6626.0 |
Reply Tweet | 3472.0 |
Retweet of Quoted Tweet | 556.0 |
Quoted Tweet | 439.0 |
Reply of Quoted Tweet | 11.0 |
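The counts in this table add up to 19505, the same number returned by TTTsDF.count() above. The sketch below does the share arithmetic in plain Scala on the counts copied from the table. In practice you would compute the shares on the DataFrame itself.

```scala
object TweetTypeShares {
  // Counts copied from the tweetType table above
  val counts: Map[String, Long] = Map(
    "ReTweet" -> 8401L, "Original Tweet" -> 6626L, "Reply Tweet" -> 3472L,
    "Retweet of Quoted Tweet" -> 556L, "Quoted Tweet" -> 439L, "Reply of Quoted Tweet" -> 11L)

  val total: Long = counts.values.sum

  // Fraction of all collected tweets in each category
  val shares: Map[String, Double] = counts.map { case (k, v) => k -> v.toDouble / total }
}
```

Plain retweets make up about 43% of this sample, and original tweets about 34%.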
// this will make sure all streaming jobs in the cluster are stopped
StreamingContext.getActive.foreach{ _.stop(stopSparkContext = false) }
// this will delete what we collected to keep the disk usage tight and tidy
dbutils.fs.rm(outputDirectoryRoot, true)
res67: Boolean = true
Next, let's write the tweets into a scalable commercial cloud storage system
We will write the tweets to Amazon Simple Storage Service (S3), a scalable storage system in the cloud. See https://aws.amazon.com/s3/.
Skip this section if you don't have an AWS account.
But all the main syntactic bits are here for your future convenience :)
// Replace with your AWS S3 credentials
//
// NOTE: Set the access to this notebook appropriately to protect the security of your keys.
// Or you can delete this cell after you run the mount command below once successfully.
val AccessKey = getArgument("1. ACCESS_KEY", "REPLACE_WITH_YOUR_ACCESS_KEY")
val SecretKey = getArgument("2. SECRET_KEY", "REPLACE_WITH_YOUR_SECRET_KEY")
val EncodedSecretKey = SecretKey.replace("/", "%2F")
val AwsBucketName = getArgument("3. S3_BUCKET", "REPLACE_WITH_YOUR_S3_BUCKET")
val MountName = getArgument("4. MNT_NAME", "REPLACE_WITH_YOUR_MOUNT_NAME")
val s3Filename = "tweetDump"
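The mount source is just a string assembled from the notebook arguments, with "/" in the secret key escaped as "%2F" so that it does not break the URI. The plain-Scala sketch below builds that string. It also refuses to do so while any REPLACE_WITH_ default is still in place. S3MountSketch and the placeholder check are additions for illustration, not part of the Databricks API.

```scala
object S3MountSketch {
  // Prefix of the getArgument defaults above
  private val Placeholder = "REPLACE_WITH_"

  // Builds the s3a source URI passed to dbutils.fs.mount; "/" in the secret key is escaped
  def mountSource(accessKey: String, secretKey: String, bucket: String): String = {
    require(Seq(accessKey, secretKey, bucket).forall(!_.startsWith(Placeholder)),
      "fill in the notebook arguments before mounting")
    val encodedSecretKey = secretKey.replace("/", "%2F")
    s"s3a://$accessKey:$encodedSecretKey@$bucket"
  }
}
```

The resulting string embeds your secret key, so avoid printing or logging it.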
Now just mount S3 as follows:
dbutils.fs.mount(s"s3a://$AccessKey:$EncodedSecretKey@$AwsBucketName", s"/mnt/$MountName")
Now you can use the dbutils.fs commands freely to access data in the mounted S3 bucket.
dbutils.fs.help()
copying:
// to copy all the tweets to s3
dbutils.fs.cp("dbfs:/rawTweets",s"/mnt/$MountName/rawTweetsInS3/",recurse=true)
deleting:
// to remove all the files from s3
dbutils.fs.rm(s"/mnt/$MountName/rawTweetsInS3",recurse=true)
unmounting:
// finally unmount when done - IMPORTANT!
dbutils.fs.unmount(s"/mnt/$MountName")