Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 38 additions & 7 deletions src/main/scala/Engine.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import io.prediction.controller.{EngineFactory, Engine}

/** The Query spec with optional values. The only hard rule is that there must be either a user or
* an item id. All other values are optional. */

case class Query(
user: Option[String] = None, // must be a user or item id
userBias: Option[Float] = None, // default: whatever is in algorithm params or 1
Expand All @@ -36,12 +37,17 @@ case class Query(
fields: Option[List[Field]] = None, // default: whatever is in algorithm params or None
currentDate: Option[String] = None, // if used will override dateRange filter, currentDate must lie between the item's
// expireDateName value and availableDateName value, all are ISO 8601 dates
dateRange: Option[DateRange] = None, // optional before and after filter applied to a date field
blacklistItems: Option[List[String]] = None, // default: whatever is in algorithm params or None
dateRange: Option[DateRange] = None, // default: whatever is in algorithm params or None
returnSelf: Option[Boolean] = None,// means for an item query should the item itself be returned, defaults
// to what is in the algorithm params or false
num: Option[Int] = None, // default: whatever is in algorithm params, which itself has a default--probably 20
eventNames: Option[List[String]]) // names used to ID all user actions
eventNames: Option[List[String]], // names used to ID all user actions
/** The following optional fields define extra filters: items that must be included, a price range, and blacklists of items, categories, and brands. */
includeItems: Option[List[String]] = None,
priceRange: Option[PriceRange] = None,// optional before and after filter applied to a date field
blacklistItems: Option[List[String]] = None,
blacklistCategory: Option[List[String]] = None,
blacklistBrand: Option[List[String]] = None)
extends Serializable
Copy link

@pferrel pferrel May 26, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to be able to specify a filter like we do with properties, which include the property name so they can be used for any property not just Category or Brand. I suggest an addition to the field definition, perhaps a bias enumeration, like

"bias": {
    "type": "not"
}

"bias": {
    "value": -1 // for filter
}

the type and value would not be used together.

by default the type is None, which means current behavior.

This will allow any number of fields to be set for any named property. Likewise this could be put in the engine.json for global application or into the query.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Pat

I just read your suggestions and will work on them.
Once that is done, I will send you a PR.


/** Used to specify how Fields are represented in engine.json */
Expand All @@ -59,16 +65,41 @@ case class DateRange(
after: Option[String]) // both empty should be ignored
extends Serializable


/********************************************************************************************************/
/** Price range filter carried by a query.
  *
  * Each bound is sent to the search engine as a string; an empty string means
  * that bound is left out of the range filter.
  *
  * @param name        name of the item property the price comparison is applied to
  * @param lessthan    exclusive upper bound (rendered as "lt"); empty string means no upper bound
  * @param greaterthan exclusive lower bound (rendered as "gt"); empty string means no lower bound
  */
case class PriceRange(
    name: String,
    lessthan: Option[String],
    greaterthan: Option[String])
  extends Serializable
/*********************************************************************************************************/

/** Result of a prediction query.
  *
  * @param itemScores   the ranked recommendations
  * @param includeItems the items the query explicitly asked to include; empty when
  *                     the query named none
  */
case class PredictedResult(
    itemScores: Array[ItemScore],
    includeItems: Array[IncludeItem])
  extends Serializable
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This indentation does not follow the Scala style guide: http://docs.scala-lang.org/style/indentation.html#methods_with_numerous_arguments

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will update indentation


/** One recommended item together with the display attributes copied from its
  * search-engine document.
  *
  * The attribute fields are typed `AnyRef` because they are taken as-is from the
  * document's source map; they are null when the document has no such field.
  *
  * @param item              item id
  * @param score             used to rank; the original score returned from the search engine
  * @param price             value of the document's "price" field
  * @param productPictureUrl value of the document's "productPictureUrl" field
  * @param productTitle      value of the document's "productTitle" field
  * @param pageUrl           value of the document's "pageUrl" field
  */
case class ItemScore(
    item: String,
    score: Double,
    price: AnyRef,
    productPictureUrl: AnyRef,
    productTitle: AnyRef,
    pageUrl: AnyRef)
  extends Serializable

