3 changes: 3 additions & 0 deletions .gitignore
@@ -15,3 +15,6 @@ project/plugins/project/
#Eclipse specific
.classpath
.project
+
+# Vim specific
+.*.swp
21 changes: 19 additions & 2 deletions build.sbt
@@ -13,13 +13,30 @@ scalaVersion := "2.9.3"
libraryDependencies ++= Seq(
"org.apache.spark" % "spark-core_2.9.3" % "0.8.0-incubating",
"org.apache.spark" % "spark-mllib_2.9.3" % "0.8.0-incubating",
"org.scalatest" %% "scalatest" % "1.9.1" % "test"
"org.slf4j" % "slf4j-log4j12" % "1.7.5",
"org.scalatest" %% "scalatest" % "1.9.1" % "test",
// Custom build of jblas including the "orgqr" function,
// pending merge of https://github.com/mikiobraun/jblas/pull/34
"org.jblas" % "jblas" % "1.2.4-amplab-SNAPSHOT"
)

resolvers ++= Seq(
"Typesafe" at "http://repo.typesafe.com/typesafe/releases",
"Scala Tools Snapshots" at "http://scala-tools.org/repo-snapshots/",
"ScalaNLP Maven2" at "http://repo.scalanlp.org/repo",
"Spray" at "http://repo.spray.cc"
"Spray" at "http://repo.spray.cc",
"Local Maven Repo" at "file://" + Path.userHome + "/.m2/repository"
)

+test in assembly := {}
+
+mergeStrategy in assembly := {
+case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
+case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
+case "META-INF/services/org.apache.hadoop.fs.FileSystem" => MergeStrategy.concat
+case "reference.conf" => MergeStrategy.concat
+case "log4j.properties" => MergeStrategy.concat
+case _ => MergeStrategy.first
+}
+
+net.virtualvoid.sbt.graph.Plugin.graphSettings
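Two build changes above deserve a note. The "Local Maven Repo" resolver points sbt at ~/.m2, where the custom jblas snapshot (with orgqr, pending the upstream pull request) is presumably installed by hand. The mergeStrategy block decides what happens when several jars ship the same path in the fat jar: signature metadata is dropped, service registries and configs are concatenated, and everything else keeps the first copy. A minimal standalone re-statement of that dispatch, assuming a hypothetical strategyFor helper (not part of the build):

```scala
// Hedged sketch: mirrors the mergeStrategy cases above as a plain function.
def strategyFor(path: String): String = path match {
  case m if m.toLowerCase.endsWith("manifest.mf") => "discard"         // jar manifests
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => "discard"     // signature files
  case "META-INF/services/org.apache.hadoop.fs.FileSystem" => "concat" // merge service registries
  case "reference.conf" => "concat"                                    // merge config defaults
  case "log4j.properties" => "concat"
  case _ => "first"                                                    // keep first occurrence
}

// strategyFor("META-INF/MANIFEST.MF") == "discard"
// strategyFor("reference.conf") == "concat"
```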
Binary file removed lib/jblas-1.2.3.jar
2 changes: 1 addition & 1 deletion project/build.properties
@@ -1 +1 @@
-sbt.version=0.11.2
+sbt.version=0.12.4
4 changes: 3 additions & 1 deletion project/plugins.sbt
@@ -4,4 +4,6 @@ addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.1.0-SNAPSHOT")

addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.0.0")

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.7.2")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.9.2")

addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.7.4")
25 changes: 12 additions & 13 deletions src/main/scala/feat/NGrams.scala
@@ -15,9 +15,9 @@ object NGrams extends FeatureExtractor with Serializable {
def buildDictionary(table: MLTable, col: Int, numGrams: Int, numFeatures: Int, stopWords: Set[String]): Seq[(String,Int)] = {

//This is the classic Word Count, sorted descending by frequency.
-val freqTable = table.flatMap(rowToNgramRows(_, col, numGrams, stopWords))
-.map(row => MLRow(row(0),1))
-.reduceBy(Seq(0), (x,y) => MLRow(x(0),x(1).toNumber+y(1).toNumber)) //Run a reduceByKey on the first column.
+val freqTable = table.flatMap(rowToNgramTuples(_, col, numGrams, stopWords))
+.map(row => MLTuple.create(row(0),1))
+.reduceBy(Seq(0), (x,y) => MLTuple.create(x(0),x(1).toNumber+y(1).toNumber)) //Run a reduceByKey on the first column.
.sortBy(Seq(1), false) //Run a sortby on the second column.
.take(numFeatures) //Return the top rows.
.map(r => (r(0).toString, r(1).toNumber.toInt))
@@ -43,13 +43,13 @@
}

//Responsible for tokenizing data for dictionary calculation.
-def rowToNgramRows(row: MLRow, col: Int, n: Int, stopWords: Set[String]): Seq[MLRow] = {
+def rowToNgramTuples(row: MLTuple, col: Int, n: Int, stopWords: Set[String]): Seq[MLTuple] = {
val grams = ngrams(row(col).toString, n, stopWords)
-grams.map(s => MLRow(MLValue(s))).toSeq
+grams.map(s => MLTuple.create(MLValue(s))).toSeq
}

//The main map function - given a row and a dictionary, return a new row which is an n-gram feature vector.
-def rowToFilteredNgrams(row: MLRow, col: Int, dict: Seq[(String,Int)], n: Int, stopWords: Set[String]): MLRow = {
+def rowToFilteredNgrams(row: MLRow, col: Int, dict: Seq[(String,Int)], n: Int, stopWords: Set[String]): MLTuple = {

//Pull out the ngrams for a specific string.
val x = ngrams(row(col).toString, n, stopWords)
@@ -59,8 +59,7 @@
val coll = dict.zipWithIndex.filter{ case((a,b), c) => x.contains(a)}.map {case ((a,b), c) => (c, MLValue(1.0))}

//Return a new sparse row based on this feature vector.
-val sparsemlval = SparseMLRow.fromNumericSeq(row.drop(Seq(col)))
-SparseMLRow.fromNumericSeq(row.drop(Seq(col))) ++ SparseMLRow.fromSparseCollection(coll, dict.length, MLValue(0.0))
+SparseMLTuple.fromNumericSeq(row.drop(col)) ++ SparseMLTuple.fromSparseCollection(coll, dict.length, MLValue(0.0))
}

/**
@@ -71,7 +70,7 @@
* @param k Top-k features to keep.
* @return Table of featurized data.
*/
-def extractNGrams(in: MLTable, c: Int=0, n: Int=1, k: Int=20000, stopWords: Set[String] = stopWords): (MLTable, MLRow => MLRow) = {
+def extractNGrams(in: MLTable, c: Int=0, n: Int=1, k: Int=20000, stopWords: Set[String] = stopWords): (MLTable, MLRow => MLTuple) = {

//Build dictionary.
val dict = buildDictionary(in, c, n, k, stopWords)
@@ -83,7 +82,7 @@
val existingColNames = in.drop(Seq(c)).schema.columns.map(_.name.getOrElse(""))

//Set the column names of the table to match the NGram features.
-out.setColNames(existingColNames ++ dict.map(_._1))
+//FIXME out.setColNames(existingColNames ++ dict.map(_._1))
(out, featurizer)
}

@@ -93,12 +92,12 @@
* @return TF-IDF features for an MLTable.
*/
def tfIdf(in: MLTable, c: Int=0): MLTable = {
-val df = in.reduce(_ plus _)
+val df = in.reduce(_.toVector plus _.toVector)
val df2 = df.toDoubleArray
df2(c) = 1.0
val df3 = MLVector(df2)
val newtab = in.map(r => r over df3)
-newtab.setSchema(in.schema)
+//FIXME newtab.setSchema(in.schema)
newtab
}

@@ -155,4 +154,4 @@
"while", "whither", "who", "whoever", "whole", "whom", "whose", "why", "will",
"with", "within", "without", "would", "yet", "you", "your", "yours", "yourself",
"yourselves")
-}
+}
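The dictionary build above is a word count over n-grams followed by a top-k cut, and tfIdf then divides each (0/1-featurized) row elementwise by the per-term document-frequency vector. A local sketch of the dictionary pipeline on plain Scala collections; the tokenizer here is an assumption, since the real ngrams helper sits outside this hunk:

```scala
// Hedged local sketch of buildDictionary; ngramsLocal is a stand-in tokenizer.
def ngramsLocal(doc: String, n: Int, stop: Set[String]): Seq[String] = {
  val toks = doc.toLowerCase.split("\\W+").filter(t => t.nonEmpty && !stop(t)).toSeq
  toks.sliding(n).filter(_.length == n).map(_.mkString(" ")).toSeq
}

def buildDictionaryLocal(docs: Seq[String], n: Int, k: Int,
                         stop: Set[String]): Seq[(String, Int)] =
  docs.flatMap(ngramsLocal(_, n, stop))      // one record per n-gram occurrence
      .groupBy(identity)                     // the reduceBy on column 0
      .map { case (g, occ) => (g, occ.size) }
      .toSeq
      .sortBy(-_._2)                         // descending by frequency
      .take(k)                               // keep the top-k grams
```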
100 changes: 100 additions & 0 deletions src/main/scala/feat/RandomizedTruncatedSVD.scala
@@ -0,0 +1,100 @@
package mli.feat
import mli.interface.MLTypes._
import mli.interface._
import mli.interface.impl.SparkMLTable
import org.jblas.{DoubleMatrix, SimpleBlas, NativeBlas, Singular}
import org.apache.spark.{AccumulableParam, AccumulatorParam}
import collection.JavaConversions._

// represents the rank-one matrix left * right
private class RankOneUpdate(val left: DoubleMatrix, val right: DoubleMatrix) {
require(left.isColumnVector)
require(right.isRowVector)
}

private class RankOneMatrixAccumulator extends AccumulableParam[DoubleMatrix, RankOneUpdate] {
override def addAccumulator(left: DoubleMatrix, right: RankOneUpdate): DoubleMatrix = {
left.rankOneUpdate(right.left, right.right)
left
}

override def addInPlace(left: DoubleMatrix, right: DoubleMatrix): DoubleMatrix = {
left.addi(right)
left
}

override def zero(a: DoubleMatrix): DoubleMatrix = {
DoubleMatrix.zeros(a.rows, a.columns)
}
}

object RandomizedSVD {
def compute(mat: MLTable, k: Index, q: Index) = {
val A = mat
val m = A.numRows.asInstanceOf[Index]
val n = A.numCols.asInstanceOf[Index]
// add a few dimensions to improve accuracy of the final rank-k approximation
val ell = math.min(k + 5, math.min(m, n))
val omega = DoubleMatrix.randn(n, ell)

// find initial rank-ell subspace Q_0
val Y0 = mmul(A, omega)
val Q0 = range(Y0)

// apply power iteration to get final Q
val Qq = (1 to q).foldRight(Q0)((idx, Q) => {
val Ytwiddle = mmulTransposed(A, Q)
val Qtwiddle = range(Ytwiddle)
val Y = mmul(A, Qtwiddle)
range(Y)
})

// compute SVD of projection of A onto Q_q
val B = mmulTransposed(A, Qq).transpose
val svd = Singular.sparseSVD(B)
val U0 = Qq mmul svd(0)
val S0 = svd(1)
val V0 = svd(2)
val U = U0.getRange(0, m, 0, k)
val S = S0.getRange(0, k, 0, 1)
val V = V0.getRange(0, n, 0, k)

// A \approx U \diag(S) V'
(U, S, V) // TODO: use object representing the decomposition
}

// orthogonal basis for range of A
private def range(A: DoubleMatrix): DoubleMatrix = {
val result = A.dup();
val tau = new DoubleMatrix(math.min(A.rows, A.columns));
SimpleBlas.geqrf(result, tau);
SimpleBlas.orgqr(A.columns, A.columns, result, tau)
result
}

private[mli] def mmul(left: MLTable, right: DoubleMatrix): DoubleMatrix = {
assert(left.numCols == right.rows)
val resultTable = left.map(row => {
assert(row.length == left.numCols)
new MLVector(row.toVector.toMatrix.mat mmul right)
})
val result = resultTable.toLocalMatrix.mat
assert(result.rows == left.numRows)
assert(result.columns == right.columns)
result
}

private[mli] def mmulTransposed(left: MLTable, right: DoubleMatrix): DoubleMatrix = {
assert(left.numRows == right.rows)
val resultRows = left.numCols
val resultCols = right.columns
val ctx = left.context
val accum = ctx.accumulable(DoubleMatrix.zeros(resultRows, resultCols))(new RankOneMatrixAccumulator)
left.foreach(leftRow => {
val leftPart = leftRow.toVector.toMatrix.mat.transpose
val rightPart = right.getRow(leftRow.id.toInt)
accum += new RankOneUpdate(leftPart, rightPart)
})
accum.value
}
}
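The new RandomizedSVD follows the randomized range-finder scheme of Halko, Martinsson, and Tropp: sketch Y0 = A*Omega with a Gaussian test matrix of k + 5 columns, orthonormalize via the QR-based range helper, run q power iterations (each one a pass through A'A, re-orthonormalized), then take an exact SVD of the small projected matrix B = Q'A and lift it back through Q. A rough driver sketch, assuming a `table: MLTable` has already been loaded (names illustrative):

```scala
// Hedged usage sketch; assumes `table` is an MLTable built elsewhere.
val k = 10 // target rank
val q = 2  // power iterations: more passes over A, sharper singular values
val (u, s, v) = RandomizedSVD.compute(table, k, q)
// u: m x k left singular vectors, s: the k singular values, v: n x k,
// giving the low-rank approximation A ~ u * diag(s) * v'
```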
17 changes: 9 additions & 8 deletions src/main/scala/feat/Scale.scala
@@ -2,7 +2,7 @@ package mli.feat

import scala.math.sqrt

-import mli.interface.{MLRow, MLVector, MLTable}
+import mli.interface.{MLTuple, MLVector, MLTable}

/**
* The Scale feature extractor rescales features according to their standard deviation. This will preserve
@@ -17,26 +17,27 @@ object Scale extends FeatureExtractor with Serializable {
* @param label Index of the label (will not be scaled).
* @return Scaled MLTable.
*/
-def scale(x: MLTable, label: Int = 0, featurizer: MLRow => MLRow): (MLTable, MLRow => MLRow) = {
+def scale(x: MLTable, label: Int = 0, featurizer: MLTuple => MLTuple): (MLTable, MLTuple => MLTuple) = {
//First we compute the variance - note this could be done in one pass over the data.
-val ssq = x.map(r => r times r).reduce(_ plus _) over x.numRows
-val sum = x.reduce(_ plus _) over x.numRows
+// FIXME: many toVector calls
+val ssq = x.map(r => r times r).reduce(_.toVector plus _.toVector).toVector over x.numRows
+val sum = x.reduce(_.toVector plus _.toVector).toVector over x.numRows
val sd = ssq minus (sum times sum)

//Now we divide by the standard deviation vector. We do not scale values with no standard deviation.
-val n = sd.toArray.map(sqrt).map(i => if(i == 0.0) 1.0 else i)
+val n = sd.toDoubleArray.map(sqrt).map(i => if(i == 0.0) 1.0 else i)

//We also do not scale the training label.
n(label) = 1.0
val sdn = MLVector(n)

val newtab = x.map(_ over sdn)
-newtab.setSchema(x.schema)
-def newfeaturizer(r: MLRow) = featurizer(r) over sdn
+// FIXME newtab.setSchema(x.schema)
+def newfeaturizer(r: MLTuple) = featurizer(r).toVector over sdn

(newtab, newfeaturizer)
}

def extract(in: MLTable): MLTable = scale(in, featurizer = r => r)._1

-}
+}
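The scaler relies on the identity Var(X) = E[X^2] - E[X]^2, computed in two table-wide reduces, then divides every column by its standard deviation (zero-variance columns and the label are left alone, and zeros stay zero, so sparsity is preserved). A minimal single-column sketch of the same arithmetic, assuming plain arrays:

```scala
// Hedged local sketch of the variance-based scaling for one column.
val col    = Array(2.0, 4.0, 6.0)
val mean   = col.sum / col.length                  // E[X]
val meanSq = col.map(x => x * x).sum / col.length  // E[X^2]
val sd     = math.sqrt(meanSq - mean * mean)       // sqrt(Var(X)), here ~1.63
val scaled = col.map(_ / (if (sd == 0.0) 1.0 else sd))
```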
30 changes: 14 additions & 16 deletions src/main/scala/impl/JblasDenseMLMatrix.scala
@@ -4,14 +4,10 @@ import mli.interface._
import mli.interface.MLTypes._
import org.jblas.{DoubleMatrix,Solve}

-class DenseMLMatrix(var mat: DoubleMatrix) extends LocalMatrix {
+class DenseMLMatrix(var mat: DoubleMatrix) extends LocalMatrix with Serializable {

-def toMLRows: Iterator[MLRow] = {
-(0 until numRows).map(r => DenseMLRow.fromSeq(mat.getRow(r).data.map(d => MLValue(d)).toSeq)).toIterator
-}
-
-def toMLVectors: Iterator[MLVector] = {
-(0 until numRows).map(r => MLVector(mat.getRow(r).data)).toIterator
+def toVectors: Iterator[MLVector] = {
+(0 until numRows).map(r => new MLVector(mat.getRow(r).data)).toIterator
}

def numRows: Index = mat.rows
@@ -20,6 +16,10 @@
def apply(r: Index, c: Index): Scalar = mat.get(r, c)
def apply(rows: Slice, cols: Slice): LocalMatrix = new DenseMLMatrix(mat.get(cols.toArray, rows.toArray))

+def getRows(rows: TraversableOnce[Index]): LocalMatrix = {
+new DenseMLMatrix(mat.getRows(rows.toArray))
+}
+
def update(r: Index, c: Index, v: Scalar) = mat.put(r, c, v)
def update(rows: Slice, cols: Slice, v: LocalMatrix) = {
//Jblas Matrices are row-major, so this should be faster in the contiguous case.
@@ -42,6 +42,7 @@

def solve(y: LocalMatrix) = new DenseMLMatrix(Solve.solve(mat, y.mat))
def times(y: LocalMatrix) = new DenseMLMatrix(mat.mmul(y.mat))
+def times(y: MLTuple): LocalMatrix = new DenseMLMatrix(mat.mmul(new DoubleMatrix(y.toDoubleArray)))
def transpose = new DenseMLMatrix(mat.transpose())

//TODO need to decide on types for these.
@@ -52,24 +53,21 @@
//Composition
def on(y: DenseMLMatrix): LocalMatrix = new DenseMLMatrix(DoubleMatrix.concatVertically(mat, y.mat))
def then(y: LocalMatrix): LocalMatrix = new DenseMLMatrix(DoubleMatrix.concatHorizontally(mat, y.mat))
-
+override def toString = mat.toString
}

/**
-* Contains facilities for creating a Dense Matrix from rows (either a Seq of MLRow or MLVectors).
+* Contains facilities for creating a Dense Matrix from rows (either a Seq of MLRow or MLTuples).
*/
object DenseMLMatrix {
-def apply(rows: DenseMLRow*) = fromSeq(rows.toSeq)
-def fromSeq(rows: Seq[DenseMLRow]) = {
+def apply(rows: Seq[MLVector]) = {
val dat = rows.map(_.toDoubleArray).toArray
new DenseMLMatrix(new DoubleMatrix(dat))
}

-def fromVecs(rows: Seq[MLVector]) = {
-val dat = rows.map(_.toArray).toArray
-new DenseMLMatrix(new DoubleMatrix(dat))
-}
-
def zeros(rows: Index, cols: Index) = new DenseMLMatrix(DoubleMatrix.zeros(rows, cols))
def eye(n: Index) = new DenseMLMatrix(DoubleMatrix.eye(n))
+
+def rand(rows: Index, cols: Index) = new DenseMLMatrix(DoubleMatrix.rand(rows, cols))
+def randn(rows: Index, cols: Index) = new DenseMLMatrix(DoubleMatrix.randn(rows, cols))
}
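A quick smoke test of the wrapper as it stands after this diff (hypothetical, not part of the source; it assumes the solve result exposes the underlying mat):

```scala
// Hedged smoke test: solve a small system and check the residual.
val a = DenseMLMatrix.randn(3, 3) // the newly added randn factory
val b = DenseMLMatrix.rand(3, 2)
val x = a.solve(b)                // x such that a * x = b (via jblas Solve)
assert(a.times(x).mat.sub(b.mat).norm2 < 1e-8)
```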
10 changes: 5 additions & 5 deletions src/main/scala/impl/MLNumericTable.scala
@@ -24,7 +24,7 @@ class MLNumericTable(@transient protected var rdd: RDD[MLVector], inSchema: Opti
def schema(): Schema = {
tableSchema match {
case Some(s) => s
-case None => Schema(rdd.first)
+case None => rdd.first.schema
}
}

@@ -124,9 +124,9 @@
// val res = f(mat)
// res.toMLRows
// }
-def cachedMatrixMap(m: LocalMatrix): Iterator[MLVector] = f(m).toMLVectors
+def cachedMatrixMap(m: LocalMatrix): Iterator[MLVector] = f(m).toVectors

-def createMatrix(i: Iterator[MLVector]): Iterator[LocalMatrix] = Iterator(DenseMLMatrix.fromVecs(i.toSeq))
+def createMatrix(i: Iterator[MLVector]): Iterator[LocalMatrix] = Iterator(DenseMLMatrix(i.toSeq))


cachedMat match {
@@ -192,7 +192,7 @@

def print(count: Int) = take(count).foreach(row => println(row.mkString("\t")))

-def toMLTable = SparkMLTable.fromMLRowRdd(rdd.map(MLRow(_)))
+def toMLTable = SparkMLTable(rdd.map(_.asInstanceOf[MLTuple]))
}

object MLNumericTable {
@@ -209,4 +209,4 @@
new MLNumericTable(rdd)
}

-}
+}
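The cachedMatrixMap/createMatrix pair above batches each partition of vectors into one local jblas matrix so that f runs at BLAS speed instead of row-at-a-time. A standalone sketch of that pattern in plain Spark (hypothetical names; the MLI internals differ):

```scala
import org.apache.spark.rdd.RDD
import org.jblas.DoubleMatrix

// Hedged sketch: apply a matrix -> matrix function per partition,
// packing the partition's rows into a single jblas matrix first.
def matrixBatchMap(rdd: RDD[Array[Double]])(f: DoubleMatrix => DoubleMatrix): RDD[Array[Double]] =
  rdd.mapPartitions { rows =>
    val batch = rows.toArray
    if (batch.isEmpty) Iterator.empty
    else {
      val out = f(new DoubleMatrix(batch)) // one matrix row per input vector
      (0 until out.rows).iterator.map(r => out.getRow(r).toArray)
    }
  }
```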