diff --git a/build.sbt b/build.sbt
index 29ea0c3..20a1a08 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,21 +1,15 @@
-import scalariform.formatter.preferences._
-import com.typesafe.sbt.SbtScalariform
-import com.typesafe.sbt.SbtScalariform.ScalariformKeys
-
name := "template-scala-parallel-universal-recommendation"
-version := "0.4.2"
+version := "0.2.3"
organization := "io.prediction"
-val mahoutVersion = "0.13.0-SNAPSHOT"
-
-val pioVersion = "0.9.7-aml"
+val mahoutVersion = "0.11.1"
libraryDependencies ++= Seq(
- "io.prediction" %% "core" % pioVersion % "provided",
- "org.apache.spark" %% "spark-core" % "1.4.0" % "provided",
- "org.apache.spark" %% "spark-mllib" % "1.4.0" % "provided",
+ "org.apache.predictionio" %% "apache-predictionio-core" % pioVersion.value % "provided",
+ "org.apache.spark" %% "spark-core" % "1.3.0" % "provided",
+ "org.apache.spark" %% "spark-mllib" % "1.3.0" % "provided",
"org.xerial.snappy" % "snappy-java" % "1.1.1.7",
// Mahout's Spark libs
"org.apache.mahout" %% "mahout-math-scala" % mahoutVersion,
@@ -28,22 +22,14 @@ libraryDependencies ++= Seq(
// other external libs
"com.thoughtworks.xstream" % "xstream" % "1.4.4"
exclude("xmlpull", "xmlpull"),
- "org.elasticsearch" % "elasticsearch-spark_2.10" % "2.1.2"
+ "org.elasticsearch" % "elasticsearch-spark_2.10" % "2.1.0.Beta4"
exclude("org.apache.spark", "spark-catalyst_2.10")
exclude("org.apache.spark", "spark-sql_2.10"),
- "org.json4s" %% "json4s-native" % "3.2.10")
- .map(_.exclude("org.apache.lucene","lucene-core")).map(_.exclude("org.apache.lucene","lucene-analyzers-common"))
+ "org.json4s" %% "json4s-native" % "3.2.11"
+)
resolvers += Resolver.mavenLocal
-SbtScalariform.scalariformSettings
-
-ScalariformKeys.preferences := ScalariformKeys.preferences.value
- .setPreference(AlignSingleLineCaseStatements, true)
- .setPreference(DoubleIndentClassDeclaration, true)
- .setPreference(DanglingCloseParenthesis, Prevent)
- .setPreference(MultilineScaladocCommentsStartOnFirstLine, true)
-
assemblyMergeStrategy in assembly := {
case "plugin.properties" => MergeStrategy.discard
case PathList(ps @ _*) if ps.last endsWith "package-info.class" =>
diff --git a/src/main/scala/DataSource.scala b/src/main/scala/DataSource.scala
index 0c3b979..40685ae 100644
--- a/src/main/scala/DataSource.scala
+++ b/src/main/scala/DataSource.scala
@@ -15,96 +15,90 @@
* limitations under the License.
*/
-package org.template
-
-import _root_.io.prediction.controller.{ EmptyActualResult, EmptyEvaluationInfo, PDataSource, Params }
-import _root_.io.prediction.data.storage.PropertyMap
-import _root_.io.prediction.data.store.PEventStore
-import grizzled.slf4j.Logger
-import io.prediction.core.{ EventWindow, SelfCleaningDataSource }
+package com.uniflash
+
+import _root_.org.apache.predictionio.controller.PDataSource
+import _root_.org.apache.predictionio.controller.EmptyEvaluationInfo
+import _root_.org.apache.predictionio.controller.EmptyActualResult
+import _root_.org.apache.predictionio.controller.Params
+import _root_.org.apache.predictionio.data.storage.{PropertyMap, Event}
+import _root_.org.apache.predictionio.data.store.PEventStore
+import org.apache.mahout.math.indexeddataset.{BiDictionary, IndexedDataset}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
-import org.template.conversions.{ ActionID, ItemID }
-import org.template.conversions._
+import grizzled.slf4j.Logger
/** Taken from engine.json these are passed in to the DataSource constructor
- *
- * @param appName registered name for the app
- * @param eventNames a list of named events expected. The first is the primary event, the rest are secondary. These
- * will be used to create the primary correlator and cross-cooccurrence secondary correlators.
- */
+ *
+ * @param appName registered name for the app
+ * @param eventNames a list of named events expected. The first is the primary event, the rest are secondary. These
+ * will be used to create the primary correlator and cross-cooccurrence secondary correlators.
+ */
case class DataSourceParams(
- appName: String,
- eventNames: List[String], // IMPORTANT: eventNames must be exactly the same as URAlgorithmParams eventNames
- eventWindow: Option[EventWindow]) extends Params
+ appName: String,
+ eventNames: List[String]) // IMPORTANT: eventNames must be exactly the same as URAlgorithmParams eventNames
+ extends Params
/** Read specified events from the PEventStore and creates RDDs for each event. A list of pairs (eventName, eventRDD)
- * are sent to the Preparator for further processing.
- * @param dsp parameters taken from engine.json
- */
+ * are sent to the Preparator for further processing.
+ * @param dsp parameters taken from engine.json
+ */
class DataSource(val dsp: DataSourceParams)
- extends PDataSource[TrainingData, EmptyEvaluationInfo, Query, EmptyActualResult]
- with SelfCleaningDataSource {
-
- @transient override lazy implicit val logger: Logger = Logger[this.type]
+ extends PDataSource[TrainingData,
+ EmptyEvaluationInfo, Query, EmptyActualResult] {
- override def appName: String = dsp.appName
- override def eventWindow: Option[EventWindow] = dsp.eventWindow
-
- drawInfo("Init DataSource", Seq(
- ("══════════════════════════════", "════════════════════════════"),
- ("App name", appName),
- ("Event window", eventWindow),
- ("Event names", dsp.eventNames)))
+ @transient lazy val logger = Logger[this.type]
/** Reads events from PEventStore and create and RDD for each */
- override def readTraining(sc: SparkContext): TrainingData = {
+ override
+ def readTraining(sc: SparkContext): TrainingData = {
val eventNames = dsp.eventNames
- cleanPersistedPEvents(sc)
+
val eventsRDD = PEventStore.find(
appName = dsp.appName,
entityType = Some("user"),
eventNames = Some(eventNames),
- targetEntityType = Some(Some("item")))(sc).repartition(sc.defaultParallelism)
+ targetEntityType = Some(Some("item")))(sc)
// now separate the events by event name
- val actionRDDs: List[(ActionID, RDD[(UserID, ItemID)])] = eventNames.map { eventName =>
+ val actionRDDs = eventNames.map { eventName =>
val actionRDD = eventsRDD.filter { event =>
- require(eventNames.contains(event.event), s"Unexpected event $event is read.") // is this really needed?
+
+ require(eventNames.contains(event.event), s"Unexpected event ${event} is read.") // is this really needed?
require(event.entityId.nonEmpty && event.targetEntityId.get.nonEmpty, "Empty user or item ID")
+
eventName.equals(event.event)
+
}.map { event =>
(event.entityId, event.targetEntityId.get)
- }
+ }.cache()
(eventName, actionRDD)
- } filterNot { case (_, actionRDD) => actionRDD.isEmpty() }
-
- logger.debug(s"Received actions for events ${actionRDDs.map(_._1)}")
+ }
// aggregating all $set/$unsets for metadata fields, which are attached to items
- val fieldsRDD: RDD[(ItemID, PropertyMap)] = PEventStore.aggregateProperties(
- appName = dsp.appName,
- entityType = "item")(sc)
- // logger.debug(s"FieldsRDD\n${fieldsRDD.take(25).mkString("\n")}")
+ val fieldsRDD = PEventStore.aggregateProperties(
+ appName= dsp.appName,
+ entityType= "item")(sc)
// Have a list of (actionName, RDD), for each action
// todo: some day allow data to be content, which requires rethinking how to use EventStore
- TrainingData(actionRDDs, fieldsRDD)
+ new TrainingData(actionRDDs, fieldsRDD)
}
}
/** Low level RDD based representation of the data ready for the Preparator
- *
- * @param actions List of Tuples (actionName, actionRDD)qw
- * @param fieldsRDD RDD of item keyed PropertyMap for item metadata
- */
-case class TrainingData(
- actions: Seq[(ActionID, RDD[(UserID, ItemID)])],
- fieldsRDD: RDD[(ItemID, PropertyMap)]) extends Serializable {
-
- override def toString: String = {
+ *
+  * @param actions List of Tuples (actionName, actionRDD)
+ * @param fieldsRDD RDD of item keyed PropertyMap for item metadata
+ */
+class TrainingData(
+ val actions: List[(String, RDD[(String, String)])],
+ val fieldsRDD: RDD[(String, PropertyMap)])
+ extends Serializable {
+
+ override def toString = {
val a = actions.map { t =>
s"${t._1} actions: [count:${t._2.count()}] + sample:${t._2.take(2).toList} "
}.toString()
@@ -112,4 +106,4 @@ case class TrainingData(
a + f
}
-}
\ No newline at end of file
+}
diff --git a/src/main/scala/Engine.scala b/src/main/scala/Engine.scala
index aee709b..8bc065b 100644
--- a/src/main/scala/Engine.scala
+++ b/src/main/scala/Engine.scala
@@ -15,68 +15,62 @@
* limitations under the License.
*/
-package org.template
+package com.uniflash
-import grizzled.slf4j.Logger
-import io.prediction.controller.{ EmptyActualResult, EmptyEvaluationInfo, Engine, EngineFactory }
-import org.template.conversions._
+import java.util.Date
+
+import org.apache.predictionio.controller.{EngineFactory, Engine}
/** This file contains case classes that are used with reflection to specify how query and config
- * JSON is to be parsed. the Query case class, for instance defines the way a JSON query is to be
- * formed. The same for param case classes.
- */
+  * JSON is to be parsed. The Query case class, for instance, defines the way a JSON query is to be
+  * formed. The same holds for the param case classes.
+ */
/** The Query spec with optional values. The only hard rule is that there must be either a user or
- * an item id. All other values are optional.
- */
+ * an item id. All other values are optional. */
case class Query(
- user: Option[String] = None, // must be a user or item id
- userBias: Option[Float] = None, // default: whatever is in algorithm params or 1
- item: Option[String] = None, // must be a user or item id
- itemBias: Option[Float] = None, // default: whatever is in algorithm params or 1
- fields: Option[List[Field]] = None, // default: whatever is in algorithm params or None
- currentDate: Option[String] = None, // if used will override dateRange filter, currentDate must lie between the item's
- // expireDateName value and availableDateName value, all are ISO 8601 dates
- dateRange: Option[DateRange] = None, // optional before and after filter applied to a date field
- blacklistItems: Option[List[String]] = None, // default: whatever is in algorithm params or None
- returnSelf: Option[Boolean] = None, // means for an item query should the item itself be returned, defaults
- // to what is in the algorithm params or false
- num: Option[Int] = None, // default: whatever is in algorithm params, which itself has a default--probably 20
- eventNames: Option[List[String]], // names used to ID all user actions
- withRanks: Option[Boolean] = None) // Add to ItemScore rank fields values, default fasle
- extends Serializable
+ user: Option[String] = None, // must be a user or item id
+ userBias: Option[Float] = None, // default: whatever is in algorithm params or 1
+ item: Option[String] = None, // must be a user or item id
+ itemBias: Option[Float] = None, // default: whatever is in algorithm params or 1
+ fields: Option[List[Field]] = None, // default: whatever is in algorithm params or None
+ currentDate: Option[String] = None, // if used will override dateRange filter, currentDate must lie between the item's
+ // expireDateName value and availableDateName value, all are ISO 8601 dates
+ dateRange: Option[DateRange] = None, // optional before and after filter applied to a date field
+ blacklistItems: Option[List[String]] = None, // default: whatever is in algorithm params or None
+ returnSelf: Option[Boolean] = None,// means for an item query should the item itself be returned, defaults
+ // to what is in the algorithm params or false
+ num: Option[Int] = None, // default: whatever is in algorithm params, which itself has a default--probably 20
+ eventNames: Option[List[String]]) // names used to ID all user actions
+ extends Serializable
/** Used to specify how Fields are represented in engine.json */
 case class Field( // no optional values for fields, when specified
- name: String, // name of metadata field
- values: Seq[String], // fields can have multiple values like tags of a single value as when using hierarchical
- // taxonomies
- bias: Float) // any positive value is a boost, negative is a filter
- extends Serializable
+ name: String, // name of metadata field
+  values: Array[String], // fields can have multiple values like tags or a single value as when using hierarchical
+ // taxonomies
+ bias: Float)// any positive value is a boost, negative is a filter
+ extends Serializable
/** Used to specify the date range for a query */
case class DateRange(
- name: String, // name of item property for the date comparison
- before: Option[String], // empty strings means no filter
- after: Option[String]) // both empty should be ignored
- extends Serializable
+ name: String, // name of item property for the date comparison
+ before: Option[String], // empty strings means no filter
+ after: Option[String]) // both empty should be ignored
+ extends Serializable
-/** results of a URAlgoritm.predict */
+/** results of a URAlgorithm.predict */
case class PredictedResult(
- itemScores: Array[ItemScore])
- extends Serializable
+ itemScores: Array[ItemScore])
+ extends Serializable
case class ItemScore(
- item: ItemID, // item id
- score: Double, // used to rank, original score returned from teh search engine
- ranks: Option[Map[String, Double]] = None) extends Serializable
+ item: String, // item id
+  score: Double) // used to rank, original score returned from the search engine
+ extends Serializable
object RecommendationEngine extends EngineFactory {
-
- @transient lazy implicit val logger: Logger = Logger[this.type]
- drawActionML
-
- def apply(): Engine[TrainingData, EmptyEvaluationInfo, PreparedData, Query, PredictedResult, EmptyActualResult] = {
+ def apply() = {
new Engine(
classOf[DataSource],
classOf[Preparator],
diff --git a/src/main/scala/EsClient.scala b/src/main/scala/EsClient.scala
index 7ac0e33..e06a028 100644
--- a/src/main/scala/EsClient.scala
+++ b/src/main/scala/EsClient.scala
@@ -15,12 +15,12 @@
* limitations under the License.
*/
-package org.template
+package com.uniflash
import java.util
import grizzled.slf4j.Logger
-import io.prediction.data.storage._
+import org.apache.predictionio.data.storage.{Storage, StorageClientConfig, elasticsearch}
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
@@ -31,51 +31,48 @@ import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsReques
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest
import org.elasticsearch.action.get.GetResponse
import org.elasticsearch.client.transport.TransportClient
-import org.elasticsearch.common.settings.{ ImmutableSettings, Settings }
+import org.elasticsearch.common.settings.{Settings, ImmutableSettings}
import org.joda.time.DateTime
import org.json4s.jackson.JsonMethods._
import org.elasticsearch.spark._
import org.elasticsearch.node.NodeBuilder._
-import org.elasticsearch.search.SearchHits
-import org.json4s.JValue
-import org.template.conversions.{ ItemID, ItemProps }
import scala.collection.immutable
import scala.collection.parallel.mutable
/** Elasticsearch notes:
- * 1) every query clause wil laffect scores unless it has a constant_score and boost: 0
- * 2) the Spark index writer is fast but must assemble all data for the index before the write occurs
- * 3) many operations must be followed by a refresh before the action takes effect--sortof like a transaction commit
- * 4) to use like a DB you must specify that the index of fields are `not_analyzed` so they won't be lowercased,
- * stemmed, tokenized, etc. Then the values are literal and must match exactly what is in the query (no analyzer)
- */
+  *  1) every query clause will affect scores unless it has a constant_score and boost: 0
+  *  2) the Spark index writer is fast but must assemble all data for the index before the write occurs
+  *  3) many operations must be followed by a refresh before the action takes effect--sort of like a transaction commit
+  *  4) to use like a DB you must specify that fields are indexed as `not_analyzed` so they won't be lowercased,
+  *  stemmed, tokenized, etc. Then the values are literal and must match exactly what is in the query (no analyzer)
+  */
/** Defines methods to use on Elasticsearch. */
-object EsClient {
- @transient lazy val logger: Logger = Logger[this.type]
+object esClient {
+ @transient lazy val logger = Logger[this.type]
private lazy val client = if (Storage.getConfig("ELASTICSEARCH").nonEmpty)
- new elasticsearch.StorageClient(Storage.getConfig("ELASTICSEARCH").get).client
- else
- throw new IllegalStateException("No Elasticsearch client configuration detected, check your pio-env.sh for" +
- "proper configuration settings")
+ new elasticsearch.StorageClient(Storage.getConfig("ELASTICSEARCH").get).client
+ else
+    throw new IllegalStateException("No Elasticsearch client configuration detected, check your pio-env.sh for " +
+ "proper configuration settings")
   // wrong way that uses only default settings, which will be a localhost ES server.
//private lazy val client = new elasticsearch.StorageClient(StorageClientConfig()).client
/** Delete all data from an instance but do not commit it. Until the "refresh" is done on the index
- * the changes will not be reflected.
- * @param indexName will delete all types under this index, types are not used by the UR
- * @param refresh
- * @return true if all is well
- */
+ * the changes will not be reflected.
+ * @param indexName will delete all types under this index, types are not used by the UR
+ * @param refresh
+ * @return true if all is well
+ */
def deleteIndex(indexName: String, refresh: Boolean = false): Boolean = {
//val debug = client.connectedNodes()
- if (client.admin().indices().exists(new IndicesExistsRequest(indexName)).actionGet().isExists) {
+ if (client.admin().indices().exists(new IndicesExistsRequest(indexName)).actionGet().isExists()) {
val delete = client.admin().indices().delete(new DeleteIndexRequest(indexName)).actionGet()
if (!delete.isAcknowledged) {
- logger.info(s"Index $indexName wasn't deleted, but may have quietly failed.")
+ logger.info(s"Index ${indexName} wasn't deleted, but may have quietly failed.")
} else {
// now refresh to get it 'committed'
// todo: should do this after the new index is created so no index downtime
@@ -83,35 +80,35 @@ object EsClient {
}
true
} else {
- logger.warn(s"Elasticsearch index: $indexName wasn't deleted because it didn't exist. This may be an error.")
+ logger.warn(s"Elasticsearch index: ${indexName} wasn't deleted because it didn't exist. This may be an error.")
false
}
}
/** Creates a new empty index in Elasticsearch and initializes mappings for fields that will be used
- * @param indexName elasticsearch name
- * @param indexType names the type of index, usually use the item name
- * @param fieldNames ES field names
- * @param typeMappings indicates which ES fields are to be not_analyzed without norms
- * @param refresh should the index be refreshed so the create is committed
- * @return true if all is well
- */
+ * @param indexName elasticsearch name
+ * @param indexType names the type of index, usually use the item name
+ * @param fieldNames ES field names
+ * @param typeMappings indicates which ES fields are to be not_analyzed without norms
+ * @param refresh should the index be refreshed so the create is committed
+ * @return true if all is well
+ */
def createIndex(
indexName: String,
- indexType: String,
+ indexType: String = "items",
fieldNames: List[String],
- typeMappings: Map[String, String] = Map.empty,
+ typeMappings: Option[Map[String, String]] = None,
refresh: Boolean = false): Boolean = {
- if (!client.admin().indices().exists(new IndicesExistsRequest(indexName)).actionGet().isExists) {
+ if (!client.admin().indices().exists(new IndicesExistsRequest(indexName)).actionGet().isExists()) {
var mappings = """
|{
| "properties": {
""".stripMargin.replace("\n", "")
- def mappingsField(`type`: String) = {
+ def mappingsField(t: String) = {
s"""
| : {
- | "type": "${`type`}",
+ | "type": "${t}",
| "index": "not_analyzed",
| "norms" : {
| "enabled" : false
@@ -133,18 +130,17 @@ object EsClient {
""".stripMargin.replace("\n", "")
fieldNames.foreach { fieldName =>
- if (typeMappings.contains(fieldName))
- mappings += (fieldName + mappingsField(typeMappings(fieldName)))
+ if (typeMappings.nonEmpty && typeMappings.get.contains(fieldName))
+ mappings += (fieldName + mappingsField(typeMappings.get(fieldName)))
else // unspecified fields are treated as not_analyzed strings
mappings += (fieldName + mappingsField("string"))
}
mappings += mappingsTail // any other string is not_analyzed
- // logger.debug(s"ES mapping: $mappings")
- val cir = new CreateIndexRequest(indexName).mapping(indexType, mappings)
+    val cir = new CreateIndexRequest(indexName).mapping(indexType, mappings)
val create = client.admin().indices().create(cir).actionGet()
if (!create.isAcknowledged) {
- logger.info(s"Index $indexName wasn't created, but may have quietly failed.")
+ logger.info(s"Index ${indexName} wasn't created, but may have quietly failed.")
} else {
// now refresh to get it 'committed'
// todo: should do this after the new index is created so no index downtime
@@ -152,7 +148,7 @@ object EsClient {
}
true
} else {
- logger.warn(s"Elasticsearch index: $indexName wasn't created because it already exists. This may be an error.")
+ logger.warn(s"Elasticsearch index: ${indexName} wasn't created because it already exists. This may be an error.")
false
}
}
@@ -165,25 +161,22 @@ object EsClient {
/** Create new index and hot-swap the new after it's indexed and ready to take over, then delete the old */
def hotSwap(
alias: String,
- typeName: String,
- indexRDD: RDD[Map[String, Any]],
+ typeName: String = "items",
+ indexRDD: RDD[scala.collection.Map[String,Any]],
fieldNames: List[String],
- typeMappings: Map[String, String] = Map.empty): Unit = {
+ typeMappings: Option[Map[String, String]] = None): Unit = {
// get index for alias, change a char, create new one with new id and index it, swap alias and delete old one
val aliasMetadata = client.admin().indices().prepareGetAliases(alias).get().getAliases
val newIndex = alias + "_" + DateTime.now().getMillis.toString
-
- logger.debug(s"Create new index: $newIndex, $typeName, $fieldNames, $typeMappings")
createIndex(newIndex, typeName, fieldNames, typeMappings)
val newIndexURI = "/" + newIndex + "/" + typeName
- // logger.debug(s"Save to ES[$newIndexURI]:\n${indexRDD.take(25).mkString("\n")}")
indexRDD.saveToEs(newIndexURI, Map("es.mapping.id" -> "id"))
//refreshIndex(newIndex)
if (!aliasMetadata.isEmpty
- && aliasMetadata.get(alias) != null
- && aliasMetadata.get(alias).get(0) != null) { // was alias so remove the old one
+ && aliasMetadata.get(alias) != null
+ && aliasMetadata.get(alias).get(0) != null) { // was alias so remove the old one
//append the DateTime to the alias to create an index name
val oldIndex = aliasMetadata.get(alias).get(0).getIndexRouting
client.admin().indices().prepareAliases()
@@ -206,34 +199,40 @@ object EsClient {
}
// clean out any old indexes that were the product of a failed train?
val indices = util.Arrays.asList(client.admin().indices().prepareGetIndex().get().indices()).get(0)
- indices.map { index =>
+ indices.map{ index =>
if (index.contains(alias) && index != newIndex) deleteIndex(index) //clean out any old orphaned indexes
}
}
/** Performs a search using the JSON query String
- *
- * @param query the JSON query string parable by Elasticsearch
- * @param indexName the index to search
- * @return a [PredictedResults] collection
- */
- def search(query: String, indexName: String): Option[SearchHits] = {
+ *
+    * @param query the JSON query string parsable by Elasticsearch
+ * @param indexName the index to search
+ * @return a [PredictedResults] collection
+ */
+ def search(query: String, indexName: String): PredictedResult = {
val sr = client.prepareSearch(indexName).setSource(query).get()
+
if (!sr.isTimedOut) {
- Some(sr.getHits)
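+      // convert each Elasticsearch hit into an ItemScore(item id, score) for the PredictedResult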
+ val recs = sr.getHits.getHits.map( hit => new ItemScore(hit.getId, hit.getScore.toDouble) )
+ logger.info(s"Results: ${sr.getHits.getHits.size} retrieved of " +
+ s"a possible ${sr.getHits.totalHits()}")
+ new PredictedResult(recs)
} else {
- None
+ logger.info(s"No results for query ${parse(query)}")
+ new PredictedResult(Array.empty[ItemScore])
}
+
}
/** Gets the "source" field of an Elasticsearch document
- *
- * @param indexName index that contains the doc/item
- * @param typeName type name used to construct ES REST URI
- * @param doc for UR the item id
- * @return source [java.util.Map] of field names to any valid field values or null if empty
- */
+ *
+ * @param indexName index that contains the doc/item
+ * @param typeName type name used to construct ES REST URI
+ * @param doc for UR the item id
+ * @return source [java.util.Map] of field names to any valid field values or null if empty
+ */
def getSource(indexName: String, typeName: String, doc: String): util.Map[String, AnyRef] = {
client.prepareGet(indexName, typeName, doc)
.execute()
@@ -257,30 +256,31 @@ object EsClient {
val allIndicesMap = client.admin().indices().getAliases(new GetAliasesRequest(alias)).actionGet().getAliases
if (allIndicesMap.size() == 1) { // must be a 1-1 mapping of alias <-> index
- var indexName: String = ""
- val itr = allIndicesMap.keysIt()
- while (itr.hasNext)
+ var indexName: String = ""
+        val itr = allIndicesMap.keysIt()
+        while (itr.hasNext)
indexName = itr.next()
Some(indexName) // the one index the alias points to
} else {
// delete all the indices that are pointed to by the alias, they can't be used
logger.warn("There is no 1-1 mapping of index to alias so deleting the old indexes that are referenced by the " +
"alias. This may have been caused by a crashed or stopped `pio train` operation so try running it again.")
- if (!allIndicesMap.isEmpty) {
- val i = allIndicesMap.keys().toArray.asInstanceOf[Array[String]]
- for (indexName <- i) {
- deleteIndex(indexName, refresh = true)
- }
+ val i = allIndicesMap.keys().toArray.asInstanceOf[Array[String]]
+ for ( indexName <- i ){
+ deleteIndex(indexName, true)
}
+
None // if more than one abort, need to clean up bad aliases
}
}
- def getRDD(
- alias: String,
- typeName: String)(implicit sc: SparkContext): RDD[(ItemID, ItemProps)] = {
- getIndexName(alias)
- .map(index => sc.esJsonRDD(alias + "/" + typeName) map { case (itemId, json) => itemId -> DataMap(json).fields })
- .getOrElse(sc.emptyRDD)
+ def getRDD(sc: SparkContext, alias: String, typeName: String):
+ Option[RDD[(String, collection.Map[String, AnyRef])]] = {
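+    // make sure the alias maps to exactly one index before reading it as an RDD; otherwise return None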
+ val index = getIndexName(alias)
+ if (index.nonEmpty) { // ensures there is a 1-1 mapping of alias to index
+ val indexAsRDD = sc.esRDD(alias + "/" + typeName)
+ //val debug = indexAsRDD.count()
+ Some(indexAsRDD)
+ } else None // error so no index for the alias
}
-}
\ No newline at end of file
+}
diff --git a/src/main/scala/PopModel.scala b/src/main/scala/PopModel.scala
index 03f39f4..41febe6 100644
--- a/src/main/scala/PopModel.scala
+++ b/src/main/scala/PopModel.scala
@@ -15,198 +15,131 @@
* limitations under the License.
*/
-package org.template
+package com.uniflash
import grizzled.slf4j.Logger
-import io.prediction.data.storage.Event
-import io.prediction.data.store.PEventStore
+import org.apache.predictionio.data.storage.Event
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
+import org.apache.predictionio.data.store.PEventStore
import org.joda.time.format.ISODateTimeFormat
-import org.joda.time.{ DateTime, Interval }
-import org.template.conversions.{ ItemID, ItemProps }
-
-import scala.language.postfixOps
-import scala.util.Random
-
-object RankingFieldName {
- val UserRank = "userRank"
- val UniqueRank = "uniqueRank"
- val PopRank = "popRank"
- val TrendRank = "trendRank"
- val HotRank = "hotRank"
- val UnknownRank = "unknownRank"
- def toSeq: Seq[String] = Seq(UserRank, UniqueRank, PopRank, TrendRank, HotRank)
- override def toString: String = s"$UserRank, $UniqueRank, $PopRank, $TrendRank, $HotRank"
-}
+import org.joda.time.{DateTime, Interval}
-object RankingType {
- val Popular = "popular"
- val Trending = "trending"
- val Hot = "hot"
- val UserDefined = "userDefined"
- val Random = "random"
- def toSeq: Seq[String] = Seq(Popular, Trending, Hot, UserDefined, Random)
- override def toString: String = s"$Popular, $Trending, $Hot, $UserDefined, $Random"
-}
-class PopModel(fieldsRDD: RDD[(ItemID, ItemProps)])(implicit sc: SparkContext) {
+object PopModel {
- @transient lazy val logger: Logger = Logger[this.type]
+ @transient lazy val logger = Logger[this.type]
- def calc(
- modelName: String,
- eventNames: Seq[String],
+ def calc (
+ modelName: Option[String] = None,
+ eventNames: List[String],
appName: String,
duration: Int = 0,
- offsetDate: Option[String] = None): RDD[(ItemID, Double)] = {
+ endDateOption: Option[String] = None)(implicit sc: SparkContext): Option[RDD[(String, Float)]] = {
- // todo: make end manditory and fill it with "now" upstream if not specified, will simplify logic here
- // end should always be 'now' except in unusual conditions like for testing
- val end = if (offsetDate.isEmpty) DateTime.now else {
+    // endDate should always be 'now' except in unusual conditions like for testing
+    val endDate = if (endDateOption.isEmpty) DateTime.now else {
try {
- ISODateTimeFormat.dateTimeParser().parseDateTime(offsetDate.get)
+ ISODateTimeFormat.dateTimeParser().parseDateTime(endDateOption.get)
} catch {
- case e: IllegalArgumentException =>
- logger.warn("Bad end for popModel: " + offsetDate.get + " using 'now'")
+      case e: IllegalArgumentException =>
+        logger.warn("Bad endDate for popModel: " + endDateOption.get + ", using 'now'")
DateTime.now
}
}
- val interval = new Interval(end.minusSeconds(duration), end)
-
// based on type of popularity model return a set of (item-id, ranking-number) for all items
- logger.info(s"PopModel $modelName using end: $end, and duration: $duration, interval: $interval")
-
- // if None? debatable, this is either an error or may need to default to popular, why call popModel otherwise
modelName match {
- case RankingType.Popular => calcPopular(appName, eventNames, interval)
- case RankingType.Trending => calcTrending(appName, eventNames, interval)
- case RankingType.Hot => calcHot(appName, eventNames, interval)
- case RankingType.Random => calcRandom(appName, interval)
- case RankingType.UserDefined => sc.emptyRDD
- case unknownRankingType =>
- logger.warn(
- s"""
- |Bad rankings param type=[$unknownRankingType] in engine definition params, possibly a bad json value.
- |Use one of the available parameter values ($RankingType).""".stripMargin)
- sc.emptyRDD
+ case Some("popular") => calcPopular(appName, eventNames, new Interval(endDate.minusSeconds(duration), endDate))
+ case Some("trending") => calcTrending(appName, eventNames, new Interval(endDate.minusSeconds(duration), endDate))
+ case Some("hot") => calcHot(appName, eventNames, new Interval(endDate.minusSeconds(duration), endDate))
+ case _ => None // debatable, this is either an error or may need to default to popular, why call popModel otherwise
}
-
- }
-
- /** Create random rank for all items */
- def calcRandom(
- appName: String,
- interval: Interval): RDD[(ItemID, Double)] = {
-
- val events = eventsRDD(appName = appName, interval = interval)
- val actionsRDD = events.map(_.targetEntityId).filter(_.isDefined).map(_.get).distinct()
- val itemsRDD = fieldsRDD.map { case (itemID, _) => itemID }
-
- // logger.debug(s"ActionsRDD: ${actionsRDD.take(25).mkString(", ")}")
- // logger.debug(s"ItemsRDD: ${itemsRDD.take(25).mkString(", ")}")
- actionsRDD.union(itemsRDD).distinct().map { itemID => itemID -> Random.nextDouble() }
}
/** Creates a rank from the number of named events per item for the duration */
- def calcPopular(
- appName: String,
- eventNames: Seq[String],
- interval: Interval): RDD[(ItemID, Double)] = {
+ def calcPopular(appName: String, eventNames: List[String] = List.empty,
+ interval: Interval)(implicit sc: SparkContext): Option[RDD[(String, Float)]] = {
+
val events = eventsRDD(appName, eventNames, interval)
- events.map { e => (e.targetEntityId, e.event) }
+ val retval = events.map { e => (e.targetEntityId, e.event) }
.groupByKey()
- .map { case (itemID, itEvents) => (itemID.get, itEvents.size.toDouble) }
- .reduceByKey(_ + _) // make this a double in Elaseticsearch)
+      .map { case (itemID, itEvents) => (itemID.get, itEvents.size.toFloat) }
+      .reduceByKey(_ + _) // make this a double in Elasticsearch
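+    // return None when no events matched so callers can skip an empty popularity model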
+ if (!retval.isEmpty()) Some(retval) else None
}
/** Creates a rank for each item by dividing the duration in two and counting named events in both buckets
- * then dividing most recent by less recent. This ranks by change in popularity or velocity of populatiy change.
- * Interval(start, end) end instant is always greater than or equal to the start instant.
- */
- def calcTrending(
- appName: String,
- eventNames: Seq[String],
- interval: Interval): RDD[(ItemID, Double)] = {
+    * then subtracting the less recent count from the most recent. This ranks by change in popularity or velocity of popularity change.
+ * Interval(start, end) end instant is always greater than or equal to the start instant.
+ */
+ def calcTrending(appName: String, eventNames: List[String] = List.empty,
+ interval: Interval)(implicit sc: SparkContext): Option[RDD[(String, Float)]] = {
- logger.info(s"Current Interval: $interval, ${interval.toDurationMillis}")
- val halfInterval = interval.toDurationMillis / 2
- val olderInterval = new Interval(interval.getStart, interval.getStart.plus(halfInterval))
- logger.info(s"Older Interval: $olderInterval")
- val newerInterval = new Interval(interval.getStart.plus(halfInterval), interval.getEnd)
- logger.info(s"Newer Interval: $newerInterval")
+    val olderInterval = new Interval(interval.getStart,
+      interval.getStart.plusMillis((interval.toDurationMillis / 2).toInt))
+    val newerInterval = new Interval(interval.getStart.plusMillis((interval.toDurationMillis / 2).toInt), interval.getEnd)
+ val intervalMillis = interval.toDurationMillis
val olderPopRDD = calcPopular(appName, eventNames, olderInterval)
- if (!olderPopRDD.isEmpty()) {
+ if ( olderPopRDD.nonEmpty) {
val newerPopRDD = calcPopular(appName, eventNames, newerInterval)
- newerPopRDD.join(olderPopRDD).map {
- case (item, (newerScore, olderScore)) => item -> (newerScore - olderScore)
- }
- } else sc.emptyRDD
-
+ if ( newerPopRDD.nonEmpty ) {
+ val retval = newerPopRDD.get.join(olderPopRDD.get).map { case (item, (newerScore, olderScore)) =>
+ val velocity = (newerScore - olderScore)
+ (item, velocity)
+ }
+ if (!retval.isEmpty()) Some(retval) else None
+ } else None
+ } else None
}
   /** Creates a rank for each item by dividing all events per item into three buckets and calculating the change in
- * velocity over time, in other words the acceleration of popularity change.
- */
- def calcHot(
- appName: String,
- eventNames: Seq[String] = List.empty,
- interval: Interval): RDD[(ItemID, Double)] = {
-
- logger.info(s"Current Interval: $interval, ${interval.toDurationMillis}")
- val olderInterval = new Interval(interval.getStart, interval.getStart.plus(interval.toDurationMillis / 3))
- logger.info(s"Older Interval: $olderInterval")
- val middleInterval = new Interval(olderInterval.getEnd, olderInterval.getEnd.plus(olderInterval.toDurationMillis))
- logger.info(s"Middle Interval: $middleInterval")
+ * velocity over time, in other words the acceleration of popularity change.
+ */
+ def calcHot(appName: String, eventNames: List[String] = List.empty,
+ interval: Interval)(implicit sc: SparkContext): Option[RDD[(String, Float)]] = {
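+    // split the interval into three equal buckets (older, middle, newer) so two successive velocities can be compared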
+    val olderInterval = new Interval(interval.getStart,
+      interval.getStart.plusMillis((interval.toDurationMillis / 3).toInt))
+    val middleInterval = new Interval(olderInterval.getEnd,
+      olderInterval.getEnd.plusMillis(olderInterval.toDurationMillis.toInt))
val newerInterval = new Interval(middleInterval.getEnd, interval.getEnd)
- logger.info(s"Newer Interval: $newerInterval")
val olderPopRDD = calcPopular(appName, eventNames, olderInterval)
- if (!olderPopRDD.isEmpty()) { // todo: may want to allow an interval with no events, give them 0 counts
+ if (olderPopRDD.nonEmpty){ // todo: may want to allow an interval with no events, give them 0 counts
+ //val debug = olderPopRDD.get.count()
val middlePopRDD = calcPopular(appName, eventNames, middleInterval)
- if (!middlePopRDD.isEmpty()) {
+ if (middlePopRDD.nonEmpty){
+ //val debug = middlePopRDD.get.count()
val newerPopRDD = calcPopular(appName, eventNames, newerInterval)
- val newVelocityRDD = newerPopRDD.join(middlePopRDD).map {
- case (item, (newerScore, middleScore)) => item -> (newerScore - middleScore)
- }
- val oldVelocityRDD = middlePopRDD.join(olderPopRDD).map {
- case (item, (middleScore, olderScore)) => item -> (middleScore - olderScore)
- }
- newVelocityRDD.join(oldVelocityRDD).map {
- case (item, (newVelocity, oldVelocity)) => item -> (newVelocity - oldVelocity)
- }
- } else sc.emptyRDD
- } else sc.emptyRDD
+ if (newerPopRDD.nonEmpty){
+ //val debug = newerPopRDD.get.count()
+        val newVelocityRDD = newerPopRDD.get.join(middlePopRDD.get).map { case (item, (newerScore, middleScore)) =>
+          val velocity = newerScore - middleScore
+          (item, velocity)
+        }
+        val oldVelocityRDD = middlePopRDD.get.join(olderPopRDD.get).map { case (item, (middleScore, olderScore)) =>
+          val velocity = middleScore - olderScore
+          (item, velocity)
+        }
+ Some( newVelocityRDD.join(oldVelocityRDD).map { case (item, (newVelocity, oldVelocity)) =>
+ val acceleration = (newVelocity - oldVelocity)
+ (item, acceleration)
+ })
+ } else None
+ } else None
+ } else None
}
- def eventsRDD(
- appName: String,
- eventNames: Seq[String] = Seq.empty,
- interval: Interval): RDD[Event] = {
+ def eventsRDD(appName: String, eventNames: List[String], interval: Interval)
+ (implicit sc: SparkContext): RDD[Event] = {
- logger.info(s"PopModel getting eventsRDD for startTime: ${interval.getStart} and endTime ${interval.getEnd}")
PEventStore.find(
appName = appName,
startTime = Some(interval.getStart),
untilTime = Some(interval.getEnd),
- eventNames = if (eventNames.nonEmpty) Some(eventNames) else None)(sc)
+ eventNames = Some(eventNames)
+ )(sc)
}
}
-
-object PopModel {
-
- def apply(fieldsRDD: RDD[(ItemID, ItemProps)])(implicit sc: SparkContext): PopModel = {
- new PopModel(fieldsRDD)
- }
-
- val nameByType: Map[String, String] = Map(
- RankingType.Popular -> RankingFieldName.PopRank,
- RankingType.Trending -> RankingFieldName.TrendRank,
- RankingType.Hot -> RankingFieldName.HotRank,
- RankingType.UserDefined -> RankingFieldName.UserRank,
- RankingType.Random -> RankingFieldName.UniqueRank).withDefaultValue(RankingFieldName.UnknownRank)
-
-}
diff --git a/src/main/scala/Preparator.scala b/src/main/scala/Preparator.scala
index 5c8ef6e..0d64139 100644
--- a/src/main/scala/Preparator.scala
+++ b/src/main/scala/Preparator.scala
@@ -15,56 +15,52 @@
* limitations under the License.
*/
-package org.template
+package com.uniflash
-import io.prediction.controller.PPreparator
-import org.apache.mahout.math.indexeddataset.{ BiDictionary, IndexedDataset }
+import org.apache.predictionio.controller.PPreparator
+import org.apache.predictionio.data.storage.PropertyMap
+import org.apache.mahout.math.indexeddataset.{IndexedDataset, BiDictionary}
import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
-import org.template.conversions._
class Preparator
- extends PPreparator[TrainingData, PreparedData] {
+ extends PPreparator[TrainingData, PreparedData] {
/** Create [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]] rdd backed
- * "distributed row matrices" from the input string keyed rdds.
- * @param sc Spark context
- * @param trainingData list of (actionName, actionRDD)
- * @return list of (correlatorName, correlatorIndexedDataset)
- */
+ * "distributed row matrices" from the input string keyed rdds.
+ * @param sc Spark context
+ * @param trainingData list of (actionName, actionRDD)
+ * @return list of (correlatorName, correlatorIndexedDataset)
+ */
def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
// now that we have all actions in separate RDDs we must merge any user dictionaries and
// make sure the same user ids map to the correct events
var userDictionary: Option[BiDictionary] = None
- val indexedDatasets = trainingData.actions.map {
- case (eventName, eventIDS) =>
+ val indexedDatasets = trainingData.actions.map{ case(eventName, eventIDS) =>
- // passing in previous row dictionary will use the values if they exist
- // and append any new ids, so after all are constructed we have all user ids in the last dictionary
- val ids = IndexedDatasetSpark(eventIDS, userDictionary)(sc)
- userDictionary = Some(ids.rowIDs)
- (eventName, ids)
+ // passing in previous row dictionary will use the values if they exist
+ // and append any new ids, so after all are constructed we have all user ids in the last dictionary
+ val ids = IndexedDatasetSpark(eventIDS, userDictionary)(sc)
+ userDictionary = Some(ids.rowIDs)
+ (eventName, ids)
}
// now make sure all matrices have identical row space since this corresponds to all users
+ val numUsers = userDictionary.get.size
+ val numPrimary = indexedDatasets.head._2.matrix.nrow
// todo: check to see that there are events in primary event IndexedDataset and abort if not.
- val rowAdjustedIds = userDictionary map { userDict =>
- indexedDatasets.map {
- case (eventName, eventIDS) =>
- (eventName, eventIDS.create(eventIDS.matrix, userDictionary.get, eventIDS.columnIDs).newRowCardinality(userDict.size))
- }
- } getOrElse Seq.empty
-
- val fieldsRDD: RDD[(ItemID, ItemProps)] = trainingData.fieldsRDD.map {
- case (itemId, propMap) => itemId -> propMap.fields
+ val rowAdjustedIds = indexedDatasets.map { case(eventName, eventIDS) =>
+ (eventName, eventIDS.create(eventIDS.matrix, userDictionary.get, eventIDS.columnIDs).newRowCardinality(numUsers))
}
- PreparedData(rowAdjustedIds, fieldsRDD)
+
+ new PreparedData(rowAdjustedIds, trainingData.fieldsRDD)
}
}
-case class PreparedData(
- actions: Seq[(ActionID, IndexedDataset)],
- fieldsRDD: RDD[(ItemID, ItemProps)]) extends Serializable
\ No newline at end of file
+class PreparedData(
+ val actions: List[(String, IndexedDataset)],
+ val fieldsRDD: RDD[(String, PropertyMap)])
+ extends Serializable
diff --git a/src/main/scala/Serving.scala b/src/main/scala/Serving.scala
index a63e3d3..4277066 100644
--- a/src/main/scala/Serving.scala
+++ b/src/main/scala/Serving.scala
@@ -15,16 +15,16 @@
* limitations under the License.
*/
-package org.template
+package com.uniflash
-import io.prediction.controller.LServing
+import org.apache.predictionio.controller.LServing
class Serving
- extends LServing[Query, PredictedResult] {
+ extends LServing[Query, PredictedResult] {
- override def serve(
- query: Query,
+ override
+ def serve(query: Query,
predictedResults: Seq[PredictedResult]): PredictedResult = {
predictedResults.head
}
-}
\ No newline at end of file
+}
diff --git a/src/main/scala/URAlgorithm.scala b/src/main/scala/URAlgorithm.scala
index e6cb570..2c646a4 100644
--- a/src/main/scala/URAlgorithm.scala
+++ b/src/main/scala/URAlgorithm.scala
@@ -15,40 +15,34 @@
* limitations under the License.
*/
-package org.template
+package com.uniflash
import java.util
-
-import grizzled.slf4j.Logger
-import io.prediction.controller.{ P2LAlgorithm, Params }
-import io.prediction.data.storage.{ DataMap, Event, NullModel, PropertyMap }
-import io.prediction.data.store.LEventStore
-import org.apache.mahout.math.cf.{ DownsamplableCrossOccurrenceDataset, SimilarityAnalysis }
+import org.apache.predictionio.controller.P2LAlgorithm
+import org.apache.predictionio.controller.Params
+import org.apache.predictionio.data
+import org.apache.predictionio.data.storage.{PropertyMap, Event}
+import org.apache.predictionio.data.store.LEventStore
+import org.apache.mahout.math.cf.SimilarityAnalysis
import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
-import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.joda.time.DateTime
-import org.json4s.JValue
+import org.json4s
+import org.json4s.JsonAST
import org.json4s.JsonAST._
-import org.json4s.JsonDSL._
-import org.json4s.jackson.JsonMethods._
-import org.template.conversions._
-
import scala.collection.JavaConverters._
+import scala.collection.immutable
import scala.concurrent.duration.Duration
-import scala.language.{ implicitConversions, postfixOps }
-
-/** Available value for algorithm param "RecsModel" */
-object RecsModel { // todo: replace this with rankings
- val All = "all"
- val CF = "collabFiltering"
- val BF = "backfill"
- override def toString: String = s"$All, $CF, $BF"
-}
+import org.apache.spark.SparkContext
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+import scala.collection.convert.wrapAsScala._
+import grizzled.slf4j.Logger
+import org.elasticsearch.spark._
/** Setting the option in the params case class doesn't work as expected when the param is missing from
- * engine.json so set these for use in the algorithm when they are not present in the engine.json
- */
+ * engine.json so set these for use in the algorithm when they are not present in the engine.json
+ */
object defaultURAlgorithmParams {
val DefaultMaxEventsPerEventType = 500
val DefaultNum = 20
@@ -58,676 +52,478 @@ object defaultURAlgorithmParams {
val DefaultExpireDateName = "expireDate" // default name for the expire date property of an item
   val DefaultAvailableDateName = "availableDate" // default name for an item's available-after date
val DefaultDateName = "date" // when using a date range in the query this is the name of the item's date
- val DefaultRecsModel = RecsModel.All // use CF + backfill
- val DefaultRankingParams = RankingParams()
- val DefaultBackfillFieldName = RankingFieldName.PopRank
- val DefaultBackfillType = RankingType.Popular
- val DefaultBackfillDuration = "3650 days" // for all time
-
- val DefaultReturnSelf = false
+ val DefaultRecsModel = "all" // use CF + backfill
+ val DefaultBackfillParams = BackfillField()
+ val DefaultBackfillFieldName = "popRank"
}
-/* default values must be set in code not the case class declaration
case class BackfillField(
- name: Option[String] = Some(defaultURAlgorithmParams.DefaultBackfillFieldName),
- backfillType: Option[String] = Some(defaultURAlgorithmParams.DefaultBackfillType), // may be 'hot', or 'trending' also
- eventNames: Option[Seq[String]] = None, // None means use the algo eventNames list, otherwise a list of events
- offsetDate: Option[String] = None, // used only for tests, specifies the offset date to start the duration so the most
- // recent date for events going back by from the more recent offsetDate - duration
- duration: Option[String] = Some(defaultURAlgorithmParams.DefaultBackfillDuration)) // duration worth of events
- // to use in calculation of backfill
+ name: String = "popRank",
+ backfillType: String = "popular", // may be 'hot', or 'trending' also
+ eventnames: Option[List[String]] = None, // None means use the algo eventnames list, otherwise a list of events
+  endDate: Option[String] = None, // used only for tests, specifies the most recent date of the popModel's duration (events are counted back from it)
+ duration: Int = 259200) // number of seconds worth of events to use in calculation of backfill
+/** Instantiated from engine.json */
case class URAlgorithmParams(
- appName: String, // filled in from engine.json
- indexName: String, // can optionally be used to specify the elasticsearch index name
- typeName: String, // can optionally be used to specify the elasticsearch type name
- recsModel: Option[String] = Some(defaultURAlgorithmParams.DefaultRecsModel), // "all", "collabFiltering", "backfill"
- eventNames: Seq[String], // names used to ID all user actions
- blacklistEvents: Option[Seq[String]] = None,// None means use the primary event, empty array means no filter
- // number of events in user-based recs query
- maxQueryEvents: Option[Int] = Some(defaultURAlgorithmParams.DefaultMaxQueryEvents),
- maxEventsPerEventType: Option[Int] = Some(defaultURAlgorithmParams.DefaultMaxEventsPerEventType),
- maxCorrelatorsPerEventType: Option[Int] = Some(defaultURAlgorithmParams.DefaultMaxCorrelatorsPerEventType),
- num: Option[Int] = Some(defaultURAlgorithmParams.DefaultNum), // default max # of recs requested
- userBias: Option[Float] = None, // will cause the default search engine boost of 1.0
- itemBias: Option[Float] = None, // will cause the default search engine boost of 1.0
- returnSelf: Option[Boolean] = None, // query building logic defaults this to false
- fields: Option[Seq[Field]] = None, //defaults to no fields
- // leave out for default or popular
- backfillField: Option[BackfillField] = None,
- // name of date property field for when the item is available
- availableDateName: Option[String] = Some(defaultURAlgorithmParams.DefaultAvailableDateName),
- // name of date property field for when an item is no longer available
- expireDateName: Option[String] = Some(defaultURAlgorithmParams.DefaultExpireDateName),
- // used as the subject of a dateRange in queries, specifies the name of the item property
- dateName: Option[String] = Some(defaultURAlgorithmParams.DefaultDateName),
- seed: Option[Long] = None) // seed is not used presently
+ appName: String, // filled in from engine.json
+ indexName: String, // can optionally be used to specify the elasticsearch index name
+ typeName: String, // can optionally be used to specify the elasticsearch type name
+ recsModel: Option[String] = Some(defaultURAlgorithmParams.DefaultRecsModel), // "all", "collabFiltering", "backfill"
+ eventNames: List[String], // names used to ID all user actions
+ blacklistEvents: Option[List[String]] = None,// None means use the primary event, empty array means no filter
+ // number of events in user-based recs query
+ maxQueryEvents: Option[Int] = Some(defaultURAlgorithmParams.DefaultMaxQueryEvents),
+ maxEventsPerEventType: Option[Int] = Some(defaultURAlgorithmParams.DefaultMaxEventsPerEventType),
+ maxCorrelatorsPerEventType: Option[Int] = Some(defaultURAlgorithmParams.DefaultMaxCorrelatorsPerEventType),
+ num: Option[Int] = Some(defaultURAlgorithmParams.DefaultNum), // default max # of recs requested
+ userBias: Option[Float] = None, // will cause the default search engine boost of 1.0
+ itemBias: Option[Float] = None, // will cause the default search engine boost of 1.0
+ returnSelf: Option[Boolean] = None, // query building logic defaults this to false
+ fields: Option[List[Field]] = None, //defaults to no fields
+ // leave out for default or popular
+ backfillField: Option[BackfillField] = None,
+  // name of date property field for when the item is available
+ availableDateName: Option[String] = Some(defaultURAlgorithmParams.DefaultAvailableDateName),
+ // name of date property field for when an item is no longer available
+ expireDateName: Option[String] = Some(defaultURAlgorithmParams.DefaultExpireDateName),
+ // used as the subject of a dateRange in queries, specifies the name of the item property
+ dateName: Option[String] = Some(defaultURAlgorithmParams.DefaultDateName),
+ seed: Option[Long] = None) // seed is not used presently
extends Params //fixed default make it reproducible unless supplied
- */
-
-case class RankingParams(
- name: Option[String] = None,
- `type`: Option[String] = None, // See [[org.template.BackfillType]]
- eventNames: Option[Seq[String]] = None, // None means use the algo eventNames list, otherwise a list of events
- offsetDate: Option[String] = None, // used only for tests, specifies the offset date to start the duration so the most
- // recent date for events going back by from the more recent offsetDate - duration
- endDate: Option[String] = None,
- duration: Option[String] = None) { // duration worth of events to use in calculation of backfill
- override def toString: String = {
- s"""
- |name: $name,
- |type: ${`type`},
- |eventNames: $eventNames,
- |offsetDate: $offsetDate,
- |endDate: $endDate,
- |duration: $duration
- |""".stripMargin
- }
-}
-
-case class IndicatorParams(
- name: String, // must match one in eventNames
- maxItemsPerUser: Option[Int], // defaults to maxEventsPerEventType
- maxCorrelatorsPerItem: Option[Int], // defaults to maxCorrelatorsPerEventType
- minLLR: Option[Double]) // defaults to none, takes precendence over maxCorrelatorsPerItem
-
-case class URAlgorithmParams(
- appName: String, // filled in from engine.json
- indexName: String, // can optionally be used to specify the elasticsearch index name
- typeName: String, // can optionally be used to specify the elasticsearch type name
- recsModel: Option[String] = None, // "all", "collabFiltering", "backfill"
- eventNames: Option[Seq[String]], // names used to ID all user actions
- blacklistEvents: Option[Seq[String]] = None, // None means use the primary event, empty array means no filter
- // number of events in user-based recs query
- maxQueryEvents: Option[Int] = None,
- maxEventsPerEventType: Option[Int] = None,
- maxCorrelatorsPerEventType: Option[Int] = None,
- num: Option[Int] = None, // default max # of recs requested
- userBias: Option[Float] = None, // will cause the default search engine boost of 1.0
- itemBias: Option[Float] = None, // will cause the default search engine boost of 1.0
- returnSelf: Option[Boolean] = None, // query building logic defaults this to false
- fields: Option[Seq[Field]] = None, //defaults to no fields
- // leave out for default or popular
- rankings: Option[Seq[RankingParams]] = None,
- // name of date property field for when the item is available
- availableDateName: Option[String] = None,
- // name of date property field for when an item is no longer available
- expireDateName: Option[String] = None,
- // used as the subject of a dateRange in queries, specifies the name of the item property
- dateName: Option[String] = None,
- indicators: Option[List[IndicatorParams]] = None, // control params per matrix pair
- seed: Option[Long] = None) // seed is not used presently
- extends Params //fixed default make it reproducible unless supplied
/** Creates cooccurrence, cross-cooccurrence and eventually content correlators with
- * [[org.apache.mahout.math.cf.SimilarityAnalysis]] The analysis part of the recommender is
- * done here but the algorithm can predict only when the coocurrence data is indexed in a
- * search engine like Elasticsearch. This is done in URModel.save.
- *
- * @param ap taken from engine.json to describe limits and event types
- */
+ * [[org.apache.mahout.math.cf.SimilarityAnalysis]] The analysis part of the recommender is
+  * done here but the algorithm can predict only when the cooccurrence data is indexed in a
+ * search engine like Elasticsearch. This is done in URModel.save.
+ *
+ * @param ap taken from engine.json to describe limits and event types
+ */
class URAlgorithm(val ap: URAlgorithmParams)
- extends P2LAlgorithm[PreparedData, NullModel, Query, PredictedResult] {
+ extends P2LAlgorithm[PreparedData, URModel, Query, PredictedResult] {
- @transient lazy implicit val logger: Logger = Logger[this.type]
+ case class BoostableCorrelators(actionName: String, itemIDs: Seq[String], boost: Option[Float])
+ case class FilterCorrelators(actionName: String, itemIDs: Seq[String])
- case class BoostableCorrelators(actionName: String, itemIDs: Seq[ItemID], boost: Option[Float]) {
- def toFilterCorrelators: FilterCorrelators = {
- FilterCorrelators(actionName, itemIDs)
- }
- }
- case class FilterCorrelators(actionName: String, itemIDs: Seq[ItemID])
-
- val appName: String = ap.appName
- val recsModel: String = ap.recsModel.getOrElse(defaultURAlgorithmParams.DefaultRecsModel)
- //val eventNames: Seq[String] = ap.eventNames
-
- val userBias: Float = ap.userBias.getOrElse(1f)
- val itemBias: Float = ap.itemBias.getOrElse(1f)
- val maxQueryEvents: Int = ap.maxQueryEvents.getOrElse(defaultURAlgorithmParams.DefaultMaxQueryEvents)
- val limit: Int = ap.num.getOrElse(defaultURAlgorithmParams.DefaultNum)
-
- val blacklistEvents: Seq[String] = ap.blacklistEvents.getOrEmpty
- val returnSelf: Boolean = ap.returnSelf.getOrElse(defaultURAlgorithmParams.DefaultReturnSelf)
- val fields: Seq[Field] = ap.fields.getOrEmpty
-
- val randomSeed: Int = ap.seed.getOrElse(System.currentTimeMillis()).toInt
- val maxCorrelatorsPerEventType: Int = ap.maxCorrelatorsPerEventType
- .getOrElse(defaultURAlgorithmParams.DefaultMaxCorrelatorsPerEventType)
- val maxEventsPerEventType: Int = ap.maxEventsPerEventType
- .getOrElse(defaultURAlgorithmParams.DefaultMaxEventsPerEventType)
-
- lazy val modelEventNames = if (ap.indicators.isEmpty) {
- if (ap.eventNames.isEmpty) {
- throw new IllegalArgumentException("No eventNames or indicators in engine.json and one of these is required")
- } else ap.eventNames.get
- } else {
- var eventNames = Seq.empty[String]
- ap.indicators.get.foreach { indicator =>
- eventNames = eventNames :+ indicator.name
- }
- eventNames
- }
+ @transient lazy val logger = Logger[this.type]
- // Unique by 'type' ranking params, if collision get first.
- lazy val rankingsParams: Seq[RankingParams] = ap.rankings.getOrElse(Seq(RankingParams(
- name = Some(defaultURAlgorithmParams.DefaultBackfillFieldName),
- `type` = Some(defaultURAlgorithmParams.DefaultBackfillType),
- eventNames = Some(modelEventNames.take(1)),
- offsetDate = None,
- endDate = None,
- duration = Some(defaultURAlgorithmParams.DefaultBackfillDuration)))).groupBy(_.`type`).map(_._2.head).toSeq
-
- val rankingFieldNames: Seq[String] = rankingsParams map { rankingParams =>
- val rankingType = rankingParams.`type`.getOrElse(defaultURAlgorithmParams.DefaultBackfillType)
- val rankingFieldName = rankingParams.name.getOrElse(PopModel.nameByType(rankingType))
- rankingFieldName
- }
+ def train(sc: SparkContext, data: PreparedData): URModel = {
+
+ val dateNames = Some(List(ap.dateName.getOrElse(""), ap.availableDateName.getOrElse(""),
+ ap.expireDateName.getOrElse(""))) // todo: return None if all are empty?
+ val backfillFieldName = ap.backfillField.getOrElse(BackfillField()).name
- val dateNames: Seq[String] = Seq(
- ap.dateName,
- ap.availableDateName,
- ap.expireDateName).collect { case Some(date) => date } distinct
-
- val esIndex: String = ap.indexName
- val esType: String = ap.typeName
-
- drawInfo("Init URAlgorithm", Seq(
- ("══════════════════════════════", "════════════════════════════"),
- ("App name", appName),
- ("ES index name", esIndex),
- ("ES type name", esType),
- ("RecsModel", recsModel),
- ("Event names", modelEventNames),
- ("══════════════════════════════", "════════════════════════════"),
- ("Random seed", randomSeed),
- ("MaxCorrelatorsPerEventType", maxCorrelatorsPerEventType),
- ("MaxEventsPerEventType", maxEventsPerEventType),
- ("══════════════════════════════", "════════════════════════════"),
- ("User bias", userBias),
- ("Item bias", itemBias),
- ("Max query events", maxQueryEvents),
- ("Limit", limit),
- ("══════════════════════════════", "════════════════════════════"),
- ("Rankings:", "")) ++ rankingsParams.map(x => (x.`type`.get, x.name)))
-
- def train(sc: SparkContext, data: PreparedData): NullModel = {
-
- recsModel match {
- case RecsModel.All => calcAll(data)(sc)
- case RecsModel.CF => calcAll(data, calcPopular = false)(sc)
- case RecsModel.BF => calcPop(data)(sc)
+ ap.recsModel.getOrElse(defaultURAlgorithmParams.DefaultRecsModel) match {
+ case "all" => calcAll(sc, data, dateNames, backfillFieldName)
+ case "collabFiltering" => calcAll(sc, data, dateNames, backfillFieldName, popular = false )
+ case "backfill" => calcPop(sc, data, dateNames, backfillFieldName)
// error, throw an exception
- case unknownRecsModel =>
- throw new IllegalArgumentException(
- s"""
- |Bad algorithm param recsModel=[$unknownRecsModel] in engine definition params, possibly a bad json value.
- |Use one of the available parameter values ($RecsModel).""".stripMargin)
+ case _ => throw new IllegalArgumentException("Bad recsModel in engine definition params, possibly a bad json value.")
}
}
/** Calculates recs model as well as popularity model */
def calcAll(
+ sc: SparkContext,
data: PreparedData,
- calcPopular: Boolean = true)(implicit sc: SparkContext): NullModel = {
+ dateNames: Option[List[String]] = None,
+ backfillFieldName: String,
+ popular: Boolean = true):
+ URModel = {
// No one likes empty training data.
- require(
- data.actions.take(1).nonEmpty,
- s"""
- |Primary action in PreparedData cannot be empty.
- |Please check if DataSource generates TrainingData
- |and Preparator generates PreparedData correctly.""".stripMargin)
-
- //val backfillParams = ap.backfillField.getOrElse(defaultURAlgorithmParams.DefaultBackfillParams)
- //val nonDefaultMappings = Map(backfillParams.name.getOrElse(defaultURAlgorithmParams.DefaultBackfillFieldName) -> "float")
+ require(data.actions.take(1).nonEmpty,
+ s"Primary action in PreparedData cannot be empty." +
+ " Please check if DataSource generates TrainingData" +
+ " and Preprator generates PreparedData correctly.")
+ val backfillParams = ap.backfillField.getOrElse(defaultURAlgorithmParams.DefaultBackfillParams)
+ val nonDefaultMappings = Map(backfillParams.name -> "float")
logger.info("Actions read now creating correlators")
- val cooccurrenceIDSs = if (ap.indicators.isEmpty) { // using one global set of algo params
- SimilarityAnalysis.cooccurrencesIDSs(
- data.actions.map(_._2).toArray,
- randomSeed = ap.seed.getOrElse(System.currentTimeMillis()).toInt,
- maxInterestingItemsPerThing = ap.maxCorrelatorsPerEventType
- .getOrElse(defaultURAlgorithmParams.DefaultMaxCorrelatorsPerEventType),
- maxNumInteractions = ap.maxEventsPerEventType.getOrElse(defaultURAlgorithmParams.DefaultMaxEventsPerEventType))
- .map(_.asInstanceOf[IndexedDatasetSpark])
- } else { // using params per matrix pair, these take the place of eventNames, maxCorrelatorsPerEventType,
- // and maxEventsPerEventType!
- val indicators = ap.indicators.get
- val iDs = data.actions.map(_._2).toSeq
- val datasets = iDs.zipWithIndex.map {
- case (iD, i) =>
- new DownsamplableCrossOccurrenceDataset(
- iD,
- indicators(i).maxItemsPerUser.getOrElse(defaultURAlgorithmParams.DefaultMaxEventsPerEventType),
- indicators(i).maxCorrelatorsPerItem.getOrElse(defaultURAlgorithmParams.DefaultMaxCorrelatorsPerEventType),
- indicators(i).minLLR)
- }.toList
-
- SimilarityAnalysis.crossOccurrenceDownsampled(
- datasets,
- ap.seed.getOrElse(System.currentTimeMillis()).toInt)
- .map(_.asInstanceOf[IndexedDatasetSpark])
- }
-
+ val cooccurrenceIDSs = SimilarityAnalysis.cooccurrencesIDSs(
+ data.actions.map(_._2).toArray,
+ randomSeed = ap.seed.getOrElse(System.currentTimeMillis()).toInt,
+ maxInterestingItemsPerThing = ap.maxCorrelatorsPerEventType
+ .getOrElse(defaultURAlgorithmParams.DefaultMaxCorrelatorsPerEventType),
+ maxNumInteractions = ap.maxEventsPerEventType.getOrElse(defaultURAlgorithmParams.DefaultMaxEventsPerEventType))
+ .map(_.asInstanceOf[IndexedDatasetSpark]) // strip action names
val cooccurrenceCorrelators = cooccurrenceIDSs.zip(data.actions.map(_._1)).map(_.swap) //add back the actionNames
- val propertiesRDD: RDD[(ItemID, ItemProps)] = if (calcPopular) {
- val ranksRdd = getRanksRDD(data.fieldsRDD)
- data.fieldsRDD.fullOuterJoin(ranksRdd).map {
- case (item, (Some(fieldsPropMap), Some(rankPropMap))) => item -> (fieldsPropMap ++ rankPropMap)
- case (item, (Some(fieldsPropMap), None)) => item -> fieldsPropMap
- case (item, (None, Some(rankPropMap))) => item -> rankPropMap
- case (item, _) => item -> Map.empty
+ val popModel = if (popular) {
+ val duration = ap.backfillField.getOrElse(defaultURAlgorithmParams.DefaultBackfillParams).duration
+ val backfillEvents = backfillParams.eventnames.getOrElse(List(ap.eventNames.head))
+ val start = ap.backfillField.getOrElse(defaultURAlgorithmParams.DefaultBackfillParams).endDate
+ PopModel.calc(Some(backfillParams.backfillType), backfillEvents, ap.appName, duration)(sc)
+ } else None
+
+ val allPropertiesRDD = if (popModel.nonEmpty) {
+ data.fieldsRDD.cogroup[Float](popModel.get).map { case (item, pms) =>
+ val pm = if (pms._1.nonEmpty && pms._2.nonEmpty) {
+ val newPM = pms._1.head.fields + (backfillFieldName -> JDouble(pms._2.head))
+ PropertyMap(newPM, pms._1.head.firstUpdated, DateTime.now())
+ } else if (pms._2.nonEmpty) PropertyMap(Map(backfillFieldName -> JDouble(pms._2.head)), DateTime.now(), DateTime.now())
+ else PropertyMap( Map.empty[String, JValue], DateTime.now, DateTime.now) // some error????
+ (item, pm)
}
- } else {
- sc.emptyRDD
- }
+ } else data.fieldsRDD
logger.info("Correlators created now putting into URModel")
new URModel(
- coocurrenceMatrices = cooccurrenceCorrelators,
- propertiesRDDs = Seq(propertiesRDD),
- typeMappings = getRankingMapping).save(dateNames, esIndex, esType)
- new NullModel
+ Some(cooccurrenceCorrelators),
+ Some(allPropertiesRDD),
+ ap.indexName,
+ dateNames,
+ typeMappings = Some(nonDefaultMappings))
}
/** This function creates a URModel from an existing index in Elasticsearch + new popularity ranking
- * It is used when you want to re-calc the popularity model between training on usage data. It leaves
- * the part of the model created from usage data alone and only modifies the popularity ranking.
- */
- def calcPop(data: PreparedData)(implicit sc: SparkContext): NullModel = {
-
- // Aggregating all $set/$unsets properties, which are attached to items
- val fieldsRDD: RDD[(ItemID, ItemProps)] = data.fieldsRDD
- // Calc new ranking properties for all items
- val ranksRdd: RDD[(ItemID, ItemProps)] = getRanksRDD(fieldsRDD)
- // Current items RDD from ES
- val currentMetadataRDD: RDD[(ItemID, ItemProps)] = EsClient.getRDD(esIndex, esType)
- val propertiesRDD: RDD[(ItemID, ItemProps)] = currentMetadataRDD.fullOuterJoin(ranksRdd) map {
- case (itemId, maps) =>
- maps match {
- case (Some(metaProp), Some(rankProp)) => itemId -> (metaProp ++ rankProp)
- case (None, Some(rankProp)) => itemId -> rankProp
- case (Some(metaProp), None) => itemId -> metaProp
- case _ => itemId -> Map.empty
- }
- }
- // logger.debug(s"RanksRdd\n${ranksRdd.take(25).mkString("\n")}")
+ * It is used when you want to re-calc the popularity model between training on usage data. It leaves
+ * the part of the model created from usage data alone and only modifies the popularity ranking.
+ */
+ def calcPop(
+ sc: SparkContext,
+ data: PreparedData,
+ dateNames: Option[List[String]] = None,
+ backfillFieldName: String = ""): URModel = {
+
+ val backfillParams = ap.backfillField.getOrElse(defaultURAlgorithmParams.DefaultBackfillParams)
+ val backfillEvents = backfillParams.eventnames.getOrElse(List(ap.eventNames.head))//default to first/primary event
+ val start = ap.backfillField.getOrElse(defaultURAlgorithmParams.DefaultBackfillParams).endDate
+ val popModel = PopModel.calc(
+ Some(backfillParams.backfillType),
+ backfillEvents,
+ ap.appName,
+ backfillParams.duration,
+ start)(sc)
+ val popRDD = if (popModel.nonEmpty) {
+ val model = popModel.get.map { case (item, rank) =>
+ val newPM = Map(backfillFieldName -> JDouble(rank))
+ (item, PropertyMap(newPM, DateTime.now, DateTime.now))
+ }
+ Some(model)
+ } else None
+
+ val propertiesRDD = if (popModel.nonEmpty) {
+ val currentMetadata = esClient.getRDD(sc, ap.indexName, ap.typeName)
+ if (currentMetadata.nonEmpty) { // may be an empty index so ignore
+ Some(popModel.get.cogroup[collection.Map[String, AnyRef]](currentMetadata.get)
+ .map { case (item, (ranks, pms)) =>
+ if (ranks.nonEmpty) pms.head + (backfillFieldName -> ranks.head)
+ else if (pms.nonEmpty) pms.head
+ else Map.empty[String, AnyRef] // could happen if only calculating popularity, which may leave out items with
+ // no events
+ })
+ } else None
+ } else None
// returns the existing model plus new popularity ranking
new URModel(
- propertiesRDDs = Seq(fieldsRDD.cache(), propertiesRDD.cache()),
- typeMappings = getRankingMapping).save(dateNames, esIndex, esType)
- new NullModel
+ None,
+ None,
+ ap.indexName,
+ None,
+ propertiesRDD = propertiesRDD,
+ typeMappings = Some(Map(backfillFieldName -> "float")))
}
- var queryEventNames: Seq[String] = Seq.empty[String] // if passed in with the query overrides the engine.json list--used in MAP@k
+ var queryEventNames = List.empty[String] // if passed in with the query overrides the engine.json list--used in MAP@k
//testing, this only affects which events are used in queries.
/** Return a list of items recommended for a user identified in the query
- * The ES json query looks like this:
- * {
- * "size": 20
- * "query": {
- * "bool": {
- * "should": [
- * {
- * "terms": {
- * "rate": ["0", "67", "4"]
- * }
- * },
- * {
- * "terms": {
- * "buy": ["0", "32"],
- * "boost": 2
- * }
- * },
- * { // categorical boosts
- * "terms": {
- * "category": ["cat1"],
- * "boost": 1.05
- * }
- * }
- * ],
- * "must": [ // categorical filters
- * {
- * "terms": {
- * "category": ["cat1"],
- * "boost": 0
- * }
- * },
- * {
- * "must_not": [//blacklisted items
- * {
- * "ids": {
- * "values": ["items-id1", "item-id2", ...]
- * }
- * },
- * {
- * "constant_score": {// date in query must fall between the expire and available dates of an item
- * "filter": {
- * "range": {
- * "availabledate": {
- * "lte": "2015-08-30T12:24:41-07:00"
- * }
- * }
- * },
- * "boost": 0
- * }
- * },
- * {
- * "constant_score": {// date range filter in query must be between these item property values
- * "filter": {
- * "range" : {
- * "expiredate" : {
- * "gte": "2015-08-15T11:28:45.114-07:00"
- * "lt": "2015-08-20T11:28:45.114-07:00"
- * }
- * }
- * }, "boost": 0
- * }
- * },
- * {
- * "constant_score": { // this orders popular items for backfill
- * "filter": {
- * "match_all": {}
- * },
- * "boost": 0.000001 // must have as least a small number to be boostable
- * }
- * }
- * }
- * }
- * }
- *
- * @param model Ignored! since the model is already in Elasticsearch
- * @param query contains query spec
- * @todo Need to prune that query to the minimum required for the data included, for instance no need for the popularity
- * ranking if no PopModel is being used, same for "must" clause and dates.
- */
- def predict(model: NullModel, query: Query): PredictedResult = {
-
- queryEventNames = query.eventNames.getOrElse(modelEventNames) // eventNames in query take precedence
-
- val (queryStr, blacklist) = buildQuery(ap, query, rankingFieldNames)
- val searchHitsOpt = EsClient.search(queryStr, esIndex)
-
- val withRanks = query.withRanks.getOrElse(false)
- val predictedResult = searchHitsOpt match {
- case Some(searchHits) =>
- val recs = searchHits.getHits.map { hit =>
- if (withRanks) {
- val source = hit.getSource
- val ranks: Map[String, Double] = rankingsParams map { backfillParams =>
- val backfillType = backfillParams.`type`.getOrElse(defaultURAlgorithmParams.DefaultBackfillType)
- val backfillFieldName = backfillParams.name.getOrElse(PopModel.nameByType(backfillType))
- backfillFieldName -> source.get(backfillFieldName).asInstanceOf[Double]
- } toMap
-
- ItemScore(hit.getId, hit.getScore.toDouble,
- ranks = if (ranks.nonEmpty) Some(ranks) else None)
- } else {
- ItemScore(hit.getId, hit.getScore.toDouble)
- }
- }
- logger.info(s"Results: ${searchHits.getHits.length} retrieved of a possible ${searchHits.totalHits()}")
- PredictedResult(recs)
-
- case _ =>
- logger.info(s"No results for query ${parse(queryStr)}")
- PredictedResult(Array.empty[ItemScore])
- }
-
+ * The ES json query looks like this:
+ * {
+ * "size": 20
+ * "query": {
+ * "bool": {
+ * "should": [
+ * {
+ * "terms": {
+ * "rate": ["0", "67", "4"]
+ * }
+ * },
+ * {
+ * "terms": {
+ * "buy": ["0", "32"],
+ * "boost": 2
+ * }
+ * },
+ * { // categorical boosts
+ * "terms": {
+ * "category": ["cat1"],
+ * "boost": 1.05
+ * }
+ * }
+ * ],
+ * "must": [ // categorical filters
+ * {
+ * "terms": {
+ * "category": ["cat1"],
+ * "boost": 0
+ * }
+ * },
+ * {
+ * "must_not": [//blacklisted items
+ * {
+ * "ids": {
+ * "values": ["items-id1", "item-id2", ...]
+ * }
+ * },
+ * {
+ * "constant_score": {// date in query must fall between the expire and avqilable dates of an item
+ * "filter": {
+ * "range": {
+ * "availabledate": {
+ * "lte": "2015-08-30T12:24:41-07:00"
+ * }
+ * }
+ * },
+ * "boost": 0
+ * }
+ * },
+ * {
+ * "constant_score": {// date range filter in query must be between these item property values
+ * "filter": {
+ * "range" : {
+ * "expiredate" : {
+ * "gte": "2015-08-15T11:28:45.114-07:00"
+ * "lt": "2015-08-20T11:28:45.114-07:00"
+ * }
+ * }
+ * }, "boost": 0
+ * }
+ * },
+ * {
+ * "constant_score": { // this orders popular items for backfill
+ * "filter": {
+ * "match_all": {}
+ * },
+ * "boost": 0.000001 // must have as least a small number to be boostable
+ * }
+ * }
+ * }
+ * }
+ * }
+ *
+ * @param model Ignored! since the model is already in Elasticsearch
+ * @param query contains query spec
+ * @todo Need to prune that query to the minimum required for the data included, for instance no need for the popularity
+ * ranking if no PopModel is being used, same for "must" clause and dates.
+ */
+ def predict(model: URModel, query: Query): PredictedResult = {
+ logger.info(s"Query received, user id: ${query.user}, item id: ${query.item}")
+
+ queryEventNames = query.eventNames.getOrElse(ap.eventNames) // eventNames in query take precedence for the query
+ // part of their use
+ val backfillFieldName = ap.backfillField.getOrElse(BackfillField()).name
+ val queryAndBlacklist = buildQuery(ap, query, backfillFieldName)
+ val recs = esClient.search(queryAndBlacklist._1, ap.indexName)
// should have all blacklisted items excluded
// todo: need to add dithering, mean, sigma, seed required, make a seed that only changes on some fixed time
// period so the recs ordering stays fixed for that time period.
- predictedResult
- }
-
- /** Calculate all fields and items needed for ranking.
- *
- * @param fieldsRDD all items with their fields
- * @param sc the current Spark context
- * @return
- */
- def getRanksRDD(fieldsRDD: RDD[(ItemID, ItemProps)])(implicit sc: SparkContext): RDD[(ItemID, ItemProps)] = {
- val popModel = PopModel(fieldsRDD)
- val rankRDDs: Seq[(String, RDD[(ItemID, Double)])] = rankingsParams map { rankingParams =>
- val rankingType = rankingParams.`type`.getOrElse(defaultURAlgorithmParams.DefaultBackfillType)
- val rankingFieldName = rankingParams.name.getOrElse(PopModel.nameByType(rankingType))
- val durationAsString = rankingParams.duration.getOrElse(defaultURAlgorithmParams.DefaultBackfillDuration)
- val duration = Duration(durationAsString).toSeconds.toInt
- val backfillEvents = rankingParams.eventNames.getOrElse(modelEventNames.take(1))
- val offsetDate = rankingParams.offsetDate
- val rankRdd = popModel.calc(modelName = rankingType, eventNames = backfillEvents, appName, duration, offsetDate)
- rankingFieldName -> rankRdd
- }
-
- // logger.debug(s"RankRDDs[${rankRDDs.size}]\n${rankRDDs.map(_._1).mkString(", ")}\n${rankRDDs.map(_._2.take(25).mkString("\n")).mkString("\n\n")}")
- rankRDDs.foldLeft[RDD[(ItemID, ItemProps)]](sc.emptyRDD) {
- case (leftRdd, (fieldName, rightRdd)) =>
- leftRdd.fullOuterJoin(rightRdd).map {
- case (itemId, (Some(propMap), Some(rank))) => itemId -> (propMap + (fieldName -> JDouble(rank)))
- case (itemId, (Some(propMap), None)) => itemId -> propMap
- case (itemId, (None, Some(rank))) => itemId -> Map(fieldName -> JDouble(rank))
- case (itemId, _) => itemId -> Map.empty
- }
- }
+ recs
}
/** Build a query from default algorithms params and the query itself taking into account defaults */
- def buildQuery(
- ap: URAlgorithmParams,
- query: Query,
- backfillFieldNames: Seq[String] = Seq.empty): (String, Seq[Event]) = {
+ def buildQuery(ap: URAlgorithmParams, query: Query, backfillFieldName: String = ""): (String, List[Event]) = {
- try {
- // create a list of all query correlators that can have a bias (boost or filter) attached
- val (boostable, events) = getBiasedRecentUserActions(query)
+ try{ // require at least a user or item; if neither is given, return popular items if any
+ //require( query.item.nonEmpty || query.user.nonEmpty, "Warning: a query must include either a user or item id")
- // since users have action history and items have correlators and both correspond to the same "actions" like
- // purchase or view, we'll pass both to the query. If the user history or items correlators are empty
- // then metadata or backfill must be relied on to return results.
- val numRecs = query.num.getOrElse(limit)
- val should = buildQueryShould(query, boostable)
- val must = buildQueryMust(query, boostable)
- val mustNot = buildQueryMustNot(query, events)
- val sort = buildQuerySort()
-
- val json =
- ("size" -> numRecs) ~
- ("query" ->
- ("bool" ->
- ("should" -> should) ~
- ("must" -> must) ~
- ("must_not" -> mustNot) ~
- ("minimum_should_match" -> 1))) ~
- ("sort" -> sort)
-
- val compactJson = compact(render(json))
-
- logger.info(s"Query:\n$compactJson")
- (compactJson, events)
- } catch {
- case e: IllegalArgumentException => ("", Seq.empty[Event])
- }
- }
+ // create a list of all query correlators that can have a bias (boost or filter) attached
+ val alluserEvents = getBiasedRecentUserActions(query)
- /** Build should query part */
- def buildQueryShould(query: Query, boostable: Seq[BoostableCorrelators]): Seq[JValue] = {
+ // create a list of all boosted query correlators
+ val recentUserHistory = if ( ap.userBias.getOrElse(1f) >= 0f )
+ alluserEvents._1.slice(0, ap.maxQueryEvents.getOrElse(defaultURAlgorithmParams.DefaultMaxQueryEvents) - 1)
+ else List.empty[BoostableCorrelators]
- // create a list of all boosted query correlators
- val recentUserHistory: Seq[BoostableCorrelators] = if (userBias >= 0f) {
- boostable.slice(0, maxQueryEvents - 1)
- } else {
- Seq.empty
- }
+ val similarItems = if ( ap.itemBias.getOrElse(1f) >= 0f )
+ getBiasedSimilarItems(query)
+ else List.empty[BoostableCorrelators]
- val similarItems: Seq[BoostableCorrelators] = if (itemBias >= 0f) {
- getBiasedSimilarItems(query)
- } else {
- Seq.empty
- }
+ val boostedMetadata = getBoostedMetadata(query)
- val boostedMetadata = getBoostedMetadata(query)
- val allBoostedCorrelators = recentUserHistory ++ similarItems ++ boostedMetadata
+ val allBoostedCorrelators = recentUserHistory ++ similarItems ++ boostedMetadata
- val shouldFields: Seq[JValue] = allBoostedCorrelators.map {
- case BoostableCorrelators(actionName, itemIDs, boost) =>
- render("terms" -> (actionName -> itemIDs) ~ ("boost" -> boost))
- }
+ // create a list of all query correlators that are to be used to filter results
+ val recentUserHistoryFilter = if ( ap.userBias.getOrElse(1f) < 0f ) {
+ // strip any boosts
+ alluserEvents._1.map { i =>
+ FilterCorrelators(i.actionName, i.itemIDs)
+ }.slice(0, ap.maxQueryEvents.getOrElse(defaultURAlgorithmParams.DefaultMaxQueryEvents) - 1)
+ } else List.empty[FilterCorrelators]
- val shouldScore: JValue = parse(
- """
- |{
- | "constant_score": {
- | "filter": {
- | "match_all": {}
- | },
- | "boost": 0
- | }
- |}
- |""".stripMargin)
+ val similarItemsFilter = if ( ap.itemBias.getOrElse(1f) < 0f ) {
+ getBiasedSimilarItems(query).map { i =>
+ FilterCorrelators(i.actionName, i.itemIDs)
+ }.toList
+ } else List.empty[FilterCorrelators]
- shouldFields :+ shouldScore
- }
+ val filteringMetadata = getFilteringMetadata(query)
- /** Build must query part */
- def buildQueryMust(query: Query, boostable: Seq[BoostableCorrelators]): Seq[JValue] = {
+ val filteringDateRange = getFilteringDateRange(query)
- // create a list of all query correlators that are to be used to filter results
- val recentUserHistoryFilter: Seq[FilterCorrelators] = if (userBias < 0f) {
- // strip any boosts
- boostable.map(_.toFilterCorrelators).slice(0, maxQueryEvents - 1)
- } else {
- Seq.empty
- }
+ val allFilteringCorrelators = recentUserHistoryFilter ++ similarItemsFilter ++ filteringMetadata
- val similarItemsFilter: Seq[FilterCorrelators] = if (itemBias < 0f) {
- getBiasedSimilarItems(query).map(_.toFilterCorrelators)
- } else {
- Seq.empty
- }
+ // since users have action history and items have correlators and both correspond to the same "actions" like
+ // purchase or view, we'll pass both to the query. If the user history or items correlators are empty
+ // then metadata or backfill must be relied on to return results.
- val filteringMetadata = getFilteringMetadata(query)
- val filteringDateRange = getFilteringDateRange(query)
- val allFilteringCorrelators = recentUserHistoryFilter ++ similarItemsFilter ++ filteringMetadata
+ val numRecs = query.num.getOrElse(ap.num.getOrElse(defaultURAlgorithmParams.DefaultNum))
- val mustFields: Seq[JValue] = allFilteringCorrelators.map {
- case FilterCorrelators(actionName, itemIDs) =>
- render("terms" -> (actionName -> itemIDs) ~ ("boost" -> 0))
- }
- mustFields ++ filteringDateRange
- }
+ val shouldFields: Option[List[JValue]] = if (allBoostedCorrelators.isEmpty) None
+ else {
+ Some(allBoostedCorrelators.map { i =>
+ render(("terms" -> (i.actionName -> i.itemIDs) ~ ("boost" -> i.boost)))
+ }.toList)
+ }
+ val popModelSort = List(parse(
+ """
+ |{
+ | "constant_score": {
+ | "filter": {
+ | "match_all": {}
+ | },
+ | "boost": 0
+ | }
+ |}
+ |""".stripMargin))
+
+ val should: List[JValue] = if (shouldFields.isEmpty) popModelSort else shouldFields.get ::: popModelSort
+
+
+ val mustFields: List[JValue] = allFilteringCorrelators.map { i =>
+ render(("terms" -> (i.actionName -> i.itemIDs) ~ ("boost" -> 0)))}.toList
+ val must: List[JValue] = mustFields ::: filteringDateRange
+
+ val mustNotFields: JValue = render(("ids" -> ("values" -> getExcludedItems (alluserEvents._2, query)) ~ ("boost" -> 0)))
+ val mustNot: JValue = mustNotFields
+
+ val popQuery = if (ap.recsModel.getOrElse("all") == "all" ||
+ ap.recsModel.getOrElse("all") == "backfill") {
+ Some(List(
+ parse( """{"_score": {"order": "desc"}}"""),
+ parse(
+ s"""
+ |{
+ | "${backfillFieldName}": {
+ | "unmapped_type": "double",
+ | "order": "desc"
+ | }
+ |}""".stripMargin)))
+ } else None
- /** Build not must query part */
- def buildQueryMustNot(query: Query, events: Seq[Event]): JValue = {
- val mustNotFields: JValue = render("ids" -> ("values" -> getExcludedItems(events, query)) ~ ("boost" -> 0))
- mustNotFields
- }
- /** Build sort query part */
- def buildQuerySort(): Seq[JValue] = if (recsModel == RecsModel.All || recsModel == RecsModel.BF) {
- val sortByScore: Seq[JValue] = Seq(parse("""{"_score": {"order": "desc"}}"""))
- val sortByRanks: Seq[JValue] = rankingFieldNames map { fieldName =>
- parse(s"""{ "$fieldName": { "unmapped_type": "double", "order": "desc" } }""")
+ val json =
+ (
+ ("size" -> numRecs) ~
+ ("query"->
+ ("bool"->
+ ("should"-> should) ~
+ ("must"-> must) ~
+ ("must_not"-> mustNot) ~
+ ("minimum_should_match" -> 1))
+ ) ~
+ ("sort" -> popQuery))
+ val j = compact(render(json))
+ logger.info(s"Query: \n${j}\n")
+ (compact(render(json)), alluserEvents._2)
+ } catch {
+ case e: IllegalArgumentException =>
+ ("", List.empty[Event])
}
- sortByScore ++ sortByRanks
- } else {
- Seq.empty
}
/** Create a list of item ids that the user has interacted with, or that otherwise should not be included in recommendations */
- def getExcludedItems(userEvents: Seq[Event], query: Query): Seq[String] = {
+ def getExcludedItems(userEvents: List[Event], query: Query): List[String] = {
val blacklistedItems = userEvents.filter { event =>
- // either a list or an empty list of filtering events so honor them
- blacklistEvents match {
- case Nil => modelEventNames.head equals event.event
- case _ => blacklistEvents contains event.event
- }
- }.map(_.targetEntityId.getOrElse("")) ++ query.blacklistItems.getOrEmpty.distinct
+ if (ap.blacklistEvents.nonEmpty) {
+ // either a list or an empty list of filtering events so honor them
+ if (ap.blacklistEvents.get == List.empty[String]) false // no filtering events so all are allowed
+ else ap.blacklistEvents.get.contains(event.event) // if it's filtered remove it, else allow
+ } else ap.eventNames(0).equals(event.event) // remove the primary event if nothing specified
+ }.map (_.targetEntityId.getOrElse("")) ++ query.blacklistItems.getOrElse(List.empty[String])
+ .distinct
// Now conditionally add the query item itself
- val includeSelf = query.returnSelf.getOrElse(returnSelf)
- val allExcludedItems = if (!includeSelf && query.item.nonEmpty) {
- blacklistedItems :+ query.item.get
- } // add the query item to be excluded
- else {
+ val includeSelf = query.returnSelf.getOrElse(ap.returnSelf.getOrElse(false))
+ val allExcludedItems = if ( !includeSelf && query.item.nonEmpty )
+ blacklistedItems :+ query.item.get // add the query item to be excluded
+ else
blacklistedItems
- }
allExcludedItems.distinct
}
/** Get similar items for an item, these are already in the action correlators in ES */
def getBiasedSimilarItems(query: Query): Seq[BoostableCorrelators] = {
if (query.item.nonEmpty) {
- val m = EsClient.getSource(esIndex, esType, query.item.get)
+ val m = esClient.getSource(ap.indexName, ap.typeName, query.item.get)
if (m != null) {
- val itemEventBias = query.itemBias.getOrElse(itemBias)
+ val itemEventBias = query.itemBias.getOrElse(ap.itemBias.getOrElse(1f))
val itemEventsBoost = if (itemEventBias > 0 && itemEventBias != 1) Some(itemEventBias) else None
- modelEventNames.map { action =>
- val items: Seq[String] = try {
- if (m.containsKey(action) && m.get(action) != null) {
- m.get(action).asInstanceOf[util.ArrayList[String]].asScala
- } else {
- Seq.empty[String]
- }
+ ap.eventNames.map { action =>
+ val items = try {
+ if (m.containsKey(action) && m.get(action) != null) m.get(action).asInstanceOf[util.ArrayList[String]].toList
+ else List.empty[String]
} catch {
case cce: ClassCastException =>
- logger.warn(s"Bad value in item [${query.item}] corresponding to key: [$action] that was not a Seq[String] ignored.")
- Seq.empty[String]
+ logger.warn(s"Bad value in item ${query.item} corresponding to key: ${action} that was not a List[String]" +
+ " ignored.")
+ List.empty[String]
}
- val rItems = if (items.size <= maxQueryEvents) items else items.slice(0, maxQueryEvents - 1)
+ val rItems = if (items.size <= ap.maxQueryEvents.getOrElse(defaultURAlgorithmParams.DefaultMaxQueryEvents))
+ items else items.slice(0, ap.maxQueryEvents.getOrElse(defaultURAlgorithmParams.DefaultMaxQueryEvents) - 1)
BoostableCorrelators(action, rItems, itemEventsBoost)
}
- } else {
- Seq.empty
- } // no similar items
- } else {
- Seq.empty[BoostableCorrelators]
- } // no item specified
+ } else List.empty[BoostableCorrelators] // no similar items
+ } else List.empty[BoostableCorrelators] // no item specified
}
/** Get recent events of the user on items to create the recommendations query from */
- def getBiasedRecentUserActions(query: Query): (Seq[BoostableCorrelators], Seq[Event]) = {
+ def getBiasedRecentUserActions(
+ query: Query): (Seq[BoostableCorrelators], List[Event]) = {
val recentEvents = try {
LEventStore.findByEntity(
- appName = appName,
+ appName = ap.appName,
// entityType and entityId is specified for fast lookup
entityType = "user",
entityId = query.user.get,
// one query per eventName is not ideal, maybe one query for lots of events then split by eventName
//eventNames = Some(Seq(action)),// get all and separate later
- eventNames = Some(queryEventNames), // get all and separate later
+ eventNames = Some(queryEventNames),// get all and separate later
targetEntityType = None,
// limit = Some(maxQueryEvents), // this will get all history then each action can be limited before using in
// the query
latest = true,
// set time limit to avoid super long DB access
- timeout = Duration(200, "millis")).toList
+ timeout = Duration(200, "millis")
+ ).toList
} catch {
case e: scala.concurrent.TimeoutException =>
- logger.error(s"Timeout when read recent events. Empty list is used. $e")
- Seq.empty[Event]
+ logger.error(s"Timeout when read recent events." +
+ s" Empty list is used. ${e}")
+ List.empty[Event]
case e: NoSuchElementException => // todo: bad form to use an exception to check if there is a user id
logger.info("No user id for recs, returning similar items for the item specified")
- Seq.empty[Event]
+ List.empty[Event]
case e: Exception => // fatal because of error, an empty query
- logger.error(s"Error when read recent events: $e")
+ logger.error(s"Error when read recent events: ${e}")
throw e
}
- val userEventBias = query.userBias.getOrElse(userBias)
+ val userEventBias = query.userBias.getOrElse(ap.userBias.getOrElse(1f))
val userEventsBoost = if (userEventBias > 0 && userEventBias != 1) Some(userEventBias) else None
+ //val rActions = ap.eventNames.map { action =>
val rActions = queryEventNames.map { action =>
- var items = Seq.empty[String]
+ var items = List[String]()
- for (event <- recentEvents)
- if (event.event == action && items.size < maxQueryEvents) {
- items = event.targetEntityId.get +: items
+ for ( event <- recentEvents )
+ if (event.event == action && items.size <
+ ap.maxQueryEvents.getOrElse(defaultURAlgorithmParams.DefaultMaxQueryEvents)) {
+ items = event.targetEntityId.get :: items
// todo: may throw exception and we should ignore the event instead of crashing
}
// userBias may be None, which will cause no JSON output for this
@@ -737,59 +533,70 @@ class URAlgorithm(val ap: URAlgorithmParams)
}
/** get all metadata fields that potentially have boosts (not filters) */
- def getBoostedMetadata(query: Query): Seq[BoostableCorrelators] = {
- val paramsBoostedFields = fields.filter(_.bias < 0f)
- val queryBoostedFields = query.fields.getOrEmpty.filter(_.bias >= 0f)
+ def getBoostedMetadata( query: Query ): List[BoostableCorrelators] = {
+ val paramsBoostedFields = ap.fields.getOrElse(List.empty[Field]).filter( field => field.bias < 0 ).map { field =>
+ BoostableCorrelators(field.name, field.values, Some(field.bias))
+ }
- (queryBoostedFields ++ paramsBoostedFields)
- .map(field => BoostableCorrelators(field.name, field.values, Some(field.bias)))
- .distinct // de-dup and favor query fields
+ val queryBoostedFields = query.fields.getOrElse(List.empty[Field]).filter { field =>
+ field.bias >= 0f
+ }.map { field =>
+ BoostableCorrelators(field.name, field.values, Some(field.bias))
+ }
+
+ (queryBoostedFields ++ paramsBoostedFields).distinct // de-dup and favor query fields
}
/** get all metadata fields that are filters (not boosts) */
- def getFilteringMetadata(query: Query): Seq[FilterCorrelators] = {
- val paramsFilterFields = fields.filter(_.bias >= 0f)
- val queryFilterFields = query.fields.getOrEmpty.filter(_.bias < 0f)
+ def getFilteringMetadata( query: Query ): List[FilterCorrelators] = {
+ val paramsFilterFields = ap.fields.getOrElse(List.empty[Field]).filter( field => field.bias >= 0 ).map { field =>
+ FilterCorrelators(field.name, field.values)
+ }
+
+ val queryFilterFields = query.fields.getOrElse(List.empty[Field]).filter { field =>
+ field.bias < 0f
+ }.map { field =>
+ FilterCorrelators(field.name, field.values)
+ }
- (queryFilterFields ++ paramsFilterFields)
- .map(field => FilterCorrelators(field.name, field.values))
- .distinct // de-dup and favor query fields
+ (queryFilterFields ++ paramsFilterFields).distinct // de-dup and favor query fields
}
/** get part of query for dates and date ranges */
- def getFilteringDateRange(query: Query): Seq[JValue] = {
+ def getFilteringDateRange( query: Query ): List[JValue] = {
+ var json: List[JValue] = List.empty[JValue]
// currentDate in the query overrides the dateRange in the same query so ignore daterange if both
val currentDate = query.currentDate.getOrElse(DateTime.now().toDateTimeISO.toString)
- val json: Seq[JValue] = if (query.dateRange.nonEmpty &&
+ if (query.dateRange.nonEmpty &&
(query.dateRange.get.after.nonEmpty || query.dateRange.get.before.nonEmpty)) {
val name = query.dateRange.get.name
val before = query.dateRange.get.before.getOrElse("")
val after = query.dateRange.get.after.getOrElse("")
val rangeStart = s"""
- |{
- | "constant_score": {
- | "filter": {
- | "range": {
- | "$name": {
+ |{
+ | "constant_score": {
+ | "filter": {
+ | "range": {
+ | "${name}": {
""".stripMargin
val rangeAfter = s"""
- | "gt": "$after"
+ | "gt": "${after}"
""".stripMargin
val rangeBefore = s"""
- | "lt": "$before"
+ | "lt": "${before}"
""".stripMargin
val rangeEnd = s"""
- | }
- | }
- | },
- | "boost": 0
- | }
- |}
+ | }
+ | }
+ | },
+ | "boost": 0
+ | }
+ |}
""".stripMargin
var range = rangeStart
@@ -800,52 +607,46 @@ class URAlgorithm(val ap: URAlgorithmParams)
if (!before.isEmpty) range += rangeBefore
range += rangeEnd
- Seq(parse(range))
- } else if (ap.availableDateName.nonEmpty && ap.expireDateName.nonEmpty) { // use the query date or system date
+ json = json :+ parse(range)
+ } else if (ap.availableDateName.nonEmpty && ap.expireDateName.nonEmpty) {// use the query date or system date
val availableDate = ap.availableDateName.get // never None
val expireDate = ap.expireDateName.get
+
val available = s"""
- |{
- | "constant_score": {
- | "filter": {
- | "range": {
- | "$availableDate": {
- | "lte": "$currentDate"
- | }
- | }
- | },
- | "boost": 0
- | }
- |}
+ |{
+ | "constant_score": {
+ | "filter": {
+ | "range": {
+ | "${availableDate}": {
+ | "lte": "${currentDate}"
+ | }
+ | }
+ | },
+ | "boost": 0
+ | }
+ |}
""".stripMargin
+
+ json = json :+ parse(available)
val expire = s"""
- |{
- | "constant_score": {
- | "filter": {
- | "range": {
- | "$expireDate": {
- | "gt": "$currentDate"
- | }
- | }
- | },
- | "boost": 0
- | }
- |}
+ |{
+ | "constant_score": {
+ | "filter": {
+ | "range": {
+ | "${expireDate}": {
+ | "gt": "${currentDate}"
+ | }
+ | }
+ | },
+ | "boost": 0
+ | }
+ |}
""".stripMargin
-
- Seq(parse(available), parse(expire))
+ json = json :+ parse(expire)
} else {
- logger.info(
- """
- |Misconfigured date information, either your engine.json date settings or your query's dateRange is incorrect.
- |Ignoring date information for this query.""".stripMargin)
- Seq.empty
+ logger.info("Misconfigured date information, either your engine.json date settings or your query's dateRange is incorrect.\nIngoring date information for this query.")
}
json
}
- def getRankingMapping: Map[String, String] = rankingFieldNames map { fieldName =>
- fieldName -> "float"
- } toMap
-
}
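
Note on the query shape: buildQuery above assembles the Elasticsearch bool query with the json4s DSL, combining boosted "should" correlators, filtering "must" terms, and a "must_not" ids blacklist. The following is a minimal, self-contained sketch of that assembly, not the template's exact code; the action names, item ids, and field values are made up for illustration.

    import org.json4s.JsonDSL._
    import org.json4s.native.JsonMethods._

    object QuerySketch extends App {
      // boosted correlators go into "should" (hypothetical actions and item ids)
      val should = List(
        render(("terms" -> ("buy" -> List("item-1", "item-2")) ~ ("boost" -> 2.0))),
        render(("terms" -> ("view" -> List("item-3")) ~ ("boost" -> 1.0))))
      // filtering correlators and metadata go into "must" with a zero boost
      val must = List(render(("terms" -> ("category" -> List("cat1")) ~ ("boost" -> 0))))
      // blacklisted item ids are excluded via "must_not"
      val mustNot = render(("ids" -> ("values" -> List("item-1")) ~ ("boost" -> 0)))

      val json =
        ("size" -> 20) ~
          ("query" ->
            ("bool" ->
              ("should" -> should) ~
              ("must" -> must) ~
              ("must_not" -> mustNot) ~
              ("minimum_should_match" -> 1)))

      println(compact(render(json)))
    }

Run with the json4s-native dependency on the classpath; the printed JSON has the same structure as the query documented in the predict scaladoc above, minus the sort clause.
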
diff --git a/src/main/scala/URModel.scala b/src/main/scala/URModel.scala
index a93e1fe..13a9ffa 100644
--- a/src/main/scala/URModel.scala
+++ b/src/main/scala/URModel.scala
@@ -15,108 +15,192 @@
* limitations under the License.
*/
-package org.template
+package com.uniflash
+
+import java.util.Date
import grizzled.slf4j.Logger
-import io.prediction.data.storage.DataMap
+
+import org.apache.predictionio.controller.{PersistentModelLoader, PersistentModel}
+import org.apache.predictionio.data.storage.PropertyMap
import org.apache.mahout.math.indexeddataset.IndexedDataset
-import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
-import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
+import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
import org.joda.time.DateTime
import org.json4s.JsonAST.JArray
import org.json4s._
-import org.template.conversions.{ IndexedDatasetConversions, ItemID, ItemProps }
+import com.uniflash.conversions.IndexedDatasetConversions
+import org.elasticsearch.spark._
+import org.apache.spark.SparkContext
+
/** Universal Recommender models to save in ES */
class URModel(
- coocurrenceMatrices: Seq[(ItemID, IndexedDataset)] = Seq.empty,
- propertiesRDDs: Seq[RDD[(ItemID, ItemProps)]] = Seq.empty,
- typeMappings: Map[String, String] = Map.empty, // maps fieldname that need type mapping in Elasticsearch
- nullModel: Boolean = false)(implicit sc: SparkContext) {
-
- @transient lazy val logger: Logger = Logger[this.type]
+ coocurrenceMatrices: Option[List[(String, IndexedDataset)]],
+ fieldsRDD: Option[RDD[(String, PropertyMap)]],
+ indexName: String,
+ dateNames: Option[List[String]] = None,
+ nullModel: Boolean = false,
+ typeMappings: Option[Map[String, String]] = None, // maps fieldname that need type mapping in Elasticsearch
+ propertiesRDD: Option[RDD[collection.Map[String, Any]]] = None)
+ // a little hack to allow a dummy model used to save but not
+ // retrieve (see companion object's apply)
+ extends PersistentModel[URAlgorithmParams] {
+ @transient lazy val logger = Logger[this.type]
/** Save all fields to be indexed by Elasticsearch and queried for recs
- * This is something like a table with row IDs = item IDs and separate fields for all
- * cooccurrence and cross-cooccurrence correlators and metadata for each item. Metadata fields are
- * limited to text term collections, i.e. vector types. Scalar values can be used but depend on
- * Elasticsearch's support. One exception is the Date scalar, which is also supported
- * @return always returns true since most other reasons to not save cause exceptions
- */
- def save(dateNames: Seq[String], esIndex: String, esType: String): Boolean = {
-
- logger.debug(s"Start save model")
+ * This is something like a table with row IDs = item IDs and separate fields for all
+ * cooccurrence and cross-cooccurrence correlators and metadata for each item. Metadata fields are
+ * limited to text term collections, i.e. vector types. Scalar values can be used but depend on
+ * Elasticsearch's support. One exception is the Date scalar, which is also supported
+ * @param id
+ * @param params from engine.json, algorithm control params
+ * @param sc The Spark context already created for execution
+ * @return always returns true since most other reasons to not save cause exceptions
+ */
+ def save(id: String, params: URAlgorithmParams, sc: SparkContext): Boolean = {
if (nullModel) throw new IllegalStateException("Saving a null model created from loading an old one.")
+ val esIndexURI = s"/${params.indexName}/${params.typeName}"
+
// for ES we need to create the entire index in an rdd of maps, one per item, so we'll
// convert cooccurrence matrices into correlators as RDD[(itemID, (actionName, Seq[itemID])]
// do they need to be in Elasticsearch format
logger.info("Converting cooccurrence matrices into correlators")
- val correlatorRDDs: Seq[RDD[(ItemID, ItemProps)]] = coocurrenceMatrices.map {
- case (actionName, dataset) =>
- dataset.asInstanceOf[IndexedDatasetSpark].toStringMapRDD(actionName)
- }
-
- logger.info("Group all properties RDD")
- val groupedRDD: RDD[(ItemID, ItemProps)] = groupAll(correlatorRDDs ++ propertiesRDDs)
- // logger.debug(s"Grouped RDD\n${groupedRDD.take(25).mkString("\n")}")
-
- val esRDD: RDD[Map[String, Any]] = groupedRDD.mapPartitions { iter =>
- iter map {
- case (itemId, itemProps) =>
- val propsMap = itemProps.map {
- case (propName, propValue) =>
- propName -> URModel.extractJvalue(dateNames, propName, propValue)
+ val correlators = if (coocurrenceMatrices.nonEmpty) coocurrenceMatrices.get.map { case (actionName, dataset) =>
+ dataset.asInstanceOf[IndexedDatasetSpark].toStringMapRDD(actionName).asInstanceOf[RDD[(String, Map[String, Any])]]
+ //} else List.empty[RDD[(String, Map[String, Seq[String]])]] // empty means only calculating PopModel
+ } else List.empty[RDD[(String, Map[String, Any])]] // empty means only calculating PopModel
+
+ // getting action names since they will be ES fields
+ logger.info(s"Getting a list of action name strings")
+ val allActions = coocurrenceMatrices.getOrElse(List.empty[(String, IndexedDatasetSpark)]).map(_._1)
+
+ logger.info(s"Ready to pass date fields names to closure ${dateNames}")
+ val closureDateNames = dateNames.getOrElse(List.empty[String])
+ // convert the PropertyMap into Map[String, Seq[String]] for ES
+ logger.info("Converting PropertyMap into Elasticsearch style rdd")
+ var properties = List.empty[RDD[(String, Map[String, Any])]]
+ var allPropKeys = List.empty[String]
+ if (fieldsRDD.nonEmpty) {
+ properties = List(fieldsRDD.get.map { case (item, pm) =>
+ var m: Map[String, Any] = Map()
+ for (key <- pm.keySet) {
+ val k = key
+ val v = pm.get[JValue](key)
+ try {
+ // if we get something unexpected, ignore it and add nothing to the map
+ pm.get[JValue](key) match {
+ case JArray(list) => // assumes all lists are string tokens for bias
+ val l = list.map {
+ case JString(s) => s
+ case _ => ""
+ }
+ m = m + (key -> l)
+ case JString(s) => // name for this field is in engine params
+ if (closureDateNames.contains(key)) {
+ // one of the date fields
+ val dateTime = new DateTime(s)
+ val date: java.util.Date = dateTime.toDate()
+ m = m + (key -> date)
+ }
+ case JDouble(rank) => // only the ranking double from PopModel should be here
+ m = m + (key -> rank)
+ case JInt(someInt) => // not sure what this is but pass it on
+ m = m + (key -> someInt)
+ }
+ } catch {
+ case e: ClassCastException => e
+ case e: IllegalArgumentException => e
+ case e: MatchError => e
+ //got something we didn't expect so ignore it, put nothing in the map
}
- propsMap + ("id" -> itemId)
- }
+ }
+ (item, m)
+ })
+ allPropKeys = properties.head.flatMap(_._2.keySet).distinct.collect().toList
}
- // logger.debug(s"ES RDD\n${esRDD.take(25).mkString("\n")}")
- val esFields: List[String] = esRDD.flatMap(_.keySet).distinct().collect.toList
- logger.info(s"ES fields[${esFields.size}]: $esFields")
- EsClient.hotSwap(esIndex, esType, esRDD, esFields, typeMappings)
+ // these need to be indexed with "not_analyzed" and no norms so have to
+ // collect all field names before ES index create
+ val allFields = (allActions ++ allPropKeys).distinct // shouldn't need distinct but it's fast
+
+ if (propertiesRDD.isEmpty) {
+ // Elasticsearch takes a Map with all fields, not a tuple
+ logger.info("Grouping all correlators into doc + fields for writing to index")
+ logger.info(s"Finding non-empty RDDs from a list of ${correlators.length} correlators and " +
+ s"${properties.length} properties")
+ val esRDDs: List[RDD[(String, Map[String, Any])]] =
+ //(correlators ::: properties).filterNot(c => c.isEmpty())// for some reason way too slow
+ (correlators ::: properties)
+ //c.take(1).length == 0
+ if (esRDDs.nonEmpty) {
+ val esFields = groupAll(esRDDs).map { case (item, map) =>
+ // todo: every map's items must be checked for value type and converted before writing to ES
+ val esMap = map + ("id" -> item)
+ esMap
+ }
+ // create a new index then hot-swap the new index by re-aliasing to it then delete old index
+ logger.info("New data to index, performing a hot swap of the index.")
+ esClient.hotSwap(
+ params.indexName,
+ params.typeName,
+ esFields.asInstanceOf[RDD[scala.collection.Map[String,Any]]],
+ allFields,
+ typeMappings)
+ } else logger.warn("No data to write. May have been caused by a failed or stopped `pio train`, " +
+ "try running it again")
+
+ } else {
+ // this happens when updating only the popularity backfill model but to do a hotSwap we need to dup the
+ // entire index
+
+ // create a new index then hot-swap the new index by re-aliasing to it then delete old index
+ esClient.hotSwap(params.indexName, params.typeName, propertiesRDD.get, allFields,
+ typeMappings)
+ }
true
}
+
+ def groupAll( fields: Seq[RDD[(String, (Map[String, Any]))]]): RDD[(String, (Map[String, Any]))] = {
+ //if (fields.size > 1 && !fields.head.isEmpty() && !fields(1).isEmpty()) {
+ if (fields.size > 1) {
+ fields.head.cogroup[Map[String, Any]](groupAll(fields.drop(1))).map { case (key, pairMapSeqs) =>
+ // to be safe merge all maps but should only be one per rdd element
+ val rdd1Maps = pairMapSeqs._1.foldLeft(Map.empty[String, Any])(_ ++ _)
+ val rdd2Maps = pairMapSeqs._2.foldLeft(Map.empty[String, Any])(_ ++ _)
+ val fullMap = rdd1Maps ++ rdd2Maps
+ (key, fullMap)
+ }
+ } else fields.head
+ }
- def groupAll(fields: Seq[RDD[(ItemID, ItemProps)]]): RDD[(ItemID, ItemProps)] = {
- fields.fold(sc.emptyRDD[(ItemID, ItemProps)])(_ ++ _).reduceByKey(_ ++ _)
+ override def toString = {
+ s"URModel in Elasticsearch at index: ${indexName}"
}
+
+
}
-object URModel {
- @transient lazy val logger: Logger = Logger[this.type]
+object URModel
+ extends PersistentModelLoader[URAlgorithmParams, URModel] {
+ @transient lazy val logger = Logger[this.type]
/** This is actually only used to read saved values, and since they are in Elasticsearch we don't need to read them;
- * this means we create a null model since it will not be used.
- * todo: we should rejigger the template framework so this is not required.
- * @param id ignored
- * @param params ignored
- * @param sc ignored
- * @return dummy null model
- */
+ * this means we create a null model since it will not be used.
+ * todo: we should rejigger the template framework so this is not required.
+ * @param id ignored
+ * @param params ignored
+ * @param sc ignored
+ * @return dummy null model
+ */
def apply(id: String, params: URAlgorithmParams, sc: Option[SparkContext]): URModel = {
// todo: need changes in PIO to remove the need for this
- new URModel(null, null, null, nullModel = true)(sc.get)
- }
-
- def extractJvalue(dateNames: Seq[String], key: String, value: Any): Any = value match {
- case JArray(list) => list.map(extractJvalue(dateNames, key, _))
- case JString(s) =>
- if (dateNames.contains(key)) {
- new DateTime(s).toDate
- } else if (RankingFieldName.toSeq.contains(key)) {
- s.toDouble
- } else {
- s
- }
- case JDouble(double) => double
- case JInt(int) => int
- case JBool(bool) => bool
- case _ => value
+ val urm = new URModel(null, null, null, nullModel = true)
+ logger.info("Created dummy null model")
+ urm
}
}
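
As a companion to URModel.save above, here is a small, hypothetical sketch of the PropertyMap value conversion it performs before indexing: JSON arrays of strings become token lists, strings whose field name appears in dateNames become java.util.Date, and ranking doubles pass through. This is simplified for illustration and is not the template's exact code.

    import org.joda.time.DateTime
    import org.json4s._

    object PropertyConversionSketch extends App {
      // assumed date field names, normally taken from engine.json
      val dateNames = List("availabledate", "expiredate")

      def toEsValue(key: String, value: JValue): Option[Any] = value match {
        case JArray(list)                           => Some(list.collect { case JString(s) => s }) // term collections
        case JString(s) if dateNames.contains(key)  => Some(new DateTime(s).toDate)                // date properties
        case JDouble(rank)                          => Some(rank)                                  // popularity ranking from PopModel
        case JInt(i)                                => Some(i)
        case _                                      => None                                        // ignore anything unexpected
      }

      println(toEsValue("category", JArray(List(JString("cat1"), JString("cat2")))))
      println(toEsValue("availabledate", JString("2015-08-30T12:24:41-07:00")))
      println(toEsValue("popRank", JDouble(42.0)))
    }
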
diff --git a/src/main/scala/package.scala b/src/main/scala/package.scala
index c3b4810..313754c 100644
--- a/src/main/scala/package.scala
+++ b/src/main/scala/package.scala
@@ -15,72 +15,20 @@
* limitations under the License.
*/
-package org.template
+package com.uniflash
import grizzled.slf4j.Logger
-
import scala.collection.JavaConversions._
import org.apache.mahout.sparkbindings.SparkDistributedContext
import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
import org.apache.mahout.sparkbindings._
import org.apache.spark.rdd.RDD
-import org.json4s._
/** Utility conversions for IndexedDatasetSpark */
package object conversions {
- type UserID = String
- type ActionID = String
- type ItemID = String
- // Item properties (fieldName, fieldValue)
- type ItemProps = Map[String, JValue]
-
- def drawActionML(implicit logger: Logger): Unit = {
- val actionML =
- """
- |
- | _ _ __ __ _
- | /\ | | (_) | \/ | |
- | / \ ___| |_ _ ___ _ __ | \ / | |
- | / /\ \ / __| __| |/ _ \| '_ \| |\/| | |
- | / ____ \ (__| |_| | (_) | | | | | | | |____
- | /_/ \_\___|\__|_|\___/|_| |_|_| |_|______|
- |
- |
- """.stripMargin
-
- logger.info(actionML)
- }
-
- def drawInfo(title: String, dataMap: Seq[(String, Any)])(implicit logger: Logger): Unit = {
- val leftAlignFormat = "║ %-30s%-28s ║"
-
- val line = "═" * 60
-
- val preparedTitle = "║ %-58s ║".format(title)
- val data = dataMap.map {
- case (key, value) =>
- leftAlignFormat.format(key, value)
- } mkString "\n"
-
- logger.info(
- s"""
- |╔$line╗
- |$preparedTitle
- |$data
- |╚$line╝
- |""".stripMargin)
-
- }
-
- implicit class OptionCollection[T](collectionOpt: Option[Seq[T]]) {
- def getOrEmpty: Seq[T] = {
- collectionOpt.getOrElse(Seq.empty[T])
- }
- }
-
implicit class IndexedDatasetConversions(val indexedDataset: IndexedDatasetSpark) {
- def toStringMapRDD(actionName: ActionID): RDD[(ItemID, ItemProps)] = {
+ def toStringMapRDD(actionName: String): RDD[(String, Map[String, Seq[String]])] = {
@transient lazy val logger = Logger[this.type]
//val matrix = indexedDataset.matrix.checkpoint()
@@ -93,37 +41,37 @@ package object conversions {
// may want to mapPartition and create bulk updates as a slight optimization
// creates an RDD of (itemID, Map[correlatorName, list-of-correlator-values])
- indexedDataset.matrix.rdd.map[(ItemID, ItemProps)] {
- case (rowNum, itemVector) =>
-
- // turn non-zeros into list for sorting
- var itemList = List[(Int, Double)]()
- for (ve <- itemVector.nonZeroes) {
- itemList = itemList :+ (ve.index, ve.get)
+ indexedDataset.matrix.rdd.map[(String, Map[String, Seq[String]])] { case (rowNum, itemVector) =>
+
+ // turn non-zeros into list for sorting
+ var itemList = List[(Int, Double)]()
+ for (ve <- itemVector.nonZeroes) {
+ val v = ve
+ itemList = itemList :+(ve.index, ve.get)
+ }
+ //sort by highest strength value descending(-)
+ val vector = itemList.sortBy { elem => -elem._2 }
+
+ val itemID = rowIDDictionary_bcast.value.inverse.getOrElse(rowNum, "INVALID_ITEM_ID")
+ try {
+
+ require(itemID != "INVALID_ITEM_ID", s"Bad row number in matrix, skipping item ${rowNum}")
+ require(vector.nonEmpty, s"No values so skipping item ${rowNum}")
+
+ // create a list of element ids
+ val values = vector.map { item =>
+ columnIDDictionary_bcast.value.inverse.getOrElse(item._1, "") // should always be in the dictionary
}
- //sort by highest strength value descending(-)
- val vector = itemList.sortBy { elem => -elem._2 }
- val itemID = rowIDDictionary_bcast.value.inverse.getOrElse(rowNum, "INVALID_ITEM_ID")
- try {
+ (itemID, Map(actionName -> values))
- require(itemID != "INVALID_ITEM_ID", s"Bad row number in matrix, skipping item $rowNum")
- require(vector.nonEmpty, s"No values so skipping item $rowNum")
-
- // create a list of element ids
- val values = JArray(vector.map { item =>
- JString(columnIDDictionary_bcast.value.inverse.getOrElse(item._1, "")) // should always be in the dictionary
- })
-
- (itemID, Map(actionName -> values))
-
- } catch {
- case cce: IllegalArgumentException => //non-fatal, ignore line
- null.asInstanceOf[(ItemID, ItemProps)]
- }
+ } catch {
+ case cce: IllegalArgumentException => //non-fatal, ignore line
+ null.asInstanceOf[(String, Map[String, Seq[String]])]
+ }
}.filter(_ != null)
}
}
-}
+}
\ No newline at end of file
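
For reference, the per-row work in toStringMapRDD above boils down to: keep the non-zero correlator strengths, sort them descending, and translate matrix row and column numbers back to external item ids through the two dictionaries. Below is a tiny self-contained sketch with made-up dictionaries and an assumed action name "purchase"; it is an illustration, not the template's code.

    object CorrelatorRowSketch extends App {
      // hypothetical stand-ins for the row and column dictionaries (matrix index -> item id)
      val rowIdToItem = Map(0 -> "item-A", 1 -> "item-B")
      val colIdToItem = Map(0 -> "item-A", 1 -> "item-B", 2 -> "item-C")

      val rowNum = 0
      val nonZeroes = List((2, 0.7), (1, 3.2)) // (column index, correlation strength)

      // sort by highest strength descending, then map column indexes back to item ids
      val values = nonZeroes.sortBy { case (_, strength) => -strength }
        .map { case (col, _) => colIdToItem.getOrElse(col, "") }
      val itemID = rowIdToItem.getOrElse(rowNum, "INVALID_ITEM_ID")

      println(itemID -> Map("purchase" -> values)) // (item-A, Map(purchase -> List(item-B, item-C)))
    }
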