/** An item that the query explicitly asked to have included in the result,
  * carrying the same attributes as an [[ItemScore]].
  *
  * @param item              item id
  * @param score             score reported by the search engine for this item
  * @param price             value of the document's "price" field
  * @param productPictureUrl value of the document's "productPictureUrl" field
  * @param productTitle      value of the document's "productTitle" field
  * @param pageUrl           value of the document's "pageUrl" field
  */
case class IncludeItem(
    item: String,
    score: Double,
    price: AnyRef,
    productPictureUrl: AnyRef,
    productTitle: AnyRef,
    pageUrl: AnyRef)
  extends Serializable

object RecommendationEngine extends EngineFactory {
def apply() = {
new Engine(
Expand All @@ -77,4 +108,4 @@ object RecommendationEngine extends EngineFactory {
Map("ur" -> classOf[URAlgorithm]), // IMPORTANT: "ur" must be the "name" of the parameter set in engine.json
classOf[Serving])
}
}
}
191 changes: 186 additions & 5 deletions src/main/scala/URAlgorithm.scala
Original file line number Diff line number Diff line change
Expand Up @@ -377,13 +377,25 @@ class URAlgorithm(val ap: URAlgorithmParams)
val backfillFieldName = ap.backfillField.getOrElse(BackfillField()).name
logger.info(s"PopModel using fieldName: ${backfillFieldName}")
val queryAndBlacklist = buildQuery(ap, query, backfillFieldName.getOrElse(defaultURAlgorithmParams.DefaultBackfillFieldName))
val recs = EsClient.search(queryAndBlacklist._1, ap.indexName)
val recs = EsClient.search(queryAndBlacklist._1, ap.indexName, query)
// should have all blacklisted items excluded
// todo: need to add dithering, mean, sigma, seed required, make a seed that only changes on some fixed time
// period so the recs ordering stays fixed for that time period.
recs
}

/** Builds the search-engine query that fetches the items named in
  * `query.includeItems` by id.
  *
  * @param query the incoming query; only `includeItems` is read
  * @return the compact JSON text of an `{"query": {"ids": {"values": [...]}}}` request
  */
