SDS-2.2, Scalable Data Science
Archived YouTube video of this live unedited lab-lecture:
This is an elaboration of the Apache Spark mllib-programming-guide on mllib-data-types.
Overview
Data Types - MLlib Programming Guide
MLlib supports local vectors and matrices stored on a single machine, as well as distributed matrices backed by one or more RDDs. Local vectors and local matrices are simple data models that serve as public interfaces. The underlying linear algebra operations are provided by Breeze and jblas. A training example used in supervised learning is called a “labeled point” in MLlib.
IndexedRowMatrix in Scala
An IndexedRowMatrix is similar to a RowMatrix but with meaningful row indices. It is backed by an RDD of indexed rows, so that each row is represented by its index (long-typed) and a local vector.
An IndexedRowMatrix can be created from an RDD[IndexedRow] instance, where IndexedRow is a wrapper over (Long, Vector). An IndexedRowMatrix can be converted to a RowMatrix by dropping its row indices.
Refer to the IndexedRowMatrix Scala docs for details on the API.
import org.apache.spark.rdd.RDD // needed for the RDD[IndexedRow] type annotation below
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}
Vector(12.0, -51.0, 4.0) // note Vector is a scala collection
res8: scala.collection.immutable.Vector[Double] = Vector(12.0, -51.0, 4.0)
Vectors.dense(12.0, -51.0, 4.0) // while this is a mllib.linalg.Vector
res9: org.apache.spark.mllib.linalg.Vector = [12.0,-51.0,4.0]
val rows: RDD[IndexedRow] = sc.parallelize(Array(IndexedRow(2, Vectors.dense(1,3)), IndexedRow(4, Vectors.dense(4,5)))) // an RDD of indexed rows
rows: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.distributed.IndexedRow] = ParallelCollectionRDD[252] at parallelize at <console>:41
// Create an IndexedRowMatrix from an RDD[IndexedRow].
val mat: IndexedRowMatrix = new IndexedRowMatrix(rows)
mat: org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix = org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix@2a57e8ca
// Get its size.
// numRows() is the largest row index plus one (here 4 + 1 = 5), not the number of stored rows.
val m = mat.numRows()
val n = mat.numCols()
m: Long = 5
n: Long = 2
// Drop its row indices.
val rowMat: RowMatrix = mat.toRowMatrix()
rowMat: org.apache.spark.mllib.linalg.distributed.RowMatrix = org.apache.spark.mllib.linalg.distributed.RowMatrix@37fba875
rowMat.rows.collect()
res11: Array[org.apache.spark.mllib.linalg.Vector] = Array([1.0,3.0], [4.0,5.0])
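The Scala run above reports m = 5 although only two rows are stored. The following is a minimal sketch in plain Python (it does not call Spark) of how such a size can arise, assuming the row count is the largest row index plus one and the column count is the length of the first row's vector. The names indexed_rows, num_rows, num_cols and row_vectors are made up for this illustration.

```python
# Plain-Python model of the two (index, vector) rows from the Scala example.
# Illustration only: nothing here calls Spark.
indexed_rows = [(2, [1.0, 3.0]), (4, [4.0, 5.0])]

# Row count modelled as the largest row index plus one,
# so the unused indices 0, 1 and 3 still count towards the size.
num_rows = max(index for index, _ in indexed_rows) + 1

# Column count modelled as the length of the first row's vector.
num_cols = len(indexed_rows[0][1])

# Dropping the indices keeps only the vectors, in the spirit of toRowMatrix().
row_vectors = [vector for _, vector in indexed_rows]

print(num_rows, num_cols)  # 5 2
print(row_vectors)         # [[1.0, 3.0], [4.0, 5.0]]
```

Note that once the indices are dropped, the information that these vectors sat at rows 2 and 4 is gone, which is why the collected RowMatrix shows just two vectors.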
IndexedRowMatrix in Python
An IndexedRowMatrix can be created from an RDD of IndexedRows, where IndexedRow is a wrapper over (long, vector). An IndexedRowMatrix can be converted to a RowMatrix by dropping its row indices.
Refer to the IndexedRowMatrix Python docs for more details on the API.
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
# Create an RDD of indexed rows.
# - This can be done explicitly with the IndexedRow class:
indexedRows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
                              IndexedRow(1, [4, 5, 6]),
                              IndexedRow(2, [7, 8, 9]),
                              IndexedRow(3, [10, 11, 12])])
# - or by using (long, vector) tuples:
indexedRows = sc.parallelize([(0, [1, 2, 3]), (1, [4, 5, 6]),
                              (2, [7, 8, 9]), (3, [10, 11, 12])])
# Create an IndexedRowMatrix from an RDD of IndexedRows.
mat = IndexedRowMatrix(indexedRows)
# Get its size.
m = mat.numRows() # 4
n = mat.numCols() # 3
print(m, n)
# Get the rows as an RDD of IndexedRows.
rowsRDD = mat.rows
# Convert to a RowMatrix by dropping the row indices.
rowMat = mat.toRowMatrix()
# Convert to a CoordinateMatrix.
coordinateMat = mat.toCoordinateMatrix()
# Convert to a BlockMatrix.
blockMat = mat.toBlockMatrix()
(4L, 3L)
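To give a feel for what the conversion to a CoordinateMatrix means, here is a small sketch in plain Python (again without Spark) that expands each (row index, vector) pair into (i, j, value) entries. The helper to_coordinate_entries is hypothetical and only models the idea; it is not the PySpark implementation and makes no claim about how Spark treats sparse vectors or zero values.

```python
# Plain-Python model of the four rows used in the PySpark example above.
# Illustration only: nothing here calls Spark.
indexed_rows = [(0, [1, 2, 3]), (1, [4, 5, 6]),
                (2, [7, 8, 9]), (3, [10, 11, 12])]


def to_coordinate_entries(rows):
    """Hypothetical helper: expand (i, vector) rows into (i, j, value) triples."""
    return [(i, j, float(value))
            for i, vector in rows
            for j, value in enumerate(vector)]


entries = to_coordinate_entries(indexed_rows)

print(len(entries))  # 12
print(entries[:3])   # [(0, 0, 1.0), (0, 1, 2.0), (0, 2, 3.0)]
```

In this model a 4 x 3 matrix yields 12 entries, one per stored value, each carrying its own row and column position.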