diff --git a/libs/gui-elements b/libs/gui-elements index 47cdaefc57..945fe9637d 160000 --- a/libs/gui-elements +++ b/libs/gui-elements @@ -1 +1 @@ -Subproject commit 47cdaefc57ccd76705366d7831227e6fe03c8302 +Subproject commit 945fe9637dedb053be6fafea166376b7e8e75a1a diff --git a/silk-core/src/main/scala/org/silkframework/execution/Executor.scala b/silk-core/src/main/scala/org/silkframework/execution/Executor.scala index 90c732f88c..7c3abab856 100644 --- a/silk-core/src/main/scala/org/silkframework/execution/Executor.scala +++ b/silk-core/src/main/scala/org/silkframework/execution/Executor.scala @@ -13,12 +13,17 @@ import org.silkframework.runtime.plugin.{AnyPlugin, PluginContext} * @tparam ExecType The execution type, e.g., SparkExecution */ @PluginType() -trait Executor[TaskType <: TaskSpec, ExecType <: ExecutionType] extends AnyPlugin { +trait Executor[TaskType <: TaskSpec, ExecType <: ExecutionType] extends AnyPlugin with java.io.Closeable { def execute(task: Task[TaskType], inputs: Seq[ExecType#DataType], output: ExecutorOutput, execution: ExecType, context: ActivityContext[ExecutionReport] = new ActivityMonitor(getClass.getSimpleName)) (implicit pluginContext: PluginContext): Option[ExecType#DataType] + /** + * Called after the (workflow) execution has finished. + */ + override def close(): Unit = {} + } /** diff --git a/silk-core/src/main/scala/org/silkframework/execution/ExecutorRegistry.scala b/silk-core/src/main/scala/org/silkframework/execution/ExecutorRegistry.scala index 1ba6d518d5..d968f49494 100644 --- a/silk-core/src/main/scala/org/silkframework/execution/ExecutorRegistry.scala +++ b/silk-core/src/main/scala/org/silkframework/execution/ExecutorRegistry.scala @@ -137,6 +137,18 @@ object ExecutorRegistry extends ExecutorRegistry { context: ActivityContext[ExecutionReport] = new ActivityMonitor(getClass.getSimpleName) )(implicit pluginContext: PluginContext): Option[ExecType#DataType] = { val exec = executor(task.data, execution) + executeWith(exec, task, inputs, output, execution, context) + } + + /** Execute with a pre-instantiated executor, skipping executor lookup. */ + def executeWith[TaskType <: TaskSpec, ExecType <: ExecutionType]( + exec: Executor[TaskType, ExecType], + task: Task[TaskType], + inputs: Seq[ExecType#DataType], + output: ExecutorOutput, + execution: ExecType, + context: ActivityContext[ExecutionReport] + )(implicit pluginContext: PluginContext): Option[ExecType#DataType] = { context.status.update(Status.Running("Running", None), logStatus = false) val startTime = System.currentTimeMillis() val result = exec.execute(task, inputs, output, execution, context) @@ -144,6 +156,12 @@ object ExecutorRegistry extends ExecutorRegistry { result } + /** Instantiates the executor for a given task and execution type without executing it. */ + def instantiateExecutor[TaskType <: TaskSpec, ExecType <: ExecutionType]( + task: TaskType, + execution: ExecType + ): Executor[TaskType, ExecType] = executor(task, execution) + /** Fetch the execution specific access to a dataset for the configured execution.*/ def access[DatasetType <: Dataset](task: Task[DatasetSpec[DatasetType]]): DatasetAccess = {
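Taken together, `close()` on `Executor` and the new `executeWith`/`instantiateExecutor` entry points form a small lifecycle contract: instantiate an executor once per task, run it any number of times, close it when the workflow finishes. A minimal sketch of the intended call pattern, not part of this change; the helper `runOnce` is hypothetical, and the import paths are assumed from the files touched in this diff:

```scala
import org.silkframework.config.{Task, TaskSpec}
import org.silkframework.execution.{ExecutionReport, ExecutionType, ExecutorOutput, ExecutorRegistry}
import org.silkframework.runtime.activity.ActivityContext
import org.silkframework.runtime.plugin.PluginContext

// Sketch: drive one task through the new lifecycle. The executor instance is
// created once and may be reused for several workflow nodes before it is closed.
def runOnce[T <: TaskSpec, E <: ExecutionType](task: Task[T],
                                               inputs: Seq[E#DataType],
                                               output: ExecutorOutput,
                                               execution: E,
                                               context: ActivityContext[ExecutionReport])
                                              (implicit pluginContext: PluginContext): Option[E#DataType] = {
  val exec = ExecutorRegistry.instantiateExecutor(task.data, execution)
  try {
    // Re-invocable: per-execution state kept inside the executor survives between calls.
    ExecutorRegistry.executeWith(exec, task, inputs, output, execution, context)
  } finally {
    exec.close() // lets executors like InMemoryDatasetExecutor release per-execution state
  }
}
```

`LocalWorkflowExecutor` further below drives the same sequence, except that it instantiates all executors up front and closes them only once the whole workflow run has finished.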
diff --git a/silk-core/src/main/scala/org/silkframework/execution/local/LocalExecution.scala b/silk-core/src/main/scala/org/silkframework/execution/local/LocalExecution.scala index 49a648359b..f7dbd28578 100644 --- a/silk-core/src/main/scala/org/silkframework/execution/local/LocalExecution.scala +++ b/silk-core/src/main/scala/org/silkframework/execution/local/LocalExecution.scala @@ -21,10 +21,14 @@ import scala.jdk.CollectionConverters.ConcurrentMapHasAsScala case class LocalExecution(useLocalInternalDatasets: Boolean, replaceDataSources: Map[String, Dataset] = Map.empty, replaceSinks: Map[String, Dataset] = Map.empty, - workflowId: Option[Identifier] = None) extends ExecutionType { + workflowId: Option[Identifier] = None, + parentExecution: Option[LocalExecution] = None) extends ExecutionType { type DataType = LocalEntities + /** Unique identifier for this execution instance. */ + val executionId: Identifier = Identifier.random + private val log: Logger = Logger.getLogger(this.getClass.getName) private val internalDatasets: mutable.Map[Option[String], InternalDatasetTrait] = mutable.Map.empty @@ -107,7 +111,8 @@ object LocalExecution { id = "LocalInternalDataset", label = "Internal dataset (single graph)", description = - """Dataset for storing entities between workflow steps. This variant does use the same graph for all internal datasets in a workflow. The underlying dataset type can be configured using the `dataset.internal.*` configuration parameters.""" + """Dataset for storing entities between workflow steps. This variant uses the same graph for all internal datasets in a workflow. The underlying dataset type can be configured using the `dataset.internal.*` configuration parameters.""", + deprecation = "This dataset is deprecated and will be removed in a future version. Instead use either the \"In-workflow dataset\" or the \"In-memory dataset\"." ) case class LocalInternalDataset() extends InternalDatasetTrait { override protected def internalDatasetPluginImpl: Dataset = InternalDataset.createInternalDataset() diff --git a/silk-core/src/main/scala/org/silkframework/plugins/dataset/InternalDataset.scala b/silk-core/src/main/scala/org/silkframework/plugins/dataset/InternalDataset.scala index 90669d51cc..728a2d6d0a 100644 --- a/silk-core/src/main/scala/org/silkframework/plugins/dataset/InternalDataset.scala +++ b/silk-core/src/main/scala/org/silkframework/plugins/dataset/InternalDataset.scala @@ -16,7 +16,8 @@ import scala.util.Try id = "internal", label = "Internal dataset", categories = Array(DatasetCategories.embedded), - description = """Dataset for storing entities between workflow steps. The underlying dataset type can be configured using the `dataset.internal.*` configuration parameters.""" + description = """Dataset for storing entities between workflow steps. The underlying dataset type can be configured using the `dataset.internal.*` configuration parameters.""", + deprecation = "This dataset is deprecated and will be removed in a future version. Instead use either the \"In-workflow dataset\" or the \"In-memory dataset\"."
) case class InternalDataset( @Param(label = "graph URI", value = "The RDF graph that is used for storing internal data") diff --git a/silk-plugins/silk-plugins-rdf/src/main/resources/org/silkframework/plugins/dataset/rdf/datasets/InMemoryDataset.md b/silk-plugins/silk-plugins-rdf/src/main/resources/org/silkframework/plugins/dataset/rdf/datasets/InMemoryDataset.md index 71cd845a7e..878ba25dc7 100644 --- a/silk-plugins/silk-plugins-rdf/src/main/resources/org/silkframework/plugins/dataset/rdf/datasets/InMemoryDataset.md +++ b/silk-plugins/silk-plugins-rdf/src/main/resources/org/silkframework/plugins/dataset/rdf/datasets/InMemoryDataset.md @@ -9,23 +9,27 @@ Typical use cases: ## 2. Behaviour and lifecycle -- The dataset maintains a single in-memory RDF model. -- All read and write operations go through a SPARQL endpoint over this model. -- Data exists only **in memory**: - - It is not persisted to disk by this dataset. - - After an application restart, the dataset contents are empty again. +The dataset maintains a single in-memory RDF model and exposes it via a SPARQL endpoint. Two lifecycle modes are available, controlled by the `workflowScoped` parameter: -Within a workflow: -- The dataset can be used as both **input** and **output**: - - Upstream operators can write triples/entities/links into it. - - Downstream operators can read from it via SPARQL-based mechanisms. +**Application-scoped mode** (default, `workflowScoped = false`): +- A single shared model is created when the dataset is instantiated. +- Data persists for the lifetime of the running application process. +- All workflow executions share the same in-memory graph. +- After an application restart, the dataset contents are empty again. + +**Workflow-scoped mode** (`workflowScoped = true`): +- A separate model is created for each workflow execution. +- Concurrent workflow executions are fully isolated from each other. +- A dataset task in a nested workflow shares the same model as the parent workflow for the same task identifier. Data written by the parent is available in the nested workflow and vice versa. +- If the dataset is read from outside a workflow context, the data from the most recently started executor is returned. +- When the workflow execution ends, the per-execution data is removed automatically. ## 3. Reading data - When used as a **source**, the dataset exposes its data as a SPARQL endpoint. - Queries and retrievals behave like against a normal SPARQL dataset: - Entity retrieval, path/type discovery, sampling, etc. are executed via SPARQL. -- There is no file backing this dataset; everything comes from what has been written into the in-memory model during the lifetime of the process. +- There is no file backing this dataset; everything comes from what has been written into the in-memory model during the lifetime of the process (application-scoped) or the workflow execution (workflow-scoped). ## 4. Writing data @@ -44,33 +48,58 @@ All three sinks ultimately write into the same in-memory graph; there is no sepa ## 5. Configuration -### Clear graph before workflow execution +### Workflow scoped + +- **Parameter:** `workflowScoped` (boolean) +- **Default:** `false` + +When `true` (workflow-scoped mode): +- Data is stored in a separate in-memory graph for each workflow execution. +- Concurrent workflow executions are fully isolated from each other. +- A dataset task in a nested workflow shares the same graph as the parent for the same task identifier. 
Data written by the parent is available in the nested workflow and vice versa. +- If the dataset is read from outside a workflow context, the data from the most recently started executor is returned. +- When the workflow execution ends, the per-execution data is removed automatically. + +When `false` (default, application-scoped mode): +- Data persists in a single shared graph for the lifetime of the running process. +- All workflow executions share the same graph. + +### Clear graph before workflow execution -- **Parameter:** `Clear graph before workflow execution` (boolean) -- **Default:** `true` +- **Parameter:** `clearGraphBeforeExecution` (boolean, **deprecated**) +- **Default:** `false` -Behaviour: +This parameter is deprecated. Use the **Clear dataset** operator in the workflow instead. + +Behaviour (application-scoped mode only): - If **true**: - - Before the dataset is used in a workflow execution, the graph is cleared (for writes via this dataset). + - Before the dataset is used in a workflow execution, the graph is cleared. - The workflow sees a **fresh, empty in-memory graph** at the start of the run. - If **false**: - Existing data in the in-memory graph is **preserved** when the workflow starts. - New data is added on top of whatever is already stored in the model. -This parameter controls whether the dataset behaves as a **fresh scratch graph per workflow run** or as a **longer-lived in-memory graph** within the lifetime of the running application. +This parameter has no effect when `workflowScoped = true` (the executor manages the lifecycle). ## 6. Limitations and recommendations - **Memory-bound** - All data is kept in memory; large graphs will increase memory usage and may impact performance. - For large or production RDF graphs, use an external RDF store and a SPARQL dataset instead. + - A size limit is enforced: once the estimated size of data written to the dataset exceeds the value of `org.silkframework.runtime.resource.Resource.maxInMemorySize`, the workflow fails with an error. This prevents the JVM from running out of heap memory. - **No persistence** - Contents are lost when the application/server is restarted. - Do not treat this dataset as long-term storage. +- **SPARQL engine** + - The dataset is backed by [Apache Jena](https://jena.apache.org/), exposed through a Jena in-memory SPARQL endpoint. + +- **No named-graph support** + - Only the **default graph** is available. Writing triples into a named graph is not possible. + - **Scope** - Best suited for: - small to medium intermediate results, @@ -79,14 +108,23 @@ This parameter controls whether the dataset behaves as a **fresh scratch graph p ## 7. Example usage scenarios -- Use as a **temporary integration graph**: +- Use as a **temporary integration graph** (application-scoped): - Multiple sources write into the in-memory dataset. - A downstream SPARQL-based operator queries the combined graph. -- Use as a **scratch area for experimentation**: +- Use as a **scratch area for experimentation** (application-scoped): - Quickly test mapping or linking logic by writing output into the in-memory dataset. - Inspect the result via SPARQL without configuring an external endpoint. -- Use as a **small lookup store**: +- Use as a **small lookup store** (application-scoped): - Preload a small set of reference triples (e.g. codes or mappings). - Let workflows query these during execution. 
+ +- Use as a **workflow-local intermediate store** (workflow-scoped): + - Multiple operators in a single workflow run write intermediate RDF results. + - Downstream operators in the same run read from the dataset without affecting parallel runs. + +- Use in **nested workflows** (workflow-scoped): + - A parent workflow writes data into a workflow-scoped dataset. + - A nested sub-workflow reads and enriches the same data. + - After the nested workflow completes, the parent can read the enriched result.
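The isolation rules documented above can also be exercised directly against the executor API. A small sketch, closely mirroring the unit tests further below in this diff (`InMemoryDatasetWorkflowScopedTest`); the wrapper object name is hypothetical and the endpoint-unwrapping helper is lifted from those tests:

```scala
import org.silkframework.config.PlainTask
import org.silkframework.dataset.{DatasetAccess, DatasetSpec, DatasetSpecAccess}
import org.silkframework.dataset.rdf.{RdfDataset, SparqlEndpoint}
import org.silkframework.execution.local.LocalExecution
import org.silkframework.plugins.dataset.rdf.datasets.{InMemoryDataset, InMemoryDatasetExecutor}
import org.silkframework.runtime.activity.UserContext

object WorkflowScopedIsolationSketch {
  implicit val userContext: UserContext = UserContext.Empty

  // Unwrap the SPARQL endpoint from a DatasetAccess, as the tests below do.
  private def endpoint(access: DatasetAccess): SparqlEndpoint =
    access.asInstanceOf[DatasetSpecAccess].datasetAccess.asInstanceOf[RdfDataset].sparqlEndpoint

  def main(args: Array[String]): Unit = {
    // One workflow-scoped dataset task, accessed from two separate executions.
    val dataset = InMemoryDataset(workflowScoped = true)
    val task = PlainTask("ds", DatasetSpec(dataset))

    val endpointA = endpoint(new InMemoryDatasetExecutor().access(task, LocalExecution(false)))
    val endpointB = endpoint(new InMemoryDatasetExecutor().access(task, LocalExecution(false)))

    endpointA.update("INSERT DATA { <http://s> <http://p> <http://o> }")
    // endpointA now sees one triple; endpointB still sees an empty model,
    // because each execution was handed its own Jena model.
  }
}
```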
in-memory. " + + "In the default (application-scoped) mode, data persists for the lifetime of the running process. " + + "In workflow-scoped mode, data is isolated per workflow execution and shared with nested workflows that reference the same dataset task.", documentationFile = "InMemoryDataset.md", relatedPlugins = Array( new PluginReference( @@ -25,33 +31,75 @@ import org.silkframework.runtime.plugin.annotations.{Param, Plugin, PluginRefere ) ) ) -case class InMemoryDataset(@Param(label = "Clear graph before workflow execution (deprecated)", - value = "This is deprecated, use the 'Clear dataset' operator instead to clear a dataset in a workflow. If set to true this will clear this dataset before it is used in a workflow execution.", - advanced = true) - clearGraphBeforeExecution: Boolean = false) extends RdfDataset with TripleSinkDataset { - - private val model = ModelFactory.createDefaultModel() - - override val sparqlEndpoint: SparqlEndpoint = new JenaModelEndpoint(model) +case class InMemoryDataset( + @Param(label = "Workflow-scoped", + value = "If true, data is isolated per workflow execution and cleared after the execution ends, " + + "sharing data with nested workflows that reference the same dataset task. " + + "If false (default), data persists for the lifetime of the application process.") + workflowScoped: Boolean = false, + @Param(label = "Clear graph before workflow execution (deprecated)", + value = "This is deprecated, use the 'Clear dataset' operator instead to clear a dataset in a workflow. If set to true this will clear this dataset before it is used in a workflow execution.", + advanced = true) + clearGraphBeforeExecution: Boolean = false +) extends RdfDataset with TripleSinkDataset { /** - * Returns a data source for reading entities from the data set. - */ - override def source(implicit userContext: UserContext): DataSource = new SparqlSource(SparqlParams(), sparqlEndpoint) + * The active Jena model backing this dataset. + * + * Application-scoped mode: initialised once and never reassigned; holds data for the + * lifetime of the process. + * + * Workflow-scoped mode: replaced by [[updateData]] each time [[InMemoryDatasetExecutor]] + * activates a new execution. + */ + @volatile private[datasets] var model: Model = ModelFactory.createDefaultModel() /** - * Returns a entity sink for writing entities to the data set. - */ - override def entitySink(implicit userContext: UserContext): EntitySink = new SparqlSink(SparqlParams(), sparqlEndpoint, dropGraphOnClear = clearGraphBeforeExecution) + * Models for all current workflow executions, keyed by [[ExecutionModelKey]]. + * Uses a WeakHashMap so entries are automatically cleaned up by GC when the key is no longer referenced. + * Entries are also explicitly removed by [[InMemoryDatasetExecutor.close()]] when the execution finishes. + */ + private val executionModels: java.util.Map[ExecutionModelKey, Model] = + Collections.synchronizedMap(new java.util.WeakHashMap[ExecutionModelKey, Model]()) - /** - * Returns a link sink for writing entity links to the data set. 
- */ - override def linkSink(implicit userContext: UserContext): LinkSink = new SparqlSink(SparqlParams(), sparqlEndpoint, dropGraphOnClear = clearGraphBeforeExecution) + private[datasets] def registerModel(key: ExecutionModelKey, model: Model): Unit = + executionModels.put(key, model) + + private[datasets] def findModel(execution: LocalExecution, taskId: Identifier): Option[Model] = + Option(executionModels.get(ExecutionModelKey(execution.executionId, taskId))).orElse( + execution.parentExecution.flatMap(findModel(_, taskId)) + ) - override def tripleSink(implicit userContext: UserContext): TripleSink = new SparqlSink(SparqlParams(), sparqlEndpoint, dropGraphOnClear = clearGraphBeforeExecution) + private[datasets] def removeModel(key: ExecutionModelKey): Unit = + executionModels.remove(key) + + /** Switches [[model]] to the given execution's model so out-of-workflow reads see current data. */ + private[datasets] def updateData(newModel: Model): Unit = + model = newModel + + // In workflow-scoped mode the executor owns the model lifecycle, so sinks must not drop the graph. + private def dropGraph: Boolean = !workflowScoped && clearGraphBeforeExecution + + override def sparqlEndpoint: SparqlEndpoint = new JenaModelEndpoint(model) + + override def source(implicit userContext: UserContext): DataSource = + new SparqlSource(SparqlParams(), sparqlEndpoint) + + override def entitySink(implicit userContext: UserContext): EntitySink = + new SparqlSink(SparqlParams(), sparqlEndpoint, dropGraphOnClear = dropGraph) + + override def linkSink(implicit userContext: UserContext): LinkSink = + new SparqlSink(SparqlParams(), sparqlEndpoint, dropGraphOnClear = dropGraph) + + override def tripleSink(implicit userContext: UserContext): TripleSink = + new SparqlSink(SparqlParams(), sparqlEndpoint, dropGraphOnClear = dropGraph) } object InMemoryDataset { final val pluginId = "inMemory" } + +/** + * Key for the [[InMemoryDataset.executionModels]] WeakHashMap (workflow-scoped mode). + */ +private[datasets] case class ExecutionModelKey(executionId: Identifier, taskId: Identifier) diff --git a/silk-plugins/silk-plugins-rdf/src/main/scala/org/silkframework/plugins/dataset/rdf/datasets/InMemoryDatasetExecutor.scala b/silk-plugins/silk-plugins-rdf/src/main/scala/org/silkframework/plugins/dataset/rdf/datasets/InMemoryDatasetExecutor.scala new file mode 100644 index 0000000000..82ad59d2fc --- /dev/null +++ b/silk-plugins/silk-plugins-rdf/src/main/scala/org/silkframework/plugins/dataset/rdf/datasets/InMemoryDatasetExecutor.scala @@ -0,0 +1,54 @@ +package org.silkframework.plugins.dataset.rdf.datasets + +import org.apache.jena.rdf.model.{Model, ModelFactory} +import org.silkframework.config.Task +import org.silkframework.dataset.{DatasetAccess, DatasetSpec, DatasetSpecAccess} +import org.silkframework.execution.local.{LocalDatasetExecutor, LocalExecution} + +/** + * Executor for [[InMemoryDataset]]. + * + * In application-scoped mode (`workflowScoped == false`), wraps the dataset's static model. + * + * In workflow-scoped mode (`workflowScoped == true`), holds a separate Jena model for the + * duration of a workflow execution. If the execution has a parent (nested workflow), the parent's + * model for the same task is reused so that the nested workflow sees the data written by the parent. 
+ */ +class InMemoryDatasetExecutor extends LocalDatasetExecutor[InMemoryDataset] { + + // Used only in workflow-scoped mode + @volatile private var model: Model = _ + @volatile private var modelDataset: JenaModelDataset = _ + @volatile private var initialized: Boolean = false + @volatile private var modelKey: Option[ExecutionModelKey] = None + @volatile private var plugin: Option[InMemoryDataset] = None + + override def access(task: Task[DatasetSpec[InMemoryDataset]], execution: LocalExecution): DatasetAccess = { + val datasetPlugin = task.data.plugin + if (datasetPlugin.workflowScoped) { + if (!initialized) { + initialized = true + model = execution.parentExecution + .flatMap(datasetPlugin.findModel(_, task.id)) + .getOrElse(ModelFactory.createDefaultModel()) + modelDataset = JenaModelDataset.fromModel(model, dropGraphOnClear = false) + val key = ExecutionModelKey(execution.executionId, task.id) + datasetPlugin.registerModel(key, model) + modelKey = Some(key) + plugin = Some(datasetPlugin) + } + datasetPlugin.updateData(model) + DatasetSpecAccess(task.data, modelDataset) + } else { + val ds = JenaModelDataset.fromModel(datasetPlugin.model, dropGraphOnClear = datasetPlugin.clearGraphBeforeExecution) + DatasetSpecAccess(task.data, ds) + } + } + + override def close(): Unit = { + for { + key <- modelKey + p <- plugin + } p.removeModel(key) + } +} diff --git a/silk-plugins/silk-plugins-rdf/src/main/scala/org/silkframework/plugins/dataset/rdf/datasets/JenaModelDataset.scala b/silk-plugins/silk-plugins-rdf/src/main/scala/org/silkframework/plugins/dataset/rdf/datasets/JenaModelDataset.scala index 096ef77f90..c7d38564ac 100644 --- a/silk-plugins/silk-plugins-rdf/src/main/scala/org/silkframework/plugins/dataset/rdf/datasets/JenaModelDataset.scala +++ b/silk-plugins/silk-plugins-rdf/src/main/scala/org/silkframework/plugins/dataset/rdf/datasets/JenaModelDataset.scala @@ -7,7 +7,7 @@ import org.silkframework.plugins.dataset.rdf.endpoint.JenaModelEndpoint import org.silkframework.plugins.dataset.rdf.access.{SparqlSink, SparqlSource} import org.silkframework.runtime.activity.UserContext -case class JenaModelDataset() extends RdfDataset { +case class JenaModelDataset(dropGraphOnClear: Boolean = true) extends RdfDataset { private val sparqlParams = SparqlParams() @@ -27,21 +27,21 @@ case class JenaModelDataset() extends RdfDataset { * Returns a link sink for writing entity links to the data set. */ override def linkSink(implicit userContext: UserContext): LinkSink = { - new SparqlSink(sparqlParams, sparqlEndpoint, dropGraphOnClear = true) + new SparqlSink(sparqlParams, sparqlEndpoint, dropGraphOnClear = dropGraphOnClear) } /** * Returns an entity sink for writing entities to the data set. 
*/ override def entitySink(implicit userContext: UserContext): EntitySink = { - new SparqlSink(sparqlParams, sparqlEndpoint, dropGraphOnClear = true) + new SparqlSink(sparqlParams, sparqlEndpoint, dropGraphOnClear = dropGraphOnClear) } } object JenaModelDataset { - def apply(model: Model): JenaModelDataset = { - val ds = JenaModelDataset() + def fromModel(model: Model, dropGraphOnClear: Boolean = true): JenaModelDataset = { + val ds = new JenaModelDataset(dropGraphOnClear) ds.sparqlEndpoint = new JenaModelEndpoint(model) ds } diff --git a/silk-plugins/silk-plugins-rdf/src/main/scala/org/silkframework/plugins/dataset/rdf/endpoint/JenaModelEndpoint.scala b/silk-plugins/silk-plugins-rdf/src/main/scala/org/silkframework/plugins/dataset/rdf/endpoint/JenaModelEndpoint.scala index 178deb70e2..437c7e45cc 100644 --- a/silk-plugins/silk-plugins-rdf/src/main/scala/org/silkframework/plugins/dataset/rdf/endpoint/JenaModelEndpoint.scala +++ b/silk-plugins/silk-plugins-rdf/src/main/scala/org/silkframework/plugins/dataset/rdf/endpoint/JenaModelEndpoint.scala @@ -1,17 +1,33 @@ package org.silkframework.plugins.dataset.rdf.endpoint import org.apache.jena.query.{DatasetFactory, Query, QueryExecution, QueryExecutionFactory} -import org.apache.jena.rdf.model.Model -import org.apache.jena.sparql.core.DatasetGraphFactory +import org.apache.jena.rdf.listeners.StatementListener +import org.apache.jena.rdf.model.{Model, Statement} import org.apache.jena.update.{UpdateExecutionFactory, UpdateFactory, UpdateProcessor} import org.silkframework.dataset.rdf.{QuadIterator, SparqlEndpoint, SparqlParams, SparqlResults} import org.silkframework.runtime.activity.UserContext +import org.silkframework.runtime.resource.Resource /** * A SPARQL endpoint which executes all queries on a Jena Model. */ class JenaModelEndpoint(model: Model) extends JenaEndpoint { + private val byteLimit: Long = Resource.maxInMemorySize() + @volatile private var estimatedBytesWritten: Long = 0L + + model.register(new StatementListener { + override def addedStatement(s: Statement): Unit = + estimatedBytesWritten += statementBytes(s) + override def removedStatement(s: Statement): Unit = + estimatedBytesWritten = math.max(0L, estimatedBytesWritten - statementBytes(s)) + }) + + private def statementBytes(s: Statement): Long = + s.getSubject.toString.length.toLong + + s.getPredicate.toString.length.toLong + + s.getObject.toString.length.toLong + override protected def createQueryExecution(query: Query): QueryExecution = { QueryExecutionFactory.create(query, model) } @@ -45,6 +61,14 @@ class JenaModelEndpoint(model: Model) extends JenaEndpoint { (implicit userContext: UserContext): Unit = { this.synchronized { super.update(query) + if (estimatedBytesWritten > byteLimit) { + throw new RuntimeException( + s"In-memory Knowledge Graph has exceeded the size limit of $byteLimit bytes " + + s"(estimated bytes written: $estimatedBytesWritten). " + + s"Reduce the amount of data written or increase the limit by configuring " + + s"'${Resource.maxInMemorySizeParameterName}'." 
+ ) + } } } diff --git a/silk-plugins/silk-plugins-rdf/src/test/scala/org/silkframework/plugins/dataset/rdf/datasets/InMemoryDatasetWorkflowScopedIntegrationTest.scala b/silk-plugins/silk-plugins-rdf/src/test/scala/org/silkframework/plugins/dataset/rdf/datasets/InMemoryDatasetWorkflowScopedIntegrationTest.scala new file mode 100644 index 0000000000..6d7a8d16ee --- /dev/null +++ b/silk-plugins/silk-plugins-rdf/src/test/scala/org/silkframework/plugins/dataset/rdf/datasets/InMemoryDatasetWorkflowScopedIntegrationTest.scala @@ -0,0 +1,176 @@ +package org.silkframework.plugins.dataset.rdf.datasets + +import org.apache.jena.rdf.model.{Model, ModelFactory} +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.must.Matchers +import org.silkframework.config.{MetaData, Prefixes} +import org.silkframework.dataset.DatasetSpec +import org.silkframework.entity.paths.UntypedPath +import org.silkframework.plugins.dataset.rdf.tasks.SparqlCopyCustomTask +import org.silkframework.rule._ +import org.silkframework.runtime.activity.{ActivityMonitor, UserContext} +import org.silkframework.util.{ConfigTestTrait, Uri} +import org.silkframework.workspace.activity.workflow.{LocalWorkflowExecutorGeneratingProvenance, Workflow, WorkflowDataset, WorkflowExecutionReportWithProvenance, WorkflowOperator} +import org.silkframework.workspace.{InMemoryWorkspaceTestTrait, ProjectConfig, WorkspaceFactory} + +/** + * Integration test for [[InMemoryDataset]] with `workflowScoped = true` within a real workflow execution. + */ +class InMemoryDatasetWorkflowScopedIntegrationTest extends AnyFlatSpec with Matchers with ConfigTestTrait { + + implicit val userContext: UserContext = UserContext.Empty + implicit val prefixes: Prefixes = Prefixes.empty + + override def propertyMap: Map[String, Option[String]] = Map( + "workspace.provider.plugin" -> Some("inMemoryWorkspaceProvider") + ) + + "InMemoryDataset (workflowScoped = true)" should "isolate data between two instances and share data across multiple uses of the same instance within a workflow" in { + val workspace = WorkspaceFactory().workspace + val project = workspace.createProject(ProjectConfig(metaData = MetaData(Some("inMemoryWorkflowScopedIntegrationTest")))) + + val source1Model: Model = ModelFactory.createDefaultModel() + source1Model.createResource("http://s1") + .addProperty(source1Model.createProperty("http://p"), source1Model.createResource("http://o1")) + + val source2Model: Model = ModelFactory.createDefaultModel() + source2Model.createResource("http://s2") + .addProperty(source2Model.createProperty("http://p"), source2Model.createResource("http://o2")) + + val output1AModel: Model = ModelFactory.createDefaultModel() + val output1BModel: Model = ModelFactory.createDefaultModel() + val output2AModel: Model = ModelFactory.createDefaultModel() + val output2BModel: Model = ModelFactory.createDefaultModel() + + project.addTask("source1", DatasetSpec(JenaModelDataset.fromModel(source1Model))) + project.addTask("source2", DatasetSpec(JenaModelDataset.fromModel(source2Model))) + project.addTask("inMemory1", DatasetSpec(InMemoryDataset(workflowScoped = true))) + project.addTask("inMemory2", DatasetSpec(InMemoryDataset(workflowScoped = true))) + project.addTask("output1A", DatasetSpec(JenaModelDataset.fromModel(output1AModel))) + project.addTask("output1B", DatasetSpec(JenaModelDataset.fromModel(output1BModel))) + project.addTask("output2A", DatasetSpec(JenaModelDataset.fromModel(output2AModel))) + project.addTask("output2B", 
DatasetSpec(JenaModelDataset.fromModel(output2BModel))) + + val copyQuery = "CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }" + project.addTask("copyToInMemory1", SparqlCopyCustomTask(copyQuery, tempFile = false)) + project.addTask("copyToInMemory2", SparqlCopyCustomTask(copyQuery, tempFile = false)) + + val identityTransform = TransformSpec( + selection = DatasetSelection("dummy", Uri("")), + mappingRule = RootMappingRule(MappingRules( + propertyRules = Seq( + DirectMapping( + id = "pmap", + sourcePath = UntypedPath(Uri("http://p")), + mappingTarget = MappingTarget(Uri("http://p")) + ) + ) + )) + ) + project.addTask("readFromInMemory1A", identityTransform) + project.addTask("readFromInMemory1B", identityTransform) + project.addTask("readFromInMemory2A", identityTransform) + project.addTask("readFromInMemory2B", identityTransform) + + val workflow = Workflow( + operators = Seq( + WorkflowOperator(Seq(Some("source1")), "copyToInMemory1", Seq("inMemory1"), Seq.empty, (0, 0), "copyToInMemory1", None, Seq.empty, Seq.empty), + WorkflowOperator(Seq(Some("source2")), "copyToInMemory2", Seq("inMemory2"), Seq.empty, (0, 300), "copyToInMemory2", None, Seq.empty, Seq.empty), + WorkflowOperator(Seq(Some("inMemory1")), "readFromInMemory1A", Seq("output1A"), Seq.empty, (200, 0), "readFromInMemory1A", None, Seq.empty, Seq.empty), + WorkflowOperator(Seq(Some("inMemory1")), "readFromInMemory1B", Seq("output1B"), Seq.empty, (200,100), "readFromInMemory1B", None, Seq.empty, Seq.empty), + WorkflowOperator(Seq(Some("inMemory2")), "readFromInMemory2A", Seq("output2A"), Seq.empty, (200,300), "readFromInMemory2A", None, Seq.empty, Seq.empty), + WorkflowOperator(Seq(Some("inMemory2")), "readFromInMemory2B", Seq("output2B"), Seq.empty, (200,400), "readFromInMemory2B", None, Seq.empty, Seq.empty) + ), + datasets = Seq( + WorkflowDataset(Seq.empty, "source1", Seq("copyToInMemory1"), (0, 0), "source1", None, Seq.empty, Seq.empty), + WorkflowDataset(Seq.empty, "source2", Seq("copyToInMemory2"), (0, 300), "source2", None, Seq.empty, Seq.empty), + WorkflowDataset(Seq(Some("copyToInMemory1")), "inMemory1", Seq("readFromInMemory1A", "readFromInMemory1B"), (100, 0), "inMemory1", None, Seq.empty, Seq.empty), + WorkflowDataset(Seq(Some("copyToInMemory2")), "inMemory2", Seq("readFromInMemory2A", "readFromInMemory2B"), (100,300), "inMemory2", None, Seq.empty, Seq.empty), + WorkflowDataset(Seq(Some("readFromInMemory1A")), "output1A", Seq.empty, (300, 0), "output1A", None, Seq.empty, Seq.empty), + WorkflowDataset(Seq(Some("readFromInMemory1B")), "output1B", Seq.empty, (300,100), "output1B", None, Seq.empty, Seq.empty), + WorkflowDataset(Seq(Some("readFromInMemory2A")), "output2A", Seq.empty, (300,300), "output2A", None, Seq.empty, Seq.empty), + WorkflowDataset(Seq(Some("readFromInMemory2B")), "output2B", Seq.empty, (300,400), "output2B", None, Seq.empty, Seq.empty) + ) + ) + project.addTask("workflow", workflow) + val workflowTask = project.task[Workflow]("workflow") + + val executor = LocalWorkflowExecutorGeneratingProvenance(workflowTask) + val monitor = new ActivityMonitor("monitor", initialValue = Some(WorkflowExecutionReportWithProvenance.empty)) + executor.run(monitor) + + output1AModel.size() must be > 0L + output1BModel.size() must be > 0L + output2AModel.size() must be > 0L + output2BModel.size() must be > 0L + + output1AModel.isIsomorphicWith(output1BModel) mustBe true + output2AModel.isIsomorphicWith(output2BModel) mustBe true + output1AModel.isIsomorphicWith(output2AModel) mustBe false + } + + it should "propagate 
InMemoryDataset (workflowScoped) data from a parent workflow to a nested workflow" in { + val workspace = WorkspaceFactory().workspace + val project = workspace.createProject(ProjectConfig(metaData = MetaData(Some("nestedWorkflowScopedTest")))) + + val sourceModel: Model = ModelFactory.createDefaultModel() + sourceModel.createResource("http://nested/s1") + .addProperty(sourceModel.createProperty("http://p"), sourceModel.createResource("http://nested/o1")) + sourceModel.createResource("http://nested/s2") + .addProperty(sourceModel.createProperty("http://p"), sourceModel.createResource("http://nested/o2")) + + val outputModel: Model = ModelFactory.createDefaultModel() + + project.addTask("source", DatasetSpec(JenaModelDataset.fromModel(sourceModel))) + project.addTask("inMemoryDs", DatasetSpec(InMemoryDataset(workflowScoped = true))) + project.addTask("output", DatasetSpec(JenaModelDataset.fromModel(outputModel))) + + val copyQuery = "CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }" + project.addTask("copyToInMemory", SparqlCopyCustomTask(copyQuery, tempFile = false)) + + val identityTransform = TransformSpec( + selection = DatasetSelection("dummy", Uri("")), + mappingRule = RootMappingRule(MappingRules( + propertyRules = Seq( + DirectMapping( + id = "pmap", + sourcePath = UntypedPath(Uri("http://p")), + mappingTarget = MappingTarget(Uri("http://p")) + ) + ) + )) + ) + project.addTask("readFromInMemory", identityTransform) + + val nestedWorkflow = Workflow( + operators = Seq( + WorkflowOperator(Seq(Some("inMemoryDs")), "readFromInMemory", Seq("output"), Seq.empty, (100, 0), "readFromInMemory", None, Seq.empty, Seq.empty) + ), + datasets = Seq( + WorkflowDataset(Seq.empty, "inMemoryDs", Seq("readFromInMemory"), (0, 0), "inMemoryDs", None, Seq.empty, Seq.empty), + WorkflowDataset(Seq(Some("readFromInMemory")), "output", Seq.empty, (200, 0), "output", None, Seq.empty, Seq.empty) + ) + ) + project.addTask("nestedWorkflow", nestedWorkflow) + + val parentWorkflow = Workflow( + operators = Seq( + WorkflowOperator(Seq(Some("source")), "copyToInMemory", Seq("inMemoryDs"), Seq.empty, (100, 0), "copyToInMemory", None, Seq.empty, Seq.empty), + WorkflowOperator(Seq(Some("inMemoryDs")), "nestedWorkflow", Seq.empty, Seq.empty, (300, 0), "nestedWorkflow", None, Seq.empty, Seq.empty) + ), + datasets = Seq( + WorkflowDataset(Seq.empty, "source", Seq("copyToInMemory"), (0, 0), "source", None, Seq.empty, Seq.empty), + WorkflowDataset(Seq(Some("copyToInMemory")), "inMemoryDs", Seq("nestedWorkflow"), (200, 0), "inMemoryDs", None, Seq.empty, Seq.empty) + ) + ) + project.addTask("parentWorkflow", parentWorkflow) + val workflowTask = project.task[Workflow]("parentWorkflow") + + val executor = LocalWorkflowExecutorGeneratingProvenance(workflowTask) + val monitor = new ActivityMonitor("nestedMonitor", initialValue = Some(WorkflowExecutionReportWithProvenance.empty)) + executor.run(monitor) + + outputModel.size() must be > 0L + outputModel.listSubjects().toList.size() mustBe 2 + } +} diff --git a/silk-plugins/silk-plugins-rdf/src/test/scala/org/silkframework/plugins/dataset/rdf/datasets/InMemoryDatasetWorkflowScopedTest.scala b/silk-plugins/silk-plugins-rdf/src/test/scala/org/silkframework/plugins/dataset/rdf/datasets/InMemoryDatasetWorkflowScopedTest.scala new file mode 100644 index 0000000000..6745122745 --- /dev/null +++ b/silk-plugins/silk-plugins-rdf/src/test/scala/org/silkframework/plugins/dataset/rdf/datasets/InMemoryDatasetWorkflowScopedTest.scala @@ -0,0 +1,171 @@ +package 
org.silkframework.plugins.dataset.rdf.datasets + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.must.Matchers +import org.silkframework.config.{PlainTask, Prefixes} +import org.silkframework.dataset.{DatasetAccess, DatasetSpec, DatasetSpecAccess} +import org.silkframework.dataset.rdf.{RdfDataset, SparqlEndpoint} +import org.silkframework.execution.local.LocalExecution +import org.silkframework.runtime.activity.UserContext +import org.silkframework.util.Identifier + +class InMemoryDatasetWorkflowScopedTest extends AnyFlatSpec with Matchers { + + private implicit val userContext: UserContext = UserContext.Empty + private implicit val prefixes: Prefixes = Prefixes.empty + + private val dataset = InMemoryDataset(workflowScoped = true) + private val task = PlainTask("test", DatasetSpec(dataset)) + private val execution = LocalExecution() + + private val tripleCountQuery = "SELECT * WHERE {?s ?p ?o}" + + behavior of "InMemoryDataset (workflowScoped = true)" + + it should "store data in the executor, not in the dataset itself" in { + val executor = new InMemoryDatasetExecutor() + val executorEndpoint = sparqlEndpoint(executor.access(task, execution)) + + executorEndpoint.update("INSERT DATA { }") + + executorEndpoint.select(tripleCountQuery).bindings.size mustBe 1 + dataset.sparqlEndpoint.select(tripleCountQuery).bindings.size mustBe 1 + } + + it should "retain data after close() is called" in { + val executor = new InMemoryDatasetExecutor() + val executorEndpoint = sparqlEndpoint(executor.access(task, execution)) + + executorEndpoint.update("INSERT DATA { }") + executorEndpoint.select(tripleCountQuery).bindings.size mustBe 1 + + executor.close() + + executorEndpoint.select(tripleCountQuery).bindings.size mustBe 1 + } + + it should "update the dataset sparqlEndpoint to the latest executor's model" in { + val dataset2 = InMemoryDataset(workflowScoped = true) + val task2 = PlainTask("test2", DatasetSpec(dataset2)) + val executor1 = new InMemoryDatasetExecutor() + val executor2 = new InMemoryDatasetExecutor() + + val endpoint1 = sparqlEndpoint(executor1.access(task2, execution)) + endpoint1.update("INSERT DATA { }") + dataset2.sparqlEndpoint.select(tripleCountQuery).bindings.size mustBe 1 + + executor2.access(task2, execution) + dataset2.sparqlEndpoint.select(tripleCountQuery).bindings.size mustBe 0 + } + + it should "isolate data between concurrent executions" in { + val executor1 = new InMemoryDatasetExecutor() + val executor2 = new InMemoryDatasetExecutor() + val endpoint1 = sparqlEndpoint(executor1.access(task, execution)) + val endpoint2 = sparqlEndpoint(executor2.access(task, execution)) + + endpoint1.update("INSERT DATA { }") + endpoint2.update("INSERT DATA { }") + endpoint2.update("INSERT DATA { }") + + endpoint1.select(tripleCountQuery).bindings.size mustBe 1 + endpoint2.select(tripleCountQuery).bindings.size mustBe 2 + } + + it should "use parent execution data in the nested executor" in { + val nestedDataset = InMemoryDataset(workflowScoped = true) + val nestedTask = PlainTask("nestedTest", DatasetSpec(nestedDataset)) + + val parentExecution = LocalExecution(false, workflowId = Some(Identifier("parentWf"))) + val parentExecutor = new InMemoryDatasetExecutor() + val parentEndpoint = sparqlEndpoint(parentExecutor.access(nestedTask, parentExecution)) + parentEndpoint.update("INSERT DATA { }") + parentEndpoint.update("INSERT DATA { }") + parentEndpoint.select(tripleCountQuery).bindings.size mustBe 2 + + val childExecution = LocalExecution(false, workflowId = 
Some(Identifier("childWf")), parentExecution = Some(parentExecution)) + val childExecutor = new InMemoryDatasetExecutor() + val childEndpoint = sparqlEndpoint(childExecutor.access(nestedTask, childExecution)) + + childEndpoint.select(tripleCountQuery).bindings.size mustBe 2 + } + + it should "share the model between parent and child executions" in { + val nestedDataset = InMemoryDataset(workflowScoped = true) + val nestedTask = PlainTask("nestedTest", DatasetSpec(nestedDataset)) + + val parentExecution = LocalExecution(false, workflowId = Some(Identifier("parentWf"))) + val parentExecutor = new InMemoryDatasetExecutor() + val parentEndpoint = sparqlEndpoint(parentExecutor.access(nestedTask, parentExecution)) + parentEndpoint.update("INSERT DATA { }") + + val childExecution = LocalExecution(false, workflowId = Some(Identifier("childWf")), parentExecution = Some(parentExecution)) + val childExecutor = new InMemoryDatasetExecutor() + val childEndpoint = sparqlEndpoint(childExecutor.access(nestedTask, childExecution)) + childEndpoint.update("INSERT DATA { }") + + childEndpoint.select(tripleCountQuery).bindings.size mustBe 2 + parentEndpoint.select(tripleCountQuery).bindings.size mustBe 2 + } + + it should "only reuse the parent model with the same task id in a nested execution" in { + val sharedDataset = InMemoryDataset(workflowScoped = true) + val taskA = PlainTask("datasetA", DatasetSpec(sharedDataset)) + val taskB = PlainTask("datasetB", DatasetSpec(sharedDataset)) + + val parentExecution = LocalExecution(false, workflowId = Some(Identifier("parentWf"))) + + val parentExecutorA = new InMemoryDatasetExecutor() + val endpointA = sparqlEndpoint(parentExecutorA.access(taskA, parentExecution)) + endpointA.update("INSERT DATA { }") + + val parentExecutorB = new InMemoryDatasetExecutor() + val endpointB = sparqlEndpoint(parentExecutorB.access(taskB, parentExecution)) + endpointB.update("INSERT DATA { }") + endpointB.update("INSERT DATA { }") + + val childExecution = LocalExecution(false, workflowId = Some(Identifier("childWf")), parentExecution = Some(parentExecution)) + val childExecutorA = new InMemoryDatasetExecutor() + val childEndpointA = sparqlEndpoint(childExecutorA.access(taskA, childExecution)) + childEndpointA.select(tripleCountQuery).bindings.size mustBe 1 + + val childExecutorB = new InMemoryDatasetExecutor() + val childEndpointB = sparqlEndpoint(childExecutorB.access(taskB, childExecution)) + childEndpointB.select(tripleCountQuery).bindings.size mustBe 2 + } + + it should "create a new model in a nested execution if the parent has no matching task id" in { + val sharedDataset = InMemoryDataset(workflowScoped = true) + val parentTask = PlainTask("parentOnly", DatasetSpec(sharedDataset)) + val childTask = PlainTask("childOnly", DatasetSpec(sharedDataset)) + + val parentExecution = LocalExecution(false, workflowId = Some(Identifier("parentWf"))) + + val parentExecutor = new InMemoryDatasetExecutor() + val parentEndpoint = sparqlEndpoint(parentExecutor.access(parentTask, parentExecution)) + parentEndpoint.update("INSERT DATA { }") + + val childExecution = LocalExecution(false, workflowId = Some(Identifier("childWf")), parentExecution = Some(parentExecution)) + val childExecutor = new InMemoryDatasetExecutor() + val childEndpoint = sparqlEndpoint(childExecutor.access(childTask, childExecution)) + childEndpoint.select(tripleCountQuery).bindings.size mustBe 0 + } + + it should "clean up model on close()" in { + val nestedDataset = InMemoryDataset(workflowScoped = true) + val nestedTask = 
PlainTask("cleanupTest", DatasetSpec(nestedDataset)) + + val exec = LocalExecution(false, workflowId = Some(Identifier("wf"))) + val executor = new InMemoryDatasetExecutor() + sparqlEndpoint(executor.access(nestedTask, exec)) + .update("INSERT DATA { }") + + nestedDataset.findModel(exec, nestedTask.id) must not be empty + + executor.close() + nestedDataset.findModel(exec, nestedTask.id) mustBe empty + } + + private def sparqlEndpoint(access: DatasetAccess): SparqlEndpoint = + access.asInstanceOf[DatasetSpecAccess].datasetAccess.asInstanceOf[RdfDataset].sparqlEndpoint +} \ No newline at end of file diff --git a/silk-plugins/silk-plugins-rdf/src/test/scala/org/silkframework/plugins/dataset/rdf/endpoint/JenaModelEndpointTest.scala b/silk-plugins/silk-plugins-rdf/src/test/scala/org/silkframework/plugins/dataset/rdf/endpoint/JenaModelEndpointTest.scala new file mode 100644 index 0000000000..5e3e4a3965 --- /dev/null +++ b/silk-plugins/silk-plugins-rdf/src/test/scala/org/silkframework/plugins/dataset/rdf/endpoint/JenaModelEndpointTest.scala @@ -0,0 +1,49 @@ +package org.silkframework.plugins.dataset.rdf.endpoint + +import org.apache.jena.rdf.model.ModelFactory +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.must.Matchers +import org.silkframework.runtime.activity.UserContext +import org.silkframework.runtime.resource.Resource +import org.silkframework.util.ConfigTestTrait + +class JenaModelEndpointTest extends AnyFlatSpec with Matchers { + + private implicit val userContext: UserContext = UserContext.Empty + + behavior of "JenaModelEndpoint" + + it should "not throw when data written is within the memory limit" in { + ConfigTestTrait.withConfig(Resource.maxInMemorySizeParameterName -> Some("100b")) { + val endpoint = new JenaModelEndpoint(ModelFactory.createDefaultModel()) + // Two triples at 9+8+9 bytes each = 52 estimated bytes, under the 100b limit + endpoint.update("INSERT DATA { }") + endpoint.update("INSERT DATA { }") + } + } + + it should "throw when data written exceeds the memory limit" in { + ConfigTestTrait.withConfig(Resource.maxInMemorySizeParameterName -> Some("50b")) { + val endpoint = new JenaModelEndpoint(ModelFactory.createDefaultModel()) + // First triple: 26 estimated bytes, within the 50b limit + endpoint.update("INSERT DATA { }") + // Second triple pushes the total to 52 bytes, exceeding the limit + an[RuntimeException] should be thrownBy { + endpoint.update("INSERT DATA { }") + } + } + } + + it should "throw for a short generative update that produces more data than the query string" in { + ConfigTestTrait.withConfig(Resource.maxInMemorySizeParameterName -> Some("50b")) { + val endpoint = new JenaModelEndpoint(ModelFactory.createDefaultModel()) + // Write one triple (26 estimated bytes), within the 50b limit + endpoint.update("INSERT DATA { }") + // A 47-char WHERE-clause query that generates a new triple: + // → 9+15+9 = 33 more bytes, total 59 > 50b + an[RuntimeException] should be thrownBy { + endpoint.update("INSERT { ?s ?o } WHERE { ?s ?p ?o }") + } + } + } +} diff --git a/silk-workspace/src/main/scala/org/silkframework/workspace/activity/workflow/LocalWorkflowAsTaskExecutor.scala b/silk-workspace/src/main/scala/org/silkframework/workspace/activity/workflow/LocalWorkflowAsTaskExecutor.scala index ab62f1bf51..5104c1967c 100644 --- a/silk-workspace/src/main/scala/org/silkframework/workspace/activity/workflow/LocalWorkflowAsTaskExecutor.scala +++ 
b/silk-workspace/src/main/scala/org/silkframework/workspace/activity/workflow/LocalWorkflowAsTaskExecutor.scala @@ -30,7 +30,8 @@ class LocalWorkflowAsTaskExecutor extends Executor[Workflow, LocalExecution] { projectTask, clearDatasets = false, replaceDataSources = execution.replaceDataSources, - replaceSinks = execution.replaceSinks + replaceSinks = execution.replaceSinks, + parentExecution = Some(execution) ).run(workflowContext) None diff --git a/silk-workspace/src/main/scala/org/silkframework/workspace/activity/workflow/LocalWorkflowExecutor.scala b/silk-workspace/src/main/scala/org/silkframework/workspace/activity/workflow/LocalWorkflowExecutor.scala index 00becbbea3..ff00400ffb 100644 --- a/silk-workspace/src/main/scala/org/silkframework/workspace/activity/workflow/LocalWorkflowExecutor.scala +++ b/silk-workspace/src/main/scala/org/silkframework/workspace/activity/workflow/LocalWorkflowExecutor.scala @@ -38,7 +38,8 @@ case class LocalWorkflowExecutor(workflowTask: ProjectTask[Workflow], replaceDataSources: Map[String, Dataset] = Map.empty, replaceSinks: Map[String, Dataset] = Map.empty, useLocalInternalDatasets: Boolean = false, - clearDatasets: Boolean = true) + clearDatasets: Boolean = true, + parentExecution: Option[LocalExecution] = None) extends WorkflowExecutor[LocalExecution] { private val log = Logger.getLogger(getClass.getName) @@ -81,11 +82,7 @@ case class LocalWorkflowExecutor(workflowTask: ProjectTask[Workflow], } private def runWorkflow(implicit context: ActivityContext[WorkflowExecutionReport], userContext: UserContext): Unit = { - implicit val workflowRunContext: WorkflowRunContext = WorkflowRunContext( - activityContext = context, - workflow = currentWorkflow, - userContext = userContext - ) + implicit val workflowRunContext: WorkflowRunContext = createRunContext checkReadOnlyDatasets() checkVariableDatasets() @@ -113,6 +110,14 @@ case class LocalWorkflowExecutor(workflowTask: ProjectTask[Workflow], } finally { context.value.updateWith(_.asDone()) this.executionContext.executeShutdownHooks() + workflowRunContext.taskExecutors.foreach { case (taskId, exec) => + try { + exec.close() + } catch { + case NonFatal(ex) => + log.log(Level.WARNING, s"Exception while closing executor for task '$taskId'.", ex) + } + } } } @@ -415,66 +420,12 @@ case class LocalWorkflowExecutor(workflowTask: ProjectTask[Workflow], } } - /** NOT USED ANYMORE, only here for documentation reasons, should be deleted after everything in here is supported. */ - def executeOperator(operator: WorkflowNode) - (implicit workflowRunContext: WorkflowRunContext): Unit = { - // Get the error sinks for this operator - val errorOutputs = operator match { - case wo: WorkflowOperator => wo.errorOutputs.map(project.anyTask(_)(workflowRunContext.userContext)) - case _ => Seq() - } - var errorSinks: Seq[DatasetWriteAccess] = errorOutputSinks(errorOutputs) - - - if (errorOutputs.exists(!_.data.isInstanceOf[Dataset])) { - // TODO: Needs proper graph - // TODO: How to handle error output in new model? 
- errorSinks +:= InternalDataset(null) - } - - // val activity = taskExecutor(dataSources, taskData, sinks, errorSinks) - // val report = activityContext.child(activity, 0.0).startBlockingAndGetValue() - // activityContext.value() = activityContext.value().withReport(operator.id, report) - } - - private def errorOutputSinks(errorOutputs: Seq[ProjectTask[_ <: TaskSpec]]): Seq[DatasetWriteAccess] = { - errorOutputs.collect { - case pt: ProjectTask[_] if pt.data.isInstanceOf[Dataset] => - pt.data.asInstanceOf[Dataset] - } - } - - /** - * Returns the dataset that should be used in the workflow. Specifically [[VariableDataset]] - * and [[InternalDataset]] need to be replaced by the corresponding real dataset. - * - * @param datasetTask - * @param replaceDatasets A map with replacement datasets for [[VariableDataset]] objects. - * @return - */ - private def resolveDataset(datasetTask: Task[GenericDatasetSpec], - replaceDatasets: Map[String, Dataset]): Task[GenericDatasetSpec] = { - replaceDatasets.get(datasetTask.id.toString) match { - case Some(d) => - PlainTask(datasetTask.id, datasetTask.data.copy(plugin = d), metaData = datasetTask.metaData) - case None => - datasetTask.data.plugin match { - case _: VariableDataset => - throw new IllegalArgumentException("No replacement found for variable dataset " + datasetTask.id.toString) - case _: InternalDataset => - val internalDataset = executionContext.createInternalDataset(Some(datasetTask.id.toString)) - PlainTask(datasetTask.id, datasetTask.data.copy(plugin = internalDataset), metaData = datasetTask.metaData) - case _: Dataset => - datasetTask - } - } - } - override protected val executionContext: LocalExecution = LocalExecution( useLocalInternalDatasets, replaceDataSources, replaceSinks, - Some(workflowTask.id) + Some(workflowTask.id), + parentExecution ) override protected def workflowNodeEntities[T](workflowDependencyNode: WorkflowDependencyNode, diff --git a/silk-workspace/src/main/scala/org/silkframework/workspace/activity/workflow/WorkflowExecutor.scala b/silk-workspace/src/main/scala/org/silkframework/workspace/activity/workflow/WorkflowExecutor.scala index 5a9ba1c563..c02ef81d83 100644 --- a/silk-workspace/src/main/scala/org/silkframework/workspace/activity/workflow/WorkflowExecutor.scala +++ b/silk-workspace/src/main/scala/org/silkframework/workspace/activity/workflow/WorkflowExecutor.scala @@ -1,9 +1,11 @@ package org.silkframework.workspace.activity.workflow -import org.silkframework.config.{Prefixes, Task, TaskSpec} -import org.silkframework.dataset.Dataset +import org.silkframework.config.{PlainTask, Prefixes, Task, TaskSpec} +import org.silkframework.dataset.{Dataset, VariableDataset} import org.silkframework.dataset.DatasetSpec.GenericDatasetSpec import org.silkframework.execution._ +import org.silkframework.execution.local.LocalExecution +import org.silkframework.plugins.dataset.InternalDataset import org.silkframework.runtime.activity.Status.Canceling import org.silkframework.runtime.activity._ import org.silkframework.runtime.plugin.PluginContext @@ -57,7 +59,12 @@ trait WorkflowExecutor[ExecType <: ExecutionType] extends Activity[WorkflowExecu updateProgress(operation, task) val result = try { - ExecutorRegistry.execute(task, inputs, output, executionContext, taskContext) + workflowRunContext.taskExecutors.get(task.id) match { + case Some(exec) => + ExecutorRegistry.executeWith(exec.asInstanceOf[Executor[TaskType, ExecType]], task, inputs, output, executionContext, taskContext) + case None => + throw 
WorkflowExecutionException(s"No executor found for task '${task.id}'. This is a bug: executors should have been initialized before execution.") + } } catch { case NonFatal(ex) => workflowRunContext.activityContext.value.updateWith(_.addFailedNode(nodeId, ex)) @@ -88,6 +95,30 @@ } } + protected def createRunContext(implicit userContext: UserContext, context: ActivityContext[WorkflowExecutionReport]): WorkflowRunContext = { + val workflowRunContext = WorkflowRunContext( + activityContext = context, + workflow = currentWorkflow, + userContext = userContext + ) + + for (node <- workflowNodes) { + val taskOpt: Option[Task[_ <: TaskSpec]] = node match { + case datasetNode: WorkflowDataset => + project.taskOption[GenericDatasetSpec](datasetNode.task).map { dt => + resolveDataset(dt, replaceDataSources ++ replaceSinks) + } + case operatorNode: WorkflowOperator => + project.anyTaskOption(operatorNode.task) + } + for (t <- taskOpt) { + workflowRunContext.taskExecutors.getOrElseUpdate(t.id, ExecutorRegistry.instantiateExecutor(t.data, executionContext)) + } + } + + workflowRunContext + } + /** * Update the progress and write a log message. * @@ -189,6 +220,37 @@ } } + /** + * Returns the dataset that should be used in the workflow. Specifically [[VariableDataset]] + * and [[InternalDataset]] need to be replaced by the corresponding real dataset. + * + * @param datasetTask The dataset task to resolve. + * @param replaceDatasets A map with replacement datasets for [[VariableDataset]] objects. + * @return The resolved dataset task. + */ + protected def resolveDataset(datasetTask: Task[GenericDatasetSpec], + replaceDatasets: Map[String, Dataset]): Task[GenericDatasetSpec] = { + replaceDatasets.get(datasetTask.id.toString) match { + case Some(d) => + PlainTask(datasetTask.id, datasetTask.data.copy(plugin = d), metaData = datasetTask.metaData) + case None => + datasetTask.data.plugin match { + case _: VariableDataset => + throw new IllegalArgumentException("No replacement found for variable dataset " + datasetTask.id.toString) + case _: InternalDataset => + executionContext match { + case localExecution: LocalExecution => + val internalDataset = localExecution.createInternalDataset(Some(datasetTask.id.toString)) + PlainTask(datasetTask.id, datasetTask.data.copy(plugin = internalDataset), metaData = datasetTask.metaData) + case _ => + datasetTask + } + case _: Dataset => + datasetTask + } + } + } + /** Necessary update for the user context, so external datasets can be accessed in safe-mode inside a workflow execution. */ def updateUserContext(userContext: UserContext): UserContext = { val executionContext = userContext.executionContext @@ -197,11 +259,22 @@ } } +/** + * A context for a single workflow execution. + * + * @param activityContext The activity context for the workflow execution. + * @param workflow The workflow that is being executed. + * @param userContext The user that is executing the workflow. + * @param alreadyExecuted The workflow nodes that have already been executed. + * @param reconfiguredTasks The tasks that have already been reconfigured. + * @param taskExecutors The executors for each task by task id. 
+ */ case class WorkflowRunContext(activityContext: ActivityContext[WorkflowExecutionReport], workflow: Workflow, userContext: UserContext, alreadyExecuted: mutable.Set[WorkflowNode] = mutable.Set(), - reconfiguredTasks: mutable.Map[WorkflowNode, Task[_ <: TaskSpec]] = mutable.Map()) { + reconfiguredTasks: mutable.Map[WorkflowNode, Task[_ <: TaskSpec]] = mutable.Map(), + taskExecutors: mutable.Map[Identifier, Executor[_, _]] = mutable.Map()) { /** * Listeners for updates to task reports. * We need to hold them to prevent their garbage collection.
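The nested-workflow behaviour ties these pieces together: `LocalExecution.parentExecution` forms a chain that is walked when an `InMemoryDatasetExecutor` is activated, so a child execution reuses the parent's model for the same task id. A condensed sketch of that flow, mirroring the nested-execution unit tests above; the wrapper object name is hypothetical, everything else is public API from this diff:

```scala
import org.silkframework.config.PlainTask
import org.silkframework.dataset.DatasetSpec
import org.silkframework.execution.local.LocalExecution
import org.silkframework.plugins.dataset.rdf.datasets.{InMemoryDataset, InMemoryDatasetExecutor}
import org.silkframework.util.Identifier

object NestedSharingSketch {
  def main(args: Array[String]): Unit = {
    val dataset = InMemoryDataset(workflowScoped = true)
    val task = PlainTask("sharedDs", DatasetSpec(dataset))

    // Parent execution: the executor creates a fresh model and registers it
    // under ExecutionModelKey(parent.executionId, task.id).
    val parent = LocalExecution(false, workflowId = Some(Identifier("parentWf")))
    val parentExecutor = new InMemoryDatasetExecutor()
    parentExecutor.access(task, parent)

    // Child execution points at the parent, so the child executor's lookup
    // walks the parent chain and reuses the parent's model for the same task id.
    val child = LocalExecution(false, workflowId = Some(Identifier("childWf")),
      parentExecution = Some(parent))
    val childExecutor = new InMemoryDatasetExecutor()
    childExecutor.access(task, child)

    // A different task id would get its own, empty model instead. Closing the
    // executors (as LocalWorkflowExecutor does at the end of a run) unregisters
    // the per-execution entries again.
    childExecutor.close()
    parentExecutor.close()
  }
}
```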