30 changes: 8 additions & 22 deletions build.sbt
@@ -1,21 +1,15 @@
import scalariform.formatter.preferences._
import com.typesafe.sbt.SbtScalariform
import com.typesafe.sbt.SbtScalariform.ScalariformKeys

name := "template-scala-parallel-universal-recommendation"

version := "0.4.2"
version := "0.2.3"

organization := "io.prediction"

val mahoutVersion = "0.13.0-SNAPSHOT"

val pioVersion = "0.9.7-aml"
val mahoutVersion = "0.11.1"

libraryDependencies ++= Seq(
"io.prediction" %% "core" % pioVersion % "provided",
"org.apache.spark" %% "spark-core" % "1.4.0" % "provided",
"org.apache.spark" %% "spark-mllib" % "1.4.0" % "provided",
"org.apache.predictionio" %% "apache-predictionio-core" % pioVersion.value % "provided",
"org.apache.spark" %% "spark-core" % "1.3.0" % "provided",
"org.apache.spark" %% "spark-mllib" % "1.3.0" % "provided",
"org.xerial.snappy" % "snappy-java" % "1.1.1.7",
// Mahout's Spark libs
"org.apache.mahout" %% "mahout-math-scala" % mahoutVersion,
@@ -28,22 +22,14 @@ libraryDependencies ++= Seq(
// other external libs
"com.thoughtworks.xstream" % "xstream" % "1.4.4"
exclude("xmlpull", "xmlpull"),
"org.elasticsearch" % "elasticsearch-spark_2.10" % "2.1.2"
"org.elasticsearch" % "elasticsearch-spark_2.10" % "2.1.0.Beta4"
exclude("org.apache.spark", "spark-catalyst_2.10")
exclude("org.apache.spark", "spark-sql_2.10"),
"org.json4s" %% "json4s-native" % "3.2.10")
.map(_.exclude("org.apache.lucene","lucene-core")).map(_.exclude("org.apache.lucene","lucene-analyzers-common"))
"org.json4s" %% "json4s-native" % "3.2.11"
)

resolvers += Resolver.mavenLocal

SbtScalariform.scalariformSettings

ScalariformKeys.preferences := ScalariformKeys.preferences.value
.setPreference(AlignSingleLineCaseStatements, true)
.setPreference(DoubleIndentClassDeclaration, true)
.setPreference(DanglingCloseParenthesis, Prevent)
.setPreference(MultilineScaladocCommentsStartOnFirstLine, true)

assemblyMergeStrategy in assembly := {
case "plugin.properties" => MergeStrategy.discard
case PathList(ps @ _*) if ps.last endsWith "package-info.class" =>
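Note that the new dependency line references pioVersion.value, which only compiles if pioVersion is an sbt SettingKey rather than a plain val. The snippet below is a minimal sketch of one way such a setting could be declared in build.sbt; the key name, system property, and fallback version are illustrative assumptions, and the real template may obtain this setting from a PredictionIO build plugin instead.

// Sketch only: declare pioVersion as an sbt setting so that pioVersion.value resolves.
// The fallback version and property name are hypothetical, not taken from this PR.
lazy val pioVersion = settingKey[String]("Apache PredictionIO artifact version")

pioVersion := sys.props.getOrElse("pio.version", "0.10.0-incubating")

libraryDependencies += "org.apache.predictionio" %% "apache-predictionio-core" % pioVersion.value % "provided"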
106 changes: 50 additions & 56 deletions src/main/scala/DataSource.scala
@@ -15,101 +15,95 @@
* limitations under the License.
*/

package org.template

import _root_.io.prediction.controller.{ EmptyActualResult, EmptyEvaluationInfo, PDataSource, Params }
import _root_.io.prediction.data.storage.PropertyMap
import _root_.io.prediction.data.store.PEventStore
import grizzled.slf4j.Logger
import io.prediction.core.{ EventWindow, SelfCleaningDataSource }
package com.uniflash

import _root_.org.apache.predictionio.controller.PDataSource
import _root_.org.apache.predictionio.controller.EmptyEvaluationInfo
import _root_.org.apache.predictionio.controller.EmptyActualResult
import _root_.org.apache.predictionio.controller.Params
import _root_.org.apache.predictionio.data.storage.{PropertyMap, Event}
import _root_.org.apache.predictionio.data.store.PEventStore
import org.apache.mahout.math.indexeddataset.{BiDictionary, IndexedDataset}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.template.conversions.{ ActionID, ItemID }
import org.template.conversions._
import grizzled.slf4j.Logger

/** Taken from engine.json these are passed in to the DataSource constructor
*
* @param appName registered name for the app
* @param eventNames a list of named events expected. The first is the primary event, the rest are secondary. These
* will be used to create the primary correlator and cross-cooccurrence secondary correlators.
*/
*
* @param appName registered name for the app
* @param eventNames a list of named events expected. The first is the primary event, the rest are secondary. These
* will be used to create the primary correlator and cross-cooccurrence secondary correlators.
*/
case class DataSourceParams(
appName: String,
eventNames: List[String], // IMPORTANT: eventNames must be exactly the same as URAlgorithmParams eventNames
eventWindow: Option[EventWindow]) extends Params
appName: String,
eventNames: List[String]) // IMPORTANT: eventNames must be exactly the same as URAlgorithmParams eventNames
extends Params

/** Reads specified events from the PEventStore and creates RDDs for each event. A list of pairs (eventName, eventRDD)
* is sent to the Preparator for further processing.
* @param dsp parameters taken from engine.json
*/
* is sent to the Preparator for further processing.
* @param dsp parameters taken from engine.json
*/
class DataSource(val dsp: DataSourceParams)
extends PDataSource[TrainingData, EmptyEvaluationInfo, Query, EmptyActualResult]
with SelfCleaningDataSource {

@transient override lazy implicit val logger: Logger = Logger[this.type]
extends PDataSource[TrainingData,
EmptyEvaluationInfo, Query, EmptyActualResult] {

override def appName: String = dsp.appName
override def eventWindow: Option[EventWindow] = dsp.eventWindow

drawInfo("Init DataSource", Seq(
("══════════════════════════════", "════════════════════════════"),
("App name", appName),
("Event window", eventWindow),
("Event names", dsp.eventNames)))
@transient lazy val logger = Logger[this.type]

/** Reads events from PEventStore and creates an RDD for each */
override def readTraining(sc: SparkContext): TrainingData = {
override
def readTraining(sc: SparkContext): TrainingData = {

val eventNames = dsp.eventNames
cleanPersistedPEvents(sc)

val eventsRDD = PEventStore.find(
appName = dsp.appName,
entityType = Some("user"),
eventNames = Some(eventNames),
targetEntityType = Some(Some("item")))(sc).repartition(sc.defaultParallelism)
targetEntityType = Some(Some("item")))(sc)

// now separate the events by event name
val actionRDDs: List[(ActionID, RDD[(UserID, ItemID)])] = eventNames.map { eventName =>
val actionRDDs = eventNames.map { eventName =>
val actionRDD = eventsRDD.filter { event =>
require(eventNames.contains(event.event), s"Unexpected event $event is read.") // is this really needed?

require(eventNames.contains(event.event), s"Unexpected event ${event} is read.") // is this really needed?
require(event.entityId.nonEmpty && event.targetEntityId.get.nonEmpty, "Empty user or item ID")

eventName.equals(event.event)

}.map { event =>
(event.entityId, event.targetEntityId.get)
}
}.cache()

(eventName, actionRDD)
} filterNot { case (_, actionRDD) => actionRDD.isEmpty() }

logger.debug(s"Received actions for events ${actionRDDs.map(_._1)}")
}

// aggregating all $set/$unsets for metadata fields, which are attached to items
val fieldsRDD: RDD[(ItemID, PropertyMap)] = PEventStore.aggregateProperties(
appName = dsp.appName,
entityType = "item")(sc)
// logger.debug(s"FieldsRDD\n${fieldsRDD.take(25).mkString("\n")}")
val fieldsRDD = PEventStore.aggregateProperties(
appName = dsp.appName,
entityType = "item")(sc)

// Have a list of (actionName, RDD), for each action
// todo: some day allow data to be content, which requires rethinking how to use EventStore
TrainingData(actionRDDs, fieldsRDD)
new TrainingData(actionRDDs, fieldsRDD)
}
}

/** Low level RDD based representation of the data ready for the Preparator
*
* @param actions List of Tuples (actionName, actionRDD)
* @param fieldsRDD RDD of item keyed PropertyMap for item metadata
*/
case class TrainingData(
actions: Seq[(ActionID, RDD[(UserID, ItemID)])],
fieldsRDD: RDD[(ItemID, PropertyMap)]) extends Serializable {

override def toString: String = {
*
* @param actions List of Tuples (actionName, actionRDD)
* @param fieldsRDD RDD of item keyed PropertyMap for item metadata
*/
class TrainingData(
val actions: List[(String, RDD[(String, String)])],
val fieldsRDD: RDD[(String, PropertyMap)])
extends Serializable {

override def toString = {
val a = actions.map { t =>
s"${t._1} actions: [count:${t._2.count()}] + sample:${t._2.take(2).toList} "
}.toString()
val f = s"Item metadata: [count:${fieldsRDD.count}] + sample:${fieldsRDD.take(2).toList} "
a + f
}

}
}
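For orientation, DataSourceParams above is populated by reflection from the datasource section of engine.json, as its doc comment states. The sketch below shows that correspondence; the app name and event names are illustrative assumptions, not values taken from this pull request, and it targets the two-field variant of the case class shown in this diff (the other variant also takes an eventWindow parameter).

// Sketch only. An engine.json block such as
//   "datasource": { "params": { "appName": "MyApp", "eventNames": ["purchase", "view"] } }
// is deserialized into the case class defined above, roughly equivalent to:
val dsp = DataSourceParams(
  appName = "MyApp",                      // illustrative registered app name
  eventNames = List("purchase", "view"))  // primary event first, then secondary events

// readTraining(sc) then yields one (eventName, RDD[(user, item)]) pair per event name,
// e.g. ("purchase", RDD(("u1", "i1"), ...)), plus the item-metadata fieldsRDD.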
82 changes: 38 additions & 44 deletions src/main/scala/Engine.scala
@@ -15,68 +15,62 @@
* limitations under the License.
*/

package org.template
package com.uniflash

import grizzled.slf4j.Logger
import io.prediction.controller.{ EmptyActualResult, EmptyEvaluationInfo, Engine, EngineFactory }
import org.template.conversions._
import java.util.Date

import org.apache.predictionio.controller.{EngineFactory, Engine}

/** This file contains case classes that are used with reflection to specify how query and config
* JSON is to be parsed. The Query case class, for instance, defines the way a JSON query is to be
* formed. The same for param case classes.
*/
* JSON is to be parsed. The Query case class, for instance, defines the way a JSON query is to be
* formed. The same for param case classes.
*/

/** The Query spec with optional values. The only hard rule is that there must be either a user or
* an item id. All other values are optional.
*/
* an item id. All other values are optional. */
case class Query(
user: Option[String] = None, // must be a user or item id
userBias: Option[Float] = None, // default: whatever is in algorithm params or 1
item: Option[String] = None, // must be a user or item id
itemBias: Option[Float] = None, // default: whatever is in algorithm params or 1
fields: Option[List[Field]] = None, // default: whatever is in algorithm params or None
currentDate: Option[String] = None, // if used will override dateRange filter, currentDate must lie between the item's
// expireDateName value and availableDateName value, all are ISO 8601 dates
dateRange: Option[DateRange] = None, // optional before and after filter applied to a date field
blacklistItems: Option[List[String]] = None, // default: whatever is in algorithm params or None
returnSelf: Option[Boolean] = None, // means for an item query should the item itself be returned, defaults
// to what is in the algorithm params or false
num: Option[Int] = None, // default: whatever is in algorithm params, which itself has a default--probably 20
eventNames: Option[List[String]], // names used to ID all user actions
withRanks: Option[Boolean] = None) // add rank field values to ItemScore, default false
extends Serializable
user: Option[String] = None, // must be a user or item id
userBias: Option[Float] = None, // default: whatever is in algorithm params or 1
item: Option[String] = None, // must be a user or item id
itemBias: Option[Float] = None, // default: whatever is in algorithm params or 1
fields: Option[List[Field]] = None, // default: whatever is in algorithm params or None
currentDate: Option[String] = None, // if used will override dateRange filter, currentDate must lie between the item's
// expireDateName value and availableDateName value, all are ISO 8601 dates
dateRange: Option[DateRange] = None, // optional before and after filter applied to a date field
blacklistItems: Option[List[String]] = None, // default: whatever is in algorithm params or None
returnSelf: Option[Boolean] = None,// means for an item query should the item itself be returned, defaults
// to what is in the algorithm params or false
num: Option[Int] = None, // default: whatever is in algorithm params, which itself has a default--probably 20
eventNames: Option[List[String]]) // names used to ID all user actions
extends Serializable

/** Used to specify how Fields are represented in engine.json */
case class Field( // no optional values for fields, when specified
name: String, // name of metadata field
values: Seq[String], // fields can have multiple values like tags of a single value as when using hierarchical
// taxonomies
bias: Float) // any positive value is a boost, negative is a filter
extends Serializable
name: String, // name of metadata field
values: Array[String], // fields can have multiple values like tags of a single value as when using hierarchical
// taxonomies
bias: Float)// any positive value is a boost, negative is a filter
extends Serializable

/** Used to specify the date range for a query */
case class DateRange(
name: String, // name of item property for the date comparison
before: Option[String], // empty strings means no filter
after: Option[String]) // both empty should be ignored
extends Serializable
name: String, // name of item property for the date comparison
before: Option[String], // empty strings means no filter
after: Option[String]) // both empty should be ignored
extends Serializable

/** results of a URAlgorithm.predict */
/** results of a MMRAlgorithm.predict */
case class PredictedResult(
itemScores: Array[ItemScore])
extends Serializable
itemScores: Array[ItemScore])
extends Serializable

case class ItemScore(
item: ItemID, // item id
score: Double, // used to rank, original score returned from the search engine
ranks: Option[Map[String, Double]] = None) extends Serializable
item: String, // item id
score: Double )// used to rank, original score returned from the search engine
extends Serializable

object RecommendationEngine extends EngineFactory {

@transient lazy implicit val logger: Logger = Logger[this.type]
drawActionML

def apply(): Engine[TrainingData, EmptyEvaluationInfo, PreparedData, Query, PredictedResult, EmptyActualResult] = {
def apply() = {
new Engine(
classOf[DataSource],
classOf[Preparator],
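To make the Query case class above concrete, here is a sketch of a simple item-to-item query built in Scala. Every value is illustrative; only a user or item id is required, and all unset fields keep their defaults. Query JSON sent to a deployed engine uses the same field names, since these case classes are parsed from JSON by reflection.

// Sketch only: query for items similar to "item-1", boosting a "category" property.
// The item id, field name/values, num, and event names are hypothetical examples.
val query = Query(
  item = Some("item-1"),
  fields = Some(List(Field(name = "category", values = Array("electronics"), bias = 2.0f))),
  num = Some(10),
  eventNames = Some(List("purchase", "view")))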