SDS-2.2, Scalable Data Science

Archived YouTube video of this live unedited lab-lecture:

Tweet Collector - capture live tweets

Here are the main steps in this notebook:

  1. Let's collect from the public Twitter stream and write to DBFS as JSON strings in a boilerplate manner to understand the components better.

  2. Then we will turn the collector into a function and use it.

  3. Finally we will use some DataFrame-based pipelines to convert the raw tweets into other structured content.

We will call the extendedTwitterUtils notebook from here.

But first install the following libraries:

  • gson
  • twitter4j-examples

%run "scalable-data-science/sds-2-2/025_a_extendedTwitterUtils2run"

Go to the Spark UI and check whether a streaming job is already running. If so, you need to terminate it before starting a new one, since only one streaming job can run at a time on Databricks Community Edition (DB CE).

// this makes sure that any active streaming context in the cluster is stopped
StreamingContext.getActive.foreach{ _.stop(stopSparkContext = false) }

Let's create a directory in dbfs for storing tweets in the cluster's distributed file system.

val outputDirectoryRoot = "/datasets/tweetsStreamTmp" // output directory
outputDirectoryRoot: String = /datasets/tweetsStreamTmp
// remove any pre-existing directory to start from scratch (comment out the next line to keep existing data)
dbutils.fs.rm(outputDirectoryRoot, true)
res40: Boolean = true

Capture tweets in every sliding window of slideInterval many milliseconds.

val slideInterval = new Duration(1 * 1000) // 1 * 1000 = 1000 milli-seconds = 1 sec
slideInterval: org.apache.spark.streaming.Duration = 1000 ms

Recall that Discretized Stream or DStream is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data, either the input data stream received from the source, or the processed data stream generated by transforming the input stream. Internally, a DStream is represented by a continuous series of RDDs, which is Spark's abstraction of an immutable, distributed dataset (see Spark Programming Guide for more details). Each RDD in a DStream contains data from a certain interval, as shown in the following figure.

[Figure: Spark Streaming]
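To make the idea of discretization concrete, here is a toy sketch in plain Scala. It is not the Spark API: ordinary collections stand in for RDDs, the event times and payloads are made up, and `batchStart` is a hypothetical helper. It only shows how a stream of timestamped events falls into consecutive batches of `batchIntervalMs` milliseconds.

```scala
// Toy model of discretization: bucket timestamped events into fixed-length batches.
// Plain Scala collections stand in for RDDs here; this is not the Spark API.
val batchIntervalMs = 1000L // same length as the 1-second slideInterval above

// (arrival time in ms, payload) pairs; the values are made up for illustration
val events = Seq(
  (50L, "a"), (900L, "b"), (1000L, "c"), (1999L, "d"), (3200L, "e")
)

// Start time of the batch that an arrival time falls into
def batchStart(timeMs: Long): Long = (timeMs / batchIntervalMs) * batchIntervalMs

// One "batch" per interval that received data, ordered by time
val batches: Seq[(Long, Seq[String])] =
  events
    .groupBy { case (t, _) => batchStart(t) }
    .toSeq
    .sortBy { case (start, _) => start }
    .map { case (start, evs) => (start, evs.map(_._2)) }

batches.foreach { case (start, payloads) =>
  println(s"batch starting at $start ms: ${payloads.mkString(", ")}")
}
```

Note one difference from this toy: the interval starting at 2000 ms has no events and simply does not appear here, whereas Spark Streaming still produces an (empty) RDD for such an interval. That is why the collector later in this notebook checks the count before saving.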

Let's import Google's JSON library (Gson) next.

import com.google.gson.Gson
import com.google.gson.Gson

Our goal is to take each RDD in the twitter DStream and write it as a json file in our dbfs.

// Create a Spark Streaming Context.
val ssc = new StreamingContext(sc, slideInterval)
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@ced654d

CAUTION

Extracting knowledge from tweets is "easy" using techniques shown here, but one has to take legal responsibility for the use of this knowledge and conform to the rules and policies linked below.

Remember that the use of Twitter itself comes with various strings attached. Read:

Crucially, the use of the content from Twitter by you (as done in this worksheet) comes with some strings attached. Read:

Enter your own Twitter API Credentials.

  • Go to https://apps.twitter.com and look up your Twitter API Credentials, or create an app to create them.
  • Get your own Twitter API Credentials: consumerKey, consumerSecret, accessToken and accessTokenSecret and enter them in the cell below.
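As an alternative to typing the four credentials into a cell, the following minimal sketch reads them from a map (for example one built from environment variables) and copies them into the `twitter4j.oauth.*` system properties used in this notebook. The helper `setTwitterCredentials` and the environment variable names are assumptions made for illustration; only the four property names come from the notebook itself.

```scala
// Hypothetical helper: copies OAuth credentials into the twitter4j system properties.
// Only the four property names come from this notebook; everything else is illustrative.
val twitter4jKeys = Seq("consumerKey", "consumerSecret", "accessToken", "accessTokenSecret")

def setTwitterCredentials(creds: Map[String, String]): Unit = {
  // Fail early with a readable message if any credential is missing or blank
  val missing = twitter4jKeys.filter(k => creds.getOrElse(k, "").trim.isEmpty)
  require(missing.isEmpty, s"missing Twitter credentials: ${missing.mkString(", ")}")
  twitter4jKeys.foreach(k => System.setProperty(s"twitter4j.oauth.$k", creds(k)))
}

// Assumed environment variable names; adapt them to your own setup
val envNames = Map(
  "consumerKey"       -> "TWITTER_CONSUMER_KEY",
  "consumerSecret"    -> "TWITTER_CONSUMER_SECRET",
  "accessToken"       -> "TWITTER_ACCESS_TOKEN",
  "accessTokenSecret" -> "TWITTER_ACCESS_TOKEN_SECRET"
)

// Keep only the credentials that are actually present in the environment
val credsFromEnv: Map[String, String] =
  envNames.collect { case (key, envVar) if sys.env.contains(envVar) => key -> sys.env(envVar) }

// Usage (throws IllegalArgumentException if any of the four values is missing):
// setTwitterCredentials(credsFromEnv)
```

Keeping the secrets out of the notebook source means they do not end up in exported or shared copies of the notebook.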

Ethical/Legal Aspects

See Background Readings/Viewings in Project MEP:

%run "scalable-data-science/secrets/026_secret_MyTwitterOAuthCredentials"
// put your own twitter developer credentials below instead of xxx
// instead of the '%run "scalable-data-science/secrets/026_secret_MyTwitterOAuthCredentials"' above
// this notebook we just ran contains the following commented code block

/*
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder

def MyconsumerKey       = "xxx"
def MyconsumerSecret    = "xxx"
def Mytoken             = "xxx"
def MytokenSecret       = "xxx"

System.setProperty("twitter4j.oauth.consumerKey", MyconsumerKey)
System.setProperty("twitter4j.oauth.consumerSecret", MyconsumerSecret)
System.setProperty("twitter4j.oauth.accessToken", Mytoken)
System.setProperty("twitter4j.oauth.accessTokenSecret", MytokenSecret)
*/
// Create a Twitter Stream for the input source. 
val auth = Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
val twitterStream = ExtendedTwitterUtils.createStream(ssc, auth)
auth: Some[twitter4j.auth.OAuthAuthorization] = Some(OAuthAuthorization{consumerKey='fo0EEh1tnH8WVJdgJPrZ47wD0', consumerSecret='******************************************', oauthToken=AccessToken{screenName='null', userId=4173723312}})
twitterStream: org.apache.spark.streaming.dstream.ReceiverInputDStream[twitter4j.Status] = ExtendedTwitterInputDStream@571b3fd4

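Let's map each tweet to a JSON-formatted string (one tweet per line).

val twitterStreamJson = twitterStream.map { x =>
  val gson = new Gson() // a new Gson instance per tweet: simple, though not the cheapest option
  gson.toJson(x)        // serialize the twitter4j Status object to a JSON string
}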
twitterStreamJson: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@6817a6ee
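Why does "one tweet per line" hold even when the tweet text itself contains line breaks? JSON string encoding escapes them, so each serialized record stays on a single physical line. The sketch below illustrates this in plain Scala. Note that `escapeJson` and `toJsonLine` are deliberately minimal, hand-written stand-ins for what Gson does in the cell above (they handle only a few characters and two fields), and the sample text is made up.

```scala
// Minimal JSON string escaping; a stand-in for what Gson does, not a full implementation
def escapeJson(s: String): String = s.map {
  case '"'  => "\\\""
  case '\\' => "\\\\"
  case '\n' => "\\n"
  case '\r' => "\\r"
  case '\t' => "\\t"
  case c    => c.toString
}.mkString

// Hypothetical two-field record, serialized as one JSON object on one line
def toJsonLine(id: Long, text: String): String =
  s"""{"id":$id,"text":"${escapeJson(text)}"}"""

val line = toJsonLine(1L, "first line\nsecond line")
println(line)                 // the line break appears as the two characters \ and n
println(line.contains("\n"))  // false: no real line break is left in the record
```

This is what allows the saved text files to be read back later with one JSON record per line.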

twitter OAuth Credentials loaded
MyconsumerKey: String
MyconsumerSecret: String
Mytoken: String
MytokenSecret: String
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
var numTweetsCollected = 0L // track number of tweets collected
val partitionsEachInterval = 1 // This tells the number of partitions in each RDD of tweets in the DStream.

twitterStreamJson.foreachRDD( 
  (rdd, time) => { // for each RDD in the DStream
      val count = rdd.count()
      if (count > 0) {
        val outputRDD = rdd.repartition(partitionsEachInterval) // repartition as desired
        outputRDD.saveAsTextFile(outputDirectoryRoot + "/tweets_" + time.milliseconds.toString) // save as textfile
        numTweetsCollected += count // update with the latest count
      }
  }
)
numTweetsCollected: Long = 0
partitionsEachInterval: Int = 1

Nothing has actually happened yet.
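Once the context is started, every non-empty batch is written to its own directory named after the batch time in epoch milliseconds. The following plain-Scala sketch mirrors that naming and shows how to recover a readable timestamp from a directory name. The helpers `batchDir` and `batchTime` and the value `exampleRoot` are hypothetical names introduced here for illustration.

```scala
import java.time.Instant
import scala.util.Try

val exampleRoot = "/datasets/tweetsStreamTmp" // same root as outputDirectoryRoot in this notebook

// Directory name for a batch, mirroring the string concatenation in the collector
def batchDir(root: String, batchTimeMs: Long): String =
  root + "/tweets_" + batchTimeMs.toString

// Recover the batch time from a directory name such as "tweets_1507184802000/"
def batchTime(dirName: String): Option[Instant] = {
  val digits = dirName.stripSuffix("/").stripPrefix("tweets_")
  Try(digits.toLong).toOption.map(ms => Instant.ofEpochMilli(ms))
}

println(batchDir(exampleRoot, 1507184802000L))
println(batchTime("tweets_1507184802000/")) // Some(<UTC timestamp>) for a well-formed name
println(batchTime("not_a_batch_dir"))       // None for anything else
```

Because the names sort in time order, listing the output directory also gives the batches in the order they were collected.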

Let's start the spark streaming context we have created next.

ssc.start()

Let's look at the spark UI now and monitor the streaming job in action! Go to Clusters on the left and click on UI and then Streaming.

numTweetsCollected // number of tweets collected so far
res48: Long = 0

Let's check again in a few seconds how many tweets have been collected so far.

numTweetsCollected // number of tweets collected so far
res11: Long = 187

Note that you could easily fill up disk space!
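A rough back-of-the-envelope estimate shows how quickly this adds up. The sketch below uses the figures from one 1-second batch of this notebook's run (36 tweets in a 125602-byte part file, as listed further down). A single batch is not representative of the long-run rate, so treat the result as an order of magnitude only.

```scala
// Figures from one 1-second batch in this notebook's run: 36 tweets in a 125602-byte part file
val tweetsInBatch = 36L
val bytesInBatch = 125602L
val batchSeconds = 1L

// Average size of one tweet serialized as JSON
val bytesPerTweet = bytesInBatch.toDouble / tweetsInBatch

// Extrapolate the single batch to one hour of collection
val bytesPerHour = bytesInBatch * 3600L / batchSeconds
val megabytesPerHour = bytesPerHour / 1e6

println(f"~$bytesPerTweet%.0f bytes per tweet, ~$megabytesPerHour%.0f MB per hour")
```

At roughly 3.5 KB per tweet this comes to several hundred megabytes per hour of uncompressed JSON.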

So let's stop the streaming job next.

ssc.stop(stopSparkContext = false) // stop soon to avoid filling up the disk

Let's make sure that the Streaming UI is not active in the Clusters UI.

StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) } // extra cautious stopping of all active streaming contexts

Let's examine what was saved in dbfs

display(dbutils.fs.ls(outputDirectoryRoot))
path name size
dbfs:/datasets/tweetsStreamTmp/tweets_1507184798000/ tweets_1507184798000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1507184799000/ tweets_1507184799000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1507184800000/ tweets_1507184800000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1507184801000/ tweets_1507184801000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1507184802000/ tweets_1507184802000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1507184803000/ tweets_1507184803000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1507184804000/ tweets_1507184804000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1507184805000/ tweets_1507184805000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1507184806000/ tweets_1507184806000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1507184807000/ tweets_1507184807000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1507184808000/ tweets_1507184808000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1507184809000/ tweets_1507184809000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1507184810000/ tweets_1507184810000/ 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1507184811000/ tweets_1507184811000/ 0.0
val tweetsDir = outputDirectoryRoot+"/tweets_1507184802000/" // use an existing file, may have to rename folder based on output above!
tweetsDir: String = /datasets/tweetsStreamTmp/tweets_1507184802000/
display(dbutils.fs.ls(tweetsDir))
path name size
dbfs:/datasets/tweetsStreamTmp/tweets_1507184802000/_SUCCESS _SUCCESS 0.0
dbfs:/datasets/tweetsStreamTmp/tweets_1507184802000/part-00000 part-00000 125602.0
sc.textFile(tweetsDir+"part-00000").count()
res16: Long = 36
val outJson = sqlContext.read.json(tweetsDir+"part-00000")
outJson: org.apache.spark.sql.DataFrame = [contributorsIDs: array<string>, createdAt: string ... 25 more fields]
outJson.printSchema()
root
 |-- contributorsIDs: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- createdAt: string (nullable = true)
 |-- currentUserRetweetId: long (nullable = true)
 |-- displayTextRangeEnd: long (nullable = true)
 |-- displayTextRangeStart: long (nullable = true)
 |-- favoriteCount: long (nullable = true)
 |-- hashtagEntities: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- end: long (nullable = true)
 |    |    |-- start: long (nullable = true)
 |    |    |-- text: string (nullable = true)
 |-- id: long (nullable = true)
 |-- inReplyToScreenName: string (nullable = true)
 |-- inReplyToStatusId: long (nullable = true)
 |-- inReplyToUserId: long (nullable = true)
 |-- isFavorited: boolean (nullable = true)
 |-- isPossiblySensitive: boolean (nullable = true)
 |-- isRetweeted: boolean (nullable = true)
 |-- isTruncated: boolean (nullable = true)
 |-- lang: string (nullable = true)
 |-- mediaEntities: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- displayURL: string (nullable = true)
 |    |    |-- end: long (nullable = true)
 |    |    |-- expandedURL: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- mediaURL: string (nullable = true)
 |    |    |-- mediaURLHttps: string (nullable = true)
 |    |    |-- sizes: struct (nullable = true)
 |    |    |    |-- 0: struct (nullable = true)
 |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |-- 1: struct (nullable = true)
 |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |-- 2: struct (nullable = true)
 |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |-- 3: struct (nullable = true)
 |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |-- width: long (nullable = true)
 |    |    |-- start: long (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- videoAspectRatioHeight: long (nullable = true)
 |    |    |-- videoAspectRatioWidth: long (nullable = true)
 |    |    |-- videoDurationMillis: long (nullable = true)
 |    |    |-- videoVariants: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- bitrate: long (nullable = true)
 |    |    |    |    |-- contentType: string (nullable = true)
 |    |    |    |    |-- url: string (nullable = true)
 |-- quotedStatus: struct (nullable = true)
 |    |-- contributorsIDs: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- createdAt: string (nullable = true)
 |    |-- currentUserRetweetId: long (nullable = true)
 |    |-- displayTextRangeEnd: long (nullable = true)
 |    |-- displayTextRangeStart: long (nullable = true)
 |    |-- favoriteCount: long (nullable = true)
 |    |-- hashtagEntities: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- end: long (nullable = true)
 |    |    |    |-- start: long (nullable = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- inReplyToScreenName: string (nullable = true)
 |    |-- inReplyToStatusId: long (nullable = true)
 |    |-- inReplyToUserId: long (nullable = true)
 |    |-- isFavorited: boolean (nullable = true)
 |    |-- isPossiblySensitive: boolean (nullable = true)
 |    |-- isRetweeted: boolean (nullable = true)
 |    |-- isTruncated: boolean (nullable = true)
 |    |-- lang: string (nullable = true)
 |    |-- mediaEntities: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- quotedStatusId: long (nullable = true)
 |    |-- retweetCount: long (nullable = true)
 |    |-- source: string (nullable = true)
 |    |-- symbolEntities: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- text: string (nullable = true)
 |    |-- urlEntities: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- displayURL: string (nullable = true)
 |    |    |    |-- end: long (nullable = true)
 |    |    |    |-- expandedURL: string (nullable = true)
 |    |    |    |-- start: long (nullable = true)
 |    |    |    |-- url: string (nullable = true)
 |    |-- user: struct (nullable = true)
 |    |    |-- createdAt: string (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- descriptionURLEntities: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- favouritesCount: long (nullable = true)
 |    |    |-- followersCount: long (nullable = true)
 |    |    |-- friendsCount: long (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- isContributorsEnabled: boolean (nullable = true)
 |    |    |-- isDefaultProfile: boolean (nullable = true)
 |    |    |-- isDefaultProfileImage: boolean (nullable = true)
 |    |    |-- isFollowRequestSent: boolean (nullable = true)
 |    |    |-- isGeoEnabled: boolean (nullable = true)
 |    |    |-- isProtected: boolean (nullable = true)
 |    |    |-- isVerified: boolean (nullable = true)
 |    |    |-- lang: string (nullable = true)
 |    |    |-- listedCount: long (nullable = true)
 |    |    |-- location: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- profileBackgroundColor: string (nullable = true)
 |    |    |-- profileBackgroundImageUrl: string (nullable = true)
 |    |    |-- profileBackgroundImageUrlHttps: string (nullable = true)
 |    |    |-- profileBackgroundTiled: boolean (nullable = true)
 |    |    |-- profileBannerImageUrl: string (nullable = true)
 |    |    |-- profileImageUrl: string (nullable = true)
 |    |    |-- profileImageUrlHttps: string (nullable = true)
 |    |    |-- profileLinkColor: string (nullable = true)
 |    |    |-- profileSidebarBorderColor: string (nullable = true)
 |    |    |-- profileSidebarFillColor: string (nullable = true)
 |    |    |-- profileTextColor: string (nullable = true)
 |    |    |-- profileUseBackgroundImage: boolean (nullable = true)
 |    |    |-- screenName: string (nullable = true)
 |    |    |-- showAllInlineMedia: boolean (nullable = true)
 |    |    |-- statusesCount: long (nullable = true)
 |    |    |-- timeZone: string (nullable = true)
 |    |    |-- translator: boolean (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- utcOffset: long (nullable = true)
 |    |-- userMentionEntities: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- end: long (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- screenName: string (nullable = true)
 |    |    |    |-- start: long (nullable = true)
 |-- quotedStatusId: long (nullable = true)
 |-- retweetCount: long (nullable = true)
 |-- retweetedStatus: struct (nullable = true)
 |    |-- contributorsIDs: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- createdAt: string (nullable = true)
 |    |-- currentUserRetweetId: long (nullable = true)
 |    |-- displayTextRangeEnd: long (nullable = true)
 |    |-- displayTextRangeStart: long (nullable = true)
 |    |-- favoriteCount: long (nullable = true)
 |    |-- hashtagEntities: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- end: long (nullable = true)
 |    |    |    |-- start: long (nullable = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- inReplyToScreenName: string (nullable = true)
 |    |-- inReplyToStatusId: long (nullable = true)
 |    |-- inReplyToUserId: long (nullable = true)
 |    |-- isFavorited: boolean (nullable = true)
 |    |-- isPossiblySensitive: boolean (nullable = true)
 |    |-- isRetweeted: boolean (nullable = true)
 |    |-- isTruncated: boolean (nullable = true)
 |    |-- lang: string (nullable = true)
 |    |-- mediaEntities: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- displayURL: string (nullable = true)
 |    |    |    |-- end: long (nullable = true)
 |    |    |    |-- expandedURL: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- mediaURL: string (nullable = true)
 |    |    |    |-- mediaURLHttps: string (nullable = true)
 |    |    |    |-- sizes: struct (nullable = true)
 |    |    |    |    |-- 0: struct (nullable = true)
 |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |-- 1: struct (nullable = true)
 |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |-- 2: struct (nullable = true)
 |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |-- 3: struct (nullable = true)
 |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |-- resize: long (nullable = true)
 |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |-- start: long (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    |    |-- url: string (nullable = true)
 |    |    |    |-- videoAspectRatioHeight: long (nullable = true)
 |    |    |    |-- videoAspectRatioWidth: long (nullable = true)
 |    |    |    |-- videoDurationMillis: long (nullable = true)
 |    |    |    |-- videoVariants: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- bitrate: long (nullable = true)
 |    |    |    |    |    |-- contentType: string (nullable = true)
 |    |    |    |    |    |-- url: string (nullable = true)
 |    |-- place: struct (nullable = true)
 |    |    |-- boundingBoxCoordinates: array (nullable = true)
 |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- latitude: double (nullable = true)
 |    |    |    |    |    |-- longitude: double (nullable = true)
 |    |    |-- boundingBoxType: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- countryCode: string (nullable = true)
 |    |    |-- fullName: string (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- placeType: string (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |-- quotedStatus: struct (nullable = true)
 |    |    |-- contributorsIDs: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- createdAt: string (nullable = true)
 |    |    |-- currentUserRetweetId: long (nullable = true)
 |    |    |-- displayTextRangeEnd: long (nullable = true)
 |    |    |-- displayTextRangeStart: long (nullable = true)
 |    |    |-- favoriteCount: long (nullable = true)
 |    |    |-- hashtagEntities: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- end: long (nullable = true)
 |    |    |    |    |-- start: long (nullable = true)
 |    |    |    |    |-- text: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- inReplyToScreenName: string (nullable = true)
 |    |    |-- inReplyToStatusId: long (nullable = true)
 |    |    |-- inReplyToUserId: long (nullable = true)
 |    |    |-- isFavorited: boolean (nullable = true)
 |    |    |-- isPossiblySensitive: boolean (nullable = true)
 |    |    |-- isRetweeted: boolean (nullable = true)
 |    |    |-- isTruncated: boolean (nullable = true)
 |    |    |-- lang: string (nullable = true)
 |    |    |-- mediaEntities: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- quotedStatusId: long (nullable = true)
 |    |    |-- retweetCount: long (nullable = true)
 |    |    |-- source: string (nullable = true)
 |    |    |-- symbolEntities: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- text: string (nullable = true)
 |    |    |-- urlEntities: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- displayURL: string (nullable = true)
 |    |    |    |    |-- end: long (nullable = true)
 |    |    |    |    |-- expandedURL: string (nullable = true)
 |    |    |    |    |-- start: long (nullable = true)
 |    |    |    |    |-- url: string (nullable = true)
 |    |    |-- user: struct (nullable = true)
 |    |    |    |-- createdAt: string (nullable = true)
 |    |    |    |-- description: string (nullable = true)
 |    |    |    |-- descriptionURLEntities: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- favouritesCount: long (nullable = true)
 |    |    |    |-- followersCount: long (nullable = true)
 |    |    |    |-- friendsCount: long (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- isContributorsEnabled: boolean (nullable = true)
 |    |    |    |-- isDefaultProfile: boolean (nullable = true)
 |    |    |    |-- isDefaultProfileImage: boolean (nullable = true)
 |    |    |    |-- isFollowRequestSent: boolean (nullable = true)
 |    |    |    |-- isGeoEnabled: boolean (nullable = true)
 |    |    |    |-- isProtected: boolean (nullable = true)
 |    |    |    |-- isVerified: boolean (nullable = true)
 |    |    |    |-- lang: string (nullable = true)
 |    |    |    |-- listedCount: long (nullable = true)
 |    |    |    |-- location: string (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- profileBackgroundColor: string (nullable = true)
 |    |    |    |-- profileBackgroundImageUrl: string (nullable = true)
 |    |    |    |-- profileBackgroundImageUrlHttps: string (nullable = true)
 |    |    |    |-- profileBackgroundTiled: boolean (nullable = true)
 |    |    |    |-- profileBannerImageUrl: string (nullable = true)
 |    |    |    |-- profileImageUrl: string (nullable = true)
 |    |    |    |-- profileImageUrlHttps: string (nullable = true)
 |    |    |    |-- profileLinkColor: string (nullable = true)
 |    |    |    |-- profileSidebarBorderColor: string (nullable = true)
 |    |    |    |-- profileSidebarFillColor: string (nullable = true)
 |    |    |    |-- profileTextColor: string (nullable = true)
 |    |    |    |-- profileUseBackgroundImage: boolean (nullable = true)
 |    |    |    |-- screenName: string (nullable = true)
 |    |    |    |-- showAllInlineMedia: boolean (nullable = true)
 |    |    |    |-- statusesCount: long (nullable = true)
 |    |    |    |-- timeZone: string (nullable = true)
 |    |    |    |-- translator: boolean (nullable = true)
 |    |    |    |-- url: string (nullable = true)
 |    |    |    |-- utcOffset: long (nullable = true)
 |    |    |-- userMentionEntities: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- end: long (nullable = true)
 |    |    |    |    |-- id: long (nullable = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |-- screenName: string (nullable = true)
 |    |    |    |    |-- start: long (nullable = true)
 |    |-- quotedStatusId: long (nullable = true)
 |    |-- retweetCount: long (nullable = true)
 |    |-- source: string (nullable = true)
 |    |-- symbolEntities: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- text: string (nullable = true)
 |    |-- urlEntities: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- displayURL: string (nullable = true)
 |    |    |    |-- end: long (nullable = true)
 |    |    |    |-- expandedURL: string (nullable = true)
 |    |    |    |-- start: long (nullable = true)
 |    |    |    |-- url: string (nullable = true)
 |    |-- user: struct (nullable = true)
 |    |    |-- createdAt: string (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- descriptionURLEntities: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- favouritesCount: long (nullable = true)
 |    |    |-- followersCount: long (nullable = true)
 |    |    |-- friendsCount: long (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- isContributorsEnabled: boolean (nullable = true)
 |    |    |-- isDefaultProfile: boolean (nullable = true)
 |    |    |-- isDefaultProfileImage: boolean (nullable = true)
 |    |    |-- isFollowRequestSent: boolean (nullable = true)
 |    |    |-- isGeoEnabled: boolean (nullable = true)
 |    |    |-- isProtected: boolean (nullable = true)
 |    |    |-- isVerified: boolean (nullable = true)
 |    |    |-- lang: string (nullable = true)
 |    |    |-- listedCount: long (nullable = true)
 |    |    |-- location: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- profileBackgroundColor: string (nullable = true)
 |    |    |-- profileBackgroundImageUrl: string (nullable = true)
 |    |    |-- profileBackgroundImageUrlHttps: string (nullable = true)
 |    |    |-- profileBackgroundTiled: boolean (nullable = true)
 |    |    |-- profileBannerImageUrl: string (nullable = true)
 |    |    |-- profileImageUrl: string (nullable = true)
 |    |    |-- profileImageUrlHttps: string (nullable = true)
 |    |    |-- profileLinkColor: string (nullable = true)
 |    |    |-- profileSidebarBorderColor: string (nullable = true)
 |    |    |-- profileSidebarFillColor: string (nullable = true)
 |    |    |-- profileTextColor: string (nullable = true)
 |    |    |-- profileUseBackgroundImage: boolean (nullable = true)
 |    |    |-- screenName: string (nullable = true)
 |    |    |-- showAllInlineMedia: boolean (nullable = true)
 |    |    |-- statusesCount: long (nullable = true)
 |    |    |-- timeZone: string (nullable = true)
 |    |    |-- translator: boolean (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- utcOffset: long (nullable = true)
 |    |-- userMentionEntities: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- end: long (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- screenName: string (nullable = true)
 |    |    |    |-- start: long (nullable = true)
 |-- source: string (nullable = true)
 |-- symbolEntities: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- text: string (nullable = true)
 |-- urlEntities: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- displayURL: string (nullable = true)
 |    |    |-- end: long (nullable = true)
 |    |    |-- expandedURL: string (nullable = true)
 |    |    |-- start: long (nullable = true)
 |    |    |-- url: string (nullable = true)
 |-- user: struct (nullable = true)
 |    |-- createdAt: string (nullable = true)
 |    |-- description: string (nullable = true)
 |    |-- descriptionURLEntities: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- favouritesCount: long (nullable = true)
 |    |-- followersCount: long (nullable = true)
 |    |-- friendsCount: long (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- isContributorsEnabled: boolean (nullable = true)
 |    |-- isDefaultProfile: boolean (nullable = true)
 |    |-- isDefaultProfileImage: boolean (nullable = true)
 |    |-- isFollowRequestSent: boolean (nullable = true)
 |    |-- isGeoEnabled: boolean (nullable = true)
 |    |-- isProtected: boolean (nullable = true)
 |    |-- isVerified: boolean (nullable = true)
 |    |-- lang: string (nullable = true)
 |    |-- listedCount: long (nullable = true)
 |    |-- location: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- profileBackgroundColor: string (nullable = true)
 |    |-- profileBackgroundImageUrl: string (nullable = true)
 |    |-- profileBackgroundImageUrlHttps: string (nullable = true)
 |    |-- profileBackgroundTiled: boolean (nullable = true)
 |    |-- profileBannerImageUrl: string (nullable = true)
 |    |-- profileImageUrl: string (nullable = true)
 |    |-- profileImageUrlHttps: string (nullable = true)
 |    |-- profileLinkColor: string (nullable = true)
 |    |-- profileSidebarBorderColor: string (nullable = true)
 |    |-- profileSidebarFillColor: string (nullable = true)
 |    |-- profileTextColor: string (nullable = true)
 |    |-- profileUseBackgroundImage: boolean (nullable = true)
 |    |-- screenName: string (nullable = true)
 |    |-- showAllInlineMedia: boolean (nullable = true)
 |    |-- statusesCount: long (nullable = true)
 |    |-- timeZone: string (nullable = true)
 |    |-- translator: boolean (nullable = true)
 |    |-- url: string (nullable = true)
 |    |-- utcOffset: long (nullable = true)
 |-- userMentionEntities: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- end: long (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- screenName: string (nullable = true)
 |    |    |-- start: long (nullable = true)
outJson.select("id","text").show(false)
+------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id                |text                                                                                                                                                                |
+------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|915825548626145280|one person followed me and 2 people unfollowed me // automatically checked by https://t.co/M7i6tIzxMk                                                               |
|915825548609208320|Penawaran terbatas!
Dptkan #PromoTiket ke Tokyo 4.2Jt,hanya sampai 11Okt'17 ini
Langsung hubungi 24hrs Hotline Ticketing kami 021-2963 1999. https://t.co/FKR4lz1KPJ|
|915825548600979456|Let's tweet #MonChevy pics/gifs this Friday at 8 PM Paris time (7 PM UK) to send healing thoughts to our #Versailles sister @MadameLisabi https://t.co/muZenzineP   |
|915825552807706624|RT @travis999998: ใครก็ได้ช่วยน้องเค้าด้วย ว่าวไปมือประคองไข่ นิ้วแยงก้น กดแรงๆ ตอนแตกจะเสียวมาก . น่ารักอะ. https://t.co/a1CPDgHVOa                                |
|915825548605009922|Felt sick all day :(                                                                                                                                                |
|915825548621864961|【無料】「キン肉マン」を読んでるよ!LINEマンガなら今すぐ無料で第1話が読める!#LINEマンガ https://t.co/6DrFlGGqfs                                                                                           |
|915825548605009921|RT @ar14design: วันไปไหว้ร.๙ฝนตกหนัก เลยมีคนเอาเชือกผ้าใบไปผูกกับรถเพื่อน 5555555555 https://t.co/FfCaPWAoH2                                                        |
|915825548605001728|바고안하겟지...지킬이나합시다...                                                                                                                                                 |
|915825548609359872|RT @RamIsRising: Follow everyone who retweets this  🛍                                                                                                              |
|915825548600991744|RT @ZZayadh: الآن ✨⭕️لزيادة عدد متابعينك✨⭕️ ⑴ تابعني أتابعك ✨ ⑵ ♻️ رتويت ♻️✨ ⑶ تابع من عمل رتويت ✨⑷ إلتزم تستفيد ༻✩October 05, 2017 at 08:3…                        |
|915825548626202624|Design a new logo by ProjektMate: We are a non-profit organisation from India. We would… https://t.co/8xfL3QcleF                                                    |
|915825548617592832|____
  /____\
  / /⊂ニ⊃   ⊂ニ⊃  やれやれだぞ
 ||      L
 i⌒   ̄   ̄  ヽ
 ヽ_        |
(ヽ >、___0_ノ
 |V  /    ヽレ<ヲ
 |_/ |    |/
   |――――|                                         |
|915825548626100225|Build website with easy API installation by iircaz55: I need a website very likely the same… https://t.co/WbNC5g8L0z                                                |
|915825548592537601|身の程をわきまえないものは自滅する https://t.co/hqfTqBcRSp                                                                                                                           |
|915825548592545798|@aimeelizzette @HornyGlF I don't think I've ever retweeted it                                                                                                       |
|915825548617797632|أذكار الأذان:اللهم رب هذه الدعوة التامة والصلاة القائم https://t.co/EmwkpQBKtf                                                                                      |
|915825548609314816|RT @lespros_tetsuya: はーい!🙋10/10にうれしいお知らせしますよー😇😇😇                                                                                                                |
|915825548613402624|RT @imagineforestcr: ドシンドシン                                                                                                                                         |
|915825548605104128|ここにてLADA NIVAの新車が購入できます。
https://t.co/me8fB3qA2o                                                                                                                    |
|915825548613570560|RT @EsseoNWE: Previews w/ @RealKlash https://t.co/497ZYhpUSM                                                                                                        |
+------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
only showing top 20 rows
display(outJson)

Now, let's do some housekeeping and clean up the unnecessary data in DBFS, the Databricks distributed file system.

// remove the output directory and everything under it (recursive delete); returns false if nothing was there to delete
dbutils.fs.rm(outputDirectoryRoot, true)
res50: Boolean = false

Clearly there is a lot one can do with tweets!

Especially after you get a few more primitives under your belt from the following areas:

  • Natural Language Processing (MLlib, beyond word counts of course),
  • Distributed vertex programming (Graph Frames, which you already know), and
  • Scalable geospatial computing with location data on open street maps (roughly a third of tweets are geo-enabled with Latitude and Longitude of the tweet location) - we will get into this.

Making a function for Spark Streaming job

Let's try to throw the bits and bobs of code above into a function called streamFunc for simplicity and modularity.

import com.google.gson.Gson 
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val outputDirectoryRoot = "/datasets/tweetsStreamTmp" // output directory
val batchInterval = 1 // in minutes
val timeoutJobLength =  batchInterval * 5

var newContextCreated = false
var numTweetsCollected = 0L // track number of tweets collected
//val conf = new SparkConf().setAppName("TrackedTweetCollector").setMaster("local")
// This is the function that creates the StreamingContext and sets up the Spark Streaming job.
def streamFunc(): StreamingContext = {
  // Create a Spark Streaming Context.
  val ssc = new StreamingContext(sc, Minutes(batchInterval))
  // Create the OAuth Twitter credentials 
  val auth = Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
  // Create a Twitter Stream for the input source.  
  val twitterStream = ExtendedTwitterUtils.createStream(ssc, auth)
  // Transform the discrete RDDs into JSON
  val twitterStreamJson = twitterStream.map(x => { val gson = new Gson();
                                                 val xJson = gson.toJson(x)
                                                 xJson
                                               }) 
  // Number of partitions in each RDD of tweets in the DStream; 1 keeps each batch in a single part file.
  val partitionsEachInterval = 1

  // what we want done with each discrete RDD tuple: (rdd, time)
  twitterStreamJson.foreachRDD((rdd, time) => { // for each RDD in the DStream
      val count = rdd.count()
      if (count > 0) {
        val outputRDD = rdd.repartition(partitionsEachInterval) // repartition as desired
        // to write to parquet directly in append mode in one directory per 'time'------------       
        val outputDF = outputRDD.toDF("tweetAsJsonString")
        // get the time fields from a single `java.util.Date` so they stay consistent across an hour boundary
        val now = new java.util.Date()
        val year = (new java.text.SimpleDateFormat("yyyy")).format(now)
        val month = (new java.text.SimpleDateFormat("MM")).format(now)
        val day = (new java.text.SimpleDateFormat("dd")).format(now)
        val hour = (new java.text.SimpleDateFormat("HH")).format(now)
        // write to a file with a clear time-based hierarchical directory structure for example
        outputDF.write.mode(SaveMode.Append)
                .parquet(outputDirectoryRoot+ "/"+ year + "/" + month + "/" + day + "/" + hour + "/" + time.milliseconds) 
        // end of writing as parquet file-------------------------------------
        numTweetsCollected += count // update with the latest count
      }
  })
  newContextCreated = true
  ssc
}
import com.google.gson.Gson
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
outputDirectoryRoot: String = /datasets/tweetsStreamTmp
batchInterval: Int = 1
timeoutJobLength: Int = 5
newContextCreated: Boolean = false
numTweetsCollected: Long = 0
streamFunc: ()org.apache.spark.streaming.StreamingContext
// Now just use the function to create a Spark Streaming Context
val ssc = StreamingContext.getActiveOrCreate(streamFunc)
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@3c75550d
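// start the streaming job; optionally uncomment the next line to also block until the timeout
ssc.start()
//ssc.awaitTerminationOrTimeout(timeoutJobLength * 60 * 1000L) // the timeout is in milliseconds, timeoutJobLength is in minutes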
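
The timeout passed to `awaitTerminationOrTimeout` is in milliseconds, while `timeoutJobLength` was defined in minutes. A minimal sketch of the conversion in plain Scala follows; the name `timeoutMillis` is an assumption made for illustration and is not part of the notebook.

```scala
import scala.concurrent.duration._

// Same values as in the cell that defines streamFunc.
val batchInterval = 1                     // in minutes
val timeoutJobLength = batchInterval * 5  // in minutes

// Hypothetical name: the timeout expressed in milliseconds.
val timeoutMillis: Long = timeoutJobLength.minutes.toMillis

// With a running context one could then call, for example:
// ssc.awaitTerminationOrTimeout(timeoutMillis)
```

With these values the job would be allowed to run for five minutes before the call returns.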
// this will make sure all streaming jobs in the cluster are stopped
// but let's run it for a few minutes before stopping it
StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) }
display(dbutils.fs.ls(outputDirectoryRoot))
path name size
dbfs:/datasets/tweetsStreamTmp/2017/ 2017/ 0.0
display(dbutils.fs.ls(outputDirectoryRoot+"/2017/10/05/09/")) // keep adding sub-dirs to descend into the time-based directory hierarchy
path name size
dbfs:/datasets/tweetsStreamTmp/2017/10/05/09/1507196400000/ 1507196400000/ 0.0
dbfs:/datasets/tweetsStreamTmp/2017/10/05/09/1507196460000/ 1507196460000/ 0.0
dbfs:/datasets/tweetsStreamTmp/2017/10/05/09/1507196520000/ 1507196520000/ 0.0
dbfs:/datasets/tweetsStreamTmp/2017/10/05/09/1507196580000/ 1507196580000/ 0.0
dbfs:/datasets/tweetsStreamTmp/2017/10/05/09/1507196640000/ 1507196640000/ 0.0
dbfs:/datasets/tweetsStreamTmp/2017/10/05/09/1507196700000/ 1507196700000/ 0.0
dbfs:/datasets/tweetsStreamTmp/2017/10/05/09/1507196760000/ 1507196760000/ 0.0
dbfs:/datasets/tweetsStreamTmp/2017/10/05/09/1507196820000/ 1507196820000/ 0.0
dbfs:/datasets/tweetsStreamTmp/2017/10/05/09/1507196880000/ 1507196880000/ 0.0
dbfs:/datasets/tweetsStreamTmp/2017/10/05/09/1507196940000/ 1507196940000/ 0.0
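
The leaf directory names in the listing above are the batch times in epoch milliseconds, spaced 60000 ms apart (one batch per minute). As a rough sketch, the same hierarchical path can be rebuilt from a batch time alone. The helper `batchPath` is hypothetical and not part of the notebook, and it assumes UTC, whereas `streamFunc` formats the current wall-clock time in the cluster's default time zone.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical helper: root/yyyy/MM/dd/HH/<batch time in ms>, with the date parts taken in UTC.
def batchPath(root: String, batchMillis: Long): String = {
  val hourFormat = DateTimeFormatter.ofPattern("yyyy/MM/dd/HH").withZone(ZoneOffset.UTC)
  s"$root/${hourFormat.format(Instant.ofEpochMilli(batchMillis))}/$batchMillis"
}

// First batch time from the listing above.
val examplePath = batchPath("/datasets/tweetsStreamTmp", 1507196400000L)
println(examplePath)
```

Deriving the date parts from the batch time rather than from the clock at write time keeps a batch in the hour it belongs to even if the write is delayed.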

Next, let us take a quick peek at the notebook scalable-data-science/sds-2-2/025_b_TTTDFfunctions to see how we have pipelined the JSON tweets into DataFrames.

Please see http://lamastex.org/lmse/mep/src/TweetAnatomyAndTransmissionTree.html to understand more deeply.

"scalable-data-science/sds-2-2/025_b_TTTDFfunctions"
val rawDF = fromParquetFile2DF("/datasets/tweetsStreamTmp/2017/10/*/*/*/*") //.cache()
val TTTsDF = tweetsDF2TTTDF(tweetsJsonStringDF2TweetsDF(rawDF)).cache()
rawDF: org.apache.spark.sql.DataFrame = [tweetAsJsonString: string]
TTTsDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [CurrentTweetDate: timestamp, CurrentTwID: bigint ... 32 more fields]
TTTsDF.count()
res63: Long = 19505
display(TTTsDF)
CurrentTweetDate CurrentTwID CreationDateOfOrgTwInRT OriginalTwIDinRT CreationDateOfOrgTwInQT OriginalTwIDinQT OriginalTwIDinReply CPostUserId userCreatedAtDate OPostUserIdinRT OPostUserIdinQT OPostUserIdinReply
2017-10-05T09:45:00.000+0000 9.15875460826906624e17 2017-10-05T09:23:13.000+0000 9.15869979395989504e17 null null -1.0 8.51144345864425476e17 2017-04-09T18:46:40.000+0000 2.687293212e9 null -1.0
2017-10-05T09:45:00.000+0000 9.1587546082260992e17 null null null null -1.0 1.919692951e9 2013-09-30T10:46:43.000+0000 null null -1.0
2017-10-05T09:45:00.000+0000 9.15875460847767552e17 2017-09-30T09:37:51.000+0000 9.1406172267925504e17 null null -1.0 8.55046958951448576e17 2017-04-20T13:14:15.000+0000 2.52475597e8 null -1.0
2017-10-05T09:45:00.000+0000 9.15875460843573248e17 2017-10-05T01:00:53.000+0000 9.1574356003233792e17 null null -1.0 4.113203892e9 2015-11-03T13:30:22.000+0000 2.47772567e8 null -1.0
2017-10-05T09:45:00.000+0000 9.15875460814274565e17 2017-10-04T15:52:43.000+0000 9.15605609210187777e17 null null -1.0 2.474134897e9 2014-05-02T14:45:43.000+0000 1.75058448e8 null -1.0
2017-10-05T09:45:00.000+0000 9.15875460835295232e17 null null null null 9.15873015858028544e17 1.647621744e9 2013-08-05T12:08:53.000+0000 null null 7.06385933877182464e17
2017-10-05T09:45:00.000+0000 9.15875460835241984e17 2017-10-05T09:27:02.000+0000 9.15870937454567429e17 null null -1.0 6.6252706e7 2009-08-17T02:23:44.000+0000 2.533787994e9 null -1.0
2017-10-05T09:45:00.000+0000 9.15875460839436288e17 2017-10-05T02:06:02.000+0000 9.15759958150365185e17 null null -1.0 7.5133962e8 2012-08-11T14:23:23.000+0000 2.67537691e8 null -1.0
2017-10-05T09:45:00.000+0000 9.15875460810022912e17 null null null null -1.0 1.246637089e9 2013-03-06T17:45:34.000+0000 null null -1.0
2017-10-05T09:45:00.000+0000 9.15875460826906627e17 2017-10-04T17:38:53.000+0000 9.15632327220273153e17 null null -1.0 1.939655432e9 2013-10-06T03:30:43.000+0000 9.06137972176650242e17 null -1.0
2017-10-05T09:45:00.000+0000 9.15875460847886336e17 null null null null 9.15875308179304448e17 2.605335913e9 2014-07-05T11:24:22.000+0000 null null 7.34329199486345217e17
2017-10-05T09:45:00.000+0000 9.1587546084358144e17 2015-09-06T11:13:08.000+0000 6.4048286132631552e17 null null -1.0 2.868684997e9 2014-10-21T08:13:51.000+0000 2.232816559e9 null -1.0
2017-10-05T09:45:00.000+0000 9.15875460831219713e17 null null null null -1.0 3.106466466e9 2015-03-24T09:41:47.000+0000 null null -1.0
2017-10-05T09:45:00.000+0000 9.15875460818628608e17 2017-10-01T21:17:46.000+0000 9.14600247036399617e17 null null -1.0 1.453768837e9 2013-05-24T09:40:40.000+0000 2.538853697e9 null -1.0
2017-10-05T09:45:00.000+0000 9.15875460818419713e17 null null null null 9.158742085922816e17 1.396111124e9 2013-05-02T02:50:13.000+0000 null null 3.194959158e9
2017-10-05T09:45:00.000+0000 9.1587546083946496e17 2017-10-05T06:07:56.000+0000 9.15820832651227136e17 2017-10-04T13:56:22.000+0000 9.15576331026423808e17 -1.0 2.48374816e9 2014-04-14T09:18:13.000+0000 1.358917686e9 2.451476942e9 -1.0
2017-10-05T09:45:00.000+0000 9.15875460827029504e17 null null null null -1.0 1.93879296e9 2013-10-05T20:57:12.000+0000 null null -1.0
2017-10-05T09:45:00.000+0000 9.15875460814385152e17 2017-10-02T22:40:35.000+0000 9.14983479112237056e17 null null -1.0 1.614107269e9 2013-07-23T01:40:51.000+0000 2.984363662e9 null -1.0
2017-10-05T09:45:00.000+0000 9.1587546081421312e17 null null null null 9.1587530143491277e17 3.670594153e9 2015-09-24T13:04:03.000+0000 null null 2.33524175e8
2017-10-05T09:45:00.000+0000 9.15875460827009025e17 2017-10-04T22:22:50.000+0000 9.1570378856822784e17 null null -1.0 6.26912455e8 2012-07-05T00:25:42.000+0000 1.39261959e8 null -1.0
2017-10-05T09:45:00.000+0000 9.15875460835192833e17 2017-10-05T09:27:44.000+0000 9.15871114819072e17 null null -1.0 8.32807704473145345e17 2017-02-18T04:23:24.000+0000 4.81101105e9 null -1.0
2017-10-05T09:45:00.000+0000 9.15875460818591744e17 null null null null -1.0 1.641507391e9 2013-08-02T22:39:44.000+0000 null null -1.0
2017-10-05T09:45:00.000+0000 9.15875460835311616e17 null null null null -1.0 3.048544857e9 2015-02-21T03:26:23.000+0000 null null -1.0
2017-10-05T09:45:00.000+0000 9.1587546084374528e17 2017-10-05T08:06:42.000+0000 9.15850721643384832e17 null null -1.0 9.07681288689250305e17 2017-09-12T19:04:17.000+0000 2.379003705e9 null -1.0
2017-10-05T09:45:00.000+0000 9.15875460814340096e17 2017-10-05T09:44:30.000+0000 9.15875334649729024e17 null null -1.0 1.0719639e8 2010-01-21T20:34:05.000+0000 2.905373914e9 null -1.0
2017-10-05T09:45:00.000+0000 9.15875460826800128e17 2017-10-02T19:54:05.000+0000 9.14941576908017664e17 null null -1.0 7.4454505989111808e17 2016-06-19T14:59:10.000+0000 8.61722650929840129e17 null -1.0
2017-10-05T09:45:00.000+0000 9.15875460826906625e17 null null null null -1.0 1.226568745e9 2013-02-28T03:06:12.000+0000 null null -1.0
2017-10-05T09:45:01.000+0000 9.15875465037889536e17 2017-10-05T08:37:48.000+0000 9.1585855070175232e17 null null -1.0 2.35842718e8 2011-01-09T05:54:03.000+0000 8.84629505758842881e17 null -1.0
2017-10-05T09:45:01.000+0000 9.158754650128384e17 2017-09-26T23:01:04.000+0000 9.12814307523551232e17 2017-07-15T19:18:05.000+0000 8.86303874675605504e17 -1.0 1.455451148e9 2013-05-24T22:51:04.000+0000 2.171491136e9 2.372122717e9 -1.0
2017-10-05T09:45:01.000+0000 9.15875465021272064e17 null null null null -1.0 1.4444461e7 2008-04-19T16:23:20.000+0000 null null -1.0

Truncated to 30 rows

Truncated to 12 cols
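
The ID columns are rendered in scientific notation in the table above, although `CurrentTwID` is declared as `bigint` in `TTTsDF`. The sketch below, in plain Scala, illustrates why such IDs should stay as `Long` values rather than be converted to `Double`: two distinct IDs taken from the table collapse to the same `Double`. The variable names are assumptions made for illustration.

```scala
// Two distinct tweet IDs from the CurrentTwID column above.
val idA = 915875460826906624L
val idB = 915875460826906627L

// A Double has a 53-bit significand, so neighbouring 18-digit IDs can collide.
val collideAsDouble: Boolean = idA.toDouble == idB.toDouble
val distinctAsLong: Boolean = idA != idB

// The exact ID can be recovered from the displayed text through BigDecimal.
val recovered: Long = BigDecimal("9.15875460826906627e17").toLongExact

println(s"collide as Double: $collideAsDouble, distinct as Long: $distinctAsLong, recovered: $recovered")
```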

display(TTTsDF.groupBy($"tweetType").count().orderBy($"count".desc))
tweetType count
ReTweet 8401.0
Original Tweet 6626.0
Reply Tweet 3472.0
Retweet of Quoted Tweet 556.0
Quoted Tweet 439.0
Reply of Quoted Tweet 11.0
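
As a quick sanity check, the counts above should add up to the 19505 rows reported by `TTTsDF.count()`. A small sketch in plain Scala, with the counts copied by hand from the output above, also turns them into percentage shares; the names `tweetTypeCounts`, `total` and `shares` are assumptions made for illustration.

```scala
// Counts copied from the groupBy output above.
val tweetTypeCounts: Seq[(String, Long)] = Seq(
  "ReTweet" -> 8401L,
  "Original Tweet" -> 6626L,
  "Reply Tweet" -> 3472L,
  "Retweet of Quoted Tweet" -> 556L,
  "Quoted Tweet" -> 439L,
  "Reply of Quoted Tweet" -> 11L
)

// The total should match TTTsDF.count().
val total: Long = tweetTypeCounts.map(_._2).sum

// Percentage share of each tweet type.
val shares: Seq[(String, Double)] =
  tweetTypeCounts.map { case (tweetType, count) => (tweetType, 100.0 * count / total) }

shares.foreach { case (tweetType, pct) => println(f"$tweetType%-25s $pct%6.2f%%") }
```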
// this will make sure all streaming jobs in the cluster are stopped
StreamingContext.getActive.foreach{ _.stop(stopSparkContext = false) }
// this will delete what we collected to keep the disk usage tight and tidy
dbutils.fs.rm(outputDirectoryRoot, true)
res67: Boolean = true

Next, let's write the tweets into a scalable commercial cloud storage system

We will write the tweets to Amazon's Simple Storage Service (S3), a scalable storage system in the cloud. See https://aws.amazon.com/s3/.

Skip this section if you don't have an AWS account.

But all the main syntactic bits are here for your future convenience :)

// Replace with your AWS S3 credentials
//
// NOTE: Set the access to this notebook appropriately to protect the security of your keys.
// Or you can delete this cell after you run the mount command below once successfully.

val AccessKey = getArgument("1. ACCESS_KEY", "REPLACE_WITH_YOUR_ACCESS_KEY")
val SecretKey = getArgument("2. SECRET_KEY", "REPLACE_WITH_YOUR_SECRET_KEY")
val EncodedSecretKey = SecretKey.replace("/", "%2F")
val AwsBucketName = getArgument("3. S3_BUCKET", "REPLACE_WITH_YOUR_S3_BUCKET")
val MountName = getArgument("4. MNT_NAME", "REPLACE_WITH_YOUR_MOUNT_NAME")
val s3Filename = "tweetDump"

Now just mount s3 as follows:

dbutils.fs.mount(s"s3a://$AccessKey:$EncodedSecretKey@$AwsBucketName", s"/mnt/$MountName")
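
The secret key is embedded in the `s3a://` URI, so any `/` in it has to be percent-encoded as `%2F`, otherwise it would be read as a path separator. The sketch below shows how the mount source string is assembled; all values are made-up placeholders, not real credentials, and the lower-case names are assumptions made for illustration.

```scala
// Made-up placeholder values, NOT real credentials.
val accessKey = "AKIAEXAMPLE"
val secretKey = "abc/def+ghi/jkl"
val bucket = "my-example-bucket"

// Percent-encode the slashes so that the key survives inside the URI.
val encodedSecretKey = secretKey.replace("/", "%2F")

// The string that would be passed as the first argument of dbutils.fs.mount.
val mountSource = s"s3a://$accessKey:$encodedSecretKey@$bucket"
println(mountSource)
```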

Now you can use the dbutils commands freely to access data in the mounted S3.

dbutils.fs.help()

copying:

// to copy all the tweets to s3
dbutils.fs.cp("dbfs:/rawTweets",s"/mnt/$MountName/rawTweetsInS3/",recurse=true) 

deleting:

// to remove all the files from s3
dbutils.fs.rm(s"/mnt/$MountName/rawTweetsInS3",recurse=true) 

unmounting:

// finally unmount when done - IMPORTANT!
dbutils.fs.unmount(s"/mnt/$MountName")