Changes from all commits (22 commits)
fcc6bc5
Workflow execution: Instantiate the node executors at the beginning o…
robertisele Apr 9, 2026
6232ecd
Add close method to Executor trait that allows operators to clean up a…
robertisele Apr 9, 2026
86cbcce
Add InWorkflowDataset
robertisele Apr 9, 2026
dd5b8da
Merge pull request #1034 from silk-framework/detached2
robertisele Apr 9, 2026
ccc01ee
Add integration test for InWorkflowDataset
robertisele Apr 9, 2026
8128921
SparkWorkflowExecutor should also create executors for each execution.
robertisele Apr 9, 2026
0e505fc
InMemoryDataset should hold the data from the most recent execution.
robertisele Apr 10, 2026
c1477fd
InWorkflowDataset: Nested workflow executions should use the same dat…
robertisele Apr 10, 2026
1cfccd8
InWorkflowDataset: Update doc
robertisele Apr 10, 2026
a4a73e9
InWorkflowDataset: Improve automatic cleanup in case the execution id…
robertisele Apr 10, 2026
6e3d049
InWorkflowDataset bugfixes.
robertisele Apr 10, 2026
76580ba
Update doc
robertisele Apr 10, 2026
23df68a
InWorkflowDataset fix: Don't remove graph on clear
robertisele Apr 22, 2026
c23cc9a
Remove printlns
robertisele Apr 22, 2026
abffed2
Deprecate internal datasets.
robertisele Apr 23, 2026
bdeac5e
InWorkflowDatasetExecutor bugfix: Need to use DatasetSpecAccess so th…
robertisele Apr 28, 2026
d82eca9
Merge InWorkflowDataset into InMemoryDataset and simplify code
robertisele Apr 28, 2026
a067416
Update In-Memory dataset icon
robertisele Apr 28, 2026
032b12c
Updated doc of In-Memory dataset and renamed it to "In-memory Knowled…
robertisele Apr 28, 2026
ba4d2ab
JenaModelEndpoint: Limit the memory that can be written
robertisele Apr 28, 2026
c191342
JenaModelEndpoint: document memory limit
robertisele Apr 28, 2026
d6021de
Merge remote-tracking branch 'origin/develop' into feature/workflowLi…
robertisele Apr 28, 2026
File: Executor.scala
@@ -13,12 +13,17 @@ import org.silkframework.runtime.plugin.{AnyPlugin, PluginContext}
* @tparam ExecType The execution type, e.g., SparkExecution
*/
@PluginType()
trait Executor[TaskType <: TaskSpec, ExecType <: ExecutionType] extends AnyPlugin {
trait Executor[TaskType <: TaskSpec, ExecType <: ExecutionType] extends AnyPlugin with java.io.Closeable {

def execute(task: Task[TaskType], inputs: Seq[ExecType#DataType], output: ExecutorOutput,
execution: ExecType, context: ActivityContext[ExecutionReport] = new ActivityMonitor(getClass.getSimpleName))
(implicit pluginContext: PluginContext): Option[ExecType#DataType]

/**
* Called after the (workflow) execution has finished.
*/
override def close(): Unit = {}

}

/**
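As an illustration of the new hook, a minimal executor that opens a resource lazily and releases it in `close()`. The JDBC connection, its URL, and the import paths are assumptions for this sketch, not part of this PR:

```scala
import org.silkframework.config.{Task, TaskSpec}
import org.silkframework.execution.{ExecutionReport, Executor, ExecutorOutput}
import org.silkframework.execution.local.{LocalEntities, LocalExecution}
import org.silkframework.runtime.activity.ActivityContext
import org.silkframework.runtime.plugin.PluginContext

class ConnectionBoundExecutor extends Executor[TaskSpec, LocalExecution] {

  // Lazily opened resource, shared by all execute() calls of one workflow execution.
  @volatile private var connection: Option[java.sql.Connection] = None

  private def acquire(): java.sql.Connection = synchronized {
    connection.getOrElse {
      val c = java.sql.DriverManager.getConnection("jdbc:h2:mem:scratch") // hypothetical URL
      connection = Some(c)
      c
    }
  }

  override def execute(task: Task[TaskSpec],
                       inputs: Seq[LocalEntities],
                       output: ExecutorOutput,
                       execution: LocalExecution,
                       context: ActivityContext[ExecutionReport])
                      (implicit pluginContext: PluginContext): Option[LocalEntities] = {
    val conn = acquire()
    // ... use conn to read inputs and produce output entities ...
    None
  }

  // Called by the workflow engine once the workflow execution has finished.
  override def close(): Unit = synchronized {
    connection.foreach(_.close())
    connection = None
  }
}
```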
File: ExecutorRegistry.scala
@@ -137,13 +137,31 @@ object ExecutorRegistry extends ExecutorRegistry {
context: ActivityContext[ExecutionReport] = new ActivityMonitor(getClass.getSimpleName)
)(implicit pluginContext: PluginContext): Option[ExecType#DataType] = {
val exec = executor(task.data, execution)
executeWith(exec, task, inputs, output, execution, context)
}

/** Execute with a pre-instantiated executor, skipping executor lookup. */
def executeWith[TaskType <: TaskSpec, ExecType <: ExecutionType](
exec: Executor[TaskType, ExecType],
task: Task[TaskType],
inputs: Seq[ExecType#DataType],
output: ExecutorOutput,
execution: ExecType,
context: ActivityContext[ExecutionReport]
)(implicit pluginContext: PluginContext): Option[ExecType#DataType] = {
context.status.update(Status.Running("Running", None), logStatus = false)
val startTime = System.currentTimeMillis()
val result = exec.execute(task, inputs, output, execution, context)
context.status.update(Status.Finished(success = true, System.currentTimeMillis() - startTime, cancelled = false), logStatus = false)
result
}

/** Instantiates the executor for a given task and execution type without executing it. */
def instantiateExecutor[TaskType <: TaskSpec, ExecType <: ExecutionType](
task: TaskType,
execution: ExecType
): Executor[TaskType, ExecType] = executor(task, execution)


/** Fetch the execution specific access to a dataset for the configured execution.*/
def access[DatasetType <: Dataset](task: Task[DatasetSpec[DatasetType]]): DatasetAccess = {
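A sketch of how a workflow engine might combine the two new entry points: instantiate each node's executor once, reuse it via `executeWith` for every execution of that node, and close all executors afterwards. The node list, inputs, and output here are stubbed assumptions:

```scala
import org.silkframework.config.{Task, TaskSpec}
import org.silkframework.execution.{ExecutionReport, Executor, ExecutorOutput, ExecutorRegistry}
import org.silkframework.execution.local.{LocalEntities, LocalExecution}
import org.silkframework.runtime.activity.ActivityContext
import org.silkframework.runtime.plugin.PluginContext

def runWorkflow(nodes: Seq[Task[TaskSpec]],
                execution: LocalExecution,
                output: ExecutorOutput,
                context: ActivityContext[ExecutionReport])
               (implicit pluginContext: PluginContext): Unit = {
  // Instantiate every node's executor once, at the beginning of the workflow execution.
  val executors: Map[Task[TaskSpec], Executor[TaskSpec, LocalExecution]] =
    nodes.map { task =>
      task -> ExecutorRegistry.instantiateExecutor[TaskSpec, LocalExecution](task.data, execution)
    }.toMap

  try {
    for (task <- nodes) {
      // Reuse the pre-instantiated executor; inputs are stubbed for the sketch.
      ExecutorRegistry.executeWith(executors(task), task, Seq.empty[LocalEntities], output, execution, context)
    }
  } finally {
    // Let every operator clean up exactly once per workflow execution.
    executors.values.foreach(_.close())
  }
}
```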
File: LocalExecution.scala
@@ -21,10 +21,14 @@ import scala.jdk.CollectionConverters.ConcurrentMapHasAsScala
case class LocalExecution(useLocalInternalDatasets: Boolean,
replaceDataSources: Map[String, Dataset] = Map.empty,
replaceSinks: Map[String, Dataset] = Map.empty,
workflowId: Option[Identifier] = None) extends ExecutionType {
workflowId: Option[Identifier] = None,
parentExecution: Option[LocalExecution] = None) extends ExecutionType {

type DataType = LocalEntities

/** Unique identifier for this execution instance. */
val executionId: Identifier = Identifier.random

private val log: Logger = Logger.getLogger(this.getClass.getName)

private val internalDatasets: mutable.Map[Option[String], InternalDatasetTrait] = mutable.Map.empty
@@ -107,7 +111,8 @@ object LocalExecution {
id = "LocalInternalDataset",
label = "Internal dataset (single graph)",
description =
"""Dataset for storing entities between workflow steps. This variant does use the same graph for all internal datasets in a workflow. The underlying dataset type can be configured using the `dataset.internal.*` configuration parameters."""
"""Dataset for storing entities between workflow steps. This variant does use the same graph for all internal datasets in a workflow. The underlying dataset type can be configured using the `dataset.internal.*` configuration parameters.""",
deprecation = "This dataset is deprecated and will be removed in a future version. Instead use either the \"In-workflow dataset\" or the \"In-memory dataset\"."
)
case class LocalInternalDataset() extends InternalDatasetTrait {
override protected def internalDatasetPluginImpl: Dataset = InternalDataset.createInternalDataset()
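Because `LocalExecution` is a case class, a nested workflow run can be handed a fresh execution that references its parent; a small sketch with made-up workflow identifiers:

```scala
import org.silkframework.execution.local.LocalExecution
import org.silkframework.util.Identifier

val parent = LocalExecution(
  useLocalInternalDatasets = false,
  workflowId = Some(Identifier("parentWorkflow"))
)

// The nested run gets its own random executionId but can reach shared
// per-execution state (e.g. workflow-scoped in-memory models) via the parent.
val nested = LocalExecution(
  useLocalInternalDatasets = false,
  workflowId = Some(Identifier("nestedWorkflow")),
  parentExecution = Some(parent)
)

assert(nested.executionId != parent.executionId)
```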
File: InternalDataset.scala
@@ -16,7 +16,8 @@ import scala.util.Try
id = "internal",
label = "Internal dataset",
categories = Array(DatasetCategories.embedded),
description = """Dataset for storing entities between workflow steps. The underlying dataset type can be configured using the `dataset.internal.*` configuration parameters."""
description = """Dataset for storing entities between workflow steps. The underlying dataset type can be configured using the `dataset.internal.*` configuration parameters.""",
deprecation = "This dataset is deprecated and will be removed in a future version. Instead use either the \"In-workflow dataset\" or the \"In-memory dataset\"."
)
case class InternalDataset(
@Param(label = "graph URI", value = "The RDF graph that is used for storing internal data")
File: InMemoryDataset.md
@@ -9,23 +9,27 @@ Typical use cases:

## 2. Behaviour and lifecycle

- The dataset maintains a single in-memory RDF model.
- All read and write operations go through a SPARQL endpoint over this model.
- Data exists only **in memory**:
- It is not persisted to disk by this dataset.
- After an application restart, the dataset contents are empty again.
The dataset maintains a single in-memory RDF model and exposes it via a SPARQL endpoint. Two lifecycle modes are available, controlled by the `workflowScoped` parameter; a short configuration sketch follows the mode descriptions below:

Within a workflow:
- The dataset can be used as both **input** and **output**:
- Upstream operators can write triples/entities/links into it.
- Downstream operators can read from it via SPARQL-based mechanisms.
**Application-scoped mode** (default, `workflowScoped = false`):
- A single shared model is created when the dataset is instantiated.
- Data persists for the lifetime of the running application process.
- All workflow executions share the same in-memory graph.
- After an application restart, the dataset contents are empty again.

**Workflow-scoped mode** (`workflowScoped = true`):
- A separate model is created for each workflow execution.
- Concurrent workflow executions are fully isolated from each other.
- A dataset task in a nested workflow shares the same model as the parent workflow for the same task identifier. Data written by the parent is available in the nested workflow and vice versa.
- If the dataset is read from outside a workflow context, the data from the most recently started execution is returned.
- When the workflow execution ends, the per-execution data is removed automatically.
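
A minimal sketch of both modes, assuming direct instantiation of the Scala case class shown later in this PR:

```scala
import org.silkframework.plugins.dataset.rdf.datasets.InMemoryDataset

// Application-scoped (default): one shared graph for the whole JVM lifetime.
val shared = InMemoryDataset()

// Workflow-scoped: an isolated graph per workflow execution,
// removed automatically when the execution ends.
val perWorkflow = InMemoryDataset(workflowScoped = true)
```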

## 3. Reading data

- When used as a **source**, the dataset exposes its data as a SPARQL endpoint.
- Queries and retrievals behave like against a normal SPARQL dataset:
- Entity retrieval, path/type discovery, sampling, etc. are executed via SPARQL.
- There is no file backing this dataset; everything comes from what has been written into the in-memory model during the lifetime of the process.
- There is no file backing this dataset; everything comes from what has been written into the in-memory model during the lifetime of the process (application-scoped) or the workflow execution (workflow-scoped).
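
Conceptually, every read reduces to a SPARQL query over a Jena in-memory model. A standalone sketch of those mechanics in plain Jena (not the dataset API itself):

```scala
import org.apache.jena.query.QueryExecutionFactory
import org.apache.jena.rdf.model.ModelFactory

val model = ModelFactory.createDefaultModel() // same kind of model that backs the dataset
model.add(
  model.createResource("urn:example:subject"),
  model.createProperty("urn:example:predicate"),
  "object value"
)

// Entity retrieval, path/type discovery, sampling etc. all reduce to queries like this:
val exec = QueryExecutionFactory.create("SELECT * WHERE { ?s ?p ?o }", model)
try {
  val results = exec.execSelect()
  while (results.hasNext) println(results.next())
} finally {
  exec.close()
}
```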

## 4. Writing data

@@ -44,33 +48,58 @@ All three sinks ultimately write into the same in-memory graph; there is no sepa

## 5. Configuration

### Clear graph before workflow execution
### Workflow scoped <a id="parameter_doc_workflowScoped"></a>

- **Parameter:** `workflowScoped` (boolean)
- **Default:** `false`

When `true` (workflow-scoped mode):
- Data is stored in a separate in-memory graph for each workflow execution.
- Concurrent workflow executions are fully isolated from each other.
- A dataset task in a nested workflow shares the same graph as the parent for the same task identifier. Data written by the parent is available in the nested workflow and vice versa.
- If the dataset is read from outside a workflow context, the data from the most recently started execution is returned.
- When the workflow execution ends, the per-execution data is removed automatically.

When `false` (default, application-scoped mode):
- Data persists in a single shared graph for the lifetime of the running process.
- All workflow executions share the same graph.

### Clear graph before workflow execution <a id="parameter_doc_clearGraphBeforeExecution"></a>

- **Parameter:** `Clear graph before workflow execution` (boolean)
- **Default:** `true`
- **Parameter:** `clearGraphBeforeExecution` (boolean, **deprecated**)
- **Default:** `false`

Behaviour:
This parameter is deprecated. Use the **Clear dataset** operator in the workflow instead.

Behaviour (application-scoped mode only):

- If **true**:
- Before the dataset is used in a workflow execution, the graph is cleared (for writes via this dataset).
- Before the dataset is used in a workflow execution, the graph is cleared.
- The workflow sees a **fresh, empty in-memory graph** at the start of the run.

- If **false**:
- Existing data in the in-memory graph is **preserved** when the workflow starts.
- New data is added on top of whatever is already stored in the model.

This parameter controls whether the dataset behaves as a **fresh scratch graph per workflow run** or as a **longer-lived in-memory graph** within the lifetime of the running application.
This parameter has no effect when `workflowScoped = true` (the executor manages the lifecycle).

## 6. Limitations and recommendations

- **Memory-bound**
- All data is kept in memory; large graphs will increase memory usage and may impact performance.
- For large or production RDF graphs, use an external RDF store and a SPARQL dataset instead.
- A size limit is enforced: once the estimated size of data written to the dataset exceeds the value of `org.silkframework.runtime.resource.Resource.maxInMemorySize`, the workflow fails with an error. This prevents the JVM from running out of heap memory.

- **No persistence**
- Contents are lost when the application/server is restarted.
- Do not treat this dataset as long-term storage.

- **SPARQL engine**
- The dataset is backed by [Apache Jena](https://jena.apache.org/), exposed through a Jena in-memory SPARQL endpoint.

- **No named-graph support**
- Only the **default graph** is available. Writing triples into a named graph is not possible.

- **Scope**
- Best suited for:
- small to medium intermediate results,
Expand All @@ -79,14 +108,23 @@ This parameter controls whether the dataset behaves as a **fresh scratch graph p

## 7. Example usage scenarios

- Use as a **temporary integration graph**:
- Use as a **temporary integration graph** (application-scoped):
- Multiple sources write into the in-memory dataset.
- A downstream SPARQL-based operator queries the combined graph.

- Use as a **scratch area for experimentation**:
- Use as a **scratch area for experimentation** (application-scoped):
- Quickly test mapping or linking logic by writing output into the in-memory dataset.
- Inspect the result via SPARQL without configuring an external endpoint.

- Use as a **small lookup store**:
- Use as a **small lookup store** (application-scoped):
- Preload a small set of reference triples (e.g. codes or mappings).
- Let workflows query these during execution.

- Use as a **workflow-local intermediate store** (workflow-scoped):
- Multiple operators in a single workflow run write intermediate RDF results.
- Downstream operators in the same run read from the dataset without affecting parallel runs.

- Use in **nested workflows** (workflow-scoped):
- A parent workflow writes data into a workflow-scoped dataset.
- A nested sub-workflow reads and enriches the same data.
- After the nested workflow completes, the parent can read the enriched result.
File: RdfPlugins.scala
@@ -1,6 +1,6 @@
package org.silkframework.plugins.dataset.rdf

import org.silkframework.plugins.dataset.rdf.datasets.{AlignmentDataset, InMemoryDataset, RdfFileDataset, SparqlDataset}
import org.silkframework.plugins.dataset.rdf.datasets.{AlignmentDataset, InMemoryDataset, InMemoryDatasetExecutor, RdfFileDataset, SparqlDataset}
import org.silkframework.plugins.dataset.rdf.executors.{LocalSparqlCopyExecutor, LocalSparqlSelectExecutor, LocalSparqlUpdateExecutor}
import org.silkframework.plugins.dataset.rdf.tasks.{SparqlCopyCustomTask, SparqlSelectCustomTask, SparqlUpdateCustomTask}
import org.silkframework.plugins.dataset.rdf.vocab.{InMemoryVocabularyManager, RdfFilesVocabularyManager, RdfProjectFilesVocabularyManager, RdfVocabularyManager}
Expand All @@ -26,7 +26,8 @@ class RdfPlugins extends PluginModule {
val executors = Seq(
classOf[LocalSparqlSelectExecutor],
classOf[LocalSparqlUpdateExecutor],
classOf[LocalSparqlCopyExecutor]
classOf[LocalSparqlCopyExecutor],
classOf[InMemoryDatasetExecutor]
)

}
File: InMemoryDataset.scala
@@ -1,18 +1,24 @@
package org.silkframework.plugins.dataset.rdf.datasets

import org.apache.jena.rdf.model.ModelFactory
import org.apache.jena.rdf.model.{Model, ModelFactory}
import org.silkframework.dataset._
import org.silkframework.dataset.rdf.{RdfDataset, SparqlEndpoint, SparqlParams}
import org.silkframework.plugins.dataset.rdf.endpoint.JenaModelEndpoint
import org.silkframework.execution.local.LocalExecution
import org.silkframework.plugins.dataset.rdf.access.{SparqlSink, SparqlSource}
import org.silkframework.plugins.dataset.rdf.endpoint.JenaModelEndpoint
import org.silkframework.runtime.activity.UserContext
import org.silkframework.runtime.plugin.annotations.{Param, Plugin, PluginReference}
import org.silkframework.util.Identifier

import java.util.Collections

@Plugin(
id = InMemoryDataset.pluginId,
label = "In-memory dataset",
label = "In-memory Knowledge Graph",
categories = Array(DatasetCategories.embedded),
description = "A Dataset that holds all data in-memory.",
description = "A dataset that holds all data in-memory. " +
"In the default (application-scoped) mode, data persists for the lifetime of the running process. " +
"In workflow-scoped mode, data is isolated per workflow execution and shared with nested workflows that reference the same dataset task.",
documentationFile = "InMemoryDataset.md",
relatedPlugins = Array(
new PluginReference(
@@ -25,33 +31,75 @@ import org.silkframework.runtime.plugin.annotations.{Param, Plugin, PluginRefere
)
)
)
case class InMemoryDataset(@Param(label = "Clear graph before workflow execution (deprecated)",
value = "This is deprecated, use the 'Clear dataset' operator instead to clear a dataset in a workflow. If set to true this will clear this dataset before it is used in a workflow execution.",
advanced = true)
clearGraphBeforeExecution: Boolean = false) extends RdfDataset with TripleSinkDataset {

private val model = ModelFactory.createDefaultModel()

override val sparqlEndpoint: SparqlEndpoint = new JenaModelEndpoint(model)
case class InMemoryDataset(
@Param(label = "Workflow-scoped",
value = "If true, data is isolated per workflow execution and cleared after the execution ends, " +
"sharing data with nested workflows that reference the same dataset task. " +
"If false (default), data persists for the lifetime of the application process.")
workflowScoped: Boolean = false,
@Param(label = "Clear graph before workflow execution (deprecated)",
value = "This is deprecated, use the 'Clear dataset' operator instead to clear a dataset in a workflow. If set to true this will clear this dataset before it is used in a workflow execution.",
advanced = true)
clearGraphBeforeExecution: Boolean = false
) extends RdfDataset with TripleSinkDataset {

/**
* Returns a data source for reading entities from the data set.
*/
override def source(implicit userContext: UserContext): DataSource = new SparqlSource(SparqlParams(), sparqlEndpoint)
* The active Jena model backing this dataset.
*
* Application-scoped mode: initialised once and never reassigned; holds data for the
* lifetime of the process.
*
* Workflow-scoped mode: replaced by [[updateData]] each time [[InMemoryDatasetExecutor]]
* activates a new execution.
*/
@volatile private[datasets] var model: Model = ModelFactory.createDefaultModel()

/**
* Returns an entity sink for writing entities to the data set.
*/
override def entitySink(implicit userContext: UserContext): EntitySink = new SparqlSink(SparqlParams(), sparqlEndpoint, dropGraphOnClear = clearGraphBeforeExecution)
* Models for all current workflow executions, keyed by [[ExecutionModelKey]].
* Uses a WeakHashMap so entries are automatically cleaned up by GC when the key is no longer referenced.
* Entries are also explicitly removed by [[InMemoryDatasetExecutor.close()]] when the execution finishes.
*/
private val executionModels: java.util.Map[ExecutionModelKey, Model] =
Collections.synchronizedMap(new java.util.WeakHashMap[ExecutionModelKey, Model]())

/**
* Returns a link sink for writing entity links to the data set.
*/
override def linkSink(implicit userContext: UserContext): LinkSink = new SparqlSink(SparqlParams(), sparqlEndpoint, dropGraphOnClear = clearGraphBeforeExecution)
private[datasets] def registerModel(key: ExecutionModelKey, model: Model): Unit =
executionModels.put(key, model)

private[datasets] def findModel(execution: LocalExecution, taskId: Identifier): Option[Model] =
Option(executionModels.get(ExecutionModelKey(execution.executionId, taskId))).orElse(
execution.parentExecution.flatMap(findModel(_, taskId))
)

override def tripleSink(implicit userContext: UserContext): TripleSink = new SparqlSink(SparqlParams(), sparqlEndpoint, dropGraphOnClear = clearGraphBeforeExecution)
private[datasets] def removeModel(key: ExecutionModelKey): Unit =
executionModels.remove(key)

/** Switches [[model]] to the given execution's model so out-of-workflow reads see current data. */
private[datasets] def updateData(newModel: Model): Unit =
model = newModel

// In workflow-scoped mode the executor owns the model lifecycle, so sinks must not drop the graph.
private def dropGraph: Boolean = !workflowScoped && clearGraphBeforeExecution

override def sparqlEndpoint: SparqlEndpoint = new JenaModelEndpoint(model)

override def source(implicit userContext: UserContext): DataSource =
new SparqlSource(SparqlParams(), sparqlEndpoint)

override def entitySink(implicit userContext: UserContext): EntitySink =
new SparqlSink(SparqlParams(), sparqlEndpoint, dropGraphOnClear = dropGraph)

override def linkSink(implicit userContext: UserContext): LinkSink =
new SparqlSink(SparqlParams(), sparqlEndpoint, dropGraphOnClear = dropGraph)

override def tripleSink(implicit userContext: UserContext): TripleSink =
new SparqlSink(SparqlParams(), sparqlEndpoint, dropGraphOnClear = dropGraph)
}

object InMemoryDataset {
final val pluginId = "inMemory"
}

/**
* Key for the [[InMemoryDataset.executionModels]] WeakHashMap (workflow-scoped mode).
*/
private[datasets] case class ExecutionModelKey(executionId: Identifier, taskId: Identifier)
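
To show how these pieces fit together, a simplified sketch of what an executor such as `InMemoryDatasetExecutor` presumably does around one workflow-scoped execution; the executor itself is not part of this diff, so the exact flow is an assumption:

```scala
import org.apache.jena.rdf.model.{Model, ModelFactory}
import org.silkframework.execution.local.LocalExecution
import org.silkframework.util.Identifier

// Must live in org.silkframework.plugins.dataset.rdf.datasets so that the
// private[datasets] members are accessible.
def activate(dataset: InMemoryDataset, execution: LocalExecution, taskId: Identifier): Model = {
  // Nested workflows reuse the model registered by a parent execution.
  val model = dataset.findModel(execution, taskId).getOrElse(ModelFactory.createDefaultModel())
  dataset.registerModel(ExecutionModelKey(execution.executionId, taskId), model)
  dataset.updateData(model) // out-of-workflow reads now see this execution's data
  model
}

// Typically called from the executor's close() once the workflow execution ends.
def deactivate(dataset: InMemoryDataset, execution: LocalExecution, taskId: Identifier): Unit =
  dataset.removeModel(ExecutionModelKey(execution.executionId, taskId))
```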