SDS-2.2, Scalable Data Science

What is Geospatial Analytics?

(watch now 3 minutes and 23 seconds: 111-314 seconds):

Spark Summit East 2016 - What is Geospatial Analytics by Ram Sri Harsha

Some Concrete Examples of Scalable Geospatial Analytics

Let us check out cross-domain data fusion in MSR's Urban Computing Group

Several sciences are naturally geospatial

  • forestry,
  • geography,
  • geology,
  • seismology,
  • etc. etc.

See for example the global EQ datastreams from US geological Service below.

For a global data source, see US geological Service's Earthquake hazards Program "

Introduction to Magellan for Scalable Geospatial Analytics

This is a minor augmentation of Ram Harsha's Magellan code blogged here:

First you need to attach the following library:

  • the magellan library (maven coordinates harsha2010:magellan:1.0.5-s_2.11)

Do we need one more geospatial analytics library?

From Ram's slide 4 of this Spark Summit East 2016 talk at slideshare:

  • Spatial Analytics at scale is challenging
    • Simplicity + Scalability = Hard
  • Ancient Data Formats
    • metadata, indexing not handled well, inefficient storage
  • Geospatial Analytics is not simply Business Intelligence anymore
    • Statistical + Machine Learning being leveraged in geospatial
  • Now is the time to do it!
    • Explosion of mobile data
    • Finer granularity of data collection for geometries
    • Analytics stretching the limits of traditional approaches
    • Spark SQL + Catalyst + Tungsten makes extensible SQL engines easier than ever before!

Nuts and Bolts of Magellan

Let us go and grab this databricks notebook:

and look at the magellan README in github:

HOMEWORK: Watch the magellan presentation by Ram Harsha (Hortonworks) in Spark Summit East 2016.

Other resources for magellan:

Let's get our hands dirty with basics in magellan.

Data Structures

  • Points
  • Polygons
  • lines
  • Polylines


  • within
  • intersects
// create a points DataFrame
val points = sc.parallelize(Seq((-1.0, -1.0), (-1.0, 1.0), (1.0, -1.0))).toDF("x", "y")
points: org.apache.spark.sql.DataFrame = [x: double, y: double]
// transform (lat,lon) into Point using custom user-defined function
import magellan.Point
import org.apache.spark.sql.functions.udf
val toPointUDF = udf{(x:Double,y:Double) => Point(x,y) }
import magellan.Point
import org.apache.spark.sql.functions.udf
toPointUDF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,org.apache.spark.sql.types.PointUDT@105fcd11,Some(List(DoubleType, DoubleType)))
// let's show the results of the DF with a new column called point
points.withColumn("point", toPointUDF('x, 'y)).show()
|   x|   y|            point|
|-1.0|-1.0|Point(-1.0, -1.0)|
|-1.0| 1.0| Point(-1.0, 1.0)|
| 1.0|-1.0| Point(1.0, -1.0)|
// Let's instead use the built-in expression to do the same - it's much faster on larger DataFrames due to code-gen
import org.apache.spark.sql.magellan.dsl.expressions._

points.withColumn("point", point('x, 'y)).show()
|   x|   y|            point|
|-1.0|-1.0|Point(-1.0, -1.0)|
|-1.0| 1.0| Point(-1.0, 1.0)|
| 1.0|-1.0| Point(1.0, -1.0)|

import org.apache.spark.sql.magellan.dsl.expressions._

Let's verify empirically if it is indeed faster for larger DataFrames.

// to generate a sequence of pairs of random numbers we can do:
import util.Random.nextDouble
import util.Random.nextDouble
res2: Seq[(Double, Double)] = List((-0.020043427602710828,0.9375053662414891), (-0.994920524839198,0.845271190508138), (-0.1501812761209732,0.10704139325335771), (-0.9891649012229055,0.8031283537358862), (-0.9576677869252214,0.4852309234418518), (-0.3615417292821861,0.026888794684844397), (-0.20066285059225897,0.32093278495843036), (-0.7157377454281582,0.9061198917840395), (-0.1812174392506678,0.19036607653819304), (-0.0999544225947615,0.5381675138406278))
// using the UDF method with 1 million points we can do a count action of the DF with point column
// don'yt add too many zeros as it may crash your driver program
  .toDF("x", "y")
  .withColumn("point", toPointUDF('x, 'y))
