SDS-2.2, Scalable Data Science

Million Song Dataset - Kaggle Challenge

Predict which songs a user will listen to.

SOURCE: This is just a Scala-rification of the Python notebook published in the Databricks Community Edition in 2016.

CAUTION: This notebook is expected to produce an error in command 28 (Cmd 28 in the Databricks notebook). You are meant to learn how to fix this error with simple exception handling to become a better data scientist. So ignore this warning, if you see one.

Archived YouTube video of this live unedited lab-lecture:


Stage 1: Parsing songs data

ETL

This is the first notebook in this tutorial. In this notebook we will read data from DBFS (Databricks File System), parse it, and load it as a table that can be readily used in the following notebooks.

By going through this notebook you can expect to learn how to read distributed data as an RDD, how to transform RDDs, and how to construct a Spark DataFrame from an RDD and register it as a table.

We first explore the different files in our distributed file system. We use a header file to construct a Spark Schema object. We write a function that takes the header and casts the strings in each line of our data to the corresponding types. Once we run this function on the data we find that it fails on some corner cases. We update our function and finally get a parsed RDD. We combine that RDD and the Schema to construct a DataFrame and register it as a temporary table in SparkSQL.

Text data files are stored in dbfs:/databricks-datasets/songs/data-001

You can conveniently list files on a distributed file system (DBFS, S3, or HDFS) using %fs commands.

ls /databricks-datasets/songs/data-001/
path name size (bytes)
dbfs:/databricks-datasets/songs/data-001/header.txt header.txt 377.0
dbfs:/databricks-datasets/songs/data-001/part-00000 part-00000 52837.0
dbfs:/databricks-datasets/songs/data-001/part-00001 part-00001 52469.0
dbfs:/databricks-datasets/songs/data-001/part-00002 part-00002 51778.0
dbfs:/databricks-datasets/songs/data-001/part-00003 part-00003 50551.0
dbfs:/databricks-datasets/songs/data-001/part-00004 part-00004 53449.0
dbfs:/databricks-datasets/songs/data-001/part-00005 part-00005 53301.0
dbfs:/databricks-datasets/songs/data-001/part-00006 part-00006 54184.0
dbfs:/databricks-datasets/songs/data-001/part-00007 part-00007 50924.0
dbfs:/databricks-datasets/songs/data-001/part-00008 part-00008 52533.0
dbfs:/databricks-datasets/songs/data-001/part-00009 part-00009 54570.0
dbfs:/databricks-datasets/songs/data-001/part-00010 part-00010 54338.0
dbfs:/databricks-datasets/songs/data-001/part-00011 part-00011 51836.0
dbfs:/databricks-datasets/songs/data-001/part-00012 part-00012 52297.0
dbfs:/databricks-datasets/songs/data-001/part-00013 part-00013 52044.0
dbfs:/databricks-datasets/songs/data-001/part-00014 part-00014 50704.0
dbfs:/databricks-datasets/songs/data-001/part-00015 part-00015 54158.0
dbfs:/databricks-datasets/songs/data-001/part-00016 part-00016 50080.0
dbfs:/databricks-datasets/songs/data-001/part-00017 part-00017 47708.0
dbfs:/databricks-datasets/songs/data-001/part-00018 part-00018 8858.0
dbfs:/databricks-datasets/songs/data-001/part-00019 part-00019 53323.0
dbfs:/databricks-datasets/songs/data-001/part-00020 part-00020 57877.0
dbfs:/databricks-datasets/songs/data-001/part-00021 part-00021 52491.0
dbfs:/databricks-datasets/songs/data-001/part-00022 part-00022 54791.0
dbfs:/databricks-datasets/songs/data-001/part-00023 part-00023 50682.0
dbfs:/databricks-datasets/songs/data-001/part-00024 part-00024 52863.0
dbfs:/databricks-datasets/songs/data-001/part-00025 part-00025 47416.0
dbfs:/databricks-datasets/songs/data-001/part-00026 part-00026 50130.0
dbfs:/databricks-datasets/songs/data-001/part-00027 part-00027 53462.0
dbfs:/databricks-datasets/songs/data-001/part-00028 part-00028 54179.0

Truncated to 30 rows
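The same listing can also be obtained programmatically with the Databricks dbutils utility - a small sketch, assuming a Databricks notebook where dbutils and display are available:

// programmatic equivalent of the %fs ls magic command (Databricks-specific utilities)
display(dbutils.fs.ls("/databricks-datasets/songs/data-001/"))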

As you can see in the listing, we have data files and a single header file. The header file seems interesting and worth a first inspection. The file is only 377 bytes, so it is safe to collect its entire content into the notebook.

sc.textFile("databricks-datasets/songs/data-001/header.txt").collect()
res1: Array[String] = Array(artist_id:string, artist_latitude:double, artist_longitude:double, artist_location:string, artist_name:string, duration:double, end_of_fade_in:double, key:int, key_confidence:double, loudness:double, release:string, song_hotnes:double, song_id:string, start_of_fade_out:double, tempo:double, time_signature:double, time_signature_confidence:double, title:string, year:double, partial_sequence:int)

Remember that calling collect() on a huge RDD can crash the driver program - so it is good practice to take() a couple of lines and count() the number of lines first, especially if you have no idea what file you are trying to read.

sc.textFile("databricks-datasets/songs/data-001/header.txt").take(2)
res3: Array[String] = Array(artist_id:string, artist_latitude:double)
sc.textFile("databricks-datasets/songs/data-001/header.txt").count()
res4: Long = 20
//sc.textFile("databricks-datasets/songs/data-001/header.txt").collect.map(println) // uncomment to see line-by-line

As seen above, each line in the header consists of a name and a type separated by a colon. We will need to parse the header file as follows:

val header = sc.textFile("/databricks-datasets/songs/data-001/header.txt").map(line => {
                val headerElement = line.split(":")
                (headerElement(0), headerElement(1))
            }
           ).collect()
header: Array[(String, String)] = Array((artist_id,string), (artist_latitude,double), (artist_longitude,double), (artist_location,string), (artist_name,string), (duration,double), (end_of_fade_in,double), (key,int), (key_confidence,double), (loudness,double), (release,string), (song_hotnes,double), (song_id,string), (start_of_fade_out,double), (tempo,double), (time_signature,double), (time_signature_confidence,double), (title,string), (year,double), (partial_sequence,int))

Let's define a case class called Song that will be used to represent each row of data in the files:

  • /databricks-datasets/songs/data-001/part-00000 through /databricks-datasets/songs/data-001/part-00119 (or whatever the last .../part-***** file is).
case class Song(artist_id: String, artist_latitude: Double, artist_longitude: Double, artist_location: String, artist_name: String, duration: Double, end_of_fade_in: Double, key: Int, key_confidence: Double, loudness: Double, release: String, song_hotness: Double, song_id: String, start_of_fade_out: Double, tempo: Double, time_signature: Double, time_signature_confidence: Double, title: String, year: Double, partial_sequence: Int)
defined class Song

Now we turn to the data files. The first step is to inspect a few lines of data to see their format.

// this loads all the data - a subset of the 1M songs dataset
val dataRDD = sc.textFile("/databricks-datasets/songs/data-001/part-*")
dataRDD: org.apache.spark.rdd.RDD[String] = /databricks-datasets/songs/data-001/part-* MapPartitionsRDD[13983] at textFile at <console>:35
dataRDD.count // number of songs
res5: Long = 31369
dataRDD.take(3)
res6: Array[String] = Array(AR81V6H1187FB48872    nan    nan        Earl Sixteen    213.7073    0.0    11    0.419    -12.106    Soldier of Jah Army    nan    SOVNZSZ12AB018A9B8    208.289    125.882    1    0.0    Rastaman    2003    --, ARVVZQP11E2835DBCB    nan    nan        Wavves    133.25016    0.0    0    0.282    0.596    Wavvves    0.471578247701    SOJTQHQ12A8C143C5F    128.116    89.519    1    0.0    I Want To See You (And Go To The Movies)    2009    --, ARFG9M11187FB3BBCB    nan    nan    Nashua USA    C-Side    247.32689    0.0    9    0.612    -4.896    Santa Festival Compilation 2008 vol.1    nan    SOAJSQL12AB0180501    242.196    171.278    5    1.0    Loose on the Dancefloor    0    225261)

Each line of data consists of multiple fields separated by \t. With that information and what we learned from the header file, we set out to parse our data.

  • We have already created a case class based on the header (which seems to agree with the 3 lines above).
  • Next, we will create a function that takes each line as input and returns the case class as output.
// let's do this 'by hand' to re-flex our RDD-muscles :)
// although this is not a robust way to parse data from a data engineering perspective (without handling exceptions)
def parseLine(line: String): Song = {

  val tokens = line.split("\t")
  Song(tokens(0), tokens(1).toDouble, tokens(2).toDouble, tokens(3), tokens(4), tokens(5).toDouble, tokens(6).toDouble, tokens(7).toInt, tokens(8).toDouble, tokens(9).toDouble, tokens(10), tokens(11).toDouble, tokens(12), tokens(13).toDouble, tokens(14).toDouble, tokens(15).toDouble, tokens(16).toDouble, tokens(17), tokens(18).toDouble, tokens(19).toInt)
}
parseLine: (line: String)Song

With this function we can transform dataRDD into another RDD consisting of Song case classes:

val parsedRDD = dataRDD.map(parseLine)
parsedRDD: org.apache.spark.rdd.RDD[Song] = MapPartitionsRDD[13984] at map at <console>:36

To convert an RDD of case classes to a DataFrame, we just need to call the toDF method:

val df = parsedRDD.toDF
df: org.apache.spark.sql.DataFrame = [artist_id: string, artist_latitude: double ... 18 more fields]
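Note that toDF on an RDD of case classes relies on Spark SQL's implicit encoders. These are pre-imported in a Databricks notebook; in a standalone Spark application you would have to bring them into scope yourself. A minimal sketch, assuming Spark 2.x and an application name of your own choosing:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("SongsETL").getOrCreate() // hypothetical app name
import spark.implicits._ // enables .toDF on RDDs of case classes

val df = parsedRDD.toDF // column names come from the Song case class fields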

Once we get a DataFrame we can register it as a temporary table. That will allow us to use its name in SQL queries.

df.createOrReplaceTempView("songsTable")

We can now cache our table. So far all operations have been lazy. This is the first time Spark will attempt to actually read all our data and apply the transformations.

If you are running Spark 1.6+, the next command will throw a parsing error.

cache table songsTable

The error means that we are trying to convert a missing value to a Double. Here is an updated version of the parseLine function to deal with missing values:

// good data engineering practice
def parseLine(line: String): Song = {


  def toDouble(value: String, defaultVal: Double): Double = {
    try {
       value.toDouble
    } catch {
      case e: Exception => defaultVal
    }
  }

  def toInt(value: String, defaultVal: Int): Int = {
    try {
       value.toInt
      } catch {
      case e: Exception => defaultVal
    }
  }

  val tokens = line.split("\t")
  Song(tokens(0), toDouble(tokens(1), 0.0), toDouble(tokens(2), 0.0), tokens(3), tokens(4), toDouble(tokens(5), 0.0), toDouble(tokens(6), 0.0), toInt(tokens(7), -1), toDouble(tokens(8), 0.0), toDouble(tokens(9), 0.0), tokens(10), toDouble(tokens(11), 0.0), tokens(12), toDouble(tokens(13), 0.0), toDouble(tokens(14), 0.0), toDouble(tokens(15), 0.0), toDouble(tokens(16), 0.0), tokens(17), toDouble(tokens(18), 0.0), toInt(tokens(19), -1))
}
parseLine: (line: String)Song
val df = dataRDD.map(parseLine).toDF
df.createOrReplaceTempView("songsTable")
df: org.apache.spark.sql.DataFrame = [artist_id: string, artist_latitude: double ... 18 more fields]
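As an aside, the defaulting logic in parseLine can be written more compactly with scala.util.Try. This is just an alternative sketch, not what the notebook above uses:

import scala.util.Try

// alternative helpers: Try(...).getOrElse replaces the explicit try/catch blocks above
def toDoubleOr(value: String, defaultVal: Double): Double = Try(value.toDouble).getOrElse(defaultVal)
def toIntOr(value: String, defaultVal: Int): Int = Try(value.toInt).getOrElse(defaultVal)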

And let's try caching the table. We are going to access this data multiple times in the following notebooks, so it is a good idea to cache it in memory for faster subsequent access.

cache table songsTable
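If you prefer to stay in Scala, the same caching can be requested through the Catalog API. A sketch; note that the SQL CACHE TABLE command is eager, whereas spark.catalog.cacheTable is lazy, so an action is needed to actually materialize the cache:

spark.catalog.cacheTable("songsTable") // marks the table for caching (lazy)
spark.table("songsTable").count()      // an action forces the data to be read and cached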

From now on we can easily query our data using the temporary table we just created and cached in memory. Since it is registered as a table, we can conveniently use SQL as well as the Spark DataFrame API to access it.

select * from songsTable limit 10
artist_id artist_latitude artist_longitude artist_location artist_name duration end_of_fade_in key key_confidence loudness release song_hotness
AR81V6H1187FB48872 0.0 0.0 Earl Sixteen 213.7073 0.0 11.0 0.419 -12.106 Soldier of Jah Army 0.0
ARVVZQP11E2835DBCB 0.0 0.0 Wavves 133.25016 0.0 0.0 0.282 0.596 Wavvves 0.471578247701
ARFG9M11187FB3BBCB 0.0 0.0 Nashua USA C-Side 247.32689 0.0 9.0 0.612 -4.896 Santa Festival Compilation 2008 vol.1 0.0
ARK4Z2O1187FB45FF0 0.0 0.0 Harvest 337.05751 0.247 4.0 0.46 -9.092 Underground Community 0.0
AR4VQSG1187FB57E18 35.25082 -91.74015 Searcy, AR Gossip 430.23628 0.0 2.0 3.4e-2 -6.846 Yr Mangled Heart 0.0
ARNBV1X1187B996249 0.0 0.0 Alex 186.80118 0.0 4.0 0.641 -16.108 Jolgaledin 0.0
ARXOEZX1187B9B82A1 0.0 0.0 Elie Attieh 361.89995 0.0 7.0 0.863 -4.919 ELITE 0.0
ARXPUIA1187B9A32F1 0.0 0.0 Rome, Italy Simone Cristicchi 220.00281 2.119 4.0 0.486 -6.52 Dall'Altra Parte Del Cancello 0.484225272411
ARNPPTH1187B9AD429 51.4855 -0.37196 Heston, Middlesex, England Jimmy Page 156.86485 0.334 7.0 0.493 -9.962 No Introduction Necessary [Deluxe Edition] 0.0
AROGWRA122988FEE45 0.0 0.0 Christos Dantis 256.67873 2.537 9.0 0.742 -13.404 Daktilika Apotipomata 0.0

Truncated to 12 cols
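For reference, the same ten-row query can also be issued from Scala instead of a SQL cell - a sketch using the standard spark.sql and spark.table entry points:

// SQL string executed programmatically
spark.sql("select * from songsTable limit 10").show()

// or the equivalent through the DataFrame API
spark.table("songsTable").limit(10).show()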

Next up is exploring this data. Click on the Exploration notebook to continue the tutorial.