// Databricks notebook source exported at Mon, 14 Mar 2016 04:43:58 UTC
Scalable Data Science
prepared by Raazesh Sainudiin and Sivanand Sivaram
This is an elaboration of the Apache Spark 1.6 sql-programming-guide.
Performance Tuning
Spark SQL Programming Guide
- Overview
- SQL
- DataFrames
- Datasets
- Getting Started
- Starting Point: SQLContext
- Creating DataFrames
- DataFrame Operations
- Running SQL Queries Programmatically
- Creating Datasets
- Interoperating with RDDs
- Inferring the Schema Using Reflection
- Programmatically Specifying the Schema
- Data Sources
- Generic Load/Save Functions
- Manually Specifying Options
- Run SQL on files directly
- Save Modes
- Saving to Persistent Tables
- Parquet Files
- Loading Data Programmatically
- Partition Discovery
- Schema Merging
- Hive metastore Parquet table conversion
- Hive/Parquet Schema Reconciliation
- Metadata Refreshing
- Configuration
- JSON Datasets
- Hive Tables
- Interacting with Different Versions of Hive Metastore
- JDBC To Other Databases
- Troubleshooting
- Performance Tuning
- Caching Data In Memory
- Other Configuration Options
- Distributed SQL Engine
- Running the Thrift JDBC/ODBC server
- Running the Spark SQL CLI
Performance Tuning
For some workloads, performance can be improved by caching data in memory or by turning on some experimental options.
Caching Data In Memory
Spark SQL can cache tables using an in-memory columnar format by calling sqlContext.cacheTable("tableName") or dataFrame.cache(). Then Spark SQL will scan only required columns and will automatically tune compression to minimize memory usage and GC pressure. You can call sqlContext.uncacheTable("tableName") to remove the table from memory.
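The caching calls described above can be sketched as follows. This is a minimal illustration, assuming a Spark 1.6 notebook in which `sqlContext` is already defined; the table name `demo_numbers` and the generated data are hypothetical and used only for the example.

```scala
// Assumption: `sqlContext` is the SQLContext provided by the notebook (Spark 1.6).
// `demo_numbers` is a hypothetical temporary table created only for this sketch.
val numbers = sqlContext.range(0, 1000)        // DataFrame with a single `id` column
numbers.registerTempTable("demo_numbers")

// Ask Spark SQL to keep the table in its in-memory columnar format.
sqlContext.cacheTable("demo_numbers")

// Queries against the table can now be served from the cache,
// reading only the columns they need.
sqlContext.sql("SELECT COUNT(id) FROM demo_numbers").show()

// Check whether the table is registered as cached.
println(sqlContext.isCached("demo_numbers"))

// Release the memory once the table is no longer needed.
sqlContext.uncacheTable("demo_numbers")
```

Calling `numbers.cache()` on the DataFrame is the alternative entry point mentioned in the text, with `numbers.unpersist()` as its counterpart.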
Configuration of in-memory caching can be done using the setConf method on SQLContext or by running SET key=value commands using SQL.
| Property Name | Default | Meaning |
| --- | --- | --- |
| spark.sql.inMemoryColumnarStorage.compressed | true | When set to true Spark SQL will automatically select a compression codec for each column based on statistics of the data. |
| spark.sql.inMemoryColumnarStorage.batchSize | 10000 | Controls the size of batches for columnar caching. Larger batch sizes can improve memory utilization and compression, but risk OOMs when caching data. |
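As a rough sketch of the two configuration routes, the snippet below sets one property through `setConf` and one through a SQL `SET` command, then reads both back. It assumes a Spark 1.6 notebook with a predefined `sqlContext`; the batch size of 20000 is an arbitrary illustrative value, not a recommendation.

```scala
// Assumption: `sqlContext` is the SQLContext provided by the notebook (Spark 1.6).

// Route 1: set a property programmatically.
// 20000 is an arbitrary value chosen for illustration only.
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.batchSize", "20000")

// Route 2: set a property with a SQL SET command.
sqlContext.sql("SET spark.sql.inMemoryColumnarStorage.compressed=true")

// Read the current values back to confirm what is in effect.
println(sqlContext.getConf("spark.sql.inMemoryColumnarStorage.batchSize"))
println(sqlContext.getConf("spark.sql.inMemoryColumnarStorage.compressed"))
```

Both routes write to the same SQL configuration, so which one to use is mostly a matter of whether the surrounding code is Scala or SQL.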
Other Configuration Options
The following options can also be used to tune the performance of query execution. These options may be deprecated in future releases as more optimizations are performed automatically.
| Property Name | Default | Meaning |
| --- | --- | --- |
| spark.sql.autoBroadcastJoinThreshold | 10485760 (10 MB) | Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled. Note that currently statistics are only supported for Hive Metastore tables where the command ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan has been run. |
| spark.sql.tungsten.enabled | true | When true, use the optimized Tungsten physical execution backend which explicitly manages memory and dynamically generates bytecode for expression evaluation. |
| spark.sql.shuffle.partitions | 200 | Configures the number of partitions to use when shuffling data for joins or aggregations. |
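To make the options above more concrete, here is a small sketch that lowers the number of shuffle partitions, disables automatic broadcast joins, and then inspects the partitioning of an aggregation. It assumes a Spark 1.6 notebook with a predefined `sqlContext`; the value 8 and the `bucket` column are hypothetical choices for illustration.

```scala
// Assumption: `sqlContext` is the SQLContext provided by the notebook (Spark 1.6).
import org.apache.spark.sql.functions.col

// Use fewer shuffle partitions than the default of 200; 8 is an illustrative value.
sqlContext.setConf("spark.sql.shuffle.partitions", "8")

// Setting the threshold to -1 disables automatic broadcast joins.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")

// A small aggregation that requires a shuffle; `bucket` is a hypothetical column.
val counts = sqlContext.range(0, 1000)
  .withColumn("bucket", col("id") % 10)
  .groupBy("bucket")
  .count()

// Inspect how many partitions the aggregated result ended up with.
println(counts.rdd.partitions.length)

// Restore the documented defaults so later cells are unaffected.
sqlContext.setConf("spark.sql.shuffle.partitions", "200")
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "10485760")
```

For small data sets, a lower partition count usually means fewer tiny tasks per shuffle; for large ones, the default or a higher value may be more appropriate.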