res3: Long = 1000000
// seems twice as fast with code-gen
  .toDF("x", "y")
  .withColumn("point", point('x, 'y))
res4: Long = 1000000

Read the following for more on catalyst optimizer and whole-stage code generation.

Try bench-marks here:

// Create a Polygon DataFrame
import magellan.Polygon

case class PolygonExample(polygon: Polygon)

val ring = Array(Point(1.0, 1.0), Point(1.0, -1.0), Point(-1.0, -1.0), Point(-1.0, 1.0), Point(1.0, 1.0))
val polygon = Polygon(Array(0), ring)

val polygons = sc.parallelize(Seq(
  PolygonExample(Polygon(Array(0), ring))
import magellan.Polygon
defined class PolygonExample
ring: Array[magellan.Point] = Array(Point(1.0, 1.0), Point(1.0, -1.0), Point(-1.0, -1.0), Point(-1.0, 1.0), Point(1.0, 1.0))
polygon: magellan.Polygon = magellan.Polygon@1ed26b1
polygons: org.apache.spark.sql.DataFrame = [polygon: polygon]
|polygon                  |


// join points with polygons upon intersection
points.withColumn("point", point('x, 'y))
      .where($"point" intersects $"polygon")
res13: Long = 3
// join points with polygons upon within or containement
points.withColumn("point", point('x, 'y))
      .where($"point" within $"polygon")
res14: Long = 0
//creating line from two points
import magellan.Line

case class LineExample(line: Line)

val line = Line(Point(1.0, 1.0), Point(1.0, -1.0))

val lines = sc.parallelize(Seq(

// creating polyline
import magellan.PolyLine

case class PolyLineExample(polyline: PolyLine)

val ring = Array(Point(1.0, 1.0), Point(1.0, -1.0), Point(-1.0, -1.0), Point(-1.0, 1.0))

val polylines1 = sc.parallelize(Seq(
  PolyLineExample(PolyLine(Array(0), ring))
import magellan.PolyLine
defined class PolyLineExample
ring: Array[magellan.Point] = Array(Point(1.0, 1.0), Point(1.0, -1.0), Point(-1.0, -1.0), Point(-1.0, 1.0))
polylines1: org.apache.spark.sql.DataFrame = [polyline: polyline]
// now let's make a polyline with two or more lines out of the same ring
val polylines2 = sc.parallelize(Seq(
  PolyLineExample(PolyLine(Array(0,2), ring)) // first line starts are index 0 and second one starts at index 2


Check out the NYC Taxi Dataset in Magellan

This is a much larger dataset and we may need access to a larger cluster - unless we just analyse a smaller subset of the data (perhaps just a month of Taxi rides in NYC). We can understand the same concepts using a much smaller dataset of Uber rides in San Francisco. We will analyse this next.

Uber Dataset for the Demo done by Ram Harsha in Europe Spark Summit 2015

First the datasets have to be loaded. See the section below on Downloading datasets and putting them in distributed file system for doing this anew (This only needs to be done once if the data is persisted in the distributed file system).

After downloading the data, we expect to have the following files in distributed file system (dbfs):

  • all.tsv is the file of all uber trajectories
  • SFNbhd is the directory containing SF neighborhood shape files.
display("dbfs:/datasets/magellan/")) // display the contents of the dbfs directory "dbfs:/datasets/magellan/"
path name size
dbfs:/datasets/magellan/SFNbhd/ SFNbhd/ 0.0
dbfs:/datasets/magellan/all.tsv all.tsv 6.0947802e7

First five lines or rows of the uber data containing: tripID, timestamp, Lon, Lat

00001    2007-01-07T10:54:50+00:00    37.782551    -122.445368
00001    2007-01-07T10:54:54+00:00    37.782745    -122.444586
00001    2007-01-07T10:54:58+00:00    37.782842    -122.443688
00001    2007-01-07T10:55:02+00:00    37.782919    -122.442815
00001    2007-01-07T10:55:06+00:00    37.782992    -122.442112
display("dbfs:/datasets/magellan/SFNbhd")) // legacy shape files
path name size
dbfs:/datasets/magellan/SFNbhd/planning_neighborhoods.dbf planning_neighborhoods.dbf 1028.0
dbfs:/datasets/magellan/SFNbhd/planning_neighborhoods.prj planning_neighborhoods.prj 567.0
dbfs:/datasets/magellan/SFNbhd/planning_neighborhoods.sbn planning_neighborhoods.sbn 516.0
dbfs:/datasets/magellan/SFNbhd/planning_neighborhoods.sbx planning_neighborhoods.sbx 164.0
dbfs:/datasets/magellan/SFNbhd/planning_neighborhoods.shp planning_neighborhoods.shp 214576.0
dbfs:/datasets/magellan/SFNbhd/planning_neighborhoods.shp.xml planning_neighborhoods.shp.xml 21958.0
dbfs:/datasets/magellan/SFNbhd/planning_neighborhoods.shx planning_neighborhoods.shx 396.0


First watch the more technical magellan presentation by Ram Sri Harsha (Hortonworks) in Spark Summit Europe 2015

![Ram Sri Harsha's Magellan Spark Summit EU 2015 Talk]](\

Second, carefully repeat Ram's original analysis from the following blog as done below.

Ram's blog in HortonWorks and the ZeppelinHub view of the demo code in video above

This is just to get you started... You may need to moidfy this!

case class UberRecord(tripId: String, timestamp: String, point: Point) // a case class for UberRecord
defined class UberRecord
val uber = sc.textFile("dbfs:/datasets/magellan/all.tsv")
              .map { line =>
                      val parts = line.split("\t" )
                      val tripId = parts(0)
                      val timestamp = parts(1)
                      val point = Point(parts(3).toDouble, parts(2).toDouble)
                      UberRecord(tripId, timestamp, point)
                     //.repartition(100) // using default repartition
uber: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [tripId: string, timestamp: string ... 1 more field]
val uberRecordCount = uber.count() // how many Uber records?
uberRecordCount: Long = 1128663

So there are over a million UberRecords.

val neighborhoods ="magellan") // this may be busted... try to make it work...
                                   .select($"polygon", $"metadata")
neighborhoods: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [polygon: polygon, metadata: map<string,string>]
neighborhoods.count() // how many neighbourhoods in SF?
res28: Long = 37
 |-- polygon: polygon (nullable = true)
 |-- metadata: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true),false) // see the first two neighbourhoods
|polygon                  |metadata                                    |
|magellan.Polygon@e18dd641|Map(neighborho -> Twin Peaks               )|
|magellan.Polygon@46d47c8 |Map(neighborho -> Pacific Heights          )|
only showing top 2 rows
import org.apache.spark.sql.functions._ // this is needed for sql functions like explode, etc.
import org.apache.spark.sql.functions._
//names of all 37 neighborhoods of San Francisco$"metadata").as(Seq("k", "v"))).show(37,false)
|k         |v                        |
|neighborho|Twin Peaks               |
|neighborho|Pacific Heights          |
|neighborho|Visitacion Valley        |
|neighborho|Potrero Hill             |
|neighborho|Crocker Amazon           |
|neighborho|Outer Mission            |
|neighborho|Bayview                  |
|neighborho|Lakeshore                |
|neighborho|Russian Hill             |
|neighborho|Golden Gate Park         |
|neighborho|Outer Sunset             |
|neighborho|Inner Sunset             |
|neighborho|Excelsior                |
|neighborho|Outer Richmond           |
|neighborho|Parkside                 |
|neighborho|Bernal Heights           |
|neighborho|Noe Valley               |
|neighborho|Presidio                 |
|neighborho|Nob Hill                 |
|neighborho|Financial District       |
|neighborho|Glen Park                |
|neighborho|Marina                   |
|neighborho|Seacliff                 |
|neighborho|Mission                  |
|neighborho|Downtown/Civic Center    |
|neighborho|South of Market          |
|neighborho|Presidio Heights         |
|neighborho|Inner Richmond           |
|neighborho|Castro/Upper Market      |
|neighborho|West of Twin Peaks       |
|neighborho|Ocean View               |
|neighborho|Treasure Island/YBI      |
|neighborho|Chinatown                |
|neighborho|Western Addition         |
|neighborho|North Beach              |
|neighborho|Diamond Heights          |
|neighborho|Haight Ashbury           |

This join below yields nothing.

So what's going on?

Watch Ram's 2015 Spark Summit talk for details on geospatial formats and transformations.

  .where($"point" within $"polygon")
  .select($"tripId", $"timestamp", explode($"metadata").as(Seq("k", "v")))
  .withColumnRenamed("v", "neighborhood")

Need the right transformer to transform the points into the right coordinate system of the shape files.

// This code was removed from magellan in this commit:
// We bring this back to show how to roll our own transformations.
import magellan.Point

class NAD83(params: Map[String, Any]) {
  val RAD = 180d / Math.PI
  val ER  = 6378137.toDouble  // semi-major axis for GRS-80
  val RF  = 298.257222101  // reciprocal flattening for GRS-80
  val F   = 1.toDouble / RF  // flattening for GRS-80
  val ESQ = F + F - (F * F)
  val E   = StrictMath.sqrt(ESQ)

  private val ZONES =  Map(
    401 -> Array(122.toDouble, 2000000.0001016,
      500000.0001016001, 40.0,
      41.66666666666667, 39.33333333333333),
    403 -> Array(120.5, 2000000.0001016,
      500000.0001016001, 37.06666666666667,
      38.43333333333333, 36.5)

  def from() = {
    val zone = params("zone").asInstanceOf[Int]
    ZONES.get(zone) match {
      case Some(x) => if (x.length == 5) {
      } else {
      case None => ???

  def to() = {
    val zone = params("zone").asInstanceOf[Int]
    ZONES.get(zone) match {
      case Some(x) => if (x.length == 5) {
      } else {
      case None => ???

  def qqq(e: Double, s: Double) = {
    (StrictMath.log((1 + s) / (1 - s)) - e *
      StrictMath.log((1 + e * s) / (1 - e * s))) / 2

  def toLambertConic(params: Array[Double]) = {
    val cm = params(0) / RAD  // CENTRAL MERIDIAN (CM)
    val eo = params(1)  // FALSE EASTING VALUE AT THE CM (METERS)
    val fis = params(3) / RAD  // LATITUDE OF SO. STD. PARALLEL
    val fin = params(4) / RAD  // LATITUDE OF NO. STD. PARALLEL
    val fib = params(5) / RAD // LATITUDE OF SOUTHERNMOST PARALLEL
    val sinfs = StrictMath.sin(fis)
    val cosfs = StrictMath.cos(fis)
    val sinfn = StrictMath.sin(fin)
    val cosfn = StrictMath.cos(fin)
    val sinfb = StrictMath.sin(fib)
    val qs = qqq(E, sinfs)
    val qn = qqq(E, sinfn)
    val qb = qqq(E, sinfb)
    val w1 = StrictMath.sqrt(1.toDouble - ESQ * sinfs * sinfs)
    val w2 = StrictMath.sqrt(1.toDouble - ESQ * sinfn * sinfn)
    val sinfo = StrictMath.log(w2 * cosfs / (w1 * cosfn)) / (qn - qs)
    val k = ER * cosfs * StrictMath.exp(qs * sinfo) / (w1 * sinfo)
    val rb = k / StrictMath.exp(qb * sinfo)

    (point: Point) => {
      val (long, lat) = (point.getX(), point.getY())
      val l = - long / RAD
      val f = lat / RAD
      val q = qqq(E, StrictMath.sin(f))
      val r = k / StrictMath.exp(q * sinfo)
      val gam = (cm - l) * sinfo
      val n = rb + nb - (r * StrictMath.cos(gam))
      val e = eo + (r * StrictMath.sin(gam))
      Point(e, n)

  def toTransverseMercator(params: Array[Double]) = {
    (point: Point) => {

  def fromLambertConic(params: Array[Double]) = {
    val cm = params(0) / RAD  // CENTRAL MERIDIAN (CM)
    val eo = params(1)  // FALSE EASTING VALUE AT THE CM (METERS)
    val fis = params(3) / RAD  // LATITUDE OF SO. STD. PARALLEL
    val fin = params(4) / RAD  // LATITUDE OF NO. STD. PARALLEL
    val fib = params(5) / RAD // LATITUDE OF SOUTHERNMOST PARALLEL
    val sinfs = StrictMath.sin(fis)
    val cosfs = StrictMath.cos(fis)
    val sinfn = StrictMath.sin(fin)
    val cosfn = StrictMath.cos(fin)
    val sinfb = StrictMath.sin(fib)

    val qs = qqq(E, sinfs)
    val qn = qqq(E, sinfn)
    val qb = qqq(E, sinfb)
    val w1 = StrictMath.sqrt(1.toDouble - ESQ * sinfs * sinfs)
    val w2 = StrictMath.sqrt(1.toDouble - ESQ * sinfn * sinfn)
    val sinfo = StrictMath.log(w2 * cosfs / (w1 * cosfn)) / (qn - qs)
    val k = ER * cosfs * StrictMath.exp(qs * sinfo) / (w1 * sinfo)
    val rb = k / StrictMath.exp(qb * sinfo)
    (point: Point) => {
      val easting = point.getX()
      val northing = point.getY()
      val npr = rb - northing + nb
      val epr = easting - eo
      val gam = StrictMath.atan(epr / npr)
      val lon = cm - (gam / sinfo)
      val rpt = StrictMath.sqrt(npr * npr + epr * epr)
      val q = StrictMath.log(k / rpt) / sinfo
      val temp = StrictMath.exp(q + q)
      var sine = (temp - 1.toDouble) / (temp + 1.toDouble)
      var f1, f2 = 0.0
      for (i <- 0 until 2) {
        f1 = ((StrictMath.log((1.toDouble + sine) / (1.toDouble - sine)) - E *
          StrictMath.log((1.toDouble + E * sine) / (1.toDouble - E * sine))) / 2.toDouble) - q
        f2 = 1.toDouble / (1.toDouble - sine * sine) - ESQ / (1.toDouble - ESQ * sine * sine)
        sine -= (f1/ f2)
      Point(StrictMath.toDegrees(lon) * -1, StrictMath.toDegrees(StrictMath.asin(sine)))

  def fromTransverseMercator(params: Array[Double]) = {
    val cm = params(0)  // CENTRAL MERIDIAN (CM)
    val fe = params(1)  // FALSE EASTING VALUE AT THE CM (METERS)
    val or = params(2) / RAD  // origin latitude
    val sf = 1.0 - (1.0 / params(3)) // scale factor
    val fn = params(4)  // false northing
    // translated from TCONPC subroutine
    val eps = ESQ / (1.0 - ESQ)
    val pr = (1.0 - F) * ER
    val en = (ER - pr) / (ER + pr)
    val en2 = en * en
    val en3 = en * en * en
    val en4 = en2 * en2

    var c2 = -3.0 * en / 2.0 + 9.0 * en3 / 16.0
    var c4 = 15.0d * en2 / 16.0d - 15.0d * en4 /32.0
    var c6 = -35.0 * en3 / 48.0
    var c8 = 315.0 * en4 / 512.0
    val u0 = 2.0 * (c2 - 2.0 * c4 + 3.0 * c6 - 4.0 * c8)
    val u2 = 8.0 * (c4 - 4.0 * c6 + 10.0 * c8)
    val u4 = 32.0 * (c6 - 6.0 * c8)
    val u6 = 129.0 * c8

    c2 = 3.0 * en / 2.0 - 27.0 * en3 / 32.0
    c4 = 21.0 * en2 / 16.0 - 55.0 * en4 / 32.0d
    c6 = 151.0 * en3 / 96.0
    c8 = 1097.0d * en4 / 512.0
    val v0 = 2.0 * (c2 - 2.0 * c4 + 3.0 * c6 - 4.0 * c8)
    val v2 = 8.0 * (c4 - 4.0 * c6 + 10.0 * c8)
    val v4 = 32.0 * (c6 - 6.0 * c8)
    val v6 = 128.0 * c8

    val r = ER * (1.0 - en) * (1.0 - en * en) * (1.0 + 2.25 * en * en + (225.0 / 64.0) * en4)
    val cosor = StrictMath.cos(or)
    val omo = or + StrictMath.sin(or) * cosor *
      (u0 + u2 * cosor * cosor + u4 * StrictMath.pow(cosor, 4) + u6 * StrictMath.pow(cosor, 6))
    val so = sf * r * omo

    (point: Point) => {
      val easting = point.getX()
      val northing = point.getY()
      // translated from TMGEOD subroutine
      val om = (northing - fn + so) / (r * sf)
      val cosom = StrictMath.cos(om)
      val foot = om + StrictMath.sin(om) * cosom *
        (v0 + v2 * cosom * cosom + v4 * StrictMath.pow(cosom, 4) + v6 * StrictMath.pow(cosom, 6))
      val sinf = StrictMath.sin(foot)
      val cosf = StrictMath.cos(foot)
      val tn = sinf / cosf
      val ts = tn * tn
      val ets = eps * cosf * cosf
      val rn = ER * sf / StrictMath.sqrt(1.0 - ESQ * sinf * sinf)
      val q = (easting - fe) / rn
      val qs = q * q
      val b2 = -tn * (1.0 + ets) / 2.0
      val b4 = -(5.0 + 3.0 * ts + ets * (1.0 - 9.0 * ts) - 4.0 * ets * ets) / 12.0
      val b6 = (61.0 + 45.0 * ts * (2.0 + ts) + ets * (46.0 - 252.0 * ts -60.0 * ts * ts)) / 360.0
      val b1 = 1.0
      val b3 = -(1.0 + ts + ts + ets) / 6.0
      val b5 = (5.0 + ts * (28.0 + 24.0 * ts) + ets * (6.0 + 8.0 * ts)) / 120.0
      val b7 = -(61.0 + 662.0 * ts + 1320.0 * ts * ts + 720.0 * StrictMath.pow(ts, 3)) / 5040.0
      val lat = foot + b2 * qs * (1.0 + qs * (b4 + b6 * qs))
      val l = b1 * q * (1.0 + qs * (b3 + qs * (b5 + b7 * qs)))
      val lon = -l / cosf + cm
      Point(StrictMath.toDegrees(lon) * -1, StrictMath.toDegrees(lat))
import magellan.Point
defined class NAD83
val transformer: Point => Point = (point: Point) => {
  val from = new NAD83(Map("zone" -> 403)).from()
  val p = point.transform(from)
  Point(3.28084 * p.getX, 3.28084 * p.getY)

// add a new column in nad83 coordinates
val uberTransformed = uber
                      .withColumn("nad83", $"point".transform(transformer))
transformer: magellan.Point => magellan.Point = <function1>
uberTransformed: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [tripId: string, timestamp: string ... 2 more fields]
res42: Long = 1128663,false) // nad83 transformed points
|tripId|timestamp                |point                        |nad83                                        |
|00001 |2007-01-07T10:54:50+00:00|Point(-122.445368, 37.782551)|Point(5999523.477715266, 2113253.7290443885) |
|00001 |2007-01-07T10:54:54+00:00|Point(-122.444586, 37.782745)|Point(5999750.8888492435, 2113319.6570987953)|
|00001 |2007-01-07T10:54:58+00:00|Point(-122.443688, 37.782842)|Point(6000011.08106823, 2113349.5785887106)  |
|00001 |2007-01-07T10:55:02+00:00|Point(-122.442815, 37.782919)|Point(6000263.898268142, 2113372.3716762937) |
|00001 |2007-01-07T10:55:06+00:00|Point(-122.442112, 37.782992)|Point(6000467.566895697, 2113394.7303657546) |
only showing top 5 rows"tripId").distinct().count() // number of unique tripIds
res45: Long = 24999

Let' try the join again after appropriate transformation of coordinate system.

val joined = neighborhoods
              .where($"nad83" within $"polygon")
              .select($"tripId", $"timestamp", explode($"metadata").as(Seq("k", "v")))
              .withColumnRenamed("v", "neighborhood")
joined: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [tripId: string, timestamp: string ... 1 more field]
val UberRecordsInNbhdsCount = joined.count() // about 131 seconds for first action (doing broadcast hash join)
UberRecordsInNbhdsCount: Long = 1085087,false)
|tripId|timestamp                |neighborhood             |
|00001 |2007-01-07T10:54:50+00:00|Western Addition         |
|00001 |2007-01-07T10:54:54+00:00|Western Addition         |
|00001 |2007-01-07T10:54:58+00:00|Western Addition         |
|00001 |2007-01-07T10:55:02+00:00|Western Addition         |
|00001 |2007-01-07T10:55:06+00:00|Western Addition         |
only showing top 5 rows
uberRecordCount - UberRecordsInNbhdsCount // records not in the neighbouthood shape files
res48: Long = 43576
|neighborhood             |trips|
|South of Market          |9891 |
|Western Addition         |6794 |
|Downtown/Civic Center    |6697 |
|Financial District       |6038 |
|Mission                  |5620 |
only showing top 5 rows

Spatio-temporal Queries

can be expressed in SQL using the Boolean predicates such as, ,,\in , \cap, \ldots, that operate over space-time sets given products of 2D magellan objects and 1D time intervals.

Want to scalably do the following:

  • Given :
    • a set of trajectories as labelled points in space-time and
    • a product of a time interval [ts,te] and a polygon P
  • Find all labelled space-time points that satisfy the following relations:
    • intersect with [ts,te] X P
    • the start-time of the ride or the end time of the ride intersects with [ts,te] X P
    • intersect within a given distance d of any point or a given point in P (optional)

This will allow us to answer questions like:

  • Where did the passengers who were using Uber and present in the SoMa neighbourhood in a given time interval get off?

See 2016 student project by George Dillon on a detailed analysis of spatio-temporal taxi trajectories using the Beijing taxi dataset from Microsoft Research (including map-matching with open-street maps using magellan and graphhopper).

(watch later from 34 minutes for the first student presentation in Scalable Data Science from Middle Earth 2016):

Spark Summit East 2016 - What is Geospatial Analytics by Ram Sri Harsha

Other spatial Algorithms in Spark are being explored for generic and more efficient scalable geospatial analytic tasks

See the Spark Summit East 2016 Talk by Ram on "what next?" and the latest notebooks on NYC taxi datasets in Ram's blogs.

Latest versionb of magellan is already using clever spatial indexing structures.

  • SpatialSpark aims to provide efficient spatial operations using Apache Spark.
    • Spatial Partition
      • Generate a spatial partition from input dataset, currently Fixed-Grid Partition (FGP), Binary-Split Partition (BSP) and Sort-Tile Partition (STP) are supported.
    • Spatial Range Query
      • includes both indexed and non-indexed query (useful for neighbourhood searches)
  • z-order Knn join
    • A space-filling curve trick to index multi-dimensional metric data into 1 Dimension. See: ieee paper and the slides.
  • AkNN = All K Nearest Neighbours - identify the k nearesy neighbours for all nodes simultaneously (cont AkNN is the streaming form of AkNN)
    • need to identify the right resources to do this scalably.
  • spark-knn-graphs:

Downloading datasets and putting them in dbfs

getting uber data

(This only needs to be done once per shard!)

