// Databricks notebook source exported at Sun, 26 Jun 2016 01:43:20 UTC

Scalable Data Science

Course Project by Akinwande Atanda

supported by and

The html source url of this databricks notebook and its recorded Uji (Image of Uji, Dogen's Time-Being):

sds/uji/studentProjects/02_AkinwandeAtanda/Tweet_Analytics/043_TA02_ETL_Tweets

Tweet Analytics

Presentation contents.

Extract-Transform-Load (ETL) Processing of Streamed Tweets

This notebook should be run after the tweets have been collected in batches. The operations performed in this notebook are:

  • Read/Load the Streamed Tweets in batches of RDD
  • Read/Load the Streamed Tweets in merged batches of RDDs
  • Save the Tweets in Parquet format, convert to Dataframe Table and run SQL queries
  • Explore the Streamed Tweets using SQL queries: Filter, Plot and Re-Shape
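The "merged batches" step in the list above can be sketched with plain Scala collections standing in for the per-batch RDDs (the batch contents here are made up for illustration):

```scala
// Each streaming interval writes one batch file of tweets; merging is a union of batches.
val batch1 = Seq("tweet a", "tweet b") // e.g. the contents of one tweets_<timestamp> file
val batch2 = Seq("tweet c")            // e.g. the next batch file

// Reading with a glob path (filteredTweet/*) is equivalent to concatenating every batch.
val merged = batch1 ++ batch2

println(merged.size)
```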

display(dbutils.fs.ls(s"/mnt/s3Data/filteredTweet"))

val tweet = sc.textFile(s"dbfs:/mnt/s3Data/filteredTweet/tweets_1463600700000").take(1)

val tweetDF = sqlContext.read.json(s"dbfs:/mnt/s3Data/filteredTweet/tweets_1463600700000")

tweetDF.show()

tweetDF.printSchema()

tweetDF.select("text").show

tweetDF.select("createdAt", "text", "favoriteCount","retweetCount").show

tweetDF.select(tweetDF("createdAt"), tweetDF("text"), tweetDF("favoriteCount")+1,tweetDF("retweetCount")+2).show

tweetDF.select(tweetDF("createdAt"), tweetDF("text"), tweetDF("favoriteCount")>1,tweetDF("retweetCount")).show

tweetDF.groupBy("favoriteCount").count().show

tweetDF.groupBy("retweetCount").count().show

tweetDF.groupBy("createdAt").count().show

tweetDF.groupBy("user").count().show
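The `groupBy(...).count()` cells above tally rows per key value. The same semantics in plain Scala, on a toy sequence of favourite counts (made-up values):

```scala
// Analogue of tweetDF.groupBy("favoriteCount").count():
// group the rows by key, then count the rows in each group.
val favoriteCounts = Seq(0, 0, 1, 3, 0, 1)
val counts = favoriteCounts.groupBy(identity).map { case (k, v) => k -> v.size }
// e.g. counts(0) is the number of tweets with zero favourites
```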

tweetDF.filter(tweetDF("favoriteCount")<1).show

tweetDF.filter(tweetDF("text") === "trump").show // note: "text" was misspelt "tex"; === keeps only exact matches
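Note that `=== "trump"` keeps only rows whose entire text equals `"trump"`; tweets that merely mention the word need a substring match (the `like "%trump%"` queries later in this notebook). The difference, sketched on toy texts (made up here):

```scala
val texts = Seq("trump", "vote trump 2016", "hello world")
val exactMatches = texts.filter(_ == "trump")            // analogue of === "trump"
val substringMatches = texts.filter(_.contains("trump")) // analogue of like "%trump%"
```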

import sqlContext.implicits._

val tweetDS = tweetDF.select("text").as[String] // Dataset[String] of tweet texts (uses sqlContext.implicits._; note .show returns Unit, so it is not assigned)
tweetDS.show

Merged RDD Streams


val tweetDF2 = sqlContext.read.json(s"dbfs:/mnt/s3Data/filteredTweet/*")

tweetDF2.show

tweetDF2.select("createdAt","text").show

tweetDF2.groupBy("createdAt").count.show

tweetDF2.groupBy("favoriteCount").count.show

tweetDF2.groupBy("retweetCount").count.show

tweetDF2.groupBy("text").count.show

Saving the Tweets as a Table and Running SQL Queries

  • Unmerged Tweet (tweetDF)

tweetDF.registerTempTable("tweetTable") // Register the DataFrames as a table.

val tweetText = sqlContext.sql("SELECT text FROM tweetTable") //Run SQL query

tweetText.take(1).foreach(println)

tweetText.map(t=>(t,1)).take(1).foreach(println)
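Mapping each tweet `t` to the pair `(t, 1)` as in the cell above is the first half of the classic count pattern; summing the 1s per key completes it. A plain-Scala sketch on toy data:

```scala
// Pair each element with 1, then sum the 1s per key to count occurrences.
val tweets = Seq("a", "b", "a")
val pairs = tweets.map(t => (t, 1))
val counted = pairs.groupBy(_._1).map { case (k, v) => k -> v.map(_._2).sum }
// counted("a") is how many times "a" occurred
```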

Merged RDD Streams


val tweetDF2 = sqlContext.read.json(s"dbfs:/mnt/s3Data/filteredTweet/*")

Saving the Tweets as a Table and Running SQL Queries

Parquet Data format: Save, Preview from the Tables menu, and Query directly without transforming to DataFrame


tweetDF2.select("createdAt", "text", "favoriteCount","retweetCount").write.save("filterTweet.parquet")  
//Save the filtered tweets in Parquet format; then use Create Table to load them from this directory.

val mergedTweets = sqlContext.read.format("parquet").load("dbfs:/filterTweet.parquet") 
//This reads all the tweets in the parquet data format

mergedTweets.registerTempTable("mergedTweetsTable") // Save as a Table

val mergedTweetQuery = sqlContext.sql("SELECT * FROM mergedTweetsTable")

Use SQL syntax to extract required fields from the registered table


%sql SELECT * FROM mergedTweetsTable LIMIT 1

mergedTweetQuery.cache

mergedTweetQuery.map(c => c(1)).foreach(println) // note: on a cluster, println runs on the executors, so output appears in executor logs rather than in the notebook

mergedTweetQuery.take(1)

val mergedTextQuery = sqlContext.sql("SELECT text FROM mergedTweetsTable").cache

mergedTextQuery.map(c=>c).take(1).foreach(println)

dbutils.fs.help

display(dbutils.fs.ls("dbfs:/"))

dbutils.fs.rm("dbfs:/filterTweet.parquet",recurse=true)

display(dbutils.fs.ls("dbfs:/"))

//display(dbutils.fs.ls("dbfs:/mnt/s3Data/filteredTweet/Trumps.txt"))

//dbutils.fs.rm("dbfs:/mnt/s3Data/filteredTweet/Trumps.txt",recurse=true)

ETL Operationalization: Actual Tweets Project


//dbutils.fs.rm(s"dbfs:/mnt/s3Data/TrumpTweetText.parquet",recurse=true)

display(dbutils.fs.ls(s"dbfs:/mnt/s3Data"))

dbutils.fs.rm(s"dbfs:/mnt/s3Data/tweetAll.parquet",recurse=true)

val tweetAll = sqlContext.read.json(s"dbfs:/mnt/s3Data/twitterNew/*")
tweetAll.cache

//Save the filtered tweets in Parquet format; then use Create Table to load them from the directory.
tweetAll.select("createdAt", "text", "favoriteCount","retweetCount").write.save(s"dbfs:/mnt/s3Data/tweetAll.parquet")

val mergedTweetAll = sqlContext.read.format("parquet").load(s"dbfs:/mnt/s3Data/tweetAll.parquet") //This reads all the tweets in the parquet data format

mergedTweetAll.registerTempTable("mergedTweetTable")

%sql SELECT * FROM mergedTweetTable LIMIT 1

mergedTweetAll.show

Returns the number of Tweets in the merged dataset (26.3 million Tweets as at 3.30 p.m. today)


%sql SELECT COUNT(text) as TweetCount FROM mergedTweetTable

Returns the number of Tweets in the merged dataset, grouped and sorted by the date the Tweet was created


%sql SELECT COUNT(text) as TweetCount, createdAt FROM mergedTweetTable group by createdAt order by createdAt asc

%sql SELECT COUNT(text) as TweetCount, createdAt as DStreamTime FROM mergedTweetTable group by createdAt order by createdAt asc

%sql SELECT distinct createdAt FROM mergedTweetTable where favoriteCount == 1

Filter Query by Keywords


%sql SELECT count(*) as TrumpsTweet FROM mergedTweetTable where text like "%trump%"

%sql SELECT COUNT(text) as TrumpsTweetCount, createdAt as DStreamTime FROM mergedTweetTable where text like "%trump%" group by createdAt order by createdAt asc

%sql SELECT createdAt as DStreamTime, text as TrumpsTweet FROM mergedTweetTable where text like "%trump%" order by createdAt asc limit 1
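A caveat on the `like "%trump%"` filters above: `LIKE` in Spark SQL is case-sensitive, so tweets containing only "Trump" are not matched. Lower-casing the text first catches both spellings; sketched on made-up texts:

```scala
val texts = Seq("Trump rally", "vote trump", "weather")
val caseSensitive = texts.filter(_.contains("trump"))               // analogue of like "%trump%"
val caseInsensitive = texts.filter(_.toLowerCase.contains("trump")) // analogue of lower(text) like "%trump%"
```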

val TrumpTextQuery = sqlContext.sql("SELECT createdAt as date, text as review, favoriteCount as category FROM mergedTweetTable where text like '%trump%' order by createdAt asc").cache

val TrumpQuery = sqlContext.sql("SELECT createdAt as date, text as review, CAST(favoriteCount as FLOAT) as category FROM mergedTweetTable where text like '%trump%' order by createdAt asc").cache

TrumpTextQuery.select("date", "review", "category").write.save(s"dbfs:/mnt/s3Data/TrumpTweet.parquet")

TrumpQuery.select("date", "review", "category").write.save(s"dbfs:/mnt/s3Data/TrumpSentiment.parquet")

TrumpTextQuery.registerTempTable("TrumpTweetTable")

%sql -- ALTER TABLE TrumpTweetTable ALTER COLUMN category (incomplete statement; Spark SQL cannot alter a registered temp table -- the FLOAT cast of favoriteCount is instead done with CAST in the TrumpQuery cell above)

%sql SELECT * FROM TrumpTweetTable limit 3

TrumpTextQuery.count //Returns the number of tweets

TrumpTextQuery.take(3).foreach(println)
