From 144d634e8f0970b3ed8db6077a67a9575411e876 Mon Sep 17 00:00:00 2001
From: iamShantanu101
Date: Sun, 6 Nov 2016 23:58:46 +0530
Subject: [PATCH] Changes made for replacing io.prediction.* with org.apache.predictionio.*

Changes are made in build.sbt for replacing "io.prediction" with
"org.apache.predictionio" and "core" with "apache-predictionio-core"

	modified:   build.sbt
	modified:   src/main/scala/DataSource.scala
	modified:   src/main/scala/Engine.scala
	modified:   src/main/scala/EsClient.scala
	modified:   src/main/scala/PopModel.scala
	modified:   src/main/scala/Preparator.scala
	modified:   src/main/scala/Serving.scala
	modified:   src/main/scala/URAlgorithm.scala
	modified:   src/main/scala/URModel.scala
	modified:   src/main/scala/package.scala
---
 build.sbt                        |   30 +-
 src/main/scala/DataSource.scala  |  106 ++-
 src/main/scala/Engine.scala      |   82 ++-
 src/main/scala/EsClient.scala    |  164 ++---
 src/main/scala/PopModel.scala    |  221 +++---
 src/main/scala/Preparator.scala  |   56 +-
 src/main/scala/Serving.scala     |   12 +-
 src/main/scala/URAlgorithm.scala | 1091 ++++++++++++------------
 src/main/scala/URModel.scala     |  222 ++++--
 src/main/scala/package.scala     |  108 +--
 10 files changed, 914 insertions(+), 1178 deletions(-)

diff --git a/build.sbt b/build.sbt
index 29ea0c3..20a1a08 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,21 +1,15 @@
-import scalariform.formatter.preferences._
-import com.typesafe.sbt.SbtScalariform
-import com.typesafe.sbt.SbtScalariform.ScalariformKeys
-
 name := "template-scala-parallel-universal-recommendation"
 
-version := "0.4.2"
+version := "0.2.3"
 
 organization := "io.prediction"
 
-val mahoutVersion = "0.13.0-SNAPSHOT"
-
-val pioVersion = "0.9.7-aml"
+val mahoutVersion = "0.11.1"
 
 libraryDependencies ++= Seq(
-  "io.prediction" %% "core" % pioVersion % "provided",
-  "org.apache.spark" %% "spark-core" % "1.4.0" % "provided",
-  "org.apache.spark" %% "spark-mllib" % "1.4.0" % "provided",
+  "org.apache.predictionio" %% "apache-predictionio-core" % pioVersion.value % "provided",
+  "org.apache.spark" %% "spark-core" % "1.3.0" % "provided",
+  "org.apache.spark" %% "spark-mllib" % "1.3.0" % "provided",
   "org.xerial.snappy" % "snappy-java" % "1.1.1.7",
   // Mahout's Spark libs
   "org.apache.mahout" %% "mahout-math-scala" % mahoutVersion,
@@ -28,22 +22,14 @@ libraryDependencies ++= Seq(
   // other external libs
   "com.thoughtworks.xstream" % "xstream" % "1.4.4"
     exclude("xmlpull", "xmlpull"),
-  "org.elasticsearch" % "elasticsearch-spark_2.10" % "2.1.2"
+  "org.elasticsearch" % "elasticsearch-spark_2.10" % "2.1.0.Beta4"
    exclude("org.apache.spark", "spark-catalyst_2.10")
    exclude("org.apache.spark", "spark-sql_2.10"),
-  "org.json4s" %% "json4s-native" % "3.2.10")
-  .map(_.exclude("org.apache.lucene","lucene-core")).map(_.exclude("org.apache.lucene","lucene-analyzers-common"))
+  "org.json4s" %% "json4s-native" % "3.2.11"
+)
 
 resolvers += Resolver.mavenLocal
 
-SbtScalariform.scalariformSettings
-
-ScalariformKeys.preferences := ScalariformKeys.preferences.value
-  .setPreference(AlignSingleLineCaseStatements, true)
-  .setPreference(DoubleIndentClassDeclaration, true)
-  .setPreference(DanglingCloseParenthesis, Prevent)
-  .setPreference(MultilineScaladocCommentsStartOnFirstLine, true)
-
 assemblyMergeStrategy in assembly := {
   case "plugin.properties" => MergeStrategy.discard
   case PathList(ps @ _*) if ps.last endsWith "package-info.class" =>
diff --git a/src/main/scala/DataSource.scala b/src/main/scala/DataSource.scala
index 0c3b979..40685ae 100644
--- a/src/main/scala/DataSource.scala
+++ b/src/main/scala/DataSource.scala
@@ -15,96 +15,90 
@@ * limitations under the License. */ -package org.template - -import _root_.io.prediction.controller.{ EmptyActualResult, EmptyEvaluationInfo, PDataSource, Params } -import _root_.io.prediction.data.storage.PropertyMap -import _root_.io.prediction.data.store.PEventStore -import grizzled.slf4j.Logger -import io.prediction.core.{ EventWindow, SelfCleaningDataSource } +package com.uniflash + +import _root_.org.apache.predictionio.controller.PDataSource +import _root_.org.apache.predictionio.controller.EmptyEvaluationInfo +import _root_.org.apache.predictionio.controller.EmptyActualResult +import _root_.org.apache.predictionio.controller.Params +import _root_.org.apache.predictionio.data.storage.{PropertyMap, Event} +import _root_.org.apache.predictionio.data.store.PEventStore +import org.apache.mahout.math.indexeddataset.{BiDictionary, IndexedDataset} import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD -import org.template.conversions.{ ActionID, ItemID } -import org.template.conversions._ +import grizzled.slf4j.Logger /** Taken from engine.json these are passed in to the DataSource constructor - * - * @param appName registered name for the app - * @param eventNames a list of named events expected. The first is the primary event, the rest are secondary. These - * will be used to create the primary correlator and cross-cooccurrence secondary correlators. - */ + * + * @param appName registered name for the app + * @param eventNames a list of named events expected. The first is the primary event, the rest are secondary. These + * will be used to create the primary correlator and cross-cooccurrence secondary correlators. + */ case class DataSourceParams( - appName: String, - eventNames: List[String], // IMPORTANT: eventNames must be exactly the same as URAlgorithmParams eventNames - eventWindow: Option[EventWindow]) extends Params + appName: String, + eventNames: List[String]) // IMPORTANT: eventNames must be exactly the same as URAlgorithmParams eventNames + extends Params /** Read specified events from the PEventStore and creates RDDs for each event. A list of pairs (eventName, eventRDD) - * are sent to the Preparator for further processing. - * @param dsp parameters taken from engine.json - */ + * are sent to the Preparator for further processing. 
+ * @param dsp parameters taken from engine.json + */ class DataSource(val dsp: DataSourceParams) - extends PDataSource[TrainingData, EmptyEvaluationInfo, Query, EmptyActualResult] - with SelfCleaningDataSource { - - @transient override lazy implicit val logger: Logger = Logger[this.type] + extends PDataSource[TrainingData, + EmptyEvaluationInfo, Query, EmptyActualResult] { - override def appName: String = dsp.appName - override def eventWindow: Option[EventWindow] = dsp.eventWindow - - drawInfo("Init DataSource", Seq( - ("══════════════════════════════", "════════════════════════════"), - ("App name", appName), - ("Event window", eventWindow), - ("Event names", dsp.eventNames))) + @transient lazy val logger = Logger[this.type] /** Reads events from PEventStore and create and RDD for each */ - override def readTraining(sc: SparkContext): TrainingData = { + override + def readTraining(sc: SparkContext): TrainingData = { val eventNames = dsp.eventNames - cleanPersistedPEvents(sc) + val eventsRDD = PEventStore.find( appName = dsp.appName, entityType = Some("user"), eventNames = Some(eventNames), - targetEntityType = Some(Some("item")))(sc).repartition(sc.defaultParallelism) + targetEntityType = Some(Some("item")))(sc) // now separate the events by event name - val actionRDDs: List[(ActionID, RDD[(UserID, ItemID)])] = eventNames.map { eventName => + val actionRDDs = eventNames.map { eventName => val actionRDD = eventsRDD.filter { event => - require(eventNames.contains(event.event), s"Unexpected event $event is read.") // is this really needed? + + require(eventNames.contains(event.event), s"Unexpected event ${event} is read.") // is this really needed? require(event.entityId.nonEmpty && event.targetEntityId.get.nonEmpty, "Empty user or item ID") + eventName.equals(event.event) + }.map { event => (event.entityId, event.targetEntityId.get) - } + }.cache() (eventName, actionRDD) - } filterNot { case (_, actionRDD) => actionRDD.isEmpty() } - - logger.debug(s"Received actions for events ${actionRDDs.map(_._1)}") + } // aggregating all $set/$unsets for metadata fields, which are attached to items - val fieldsRDD: RDD[(ItemID, PropertyMap)] = PEventStore.aggregateProperties( - appName = dsp.appName, - entityType = "item")(sc) - // logger.debug(s"FieldsRDD\n${fieldsRDD.take(25).mkString("\n")}") + val fieldsRDD = PEventStore.aggregateProperties( + appName= dsp.appName, + entityType= "item")(sc) // Have a list of (actionName, RDD), for each action // todo: some day allow data to be content, which requires rethinking how to use EventStore - TrainingData(actionRDDs, fieldsRDD) + new TrainingData(actionRDDs, fieldsRDD) } } /** Low level RDD based representation of the data ready for the Preparator - * - * @param actions List of Tuples (actionName, actionRDD)qw - * @param fieldsRDD RDD of item keyed PropertyMap for item metadata - */ -case class TrainingData( - actions: Seq[(ActionID, RDD[(UserID, ItemID)])], - fieldsRDD: RDD[(ItemID, PropertyMap)]) extends Serializable { - - override def toString: String = { + * + * @param actions List of Tuples (actionName, actionRDD)qw + * @param fieldsRDD RDD of item keyed PropertyMap for item metadata + */ +class TrainingData( + val actions: List[(String, RDD[(String, String)])], + val fieldsRDD: RDD[(String, PropertyMap)]) + extends Serializable { + + override def toString = { val a = actions.map { t => s"${t._1} actions: [count:${t._2.count()}] + sample:${t._2.take(2).toList} " }.toString() @@ -112,4 +106,4 @@ case class TrainingData( a + f } -} \ No newline at end 
of file +} diff --git a/src/main/scala/Engine.scala b/src/main/scala/Engine.scala index aee709b..8bc065b 100644 --- a/src/main/scala/Engine.scala +++ b/src/main/scala/Engine.scala @@ -15,68 +15,62 @@ * limitations under the License. */ -package org.template +package com.uniflash -import grizzled.slf4j.Logger -import io.prediction.controller.{ EmptyActualResult, EmptyEvaluationInfo, Engine, EngineFactory } -import org.template.conversions._ +import java.util.Date + +import org.apache.predictionio.controller.{EngineFactory, Engine} /** This file contains case classes that are used with reflection to specify how query and config - * JSON is to be parsed. the Query case class, for instance defines the way a JSON query is to be - * formed. The same for param case classes. - */ + * JSON is to be parsed. the Query case class, for instance defines the way a JSON query is to be + * formed. The same for param case classes. + */ /** The Query spec with optional values. The only hard rule is that there must be either a user or - * an item id. All other values are optional. - */ + * an item id. All other values are optional. */ case class Query( - user: Option[String] = None, // must be a user or item id - userBias: Option[Float] = None, // default: whatever is in algorithm params or 1 - item: Option[String] = None, // must be a user or item id - itemBias: Option[Float] = None, // default: whatever is in algorithm params or 1 - fields: Option[List[Field]] = None, // default: whatever is in algorithm params or None - currentDate: Option[String] = None, // if used will override dateRange filter, currentDate must lie between the item's - // expireDateName value and availableDateName value, all are ISO 8601 dates - dateRange: Option[DateRange] = None, // optional before and after filter applied to a date field - blacklistItems: Option[List[String]] = None, // default: whatever is in algorithm params or None - returnSelf: Option[Boolean] = None, // means for an item query should the item itself be returned, defaults - // to what is in the algorithm params or false - num: Option[Int] = None, // default: whatever is in algorithm params, which itself has a default--probably 20 - eventNames: Option[List[String]], // names used to ID all user actions - withRanks: Option[Boolean] = None) // Add to ItemScore rank fields values, default fasle - extends Serializable + user: Option[String] = None, // must be a user or item id + userBias: Option[Float] = None, // default: whatever is in algorithm params or 1 + item: Option[String] = None, // must be a user or item id + itemBias: Option[Float] = None, // default: whatever is in algorithm params or 1 + fields: Option[List[Field]] = None, // default: whatever is in algorithm params or None + currentDate: Option[String] = None, // if used will override dateRange filter, currentDate must lie between the item's + // expireDateName value and availableDateName value, all are ISO 8601 dates + dateRange: Option[DateRange] = None, // optional before and after filter applied to a date field + blacklistItems: Option[List[String]] = None, // default: whatever is in algorithm params or None + returnSelf: Option[Boolean] = None,// means for an item query should the item itself be returned, defaults + // to what is in the algorithm params or false + num: Option[Int] = None, // default: whatever is in algorithm params, which itself has a default--probably 20 + eventNames: Option[List[String]]) // names used to ID all user actions + extends Serializable /** Used to specify how Fields 
are represented in engine.json */ case class Field( // no optional values for fields, whne specified - name: String, // name of metadata field - values: Seq[String], // fields can have multiple values like tags of a single value as when using hierarchical - // taxonomies - bias: Float) // any positive value is a boost, negative is a filter - extends Serializable + name: String, // name of metadata field + values: Array[String], // fields can have multiple values like tags of a single value as when using hierarchical + // taxonomies + bias: Float)// any positive value is a boost, negative is a filter + extends Serializable /** Used to specify the date range for a query */ case class DateRange( - name: String, // name of item property for the date comparison - before: Option[String], // empty strings means no filter - after: Option[String]) // both empty should be ignored - extends Serializable + name: String, // name of item property for the date comparison + before: Option[String], // empty strings means no filter + after: Option[String]) // both empty should be ignored + extends Serializable -/** results of a URAlgoritm.predict */ +/** results of a MMRAlgoritm.predict */ case class PredictedResult( - itemScores: Array[ItemScore]) - extends Serializable + itemScores: Array[ItemScore]) + extends Serializable case class ItemScore( - item: ItemID, // item id - score: Double, // used to rank, original score returned from teh search engine - ranks: Option[Map[String, Double]] = None) extends Serializable + item: String, // item id + score: Double )// used to rank, original score returned from teh search engine + extends Serializable object RecommendationEngine extends EngineFactory { - - @transient lazy implicit val logger: Logger = Logger[this.type] - drawActionML - - def apply(): Engine[TrainingData, EmptyEvaluationInfo, PreparedData, Query, PredictedResult, EmptyActualResult] = { + def apply() = { new Engine( classOf[DataSource], classOf[Preparator], diff --git a/src/main/scala/EsClient.scala b/src/main/scala/EsClient.scala index 7ac0e33..e06a028 100644 --- a/src/main/scala/EsClient.scala +++ b/src/main/scala/EsClient.scala @@ -15,12 +15,12 @@ * limitations under the License. 
*/ -package org.template +package com.uniflash import java.util import grizzled.slf4j.Logger -import io.prediction.data.storage._ +import org.apache.predictionio.data.storage.{Storage, StorageClientConfig, elasticsearch} import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD @@ -31,51 +31,48 @@ import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsReques import org.elasticsearch.action.admin.indices.refresh.RefreshRequest import org.elasticsearch.action.get.GetResponse import org.elasticsearch.client.transport.TransportClient -import org.elasticsearch.common.settings.{ ImmutableSettings, Settings } +import org.elasticsearch.common.settings.{Settings, ImmutableSettings} import org.joda.time.DateTime import org.json4s.jackson.JsonMethods._ import org.elasticsearch.spark._ import org.elasticsearch.node.NodeBuilder._ -import org.elasticsearch.search.SearchHits -import org.json4s.JValue -import org.template.conversions.{ ItemID, ItemProps } import scala.collection.immutable import scala.collection.parallel.mutable /** Elasticsearch notes: - * 1) every query clause wil laffect scores unless it has a constant_score and boost: 0 - * 2) the Spark index writer is fast but must assemble all data for the index before the write occurs - * 3) many operations must be followed by a refresh before the action takes effect--sortof like a transaction commit - * 4) to use like a DB you must specify that the index of fields are `not_analyzed` so they won't be lowercased, - * stemmed, tokenized, etc. Then the values are literal and must match exactly what is in the query (no analyzer) - */ + * 1) every query clause wil laffect scores unless it has a constant_score and boost: 0 + * 2) the Spark index writer is fast but must assemble all data for the index before the write occurs + * 3) many operations must be followed by a refresh before the action takes effect--sortof like a transaction commit + * 4) to use like a DB you must specify that the index of fields are `not_analyzed` so they won't be lowercased, + * stemmed, tokenized, etc. Then the values are literal and must match exactly what is in the query (no analyzer) + */ /** Defines methods to use on Elasticsearch. */ -object EsClient { - @transient lazy val logger: Logger = Logger[this.type] +object esClient { + @transient lazy val logger = Logger[this.type] private lazy val client = if (Storage.getConfig("ELASTICSEARCH").nonEmpty) - new elasticsearch.StorageClient(Storage.getConfig("ELASTICSEARCH").get).client - else - throw new IllegalStateException("No Elasticsearch client configuration detected, check your pio-env.sh for" + - "proper configuration settings") + new elasticsearch.StorageClient(Storage.getConfig("ELASTICSEARCH").get).client + else + throw new IllegalStateException("No Elasticsearch client configuration detected, check your pio-env.sh for" + + "proper configuration settings") // wrong way that uses only default settings, which will be a localhost ES sever. //private lazy val client = new elasticsearch.StorageClient(StorageClientConfig()).client /** Delete all data from an instance but do not commit it. Until the "refresh" is done on the index - * the changes will not be reflected. - * @param indexName will delete all types under this index, types are not used by the UR - * @param refresh - * @return true if all is well - */ + * the changes will not be reflected. 
+ * @param indexName will delete all types under this index, types are not used by the UR + * @param refresh + * @return true if all is well + */ def deleteIndex(indexName: String, refresh: Boolean = false): Boolean = { //val debug = client.connectedNodes() - if (client.admin().indices().exists(new IndicesExistsRequest(indexName)).actionGet().isExists) { + if (client.admin().indices().exists(new IndicesExistsRequest(indexName)).actionGet().isExists()) { val delete = client.admin().indices().delete(new DeleteIndexRequest(indexName)).actionGet() if (!delete.isAcknowledged) { - logger.info(s"Index $indexName wasn't deleted, but may have quietly failed.") + logger.info(s"Index ${indexName} wasn't deleted, but may have quietly failed.") } else { // now refresh to get it 'committed' // todo: should do this after the new index is created so no index downtime @@ -83,35 +80,35 @@ object EsClient { } true } else { - logger.warn(s"Elasticsearch index: $indexName wasn't deleted because it didn't exist. This may be an error.") + logger.warn(s"Elasticsearch index: ${indexName} wasn't deleted because it didn't exist. This may be an error.") false } } /** Creates a new empty index in Elasticsearch and initializes mappings for fields that will be used - * @param indexName elasticsearch name - * @param indexType names the type of index, usually use the item name - * @param fieldNames ES field names - * @param typeMappings indicates which ES fields are to be not_analyzed without norms - * @param refresh should the index be refreshed so the create is committed - * @return true if all is well - */ + * @param indexName elasticsearch name + * @param indexType names the type of index, usually use the item name + * @param fieldNames ES field names + * @param typeMappings indicates which ES fields are to be not_analyzed without norms + * @param refresh should the index be refreshed so the create is committed + * @return true if all is well + */ def createIndex( indexName: String, - indexType: String, + indexType: String = "items", fieldNames: List[String], - typeMappings: Map[String, String] = Map.empty, + typeMappings: Option[Map[String, String]] = None, refresh: Boolean = false): Boolean = { - if (!client.admin().indices().exists(new IndicesExistsRequest(indexName)).actionGet().isExists) { + if (!client.admin().indices().exists(new IndicesExistsRequest(indexName)).actionGet().isExists()) { var mappings = """ |{ | "properties": { """.stripMargin.replace("\n", "") - def mappingsField(`type`: String) = { + def mappingsField(t: String) = { s""" | : { - | "type": "${`type`}", + | "type": "${t}", | "index": "not_analyzed", | "norms" : { | "enabled" : false @@ -133,18 +130,17 @@ object EsClient { """.stripMargin.replace("\n", "") fieldNames.foreach { fieldName => - if (typeMappings.contains(fieldName)) - mappings += (fieldName + mappingsField(typeMappings(fieldName))) + if (typeMappings.nonEmpty && typeMappings.get.contains(fieldName)) + mappings += (fieldName + mappingsField(typeMappings.get(fieldName))) else // unspecified fields are treated as not_analyzed strings mappings += (fieldName + mappingsField("string")) } mappings += mappingsTail // any other string is not_analyzed - // logger.debug(s"ES mapping: $mappings") - val cir = new CreateIndexRequest(indexName).mapping(indexType, mappings) + val cir = new CreateIndexRequest(indexName).mapping("items",mappings) val create = client.admin().indices().create(cir).actionGet() if (!create.isAcknowledged) { - logger.info(s"Index $indexName wasn't created, but may have 
quietly failed.") + logger.info(s"Index ${indexName} wasn't created, but may have quietly failed.") } else { // now refresh to get it 'committed' // todo: should do this after the new index is created so no index downtime @@ -152,7 +148,7 @@ object EsClient { } true } else { - logger.warn(s"Elasticsearch index: $indexName wasn't created because it already exists. This may be an error.") + logger.warn(s"Elasticsearch index: ${indexName} wasn't created because it already exists. This may be an error.") false } } @@ -165,25 +161,22 @@ object EsClient { /** Create new index and hot-swap the new after it's indexed and ready to take over, then delete the old */ def hotSwap( alias: String, - typeName: String, - indexRDD: RDD[Map[String, Any]], + typeName: String = "items", + indexRDD: RDD[scala.collection.Map[String,Any]], fieldNames: List[String], - typeMappings: Map[String, String] = Map.empty): Unit = { + typeMappings: Option[Map[String, String]] = None): Unit = { // get index for alias, change a char, create new one with new id and index it, swap alias and delete old one val aliasMetadata = client.admin().indices().prepareGetAliases(alias).get().getAliases val newIndex = alias + "_" + DateTime.now().getMillis.toString - - logger.debug(s"Create new index: $newIndex, $typeName, $fieldNames, $typeMappings") createIndex(newIndex, typeName, fieldNames, typeMappings) val newIndexURI = "/" + newIndex + "/" + typeName - // logger.debug(s"Save to ES[$newIndexURI]:\n${indexRDD.take(25).mkString("\n")}") indexRDD.saveToEs(newIndexURI, Map("es.mapping.id" -> "id")) //refreshIndex(newIndex) if (!aliasMetadata.isEmpty - && aliasMetadata.get(alias) != null - && aliasMetadata.get(alias).get(0) != null) { // was alias so remove the old one + && aliasMetadata.get(alias) != null + && aliasMetadata.get(alias).get(0) != null) { // was alias so remove the old one //append the DateTime to the alias to create an index name val oldIndex = aliasMetadata.get(alias).get(0).getIndexRouting client.admin().indices().prepareAliases() @@ -206,34 +199,40 @@ object EsClient { } // clean out any old indexes that were the product of a failed train? 
val indices = util.Arrays.asList(client.admin().indices().prepareGetIndex().get().indices()).get(0) - indices.map { index => + indices.map{ index => if (index.contains(alias) && index != newIndex) deleteIndex(index) //clean out any old orphaned indexes } } /** Performs a search using the JSON query String - * - * @param query the JSON query string parable by Elasticsearch - * @param indexName the index to search - * @return a [PredictedResults] collection - */ - def search(query: String, indexName: String): Option[SearchHits] = { + * + * @param query the JSON query string parable by Elasticsearch + * @param indexName the index to search + * @return a [PredictedResults] collection + */ + def search(query: String, indexName: String): PredictedResult = { val sr = client.prepareSearch(indexName).setSource(query).get() + if (!sr.isTimedOut) { - Some(sr.getHits) + val recs = sr.getHits.getHits.map( hit => new ItemScore(hit.getId, hit.getScore.toDouble) ) + logger.info(s"Results: ${sr.getHits.getHits.size} retrieved of " + + s"a possible ${sr.getHits.totalHits()}") + new PredictedResult(recs) } else { - None + logger.info(s"No results for query ${parse(query)}") + new PredictedResult(Array.empty[ItemScore]) } + } /** Gets the "source" field of an Elasticsearch document - * - * @param indexName index that contains the doc/item - * @param typeName type name used to construct ES REST URI - * @param doc for UR the item id - * @return source [java.util.Map] of field names to any valid field values or null if empty - */ + * + * @param indexName index that contains the doc/item + * @param typeName type name used to construct ES REST URI + * @param doc for UR the item id + * @return source [java.util.Map] of field names to any valid field values or null if empty + */ def getSource(indexName: String, typeName: String, doc: String): util.Map[String, AnyRef] = { client.prepareGet(indexName, typeName, doc) .execute() @@ -257,30 +256,31 @@ object EsClient { val allIndicesMap = client.admin().indices().getAliases(new GetAliasesRequest(alias)).actionGet().getAliases if (allIndicesMap.size() == 1) { // must be a 1-1 mapping of alias <-> index - var indexName: String = "" - val itr = allIndicesMap.keysIt() - while (itr.hasNext) + var indexName: String = "" + var itr = allIndicesMap.keysIt() + while ( itr.hasNext ) indexName = itr.next() Some(indexName) // the one index the alias points to } else { // delete all the indices that are pointed to by the alias, they can't be used logger.warn("There is no 1-1 mapping of index to alias so deleting the old indexes that are referenced by the " + "alias. 
This may have been caused by a crashed or stopped `pio train` operation so try running it again.") - if (!allIndicesMap.isEmpty) { - val i = allIndicesMap.keys().toArray.asInstanceOf[Array[String]] - for (indexName <- i) { - deleteIndex(indexName, refresh = true) - } + val i = allIndicesMap.keys().toArray.asInstanceOf[Array[String]] + for ( indexName <- i ){ + deleteIndex(indexName, true) } + None // if more than one abort, need to clean up bad aliases } } - def getRDD( - alias: String, - typeName: String)(implicit sc: SparkContext): RDD[(ItemID, ItemProps)] = { - getIndexName(alias) - .map(index => sc.esJsonRDD(alias + "/" + typeName) map { case (itemId, json) => itemId -> DataMap(json).fields }) - .getOrElse(sc.emptyRDD) + def getRDD(sc: SparkContext, alias: String, typeName: String): + Option[RDD[(String, collection.Map[String, AnyRef])]] = { + val index = getIndexName(alias) + if (index.nonEmpty) { // ensures there is a 1-1 mapping of alias to index + val indexAsRDD = sc.esRDD(alias + "/" + typeName) + //val debug = indexAsRDD.count() + Some(indexAsRDD) + } else None // error so no index for the alias } -} \ No newline at end of file +} diff --git a/src/main/scala/PopModel.scala b/src/main/scala/PopModel.scala index 03f39f4..41febe6 100644 --- a/src/main/scala/PopModel.scala +++ b/src/main/scala/PopModel.scala @@ -15,198 +15,131 @@ * limitations under the License. */ -package org.template +package com.uniflash import grizzled.slf4j.Logger -import io.prediction.data.storage.Event -import io.prediction.data.store.PEventStore +import org.apache.predictionio.data.storage.Event import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD +import org.apache.predictionio.data.store.PEventStore import org.joda.time.format.ISODateTimeFormat -import org.joda.time.{ DateTime, Interval } -import org.template.conversions.{ ItemID, ItemProps } - -import scala.language.postfixOps -import scala.util.Random - -object RankingFieldName { - val UserRank = "userRank" - val UniqueRank = "uniqueRank" - val PopRank = "popRank" - val TrendRank = "trendRank" - val HotRank = "hotRank" - val UnknownRank = "unknownRank" - def toSeq: Seq[String] = Seq(UserRank, UniqueRank, PopRank, TrendRank, HotRank) - override def toString: String = s"$UserRank, $UniqueRank, $PopRank, $TrendRank, $HotRank" -} +import org.joda.time.{DateTime, Interval} -object RankingType { - val Popular = "popular" - val Trending = "trending" - val Hot = "hot" - val UserDefined = "userDefined" - val Random = "random" - def toSeq: Seq[String] = Seq(Popular, Trending, Hot, UserDefined, Random) - override def toString: String = s"$Popular, $Trending, $Hot, $UserDefined, $Random" -} -class PopModel(fieldsRDD: RDD[(ItemID, ItemProps)])(implicit sc: SparkContext) { +object PopModel { - @transient lazy val logger: Logger = Logger[this.type] + @transient lazy val logger = Logger[this.type] - def calc( - modelName: String, - eventNames: Seq[String], + def calc ( + modelName: Option[String] = None, + eventNames: List[String], appName: String, duration: Int = 0, - offsetDate: Option[String] = None): RDD[(ItemID, Double)] = { + endDateOption: Option[String] = None)(implicit sc: SparkContext): Option[RDD[(String, Float)]] = { - // todo: make end manditory and fill it with "now" upstream if not specified, will simplify logic here - // end should always be 'now' except in unusual conditions like for testing - val end = if (offsetDate.isEmpty) DateTime.now else { + // startDate should always be 'now' except in unusual conditions like for testing + val 
endDate = if (endDateOption.isEmpty ) DateTime.now else { try { - ISODateTimeFormat.dateTimeParser().parseDateTime(offsetDate.get) + ISODateTimeFormat.dateTimeParser().parseDateTime(endDateOption.get) } catch { - case e: IllegalArgumentException => - logger.warn("Bad end for popModel: " + offsetDate.get + " using 'now'") + case e: IllegalArgumentException => e + logger.warn("Bad endDate for popModel: " + endDateOption.get + " using 'now'") DateTime.now } } - val interval = new Interval(end.minusSeconds(duration), end) - // based on type of popularity model return a set of (item-id, ranking-number) for all items - logger.info(s"PopModel $modelName using end: $end, and duration: $duration, interval: $interval") - - // if None? debatable, this is either an error or may need to default to popular, why call popModel otherwise modelName match { - case RankingType.Popular => calcPopular(appName, eventNames, interval) - case RankingType.Trending => calcTrending(appName, eventNames, interval) - case RankingType.Hot => calcHot(appName, eventNames, interval) - case RankingType.Random => calcRandom(appName, interval) - case RankingType.UserDefined => sc.emptyRDD - case unknownRankingType => - logger.warn( - s""" - |Bad rankings param type=[$unknownRankingType] in engine definition params, possibly a bad json value. - |Use one of the available parameter values ($RankingType).""".stripMargin) - sc.emptyRDD + case Some("popular") => calcPopular(appName, eventNames, new Interval(endDate.minusSeconds(duration), endDate)) + case Some("trending") => calcTrending(appName, eventNames, new Interval(endDate.minusSeconds(duration), endDate)) + case Some("hot") => calcHot(appName, eventNames, new Interval(endDate.minusSeconds(duration), endDate)) + case _ => None // debatable, this is either an error or may need to default to popular, why call popModel otherwise } - - } - - /** Create random rank for all items */ - def calcRandom( - appName: String, - interval: Interval): RDD[(ItemID, Double)] = { - - val events = eventsRDD(appName = appName, interval = interval) - val actionsRDD = events.map(_.targetEntityId).filter(_.isDefined).map(_.get).distinct() - val itemsRDD = fieldsRDD.map { case (itemID, _) => itemID } - - // logger.debug(s"ActionsRDD: ${actionsRDD.take(25).mkString(", ")}") - // logger.debug(s"ItemsRDD: ${itemsRDD.take(25).mkString(", ")}") - actionsRDD.union(itemsRDD).distinct().map { itemID => itemID -> Random.nextDouble() } } /** Creates a rank from the number of named events per item for the duration */ - def calcPopular( - appName: String, - eventNames: Seq[String], - interval: Interval): RDD[(ItemID, Double)] = { + def calcPopular(appName: String, eventNames: List[String] = List.empty, + interval: Interval)(implicit sc: SparkContext): Option[RDD[(String, Float)]] = { + val events = eventsRDD(appName, eventNames, interval) - events.map { e => (e.targetEntityId, e.event) } + val retval = events.map { e => (e.targetEntityId, e.event) } .groupByKey() - .map { case (itemID, itEvents) => (itemID.get, itEvents.size.toDouble) } - .reduceByKey(_ + _) // make this a double in Elaseticsearch) + .map { case(itemID, itEvents) => (itemID.get, itEvents.size.toFloat)} + .reduceByKey (_+_) // make this a double in Elaseticsearch) + if (!retval.isEmpty()) Some(retval) else None } /** Creates a rank for each item by dividing the duration in two and counting named events in both buckets - * then dividing most recent by less recent. This ranks by change in popularity or velocity of populatiy change. 
- * Interval(start, end) end instant is always greater than or equal to the start instant. - */ - def calcTrending( - appName: String, - eventNames: Seq[String], - interval: Interval): RDD[(ItemID, Double)] = { + * then dividing most recent by less recent. This ranks by change in popularity or velocity of populatiy change. + * Interval(start, end) end instant is always greater than or equal to the start instant. + */ + def calcTrending(appName: String, eventNames: List[String] = List.empty, + interval: Interval)(implicit sc: SparkContext): Option[RDD[(String, Float)]] = { - logger.info(s"Current Interval: $interval, ${interval.toDurationMillis}") - val halfInterval = interval.toDurationMillis / 2 - val olderInterval = new Interval(interval.getStart, interval.getStart.plus(halfInterval)) - logger.info(s"Older Interval: $olderInterval") - val newerInterval = new Interval(interval.getStart.plus(halfInterval), interval.getEnd) - logger.info(s"Newer Interval: $newerInterval") + val olderInterval = new Interval(interval.getStart, + interval.getStart().plusMillis((interval.toDurationMillis/2)toInt)) + val newerInterval = new Interval(interval.getStart().plusMillis((interval.toDurationMillis/2)toInt), interval.getEnd) + val intervalMillis = interval.toDurationMillis val olderPopRDD = calcPopular(appName, eventNames, olderInterval) - if (!olderPopRDD.isEmpty()) { + if ( olderPopRDD.nonEmpty) { val newerPopRDD = calcPopular(appName, eventNames, newerInterval) - newerPopRDD.join(olderPopRDD).map { - case (item, (newerScore, olderScore)) => item -> (newerScore - olderScore) - } - } else sc.emptyRDD - + if ( newerPopRDD.nonEmpty ) { + val retval = newerPopRDD.get.join(olderPopRDD.get).map { case (item, (newerScore, olderScore)) => + val velocity = (newerScore - olderScore) + (item, velocity) + } + if (!retval.isEmpty()) Some(retval) else None + } else None + } else None } /** Creates a rank for each item by divding all events per item into three buckets and calculating the change in - * velocity over time, in other words the acceleration of popularity change. - */ - def calcHot( - appName: String, - eventNames: Seq[String] = List.empty, - interval: Interval): RDD[(ItemID, Double)] = { - - logger.info(s"Current Interval: $interval, ${interval.toDurationMillis}") - val olderInterval = new Interval(interval.getStart, interval.getStart.plus(interval.toDurationMillis / 3)) - logger.info(s"Older Interval: $olderInterval") - val middleInterval = new Interval(olderInterval.getEnd, olderInterval.getEnd.plus(olderInterval.toDurationMillis)) - logger.info(s"Middle Interval: $middleInterval") + * velocity over time, in other words the acceleration of popularity change. 
+ */ + def calcHot(appName: String, eventNames: List[String] = List.empty, + interval: Interval)(implicit sc: SparkContext): Option[RDD[(String, Float)]] = { + val olderInterval = new Interval(interval.getStart, + interval.getStart().plusMillis((interval.toDurationMillis/3)toInt)) + val middleInterval = new Interval(olderInterval.getEnd, + olderInterval.getEnd().plusMillis((olderInterval.toDurationMillis)toInt)) val newerInterval = new Interval(middleInterval.getEnd, interval.getEnd) - logger.info(s"Newer Interval: $newerInterval") val olderPopRDD = calcPopular(appName, eventNames, olderInterval) - if (!olderPopRDD.isEmpty()) { // todo: may want to allow an interval with no events, give them 0 counts + if (olderPopRDD.nonEmpty){ // todo: may want to allow an interval with no events, give them 0 counts + //val debug = olderPopRDD.get.count() val middlePopRDD = calcPopular(appName, eventNames, middleInterval) - if (!middlePopRDD.isEmpty()) { + if (middlePopRDD.nonEmpty){ + //val debug = middlePopRDD.get.count() val newerPopRDD = calcPopular(appName, eventNames, newerInterval) - val newVelocityRDD = newerPopRDD.join(middlePopRDD).map { - case (item, (newerScore, middleScore)) => item -> (newerScore - middleScore) - } - val oldVelocityRDD = middlePopRDD.join(olderPopRDD).map { - case (item, (middleScore, olderScore)) => item -> (middleScore - olderScore) - } - newVelocityRDD.join(oldVelocityRDD).map { - case (item, (newVelocity, oldVelocity)) => item -> (newVelocity - oldVelocity) - } - } else sc.emptyRDD - } else sc.emptyRDD + if (newerPopRDD.nonEmpty){ + //val debug = newerPopRDD.get.count() + val newVelocityRDD = newerPopRDD.get.join(middlePopRDD.get).map { case( item, (newerScore, olderScore)) => + val velocity = (newerScore - olderScore) + (item, velocity) + } + val oldVelocityRDD = middlePopRDD.get.join(olderPopRDD.get).map { case( item, (newerScore, olderScore)) => + val velocity = (newerScore - olderScore) + (item, velocity) + } + Some( newVelocityRDD.join(oldVelocityRDD).map { case (item, (newVelocity, oldVelocity)) => + val acceleration = (newVelocity - oldVelocity) + (item, acceleration) + }) + } else None + } else None + } else None } - def eventsRDD( - appName: String, - eventNames: Seq[String] = Seq.empty, - interval: Interval): RDD[Event] = { + def eventsRDD(appName: String, eventNames: List[String], interval: Interval) + (implicit sc: SparkContext): RDD[Event] = { - logger.info(s"PopModel getting eventsRDD for startTime: ${interval.getStart} and endTime ${interval.getEnd}") PEventStore.find( appName = appName, startTime = Some(interval.getStart), untilTime = Some(interval.getEnd), - eventNames = if (eventNames.nonEmpty) Some(eventNames) else None)(sc) + eventNames = Some(eventNames) + )(sc) } } - -object PopModel { - - def apply(fieldsRDD: RDD[(ItemID, ItemProps)])(implicit sc: SparkContext): PopModel = { - new PopModel(fieldsRDD) - } - - val nameByType: Map[String, String] = Map( - RankingType.Popular -> RankingFieldName.PopRank, - RankingType.Trending -> RankingFieldName.TrendRank, - RankingType.Hot -> RankingFieldName.HotRank, - RankingType.UserDefined -> RankingFieldName.UserRank, - RankingType.Random -> RankingFieldName.UniqueRank).withDefaultValue(RankingFieldName.UnknownRank) - -} diff --git a/src/main/scala/Preparator.scala b/src/main/scala/Preparator.scala index 5c8ef6e..0d64139 100644 --- a/src/main/scala/Preparator.scala +++ b/src/main/scala/Preparator.scala @@ -15,56 +15,52 @@ * limitations under the License. 
*/ -package org.template +package com.uniflash -import io.prediction.controller.PPreparator -import org.apache.mahout.math.indexeddataset.{ BiDictionary, IndexedDataset } +import org.apache.predictionio.controller.PPreparator +import org.apache.predictionio.data.storage.PropertyMap +import org.apache.mahout.math.indexeddataset.{IndexedDataset, BiDictionary} import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD -import org.template.conversions._ class Preparator - extends PPreparator[TrainingData, PreparedData] { + extends PPreparator[TrainingData, PreparedData] { /** Create [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]] rdd backed - * "distributed row matrices" from the input string keyed rdds. - * @param sc Spark context - * @param trainingData list of (actionName, actionRDD) - * @return list of (correlatorName, correlatorIndexedDataset) - */ + * "distributed row matrices" from the input string keyed rdds. + * @param sc Spark context + * @param trainingData list of (actionName, actionRDD) + * @return list of (correlatorName, correlatorIndexedDataset) + */ def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = { // now that we have all actions in separate RDDs we must merge any user dictionaries and // make sure the same user ids map to the correct events var userDictionary: Option[BiDictionary] = None - val indexedDatasets = trainingData.actions.map { - case (eventName, eventIDS) => + val indexedDatasets = trainingData.actions.map{ case(eventName, eventIDS) => - // passing in previous row dictionary will use the values if they exist - // and append any new ids, so after all are constructed we have all user ids in the last dictionary - val ids = IndexedDatasetSpark(eventIDS, userDictionary)(sc) - userDictionary = Some(ids.rowIDs) - (eventName, ids) + // passing in previous row dictionary will use the values if they exist + // and append any new ids, so after all are constructed we have all user ids in the last dictionary + val ids = IndexedDatasetSpark(eventIDS, userDictionary)(sc) + userDictionary = Some(ids.rowIDs) + (eventName, ids) } // now make sure all matrices have identical row space since this corresponds to all users + val numUsers = userDictionary.get.size + val numPrimary = indexedDatasets.head._2.matrix.nrow // todo: check to see that there are events in primary event IndexedDataset and abort if not. 
- val rowAdjustedIds = userDictionary map { userDict => - indexedDatasets.map { - case (eventName, eventIDS) => - (eventName, eventIDS.create(eventIDS.matrix, userDictionary.get, eventIDS.columnIDs).newRowCardinality(userDict.size)) - } - } getOrElse Seq.empty - - val fieldsRDD: RDD[(ItemID, ItemProps)] = trainingData.fieldsRDD.map { - case (itemId, propMap) => itemId -> propMap.fields + val rowAdjustedIds = indexedDatasets.map { case(eventName, eventIDS) => + (eventName, eventIDS.create(eventIDS.matrix, userDictionary.get, eventIDS.columnIDs).newRowCardinality(numUsers)) } - PreparedData(rowAdjustedIds, fieldsRDD) + + new PreparedData(rowAdjustedIds, trainingData.fieldsRDD) } } -case class PreparedData( - actions: Seq[(ActionID, IndexedDataset)], - fieldsRDD: RDD[(ItemID, ItemProps)]) extends Serializable \ No newline at end of file +class PreparedData( + val actions: List[(String, IndexedDataset)], + val fieldsRDD: RDD[(String, PropertyMap)]) + extends Serializable diff --git a/src/main/scala/Serving.scala b/src/main/scala/Serving.scala index a63e3d3..4277066 100644 --- a/src/main/scala/Serving.scala +++ b/src/main/scala/Serving.scala @@ -15,16 +15,16 @@ * limitations under the License. */ -package org.template +package com.uniflash -import io.prediction.controller.LServing +import org.apache.predictionio.controller.LServing class Serving - extends LServing[Query, PredictedResult] { + extends LServing[Query, PredictedResult] { - override def serve( - query: Query, + override + def serve(query: Query, predictedResults: Seq[PredictedResult]): PredictedResult = { predictedResults.head } -} \ No newline at end of file +} diff --git a/src/main/scala/URAlgorithm.scala b/src/main/scala/URAlgorithm.scala index e6cb570..2c646a4 100644 --- a/src/main/scala/URAlgorithm.scala +++ b/src/main/scala/URAlgorithm.scala @@ -15,40 +15,34 @@ * limitations under the License. 
*/ -package org.template +package com.uniflash import java.util - -import grizzled.slf4j.Logger -import io.prediction.controller.{ P2LAlgorithm, Params } -import io.prediction.data.storage.{ DataMap, Event, NullModel, PropertyMap } -import io.prediction.data.store.LEventStore -import org.apache.mahout.math.cf.{ DownsamplableCrossOccurrenceDataset, SimilarityAnalysis } +import org.apache.predictionio.controller.P2LAlgorithm +import org.apache.predictionio.controller.Params +import org.apache.predictionio.data +import org.apache.predictionio.data.storage.{PropertyMap, Event} +import org.apache.predictionio.data.store.LEventStore +import org.apache.mahout.math.cf.SimilarityAnalysis import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark -import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.joda.time.DateTime -import org.json4s.JValue +import org.json4s +import org.json4s.JsonAST import org.json4s.JsonAST._ -import org.json4s.JsonDSL._ -import org.json4s.jackson.JsonMethods._ -import org.template.conversions._ - import scala.collection.JavaConverters._ +import scala.collection.immutable import scala.concurrent.duration.Duration -import scala.language.{ implicitConversions, postfixOps } - -/** Available value for algorithm param "RecsModel" */ -object RecsModel { // todo: replace this with rankings - val All = "all" - val CF = "collabFiltering" - val BF = "backfill" - override def toString: String = s"$All, $CF, $BF" -} +import org.apache.spark.SparkContext +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ +import scala.collection.convert.wrapAsScala._ +import grizzled.slf4j.Logger +import org.elasticsearch.spark._ /** Setting the option in the params case class doesn't work as expected when the param is missing from - * engine.json so set these for use in the algorithm when they are not present in the engine.json - */ + * engine.json so set these for use in the algorithm when they are not present in the engine.json + */ object defaultURAlgorithmParams { val DefaultMaxEventsPerEventType = 500 val DefaultNum = 20 @@ -58,676 +52,478 @@ object defaultURAlgorithmParams { val DefaultExpireDateName = "expireDate" // default name for the expire date property of an item val DefaultAvailableDateName = "availableDate" //defualt name for and item's available after date val DefaultDateName = "date" // when using a date range in the query this is the name of the item's date - val DefaultRecsModel = RecsModel.All // use CF + backfill - val DefaultRankingParams = RankingParams() - val DefaultBackfillFieldName = RankingFieldName.PopRank - val DefaultBackfillType = RankingType.Popular - val DefaultBackfillDuration = "3650 days" // for all time - - val DefaultReturnSelf = false + val DefaultRecsModel = "all" // use CF + backfill + val DefaultBackfillParams = BackfillField() + val DefaultBackfillFieldName = "popRank" } -/* default values must be set in code not the case class declaration case class BackfillField( - name: Option[String] = Some(defaultURAlgorithmParams.DefaultBackfillFieldName), - backfillType: Option[String] = Some(defaultURAlgorithmParams.DefaultBackfillType), // may be 'hot', or 'trending' also - eventNames: Option[Seq[String]] = None, // None means use the algo eventNames list, otherwise a list of events - offsetDate: Option[String] = None, // used only for tests, specifies the offset date to start the duration so the most - // recent date for events going back by from the more recent offsetDate - duration - duration: 
Option[String] = Some(defaultURAlgorithmParams.DefaultBackfillDuration)) // duration worth of events - // to use in calculation of backfill + name: String = "popRank", + backfillType: String = "popular", // may be 'hot', or 'trending' also + eventnames: Option[List[String]] = None, // None means use the algo eventnames list, otherwise a list of events + endDate: Option[String] = None, // used only for tests, specifies the start (oldest date) of the popModel's duration + duration: Int = 259200) // number of seconds worth of events to use in calculation of backfill +/** Instantiated from engine.json */ case class URAlgorithmParams( - appName: String, // filled in from engine.json - indexName: String, // can optionally be used to specify the elasticsearch index name - typeName: String, // can optionally be used to specify the elasticsearch type name - recsModel: Option[String] = Some(defaultURAlgorithmParams.DefaultRecsModel), // "all", "collabFiltering", "backfill" - eventNames: Seq[String], // names used to ID all user actions - blacklistEvents: Option[Seq[String]] = None,// None means use the primary event, empty array means no filter - // number of events in user-based recs query - maxQueryEvents: Option[Int] = Some(defaultURAlgorithmParams.DefaultMaxQueryEvents), - maxEventsPerEventType: Option[Int] = Some(defaultURAlgorithmParams.DefaultMaxEventsPerEventType), - maxCorrelatorsPerEventType: Option[Int] = Some(defaultURAlgorithmParams.DefaultMaxCorrelatorsPerEventType), - num: Option[Int] = Some(defaultURAlgorithmParams.DefaultNum), // default max # of recs requested - userBias: Option[Float] = None, // will cause the default search engine boost of 1.0 - itemBias: Option[Float] = None, // will cause the default search engine boost of 1.0 - returnSelf: Option[Boolean] = None, // query building logic defaults this to false - fields: Option[Seq[Field]] = None, //defaults to no fields - // leave out for default or popular - backfillField: Option[BackfillField] = None, - // name of date property field for when the item is available - availableDateName: Option[String] = Some(defaultURAlgorithmParams.DefaultAvailableDateName), - // name of date property field for when an item is no longer available - expireDateName: Option[String] = Some(defaultURAlgorithmParams.DefaultExpireDateName), - // used as the subject of a dateRange in queries, specifies the name of the item property - dateName: Option[String] = Some(defaultURAlgorithmParams.DefaultDateName), - seed: Option[Long] = None) // seed is not used presently + appName: String, // filled in from engine.json + indexName: String, // can optionally be used to specify the elasticsearch index name + typeName: String, // can optionally be used to specify the elasticsearch type name + recsModel: Option[String] = Some(defaultURAlgorithmParams.DefaultRecsModel), // "all", "collabFiltering", "backfill" + eventNames: List[String], // names used to ID all user actions + blacklistEvents: Option[List[String]] = None,// None means use the primary event, empty array means no filter + // number of events in user-based recs query + maxQueryEvents: Option[Int] = Some(defaultURAlgorithmParams.DefaultMaxQueryEvents), + maxEventsPerEventType: Option[Int] = Some(defaultURAlgorithmParams.DefaultMaxEventsPerEventType), + maxCorrelatorsPerEventType: Option[Int] = Some(defaultURAlgorithmParams.DefaultMaxCorrelatorsPerEventType), + num: Option[Int] = Some(defaultURAlgorithmParams.DefaultNum), // default max # of recs requested + userBias: Option[Float] = None, // will 
cause the default search engine boost of 1.0 + itemBias: Option[Float] = None, // will cause the default search engine boost of 1.0 + returnSelf: Option[Boolean] = None, // query building logic defaults this to false + fields: Option[List[Field]] = None, //defaults to no fields + // leave out for default or popular + backfillField: Option[BackfillField] = None, + // name of date property field for when the item is avalable + availableDateName: Option[String] = Some(defaultURAlgorithmParams.DefaultAvailableDateName), + // name of date property field for when an item is no longer available + expireDateName: Option[String] = Some(defaultURAlgorithmParams.DefaultExpireDateName), + // used as the subject of a dateRange in queries, specifies the name of the item property + dateName: Option[String] = Some(defaultURAlgorithmParams.DefaultDateName), + seed: Option[Long] = None) // seed is not used presently extends Params //fixed default make it reproducible unless supplied - */ - -case class RankingParams( - name: Option[String] = None, - `type`: Option[String] = None, // See [[org.template.BackfillType]] - eventNames: Option[Seq[String]] = None, // None means use the algo eventNames list, otherwise a list of events - offsetDate: Option[String] = None, // used only for tests, specifies the offset date to start the duration so the most - // recent date for events going back by from the more recent offsetDate - duration - endDate: Option[String] = None, - duration: Option[String] = None) { // duration worth of events to use in calculation of backfill - override def toString: String = { - s""" - |name: $name, - |type: ${`type`}, - |eventNames: $eventNames, - |offsetDate: $offsetDate, - |endDate: $endDate, - |duration: $duration - |""".stripMargin - } -} - -case class IndicatorParams( - name: String, // must match one in eventNames - maxItemsPerUser: Option[Int], // defaults to maxEventsPerEventType - maxCorrelatorsPerItem: Option[Int], // defaults to maxCorrelatorsPerEventType - minLLR: Option[Double]) // defaults to none, takes precendence over maxCorrelatorsPerItem - -case class URAlgorithmParams( - appName: String, // filled in from engine.json - indexName: String, // can optionally be used to specify the elasticsearch index name - typeName: String, // can optionally be used to specify the elasticsearch type name - recsModel: Option[String] = None, // "all", "collabFiltering", "backfill" - eventNames: Option[Seq[String]], // names used to ID all user actions - blacklistEvents: Option[Seq[String]] = None, // None means use the primary event, empty array means no filter - // number of events in user-based recs query - maxQueryEvents: Option[Int] = None, - maxEventsPerEventType: Option[Int] = None, - maxCorrelatorsPerEventType: Option[Int] = None, - num: Option[Int] = None, // default max # of recs requested - userBias: Option[Float] = None, // will cause the default search engine boost of 1.0 - itemBias: Option[Float] = None, // will cause the default search engine boost of 1.0 - returnSelf: Option[Boolean] = None, // query building logic defaults this to false - fields: Option[Seq[Field]] = None, //defaults to no fields - // leave out for default or popular - rankings: Option[Seq[RankingParams]] = None, - // name of date property field for when the item is available - availableDateName: Option[String] = None, - // name of date property field for when an item is no longer available - expireDateName: Option[String] = None, - // used as the subject of a dateRange in queries, specifies the name of the 
item property - dateName: Option[String] = None, - indicators: Option[List[IndicatorParams]] = None, // control params per matrix pair - seed: Option[Long] = None) // seed is not used presently - extends Params //fixed default make it reproducible unless supplied /** Creates cooccurrence, cross-cooccurrence and eventually content correlators with - * [[org.apache.mahout.math.cf.SimilarityAnalysis]] The analysis part of the recommender is - * done here but the algorithm can predict only when the coocurrence data is indexed in a - * search engine like Elasticsearch. This is done in URModel.save. - * - * @param ap taken from engine.json to describe limits and event types - */ + * [[org.apache.mahout.math.cf.SimilarityAnalysis]] The analysis part of the recommender is + * done here but the algorithm can predict only when the coocurrence data is indexed in a + * search engine like Elasticsearch. This is done in URModel.save. + * + * @param ap taken from engine.json to describe limits and event types + */ class URAlgorithm(val ap: URAlgorithmParams) - extends P2LAlgorithm[PreparedData, NullModel, Query, PredictedResult] { + extends P2LAlgorithm[PreparedData, URModel, Query, PredictedResult] { - @transient lazy implicit val logger: Logger = Logger[this.type] + case class BoostableCorrelators(actionName: String, itemIDs: Seq[String], boost: Option[Float]) + case class FilterCorrelators(actionName: String, itemIDs: Seq[String]) - case class BoostableCorrelators(actionName: String, itemIDs: Seq[ItemID], boost: Option[Float]) { - def toFilterCorrelators: FilterCorrelators = { - FilterCorrelators(actionName, itemIDs) - } - } - case class FilterCorrelators(actionName: String, itemIDs: Seq[ItemID]) - - val appName: String = ap.appName - val recsModel: String = ap.recsModel.getOrElse(defaultURAlgorithmParams.DefaultRecsModel) - //val eventNames: Seq[String] = ap.eventNames - - val userBias: Float = ap.userBias.getOrElse(1f) - val itemBias: Float = ap.itemBias.getOrElse(1f) - val maxQueryEvents: Int = ap.maxQueryEvents.getOrElse(defaultURAlgorithmParams.DefaultMaxQueryEvents) - val limit: Int = ap.num.getOrElse(defaultURAlgorithmParams.DefaultNum) - - val blacklistEvents: Seq[String] = ap.blacklistEvents.getOrEmpty - val returnSelf: Boolean = ap.returnSelf.getOrElse(defaultURAlgorithmParams.DefaultReturnSelf) - val fields: Seq[Field] = ap.fields.getOrEmpty - - val randomSeed: Int = ap.seed.getOrElse(System.currentTimeMillis()).toInt - val maxCorrelatorsPerEventType: Int = ap.maxCorrelatorsPerEventType - .getOrElse(defaultURAlgorithmParams.DefaultMaxCorrelatorsPerEventType) - val maxEventsPerEventType: Int = ap.maxEventsPerEventType - .getOrElse(defaultURAlgorithmParams.DefaultMaxEventsPerEventType) - - lazy val modelEventNames = if (ap.indicators.isEmpty) { - if (ap.eventNames.isEmpty) { - throw new IllegalArgumentException("No eventNames or indicators in engine.json and one of these is required") - } else ap.eventNames.get - } else { - var eventNames = Seq.empty[String] - ap.indicators.get.foreach { indicator => - eventNames = eventNames :+ indicator.name - } - eventNames - } + @transient lazy val logger = Logger[this.type] - // Unique by 'type' ranking params, if collision get first. 
- lazy val rankingsParams: Seq[RankingParams] = ap.rankings.getOrElse(Seq(RankingParams( - name = Some(defaultURAlgorithmParams.DefaultBackfillFieldName), - `type` = Some(defaultURAlgorithmParams.DefaultBackfillType), - eventNames = Some(modelEventNames.take(1)), - offsetDate = None, - endDate = None, - duration = Some(defaultURAlgorithmParams.DefaultBackfillDuration)))).groupBy(_.`type`).map(_._2.head).toSeq - - val rankingFieldNames: Seq[String] = rankingsParams map { rankingParams => - val rankingType = rankingParams.`type`.getOrElse(defaultURAlgorithmParams.DefaultBackfillType) - val rankingFieldName = rankingParams.name.getOrElse(PopModel.nameByType(rankingType)) - rankingFieldName - } + def train(sc: SparkContext, data: PreparedData): URModel = { + + val dateNames = Some(List(ap.dateName.getOrElse(""), ap.availableDateName.getOrElse(""), + ap.expireDateName.getOrElse(""))) // todo: return None if all are empty? + val backfillFieldName = ap.backfillField.getOrElse(BackfillField()).name - val dateNames: Seq[String] = Seq( - ap.dateName, - ap.availableDateName, - ap.expireDateName).collect { case Some(date) => date } distinct - - val esIndex: String = ap.indexName - val esType: String = ap.typeName - - drawInfo("Init URAlgorithm", Seq( - ("══════════════════════════════", "════════════════════════════"), - ("App name", appName), - ("ES index name", esIndex), - ("ES type name", esType), - ("RecsModel", recsModel), - ("Event names", modelEventNames), - ("══════════════════════════════", "════════════════════════════"), - ("Random seed", randomSeed), - ("MaxCorrelatorsPerEventType", maxCorrelatorsPerEventType), - ("MaxEventsPerEventType", maxEventsPerEventType), - ("══════════════════════════════", "════════════════════════════"), - ("User bias", userBias), - ("Item bias", itemBias), - ("Max query events", maxQueryEvents), - ("Limit", limit), - ("══════════════════════════════", "════════════════════════════"), - ("Rankings:", "")) ++ rankingsParams.map(x => (x.`type`.get, x.name))) - - def train(sc: SparkContext, data: PreparedData): NullModel = { - - recsModel match { - case RecsModel.All => calcAll(data)(sc) - case RecsModel.CF => calcAll(data, calcPopular = false)(sc) - case RecsModel.BF => calcPop(data)(sc) + ap.recsModel.getOrElse(defaultURAlgorithmParams.DefaultRecsModel) match { + case "all" => calcAll(sc, data, dateNames, backfillFieldName) + case "collabFiltering" => calcAll(sc, data, dateNames, backfillFieldName, popular = false ) + case "backfill" => calcPop(sc, data, dateNames, backfillFieldName) // error, throw an exception - case unknownRecsModel => - throw new IllegalArgumentException( - s""" - |Bad algorithm param recsModel=[$unknownRecsModel] in engine definition params, possibly a bad json value. - |Use one of the available parameter values ($RecsModel).""".stripMargin) + case _ => throw new IllegalArgumentException("Bad recsModel in engine definition params, possibly a bad json value.") } } /** Calculates recs model as well as popularity model */ def calcAll( + sc: SparkContext, data: PreparedData, - calcPopular: Boolean = true)(implicit sc: SparkContext): NullModel = { + dateNames: Option[List[String]] = None, + backfillFieldName: String, + popular: Boolean = true): + URModel = { // No one likes empty training data. - require( - data.actions.take(1).nonEmpty, - s""" - |Primary action in PreparedData cannot be empty. 
- |Please check if DataSource generates TrainingData - |and Preparator generates PreparedData correctly.""".stripMargin) - - //val backfillParams = ap.backfillField.getOrElse(defaultURAlgorithmParams.DefaultBackfillParams) - //val nonDefaultMappings = Map(backfillParams.name.getOrElse(defaultURAlgorithmParams.DefaultBackfillFieldName) -> "float") + require(data.actions.take(1).nonEmpty, + s"Primary action in PreparedData cannot be empty." + + " Please check if DataSource generates TrainingData" + + " and Preparator generates PreparedData correctly.") + val backfillParams = ap.backfillField.getOrElse(defaultURAlgorithmParams.DefaultBackfillParams) + val nonDefaultMappings = Map(backfillParams.name -> "float") logger.info("Actions read now creating correlators") - val cooccurrenceIDSs = if (ap.indicators.isEmpty) { // using one global set of algo params - SimilarityAnalysis.cooccurrencesIDSs( - data.actions.map(_._2).toArray, - randomSeed = ap.seed.getOrElse(System.currentTimeMillis()).toInt, - maxInterestingItemsPerThing = ap.maxCorrelatorsPerEventType - .getOrElse(defaultURAlgorithmParams.DefaultMaxCorrelatorsPerEventType), - maxNumInteractions = ap.maxEventsPerEventType.getOrElse(defaultURAlgorithmParams.DefaultMaxEventsPerEventType)) - .map(_.asInstanceOf[IndexedDatasetSpark]) - } else { // using params per matrix pair, these take the place of eventNames, maxCorrelatorsPerEventType, - // and maxEventsPerEventType! - val indicators = ap.indicators.get - val iDs = data.actions.map(_._2).toSeq - val datasets = iDs.zipWithIndex.map { - case (iD, i) => - new DownsamplableCrossOccurrenceDataset( - iD, - indicators(i).maxItemsPerUser.getOrElse(defaultURAlgorithmParams.DefaultMaxEventsPerEventType), - indicators(i).maxCorrelatorsPerItem.getOrElse(defaultURAlgorithmParams.DefaultMaxCorrelatorsPerEventType), - indicators(i).minLLR) - }.toList - - SimilarityAnalysis.crossOccurrenceDownsampled( - datasets, - ap.seed.getOrElse(System.currentTimeMillis()).toInt) - .map(_.asInstanceOf[IndexedDatasetSpark]) - } - + val cooccurrenceIDSs = SimilarityAnalysis.cooccurrencesIDSs( + data.actions.map(_._2).toArray, + randomSeed = ap.seed.getOrElse(System.currentTimeMillis()).toInt, + maxInterestingItemsPerThing = ap.maxCorrelatorsPerEventType + .getOrElse(defaultURAlgorithmParams.DefaultMaxCorrelatorsPerEventType), + maxNumInteractions = ap.maxEventsPerEventType.getOrElse(defaultURAlgorithmParams.DefaultMaxEventsPerEventType)) + .map(_.asInstanceOf[IndexedDatasetSpark]) // strip action names val cooccurrenceCorrelators = cooccurrenceIDSs.zip(data.actions.map(_._1)).map(_.swap) //add back the actionNames - val propertiesRDD: RDD[(ItemID, ItemProps)] = if (calcPopular) { - val ranksRdd = getRanksRDD(data.fieldsRDD) - data.fieldsRDD.fullOuterJoin(ranksRdd).map { - case (item, (Some(fieldsPropMap), Some(rankPropMap))) => item -> (fieldsPropMap ++ rankPropMap) - case (item, (Some(fieldsPropMap), None)) => item -> fieldsPropMap - case (item, (None, Some(rankPropMap))) => item -> rankPropMap - case (item, _) => item -> Map.empty + val popModel = if (popular) { + val duration = ap.backfillField.getOrElse(defaultURAlgorithmParams.DefaultBackfillParams).duration + val backfillEvents = backfillParams.eventnames.getOrElse(List(ap.eventNames.head)) + val start = ap.backfillField.getOrElse(defaultURAlgorithmParams.DefaultBackfillParams).endDate + PopModel.calc(Some(backfillParams.backfillType), backfillEvents, ap.appName, duration)(sc) + } else None + + val allPropertiesRDD = if (popModel.nonEmpty) {
data.fieldsRDD.cogroup[Float](popModel.get).map { case (item, pms) => + val pm = if (pms._1.nonEmpty && pms._2.nonEmpty) { + val newPM = pms._1.head.fields + (backfillFieldName -> JDouble(pms._2.head)) + PropertyMap(newPM, pms._1.head.firstUpdated, DateTime.now()) + } else if (pms._2.nonEmpty) PropertyMap(Map(backfillFieldName -> JDouble(pms._2.head)), DateTime.now(), DateTime.now()) + else PropertyMap( Map.empty[String, JValue], DateTime.now, DateTime.now) // some error???? + (item, pm) } - } else { - sc.emptyRDD - } + } else data.fieldsRDD logger.info("Correlators created now putting into URModel") new URModel( - coocurrenceMatrices = cooccurrenceCorrelators, - propertiesRDDs = Seq(propertiesRDD), - typeMappings = getRankingMapping).save(dateNames, esIndex, esType) - new NullModel + Some(cooccurrenceCorrelators), + Some(allPropertiesRDD), + ap.indexName, + dateNames, + typeMappings = Some(nonDefaultMappings)) } /** This function creates a URModel from an existing index in Elasticsearch + new popularity ranking - * It is used when you want to re-calc the popularity model between training on useage data. It leaves - * the part of the model created from usage data alone and only modifies the popularity ranking. - */ - def calcPop(data: PreparedData)(implicit sc: SparkContext): NullModel = { - - // Aggregating all $set/$unsets properties, which are attached to items - val fieldsRDD: RDD[(ItemID, ItemProps)] = data.fieldsRDD - // Calc new ranking properties for all items - val ranksRdd: RDD[(ItemID, ItemProps)] = getRanksRDD(fieldsRDD) - // Current items RDD from ES - val currentMetadataRDD: RDD[(ItemID, ItemProps)] = EsClient.getRDD(esIndex, esType) - val propertiesRDD: RDD[(ItemID, ItemProps)] = currentMetadataRDD.fullOuterJoin(ranksRdd) map { - case (itemId, maps) => - maps match { - case (Some(metaProp), Some(rankProp)) => itemId -> (metaProp ++ rankProp) - case (None, Some(rankProp)) => itemId -> rankProp - case (Some(metaProp), None) => itemId -> metaProp - case _ => itemId -> Map.empty - } - } - // logger.debug(s"RanksRdd\n${ranksRdd.take(25).mkString("\n")}") + * It is used when you want to re-calc the popularity model between training on useage data. It leaves + * the part of the model created from usage data alone and only modifies the popularity ranking. 
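calcAll above merges per-item metadata with the popularity score computed by PopModel before anything is written to the index. A rough, self-contained sketch of that merge using plain maps in place of PropertyMap, with made-up item data and a local Spark context (not the template's own code):

import org.apache.spark.{SparkConf, SparkContext}

object MergeRanksSketch extends App {
  val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("merge-ranks-sketch"))

  // item properties from $set events and popularity ranks from a PopModel-like calculation (values made up)
  val fields = sc.parallelize(Seq(
    "item-1" -> Map[String, Any]("category" -> Seq("cat1")),
    "item-2" -> Map[String, Any]("category" -> Seq("cat2"))))
  val ranks = sc.parallelize(Seq("item-1" -> 0.9, "item-3" -> 0.1))

  // keep items that appear on either side, attaching the rank under a hypothetical backfill field name
  val merged = fields.fullOuterJoin(ranks).map {
    case (item, (Some(props), Some(rank))) => item -> (props + ("popRank" -> rank))
    case (item, (Some(props), None))       => item -> props
    case (item, (None, Some(rank)))        => item -> Map[String, Any]("popRank" -> rank)
    case (item, _)                         => item -> Map.empty[String, Any]
  }

  merged.collect().foreach(println)
  sc.stop()
}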
+ */ + def calcPop( + sc: SparkContext, + data: PreparedData, + dateNames: Option[List[String]] = None, + backfillFieldName: String = ""): URModel = { + + val backfillParams = ap.backfillField.getOrElse(defaultURAlgorithmParams.DefaultBackfillParams) + val backfillEvents = backfillParams.eventnames.getOrElse(List(ap.eventNames.head))//default to first/primary event + val start = ap.backfillField.getOrElse(defaultURAlgorithmParams.DefaultBackfillParams).endDate + val popModel = PopModel.calc( + Some(backfillParams.backfillType), + backfillEvents, + ap.appName, + backfillParams.duration, + start)(sc) + val popRDD = if (popModel.nonEmpty) { + val model = popModel.get.map { case (item, rank) => + val newPM = Map(backfillFieldName -> JDouble(rank)) + (item, PropertyMap(newPM, DateTime.now, DateTime.now)) + } + Some(model) + } else None + + val propertiesRDD = if (popModel.nonEmpty) { + val currentMetadata = esClient.getRDD(sc, ap.indexName, ap.typeName) + if (currentMetadata.nonEmpty) { // may be an empty index so ignore + Some(popModel.get.cogroup[collection.Map[String, AnyRef]](currentMetadata.get) + .map { case (item, (ranks, pms)) => + if (ranks.nonEmpty) pms.head + (backfillFieldName -> ranks.head) + else if (pms.nonEmpty) pms.head + else Map.empty[String, AnyRef] // could happen if only calculating popularity, which may leave out items with + // no events + }) + } else None + } else None // returns the existing model plus new popularity ranking new URModel( - propertiesRDDs = Seq(fieldsRDD.cache(), propertiesRDD.cache()), - typeMappings = getRankingMapping).save(dateNames, esIndex, esType) - new NullModel + None, + None, + ap.indexName, + None, + propertiesRDD = propertiesRDD, + typeMappings = Some(Map(backfillFieldName -> "float"))) } - var queryEventNames: Seq[String] = Seq.empty[String] // if passed in with the query overrides the engine.json list--used in MAP@k + var queryEventNames = List.empty[String] // if passed in with the query overrides the engine.json list--used in MAP@k //testing, this only effects which events are used in queries. /** Return a list of items recommended for a user identified in the query - * The ES json query looks like this: - * { - * "size": 20 - * "query": { - * "bool": { - * "should": [ - * { - * "terms": { - * "rate": ["0", "67", "4"] - * } - * }, - * { - * "terms": { - * "buy": ["0", "32"], - * "boost": 2 - * } - * }, - * { // categorical boosts - * "terms": { - * "category": ["cat1"], - * "boost": 1.05 - * } - * } - * ], - * "must": [ // categorical filters - * { - * "terms": { - * "category": ["cat1"], - * "boost": 0 - * } - * }, - * { - * "must_not": [//blacklisted items - * { - * "ids": { - * "values": ["items-id1", "item-id2", ...] - * } - * }, - * { - * "constant_score": {// date in query must fall between the expire and available dates of an item - * "filter": { - * "range": { - * "availabledate": { - * "lte": "2015-08-30T12:24:41-07:00" - * } - * } - * }, - * "boost": 0 - * } - * }, - * { - * "constant_score": {// date range filter in query must be between these item property values - * "filter": { - * "range" : { - * "expiredate" : { - * "gte": "2015-08-15T11:28:45.114-07:00" - * "lt": "2015-08-20T11:28:45.114-07:00" - * } - * } - * }, "boost": 0 - * } - * }, - * { - * "constant_score": { // this orders popular items for backfill - * "filter": { - * "match_all": {} - * }, - * "boost": 0.000001 // must have as least a small number to be boostable - * } - * } - * } - * } - * } - * - * @param model Ignored! 
since the model is already in Elasticsearch - * @param query contains query spec - * @todo Need to prune that query to minimum required for data include, for instance no need for the popularity - * ranking if no PopModel is being used, same for "must" clause and dates. - */ - def predict(model: NullModel, query: Query): PredictedResult = { - - queryEventNames = query.eventNames.getOrElse(modelEventNames) // eventNames in query take precedence - - val (queryStr, blacklist) = buildQuery(ap, query, rankingFieldNames) - val searchHitsOpt = EsClient.search(queryStr, esIndex) - - val withRanks = query.withRanks.getOrElse(false) - val predictedResult = searchHitsOpt match { - case Some(searchHits) => - val recs = searchHits.getHits.map { hit => - if (withRanks) { - val source = hit.getSource - val ranks: Map[String, Double] = rankingsParams map { backfillParams => - val backfillType = backfillParams.`type`.getOrElse(defaultURAlgorithmParams.DefaultBackfillType) - val backfillFieldName = backfillParams.name.getOrElse(PopModel.nameByType(backfillType)) - backfillFieldName -> source.get(backfillFieldName).asInstanceOf[Double] - } toMap - - ItemScore(hit.getId, hit.getScore.toDouble, - ranks = if (ranks.nonEmpty) Some(ranks) else None) - } else { - ItemScore(hit.getId, hit.getScore.toDouble) - } - } - logger.info(s"Results: ${searchHits.getHits.length} retrieved of a possible ${searchHits.totalHits()}") - PredictedResult(recs) - - case _ => - logger.info(s"No results for query ${parse(queryStr)}") - PredictedResult(Array.empty[ItemScore]) - } - + * The ES json query looks like this: + * { + * "size": 20 + * "query": { + * "bool": { + * "should": [ + * { + * "terms": { + * "rate": ["0", "67", "4"] + * } + * }, + * { + * "terms": { + * "buy": ["0", "32"], + * "boost": 2 + * } + * }, + * { // categorical boosts + * "terms": { + * "category": ["cat1"], + * "boost": 1.05 + * } + * } + * ], + * "must": [ // categorical filters + * { + * "terms": { + * "category": ["cat1"], + * "boost": 0 + * } + * }, + * { + * "must_not": [//blacklisted items + * { + * "ids": { + * "values": ["items-id1", "item-id2", ...] + * } + * }, + * { + * "constant_score": {// date in query must fall between the expire and avqilable dates of an item + * "filter": { + * "range": { + * "availabledate": { + * "lte": "2015-08-30T12:24:41-07:00" + * } + * } + * }, + * "boost": 0 + * } + * }, + * { + * "constant_score": {// date range filter in query must be between these item property values + * "filter": { + * "range" : { + * "expiredate" : { + * "gte": "2015-08-15T11:28:45.114-07:00" + * "lt": "2015-08-20T11:28:45.114-07:00" + * } + * } + * }, "boost": 0 + * } + * }, + * { + * "constant_score": { // this orders popular items for backfill + * "filter": { + * "match_all": {} + * }, + * "boost": 0.000001 // must have as least a small number to be boostable + * } + * } + * } + * } + * } + * + * @param model Ignored! since the model is already in Elasticsearch + * @param query contains query spec + * @todo Need to prune that query to minimum required for data include, for instance no need for the popularity + * ranking if no PopModel is being used, same for "must" clause and dates. 
+ */ + def predict(model: URModel, query: Query): PredictedResult = { + logger.info(s"Query received, user id: ${query.user}, item id: ${query.item}") + + queryEventNames = query.eventNames.getOrElse(ap.eventNames) // eventNames in query take precedence for the query + // part of their use + val backfillFieldName = ap.backfillField.getOrElse(BackfillField()).name + val queryAndBlacklist = buildQuery(ap, query, backfillFieldName) + val recs = esClient.search(queryAndBlacklist._1, ap.indexName) // should have all blacklisted items excluded // todo: need to add dithering, mean, sigma, seed required, make a seed that only changes on some fixed time // period so the recs ordering stays fixed for that time period. - predictedResult - } - - /** Calculate all fields and items needed for ranking. - * - * @param fieldsRDD all items with their fields - * @param sc the current Spark context - * @return - */ - def getRanksRDD(fieldsRDD: RDD[(ItemID, ItemProps)])(implicit sc: SparkContext): RDD[(ItemID, ItemProps)] = { - val popModel = PopModel(fieldsRDD) - val rankRDDs: Seq[(String, RDD[(ItemID, Double)])] = rankingsParams map { rankingParams => - val rankingType = rankingParams.`type`.getOrElse(defaultURAlgorithmParams.DefaultBackfillType) - val rankingFieldName = rankingParams.name.getOrElse(PopModel.nameByType(rankingType)) - val durationAsString = rankingParams.duration.getOrElse(defaultURAlgorithmParams.DefaultBackfillDuration) - val duration = Duration(durationAsString).toSeconds.toInt - val backfillEvents = rankingParams.eventNames.getOrElse(modelEventNames.take(1)) - val offsetDate = rankingParams.offsetDate - val rankRdd = popModel.calc(modelName = rankingType, eventNames = backfillEvents, appName, duration, offsetDate) - rankingFieldName -> rankRdd - } - - // logger.debug(s"RankRDDs[${rankRDDs.size}]\n${rankRDDs.map(_._1).mkString(", ")}\n${rankRDDs.map(_._2.take(25).mkString("\n")).mkString("\n\n")}") - rankRDDs.foldLeft[RDD[(ItemID, ItemProps)]](sc.emptyRDD) { - case (leftRdd, (fieldName, rightRdd)) => - leftRdd.fullOuterJoin(rightRdd).map { - case (itemId, (Some(propMap), Some(rank))) => itemId -> (propMap + (fieldName -> JDouble(rank))) - case (itemId, (Some(propMap), None)) => itemId -> propMap - case (itemId, (None, Some(rank))) => itemId -> Map(fieldName -> JDouble(rank)) - case (itemId, _) => itemId -> Map.empty - } - } + recs } /** Build a query from default algorithms params and the query itself taking into account defaults */ - def buildQuery( - ap: URAlgorithmParams, - query: Query, - backfillFieldNames: Seq[String] = Seq.empty): (String, Seq[Event]) = { + def buildQuery(ap: URAlgorithmParams, query: Query, backfillFieldName: String = ""): (String, List[Event]) = { - try { - // create a list of all query correlators that can have a bias (boost or filter) attached - val (boostable, events) = getBiasedRecentUserActions(query) + try{ // require the minimum of a user or item, if not then return popular if any + //require( query.item.nonEmpty || query.user.nonEmpty, "Warning: a query must include either a user or item id") - // since users have action history and items have correlators and both correspond to the same "actions" like - // purchase or view, we'll pass both to the query if the user history or items correlators are empty - // then metadata or backfill must be relied on to return results. 
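buildQuery assembles exactly the bool query sketched in the comment above: boosted correlators go in "should", hard filters in "must", and blacklisted ids in "must_not". A small stand-alone json4s sketch of that assembly, using the same DSL pattern as the template but with hypothetical field names, item ids, and boosts:

import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.native.JsonMethods._

object QueryAssemblySketch extends App {
  // boosted correlators and metadata (values are made up)
  val should: Seq[JValue] = Seq(
    render(("terms" -> ("purchase" -> Seq("item-1", "item-2")) ~ ("boost" -> 2))),
    render(("terms" -> ("category" -> Seq("cat1")) ~ ("boost" -> 1.05))))

  // hard filters get a zero boost so they do not affect scoring
  val must: Seq[JValue] = Seq(
    render(("terms" -> ("category" -> Seq("cat1")) ~ ("boost" -> 0))))

  // blacklisted item ids
  val mustNot: JValue = render(("ids" -> ("values" -> Seq("item-3")) ~ ("boost" -> 0)))

  val json =
    ("size" -> 20) ~
      ("query" ->
        ("bool" ->
          ("should" -> should) ~
          ("must" -> must) ~
          ("must_not" -> mustNot) ~
          ("minimum_should_match" -> 1)))

  println(compact(render(json)))
}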
- val numRecs = query.num.getOrElse(limit) - val should = buildQueryShould(query, boostable) - val must = buildQueryMust(query, boostable) - val mustNot = buildQueryMustNot(query, events) - val sort = buildQuerySort() - - val json = - ("size" -> numRecs) ~ - ("query" -> - ("bool" -> - ("should" -> should) ~ - ("must" -> must) ~ - ("must_not" -> mustNot) ~ - ("minimum_should_match" -> 1))) ~ - ("sort" -> sort) - - val compactJson = compact(render(json)) - - logger.info(s"Query:\n$compactJson") - (compactJson, events) - } catch { - case e: IllegalArgumentException => ("", Seq.empty[Event]) - } - } + // create a list of all query correlators that can have a bias (boost or filter) attached + val alluserEvents = getBiasedRecentUserActions(query) - /** Build should query part */ - def buildQueryShould(query: Query, boostable: Seq[BoostableCorrelators]): Seq[JValue] = { + // create a list of all boosted query correlators + val recentUserHistory = if ( ap.userBias.getOrElse(1f) >= 0f ) + alluserEvents._1.slice(0, ap.maxQueryEvents.getOrElse(defaultURAlgorithmParams.DefaultMaxQueryEvents) - 1) + else List.empty[BoostableCorrelators] - // create a list of all boosted query correlators - val recentUserHistory: Seq[BoostableCorrelators] = if (userBias >= 0f) { - boostable.slice(0, maxQueryEvents - 1) - } else { - Seq.empty - } + val similarItems = if ( ap.itemBias.getOrElse(1f) >= 0f ) + getBiasedSimilarItems(query) + else List.empty[BoostableCorrelators] - val similarItems: Seq[BoostableCorrelators] = if (itemBias >= 0f) { - getBiasedSimilarItems(query) - } else { - Seq.empty - } + val boostedMetadata = getBoostedMetadata(query) - val boostedMetadata = getBoostedMetadata(query) - val allBoostedCorrelators = recentUserHistory ++ similarItems ++ boostedMetadata + val allBoostedCorrelators = recentUserHistory ++ similarItems ++ boostedMetadata - val shouldFields: Seq[JValue] = allBoostedCorrelators.map { - case BoostableCorrelators(actionName, itemIDs, boost) => - render("terms" -> (actionName -> itemIDs) ~ ("boost" -> boost)) - } + // create a lsit of all query correlators that are to be used to filter results + val recentUserHistoryFilter = if ( ap.userBias.getOrElse(1f) < 0f ) { + // strip any boosts + alluserEvents._1.map { i => + FilterCorrelators(i.actionName, i.itemIDs) + }.slice(0, ap.maxQueryEvents.getOrElse(defaultURAlgorithmParams.DefaultMaxQueryEvents) - 1) + } else List.empty[FilterCorrelators] - val shouldScore: JValue = parse( - """ - |{ - | "constant_score": { - | "filter": { - | "match_all": {} - | }, - | "boost": 0 - | } - |} - |""".stripMargin) + val similarItemsFilter = if ( ap.itemBias.getOrElse(1f) < 0f ) { + getBiasedSimilarItems(query).map { i => + FilterCorrelators(i.actionName, i.itemIDs) + }.toList + } else List.empty[FilterCorrelators] - shouldFields :+ shouldScore - } + val filteringMetadata = getFilteringMetadata(query) - /** Build must query part */ - def buildQueryMust(query: Query, boostable: Seq[BoostableCorrelators]): Seq[JValue] = { + val filteringDateRange = getFilteringDateRange(query) - // create a lsit of all query correlators that are to be used to filter results - val recentUserHistoryFilter: Seq[FilterCorrelators] = if (userBias < 0f) { - // strip any boosts - boostable.map(_.toFilterCorrelators).slice(0, maxQueryEvents - 1) - } else { - Seq.empty - } + val allFilteringCorrelators = recentUserHistoryFilter ++ similarItemsFilter ++ filteringMetadata - val similarItemsFilter: Seq[FilterCorrelators] = if (itemBias < 0f) { - 
getBiasedSimilarItems(query).map(_.toFilterCorrelators) - } else { - Seq.empty - } + // since users have action history and items have correlators and both correspond to the same "actions" like + // purchase or view, we'll pass both to the query if the user history or items correlators are empty + // then metadata or backfill must be relied on to return results. - val filteringMetadata = getFilteringMetadata(query) - val filteringDateRange = getFilteringDateRange(query) - val allFilteringCorrelators = recentUserHistoryFilter ++ similarItemsFilter ++ filteringMetadata + val numRecs = query.num.getOrElse(ap.num.getOrElse(defaultURAlgorithmParams.DefaultNum)) - val mustFields: Seq[JValue] = allFilteringCorrelators.map { - case FilterCorrelators(actionName, itemIDs) => - render("terms" -> (actionName -> itemIDs) ~ ("boost" -> 0)) - } - mustFields ++ filteringDateRange - } + val shouldFields: Option[List[JValue]] = if (allBoostedCorrelators.isEmpty) None + else { + Some(allBoostedCorrelators.map { i => + render(("terms" -> (i.actionName -> i.itemIDs) ~ ("boost" -> i.boost))) + }.toList) + } + val popModelSort = List(parse( + """ + |{ + | "constant_score": { + | "filter": { + | "match_all": {} + | }, + | "boost": 0 + | } + |} + |""".stripMargin)) + + val should: List[JValue] = if (shouldFields.isEmpty) popModelSort else shouldFields.get ::: popModelSort + + + val mustFields: List[JValue] = allFilteringCorrelators.map { i => + render(("terms" -> (i.actionName -> i.itemIDs) ~ ("boost" -> 0)))}.toList + val must: List[JValue] = mustFields ::: filteringDateRange + + val mustNotFields: JValue = render(("ids" -> ("values" -> getExcludedItems (alluserEvents._2, query)) ~ ("boost" -> 0))) + val mustNot: JValue = mustNotFields + + val popQuery = if (ap.recsModel.getOrElse("all") == "all" || + ap.recsModel.getOrElse("all") == "backfill") { + Some(List( + parse( """{"_score": {"order": "desc"}}"""), + parse( + s""" + |{ + | "${backfillFieldName}": { + | "unmapped_type": "double", + | "order": "desc" + | } + |}""".stripMargin))) + } else None - /** Build not must query part */ - def buildQueryMustNot(query: Query, events: Seq[Event]): JValue = { - val mustNotFields: JValue = render("ids" -> ("values" -> getExcludedItems(events, query)) ~ ("boost" -> 0)) - mustNotFields - } - /** Build sort query part */ - def buildQuerySort(): Seq[JValue] = if (recsModel == RecsModel.All || recsModel == RecsModel.BF) { - val sortByScore: Seq[JValue] = Seq(parse("""{"_score": {"order": "desc"}}""")) - val sortByRanks: Seq[JValue] = rankingFieldNames map { fieldName => - parse(s"""{ "$fieldName": { "unmapped_type": "double", "order": "desc" } }""") + val json = + ( + ("size" -> numRecs) ~ + ("query"-> + ("bool"-> + ("should"-> should) ~ + ("must"-> must) ~ + ("must_not"-> mustNot) ~ + ("minimum_should_match" -> 1)) + ) ~ + ("sort" -> popQuery)) + val j = compact(render(json)) + logger.info(s"Query: \n${j}\n") + (compact(render(json)), alluserEvents._2) + } catch { + case e: IllegalArgumentException => + ("", List.empty[Event]) } - sortByScore ++ sortByRanks - } else { - Seq.empty } /** Create a list of item ids that the user has interacted with or are not to be included in recommendations */ - def getExcludedItems(userEvents: Seq[Event], query: Query): Seq[String] = { + def getExcludedItems(userEvents: List[Event], query: Query): List[String] = { val blacklistedItems = userEvents.filter { event => - // either a list or an empty list of filtering events so honor them - blacklistEvents match { - case Nil => modelEventNames.head 
equals event.event - case _ => blacklistEvents contains event.event - } - }.map(_.targetEntityId.getOrElse("")) ++ query.blacklistItems.getOrEmpty.distinct + if (ap.blacklistEvents.nonEmpty) { + // either a list or an empty list of filtering events so honor them + if (ap.blacklistEvents.get == List.empty[String]) false // no filtering events so all are allowed + else ap.blacklistEvents.get.contains(event.event) // if its filtered remove it, else allow + } else ap.eventNames(0).equals(event.event) // remove the primary event if nothing specified + }.map (_.targetEntityId.getOrElse("")) ++ query.blacklistItems.getOrElse(List.empty[String]) + .distinct // Now conditionally add the query item itself - val includeSelf = query.returnSelf.getOrElse(returnSelf) - val allExcludedItems = if (!includeSelf && query.item.nonEmpty) { - blacklistedItems :+ query.item.get - } // add the query item to be excuded - else { + val includeSelf = query.returnSelf.getOrElse(ap.returnSelf.getOrElse(false)) + val allExcludedItems = if ( !includeSelf && query.item.nonEmpty ) + blacklistedItems :+ query.item.get // add the query item to be excuded + else blacklistedItems - } allExcludedItems.distinct } /** Get similar items for an item, these are already in the action correlators in ES */ def getBiasedSimilarItems(query: Query): Seq[BoostableCorrelators] = { if (query.item.nonEmpty) { - val m = EsClient.getSource(esIndex, esType, query.item.get) + val m = esClient.getSource(ap.indexName, ap.typeName, query.item.get) if (m != null) { - val itemEventBias = query.itemBias.getOrElse(itemBias) + val itemEventBias = query.itemBias.getOrElse(ap.itemBias.getOrElse(1f)) val itemEventsBoost = if (itemEventBias > 0 && itemEventBias != 1) Some(itemEventBias) else None - modelEventNames.map { action => - val items: Seq[String] = try { - if (m.containsKey(action) && m.get(action) != null) { - m.get(action).asInstanceOf[util.ArrayList[String]].asScala - } else { - Seq.empty[String] - } + ap.eventNames.map { action => + val items = try { + if (m.containsKey(action) && m.get(action) != null) m.get(action).asInstanceOf[util.ArrayList[String]].toList + else List.empty[String] } catch { case cce: ClassCastException => - logger.warn(s"Bad value in item [${query.item}] corresponding to key: [$action] that was not a Seq[String] ignored.") - Seq.empty[String] + logger.warn(s"Bad value in item ${query.item} corresponding to key: ${action} that was not a List[String]" + + " ignored.") + List.empty[String] } - val rItems = if (items.size <= maxQueryEvents) items else items.slice(0, maxQueryEvents - 1) + val rItems = if (items.size <= ap.maxQueryEvents.getOrElse(defaultURAlgorithmParams.DefaultMaxQueryEvents)) + items else items.slice(0, ap.maxQueryEvents.getOrElse(defaultURAlgorithmParams.DefaultMaxQueryEvents) - 1) BoostableCorrelators(action, rItems, itemEventsBoost) } - } else { - Seq.empty - } // no similar items - } else { - Seq.empty[BoostableCorrelators] - } // no item specified + } else List.empty[BoostableCorrelators] // no similar items + } else List.empty[BoostableCorrelators] // no item specified } /** Get recent events of the user on items to create the recommendations query from */ - def getBiasedRecentUserActions(query: Query): (Seq[BoostableCorrelators], Seq[Event]) = { + def getBiasedRecentUserActions( + query: Query): (Seq[BoostableCorrelators], List[Event]) = { val recentEvents = try { LEventStore.findByEntity( - appName = appName, + appName = ap.appName, // entityType and entityId is specified for fast lookup entityType = 
"user", entityId = query.user.get, // one query per eventName is not ideal, maybe one query for lots of events then split by eventName //eventNames = Some(Seq(action)),// get all and separate later - eventNames = Some(queryEventNames), // get all and separate later + eventNames = Some(queryEventNames),// get all and separate later targetEntityType = None, // limit = Some(maxQueryEvents), // this will get all history then each action can be limited before using in // the query latest = true, // set time limit to avoid super long DB access - timeout = Duration(200, "millis")).toList + timeout = Duration(200, "millis") + ).toList } catch { case e: scala.concurrent.TimeoutException => - logger.error(s"Timeout when read recent events. Empty list is used. $e") - Seq.empty[Event] + logger.error(s"Timeout when read recent events." + + s" Empty list is used. ${e}") + List.empty[Event] case e: NoSuchElementException => // todo: bad form to use an exception to check if there is a user id logger.info("No user id for recs, returning similar items for the item specified") - Seq.empty[Event] + List.empty[Event] case e: Exception => // fatal because of error, an empty query - logger.error(s"Error when read recent events: $e") + logger.error(s"Error when read recent events: ${e}") throw e } - val userEventBias = query.userBias.getOrElse(userBias) + val userEventBias = query.userBias.getOrElse(ap.userBias.getOrElse(1f)) val userEventsBoost = if (userEventBias > 0 && userEventBias != 1) Some(userEventBias) else None + //val rActions = ap.eventNames.map { action => val rActions = queryEventNames.map { action => - var items = Seq.empty[String] + var items = List[String]() - for (event <- recentEvents) - if (event.event == action && items.size < maxQueryEvents) { - items = event.targetEntityId.get +: items + for ( event <- recentEvents ) + if (event.event == action && items.size < + ap.maxQueryEvents.getOrElse(defaultURAlgorithmParams.DefaultMaxQueryEvents)) { + items = event.targetEntityId.get :: items // todo: may throw exception and we should ignore the event instead of crashing } // userBias may be None, which will cause no JSON output for this @@ -737,59 +533,70 @@ class URAlgorithm(val ap: URAlgorithmParams) } /** get all metadata fields that potentially have boosts (not filters) */ - def getBoostedMetadata(query: Query): Seq[BoostableCorrelators] = { - val paramsBoostedFields = fields.filter(_.bias < 0f) - val queryBoostedFields = query.fields.getOrEmpty.filter(_.bias >= 0f) + def getBoostedMetadata( query: Query ): List[BoostableCorrelators] = { + val paramsBoostedFields = ap.fields.getOrElse(List.empty[Field]).filter( field => field.bias < 0 ).map { field => + BoostableCorrelators(field.name, field.values, Some(field.bias)) + } - (queryBoostedFields ++ paramsBoostedFields) - .map(field => BoostableCorrelators(field.name, field.values, Some(field.bias))) - .distinct // de-dup and favor query fields + val queryBoostedFields = query.fields.getOrElse(List.empty[Field]).filter { field => + field.bias >= 0f + }.map { field => + BoostableCorrelators(field.name, field.values, Some(field.bias)) + } + + (queryBoostedFields ++ paramsBoostedFields).distinct // de-dup and favor query fields } /** get all metadata fields that are filters (not boosts) */ - def getFilteringMetadata(query: Query): Seq[FilterCorrelators] = { - val paramsFilterFields = fields.filter(_.bias >= 0f) - val queryFilterFields = query.fields.getOrEmpty.filter(_.bias < 0f) + def getFilteringMetadata( query: Query ): List[FilterCorrelators] = { + 
val paramsFilterFields = ap.fields.getOrElse(List.empty[Field]).filter( field => field.bias >= 0 ).map { field => + FilterCorrelators(field.name, field.values) + } + + val queryFilterFields = query.fields.getOrElse(List.empty[Field]).filter { field => + field.bias < 0f + }.map { field => + FilterCorrelators(field.name, field.values) + } - (queryFilterFields ++ paramsFilterFields) - .map(field => FilterCorrelators(field.name, field.values)) - .distinct // de-dup and favor query fields + (queryFilterFields ++ paramsFilterFields).distinct // de-dup and favor query fields } /** get part of query for dates and date ranges */ - def getFilteringDateRange(query: Query): Seq[JValue] = { + def getFilteringDateRange( query: Query ): List[JValue] = { + var json: List[JValue] = List.empty[JValue] // currentDate in the query overrides the dateRange in the same query so ignore daterange if both val currentDate = query.currentDate.getOrElse(DateTime.now().toDateTimeISO.toString) - val json: Seq[JValue] = if (query.dateRange.nonEmpty && + if (query.dateRange.nonEmpty && (query.dateRange.get.after.nonEmpty || query.dateRange.get.before.nonEmpty)) { val name = query.dateRange.get.name val before = query.dateRange.get.before.getOrElse("") val after = query.dateRange.get.after.getOrElse("") val rangeStart = s""" - |{ - | "constant_score": { - | "filter": { - | "range": { - | "$name": { + |{ + | "constant_score": { + | "filter": { + | "range": { + | "${name}": { """.stripMargin val rangeAfter = s""" - | "gt": "$after" + | "gt": "${after}" """.stripMargin val rangeBefore = s""" - | "lt": "$before" + | "lt": "${before}" """.stripMargin val rangeEnd = s""" - | } - | } - | }, - | "boost": 0 - | } - |} + | } + | } + | }, + | "boost": 0 + | } + |} """.stripMargin var range = rangeStart @@ -800,52 +607,46 @@ class URAlgorithm(val ap: URAlgorithmParams) if (!before.isEmpty) range += rangeBefore range += rangeEnd - Seq(parse(range)) - } else if (ap.availableDateName.nonEmpty && ap.expireDateName.nonEmpty) { // use the query date or system date + json = json :+ parse(range) + } else if (ap.availableDateName.nonEmpty && ap.expireDateName.nonEmpty) {// use the query date or system date val availableDate = ap.availableDateName.get // never None val expireDate = ap.expireDateName.get + val available = s""" - |{ - | "constant_score": { - | "filter": { - | "range": { - | "$availableDate": { - | "lte": "$currentDate" - | } - | } - | }, - | "boost": 0 - | } - |} + |{ + | "constant_score": { + | "filter": { + | "range": { + | "${availableDate}": { + | "lte": "${currentDate}" + | } + | } + | }, + | "boost": 0 + | } + |} """.stripMargin + + json = json :+ parse(available) val expire = s""" - |{ - | "constant_score": { - | "filter": { - | "range": { - | "$expireDate": { - | "gt": "$currentDate" - | } - | } - | }, - | "boost": 0 - | } - |} + |{ + | "constant_score": { + | "filter": { + | "range": { + | "${expireDate}": { + | "gt": "${currentDate}" + | } + | } + | }, + | "boost": 0 + | } + |} """.stripMargin - - Seq(parse(available), parse(expire)) + json = json :+ parse(expire) } else { - logger.info( - """ - |Misconfigured date information, either your engine.json date settings or your query's dateRange is incorrect. 
- |Ingoring date information for this query.""".stripMargin) - Seq.empty + logger.info("Misconfigured date information, either your engine.json date settings or your query's dateRange is incorrect.\nIgnoring date information for this query.") } json } - def getRankingMapping: Map[String, String] = rankingFieldNames map { fieldName => - fieldName -> "float" - } toMap - } diff --git a/src/main/scala/URModel.scala b/src/main/scala/URModel.scala index a93e1fe..13a9ffa 100644 --- a/src/main/scala/URModel.scala +++ b/src/main/scala/URModel.scala @@ -15,108 +15,192 @@ * limitations under the License. */ -package org.template +package com.uniflash + +import java.util.Date import grizzled.slf4j.Logger -import io.prediction.data.storage.DataMap + +import org.apache.predictionio.controller.{PersistentModelLoader, PersistentModel} +import org.apache.predictionio.data.storage.PropertyMap import org.apache.mahout.math.indexeddataset.IndexedDataset -import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark -import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD +import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark import org.joda.time.DateTime import org.json4s.JsonAST.JArray import org.json4s._ -import org.template.conversions.{ IndexedDatasetConversions, ItemID, ItemProps } +import com.uniflash.conversions.IndexedDatasetConversions +import org.elasticsearch.spark._ +import org.apache.spark.SparkContext + /** Universal Recommender models to save in ES */ class URModel( - coocurrenceMatrices: Seq[(ItemID, IndexedDataset)] = Seq.empty, - propertiesRDDs: Seq[RDD[(ItemID, ItemProps)]] = Seq.empty, - typeMappings: Map[String, String] = Map.empty, // maps fieldname that need type mapping in Elasticsearch - nullModel: Boolean = false)(implicit sc: SparkContext) { - - @transient lazy val logger: Logger = Logger[this.type] + coocurrenceMatrices: Option[List[(String, IndexedDataset)]], + fieldsRDD: Option[RDD[(String, PropertyMap)]], + indexName: String, + dateNames: Option[List[String]] = None, + nullModel: Boolean = false, + typeMappings: Option[Map[String, String]] = None, // maps fieldname that need type mapping in Elasticsearch + propertiesRDD: Option[RDD[collection.Map[String, Any]]] = None) + // a little hack to allow a dummy model used to save but not + // retrieve (see companion object's apply) + extends PersistentModel[URAlgorithmParams] { + @transient lazy val logger = Logger[this.type] /** Save all fields to be indexed by Elasticsearch and queried for recs - * This will is something like a table with row IDs = item IDs and separate fields for all - * cooccurrence and cross-cooccurrence correlators and metadata for each item. Metadata fields are - * limited to text term collections so vector types. Scalar values can be used but depend on - * Elasticsearch's support.
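The save path below has to turn the JValue properties coming from PredictionIO into plain values Elasticsearch can index: term lists from JArray, dates from ISO strings, and the popularity rank as a double. A small stand-alone sketch of that conversion, with hypothetical field names (it is not the template's own converter):

import org.joda.time.DateTime
import org.json4s._

object PropertyToEsValueSketch extends App {
  val dateFields = Set("availabledate")   // hypothetical date field names, normally taken from engine.json

  def toEsValue(key: String, v: JValue): Any = v match {
    case JArray(list)                  => list.collect { case JString(s) => s } // term lists for correlators/metadata
    case JString(s) if dateFields(key) => new DateTime(s).toDate                // ISO date strings become Dates
    case JString(s)                    => s
    case JDouble(d)                    => d                                     // e.g. a popularity rank
    case JInt(i)                       => i
    case other                         => other.values                          // pass anything else through
  }

  println(toEsValue("availabledate", JString("2015-08-30T12:24:41-07:00")))
  println(toEsValue("category", JArray(List(JString("cat1"), JString("cat2")))))
  println(toEsValue("popRank", JDouble(0.9)))
}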
One exception is the Data scalar, which is also supported + * @param id + * @param params from engine.json, algorithm control params + * @param sc The spark constext already created for execution + * @return always returns true since most other reasons to not save cause exceptions + */ + def save(id: String, params: URAlgorithmParams, sc: SparkContext): Boolean = { if (nullModel) throw new IllegalStateException("Saving a null model created from loading an old one.") + val esIndexURI = s"/${params.indexName}/${params.typeName}" + // for ES we need to create the entire index in an rdd of maps, one per item so we'll use // convert cooccurrence matrices into correlators as RDD[(itemID, (actionName, Seq[itemID])] // do they need to be in Elasticsearch format logger.info("Converting cooccurrence matrices into correlators") - val correlatorRDDs: Seq[RDD[(ItemID, ItemProps)]] = coocurrenceMatrices.map { - case (actionName, dataset) => - dataset.asInstanceOf[IndexedDatasetSpark].toStringMapRDD(actionName) - } - - logger.info("Group all properties RDD") - val groupedRDD: RDD[(ItemID, ItemProps)] = groupAll(correlatorRDDs ++ propertiesRDDs) - // logger.debug(s"Grouped RDD\n${groupedRDD.take(25).mkString("\n")}") - - val esRDD: RDD[Map[String, Any]] = groupedRDD.mapPartitions { iter => - iter map { - case (itemId, itemProps) => - val propsMap = itemProps.map { - case (propName, propValue) => - propName -> URModel.extractJvalue(dateNames, propName, propValue) + val correlators = if (coocurrenceMatrices.nonEmpty) coocurrenceMatrices.get.map { case (actionName, dataset) => + dataset.asInstanceOf[IndexedDatasetSpark].toStringMapRDD(actionName).asInstanceOf[RDD[(String, Map[String, Any])]] + //} else List.empty[RDD[(String, Map[String, Seq[String]])]] // empty mena only calculating PopModel + } else List.empty[RDD[(String, Map[String, Any])]] // empty mena only calculating PopModel + + // getting action names since they will be ES fields + logger.info(s"Getting a list of action name strings") + val allActions = coocurrenceMatrices.getOrElse(List.empty[(String, IndexedDatasetSpark)]).map(_._1) + + logger.info(s"Ready to pass date fields names to closure ${dateNames}") + val closureDateNames = dateNames.getOrElse(List.empty[String]) + // convert the PropertyMap into Map[String, Seq[String]] for ES + logger.info("Converting PropertyMap into Elasticsearch style rdd") + var properties = List.empty[RDD[(String, Map[String, Any])]] + var allPropKeys = List.empty[String] + if (fieldsRDD.nonEmpty) { + properties = List(fieldsRDD.get.map { case (item, pm) => + var m: Map[String, Any] = Map() + for (key <- pm.keySet) { + val k = key + val v = pm.get[JValue](key) + try { + // if we get something unexpected, add ignore and add nothing to the map + pm.get[JValue](key) match { + case JArray(list) => // assumes all lists are string tokens for bias + val l = list.map { + case JString(s) => s + case _ => "" + } + m = m + (key -> l) + case JString(s) => // name for this field is in engine params + if (closureDateNames.contains(key)) { + // one of the date fields + val dateTime = new DateTime(s) + val date: java.util.Date = dateTime.toDate() + m = m + (key -> date) + } + case JDouble(rank) => // only the ranking double from PopModel should be here + m = m + (key -> rank) + case JInt(someInt) => // not sure what this is but pass it on + m = m + (key -> someInt) + } + } catch { + case e: ClassCastException => e + case e: IllegalArgumentException => e + case e: MatchError => e + //got something we didn't expect so ignore it, put 
nothing in the map } - propsMap + ("id" -> itemId) - } + } + (item, m) + }) + allPropKeys = properties.head.flatMap(_._2.keySet).distinct.collect().toList } - // logger.debug(s"ES RDD\n${esRDD.take(25).mkString("\n")}") - val esFields: List[String] = esRDD.flatMap(_.keySet).distinct().collect.toList - logger.info(s"ES fields[${esFields.size}]: $esFields") - EsClient.hotSwap(esIndex, esType, esRDD, esFields, typeMappings) + // these need to be indexed with "not_analyzed" and no norms so have to + // collect all field names before ES index create + val allFields = (allActions ++ allPropKeys).distinct // shouldn't need distinct but it's fast + + if (propertiesRDD.isEmpty) { + // Elasticsearch takes a Map with all fields, not a tuple + logger.info("Grouping all correlators into doc + fields for writing to index") + logger.info(s"Finding non-empty RDDs from a list of ${correlators.length} correlators and " + + s"${properties.length} properties") + val esRDDs: List[RDD[(String, Map[String, Any])]] = + //(correlators ::: properties).filterNot(c => c.isEmpty())// for some reason way too slow + (correlators ::: properties) + //c.take(1).length == 0 + if (esRDDs.nonEmpty) { + val esFields = groupAll(esRDDs).map { case (item, map) => + // todo: every map's items must be checked for value type and converted before writing to ES + val esMap = map + ("id" -> item) + esMap + } + // create a new index then hot-swap the new index by re-aliasing to it then delete old index + logger.info("New data to index, performing a hot swap of the index.") + esClient.hotSwap( + params.indexName, + params.typeName, + esFields.asInstanceOf[RDD[scala.collection.Map[String,Any]]], + allFields, + typeMappings) + } else logger.warn("No data to write. May have been caused by a failed or stopped `pio train`, " + + "try running it again") + + } else { + // this happens when updating only the popularity backfill model but to do a hotSwap we need to dup the + // entire index + + // create a new index then hot-swap the new index by re-aliasing to it then delete old index + esClient.hotSwap(params.indexName, params.typeName, propertiesRDD.get, allFields, + typeMappings) + } true } + + def groupAll( fields: Seq[RDD[(String, (Map[String, Any]))]]): RDD[(String, (Map[String, Any]))] = { + //if (fields.size > 1 && !fields.head.isEmpty() && !fields(1).isEmpty()) { + if (fields.size > 1) { + fields.head.cogroup[Map[String, Any]](groupAll(fields.drop(1))).map { case (key, pairMapSeqs) => + // to be safe merge all maps but should only be one per rdd element + val rdd1Maps = pairMapSeqs._1.foldLeft(Map.empty[String, Any])(_ ++ _) + val rdd2Maps = pairMapSeqs._2.foldLeft(Map.empty[String, Any])(_ ++ _) + val fullMap = rdd1Maps ++ rdd2Maps + (key, fullMap) + } + } else fields.head + } - def groupAll(fields: Seq[RDD[(ItemID, ItemProps)]]): RDD[(ItemID, ItemProps)] = { - fields.fold(sc.emptyRDD[(ItemID, ItemProps)])(_ ++ _).reduceByKey(_ ++ _) + override def toString = { + s"URModel in Elasticsearch at index: ${indexName}" } + + } -object URModel { - @transient lazy val logger: Logger = Logger[this.type] +object URModel + extends PersistentModelLoader[URAlgorithmParams, URModel] { + @transient lazy val logger = Logger[this.type] /** This is actually only used to read saved values and since they are in Elasticsearch we don't need to read - * this means we create a null model since it will not be used. - * todo: we should rejigger the template framework so this is not required. 
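When the full index is rebuilt, save unions every correlator RDD with the property RDD and folds the per-item maps into a single document per item before the hot swap. A condensed, self-contained sketch of that grouping with made-up data and a local Spark context (the real code uses cogroup recursively, this uses the equivalent union plus reduceByKey):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object GroupAllSketch extends App {
  val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("group-all-sketch"))

  // one RDD per correlator type plus one for properties; item ids and values are hypothetical
  val purchaseCorrelators: RDD[(String, Map[String, Any])] =
    sc.parallelize(Seq("item-1" -> Map[String, Any]("purchase" -> Seq("item-2", "item-3"))))
  val viewCorrelators: RDD[(String, Map[String, Any])] =
    sc.parallelize(Seq("item-1" -> Map[String, Any]("view" -> Seq("item-4"))))
  val properties: RDD[(String, Map[String, Any])] =
    sc.parallelize(Seq("item-1" -> Map[String, Any]("category" -> Seq("cat1"))))

  // union everything and merge the per-item maps, yielding one Elasticsearch document per item
  val docs = Seq(purchaseCorrelators, viewCorrelators, properties)
    .reduce(_ union _)
    .reduceByKey(_ ++ _)
    .map { case (item, fields) => fields + ("id" -> item) }

  docs.collect().foreach(println)
  sc.stop()
}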
- * @param id ignored - * @param params ignored - * @param sc ignored - * @return dummy null model - */ + * this means we create a null model since it will not be used. + * todo: we should rejigger the template framework so this is not required. + * @param id ignored + * @param params ignored + * @param sc ignored + * @return dummy null model + */ def apply(id: String, params: URAlgorithmParams, sc: Option[SparkContext]): URModel = { // todo: need changes in PIO to remove the need for this - new URModel(null, null, null, nullModel = true)(sc.get) - } - - def extractJvalue(dateNames: Seq[String], key: String, value: Any): Any = value match { - case JArray(list) => list.map(extractJvalue(dateNames, key, _)) - case JString(s) => - if (dateNames.contains(key)) { - new DateTime(s).toDate - } else if (RankingFieldName.toSeq.contains(key)) { - s.toDouble - } else { - s - } - case JDouble(double) => double - case JInt(int) => int - case JBool(bool) => bool - case _ => value + val urm = new URModel(null, null, null, nullModel = true) + logger.info("Created dummy null model") + urm } } diff --git a/src/main/scala/package.scala b/src/main/scala/package.scala index c3b4810..313754c 100644 --- a/src/main/scala/package.scala +++ b/src/main/scala/package.scala @@ -15,72 +15,20 @@ * limitations under the License. */ -package org.template +package com.uniflash import grizzled.slf4j.Logger - import scala.collection.JavaConversions._ import org.apache.mahout.sparkbindings.SparkDistributedContext import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark import org.apache.mahout.sparkbindings._ import org.apache.spark.rdd.RDD -import org.json4s._ /** Utility conversions for IndexedDatasetSpark */ package object conversions { - type UserID = String - type ActionID = String - type ItemID = String - // Item properties (fieldName, fieldValue) - type ItemProps = Map[String, JValue] - - def drawActionML(implicit logger: Logger): Unit = { - val actionML = - """ - | - | _ _ __ __ _ - | /\ | | (_) | \/ | | - | / \ ___| |_ _ ___ _ __ | \ / | | - | / /\ \ / __| __| |/ _ \| '_ \| |\/| | | - | / ____ \ (__| |_| | (_) | | | | | | | |____ - | /_/ \_\___|\__|_|\___/|_| |_|_| |_|______| - | - | - """.stripMargin - - logger.info(actionML) - } - - def drawInfo(title: String, dataMap: Seq[(String, Any)])(implicit logger: Logger): Unit = { - val leftAlignFormat = "║ %-30s%-28s ║" - - val line = "═" * 60 - - val preparedTitle = "║ %-58s ║".format(title) - val data = dataMap.map { - case (key, value) => - leftAlignFormat.format(key, value) - } mkString "\n" - - logger.info( - s""" - |╔$line╗ - |$preparedTitle - |$data - |╚$line╝ - |""".stripMargin) - - } - - implicit class OptionCollection[T](collectionOpt: Option[Seq[T]]) { - def getOrEmpty: Seq[T] = { - collectionOpt.getOrElse(Seq.empty[T]) - } - } - implicit class IndexedDatasetConversions(val indexedDataset: IndexedDatasetSpark) { - def toStringMapRDD(actionName: ActionID): RDD[(ItemID, ItemProps)] = { + def toStringMapRDD(actionName: String): RDD[(String, Map[String, Seq[String]])] = { @transient lazy val logger = Logger[this.type] //val matrix = indexedDataset.matrix.checkpoint() @@ -93,37 +41,37 @@ package object conversions { // may want to mapPartition and create bulk updates as a slight optimization // creates an RDD of (itemID, Map[correlatorName, list-of-correlator-values]) - indexedDataset.matrix.rdd.map[(ItemID, ItemProps)] { - case (rowNum, itemVector) => - - // turn non-zeros into list for sorting - var itemList = List[(Int, Double)]() - for (ve <- 
itemVector.nonZeroes) { - itemList = itemList :+ (ve.index, ve.get) + indexedDataset.matrix.rdd.map[(String, Map[String, Seq[String]])] { case (rowNum, itemVector) => + + // turn non-zeros into list for sorting + var itemList = List[(Int, Double)]() + for (ve <- itemVector.nonZeroes) { + val v = ve + itemList = itemList :+(ve.index, ve.get) + } + //sort by highest strength value descending(-) + val vector = itemList.sortBy { elem => -elem._2 } + + val itemID = rowIDDictionary_bcast.value.inverse.getOrElse(rowNum, "INVALID_ITEM_ID") + try { + + require(itemID != "INVALID_ITEM_ID", s"Bad row number in matrix, skipping item ${rowNum}") + require(vector.nonEmpty, s"No values so skipping item ${rowNum}") + + // create a list of element ids + val values = vector.map { item => + columnIDDictionary_bcast.value.inverse.getOrElse(item._1, "") // should always be in the dictionary } - //sort by highest strength value descending(-) - val vector = itemList.sortBy { elem => -elem._2 } - val itemID = rowIDDictionary_bcast.value.inverse.getOrElse(rowNum, "INVALID_ITEM_ID") - try { + (itemID, Map(actionName -> values)) - require(itemID != "INVALID_ITEM_ID", s"Bad row number in matrix, skipping item $rowNum") - require(vector.nonEmpty, s"No values so skipping item $rowNum") - - // create a list of element ids - val values = JArray(vector.map { item => - JString(columnIDDictionary_bcast.value.inverse.getOrElse(item._1, "")) // should always be in the dictionary - }) - - (itemID, Map(actionName -> values)) - - } catch { - case cce: IllegalArgumentException => //non-fatal, ignore line - null.asInstanceOf[(ItemID, ItemProps)] - } + } catch { + case cce: IllegalArgumentException => //non-fatal, ignore line + null.asInstanceOf[(String, Map[String, Seq[String]])] + } }.filter(_ != null) } } -} +} \ No newline at end of file
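The toStringMapRDD conversion above boils down to a per-row step: sort a row's non-zero correlator strengths descending and translate the column indices back into item ids through the column dictionary. A pure-Scala sketch of that step with hypothetical dictionaries and strengths (the real code uses Mahout's broadcast BiMaps):

object RowToCorrelatorsSketch extends App {
  val columnIds = Map(0 -> "item-a", 1 -> "item-b", 2 -> "item-c") // hypothetical column dictionary
  val nonZeros  = List((2, 0.7), (0, 1.3), (1, 0.2))               // (columnIndex, llrStrength), made up

  // sort by highest strength descending, then map indices to ids
  val ordered = nonZeros.sortBy { case (_, strength) => -strength }
  val values  = ordered.map { case (col, _) => columnIds.getOrElse(col, "") }

  println("item-x" -> Map("purchase" -> values)) // (item-x, Map(purchase -> List(item-a, item-c, item-b)))
}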