Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
package org.silkframework.plugins.spatial.transformer

import org.silkframework.plugins.spatial.utils._
import org.silkframework.rule.input.Transformer
import org.silkframework.rule.input.InlineTransformer
import org.silkframework.runtime.plugin.annotations.Plugin

/**
Expand All @@ -30,7 +30,7 @@ import org.silkframework.runtime.plugin.annotations.Plugin
categories = Array("Spatial"),
label = "Transform geometry",
description = "Trasforms a geometry expressed in GeoSPARQL, stSPARQL or W3C Geo vocabulary from any serialization (WKT or GML) and any Coordinate Reference System (CRS) to WKT and WGS 84 (latitude-longitude).")
case class GeometryTransformer() extends Transformer {
case class GeometryTransformer() extends InlineTransformer {

override final def apply(values: Seq[Seq[String]]): Seq[String] = {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package org.silkframework.plugins.spatial.transformer
import java.util.logging.Logger

import org.silkframework.plugins.spatial.utils._
import org.silkframework.rule.input.Transformer
import org.silkframework.rule.input.InlineTransformer
import org.silkframework.runtime.plugin.annotations.Plugin

/**
Expand All @@ -32,7 +32,7 @@ import org.silkframework.runtime.plugin.annotations.Plugin
categories = Array("Spatial"),
label = "Points-to-centroid",
description = "Transforms a cluster of points expressed in W3C Geo vocabulary to their centroid expressed in WKT and WGS 84 (latitude-longitude).")
case class PointsToCentroidTransformer() extends Transformer {
case class PointsToCentroidTransformer() extends InlineTransformer {

override final def apply(values: Seq[Seq[String]]): Seq[String] = {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ object LinkingSerializers {
case _ =>
// Link does not provide evaluation details, need to evaluate it first
for (linkingRule <- rule; entities <- link.entities) {
val details = DetailedEvaluator(linkingRule, entities).details
val details = DetailedEvaluator(linkingRule.execution(), entities).details
json += (RULE_VALUES -> ConfidenceJsonFormat.write(details))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package org.silkframework.serialization.json.transformer

import org.silkframework.config.{Task, TaskSpec}
import org.silkframework.rule.TaskContext
import org.silkframework.rule.input.Transformer
import org.silkframework.rule.input.{Transformer, TransformerExecution}
import org.silkframework.rule.plugins.transformer.value.ConstantTransformer
import org.silkframework.runtime.activity.UserContext
import org.silkframework.runtime.plugin.PluginContext
Expand All @@ -28,7 +28,7 @@ trait JsonTransformer extends Transformer {
def getJson(inputTask: Task[_ <: TaskSpec], project: ProjectTrait)
(implicit pluginContext: PluginContext): JsValue

override def withContext(taskContext: TaskContext): Transformer = {
override def execution(taskContext: TaskContext): TransformerExecution = {
val inputTask = taskContext.inputTasks.headOption.getOrElse(throw new ValidationException("This task does not have an input"))
val project = taskContext.pluginContext.projectId match {
case Some(projectId) =>
Expand All @@ -47,10 +47,6 @@ trait JsonTransformer extends Transformer {
}
}

def apply(values: Seq[Seq[String]]): Seq[String] = {
throw new ValidationException("No input task available.")
}

@tailrec
private def navigatePath(json: JsValue, path: String): JsLookupResult = {
val parts = path.split('/')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class InputTaskJsonTransformerTest extends AnyFlatSpec with Matchers with TestWo

def retrieve(path: String): String = {
val taskContext = TaskContext(Seq(dataset), PluginContext.fromProject(project))
val transformer = InputTaskAttributesTransformer(path).withContext(taskContext)
val transformer = InputTaskAttributesTransformer(path).execution(taskContext)
transformer(Seq.empty).head
}

Expand Down
44 changes: 26 additions & 18 deletions silk-rules/src/main/scala/org/silkframework/rule/LinkageRule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
package org.silkframework.rule

import org.silkframework.entity.{Entity, Index}
import org.silkframework.rule.similarity.SimilarityOperator
import org.silkframework.rule.similarity.{SimilarityOperator, SimilarityOperatorExecution}
import org.silkframework.runtime.plugin.PluginObjectParameterNoSchema
import org.silkframework.runtime.serialization._
import org.silkframework.runtime.validation.ValidationIssue
Expand All @@ -40,24 +40,40 @@ case class LinkageRule(operator: Option[SimilarityOperator] = None,
operator.foreach(_.validateIds())

/**
* Generates a copy of this rule that has been configured with a given task context.
* This is relevant for operators whose results are based on the input task(s), i.e., the file hash transformer.
* Validates this rule.
* This should cover non-fatal issues that should be fixed by the user after rule creation.
* Issues that lead to an inconsistent and unusable rule should not be checked here, but instead throw an exception in the constructor.
*/
def withContext(taskContext: TaskContext): LinkageRule = {
copy(operator = operator.map(_.withContext(taskContext)))
def validate(): Seq[ValidationIssue] = {
  // A rule without a root operator has nothing to validate and yields no issues;
  // otherwise every issue reported by the root operator is passed through unchanged.
  for {
    rootOperator <- operator.toSeq
    issue <- rootOperator.validate()
  } yield issue
}

/**
* Builds a runtime executor for this rule resolved against the given task context.
*/
def execution(taskContext: TaskContext = TaskContext.empty): LinkageRuleExecution = {
  // Contextualize the root operator (if there is one) first, then pair it with this rule.
  val contextualizedOperator: Option[SimilarityOperatorExecution] =
    operator.map(rootOperator => rootOperator.execution(taskContext))
  new LinkageRuleExecution(this, contextualizedOperator)
}
}

