SDS-2.2, Scalable Data Science
Power Forecasting
Student Project
by Gustav Björdal, Mahmoud Shepero and Dennis van der Meer
Run the create-training-data-21 notebook to create all the data
/scalable-data-science/streaming-forecast/02-create-training-data
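In a Databricks notebook this is typically done with the %run magic in a cell of its own (a sketch using the path above); running it defines the trainingSet and testSet used below, and its output is reproduced here:
%run /scalable-data-science/streaming-forecast/02-create-training-data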
path | name | size (bytes) |
---|---|---|
dbfs:/FileStore/tables/forecasting/clean_data.json | clean_data.json | 71327975 |
inputPath: String = /FileStore/tables/forecasting/
import org.apache.spark.sql.types._
jsonSchema: org.apache.spark.sql.types.StructType = StructType(StructField(ID,StringType,true), StructField(timeStamp,TimestampType,true), StructField(DataList,StructType(StructField(WXT530,StructType(StructField(DN,StringType,true), StructField(SN,StringType,true), StructField(GT3U,StringType,true), StructField(GM41,StringType,true), StructField(GP41,StringType,true), StructField(RC,StringType,true), StructField(RD,StringType,true), StructField(RI,StringType,true)),true), StructField(MX41,StructType(StructField(P,StringType,true)),true)),true))
DF: org.apache.spark.sql.DataFrame = [ID: string, timeStamp: timestamp ... 1 more field]
res67: Long = 367965
FinalDF: org.apache.spark.sql.DataFrame = [Power: double, WindDirection: double ... 8 more fields]
import org.apache.spark.sql.functions.{lead, lag}
w: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@7876017d
leadDf: org.apache.spark.sql.DataFrame = [Power: double, WindDirection: double ... 10 more fields]
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
averagedData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [window: struct<start: timestamp, end: timestamp>, Power: double ... 8 more fields]
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
windowSize: Int = 60
wSpec1: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@6e7540e
dataFeatDF: org.apache.spark.sql.DataFrame = [window: struct<start: timestamp, end: timestamp>, Power: double ... 62 more fields]
Size of dataset: 61473
w: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@5599f9e6
leadSteps: Int = 120
dataset: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [window: struct<start: timestamp, end: timestamp>, Power: double ... 63 more fields]
split20: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [window: struct<start: timestamp, end: timestamp>, Power: double ... 63 more fields]
split80: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [window: struct<start: timestamp, end: timestamp>, Power: double ... 63 more fields]
testSet: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [window: struct<start: timestamp, end: timestamp>, Power: double ... 63 more fields]
trainingSet: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [window: struct<start: timestamp, end: timestamp>, Power: double ... 63 more fields]
Set up the vectorizer
import org.apache.spark.ml.feature.VectorAssembler

// Assemble the 18 mean/stddev feature columns into a single feature vector
val vectorizer = new VectorAssembler()
  .setInputCols(Array(
    "meanPower", "stddevPower",
    "meanWindDirection", "stddevWindDirection",
    "meanWindSpeed", "stddevWindSpeed",
    "meanTemperature", "stddevTemperature",
    "meanRH", "stddevRH",
    "meanAP", "stddevAP",
    "meanRainCumulative", "stddevRainCumulative",
    "meanRainDur", "stddevRainDur",
    "meanRainIntens", "stddevRainIntens"))
  .setOutputCol("features")
import org.apache.spark.ml.feature.VectorAssembler
vectorizer: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_d02669c9129a
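As a quick sanity check, the assembler can be applied on its own to confirm that all 18 input columns end up in a single features vector. A minimal sketch, assuming the column names produced by the create-training-data notebook:
// Inspect a few rows of the assembled feature vector next to the label
vectorizer.transform(trainingSet)
  .select("features", "future_power")
  .show(5, truncate = false)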
Train a linear regression model
// ***** LINEAR REGRESSION MODEL *****
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.regression.LinearRegressionModel
import org.apache.spark.ml.Pipeline
import org.apache.spark.mllib.evaluation.RegressionMetrics
// Let's initialize our linear regression learner
val lr = new LinearRegression()
// We use explain params to dump the parameters we can use
lr.explainParams()
// Now we set the parameters for the method
lr.setPredictionCol("Predicted_Power")
.setLabelCol("future_power")
.setMaxIter(100)
.setRegParam(0.1)
// We will use the new spark.ml pipeline API. If you have worked with scikit-learn this will be very familiar.
val lrPipeline = new Pipeline()
lrPipeline.setStages(Array(vectorizer, lr))
// Let's first train on the training set to see what we get
val lrModel = lrPipeline.fit(trainingSet)
val predictionsAndLabels = lrModel.transform(testSet)
val metrics = new RegressionMetrics(predictionsAndLabels.select("Predicted_Power", "future_power").map(r => (r(0).asInstanceOf[Double], r(1).asInstanceOf[Double])).rdd)
val rmse = metrics.rootMeanSquaredError
val explainedVariance = metrics.explainedVariance
val r2 = metrics.r2
println (f"Root Mean Squared Error: $rmse")
println (f"Explained Variance: $explainedVariance")
println (f"R2: $r2")
Root Mean Squared Error: 5.371102763373557
Explained Variance: 183.27403720831734
R2: 0.8650238794181951
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.regression.LinearRegressionModel
import org.apache.spark.ml.Pipeline
import org.apache.spark.mllib.evaluation.RegressionMetrics
lr: org.apache.spark.ml.regression.LinearRegression = linReg_d056f664f4f3
lrPipeline: org.apache.spark.ml.Pipeline = pipeline_471de568ed2e
lrModel: org.apache.spark.ml.PipelineModel = pipeline_471de568ed2e
predictionsAndLabels: org.apache.spark.sql.DataFrame = [window: struct<start: timestamp, end: timestamp>, Power: double ... 65 more fields]
metrics: org.apache.spark.mllib.evaluation.RegressionMetrics = org.apache.spark.mllib.evaluation.RegressionMetrics@517e318c
rmse: Double = 5.371102763373557
explainedVariance: Double = 183.27403720831734
r2: Double = 0.8650238794181951
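Since lrModel is a fitted pipeline, the linear model itself can be unwrapped to look at the learned coefficients. A minimal sketch, assuming the linear regression model is the last pipeline stage:
import org.apache.spark.ml.regression.LinearRegressionModel
// The last stage of the fitted pipeline is the trained linear regression model
val lrFitted = lrModel.stages.last.asInstanceOf[LinearRegressionModel]
println(s"Intercept: ${lrFitted.intercept}")
println(s"Coefficients: ${lrFitted.coefficients}")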
Not too bad, but let's see if we can improve the results using cross validation.
// Copied from notebook 30
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}
import org.apache.spark.ml.evaluation._
//Let's set up our evaluator to judge the model by root mean squared error (RMSE)
val regEval = new RegressionEvaluator()
regEval.setLabelCol("future_power")
.setPredictionCol("Predicted_Power")
.setMetricName("rmse")
//Let's create our cross-validator with 3-fold cross validation
val crossval = new CrossValidator()
crossval.setEstimator(lrPipeline)
crossval.setNumFolds(3)
crossval.setEvaluator(regEval)
val regParam = (1 to 10).toArray.map(x => x / 100.0)
val paramGrid = new ParamGridBuilder()
.addGrid(lr.regParam, regParam)
.build()
crossval.setEstimatorParamMaps(paramGrid)
//Now let's create our model
val cvModel = crossval.fit(trainingSet)
val predictionsAndLabels = cvModel.transform(testSet)
val metrics = new RegressionMetrics(predictionsAndLabels.select("Predicted_Power", "future_power").rdd.map(r => (r(0).asInstanceOf[Double], r(1).asInstanceOf[Double])))
val rmse = metrics.rootMeanSquaredError
val explainedVariance = metrics.explainedVariance
val r2 = metrics.r2
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}
import org.apache.spark.ml.evaluation._
regEval: org.apache.spark.ml.evaluation.RegressionEvaluator = regEval_d62c017ca16e
crossval: org.apache.spark.ml.tuning.CrossValidator = cv_bbfe7b54e4c0
regParam: Array[Double] = Array(0.01, 0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09, 0.1)
paramGrid: Array[org.apache.spark.ml.param.ParamMap] = Array({ linReg_d056f664f4f3-regParam: 0.01 }, { linReg_d056f664f4f3-regParam: 0.02 }, { linReg_d056f664f4f3-regParam: 0.03 }, { linReg_d056f664f4f3-regParam: 0.04 }, { linReg_d056f664f4f3-regParam: 0.05 }, { linReg_d056f664f4f3-regParam: 0.06 }, { linReg_d056f664f4f3-regParam: 0.07 }, { linReg_d056f664f4f3-regParam: 0.08 }, { linReg_d056f664f4f3-regParam: 0.09 }, { linReg_d056f664f4f3-regParam: 0.1 })
cvModel: org.apache.spark.ml.tuning.CrossValidatorModel = cv_bbfe7b54e4c0
predictionsAndLabels: org.apache.spark.sql.DataFrame = [window: struct<start: timestamp, end: timestamp>, Power: double ... 65 more fields]
metrics: org.apache.spark.mllib.evaluation.RegressionMetrics = org.apache.spark.mllib.evaluation.RegressionMetrics@5d2f15c0
rmse: Double = 5.3684288676362915
explainedVariance: Double = 185.85849068387805
r2: Double = 0.8651582362729922
rmse: Double = 5.3684288676362915
explainedVariance: Double = 185.85849068387805
r2: Double = 0.8651582362729922
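To see which regularisation strength the cross validator actually selected, the best model can be unwrapped in the same way. A minimal sketch, assuming the best model is the fitted pipeline with the linear regression model as its last stage:
import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.regression.LinearRegressionModel
// Regularisation parameter of the winning model
val bestLr = cvModel.bestModel.asInstanceOf[PipelineModel].stages.last.asInstanceOf[LinearRegressionModel]
println(s"Best regParam: ${bestLr.getRegParam}")
// Average RMSE for each candidate regParam over the 3 folds
cvModel.getEstimatorParamMaps.zip(cvModel.avgMetrics).foreach(println)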
Train a gradient boosted tree
This is basically just copied from notebook 20 (power plant pipeline)
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.regression.GBTRegressor
import org.apache.spark.ml.Pipeline
val gbt = new GBTRegressor()
gbt.setLabelCol("future_power")
gbt.setPredictionCol("Predicted_Power")
gbt.setFeaturesCol("features")
gbt.setSeed(100088121L)
gbt.setMaxBins(100)
gbt.setMaxIter(120)
val gbtPipeline = new Pipeline()
gbtPipeline.setStages(Array(vectorizer, gbt))
val regEval = new RegressionEvaluator()
regEval.setLabelCol("future_power")
.setPredictionCol("Predicted_Power")
.setMetricName("rmse")
val crossval = new CrossValidator()
crossval.setNumFolds(3)
crossval.setEvaluator(regEval)
crossval.setEstimator(gbtPipeline)
val paramGrid = new ParamGridBuilder()
.addGrid(gbt.maxDepth, Array(2, 3))
.build()
crossval.setEstimatorParamMaps(paramGrid)
//gbt.explainParams
val gbtModel = crossval.fit(trainingSet)
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.regression.GBTRegressor
import org.apache.spark.ml.Pipeline
gbt: org.apache.spark.ml.regression.GBTRegressor = gbtr_775275335c61
gbtPipeline: org.apache.spark.ml.Pipeline = pipeline_7d65fadcdb83
regEval: org.apache.spark.ml.evaluation.RegressionEvaluator = regEval_59bac5a3de87
crossval: org.apache.spark.ml.tuning.CrossValidator = cv_3a60f9d0ffd5
paramGrid: Array[org.apache.spark.ml.param.ParamMap] = Array({ gbtr_775275335c61-maxDepth: 2 }, { gbtr_775275335c61-maxDepth: 3 })
gbtModel: org.apache.spark.ml.tuning.CrossValidatorModel = cv_3a60f9d0ffd5
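Before evaluating on the test set, the winning tree depth and the learned feature importances can be inspected. A minimal sketch, assuming the best model is the fitted pipeline with the GBTRegressionModel as its last stage:
import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.regression.GBTRegressionModel
// Unwrap the best pipeline chosen by cross validation
val bestGbt = gbtModel.bestModel.asInstanceOf[PipelineModel].stages.last.asInstanceOf[GBTRegressionModel]
println(s"Best maxDepth: ${bestGbt.getMaxDepth}")
println(s"Feature importances: ${bestGbt.featureImportances}")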
import org.apache.spark.ml.regression.GBTRegressionModel
import org.apache.spark.mllib.evaluation.RegressionMetrics
val predictionsAndLabels = gbtModel.bestModel.transform(testSet)
val metrics = new RegressionMetrics(predictionsAndLabels.select("Predicted_Power", "future_power").map(r => (r(0).asInstanceOf[Double], r(1).asInstanceOf[Double])).rdd)
val rmse = metrics.rootMeanSquaredError
val explainedVariance = metrics.explainedVariance
val r2 = metrics.r2
println (f"Root Mean Squared Error: $rmse")
println (f"Explained Variance: $explainedVariance")
println (f"R2: $r2")
Root Mean Squared Error: 2.863424384194055
Explained Variance: 202.1087124482088
R2: 0.961637980977275
import org.apache.spark.ml.regression.GBTRegressionModel
import org.apache.spark.mllib.evaluation.RegressionMetrics
predictionsAndLabels: org.apache.spark.sql.DataFrame = [window: struct<start: timestamp, end: timestamp>, Power: double ... 65 more fields]
metrics: org.apache.spark.mllib.evaluation.RegressionMetrics = org.apache.spark.mllib.evaluation.RegressionMetrics@50154faf
rmse: Double = 2.863424384194055
explainedVariance: Double = 202.1087124482088
r2: Double = 0.961637980977275
rmse: Double = 2.863424384194055
explainedVariance: Double = 202.1087124482088
r2: Double = 0.961637980977275
val results = predictionsAndLabels.withColumn("difference", $"Predicted_Power" - $"future_power").cache()
results: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [window: struct<start: timestamp, end: timestamp>, Power: double ... 66 more fields]
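A quick summary of the difference column gives a feel for how far off the 30-minute-ahead forecast typically is. A minimal sketch over the results dataset created above:
import org.apache.spark.sql.functions._
// Mean and worst-case absolute forecast error on the test set
results.select(
  mean(abs($"difference")).alias("meanAbsError"),
  max(abs($"difference")).alias("maxAbsError")
).show()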
Displaying the results
Note that the predicted power is 30 minutes into the future, so to compare the two curves you need to "visually shift" one of the lines by that horizon.
display(predictionsAndLabels)
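Instead of eyeballing the shift, the predictions can be lagged by the forecast horizon before plotting. A minimal sketch, assuming the 120-row lead (leadSteps) used when the label was created corresponds to the 30-minute horizon:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag
// Order rows by window start time and move each prediction to the time it refers to
val byTime = Window.orderBy($"window.start")
val aligned = predictionsAndLabels.withColumn(
  "Predicted_Power_aligned", lag($"Predicted_Power", 120).over(byTime))
display(aligned.select($"window.start", $"Power", $"Predicted_Power_aligned"))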