From 53de1ec2df6bbd3f495cf4301de6b02ed1434195 Mon Sep 17 00:00:00 2001 From: Rasna Tomar Date: Thu, 19 May 2016 15:49:47 +0530 Subject: [PATCH 1/5] Commit by Rasna and Ambuj --- src/main/scala/Engine.scala | 45 +++++++++++++++++++++++++++++++------ 1 file changed, 38 insertions(+), 7 deletions(-) diff --git a/src/main/scala/Engine.scala b/src/main/scala/Engine.scala index 4c73e64..a5db0f5 100644 --- a/src/main/scala/Engine.scala +++ b/src/main/scala/Engine.scala @@ -28,6 +28,7 @@ import io.prediction.controller.{EngineFactory, Engine} /** The Query spec with optional values. The only hard rule is that there must be either a user or * an item id. All other values are optional. */ + case class Query( user: Option[String] = None, // must be a user or item id userBias: Option[Float] = None, // default: whatever is in algorithm params or 1 @@ -36,12 +37,17 @@ case class Query( fields: Option[List[Field]] = None, // default: whatever is in algorithm params or None currentDate: Option[String] = None, // if used will override dateRange filter, currentDate must lie between the item's // expireDateName value and availableDateName value, all are ISO 8601 dates - dateRange: Option[DateRange] = None, // optional before and after filter applied to a date field - blacklistItems: Option[List[String]] = None, // default: whatever is in algorithm params or None + dateRange: Option[DateRange] = None, // default: whatever is in algorithm params or None returnSelf: Option[Boolean] = None,// means for an item query should the item itself be returned, defaults // to what is in the algorithm params or false num: Option[Int] = None, // default: whatever is in algorithm params, which itself has a default--probably 20 - eventNames: Option[List[String]]) // names used to ID all user actions + eventNames: Option[List[String]], // names used to ID all user actions + /** following variable defines features - must include particular items, price range, blacklist items,blacklistCategories respectively ***/ + includeItems: Option[List[String]] = None, + priceRange: Option[PriceRange] = None,// optional before and after filter applied to a date field + blacklistItems: Option[List[String]] = None, + blacklistCategory: Option[List[String]] = None, + blacklistBrand: Option[List[String]] = None) extends Serializable /** Used to specify how Fields are represented in engine.json */ @@ -59,16 +65,41 @@ case class DateRange( after: Option[String]) // both empty should be ignored extends Serializable + +/********************************************************************************************************/ +// This is for price range filter +case class PriceRange( + name: String, // name of item property for the date comparison + lessthan: Option[String], // empty strings means no filter + greaterthan: Option[String]) // both empty should be ignored + extends Serializable +/*********************************************************************************************************/ + /** results of a MMRAlgoritm.predict */ case class PredictedResult( - itemScores: Array[ItemScore]) + itemScores: Array[ItemScore], + includeItems : Array[IncludeItem]) extends Serializable case class ItemScore( - item: String, // item id - score: Double )// used to rank, original score returned from teh search engine + item: String, // item id + score: Double, + price: AnyRef, + productPictureUrl: AnyRef, + productTitle: AnyRef, + pageUrl:AnyRef + ) // used to rank, original score returned from teh search engine extends Serializable +case class IncludeItem( + item: String, // item id + score: Double, + price: AnyRef, + productPictureUrl: AnyRef, + productTitle: AnyRef, + pageUrl:AnyRef + ) extends Serializable + object RecommendationEngine extends EngineFactory { def apply() = { new Engine( @@ -77,4 +108,4 @@ object RecommendationEngine extends EngineFactory { Map("ur" -> classOf[URAlgorithm]), // IMPORTANT: "ur" must be the "name" of the parameter set in engine.json classOf[Serving]) } -} \ No newline at end of file +} From 4c50dd3e2d311a9c1ffe68fc4bacb6739c2fd41a Mon Sep 17 00:00:00 2001 From: Rasna Tomar Date: Thu, 19 May 2016 15:56:53 +0530 Subject: [PATCH 2/5] Commit by Rasna and Ambuj --- src/main/scala/Engine.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/Engine.scala b/src/main/scala/Engine.scala index a5db0f5..746793f 100644 --- a/src/main/scala/Engine.scala +++ b/src/main/scala/Engine.scala @@ -88,7 +88,7 @@ case class ItemScore( productPictureUrl: AnyRef, productTitle: AnyRef, pageUrl:AnyRef - ) // used to rank, original score returned from teh search engine + ) //used to rank, original score returned from teh search engine extends Serializable case class IncludeItem( From 38fccc0a9337b6062343217fd2eaaeb781258173 Mon Sep 17 00:00:00 2001 From: Rasna Tomar Date: Thu, 19 May 2016 15:57:45 +0530 Subject: [PATCH 3/5] Commit by Rasna and Ambuj --- src/main/scala/URAlgorithm.scala | 191 ++++++++++++++++++++++++++++++- 1 file changed, 186 insertions(+), 5 deletions(-) diff --git a/src/main/scala/URAlgorithm.scala b/src/main/scala/URAlgorithm.scala index 27368ef..a6accf0 100644 --- a/src/main/scala/URAlgorithm.scala +++ b/src/main/scala/URAlgorithm.scala @@ -377,13 +377,25 @@ class URAlgorithm(val ap: URAlgorithmParams) val backfillFieldName = ap.backfillField.getOrElse(BackfillField()).name logger.info(s"PopModel using fieldName: ${backfillFieldName}") val queryAndBlacklist = buildQuery(ap, query, backfillFieldName.getOrElse(defaultURAlgorithmParams.DefaultBackfillFieldName)) - val recs = EsClient.search(queryAndBlacklist._1, ap.indexName) + val recs = EsClient.search(queryAndBlacklist._1, ap.indexName, query) // should have all blacklisted items excluded // todo: need to add dithering, mean, sigma, seed required, make a seed that only changes on some fixed time // period so the recs ordering stays fixed for that time period. recs } + def buildIncludeQuery (query : Query) : String = { + val mustinclude: JValue = render(("ids" -> ("values" -> query.includeItems))) + val json = + ( + ("query"-> + (mustinclude) + ) + ) + val j = compact(render(json)) + logger.info(s"Include item query: \n${j}\n") + compact(render(json)) + } /** Build a query from default algorithms params and the query itself taking into account defaults */ def buildQuery(ap: URAlgorithmParams, query: Query, backfillFieldName: String = ""): (String, List[Event]) = { @@ -426,6 +438,8 @@ class URAlgorithm(val ap: URAlgorithmParams) val allFilteringCorrelators = recentUserHistoryFilter ++ similarItemsFilter ++ filteringMetadata + val excludeItemsBrandCategory = getExcludedItemsBrandCategory(query) + // since users have action history and items have correlators and both correspond to the same "actions" like // purchase or view, we'll pass both to the query if the user history or items correlators are empty // then metadata or backfill must be relied on to return results. @@ -451,15 +465,18 @@ class URAlgorithm(val ap: URAlgorithmParams) |""".stripMargin)) val should: List[JValue] = if (shouldFields.isEmpty) popModelSort else shouldFields.get ::: popModelSort - + val filteringDatePriceRange = getFilteringDatePriceRange(query) val mustFields: List[JValue] = allFilteringCorrelators.map { i => render(("terms" -> (i.actionName -> i.itemIDs) ~ ("boost" -> 0)))}.toList - val must: List[JValue] = mustFields ::: filteringDateRange + //val must: List[JValue] = mustFields ::: filteringDateRange + + //val mustNotFields: JValue = render(("ids" -> ("values" -> getExcludedItems (alluserEvents._2, query)) ~ ("boost" -> 0))) val must: List[JValue] = mustFields ::: filteringDatePriceRange + val must: List[JValue] = mustFields ::: filteringDatePriceRange - val mustNotFields: JValue = render(("ids" -> ("values" -> getExcludedItems (alluserEvents._2, query)) ~ ("boost" -> 0))) - val mustNot: JValue = mustNotFields + val mustNot : List[JValue] = excludeItemsBrandCategory + val popQuery = if (ap.recsModel.getOrElse("all") == "all" || ap.recsModel.getOrElse("all") == "backfill") { Some(List( @@ -710,4 +727,168 @@ class URAlgorithm(val ap: URAlgorithmParams) json } + //function containing both date range and price filter together +def getFilteringDatePriceRange( query: Query ): List[JValue] = { + var json: List[JValue] = List.empty[JValue] + var range = "" + // currentDate in the query overrides the dateRange in the same query so ignore daterange if both + val currentDate = query.currentDate.getOrElse(DateTime.now().toDateTimeISO.toString) + + if (query.priceRange.nonEmpty && (query.priceRange.get.greaterthan.nonEmpty && query.priceRange.get.lessthan.nonEmpty)) { + logger.info(s"inside price Date Range") + val name = query.priceRange.get.name + val lessthan = query.priceRange.get.lessthan.getOrElse("") + val greaterthan = query.priceRange.get.greaterthan.getOrElse("") + + val rangeStart = s""" + |{ + | "constant_score": { + | "filter": { + | "range": { + | "${name}": { + + """.stripMargin + + + val rangeAfter = s""" + | "gt": "${greaterthan}" + """.stripMargin + + + val rangeBefore = s""" + | "lt": "${lessthan}" + """. + stripMargin + + val rangeEnd = s""" + | } + | } + | }, + | "boost": 0 + | } + |} + """.stripMargin + + range += rangeStart + if (!greaterthan.isEmpty) { + range += rangeAfter + if (!lessthan.isEmpty) range += "," + } + if (!lessthan.isEmpty){ + range += rangeBefore + } + range += rangeEnd + if (query.dateRange.nonEmpty && (query.dateRange.get.after.nonEmpty || query.dateRange.get.before.nonEmpty)) + range += "," + json = json :+ parse(range) + } + if (query.dateRange.nonEmpty && (query.dateRange.get.after.nonEmpty || query.dateRange.get.before.nonEmpty)) { + logger.info(s"inside Date Range") + val name = query.dateRange.get.name + val before = query.dateRange.get.before.getOrElse("") + val after = query.dateRange.get.after.getOrElse("") + + val rangeStart = s""" + |{ + | "constant_score": { + | "filter": { + | "range": { + | "${name}": { + """.stripMargin + val rangeAfter = s""" + | "gt": "${after}" + """.stripMargin + + val rangeBefore = s""" + | "lt": "${before}" + """.stripMargin + val rangeEnd = s""" + | } + | } + | }, + | "boost": 0 + | } + |} + """.stripMargin + range += rangeStart + if (!after.isEmpty) { + range += rangeAfter + if (!before.isEmpty) range += "," + } + if(!before.isEmpty) + { + range += rangeBefore + } + range += rangeEnd + json = json :+ parse(range) + } + else if (ap.availableDateName.nonEmpty && ap.expireDateName.nonEmpty) { + // use the query date or system date + val availableDate = ap.availableDateName.get + // never None + val expireDate = ap.expireDateName.get + + val available = s""" + |{ + | "constant_score": { + | "filter": { + | "range": { + | "${availableDate} ": { + | "lte": "${currentDate} " + | } + | } + | }, + | "boost": 0 + | } + |} + """.stripMargin + + json = json :+ parse(available) + val expire = s""" + |{ + | "constant_score": { + | "filter": { + | "range": { + | "${expireDate}": { + | "gt": "${currentDate}" + | } + | } + | }, + | "boost": 0 + | } + |} + """.stripMargin + json = json :+ parse(expire) + } + else + { + logger.info("Misconfigured date information, either your engine.json date settings or your query's dateRange is incorrect.\nIngoring date information for this query.") + } + json + } + + def getExcludedItemsBrandCategory(query : Query): List[JValue] = { + var json: List[JValue] = List.empty[JValue] + val alluserEvents = getBiasedRecentUserActions(query) + val excludedItems = getExcludedItems(alluserEvents._2, query) + if(query.blacklistCategory.nonEmpty) { + val excludeCategory: JValue = render("terms" -> ("categories" -> query.blacklistCategory)) // exlcude items from particular category + val excludeBrandString : String = compact(excludeCategory) + json = json :+ parse(excludeBrandString) + } + if(query.blacklistBrand.nonEmpty) + { + val excludeBrand: JValue = render("terms" -> ("brands" -> query.blacklistBrand)) // exlcude items from particular brand + val excludeCategoryString : String = compact(excludeBrand) + json = json :+ parse(excludeCategoryString) + } + if(excludedItems.nonEmpty) + { + val excludeItems: JValue = render("ids" -> ("values" -> getExcludedItems(alluserEvents._2, query))) // exclude particular item + val excludeItemsString : String = compact(excludeItems) + json = json :+ parse(excludeItemsString) + } + json + } + } From cf72b565d00eeeabf4bf2a40217f4248f170518a Mon Sep 17 00:00:00 2001 From: Rasna Tomar Date: Thu, 19 May 2016 16:02:24 +0530 Subject: [PATCH 4/5] Update esClient.scala --- src/main/scala/esClient.scala | 43 ++++++++++++++++++++++++++++------- 1 file changed, 35 insertions(+), 8 deletions(-) diff --git a/src/main/scala/esClient.scala b/src/main/scala/esClient.scala index bff5eda..638f1b9 100644 --- a/src/main/scala/esClient.scala +++ b/src/main/scala/esClient.scala @@ -19,6 +19,7 @@ package org.template import java.util +import com.betaout.{IncludeItem, _} import grizzled.slf4j.Logger import io.prediction.data.storage.{Storage, StorageClientConfig, elasticsearch} import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest @@ -31,7 +32,7 @@ import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsReques import org.elasticsearch.action.admin.indices.refresh.RefreshRequest import org.elasticsearch.action.get.GetResponse import org.elasticsearch.client.transport.TransportClient -import org.elasticsearch.common.settings.{Settings, ImmutableSettings} +import org.elasticsearch.common.settings.{ImmutableSettings, Settings} import org.joda.time.DateTime import org.json4s.jackson.JsonMethods._ import org.elasticsearch.spark._ @@ -211,20 +212,46 @@ object EsClient { * @param indexName the index to search * @return a [PredictedResults] collection */ - def search(query: String, indexName: String): PredictedResult = { - val sr = client.prepareSearch(indexName).setSource(query).get() + +/********** changes in search function to get mustinclude query items in result***********/ +// exlcude products with empty product picture URL + +def search(query: String, indexName: String, _query: Query): PredictedResult = { + val sr = client.prepareSearch(indexName).setSource(query).get() + val ap: URAlgorithmParams = null + val URAlgorithmObj = new URAlgorithm(ap) + if (_query.includeItems != None) { + val queryIncludeItems = URAlgorithmObj.buildIncludeQuery(_query) + val sr2 = client.prepareSearch(indexName).setSource(queryIncludeItems).get() if (!sr.isTimedOut) { - val recs = sr.getHits.getHits.map( hit => new ItemScore(hit.getId, hit.getScore.toDouble) ) + val recs = sr.getHits.getHits.filter(a => a.sourceAsMap().get("productPictureUrl") == null).map(hit => new ItemScore(hit.getId, hit.getScore.toDouble, hit.sourceAsMap().get("price"), hit.sourceAsMap().get("productPictureUrl"), hit.sourceAsMap().get("productTitle"), hit.sourceAsMap().get("pageUrl"))) + + val includeitems = sr2.getHits.getHits.filter(a => a.sourceAsMap().get("productPictureUrl") == null).map(hit => new IncludeItem(hit.getId, hit.getScore.toDouble, hit.sourceAsMap().get("price"), hit.sourceAsMap().get("productPictureUrl"), hit.sourceAsMap().get("productTitle"), hit.sourceAsMap().get("pageUrl"))) + // return only products whose product picture URL is not null logger.info(s"Results: ${sr.getHits.getHits.size} retrieved of " + s"a possible ${sr.getHits.totalHits()}") - new PredictedResult(recs) + if(!sr2.isTimedOut) + new PredictedResult(recs, includeitems) + else + new PredictedResult(recs, Array.empty[IncludeItem]) } else { logger.info(s"No results for query ${parse(query)}") - new PredictedResult(Array.empty[ItemScore]) + new PredictedResult(Array.empty[ItemScore], Array.empty[IncludeItem]) } - } + else { + if (!sr.isTimedOut) { + val recs = sr.getHits.getHits.filter(a => a.sourceAsMap().get("productPictureUrl") != null).map(hit => new ItemScore(hit.getId, hit.getScore.toDouble, hit.sourceAsMap().get("price"), hit.sourceAsMap().get("productPictureUrl"), hit.sourceAsMap().get("productTitle"), hit.sourceAsMap().get("pageUrl"))) + logger.info(s"Results: ${sr.getHits.getHits.size} retrieved of " + + s"a possible ${sr.getHits.totalHits()}") + new PredictedResult(recs, Array.empty[IncludeItem]) + } else { + logger.info(s"No results for query ${parse(query)}") + new PredictedResult(Array.empty[ItemScore], Array.empty[IncludeItem]) + } + } +} /** Gets the "source" field of an Elasticsearch document * @@ -283,4 +310,4 @@ object EsClient { Some(indexAsRDD) } else None // error so no index for the alias } -} \ No newline at end of file +} From 717871abae52299d94db254d466e050810e4a98a Mon Sep 17 00:00:00 2001 From: Rasna Tomar Date: Thu, 19 May 2016 16:02:55 +0530 Subject: [PATCH 5/5] commit by Rasna and Ambuj --- src/main/scala/esClient.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/esClient.scala b/src/main/scala/esClient.scala index 638f1b9..1b7414d 100644 --- a/src/main/scala/esClient.scala +++ b/src/main/scala/esClient.scala @@ -12,7 +12,7 @@ * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and - * limitations under the License. + ** limitations under the License. */ package org.template