/**
* Runtime executor for a [[LinkageRule]]. Holds the contextualized
* [[SimilarityOperatorExecution]] used to evaluate the rule against entities.
*/
final class LinkageRuleExecution(val operator: LinkageRule,
val operatorExecution: Option[SimilarityOperatorExecution]) extends Serializable {

/**
* Computes the similarity between two entities.
*
* @param entities The entities to be compared.
* @param limit If the confidence is below this limit, it will be capped to -1.0.
* @return The confidence as a value between -1.0 and 1.0.
* @return The confidence as a value between -1.0 and 1.0.
* -1.0 for definitive non-matches.
* +1.0 for definitive matches.
*/
def apply(entities: DPair[Entity], limit: Double = 0.0): Double = {
operator match {
operatorExecution match {
case Some(op) => op(entities, limit).getOrElse(-1.0)
case None => -1.0
}
Expand All @@ -67,24 +83,16 @@ case class LinkageRule(operator: Option[SimilarityOperator] = None,
* Indexes an entity.
*
* @param entity The entity to be indexed
* @param sourceOrTarget If true, the index will be for the source entity. If false, the index will be for the target entity.
* @param limit The confidence limit
* @return A set of (multidimensional) indexes. Entities within the threshold will always get the same index.
* @return A set of (multidimensional) indexes. Entities within the threshold will always get the same index.
*/
def index(entity: Entity, sourceOrTarget: Boolean, limit: Double = 0.0): Index = {
operator match {
operatorExecution match {
case Some(op) => op.index(entity, sourceOrTarget, limit)
case None => Index.empty
}
}

/**
* Validates this rule.
* This should cover non-fatal issues that should be fixed by the user after rule creation.
* Issues that lead to an inconsistent and unusable rule should not be checked here, but instead throw an exception in the constructor.
*/
def validate(): Seq[ValidationIssue] = {
operator.toSeq.flatMap(_.validate())
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ object LinkageRuleIndex {
case BooleanNot(child) => LinkageRuleIndexNot(convert(child, entity, sourceOrTarget))
case BooleanComparisonOperator(id, sourceInput, targetInput, comparison) =>
val inputId = if(sourceOrTarget) sourceInput.inputOperator.id else targetInput.inputOperator.id
val index = comparison.index(entity, sourceOrTarget, limit = 0.0)
val index = comparison.execution(TaskContext.empty).index(entity, sourceOrTarget, limit = 0.0)
LinkageRuleIndexComparison(id, LinkageRuleIndexInput(inputId, index.flatten))
}
}
Expand Down
19 changes: 16 additions & 3 deletions silk-rules/src/main/scala/org/silkframework/rule/Operator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,11 @@ trait Operator {
def withChildren(newChildren: Seq[Operator]): Operator

/**
* Generates a copy of this operator tree that has been configured with a given task context.
* This is relevant for operators whose results are based on the input task(s), i.e., the file hash transformer.
* Builds an executor that performs this operator's runtime work for a given task context.
* The returned [[OperatorExecution]] holds any state that depends on the task context
* (e.g. resolved input task data). Pure operators may return themselves.
*/
def withContext(taskContext: TaskContext): Operator = this
def execution(taskContext: TaskContext = TaskContext.empty): OperatorExecution

/**
* Asserts that all identifiers in this rule tree are unique.
Expand All @@ -67,6 +68,18 @@ trait Operator {

}

/**
 * Runtime form of an [[Operator]], contextualized for a specific [[TaskContext]].
 *
 * Instances are obtained by calling `execution(taskContext)` on an [[Operator]].
 * This base trait only declares the backlink to the originating operator; the actual
 * execution methods are declared by the sub-trait that corresponds to each [[Operator]]
 * subclass (e.g. [[org.silkframework.rule.input.InputExecution]],
 * [[org.silkframework.rule.similarity.SimilarityOperatorExecution]]).
 *
 * The trait extends `Serializable`.
 * NOTE(review): implementations presumably need to hold only serializable state - confirm.
 */
trait OperatorExecution extends Serializable {
/**
 * Backlink to the operator that produced this execution.
 *
 * @return The [[Operator]] this execution was built from.
 */
def operator: Operator
}

/**
* Operator companion object.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,34 @@ import org.silkframework.runtime.plugin.PluginContext
* If the task is executed within a workflow, those are the connected input task(s).
* If the task is executed standalone, those are the configured default input(s).
*/
case class TaskContext(inputTasks: Seq[Task[_ <: TaskSpec]], pluginContext: PluginContext)
case class TaskContext(inputTasks: Seq[Task[_ <: TaskSpec]], pluginContext: PluginContext)

object TaskContext {

  /**
   * A task context that carries no input tasks and uses the empty plugin context.
   */
  def empty: TaskContext = noInput(PluginContext.empty)

  /**
   * Builds a task context that holds exactly one input task.
   *
   * @param inputTask     The only input task of the resulting context.
   * @param pluginContext The plugin context to attach.
   */
  def forInput(inputTask: Task[_ <: TaskSpec])(implicit pluginContext: PluginContext): TaskContext = {
    forInputs(Seq(inputTask))(pluginContext)
  }

  /**
   * Builds a task context that holds the given input tasks.
   *
   * @param inputTasks    The input tasks of the resulting context, in the given order.
   * @param pluginContext The plugin context to attach.
   */
  def forInputs(inputTasks: Seq[Task[_ <: TaskSpec]])(implicit pluginContext: PluginContext): TaskContext = {
    TaskContext(inputTasks = inputTasks, pluginContext = pluginContext)
  }

  /**
   * Builds a task context that holds no input tasks.
   *
   * @param pluginContext The plugin context to attach.
   */
  def noInput(implicit pluginContext: PluginContext): TaskContext = {
    forInputs(Seq.empty)(pluginContext)
  }

}
Loading
Loading