SDS-2.2, Scalable Data Science

Archived YouTube video of this live unedited lab-lecture:

Power Forecasting

Student Project

by Gustav Björdal, Mahmoud Shepero and Dennis van der Meer

Create labeled training data from a time-series dataset

/scalable-data-science/streaming-forecast/01-load-data-from-JSON
path                                                name             size
dbfs:/FileStore/tables/forecasting/clean_data.json  clean_data.json  71327975 bytes (~71 MB)
inputPath: String = /FileStore/tables/forecasting/
import org.apache.spark.sql.types._
jsonSchema: org.apache.spark.sql.types.StructType = StructType(StructField(ID,StringType,true), StructField(timeStamp,TimestampType,true), StructField(DataList,StructType(StructField(WXT530,StructType(StructField(DN,StringType,true), StructField(SN,StringType,true), StructField(GT3U,StringType,true), StructField(GM41,StringType,true), StructField(GP41,StringType,true), StructField(RC,StringType,true), StructField(RD,StringType,true), StructField(RI,StringType,true)),true), StructField(MX41,StructType(StructField(P,StringType,true)),true)),true))

DF: org.apache.spark.sql.DataFrame = [ID: string, timeStamp: timestamp ... 1 more field]
res1: Long = 367965
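
Only the outputs of the load cell survive in this export. A minimal sketch that would reproduce the inputPath, jsonSchema and DF shown above, assuming the usual Databricks notebook scope where spark is predefined:

import org.apache.spark.sql.types._

// Reconstruction sketch: read every JSON file under inputPath with the
// explicit schema printed above, so Spark skips schema inference.
val inputPath = "/FileStore/tables/forecasting/"

val jsonSchema = StructType(Seq(
  StructField("ID", StringType),
  StructField("timeStamp", TimestampType),
  StructField("DataList", StructType(Seq(
    StructField("WXT530", StructType(Seq(
      StructField("DN", StringType),
      StructField("SN", StringType),
      StructField("GT3U", StringType),
      StructField("GM41", StringType),
      StructField("GP41", StringType),
      StructField("RC", StringType),
      StructField("RD", StringType),
      StructField("RI", StringType)
    ))),
    StructField("MX41", StructType(Seq(
      StructField("P", StringType)
    )))
  )))
))

val DF = spark.read.schema(jsonSchema).json(inputPath)
DF.count // 367965 rows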

FinalDF: org.apache.spark.sql.DataFrame = [Power: double, WindDirection: double ... 8 more fields]
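
The cell that builds FinalDF is likewise missing. Judging from its printed schema (Power, WindDirection and 8 more fields, all doubles), it flattens the nested structs and casts the string readings to doubles. A sketch, where the mapping of WXT530 field codes to physical quantities is our reading of the sensor fields (DN→wind direction, SN→wind speed, GT3U→temperature, GM41→relative humidity, GP41→air pressure, RC/RD/RI→rain cumulative/duration/intensity, MX41.P→power):

import spark.implicits._

// Sketch of the flattening/casting step (field-code mapping assumed).
val FinalDF = DF.select(
  $"DataList.MX41.P".cast("double").as("Power"),
  $"DataList.WXT530.DN".cast("double").as("WindDirection"),
  $"DataList.WXT530.SN".cast("double").as("WindSpeed"),
  $"DataList.WXT530.GT3U".cast("double").as("Temperature"),
  $"DataList.WXT530.GM41".cast("double").as("RH"),
  $"DataList.WXT530.GP41".cast("double").as("AP"),
  $"DataList.WXT530.RC".cast("double").as("RainCumulative"),
  $"DataList.WXT530.RD".cast("double").as("RainDur"),
  $"DataList.WXT530.RI".cast("double").as("RainIntens"),
  $"timeStamp"
)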

import org.apache.spark.sql.functions.{lead, lag}
w: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@50b23287
leadDf: org.apache.spark.sql.DataFrame = [Power: double, WindDirection: double ... 10 more fields]
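
Again only the outputs survive. Given the lead/lag import and the two extra fields in leadDf, the missing cell presumably attached the neighbouring Power readings to each row; the column names below are hypothetical:

import org.apache.spark.sql.expressions.Window

// Hypothetical reconstruction: add the next and previous Power reading
// as two extra columns, ordered by time.
val w = Window.orderBy("timeStamp")
val leadDf = FinalDF
  .withColumn("LeadPower", lead($"Power", 1).over(w))
  .withColumn("LagPower", lag($"Power", 1).over(w))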

Reduce noise in data

We reduce the noise in the data by averaging it over non-overlapping (tumbling) 30-second windows.

This replaces the raw readings with their 30-second averages.

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

val averagedData = leadDf
  .groupBy(window(col("timeStamp"), "30 seconds", "30 seconds"))
  .agg(
    avg("Power").as("Power"),
    avg("WindDirection").as("WindDirection"),
    avg("WindSpeed").as("WindSpeed"),
    avg("Temperature").as("Temperature"),
    avg("RH").as("RH"),
    avg("AP").as("AP"),
    avg("RainCumulative").as("RainCumulative"),
    avg("RainDur").as("RainDur"),
    avg("RainIntens").as("RainIntens")
  )
  .orderBy("window")
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
averagedData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [window: struct<start: timestamp, end: timestamp>, Power: double ... 8 more fields]
//display(averagedData)

Calculate mean and stddev using code from notebook 21 of the SDS course

Create a window of size 60 rows.

Since every row in the dataset is an average over 30 seconds, a window of 60 rows spans 30 minutes (60 × 30 s = 1800 s).

// Import the window functions.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Create a window specification: each row together with the 60 rows preceding it.
val windowSize = 2 * 30
val wSpec1 = Window.orderBy("window").rowsBetween(-windowSize, 0)
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
windowSize: Int = 60
wSpec1: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@7050b68f

For every sensor column in the dataset, compute the moving mean and stddev:

// Calculate the moving-window statistics from the data.
// For each sensor column c we derive, over the trailing window wSpec1:
//   mean_c                                            (moving mean)
//   absDevFromMean_c     = |c - mean_c|
//   meanAbsDevFromMean_c = (1/n) * Σ |c - mean_c|     (mean absolute deviation)
//   sqrDevFromMean_c     = (c - mean_c)²
//   variance_c           = (1/n) * Σ (c - mean_c)²
//   stddev_c             = √variance_c
val featureCols = Seq("Power", "WindDirection", "WindSpeed", "Temperature",
                      "RH", "AP", "RainCumulative", "RainDur", "RainIntens")

val dataFeatDF: DataFrame = featureCols.foldLeft(averagedData) { (df, c) =>
  df.withColumn(s"mean$c", mean(col(c)).over(wSpec1))
    .withColumn(s"absDevFromMean$c", abs(col(c) - col(s"mean$c")))
    .withColumn(s"meanAbsDevFromMean$c", mean(s"absDevFromMean$c").over(wSpec1))
    .withColumn(s"sqrDevFromMean$c", pow(col(s"absDevFromMean$c"), 2.0))
    .withColumn(s"variance$c", mean(s"sqrDevFromMean$c").over(wSpec1))
    .withColumn(s"stddev$c", pow(col(s"variance$c"), 0.5))
}
dataFeatDF: org.apache.spark.sql.DataFrame = [window: struct<start: timestamp, end: timestamp>, Power: double ... 62 more fields]
//display(dataFeatDF)
println("Size of dataset: " + dataFeatDF.count)
Size of dataset: 61473
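
Note that this hand-rolled stddev is not quite the textbook rolling standard deviation: each squared deviation is measured from its own row's trailing mean, rather than from the current window's mean. Spark's built-in stddev_pop computes the latter over the same window; a quick comparison sketch (the column name stddevPowerBuiltin is ours):

// Compare against Spark's built-in population stddev over the same
// trailing window; values will be close but not identical, since the
// hand-rolled version measures each row against its own rolling mean.
val check = dataFeatDF
  .withColumn("stddevPowerBuiltin", stddev_pop(col("Power")).over(wSpec1))
  .select("window", "stddevPower", "stddevPowerBuiltin")
//display(check)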

Find the value to predict for every row

For every row, take the mean power 120 rows ahead as the value to predict. Since each row covers 30 seconds, this corresponds to predicting 60 minutes into the future (120 × 30 s = 3600 s).

We call this column future_power.

val w = org.apache.spark.sql.expressions.Window.orderBy("window")

// 120 rows ahead at 30 seconds per row = 60 minutes into the future;
// rows near the end of the data, with no row 120 steps ahead, default to 0.
val leadSteps = 2 * 60

val dataset = dataFeatDF
  .withColumn("future_power", lead("meanPower", leadSteps, 0).over(w))
  .orderBy("window")
w: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@552488ce
leadSteps: Int = 120
dataset: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [window: struct<start: timestamp, end: timestamp>, Power: double ... 63 more fields]
//display(dataset)

Split the data into a training and test set

This is done so that we can assess the model's generalizability on held-out data.

val Array(split20, split80) = dataset.randomSplit(Array(0.20, 0.80), 1800009193L) // fixed seed for reproducibility
split20: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [window: struct<start: timestamp, end: timestamp>, Power: double ... 63 more fields]
split80: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [window: struct<start: timestamp, end: timestamp>, Power: double ... 63 more fields]
// Let's cache these datasets for performance
val testSet = split20.cache()
val trainingSet = split80.cache()
testSet: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [window: struct<start: timestamp, end: timestamp>, Power: double ... 63 more fields]
trainingSet: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [window: struct<start: timestamp, end: timestamp>, Power: double ... 63 more fields]

Exporting:

  • testSet
  • trainingSet
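
How the two cached Datasets reach the downstream notebooks is not shown in this export. In Databricks this is commonly done by %run-ing this notebook from the next one, so that testSet and trainingSet are in scope there; alternatively the splits could be persisted and reloaded, e.g. (paths hypothetical):

// Hypothetical persistence step: write the splits to DBFS so the
// modelling notebook can reload them without recomputing the pipeline.
trainingSet.write.mode("overwrite").parquet("/FileStore/tables/forecasting/trainingSet")
testSet.write.mode("overwrite").parquet("/FileStore/tables/forecasting/testSet")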