def buildIncludeQuery(query: Query): String = {
  val idsClause: JValue = render("ids" -> ("values" -> query.includeItems))
  val request = "query" -> idsClause
  // Render once and reuse the same text for both the log line and the result.
  val requestText = compact(render(request))
  logger.info(s"Include item query: \n${requestText}\n")
  requestText
}
/** Build a query from default algorithms params and the query itself taking into account defaults */
def buildQuery(ap: URAlgorithmParams, query: Query, backfillFieldName: String = ""): (String, List[Event]) = {

Expand Down Expand Up @@ -426,6 +438,8 @@ class URAlgorithm(val ap: URAlgorithmParams)

val allFilteringCorrelators = recentUserHistoryFilter ++ similarItemsFilter ++ filteringMetadata

val excludeItemsBrandCategory = getExcludedItemsBrandCategory(query)

// since users have action history and items have correlators and both correspond to the same "actions" like
// purchase or view, we'll pass both to the query if the user history or items correlators are empty
// then metadata or backfill must be relied on to return results.
Expand All @@ -451,15 +465,18 @@ class URAlgorithm(val ap: URAlgorithmParams)
|""".stripMargin))

val should: List[JValue] = if (shouldFields.isEmpty) popModelSort else shouldFields.get ::: popModelSort

val filteringDatePriceRange = getFilteringDatePriceRange(query)

val mustFields: List[JValue] = allFilteringCorrelators.map { i =>
render(("terms" -> (i.actionName -> i.itemIDs) ~ ("boost" -> 0)))}.toList
val must: List[JValue] = mustFields ::: filteringDateRange
//val must: List[JValue] = mustFields ::: filteringDateRange

//val mustNotFields: JValue = render(("ids" -> ("values" -> getExcludedItems (alluserEvents._2, query)) ~ ("boost" -> 0))) val must: List[JValue] = mustFields ::: filteringDatePriceRange
val must: List[JValue] = mustFields ::: filteringDatePriceRange

val mustNotFields: JValue = render(("ids" -> ("values" -> getExcludedItems (alluserEvents._2, query)) ~ ("boost" -> 0)))
val mustNot: JValue = mustNotFields

val mustNot : List[JValue] = excludeItemsBrandCategory

val popQuery = if (ap.recsModel.getOrElse("all") == "all" ||
ap.recsModel.getOrElse("all") == "backfill") {
Some(List(
Expand Down Expand Up @@ -710,4 +727,168 @@ class URAlgorithm(val ap: URAlgorithmParams)
json
}

/** Builds the price-range and date filters for a query.
  *
  * Price: when the query carries a `priceRange` with at least one bound, one range
  * filter on that property is emitted (a one-sided range is allowed, matching how
  * the date range is treated).
  *
  * Date: an explicit `dateRange` with at least one bound takes precedence. Otherwise,
  * if both `availableDateName` and `expireDateName` are configured in the algorithm
  * params, two filters are emitted requiring available <= date < expire, where date
  * is `query.currentDate` or, if absent, the current system time. If neither applies,
  * no date filter is emitted and a message is logged.
  *
  * Each filter is built and parsed independently. A single shared string must not be
  * used for more than one filter: only the first JSON object would be parsed and the
  * remaining filters would be silently dropped.
  *
  * @param query the incoming query
  * @return the zero-boost `constant_score` range filters, price filter first
  */
def getFilteringDatePriceRange(query: Query): List[JValue] = {
  val priceFilters: List[JValue] = query.priceRange
    .filter(pr => pr.greaterthan.nonEmpty || pr.lessthan.nonEmpty)
    .map { pr =>
      logger.info("inside price Date Range")
      parse(rangeFilterJson(
        pr.name,
        List(("gt", pr.greaterthan.getOrElse("")), ("lt", pr.lessthan.getOrElse("")))))
    }
    .toList

  val explicitDateRange = query.dateRange.filter(dr => dr.after.nonEmpty || dr.before.nonEmpty)

  val dateFilters: List[JValue] = explicitDateRange match {
    case Some(dr) =>
      logger.info("inside Date Range")
      List(parse(rangeFilterJson(
        dr.name,
        List(("gt", dr.after.getOrElse("")), ("lt", dr.before.getOrElse(""))))))

    case None =>
      (ap.availableDateName, ap.expireDateName) match {
        case (Some(availableDate), Some(expireDate)) =>
          // use the query date or system date
          val currentDate = query.currentDate.getOrElse(DateTime.now().toDateTimeISO.toString)
          List(
            parse(rangeFilterJson(availableDate, List(("lte", currentDate)))),
            parse(rangeFilterJson(expireDate, List(("gt", currentDate)))))

        case _ =>
          logger.info("Misconfigured date information, either your engine.json date settings or your query's dateRange is incorrect.\nIngoring date information for this query.")
          List.empty[JValue]
      }
  }

  priceFilters ::: dateFilters
}

/** Renders one zero-boost `constant_score` range filter on `field` as JSON text.
  *
  * @param field  name of the document field the range applies to
  * @param bounds (operator, value) pairs such as ("gt", "10"); pairs with an empty
  *               value are left out
  */
private def rangeFilterJson(field: String, bounds: List[(String, String)]): String = {
  // NOTE(review): field and values come from the query and are interpolated without
  // JSON escaping; a value containing a double quote will break (or alter) the filter.
  val clauses = bounds
    .collect { case (op, value) if value.nonEmpty => s""" "${op}": "${value}" """ }
    .mkString(",")

  s"""
     |{
     |  "constant_score": {
     |    "filter": {
     |      "range": {
     |        "${field}": {${clauses}}
     |      }
     |    },
     |    "boost": 0
     |  }
     |}
     |""".stripMargin
}

/** Builds the clauses that exclude documents from the result: blacklisted
  * categories, blacklisted brands, and excluded item ids.
  *
  * A clause is emitted only when its source is present: `query.blacklistCategory`
  * and `query.blacklistBrand` must be defined, and the excluded-items collection
  * returned by `getExcludedItems` must be non-empty.
  *
  * NOTE(review): this looks up the user's recent actions again via
  * `getBiasedRecentUserActions`; confirm whether the caller already has them and
  * could pass them in instead.
  *
  * @param query the incoming query
  * @return the exclusion clauses in the order category, brand, item ids
  */
def getExcludedItemsBrandCategory(query: Query): List[JValue] = {
  val alluserEvents = getBiasedRecentUserActions(query)
  // Computed once and reused for both the emptiness check and the clause itself.
  val excludedItems = getExcludedItems(alluserEvents._2, query)

  // exclude items from particular categories
  val categoryClause: Option[JValue] =
    if (query.blacklistCategory.nonEmpty) Some(render("terms" -> ("categories" -> query.blacklistCategory)))
    else None

  // exclude items from particular brands
  val brandClause: Option[JValue] =
    if (query.blacklistBrand.nonEmpty) Some(render("terms" -> ("brands" -> query.blacklistBrand)))
    else None

  // exclude particular items by id
  val itemsClause: Option[JValue] =
    if (excludedItems.nonEmpty) Some(render("ids" -> ("values" -> excludedItems)))
    else None

  List(categoryClause, brandClause, itemsClause).flatten
}

}
45 changes: 36 additions & 9 deletions src/main/scala/esClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
** limitations under the License.
*/

package org.template

import java.util

import com.betaout.{IncludeItem, _}
import grizzled.slf4j.Logger
import io.prediction.data.storage.{Storage, StorageClientConfig, elasticsearch}
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest
Expand All @@ -31,7 +32,7 @@ import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsReques
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest
import org.elasticsearch.action.get.GetResponse
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.settings.{Settings, ImmutableSettings}
import org.elasticsearch.common.settings.{ImmutableSettings, Settings}
import org.joda.time.DateTime
import org.json4s.jackson.JsonMethods._
import org.elasticsearch.spark._
Expand Down Expand Up @@ -211,20 +212,46 @@ object EsClient {
* @param indexName the index to search
* @return a [PredictedResults] collection
*/
// `_query` is the original request; when it names `includeItems`, those items are
// fetched by id with a second search and returned alongside the recommendations.
// Hits without a "productPictureUrl" in their source are dropped from both lists.
// If the main search times out, both lists are empty and no second search is made;
// if only the include search times out, the include list is empty.
def search(query: String, indexName: String, _query: Query): PredictedResult = {
  import org.json4s.JsonDSL._ // local: only needed to build the ids query below

  val sr = client.prepareSearch(indexName).setSource(query).get()

  if (sr.isTimedOut) {
    logger.info(s"No results for query ${parse(query)}")
    new PredictedResult(Array.empty[ItemScore], Array.empty[IncludeItem])
  } else {
    // return only products whose product picture URL is not null
    val recs = for {
      hit <- sr.getHits.getHits
      source = hit.sourceAsMap()
      if source.get("productPictureUrl") != null
    } yield new ItemScore(
      hit.getId,
      hit.getScore.toDouble,
      source.get("price"),
      source.get("productPictureUrl"),
      source.get("productTitle"),
      source.get("pageUrl"))

    logger.info(s"Results: ${sr.getHits.getHits.size} retrieved of " +
      s"a possible ${sr.getHits.totalHits()}")

    val includeItems: Array[IncludeItem] = _query.includeItems match {
      case Some(ids) =>
        // Build the ids query here rather than instantiating URAlgorithm with null
        // params just to reach its query builder.
        val idsClause: JValue = render("ids" -> ("values" -> ids))
        val includeQuery = compact(render("query" -> idsClause))
        logger.info(s"Include item query: \n${includeQuery}\n")

        val sr2 = client.prepareSearch(indexName).setSource(includeQuery).get()
        if (sr2.isTimedOut) Array.empty[IncludeItem]
        else for {
          hit <- sr2.getHits.getHits
          source = hit.sourceAsMap()
          if source.get("productPictureUrl") != null
        } yield new IncludeItem(
          hit.getId,
          hit.getScore.toDouble,
          source.get("price"),
          source.get("productPictureUrl"),
          source.get("productTitle"),
          source.get("pageUrl"))

      case None =>
        Array.empty[IncludeItem]
    }

    new PredictedResult(recs, includeItems)
  }
}

/** Gets the "source" field of an Elasticsearch document
*
Expand Down Expand Up @@ -283,4 +310,4 @@ object EsClient {
Some(indexAsRDD)
} else None // error so no index for the alias
}
}
}