diff --git a/.gitignore b/.gitignore
index 700ce67..ea39a52 100644
--- a/.gitignore
+++ b/.gitignore
@@ -15,3 +15,6 @@ project/plugins/project/
 #Eclipse specific
 .classpath
 .project
+
+# Vim specific
+.*.swp
diff --git a/build.sbt b/build.sbt
index 6256d27..639a3b5 100644
--- a/build.sbt
+++ b/build.sbt
@@ -13,13 +13,30 @@ scalaVersion := "2.9.3"
 
 libraryDependencies ++= Seq(
   "org.apache.spark" % "spark-core_2.9.3" % "0.8.0-incubating",
   "org.apache.spark" % "spark-mllib_2.9.3" % "0.8.0-incubating",
-  "org.scalatest" %% "scalatest" % "1.9.1" % "test"
+  "org.slf4j" % "slf4j-log4j12" % "1.7.5",
+  "org.scalatest" %% "scalatest" % "1.9.1" % "test",
+  // Custom build of jblas including the "orgqr" function,
+  // pending merge of https://github.com/mikiobraun/jblas/pull/34
+  "org.jblas" % "jblas" % "1.2.4-amplab-SNAPSHOT"
 )
 
 resolvers ++= Seq(
   "Typesafe" at "http://repo.typesafe.com/typesafe/releases",
   "Scala Tools Snapshots" at "http://scala-tools.org/repo-snapshots/",
   "ScalaNLP Maven2" at "http://repo.scalanlp.org/repo",
-  "Spray" at "http://repo.spray.cc"
+  "Spray" at "http://repo.spray.cc",
+  "Local Maven Repo" at "file://" + Path.userHome + "/.m2/repository"
 )
+test in assembly := {}
+
+mergeStrategy in assembly := {
+  case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
+  case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
+  case "META-INF/services/org.apache.hadoop.fs.FileSystem" => MergeStrategy.concat
+  case "reference.conf" => MergeStrategy.concat
+  case "log4j.properties" => MergeStrategy.concat
+  case _ => MergeStrategy.first
+}
+
+net.virtualvoid.sbt.graph.Plugin.graphSettings
diff --git a/lib/jblas-1.2.3.jar b/lib/jblas-1.2.3.jar
deleted file mode 100644
index e0c7aba..0000000
Binary files a/lib/jblas-1.2.3.jar and /dev/null differ
diff --git a/project/build.properties b/project/build.properties
index f4ff7a5..5e96e96 100644
--- a/project/build.properties
+++ b/project/build.properties
@@ -1 +1 @@
-sbt.version=0.11.2
+sbt.version=0.12.4
diff --git a/project/plugins.sbt b/project/plugins.sbt
index c7bf3d1..21c6a1a 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -4,4 +4,6 @@ addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.1.0-SNAPSHOT")
 
 addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.0.0")
 
-addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.7.2")
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.9.2")
+
+addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.7.4")
diff --git a/src/main/scala/feat/NGrams.scala b/src/main/scala/feat/NGrams.scala
index 2751603..017a03e 100644
--- a/src/main/scala/feat/NGrams.scala
+++ b/src/main/scala/feat/NGrams.scala
@@ -15,9 +15,9 @@ object NGrams extends FeatureExtractor with Serializable {
   def buildDictionary(table: MLTable, col: Int, numGrams: Int, numFeatures: Int, stopWords: Set[String]): Seq[(String,Int)] = {
 
     //This is the classic Word Count, sorted descending by frequency.
-    val freqTable = table.flatMap(rowToNgramRows(_, col, numGrams, stopWords))
-      .map(row => MLRow(row(0),1))
-      .reduceBy(Seq(0), (x,y) => MLRow(x(0),x(1).toNumber+y(1).toNumber)) //Run a reduceByKey on the first column.
+    val freqTable = table.flatMap(rowToNgramTuples(_, col, numGrams, stopWords))
+      .map(row => MLTuple.create(row(0),1))
+      .reduceBy(Seq(0), (x,y) => MLTuple.create(x(0),x(1).toNumber+y(1).toNumber)) //Run a reduceByKey on the first column.
       .sortBy(Seq(1), false) //Run a sortby on the second column.
       .take(numFeatures) //Return the top rows.
       .map(r => (r(0).toString, r(1).toNumber.toInt))
@@ -43,13 +43,13 @@ object NGrams extends FeatureExtractor with Serializable {
   }
 
   //Responsible for tokenizing data for dictionary calculation.
-  def rowToNgramRows(row: MLRow, col: Int, n: Int, stopWords: Set[String]): Seq[MLRow] = {
+  def rowToNgramTuples(row: MLTuple, col: Int, n: Int, stopWords: Set[String]): Seq[MLTuple] = {
     val grams = ngrams(row(col).toString, n, stopWords)
-    grams.map(s => MLRow(MLValue(s))).toSeq
+    grams.map(s => MLTuple.create(MLValue(s))).toSeq
   }
 
   //The main map function - given a row and a dictionary, return a new row which is an n-gram feature vector.
-  def rowToFilteredNgrams(row: MLRow, col: Int, dict: Seq[(String,Int)], n: Int, stopWords: Set[String]): MLRow = {
+  def rowToFilteredNgrams(row: MLRow, col: Int, dict: Seq[(String,Int)], n: Int, stopWords: Set[String]): MLTuple = {
     //Pull out the ngrams for a specific string.
     val x = ngrams(row(col).toString, n, stopWords)
 
@@ -59,8 +59,7 @@ object NGrams extends FeatureExtractor with Serializable {
     val coll = dict.zipWithIndex.filter{ case((a,b), c) => x.contains(a)}.map {case ((a,b), c) => (c, MLValue(1.0))}
 
     //Return a new sparse row based on this feature vector.
-    val sparsemlval = SparseMLRow.fromNumericSeq(row.drop(Seq(col)))
-    SparseMLRow.fromNumericSeq(row.drop(Seq(col))) ++ SparseMLRow.fromSparseCollection(coll, dict.length, MLValue(0.0))
+    SparseMLTuple.fromNumericSeq(row.drop(col)) ++ SparseMLTuple.fromSparseCollection(coll, dict.length, MLValue(0.0))
   }
 
   /**
@@ -71,7 +70,7 @@ object NGrams extends FeatureExtractor with Serializable {
    * @param k Top-k features to keep.
    * @return Table of featurized data.
    */
-  def extractNGrams(in: MLTable, c: Int=0, n: Int=1, k: Int=20000, stopWords: Set[String] = stopWords): (MLTable, MLRow => MLRow) = {
+  def extractNGrams(in: MLTable, c: Int=0, n: Int=1, k: Int=20000, stopWords: Set[String] = stopWords): (MLTable, MLRow => MLTuple) = {
     //Build dictionary.
     val dict = buildDictionary(in, c, n, k, stopWords)
 
@@ -83,7 +82,7 @@ object NGrams extends FeatureExtractor with Serializable {
     val existingColNames = in.drop(Seq(c)).schema.columns.map(_.name.getOrElse(""))
 
     //Set the column names of the table to match the NGram features.
-    out.setColNames(existingColNames ++ dict.map(_._1))
+    //FIXME out.setColNames(existingColNames ++ dict.map(_._1))
 
     (out, featurizer)
   }
 
@@ -93,12 +92,12 @@
    * @return TF-IDF features for an MLTable.
    */
  def tfIdf(in: MLTable, c: Int=0): MLTable = {
-    val df = in.reduce(_ plus _)
+    val df = in.reduce(_.toVector plus _.toVector)
    val df2 = df.toDoubleArray
    df2(c) = 1.0
    val df3 = MLVector(df2)
    val newtab = in.map(r => r over df3)
-    newtab.setSchema(in.schema)
+    //FIXME newtab.setSchema(in.schema)
    newtab
  }
 
@@ -155,4 +154,4 @@
     "while", "whither", "who", "whoever", "whole", "whom", "whose", "why", "will", "with",
     "within", "without", "would", "yet", "you", "your", "yours", "yourself", "yourselves")
-}
\ No newline at end of file
+}
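Usage sketch for the revised NGrams API (illustrative only, not part of the patch): the hunks above keep the extractNGrams(in, c, n, k, stopWords) entry point but now return an MLRow => MLTuple featurizer. This assumes the post-patch MLContext.loadText shown later in this diff; the Spark master, application name, and corpus path are hypothetical placeholders.

    import org.apache.spark.SparkContext
    import mli.interface.MLContext
    import mli.feat.NGrams

    // Load a one-column text corpus and build bigram features over column 0,
    // keeping the 1000 most frequent n-grams.
    val sc = new SparkContext("local[4]", "ngrams-example") // hypothetical master / app name
    val mlc = new MLContext(sc)
    val corpus = mlc.loadText("/path/to/corpus.txt")        // hypothetical path
    val (features, featurizer) = NGrams.extractNGrams(corpus, c = 0, n = 2, k = 1000)
    // The returned featurizer maps a raw row to the same sparse n-gram encoding,
    // so it can be reused on unseen documents at prediction time.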
diff --git a/src/main/scala/feat/RandomizedTruncatedSVD.scala b/src/main/scala/feat/RandomizedTruncatedSVD.scala
new file mode 100644
index 0000000..94447a9
--- /dev/null
+++ b/src/main/scala/feat/RandomizedTruncatedSVD.scala
@@ -0,0 +1,100 @@
+package mli.feat
+import mli.interface.MLTypes._
+import mli.interface._
+import mli.interface.impl.SparkMLTable
+import org.jblas.{DoubleMatrix, SimpleBlas, NativeBlas, Singular}
+import org.apache.spark.{AccumulableParam, AccumulatorParam}
+import collection.JavaConversions._
+
+// represents the rank-one matrix left * right
+private class RankOneUpdate(val left: DoubleMatrix, val right: DoubleMatrix) {
+  require(left.isColumnVector)
+  require(right.isRowVector)
+}
+
+private class RankOneMatrixAccumulator extends AccumulableParam[DoubleMatrix, RankOneUpdate] {
+  override def addAccumulator(left: DoubleMatrix, right: RankOneUpdate): DoubleMatrix = {
+    left.rankOneUpdate(right.left, right.right)
+    left
+  }
+
+  override def addInPlace(left: DoubleMatrix, right: DoubleMatrix): DoubleMatrix = {
+    left.addi(right)
+    left
+  }
+
+  override def zero(a: DoubleMatrix): DoubleMatrix = {
+    DoubleMatrix.zeros(a.rows, a.columns)
+  }
+}
+
+object RandomizedSVD {
+  def compute(mat: MLTable, k: Index, q: Index) = {
+    val A = mat
+    val m = A.numRows.asInstanceOf[Index]
+    val n = A.numCols.asInstanceOf[Index]
+    // add a few dimensions to improve accuracy of the final rank-k approximation
+    val ell = math.min(k + 5, math.min(m, n))
+    val omega = DoubleMatrix.randn(n, ell)
+
+    // find initial rank-ell subspace Q_0
+    val Y0 = mmul(A, omega)
+    val Q0 = range(Y0)
+
+    // apply power iteration to get final Q
+    val Qq = (1 to q).foldRight(Q0)((idx, Q) => {
+      val Ytwiddle = mmulTransposed(A, Q)
+      val Qtwiddle = range(Ytwiddle)
+      val Y = mmul(A, Qtwiddle)
+      range(Y)
+    })
+
+    // compute SVD of projection of A onto Q_q
+    val B = mmulTransposed(A, Qq).transpose
+    val svd = Singular.sparseSVD(B)
+    val U0 = Qq mmul svd(0)
+    val S0 = svd(1)
+    val V0 = svd(2)
+    val U = U0.getRange(0, m, 0, k)
+    val S = S0.getRange(0, k, 0, 1)
+    val V = V0.getRange(0, n, 0, k)
+
+    // A \approx U \diag(S) V'
+    (U, S, V) // TODO: use object representing the decomposition
+  }
+
+  // orthogonal basis for range of A
+  private def range(A: DoubleMatrix): DoubleMatrix = {
+    val result = A.dup();
+    val tau = new DoubleMatrix(math.min(A.rows, A.columns));
+    SimpleBlas.geqrf(result, tau);
+    SimpleBlas.orgqr(A.columns, A.columns, result, tau)
+    result
+  }
+
+  private[mli] def mmul(left: MLTable, right: DoubleMatrix): DoubleMatrix = {
+    assert(left.numCols == right.rows)
+    val resultTable = left.map(row => {
+      assert(row.length == left.numCols)
+      new MLVector(row.toVector.toMatrix.mat mmul right)
+    })
+    val result = resultTable.toLocalMatrix.mat
+    assert(result.rows == left.numRows)
+    assert(result.columns == right.columns)
+    result
+  }
+
+  private[mli] def mmulTransposed(left: MLTable, right: DoubleMatrix): DoubleMatrix = {
+    assert(left.numRows == right.rows)
+    val resultRows = left.numCols
+    val resultCols = right.columns
+    val ctx = left.context
+    val accum = ctx.accumulable(DoubleMatrix.zeros(resultRows, resultCols))(new RankOneMatrixAccumulator)
+    left.foreach(leftRow => {
+      val leftPart = leftRow.toVector.toMatrix.mat.transpose
+      val rightPart = right.getRow(leftRow.id.toInt)
+      accum += new RankOneUpdate(leftPart, rightPart)
+    })
+    accum.value
+  }
+}
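The new RandomizedTruncatedSVD.scala above approximates a rank-k SVD of an MLTable using a randomized range finder followed by q power iterations, returning jblas matrices (U, S, V) with A ~= U * diag(S) * V'. A minimal sketch of how that result might be consumed (illustrative only, not part of the patch; the rank and iteration count are assumed values):

    import mli.feat.RandomizedSVD
    import mli.interface.MLTable
    import org.jblas.DoubleMatrix

    // Rank-10 approximation using 2 power iterations; a larger q costs extra
    // passes over the table but improves the accuracy of the subspace estimate.
    def lowRankApprox(table: MLTable): DoubleMatrix = {
      val (u, s, v) = RandomizedSVD.compute(table, 10, 2)
      // Reassemble A ~= U * diag(S) * V'; s comes back as a column vector of singular values.
      u.mmul(DoubleMatrix.diag(s)).mmul(v.transpose)
    }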
diff --git a/src/main/scala/feat/Scale.scala b/src/main/scala/feat/Scale.scala
index db39ceb..1ae2f6a 100644
--- a/src/main/scala/feat/Scale.scala
+++ b/src/main/scala/feat/Scale.scala
@@ -2,7 +2,7 @@ package mli.feat
 
 import scala.math.sqrt
 
-import mli.interface.{MLRow, MLVector, MLTable}
+import mli.interface.{MLTuple, MLVector, MLTable}
 
 /**
  * The Scale feature extractor rescales features according to their standard deviation. This will preserve
@@ -17,26 +17,27 @@ object Scale extends FeatureExtractor with Serializable {
    * @param label Index of the label (will not be scaled).
    * @return Scaled MLTable.
    */
-  def scale(x: MLTable, label: Int = 0, featurizer: MLRow => MLRow): (MLTable, MLRow => MLRow) = {
+  def scale(x: MLTable, label: Int = 0, featurizer: MLTuple => MLTuple): (MLTable, MLTuple => MLTuple) = {
     //First we compute the variance - note this could be done in one pass over the data.
-    val ssq = x.map(r => r times r).reduce(_ plus _) over x.numRows
-    val sum = x.reduce(_ plus _) over x.numRows
+    // FIXME: many toVector calls
+    val ssq = x.map(r => r times r).reduce(_.toVector plus _.toVector).toVector over x.numRows
+    val sum = x.reduce(_.toVector plus _.toVector).toVector over x.numRows
     val sd = ssq minus (sum times sum)
 
     //Now we divide by the standard deviation vector. We do not scale values with no standard deviation.
-    val n = sd.toArray.map(sqrt).map(i => if(i == 0.0) 1.0 else i)
+    val n = sd.toDoubleArray.map(sqrt).map(i => if(i == 0.0) 1.0 else i)
 
     //We also do not scale the training label.
     n(label) = 1.0
     val sdn = MLVector(n)
 
     val newtab = x.map(_ over sdn)
-    newtab.setSchema(x.schema)
-    def newfeaturizer(r: MLRow) = featurizer(r) over sdn
+    // FIXME newtab.setSchema(x.schema)
+    def newfeaturizer(r: MLTuple) = featurizer(r).toVector over sdn
 
     (newtab, newfeaturizer)
   }
 
   def extract(in: MLTable): MLTable = scale(in, featurizer = r => r)._1
-}
\ No newline at end of file
+}
diff --git a/src/main/scala/impl/JblasDenseMLMatrix.scala b/src/main/scala/impl/JblasDenseMLMatrix.scala
index 76a0d34..51265b9 100644
--- a/src/main/scala/impl/JblasDenseMLMatrix.scala
+++ b/src/main/scala/impl/JblasDenseMLMatrix.scala
@@ -4,14 +4,10 @@ import mli.interface._
 import mli.interface.MLTypes._
 import org.jblas.{DoubleMatrix,Solve}
 
-class DenseMLMatrix(var mat: DoubleMatrix) extends LocalMatrix {
+class DenseMLMatrix(var mat: DoubleMatrix) extends LocalMatrix with Serializable {
 
-  def toMLRows: Iterator[MLRow] = {
-    (0 until numRows).map(r => DenseMLRow.fromSeq(mat.getRow(r).data.map(d => MLValue(d)).toSeq)).toIterator
-  }
-
-  def toMLVectors: Iterator[MLVector] = {
-    (0 until numRows).map(r => MLVector(mat.getRow(r).data)).toIterator
+  def toVectors: Iterator[MLVector] = {
+    (0 until numRows).map(r => new MLVector(mat.getRow(r).data)).toIterator
   }
 
   def numRows: Index = mat.rows
@@ -20,6 +16,10 @@ class DenseMLMatrix(var mat: DoubleMatrix) extends LocalMatrix {
   def apply(r: Index, c: Index): Scalar = mat.get(r, c)
   def apply(rows: Slice, cols: Slice): LocalMatrix = new DenseMLMatrix(mat.get(cols.toArray, rows.toArray))
 
+  def getRows(rows: TraversableOnce[Index]): LocalMatrix = {
+    new DenseMLMatrix(mat.getRows(rows.toArray))
+  }
+
   def update(r: Index, c: Index, v: Scalar) = mat.put(r, c, v)
   def update(rows: Slice, cols: Slice, v: LocalMatrix) = {
     //Jblas Matrices are row-major, so this shoudl be faster in the congiguous case.
@@ -42,6 +42,7 @@ class DenseMLMatrix(var mat: DoubleMatrix) extends LocalMatrix {
   def solve(y: LocalMatrix) = new DenseMLMatrix(Solve.solve(mat, y.mat))
 
   def times(y: LocalMatrix) = new DenseMLMatrix(mat.mmul(y.mat))
+  def times(y: MLTuple): LocalMatrix = new DenseMLMatrix(mat.mmul(new DoubleMatrix(y.toDoubleArray)))
   def transpose = new DenseMLMatrix(mat.transpose())
 
   //TODO need to decide on types for these.
@@ -52,24 +53,21 @@ class DenseMLMatrix(var mat: DoubleMatrix) extends LocalMatrix {
   //Composition
   def on(y: DenseMLMatrix): LocalMatrix = new DenseMLMatrix(DoubleMatrix.concatVertically(mat, y.mat))
   def then(y: LocalMatrix): LocalMatrix = new DenseMLMatrix(DoubleMatrix.concatHorizontally(mat, y.mat))
+
+  override def toString = mat.toString
 }
 
 /**
- * Contains facilities for creating a Dense Matrix from rows (either a Seq of MLRow or MLVectors).
+ * Contains facilities for creating a Dense Matrix from rows (either a Seq of MLRow or MLTuples).
*/ object DenseMLMatrix { - def apply(rows: DenseMLRow*) = fromSeq(rows.toSeq) - def fromSeq(rows: Seq[DenseMLRow]) = { + def apply(rows: Seq[MLVector]) = { val dat = rows.map(_.toDoubleArray).toArray new DenseMLMatrix(new DoubleMatrix(dat)) } - def fromVecs(rows: Seq[MLVector]) = { - val dat = rows.map(_.toArray).toArray - new DenseMLMatrix(new DoubleMatrix(dat)) - } - def zeros(rows: Index, cols: Index) = new DenseMLMatrix(DoubleMatrix.zeros(rows, cols)) def eye(n: Index) = new DenseMLMatrix(DoubleMatrix.eye(n)) - + def rand(rows: Index, cols: Index) = new DenseMLMatrix(DoubleMatrix.rand(rows, cols)) + def randn(rows: Index, cols: Index) = new DenseMLMatrix(DoubleMatrix.randn(rows, cols)) } diff --git a/src/main/scala/impl/MLNumericTable.scala b/src/main/scala/impl/MLNumericTable.scala index 8522ed3..0c8530c 100644 --- a/src/main/scala/impl/MLNumericTable.scala +++ b/src/main/scala/impl/MLNumericTable.scala @@ -24,7 +24,7 @@ class MLNumericTable(@transient protected var rdd: RDD[MLVector], inSchema: Opti def schema(): Schema = { tableSchema match { case Some(s) => s - case None => Schema(rdd.first) + case None => rdd.first.schema } } @@ -124,9 +124,9 @@ class MLNumericTable(@transient protected var rdd: RDD[MLVector], inSchema: Opti // val res = f(mat) // res.toMLRows // } - def cachedMatrixMap(m: LocalMatrix): Iterator[MLVector] = f(m).toMLVectors + def cachedMatrixMap(m: LocalMatrix): Iterator[MLVector] = f(m).toVectors - def createMatrix(i: Iterator[MLVector]): Iterator[LocalMatrix] = Iterator(DenseMLMatrix.fromVecs(i.toSeq)) + def createMatrix(i: Iterator[MLVector]): Iterator[LocalMatrix] = Iterator(DenseMLMatrix(i.toSeq)) cachedMat match { @@ -192,7 +192,7 @@ class MLNumericTable(@transient protected var rdd: RDD[MLVector], inSchema: Opti def print(count: Int) = take(count).foreach(row => println(row.mkString("\t"))) - def toMLTable = SparkMLTable.fromMLRowRdd(rdd.map(MLRow(_))) + def toMLTable = SparkMLTable(rdd.map(_.asInstanceOf[MLTuple])) } object MLNumericTable { @@ -209,4 +209,4 @@ object MLNumericTable { new MLNumericTable(rdd) } -} \ No newline at end of file +} diff --git a/src/main/scala/impl/SparkMLTable.scala b/src/main/scala/impl/SparkMLTable.scala index c46f855..d4b459f 100644 --- a/src/main/scala/impl/SparkMLTable.scala +++ b/src/main/scala/impl/SparkMLTable.scala @@ -1,37 +1,43 @@ package mli.interface.impl import mli.interface._ -import mli.impl.DenseMLMatrix import mli.interface.MLTypes._ +import mli.impl.DenseMLMatrix import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ import org.apache.spark.mllib.regression.LabeledPoint +import Function.tupled class SparkMLTable(@transient protected var rdd: RDD[MLRow], inSchema: Option[Schema] = None) extends MLTable with Serializable { lazy val numCols = rdd.first.size - - lazy val numRows = rdd.count + lazy val numRows = 1 + rdd.map(_.id.toLong).reduce(_ max _) var tableSchema = inSchema var cachedMat: Option[RDD[LocalMatrix]] = None + /** * Return the current schema. Since schema is inferred, we wait to do this until it is explicitly asked for in the * common case. */ - def schema(): Schema = { - tableSchema match { + lazy val schema: Schema = { + inSchema match { case Some(s) => s - case None => Schema(rdd.first) + case None => rdd.first.schema } } + def sparkContext = rdd.sparkContext + override def context = sparkContext + + def foreach(f: MLRow => Unit): Unit = rdd.foreach(f) + /** * Return a new RDD containing only the elements that satisfy a predicate. 
*/ - def filter(f: MLRow => Boolean): SparkMLTable = new SparkMLTable(rdd.filter(f), tableSchema) + def filter(f: MLRow => Boolean): SparkMLTable = new SparkMLTable(rdd.filter(f), Some(schema)) /** * Return the union of this RDD and another one. Any identical elements will appear multiple @@ -65,11 +71,10 @@ class SparkMLTable(@transient protected var rdd: RDD[MLRow], inSchema: Option[Sc //Join the table, combine the rows, and create a new schema. //val newRdd = t1.join(t2).map(k => MLRow(k._1 ++ k._2._1 ++ k._2._2)) - val newRdd = t1.join(t2).map { case (a,(b,c)) => MLRow.chooseRepresentation(a ++ b ++ c)} + val newRdd = t1.join(t2).map { case (a,(b,c)) => MLTuple(a ++ b ++ c)} lazy val newSchema = schema.join(other.schema, cols) - new SparkMLTable(newRdd, Some(newSchema)) - + SparkMLTable(newRdd, Some(newSchema)) } @@ -77,10 +82,8 @@ class SparkMLTable(@transient protected var rdd: RDD[MLRow], inSchema: Option[Sc * Return a new RDD by applying a function to all elements of this RDD. Schema is inferred from the results. * User is expected to provide a map function which produces elements of a consistent schema as output. */ - def map(f: MLRow => MLRow): SparkMLTable = { - val newRdd = rdd.map(f) - - SparkMLTable.fromMLRowRdd(newRdd) + def map(f: MLRow => MLTuple): SparkMLTable = { + new SparkMLTable(rdd.map(row => new MLRow(row.id, f(row)))) } @@ -88,31 +91,42 @@ class SparkMLTable(@transient protected var rdd: RDD[MLRow], inSchema: Option[Sc * Return a new RDD by first applying a function to all elements of this * RDD, and then flattening the results. */ - def flatMap(f: MLRow => TraversableOnce[MLRow]): SparkMLTable = new SparkMLTable(rdd.flatMap(f)) + def flatMap(f: MLRow => TraversableOnce[MLTuple]): SparkMLTable = SparkMLTable(rdd.flatMap(f)) /** * Return a value by applying a reduce function to every element of the table. */ - def reduce(f: (MLRow, MLRow) => MLRow): MLRow = rdd.reduce(f) + def reduce(f: (MLTuple, MLTuple) => MLTuple): MLTuple = rdd.map(_.tuple).reduce(f) /** * Run a reduce on all values of the row, grouped by key. */ - def reduceBy(key: Seq[Index], f: (MLRow, MLRow) => MLRow): MLTable = { + def reduceBy(key: Seq[Index], f: (MLTuple, MLTuple) => MLTuple): MLTable = { //val notKey = nonCols(key, schema) - val newRdd = rdd.map(r => (r(key), r)).reduceByKey(f).map(_._2) - - SparkMLTable.fromMLRowRdd(newRdd) - } + val newRdd = rdd.map(r => (r.tuple(key), r.tuple)).reduceByKey(f).map(_._2) - def pairToRow(p: (MLRow, MLRow)) = { - MLRow(p._1 ++ p._2) + SparkMLTable(newRdd) } /** * Creates a new MLTable based on the cached version of the RDD. */ - def cache() = new SparkMLTable(rdd.cache(), tableSchema) + def cache() = new SparkMLTable(rdd.cache(), Some(schema)) + + /** + * Returns an MLTable containing the transpose of the source table. Expensive. + */ + override protected def computeTranspose: SparkMLTable = { + def explodeRow(row: MLRow) = { + row.tuple.nonZeros.map(tupled((column, value) => (column, (row.id.toInt, value)))) + } + val shuffled = rdd.flatMap(explodeRow).sortByKey(true).groupByKey + val newSize = numRows.asInstanceOf[Int] + val rows = shuffled.map(tupled((column, entries) => { + new MLRow(new MLRowID(column), SparseMLTuple.fromSparseCollection(entries, newSize, MLValue(0.0))) + })) + new SparkMLTable(rows) + } /** * Sort a table based on a key. 
@@ -120,13 +134,13 @@ class SparkMLTable(@transient protected var rdd: RDD[MLRow], inSchema: Option[Sc def sortBy(key: Seq[Index], ascending: Boolean = true): MLTable = { //val notKey = nonCols(key, schema) val newRdd = rdd.map(r => (r(key), r)).sortByKey(ascending).map(_._2) - new SparkMLTable(newRdd, tableSchema) + new SparkMLTable(newRdd, Some(schema)) } /** * Return a value by applying a function to all elements of this the table and then reducing them. */ - def mapReduce(f: MLRow => MLRow, sum: (MLRow, MLRow) => MLRow): MLRow = rdd.map(f).reduce(sum) + def mapReduce(f: MLRow => MLTuple, sum: (MLTuple, MLTuple) => MLTuple): MLTuple = rdd.map(f).reduce(sum) /** @@ -139,17 +153,17 @@ class SparkMLTable(@transient protected var rdd: RDD[MLRow], inSchema: Option[Sc // val res = f(mat) // res.toMLRows // } - def cachedMatrixMap(m: LocalMatrix): Iterator[MLRow] = f(m).toMLRows + def cachedMatrixMap(m: LocalMatrix): Iterator[MLVector] = f(m).toVectors - def createMatrix(i: Iterator[MLRow]): Iterator[LocalMatrix] = Iterator(DenseMLMatrix.fromVecs(i.map(_.toVector).toSeq)) + def createMatrix(i: Iterator[MLRow]): Iterator[LocalMatrix] = Iterator(DenseMLMatrix(i.map(_.toVector).toSeq)) cachedMat match { case None => { cachedMat = Some(rdd.mapPartitions(createMatrix(_)).cache()) - SparkMLTable.fromMLRowRdd(cachedMat.get.flatMap(cachedMatrixMap(_))) + SparkMLTable(cachedMat.get.flatMap(cachedMatrixMap(_))) } - case Some(value) => SparkMLTable.fromMLRowRdd(value.flatMap(cachedMatrixMap(_))) + case Some(value) => SparkMLTable(value.flatMap(cachedMatrixMap(_))) } //SparkMLTable.fromMLRowRdd(cachedMat.map(cachedMatrixMap(_))) @@ -163,7 +177,7 @@ class SparkMLTable(@transient protected var rdd: RDD[MLRow], inSchema: Option[Sc */ def project(cols: Seq[Index]) = { //TODO - project should project schema as well. 
- map(row => MLRow.chooseRepresentation(cols.map(i => row(i)).toSeq)) + map(row => MLTuple(cols.map(i => row(i)).toSeq)) } @@ -202,6 +216,14 @@ class SparkMLTable(@transient protected var rdd: RDD[MLRow], inSchema: Option[Sc def collect() = rdd.collect + override def toLocalMatrix(): LocalMatrix = { + val zeros = new Array[Double](numCols.asInstanceOf[Int]) + val dest = Array.fill(numRows.asInstanceOf[Int])(zeros) + val rows = rdd.map(row => (row.id.toInt, row.toVector.toMatrix.mat.toArray)).collect + rows .foreach(tupled((id, vec) => dest(id) = vec)) + new DenseMLMatrix(new org.jblas.DoubleMatrix(dest)) + } + def print() = rdd.collect.foreach(row => println(row.mkString("\t"))) @@ -209,17 +231,37 @@ class SparkMLTable(@transient protected var rdd: RDD[MLRow], inSchema: Option[Sc } object SparkMLTable { + def apply(rdd: RDD[MLTuple], schema: Option[Schema] = None): SparkMLTable = { + // determine size of each partition + val counts = rdd.mapPartitionsWithIndex((idx, part) => Iterator.single((idx, part.size))).collect.sorted + // compute base index for each partition + val bases = new Array[Int](counts.size) + counts.foreach(tupled((idx, sz) => { + if(idx == 0) { + bases(idx) = 0 + } else { + bases(idx) = bases(idx-1) + sz + } + })) + val newRdd = rdd.mapPartitionsWithIndex((idx, part) => { + val base = bases(idx) + part.zipWithIndex.map(tupled((tuple, idx) => new MLRow(base + idx, tuple))) + }) + new SparkMLTable(newRdd, schema) + } + + def fromDoubleArrays(rdd: RDD[Array[Double]]): SparkMLTable = { + SparkMLTable(rdd.map(MLVector(_))) + } + + /* def doubleArrayToMLRow(a: Array[Double]): MLRow = { val mldArray = a.map(MLValue(_)) DenseMLRow.fromSeq(mldArray) } - def apply(rdd: RDD[Array[Double]]): SparkMLTable = { - val mldRdd = rdd.map(row => MLRow.chooseRepresentation(row.map(MLValue(_)))) - new SparkMLTable(mldRdd) - } - def fromMLRowRdd(rdd: RDD[MLRow]): SparkMLTable = { new SparkMLTable(rdd) } -} \ No newline at end of file + */ +} diff --git a/src/main/scala/interface/LocalMatrix.scala b/src/main/scala/interface/LocalMatrix.scala index c79c38b..d8c1c12 100644 --- a/src/main/scala/interface/LocalMatrix.scala +++ b/src/main/scala/interface/LocalMatrix.scala @@ -11,8 +11,7 @@ abstract class LocalMatrix { //protected val mat: MLMatrix //Getting data out - def toMLRows: Iterator[MLRow] - def toMLVectors: Iterator[MLVector] + def toVectors: Iterator[MLVector] //Shape def numCols: Index @@ -39,6 +38,7 @@ abstract class LocalMatrix { //Matrix Algebra def times(y: LocalMatrix): LocalMatrix + def times(y: MLTuple): LocalMatrix def solve(y: LocalMatrix): LocalMatrix def transpose: LocalMatrix @@ -57,4 +57,4 @@ abstract class LocalMatrix { object LocalMatrix { def zeros(r: Index, c: Index): LocalMatrix = DenseMLMatrix.zeros(r,c) -} \ No newline at end of file +} diff --git a/src/main/scala/interface/MLContext.scala b/src/main/scala/interface/MLContext.scala index 70b5142..8ea76f7 100644 --- a/src/main/scala/interface/MLContext.scala +++ b/src/main/scala/interface/MLContext.scala @@ -1,10 +1,11 @@ package mli.interface -import org.apache.spark +import org.apache.spark.SparkContext + import mli.interface.impl.SparkMLTable import org.apache.spark.broadcast.Broadcast -class MLContext(@transient val sc: spark.SparkContext) extends Serializable { +class MLContext(@transient val sc: SparkContext) extends Serializable { /** * Stop the MLContext @@ -33,10 +34,12 @@ class MLContext(@transient val sc: spark.SparkContext) extends Serializable { } val rdd = sc.textFile(path) - if (isNumeric) 
SparkMLTable(rdd.map(parsePoint(_,sep))) - //TODO: Need to build the non-numeric case. Also need to pass in header info. - else SparkMLTable.fromMLRowRdd(rdd.map(x => DenseMLRow.fromSeq(x.split(sep).map(MLValue(_))))) - //else new DenseSparkMLTable(rdd.map(_.split(sep.toArray).map(str => MLValue(str.trim()))).map(MLRow.chooseRepresentation)) + if (isNumeric) { + SparkMLTable.fromDoubleArrays(rdd.map(parsePoint(_, sep))) + } else { + //TODO: Need to build the non-numeric case. Also need to pass in header info. + SparkMLTable(rdd.map(x => MLTuple(x.split(sep).map(MLValue(_))))) + } } /** @@ -47,13 +50,13 @@ class MLContext(@transient val sc: spark.SparkContext) extends Serializable { */ def loadCsvFile(path: String, isNumeric: Boolean = false): MLTable = loadFile(path, ",", isNumeric) - def loadText(path: String): MLTable = new SparkMLTable(sc.textFile(path).map((x: String) => MLRow(MLValue(x))), - Some(new Schema(Seq(ColumnSpec(Some("string"), ColumnType.String))))) + def loadText(path: String): MLTable = SparkMLTable(sc.textFile(path).map((x: String) => MLTuple(Seq(MLValue(x)))), + Some(new Schema(Seq(ColumnSpec(ColumnType.StringType, Some("string")))))) def load(data: Array[Array[Double]], numSlices: Int = 4) = { //val newRdd = sc.makeRDD(data.map(row => MLRow.chooseRepresentation(row.map(MLValue(_))))) val newRdd = sc.makeRDD(data) - SparkMLTable(newRdd) + SparkMLTable.fromDoubleArrays(newRdd) } /** @@ -82,15 +85,16 @@ class MLContext(@transient val sc: spark.SparkContext) extends Serializable { }) } - def makeRow(x: Array[(Int, Double)], maxIndex: Int): MLRow = { + def makeRow(x: Array[(Int, Double)], maxIndex: Int): MLTuple = { val newRow = x.map(v => (v._1, MLDouble(Some(v._2)))).toIterable - SparseMLRow.fromSparseCollection(newRow, maxIndex, 0.0) + SparseMLTuple.fromSparseCollection(newRow, maxIndex, 0.0) } + val rdd = sc.textFile(path).map(parseSparsePoint) val maxIndex = rdd.map(x => x.last._1).reduce(_ max _) val newRdd = rdd.map(makeRow(_, maxIndex)) - SparkMLTable.fromMLRowRdd(newRdd) + SparkMLTable(newRdd) } } diff --git a/src/main/scala/interface/MLRow.scala b/src/main/scala/interface/MLRow.scala deleted file mode 100644 index accb399..0000000 --- a/src/main/scala/interface/MLRow.scala +++ /dev/null @@ -1,179 +0,0 @@ -package mli.interface - -import mli.interface.MLTypes._ -import scala.collection.immutable.TreeMap -import scala.collection.mutable.ArrayBuffer -import scala.collection._ -import generic.{HasNewBuilder, SeqFactory, CanBuildFrom} -import scala._ -import scala.collection.immutable -import scala.collection.IndexedSeq -import scala.Some -import scala.Iterator -import scala.collection.Seq -import scala.collection.TraversableOnce -import scala.Some - -/** - * A single row composed of zero or more columns. MLRow is typically used in - * an MLTable or as a standalone parameter vector. - * - * MLVector operations supported by this type are generally fast. Scala - * collections operations inherited from Seq, like map(), are provided for - * convenience but should generally not be used in performance-critical code. - * When an MLRow could be large and sparse, prefer MLVector operations or - * operations that act on non-zero entries, such as the iterator nonZeros(). - */ -trait MLRow extends IndexedSeq[MLValue] - with IndexedSeqLike[MLValue, MLRow] - with MLVectorable - with Ordered[MLRow] - with Serializable { - - /** Supports indexing by a sequence of indices. 
*/ - def apply(inds: Seq[Int]): MLRow = MLRow(inds.map(this(_)):_*) - - def drop(cols: Seq[Int]) = { - val converse = (0 until this.length).diff(cols).toArray - apply(converse) - } - - /** An iterator through the nonzero rows ((index, value) pairs) of this row. */ - def nonZeros(): Iterator[(Int, MLValue)] - - /** A method for building the implementation from a sequence of MLValues. */ - def newMlRowBuilder: mutable.Builder[MLValue, MLRow] - - implicit def vectorToRow(v: MLVector): MLRow = MLRow.chooseRepresentation(v) - - override protected[this] def newBuilder = MLRow.newBuilder - - override def toString = this.mkString("\t") - - /** Implements dense lexicographic ordering on an MLRow */ - def compare(that: MLRow): Int = { - for ((thisOne,thatOne) <- this.zip(that)) { - if(thisOne != thatOne) { - if (thisOne > thatOne) return 1 - else return -1 - } - } - return 0 - } - - def toDoubleArray: Array[Double] -} - -object MLRow { - private val MIN_SIZE_FOR_SPARSE_REPRESENTATION = 1000 - private val MAX_DENSITY_FOR_SPARSE_REPRESENTATION = .5 - - def apply(row: MLValue*) = chooseRepresentation(row.toSeq) - def apply(row: MLVector) = chooseRepresentation(row.data.data.map(MLValue(_)).toSeq) - - /** - * Choose a reasonable sparse or dense representation for @row. - * - * @param row should contain only numeric values. - */ - def chooseRepresentation(row: Seq[MLValue]): MLRow = { - val len = row.length - if (len >= MIN_SIZE_FOR_SPARSE_REPRESENTATION - && row.count(_.toNumber != 0) < len * MAX_DENSITY_FOR_SPARSE_REPRESENTATION ) { - SparseMLRow.fromNumericSeq(row) - } else { - DenseMLRow.fromSeq(row.toIndexedSeq) - } - } - - /** @see chooseRepresentation(row: Seq) */ - def chooseRepresentation(row: Array[MLValue]): MLRow = chooseRepresentation(row.toSeq) - - def newBuilder: mutable.Builder[MLValue, MLRow] = new ArrayBuffer[MLValue]().mapResult({array => chooseRepresentation(array.toSeq)}) - - implicit def canBuildFrom: CanBuildFrom[MLRow, MLValue, MLRow] = new CanBuildFrom[MLRow, MLValue, MLRow] { - override def apply() = newBuilder - // Allow subclasses to define their own builders. - override def apply(from: MLRow) = from.newMlRowBuilder - } - - implicit def rowToVector(from: MLRow): MLVector = from.toVector -} - - -/** - * An implementation of MLRow targeted for rows with mostly interesting (e.g. - * non-zero) values. - * - * @param row the actual values of the row. - */ -class DenseMLRow(private val row: immutable.IndexedSeq[MLValue]) extends MLRow { - override def length = row.length - - override def apply(index: Int) = row.apply(index) - - override def iterator = row.iterator - - override def nonZeros = (0 until length).map({index => (index, row(index))}).filter(_._2.toNumber != 0).iterator - - override def newMlRowBuilder = new ArrayBuffer[MLValue]().mapResult({array => DenseMLRow.fromSeq(array.toSeq)}) - - lazy val vec = MLVector(row.toArray) - - override implicit def toVector = vec - - def toDoubleArray = row.map(_.toNumber).toArray -} - -object DenseMLRow { - def apply(row: MLValue*) = fromSeq(row.toSeq) - - def fromSeq(row: Seq[MLValue]) = new DenseMLRow(row.toIndexedSeq) -} - -/** - * A simple implementation of a sparse row. Could use some actual - * optimization for vector ops. - * - * @param sparseElements is a sparse representation of this row, a map from column - * indices to MLValues. Indices not present in this map will - * be considered empty. - * @param trueLength is the actual length of the row, including empty columns. 
- * @param emptyValue is the default value for elements not in @param elements. - */ -class SparseMLRow private( - private val sparseElements: immutable.SortedMap[Int, MLValue], - private val trueLength: Int, - private val emptyValue: MLValue) extends MLRow { - override def length = trueLength - - override def apply(index: Int): MLValue = sparseElements.get(index) match { - case None => emptyValue - case Some(value) => value - } - - override def iterator = (0 until trueLength).map(index => apply(index)).iterator - - override def nonZeros = sparseElements.iterator - - lazy val vec = MLVector(iterator.toArray) - override implicit def toVector = MLVector(iterator.toArray) - def toDoubleArray = this.toVector.data.data - - //TODO: Need to do some performance testing here. - override def newMlRowBuilder = new ArrayBuffer[MLValue]().mapResult({array => - val n = TreeMap[Int, MLValue](array.zipWithIndex.filter(_._1 != emptyValue).map(x => (x._2,x._1)):_*) - new SparseMLRow(n, array.length, emptyValue) - }) -} - - -object SparseMLRow { - def fromSparseCollection(elements: TraversableOnce[(Int, MLValue)], trueLength: Int, emptyValue: MLValue) - = new SparseMLRow(TreeMap.empty[Int, MLValue] ++ elements, trueLength, emptyValue) - - def fromNumericSeq(row: Seq[MLValue]) = { - val nonZeros: Seq[(Int, MLValue)] = (0 until row.length).zip(row).filter(_._2.toNumber != 0) - new SparseMLRow(TreeMap.empty[Int, MLValue] ++ nonZeros, row.length, MLValue(0.0)) - } -} diff --git a/src/main/scala/interface/MLTable.scala b/src/main/scala/interface/MLTable.scala index d8717f1..2474b92 100644 --- a/src/main/scala/interface/MLTable.scala +++ b/src/main/scala/interface/MLTable.scala @@ -4,89 +4,39 @@ import mli.interface.impl.{MLNumericTable, SparkMLTable} import mli.interface.MLTypes._ import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ import org.apache.spark.rdd.RDD -import SparkContext._ import org.apache.spark.mllib.regression.LabeledPoint +class MLRowID(val id: Long) extends Serializable with math.Ordered[MLRowID] { + override def toString = id.toString -/** - * Enumerated column type. Currently supports Int, Double, String, and Empty. - */ -object ColumnType extends Enumeration with Serializable{ - val Int, Double, String, Empty = Value -} + def compare(that: MLRowID) = id compare that.id + def toInt = id.asInstanceOf[Int] -/** - * Contains metadata about a particular column. Contains an Optional name and enumerated Column Type. - * @param name Optional Column name currently accessed through Schema.lookup() - * @param kind Enumerated Column type. - */ -class ColumnSpec(val name: Option[String], val kind: ColumnType.Value) extends Serializable -object ColumnSpec { - def apply(name: Option[String], kind: ColumnType.Value): ColumnSpec = new ColumnSpec(name, kind) + def toLong = id } +object MLRowID { + implicit def Long2MLRowID(id: Long) = new MLRowID(id) +} -/** - * A schema represents the types of the columns of an MLTable. Users may use schema information to infer - * properties of the table columns - which are numeric vs. text, which have missing values, etc. - * @param columns The specification of each column, in order. 
- */ -class Schema(val columns: Seq[ColumnSpec]) extends Serializable { - val hasText: Boolean = columns.map(_.kind).contains(ColumnType.String) - - val hasMissing: Boolean = columns.map(_.kind).contains(ColumnType.Empty) - - val isNumeric: Boolean = columns.forall(Set(ColumnType.Int, ColumnType.Double) contains _.kind) - - val numericCols: Seq[Index] = columns.zipWithIndex.filter(Set(ColumnType.Int, ColumnType.Double) contains _._1.kind).map(_._2) - val emptyCols: Seq[Index] = columns.zipWithIndex.filter(_._1.kind == ColumnType.Empty).map(_._2) - val textCols: Seq[Index] = columns.zipWithIndex.filter(_._1.kind == ColumnType.String).map(_._2) - - /** - * Function - * @param other - * @param cols - * @return - */ - def join(other: Schema, cols: Seq[Index]): Schema = { - - val joincols = cols.map(columns(_)) - val otherjoincols = cols.map(other.columns(_)) - assert(joincols == otherjoincols) - - val t1OtherSchema = columns.indices.diff(cols).map(columns(_)) - val t2OtherSchema = other.columns.indices.diff(cols).map(other.columns(_)) - - new Schema(joincols ++ t1OtherSchema ++ t2OtherSchema) - } - - override def toString = columns.zipWithIndex.map { case (c,i) => c.name.getOrElse(i) }.mkString("\t") - - //Helper functions. +class MLRow(val id: MLRowID, val tuple: MLTuple) extends Serializable { + // FIXME: kludge + lazy val toVector = tuple.toVector - /** - * Return column indexes from column names. Current implementation is expensive for wide rows. - * @param names Column names of interest. - * @return A list of indexes in order corresponding to the string names. - * If a column name does not exist, it is omitted from the result list. - */ - def lookup(names: Seq[String]): Seq[Index] = names.map(n => columns.indexWhere(c => c.name.getOrElse("") == n)).filter(_ != -1) + override def toString = "(%s) %s".format(id, tuple) } -object Schema { - def apply(row: MLRow) = new Schema(row.map(c => { - c match { - case MLInt(i) => new ColumnSpec(None, ColumnType.Int) - case MLDouble(d) => new ColumnSpec(None, ColumnType.Double) - case MLString(s) => new ColumnSpec(None, ColumnType.String) - } - })) +trait MLRowLowPriority { + // FIXME: complete utter kludge + implicit def MLRowToMLVector(row: MLRow) = row.toVector } -class SchemaException(val error: String) extends Exception - +object MLRow extends MLRowLowPriority { + implicit def MLRowToMLTuple(row: MLRow) = row.tuple +} /** * This is the base interface for an MLTable object and defines the basic operations it needs to support. 
@@ -97,30 +47,36 @@ class SchemaException(val error: String) extends Exception trait MLTable { val numCols: Int val numRows: Long - var tableSchema: Option[Schema] - - def schema(): Schema + val schema: Schema + def foreach(f: MLRow => Unit): Unit def filter(f: MLRow => Boolean): MLTable def union(other: MLTable): MLTable - def map(f: MLRow => MLRow): MLTable - def mapReduce(m: MLRow => MLRow, r: (MLRow, MLRow) => MLRow ): MLRow + def map(f: MLRow => MLTuple): MLTable + def mapReduce(m: MLRow => MLTuple, r: (MLTuple, MLTuple) => MLTuple): MLTuple def matrixBatchMap(f: LocalMatrix => LocalMatrix): MLTable def project(cols: Seq[Index]): MLTable def join(other: MLTable, cols: Seq[Index]): MLTable - def flatMap(m: MLRow => TraversableOnce[MLRow]): MLTable + def flatMap(m: MLRow => TraversableOnce[MLTuple]): MLTable def cache(): MLTable - def reduce(f: (MLRow, MLRow) => MLRow): MLRow - def reduceBy(keys: Seq[Index], f: (MLRow, MLRow) => MLRow): MLTable + // FIXME: should return MLContext + def context: SparkContext + + lazy val transpose: MLTable = computeTranspose + protected def computeTranspose: MLTable + + def reduce(f: (MLTuple, MLTuple) => MLTuple): MLTuple + def reduceBy(keys: Seq[Index], f: (MLTuple, MLTuple) => MLTuple): MLTable def sortBy(keys: Seq[Index], ascending: Boolean=true): MLTable //No support for full table to Matrix just yet. - //def toMatrix: MLMatrix + //def toMatrix: LocalMatrix //No support for iterator yet. //def iterator(): Iterator[MLRow] def collect(): Seq[MLRow] def take(n: Int): Seq[MLRow] + def toLocalMatrix(): LocalMatrix //We also want to support sampling. def sample(fraction: Double, withReplacement: Boolean = false, seed: Int = 42): MLTable @@ -139,24 +95,21 @@ trait MLTable { project(converse) } - def setSchema(newSchema: Schema) = { - tableSchema = Some(newSchema) - } - override def toString = { schema.toString + "\n" + this.take(200).mkString("\n") } + /* FIXME def setColNames(names: Seq[String]) = { val theSchema = schema() - val newcols = (0 until names.length).map(i => new ColumnSpec(Some(names(i)), theSchema.columns(i).kind)) + val newcols = (0 until names.length).map(i => new ColumnSpec(theSchema.columns(i).kind, Some(names(i)))) tableSchema = Some(new Schema(newcols)) } + */ } object MLTable { - def apply(dat: RDD[Array[Double]]) = SparkMLTable(dat) + def apply(dat: RDD[Array[Double]]) = SparkMLTable.fromDoubleArrays(dat) implicit def toNumericTable(tab: MLTable) = MLNumericTable(tab.toDoubleArrayRDD()) } - diff --git a/src/main/scala/interface/MLValue.scala b/src/main/scala/interface/MLValue.scala index 6055c15..e548a37 100644 --- a/src/main/scala/interface/MLValue.scala +++ b/src/main/scala/interface/MLValue.scala @@ -4,6 +4,7 @@ package mli.interface * Base class for basic ML types. 
*/ abstract class MLValue() extends Ordered[MLValue] with Serializable { + def kind: ColumnType.Value def isEmpty: Boolean = false def isNumeric: Boolean def toNumber: Double @@ -14,9 +15,10 @@ abstract class MLValue() extends Ordered[MLValue] with Serializable { } case class MLInt(value: Option[Int]) extends MLValue { + override def kind: ColumnType.Value = ColumnType.IntType override def isEmpty = value.isEmpty - def isNumeric = true - def toNumber = value.getOrElse(0).toDouble + override def isNumeric = true + override def toNumber = value.getOrElse(0).toDouble override def toString = value match { case Some(x) => x.toString @@ -26,9 +28,10 @@ case class MLInt(value: Option[Int]) extends MLValue { case class MLDouble(value: Option[Double]) extends MLValue { + override def kind: ColumnType.Value = ColumnType.DoubleType override def isEmpty = value.isEmpty - def isNumeric = true - def toNumber = value.getOrElse(0.0) + override def isNumeric = true + override def toNumber = value.getOrElse(0.0) override def toString = value match { case Some(x) => x.toString @@ -38,9 +41,10 @@ case class MLDouble(value: Option[Double]) extends MLValue { case class MLString(value: Option[String]) extends MLValue { + override def kind: ColumnType.Value = ColumnType.StringType override def isEmpty = value.isEmpty - def isNumeric = false - def toNumber = 0.0 + override def isNumeric = false + override def toNumber = 0.0 override def toString = value match { case Some(x) => x.toString @@ -76,4 +80,4 @@ object MLValue { //Do we need an implicit for none? //implicit def emptyToMLValue(value: ) -} \ No newline at end of file +} diff --git a/src/main/scala/interface/MLVector.scala b/src/main/scala/interface/MLVector.scala index f3adf9d..bbbc8ed 100644 --- a/src/main/scala/interface/MLVector.scala +++ b/src/main/scala/interface/MLVector.scala @@ -1,23 +1,166 @@ package mli.interface -import org.jblas.DoubleMatrix import mli.impl.DenseMLMatrix +import scala.collection.{IndexedSeqLike, IndexedSeqOptimized} +import scala.collection.mutable.{ArrayBuffer, Builder, Queue => MutableQueue} +import scala.collection.immutable.{TreeMap, SortedMap} +import scala.collection.generic.{HasNewBuilder, SeqFactory, CanBuildFrom} +import org.jblas.DoubleMatrix + +trait MLTuple extends IndexedSeq[MLValue] + with IndexedSeqLike[MLValue, MLTuple] + with Ordered[MLTuple] + with Serializable { + + def schema: Schema + + def apply(inds: Seq[Int]): MLTuple = MLTuple(inds.map(this(_))) + + def toDoubleArray: Array[Double] = map(_.toNumber).toArray -trait MLVectorable { def toVector: MLVector + + /** Implements dense lexicographic ordering on an MLTuple */ + override def compare(that: MLTuple): Int = { + for ((thisOne,thatOne) <- this.zip(that)) { + if(thisOne != thatOne) { + val comparison = thisOne compare thatOne + if(comparison != 0) comparison + } + } + return 0 + } + + override def newBuilder: Builder[MLValue, MLTuple] = { + MLTuple.newBuilder + } + + def newTupleBuilder: Builder[MLValue, MLTuple] + + def nonZeroIndices: Iterable[Int] = 0 until length + + def nonZeroProjection: MLTuple = this + + def nonZeros: Iterable[(Int, MLValue)] = { + nonZeroIndices.zip(nonZeroProjection) + } +} + +object MLTuple { + + def newBuilder: Builder[MLValue, MLTuple] = { + new MLTupleBuilder() + } + + def create(data: MLValue*): MLTuple = { + apply(data.toSeq) + } + + def apply(data: Seq[MLValue]): MLTuple = { + (newBuilder ++= data).result + } + + def apply[T: ClassManifest](data: Seq[Double]): MLVector = { + new MLVector(data) + } + + implicit def 
canBuildFrom: CanBuildFrom[MLTuple, MLValue, MLTuple] = new CanBuildFrom[MLTuple, MLValue, MLTuple] { + override def apply() = newBuilder + // Allow subclasses to define their own builders. + override def apply(from: MLTuple) = from.newTupleBuilder + } +} + +class DenseMLTuple(private val values: Array[MLValue]) extends MLTuple { + + override def apply(idx: Int) = values(idx) + + override def length = values.length + + override def schema = new Schema(values.map(value => new ColumnSpec(value.kind))) + + override def toVector = new MLVector(values.map(_.toNumber)) + + override def newTupleBuilder = new ArrayBuffer[MLValue]().mapResult(buf => new DenseMLTuple(buf.toArray)) } /** - * A numerical vector supporting various mathematical operations efficiently. + * A simple implementation of a sparse row. Could use some actual + * optimization for vector ops. + * + * @param sparseElements is a sparse representation of this row, a map from column + * indices to MLValues. Indices not present in this map will + * be considered empty. + * @param trueLength is the actual length of the row, including empty columns. + * @param emptyValue is the default value for elements not in @param elements. */ -class MLVector(val data: DoubleMatrix) extends IndexedSeq[Double] with Serializable { - override def toString: String = data.toString +class SparseMLTuple private( + private val sparseElements: SortedMap[Int, MLValue], + private val trueLength: Int, + private val emptyValue: MLValue) extends MLTuple { + + override def length = trueLength + + override def apply(index: Int): MLValue = sparseElements.get(index) match { + case None => emptyValue + case Some(value) => value + } + + override def iterator = (0 until trueLength).map(index => apply(index)).iterator - //We need this for slice syntax. - def apply(idx: Int): Double = data.get(idx) + // FIXME: put real impl here + override def schema = null + + // FIXME: put real impl here + override def toVector = null + + //TODO: Need to do some performance testing here. + override def newTupleBuilder = new ArrayBuffer[MLValue]().mapResult({array => + val n = TreeMap[Int, MLValue](array.zipWithIndex.filter(_._1 != emptyValue).map(x => (x._2,x._1)):_*) + new SparseMLTuple(n, array.length, emptyValue) + }) + + override def nonZeros = sparseElements + + override def nonZeroIndices = sparseElements.keys + + override def nonZeroProjection = MLVector(sparseElements.values.toSeq) +} + +object SparseMLTuple { + def fromSparseCollection(elements: TraversableOnce[(Int, MLValue)], trueLength: Int, emptyValue: MLValue) + = new SparseMLTuple(TreeMap.empty[Int, MLValue] ++ elements, trueLength, emptyValue) + + def fromNumericSeq(row: Seq[MLValue]) = { + val nonZeros: Seq[(Int, MLValue)] = (0 until row.length).zip(row).filter(_._2.toNumber != 0) + new SparseMLTuple(TreeMap.empty[Int, MLValue] ++ nonZeros, row.length, MLValue(0.0)) + } +} + +class MLVector(val data: DoubleMatrix) extends MLTuple { + require(data.isRowVector()) + + def this(inData: Seq[Double]) = { + // FIXME: do we really want MLVectors to be row vectors? + this(new DoubleMatrix(inData.toArray).transpose) + } + + override def apply(idx: Int) = data.get(idx) + + override def length = data.length + + override def schema = new Schema(List.fill(length)(ColumnType.DoubleType)) + + override def toVector = this + + // FIXME: is this correct? 
see CanBuildFrom in "Architecture of Scala Collections" + override def newTupleBuilder = new ArrayBuffer[MLValue]().mapResult(buf => new DenseMLTuple(buf.toArray)) + + def toArray: Array[Double] = data.toArray + + def toMatrix: LocalMatrix = new DenseMLMatrix(data) - //Here we provide a few useful vector ops. def dot(other: MLVector): Double = data dot other.data def times(other: MLVector): MLVector = new MLVector(data.mul(other.data)) def plus(other: MLVector): MLVector = new MLVector(data.add(other.data)) @@ -29,25 +172,41 @@ class MLVector(val data: DoubleMatrix) extends IndexedSeq[Double] with Serializa def minus(other: Double): MLVector = new MLVector(data.sub(other)) def over(other: Double): MLVector = new MLVector(data.div(other)) - - //def outer(other: MLVector): MLMatrix = new MLMatrix(transpose(data) * data) - def length = data.length def sum: Double = data.sum - def toMatrix: LocalMatrix = new DenseMLMatrix(new DoubleMatrix(data.data).transpose) - def toArray = data.data - //val row = MLRow(this) + override def toString = data.toString } - object MLVector { def apply(data: Iterator[Double]): MLVector = apply(data.toArray) def apply(data: IndexedSeq[Double]): MLVector = apply(data.toArray) def apply(data: Seq[Double]): MLVector = apply(data.toArray) - def apply(data: Array[Double]): MLVector = new MLVector(new DoubleMatrix(data)) - def apply(data: Array[MLValue]): MLVector = apply(data.map(_.toNumber)) - - //Returns a zero vector of length D. + def apply(data: Array[Double]): MLVector = new MLVector(data) + def apply[T: ClassManifest](data: Seq[MLValue]): MLVector = apply(data.map(_.toNumber)) def zeros(d: Int): MLVector = apply(new Array[Double](d)) - implicit def vectorToRow(from: MLVector): MLRow = MLRow(from)//from.row -} \ No newline at end of file +} + +class MLTupleBuilder extends Builder[MLValue, MLTuple] { + + var queue: MutableQueue[MLValue] = new MutableQueue[MLValue] + var isDoubles = true + + override def clear = { + queue = new MutableQueue[MLValue] + isDoubles = true + } + + override def += (elem: MLValue) = { + queue += elem + isDoubles &&= elem.isInstanceOf[MLDouble] + this + } + + override def result = { + if(!queue.isEmpty && isDoubles) { + new MLVector(queue.map(_.toNumber).toArray) + } else { + new DenseMLTuple(queue.toArray) + } + } +} diff --git a/src/main/scala/interface/Schema.scala b/src/main/scala/interface/Schema.scala new file mode 100644 index 0000000..0193a8a --- /dev/null +++ b/src/main/scala/interface/Schema.scala @@ -0,0 +1,92 @@ +package mli.interface + +import mli.interface.MLTypes._ +import mli.interface._ + +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD +import SparkContext._ + + +/** + * Enumerated column type. Currently supports Int, Double, String, and Empty. + */ +object ColumnType extends Enumeration with Serializable{ + val IntType, DoubleType, StringType, EmptyType = Value +} + +import ColumnType._ + +/** + * Contains metadata about a particular column. Contains an Optional name and enumerated Column Type. + * @param name Optional Column name currently accessed through Schema.lookup() + * @param kind Enumerated Column type. + */ +class ColumnSpec(val kind: ColumnType.Value, val name: Option[String] = None) extends Serializable + +object ColumnSpec { + def apply(kind: ColumnType.Value, name: Option[String]): ColumnSpec = new ColumnSpec(kind, name) + + implicit def ColumnTypeToColumnSpec(kind: ColumnType.Value) = new ColumnSpec(kind) +} + + +/** + * A schema represents the types of the columns of an MLTable. 
Users may use schema information to infer + * properties of the table columns - which are numeric vs. text, which have missing values, etc. + * @param columns The specification of each column, in order. + */ +class Schema(val columns: Seq[ColumnSpec]) extends Serializable { + val kinds: Seq[ColumnType.Value] = columns.map(_.kind) + + val hasText: Boolean = kinds.contains(StringType) + + val hasMissing: Boolean = kinds.contains(EmptyType) + + val isNumeric: Boolean = columns.forall(Set(IntType, DoubleType) contains _.kind) + + val isDoubles: Boolean = columns.forall(_.kind == DoubleType) + + val numericCols: Seq[Index] = columns.zipWithIndex.filter(Set(IntType, DoubleType) contains _._1.kind).map(_._2) + val emptyCols: Seq[Index] = columns.zipWithIndex.filter(_._1.kind == EmptyType).map(_._2) + val textCols: Seq[Index] = columns.zipWithIndex.filter(_._1.kind == StringType).map(_._2) + + override def equals(other: Any) = other match { + case that: Schema => + this.kinds == that.kinds + case _ => + false + } + + /** + * Function + * @param other + * @param cols + * @return + */ + def join(other: Schema, cols: Seq[Index]): Schema = { + + val joincols = cols.map(columns(_)) + val otherjoincols = cols.map(other.columns(_)) + assert(joincols == otherjoincols) + + val t1OtherSchema = columns.indices.diff(cols).map(columns(_)) + val t2OtherSchema = other.columns.indices.diff(cols).map(other.columns(_)) + + new Schema(joincols ++ t1OtherSchema ++ t2OtherSchema) + } + + override def toString = columns.zipWithIndex.map { case (c,i) => c.name.getOrElse(i) }.mkString("\t") + + //Helper functions. + + /** + * Return column indexes from column names. Current implementation is expensive for wide rows. + * @param names Column names of interest. + * @return A list of indexes in order corresponding to the string names. + * If a column name does not exist, it is omitted from the result list. + */ + def lookup(names: Seq[String]): Seq[Index] = names.map(n => columns.indexWhere(c => c.name.getOrElse("") == n)).filter(_ != -1) +} + +class SchemaException(val error: String) extends Exception diff --git a/src/main/scala/ml/Model.scala b/src/main/scala/ml/Model.scala index 79fb6d7..64fa0f5 100644 --- a/src/main/scala/ml/Model.scala +++ b/src/main/scala/ml/Model.scala @@ -13,9 +13,9 @@ abstract class Model[P](val trainingData: MLTable, val trainingParams: P) extends Serializable{ /* Predicts the label of a given data point. 
*/ - def predict(x: MLRow) : MLValue + def predict(x: MLTuple) : MLValue def predict(tbl: MLTable) : MLTable = { - tbl.map((x: MLRow) => MLRow.chooseRepresentation(Seq(predict(x)))) + tbl.map(x => MLTuple(Seq(predict(x)))) } /** diff --git a/src/main/scala/ml/cf/BroadcastALS.scala b/src/main/scala/ml/cf/BroadcastALS.scala new file mode 100644 index 0000000..3b941af --- /dev/null +++ b/src/main/scala/ml/cf/BroadcastALS.scala @@ -0,0 +1,76 @@ +package mli.ml.cf + +import mli.ml._ +import mli.interface._ +import mli.interface.impl.SparkMLTable +import mli.impl.DenseMLMatrix +import org.apache.spark.broadcast.Broadcast + +object BroadcastALS { + def train(trainData: MLTable, k: Int, lambda: Double, maxIter: Int): (DenseMLMatrix, DenseMLMatrix) = { + val ctx = trainData.context + val m = trainData.numRows.asInstanceOf[Int] + val n = trainData.numCols.asInstanceOf[Int] + val trainDataTrans = trainData.transpose + val lambI = DenseMLMatrix.eye(k) * lambda + // Initialize U and V matrices randomly + val U0: DenseMLMatrix = DenseMLMatrix.rand(m, k) + val V0: DenseMLMatrix = DenseMLMatrix.rand(n, k) + (0 until maxIter).foldLeft((U0, V0))((UV, iterNum) => { + val U = UV._1 + val V = UV._2 + // Broadcast V + System.err.println("ALS %s: broadcast V".format(iterNum)) + val V_b = ctx.broadcast(V) + // Update U matrix + System.err.println("ALS %s: compute U".format(iterNum)) + val newU = computeFactor(trainData, V_b, lambI) + assert(newU.numRows == U.numRows) + assert(newU.numCols == U.numCols) + // Broadcast U + System.err.println("ALS %s: broadcast U".format(iterNum)) + val U_b = ctx.broadcast(newU) + // Update V matrix + System.err.println("ALS %s: compute V".format(iterNum)) + assert(trainDataTrans.numRows == V.numRows) + val newV = computeFactor(trainDataTrans, U_b, lambI) + assert(newV.numRows == V.numRows) + assert(newV.numCols == V.numCols) + // computing the residual is slow, so disable it by default + //System.err.println("ALS %s: RMSE = %s".format(iterNum, computeResidual(trainData, ctx.broadcast(newU), ctx.broadcast(newV)))) + (newU, newV) + }) + } + + def computeFactor(trainData: MLTable, fixedFactor: Broadcast[DenseMLMatrix], lambI: DenseMLMatrix): DenseMLMatrix = { + // FIXME: cast + trainData.map(localALS(_, fixedFactor.value, lambI)).toLocalMatrix.asInstanceOf[DenseMLMatrix] + } + + def localALS(trainDataPart: MLRow, Y: DenseMLMatrix, lambI: DenseMLMatrix) = { + val tuple = trainDataPart.tuple + // get rows of Y corresponding to observed entries of trainDataPart + val Yq = Y.getRows(tuple.nonZeroIndices) + // compute the local factor + val resultMat = ((Yq.transpose times Yq) + lambI).solve(Yq.transpose times tuple.nonZeroProjection) + new MLVector(resultMat.mat.transpose) + } + + def computeResidual(trainData: MLTable, U_b: Broadcast[DenseMLMatrix], V_b: Broadcast[DenseMLMatrix]) = { + import org.apache.spark.SparkContext._ + + def predict(user: Int, movie: Int) = { + val U = U_b.value.mat + val V = V_b.value.mat + U.getRow(user).dot(V.getRow(movie)) + } + + val errors = trainData.asInstanceOf[SparkMLTable].toMLRowRdd.flatMap(row => { + val user = row.id.toInt + val tuple = row.tuple.asInstanceOf[SparseMLTuple] + tuple.nonZeros.map(Function.tupled((movie, value) => math.pow(value.toNumber - predict(user, movie), 2.0))) + }) + + math.pow(errors.mean, 0.5) + } +} diff --git a/src/main/scala/ml/classification/SVM.scala b/src/main/scala/ml/classification/SVM.scala index 52808bc..01a69b5 100644 --- a/src/main/scala/ml/classification/SVM.scala +++ b/src/main/scala/ml/classification/SVM.scala 
@@ -14,7 +14,7 @@ class SVMModel( /* Predicts the label of a given data point. */ - def predict(x: MLRow) : MLValue = { + def predict(x: MLTuple) : MLValue = { MLValue(model.predict(x.toDoubleArray)) } diff --git a/src/main/scala/ml/clustering/KMeans.scala b/src/main/scala/ml/clustering/KMeans.scala index b86c2da..edd445d 100644 --- a/src/main/scala/ml/clustering/KMeans.scala +++ b/src/main/scala/ml/clustering/KMeans.scala @@ -14,7 +14,7 @@ class KMeansModel( /* Predicts the label of a given data point. */ - def predict(x: MLRow) : MLValue = { + def predict(x: MLTuple) : MLValue = { MLValue(model.predict(x.toDoubleArray)) } diff --git a/src/main/scala/ml/opt/GradientDescent.scala b/src/main/scala/ml/opt/GradientDescent.scala index 0b13574..20a8147 100644 --- a/src/main/scala/ml/opt/GradientDescent.scala +++ b/src/main/scala/ml/opt/GradientDescent.scala @@ -32,8 +32,9 @@ object GradientDescent extends MLOpt { val lambda = 1.0/iter wOld = wNew - val totalGradient = dat.mapReduce(grad(_, wOld), arraySum(_, _)) - val avgGradient = (totalGradient times lambda) over dat.numRows + // FIXME: get rid of toVector calls + val totalGradient = dat.mapReduce(r => grad(r, wOld), (x, y) => arraySum(x.toVector, y.toVector)) + val avgGradient = (totalGradient.toVector times lambda) over dat.numRows wNew = wOld minus avgGradient iter += 1 } diff --git a/src/main/scala/ml/opt/StochasticGradientDescent.scala b/src/main/scala/ml/opt/StochasticGradientDescent.scala index cb5c42f..db9a176 100644 --- a/src/main/scala/ml/opt/StochasticGradientDescent.scala +++ b/src/main/scala/ml/opt/StochasticGradientDescent.scala @@ -35,12 +35,16 @@ object StochasticGradientDescent extends MLOpt with Serializable { runSGD(data, params.wInit, params.learningRate, params.grad, params.maxIter, params.eps) } - def runSGD(data: MLNumericTable, - wInit: MLVector, - learningRate: Double, - grad: (MLVector, MLVector) => MLVector, - maxIter: Int, - eps: Double): MLVector = { + def runSGD( + data: MLNumericTable, + wInit: MLVector, + learningRate: Double, + grad: (MLVector, MLVector) => MLVector, + maxIter: Int, + eps: Double + ): MLVector = { + + //Initialize the model weights and set data size. var weights = wInit var weightsOld = weights @@ -50,8 +54,8 @@ object StochasticGradientDescent extends MLOpt with Serializable { //Main loop of SGD. Calls local SGD and averages parameters. Checks for convergence after each pass. while(i < maxIter && diff > eps) { - //weights = data.map(sgdStep(_, weights, learningRate, grad)).reduce(_ plus _) over n - weights = data.matrixBatchMap(localSGD(_, weights, learningRate, grad)).reduce(_ plus _) over n + // FIXME: get rid of these toVector calls + weights = data.matrixBatchMap(localSGD(_, weights, learningRate, grad)).reduce(_.toVector plus _.toVector).toVector over n diff = ((weights minus weightsOld) dot (weights minus weightsOld)) println(diff) @@ -75,9 +79,9 @@ object StochasticGradientDescent extends MLOpt with Serializable { var loc = weights //For each row in the matrix. - for (i <- data.toMLVectors) { + for (i <- data.toVectors) { //Compute the gradient. - val grad = gradientFunction(i, loc) + val grad = gradientFunction(i, loc.toVector) //Update according to the learning rate. 
loc = loc minus (grad times lambda) diff --git a/src/main/scala/ml/regression/LinearRegression.scala b/src/main/scala/ml/regression/LinearRegression.scala index 5f0e30d..6e59608 100644 --- a/src/main/scala/ml/regression/LinearRegression.scala +++ b/src/main/scala/ml/regression/LinearRegression.scala @@ -13,7 +13,7 @@ class LinearRegressionModel( /* Predicts the label of a given data point. */ - def predict(x: MLRow) : MLValue = { + def predict(x: MLTuple) : MLValue = { MLValue(model.predict(x.toDoubleArray)) } diff --git a/src/main/scala/temp/CFTest.scala b/src/main/scala/temp/CFTest.scala new file mode 100644 index 0000000..933ba24 --- /dev/null +++ b/src/main/scala/temp/CFTest.scala @@ -0,0 +1,54 @@ +package mli.impl + +import mli.interface._ +import mli.interface.MLTypes._ +import mli.interface.impl.SparkMLTable +import mli.feat.RandomizedSVD +import mli.ml.cf.BroadcastALS +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd.RDD +import org.jblas.{DoubleMatrix, Solve} +import Function.tupled + +// collaborative filtering test +object CFTest { + def parseLine(line: String) = { + val parts = line.split(",") + (parts(0).toInt, parts(1).toInt, parts(2).toDouble) + } + + def replicate(rdd: RDD[(Int, Int, Double)], multiple: Int): RDD[(Int, Int, Double)] = { + val numUsers = 1 + rdd.map(_._1).reduce(_ max _) + rdd.flatMap(entry => { + val user = entry._1 + val movie = entry._2 + val rating = entry._3 + (0 until multiple).map(m => (m * numUsers + user, movie, rating)) + }) + } + + def main(args: Array[String]) { + System.setProperty("spark.storage.blockManagerHeartBeatMs", "300000") + System.setProperty("spark.executor.memory", "4g") + val sc = new SparkContext(args(0), "cf-test")//, "/root/spark", Seq("/root/MLI-assembly-1.0.jar")) + //val input = sc.textFile(args(1) + "/data/movies/training_matrix.txt") + //val input = sc.textFile(args(1) + "/data/movies/50k-users-2k-movies-b.txt") + //val triples = replicate(input.map(parseLine), args(2).toInt) + //triples = triples.filter(tupled((row, col, value) => row < 50000 && col < 2000)).coalesce(8) + var triples = sc.objectFile(args(1) + "/data/movies/50k-users-2k-movies-b.txt").asInstanceOf[RDD[(Int,Int,Double)]] + val numCols: Int = 1 + triples.map(tupled((user, movie, value) => movie)).reduce(_ max _) + val movieAvg = triples. + map(tupled((user, movie, value) => (movie, (value, 1)))). + reduceByKey((left, right) => (left._1 + right._1, left._2 + right._2)). + mapValues(tupled((sum, count) => sum / count)). 
+ collectAsMap + val rows = triples.map(tupled((r, c, v) => (r, (c, MLValue(v - movieAvg(c)))))).groupByKey.mapValues(SparseMLTuple.fromSparseCollection(_, numCols, 0.0)) + val table = new SparkMLTable(rows.map(tupled((id, row) => new MLRow(new MLRowID(id), row))).cache()) + System.out.println("rows: " + table.numRows) + val rank = 10 + //val svd = RandomizedSVD.compute(table, rank, 0) + //System.out.println("singular values: " + svd._2) + val UV = BroadcastALS.train(table, rank, 0.01, 50) + } +} diff --git a/src/main/scala/util/MLLRDemo.scala b/src/main/scala/util/MLLRDemo.scala index 01e96e2..b1e4d0e 100644 --- a/src/main/scala/util/MLLRDemo.scala +++ b/src/main/scala/util/MLLRDemo.scala @@ -5,6 +5,7 @@ import mli.interface.impl.MLNumericTable import mli.ml.classification._ import org.apache.spark.SparkContext +/* FIXME object MLILRDemo { def main(args: Array[String]) = { val mc = new MLContext(new SparkContext("local[4]", "MLILRtest")) @@ -17,4 +18,5 @@ object MLILRDemo { println("Time to train: " + model.trainingTime) } -} \ No newline at end of file +} +*/ diff --git a/src/main/scala/util/MLTableUtils.scala b/src/main/scala/util/MLTableUtils.scala index 81f99e6..26f63c6 100644 --- a/src/main/scala/util/MLTableUtils.scala +++ b/src/main/scala/util/MLTableUtils.scala @@ -7,12 +7,12 @@ object MLTableUtils { //TODO: Implement toNumeric(?) //def toNumeric(mlt: MLTable): MLNumericTable = ??? - def normalize(mlnt: MLTable): (MLTable, MLRow => MLRow) = { + def normalize(mlnt: MLTable): (MLTable, MLRow => MLTuple) = { //Todo: we can do this in one pass over the data by keeping track of sum(x**2). - val mean = (mlnt.reduce(_ plus _)) over (mlnt.numRows.toDouble) - val sd = mlnt.map(x => (x minus mean) times (x minus mean)).reduce(_ plus _) + val mean = (mlnt.reduce(_.toVector plus _.toVector)).toVector over (mlnt.numRows.toDouble) + val sd = mlnt.map(x => (x minus mean) times (x minus mean)).reduce(_.toVector plus _.toVector) - def trans(x: MLRow): MLRow = (x minus mean) over sd + def trans(x: MLRow): MLTuple = (x minus mean).toVector over sd.toVector val res = mlnt.map(trans) (res, trans) diff --git a/src/test/resources/log4j.properties b/src/test/resources/log4j.properties new file mode 100644 index 0000000..26b73a1 --- /dev/null +++ b/src/test/resources/log4j.properties @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
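The CFTest driver above subtracts each movie's average rating before factorization. The reduceByKey/mapValues pattern it uses to compute those averages can be exercised on a toy RDD as below; MeanCenterSketch is a hypothetical name, not part of this patch, and it assumes a local SparkContext is acceptable.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

// Illustrative only: per-movie mean-centering on a tiny in-memory RDD of
// (user, movie, rating) triples, mirroring the pattern used in CFTest.
object MeanCenterSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "mean-center-sketch")
    val triples = sc.parallelize(Seq((0, 0, 4.0), (1, 0, 2.0), (0, 1, 5.0)))

    // Average rating per movie: accumulate (sum, count) pairs, then divide.
    val movieAvg = triples.
      map(t => (t._2, (t._3, 1))).
      reduceByKey((left, right) => (left._1 + right._1, left._2 + right._2)).
      mapValues(p => p._1 / p._2).
      collectAsMap

    // Subtract each movie's average from its ratings.
    val centered = triples.map(t => (t._1, t._2, t._3 - movieAvg(t._2)))
    centered.collect.foreach(println)
    sc.stop()
  }
}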
+# + +# Set everything to be logged to the file core/target/unit-tests.log +log4j.rootCategory=INFO, file +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=false +log4j.appender.file.file=target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.eclipse.jetty=WARN +org.eclipse.jetty.LEVEL=WARN diff --git a/src/test/scala/feat/RandomizedSVDSuite.scala b/src/test/scala/feat/RandomizedSVDSuite.scala new file mode 100644 index 0000000..2203b4c --- /dev/null +++ b/src/test/scala/feat/RandomizedSVDSuite.scala @@ -0,0 +1,110 @@ +package mli.feat + +import mli.interface.MLTypes._ +import mli.interface._ +import mli.interface.impl.SparkMLTable + +import org.scalatest.FunSuite +import org.jblas.{DoubleMatrix, Singular} +import org.jblas.ranges.IntervalRange +import scala.util.Random +import org.apache.spark.SparkContext +import mli.test.testsupport.LocalSparkContext + +class RandomizedSVDSuite extends FunSuite with LocalSparkContext { + test("Test MLTable mmul") { + sc = new SparkContext("local", "svdtest") + val rows = 250 + val cols = 50 + val rank = 10 + val lhs = randomMatrix(rows, cols) + val rhs = randomMatrix(cols, rank) + val reference = lhs mmul rhs + val test = RandomizedSVD.mmul(toTable(lhs), rhs) + val error = (reference sub test).norm2 + shutdownSparkContext + System.err.println("test mmul error: " + error) + assert(error < 1e-9) + } + + test("Test MLTable transpose mmul") { + sc = new SparkContext("local", "svdtest") + val rows = 250 + val cols = 50 + val rank = 10 + val lhs = randomMatrix(rows, cols) + val rhs = DoubleMatrix.eye(rows) + val reference = lhs.transpose mmul rhs + val table = toTable(lhs) + val test = RandomizedSVD.mmulTransposed(table, rhs) + val error = (reference sub test).norm2 + shutdownSparkContext + System.err.println("transpose mmul test error: " + error) + assert(math.abs(error) < 1e-9) + } + + test("Basic test of randomized SVD") { + sc = new SparkContext("local", "svdtest") + val rows = 250 + val cols = 50 + val rank = 10 + val orig = randomMatrix(rows, cols) + val reference = bestApproximation(orig, rank) + val test = randomizedApproximation(toTable(orig), rank) + val referr = (orig sub reference).norm2 + val testerr = (orig sub test).norm2 + val relerr = (testerr - referr) / referr + shutdownSparkContext + System.err.println("reference error: " + referr) + System.err.println("test error: " + testerr) + System.err.println("relative error: %.2f%%".format(100.0 * relerr)) + assert(relerr > 0.0 && relerr < 0.01) + } + + // returns matrix with exponentially distributed spectrum + private def randomMatrix(m: Int, n: Int): DoubleMatrix = { + val orig = DoubleMatrix.randn(m, n) + val spectrum = DoubleMatrix.diag(exponentialSamples(math.min(m, n))) + val svd = Singular.sparseSVD(orig) + svd(0) mmul spectrum mmul svd(2).transpose + } + + private def exponentialSamples(count: Int): DoubleMatrix = { + val samples = Seq.fill(count)(Random.nextDouble).map(exponentialInverseCdf) + new DoubleMatrix(samples.toArray) + } + + private def exponentialInverseCdf(p: Double): Double = { + val lambda = 0.25 + -math.log(1.0 - p) / lambda + } + + private def toTable(matrix: DoubleMatrix): MLTable = { + SparkMLTable.fromDoubleArrays(sc.parallelize(matrix.toArray2)) + } + + private def randomizedApproximation(mat: MLTable, rank: Index): DoubleMatrix = { + 
val svd = RandomizedSVD.compute(mat, rank, 2) + val U = svd._1 + val S = DoubleMatrix.diag(svd._2) + val V = svd._3 + return U mmul S mmul V.transpose + } + + private def bestApproximation(mat: DoubleMatrix, rank: Index): DoubleMatrix = { + val svd = Singular.fullSVD(mat) + assert(isSorted(svd(1).data)) + val U = rtrim(svd(0), rank) + val S = DoubleMatrix.diag(svd(1)).getRange(0, rank, 0, rank) + val V = rtrim(svd(2), rank) + return U mmul S mmul V.transpose + } + + // only keeps the first k columns, discards the rest + private def rtrim(mat: DoubleMatrix, k: Int) = { + assert(mat.columns >= k) + mat.getRange(0, mat.rows, 0, k) + } + + private def isSorted(s: Seq[Double]) = (s, s.tail).zipped.forall(_ >= _) +} diff --git a/src/test/scala/impl/MLRowSuite.scala b/src/test/scala/impl/MLRowSuite.scala deleted file mode 100644 index e815bea..0000000 --- a/src/test/scala/impl/MLRowSuite.scala +++ /dev/null @@ -1,43 +0,0 @@ -package mli.test.impl - -import org.scalatest.FunSuite -import mli.interface._ - - -class MLRowSuite extends FunSuite { - - test("DenseMLRow can be constructed") { - val row: DenseMLRow = DenseMLRow(MLValue(0.0), MLValue("foo")) - } - - test("SparseMLRow can be constructed") { - val row: SparseMLRow = SparseMLRow.fromSparseCollection(Map(0 -> MLValue(0.0), 100000 -> MLValue("foo")), 200000, MLValue(0.0)) - } - - test("MLRow.chooseRepresentation() chooses a sparse representation for a sparse vector") { - val row: MLRow = MLRow.chooseRepresentation((0 until 10000).map({value => if (value == 400) MLValue(2) else MLValue(0)})) - assert(row.getClass === classOf[SparseMLRow]) - } - - test("MLRow.chooseRepresentation() chooses a dense representation for a dense vector") { - val row: MLRow = MLRow.chooseRepresentation((0 until 10000).map(MLValue.apply)) - assert(row.getClass === classOf[DenseMLRow]) - } - - test("MLRow.chooseRepresentation() chooses a dense representation for a small vector") { - val row: MLRow = MLRow.chooseRepresentation((0 until 10).map({value => MLValue.apply(0.0)})) - assert(row.getClass === classOf[DenseMLRow]) - } - - test("Mapping a SparseMLRow produces a SparseMLRow") { - val row: MLRow = SparseMLRow.fromSparseCollection(Map(1 -> MLValue(1), 100000 -> MLValue(2)), 200000, MLValue(0.0)) - val mappedRow: MLRow = row.map(value => MLValue(value.toNumber * 2)) - assert(row.getClass === classOf[SparseMLRow]) - } - - test("Mapping a DenseMLRow produces a DenseMLRow") { - val row: MLRow = DenseMLRow.fromSeq((0 until 10000).map(MLValue.apply)) - val mappedRow: MLRow = row.map(value => MLValue(value.toNumber + 1)) - assert(row.getClass === classOf[DenseMLRow]) - } -} diff --git a/src/test/scala/impl/MLTableSuite.scala b/src/test/scala/impl/MLTableSuite.scala deleted file mode 100644 index 791a360..0000000 --- a/src/test/scala/impl/MLTableSuite.scala +++ /dev/null @@ -1,59 +0,0 @@ -//package mli.test.interface -// -//import org.scalatest.FunSuite -//import scala.util.Random -//import mli.test.testsupport.LocalSparkContext -// -//class MLTableSuite extends FunSuite with LocalSparkContext { -// def makeToyLRData() = { -// -// val d = 5 -// val n = 1000 -// val weight = Array.fill(d){ Random.nextGaussian() } -// val X = Array.fill(n,d){ Random.nextGaussian() } -// val data = X.map( row => -// (if(row.zipAll(weight, 0.0, 0.0).map(x => x._1*x._2).sum > 0) 1.0 else 0.0) +: (row ++ Array("stuff", "", "another"))) -// val corruptedData = -// data.map(row => row.map(x => -// if(math.random < 0.01) -// (if(math.random < 0.5) "bug" else "") -// else x.toString).mkString("\t") ) 
-// -// corruptedData -// -// } -// -// -// val rawData = makeToyLRData() -// -// -// test("Created successfully and not numeric") { -// sc = new spark.SparkContext("local[4]", "test") -// val mc = new MLContext(sc) -// val mlt = mc.loadStringArray(rawData) -// -// assert(true)//!mlt.isNumeric) -// } -// -// test("Drop non-numeric") { -// sc = new spark.SparkContext("local[4]", "test") -// val mc = new MLContext(sc) -// val mlt = mc.loadStringArray(rawData) -// -// val mltNumeric = mlt.dropNonNumeric() -// assert(mltNumeric.isNumeric) -// } -// -// test("Normalization/Transformation") { -// sc = new spark.SparkContext("local[4]", "test") -// val mc = new MLContext(sc) -// val mlt = mc.loadStringArray(rawData) -// -// val mltNumeric = mlt.dropNonNumeric() -// val (normalizedMlt, transform) = mltNumeric.normalize(1 to mltNumeric.numCols) -// val eps = 0.0001 -// -// assert(normalizedMlt.mean.reduce(_+_)/normalizedMlt.mean.length - 0.0 < eps) -// assert(normalizedMlt.stdev.reduce(_+_)/normalizedMlt.stdev.length - 1.0 < eps) -// } -//} diff --git a/src/test/scala/interface/SchemaSuite.scala b/src/test/scala/interface/SchemaSuite.scala new file mode 100644 index 0000000..a4bbb07 --- /dev/null +++ b/src/test/scala/interface/SchemaSuite.scala @@ -0,0 +1,23 @@ +package mli.test.interface + +import org.scalatest.FunSuite +import mli.interface._ +import ColumnType._ + +class SchemaSuite extends FunSuite { + + test("Schema (in)equality tests") { + val schema1 = new Schema(Seq(IntType, IntType, StringType)) + val schema1b = new Schema(Seq( + new ColumnSpec(IntType, Some("foo")), + new ColumnSpec(IntType, Some("bar")), + new ColumnSpec(StringType, Some("baz")))) + val schema2 = new Schema(Seq(DoubleType, DoubleType, DoubleType)) + val schema2b = new Schema(Seq(DoubleType, DoubleType, DoubleType)) + val schema3 = new Schema(Seq(DoubleType, DoubleType, DoubleType, DoubleType)) + assert(schema1 == schema1b) + assert(schema1 != schema2) + assert(schema2 == schema2b) + assert(schema2 != schema3) + } +} diff --git a/src/test/scala/interface/SparkMLTableSuite.scala b/src/test/scala/interface/SparkMLTableSuite.scala new file mode 100644 index 0000000..cccbf5e --- /dev/null +++ b/src/test/scala/interface/SparkMLTableSuite.scala @@ -0,0 +1,30 @@ +package mli.test.interface + + +import mli.interface._ +import mli.interface.impl.SparkMLTable +import mli.test.testsupport.LocalSparkContext +import org.scalatest.FunSuite +import org.scalatest.BeforeAndAfter +import org.jblas.DoubleMatrix +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ + +class SparkMLTableSuite extends FunSuite with BeforeAndAfter with LocalSparkContext { + test("toLocalMatrix") { + sc = new SparkContext("local[4]", "toLocalMatrix test") + val table = new SparkMLTable(sc.parallelize(Seq( + new MLRow(0, MLVector(Array(1.0, 2.0, 3.0))), + new MLRow(1, MLVector(Array(2.0, 4.0, 6.0))), + new MLRow(3, MLVector(Array(4.0, 8.0, 12.0))) + ))) + val test = table.toLocalMatrix + val reference = new DoubleMatrix(Array( + Array(1.0, 2.0, 3.0), + Array(2.0, 4.0, 6.0), + Array(0.0, 0.0, 0.0), + Array(4.0, 8.0, 12.0) + )) + assert(test.mat == reference) + } +} diff --git a/src/test/scala/interface/VectorSuite.scala b/src/test/scala/interface/VectorSuite.scala new file mode 100644 index 0000000..408a90a --- /dev/null +++ b/src/test/scala/interface/VectorSuite.scala @@ -0,0 +1,33 @@ +package mli.test.interface + +import org.scalatest.FunSuite +import mli.interface._ + + +class VectorSuite extends FunSuite { + + test("Building with mixed 
data constructs a DenseMLTuple") { + val vector = (MLTuple.newBuilder ++= List(1.0, 2.0, "hello", 3)).result + // welcome to the Department of Redundancy Department + assert(vector.isInstanceOf[DenseMLTuple]) + assert(!vector.isInstanceOf[MLVector]) + } + + test("Building with doubles constructs an MLVector") { + val vector = (MLTuple.newBuilder ++= List(1.0, 2.0, 3.0)).result + assert(vector.isInstanceOf[MLVector]) + assert(!vector.isInstanceOf[DenseMLTuple]) + } + + test("Building with an int constructs a DenseMLTuple") { + val vector = (MLTuple.newBuilder ++= List(1.0, 2.0, 3)).result + assert(vector.isInstanceOf[DenseMLTuple]) + assert(!vector.isInstanceOf[MLVector]) + } + + test("Vector addition") { + val v1 = MLTuple(Seq(1.0, 2.0, 3.0)) + val v2 = MLTuple(Seq(4.0, 5.0, 6.0)) + val v3 = v1 plus v2 + } +} diff --git a/src/test/scala/testsupport/LocalSparkContext.scala b/src/test/scala/testsupport/LocalSparkContext.scala index 9ceb0be..93a19ca 100644 --- a/src/test/scala/testsupport/LocalSparkContext.scala +++ b/src/test/scala/testsupport/LocalSparkContext.scala @@ -24,6 +24,10 @@ trait LocalSparkContext extends BeforeAndAfterEach { self: Suite => } } + + def shutdownSparkContext() = { + resetSparkContext + } + } object LocalSparkContext { @@ -42,4 +46,4 @@ object LocalSparkContext { } } -} \ No newline at end of file +}
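As a reference for the comparison performed in RandomizedSVDSuite, the rank-k truncation that bestApproximation builds from a full SVD can be written in isolation. A minimal sketch, assuming jblas 1.2.x; TruncatedSvdSketch is illustrative only and not part of this patch.

import org.jblas.{DoubleMatrix, Singular}

// Illustrative only: the best rank-k approximation of a matrix, obtained by
// keeping the first k singular triples of a full SVD (A ~ U_k * S_k * V_k').
object TruncatedSvdSketch {
  def bestRankK(mat: DoubleMatrix, k: Int): DoubleMatrix = {
    val usv = Singular.fullSVD(mat)                // Array(U, s, V) with A = U * diag(s) * V'
    val U = usv(0).getRange(0, mat.rows, 0, k)     // first k left singular vectors
    val S = DoubleMatrix.diag(usv(1)).getRange(0, k, 0, k)
    val V = usv(2).getRange(0, mat.columns, 0, k)  // first k right singular vectors
    U mmul S mmul V.transpose
  }

  def main(args: Array[String]) {
    val mat = DoubleMatrix.randn(20, 8)
    val approx = bestRankK(mat, 3)
    println("rank-3 approximation error: " + (mat sub approx).norm2)
  }
}

The suite's relative-error assertion compares the randomized approximation's residual against exactly this kind of truncated-SVD residual.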