diff --git a/pyproject.toml b/pyproject.toml index cb48b6c75..8b3c276d3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -72,6 +72,7 @@ zip-safe = true "streamflow.deployment.connector" = ["schemas/base/*.json", "schemas/*.json"] "streamflow.deployment.filter" = ["schemas/*.json"] "streamflow.persistence" = ["schemas/*.sql", "schemas/*.json"] +"streamflow.provenance" = ["schemas/**/*.json"] "streamflow.recovery" = ["schemas/*.json"] "streamflow.scheduling" = ["schemas/*.json"] "streamflow.scheduling.policy" = ["schemas/*.json"] diff --git a/streamflow/ext/plugin.py b/streamflow/ext/plugin.py index 91e4ab941..875ff30cc 100644 --- a/streamflow/ext/plugin.py +++ b/streamflow/ext/plugin.py @@ -7,6 +7,7 @@ from streamflow.core.data import DataManager from streamflow.core.deployment import BindingFilter, Connector, DeploymentManager from streamflow.core.persistence import Database +from streamflow.core.provenance import ProvenanceManager from streamflow.core.recovery import CheckpointManager, FailureManager from streamflow.core.scheduling import Policy, Scheduler from streamflow.cwl.requirement.docker import cwl_docker_translator_classes @@ -17,6 +18,7 @@ from streamflow.deployment.filter import binding_filter_classes from streamflow.log_handler import logger from streamflow.persistence import database_classes +from streamflow.provenance import provenance_manager_classes from streamflow.recovery import checkpoint_manager_classes, failure_manager_classes from streamflow.scheduling import scheduler_classes from streamflow.scheduling.policy import policy_classes @@ -31,6 +33,7 @@ "deployment_manager": deployment_manager_classes, "failure_manager": failure_manager_classes, "policy": policy_classes, + "provenance_manager": provenance_manager_classes, "scheduler": scheduler_classes, } @@ -85,6 +88,9 @@ def register_failure_manager(self, name: str, cls: type[FailureManager]): def register_policy(self, name: str, cls: type[Policy]): self._register(name, cls, "policy") + def register_provenance_manager(self, name: str, cls: type[ProvenanceManager]): + self._register(name, cls, "provenance_manager") + def register_scheduler(self, name: str, cls: type[Scheduler]): self._register(name, cls, "scheduler") diff --git a/streamflow/main.py b/streamflow/main.py index 5d295626a..896013413 100644 --- a/streamflow/main.py +++ b/streamflow/main.py @@ -31,7 +31,7 @@ from streamflow.parser import parser from streamflow.persistence import database_classes from streamflow.persistence.loading_context import DefaultDatabaseLoadingContext -from streamflow.provenance import prov_classes +from streamflow.provenance import provenance_manager_classes from streamflow.recovery import checkpoint_manager_classes, failure_manager_classes from streamflow.scheduling import scheduler_classes @@ -126,20 +126,20 @@ async def _async_prov(args: argparse.Namespace): f"Workflow {args.workflow} is associated to the following types: {','.join(wf_type)}" ) wf_type = list(wf_type)[0] - if args.type not in prov_classes: + if args.type not in provenance_manager_classes: raise WorkflowProvenanceException( f"{args.type} provenance format is not supported." ) - elif wf_type not in prov_classes[args.type]: + elif wf_type not in provenance_manager_classes[args.type]: raise WorkflowProvenanceException( "{} provenance format is not supported for workflows of type {}.".format( args.type, wf_type ) ) else: - provenance_manager: ProvenanceManager = prov_classes[args.type][wf_type]( - context, db_context, workflows - ) + provenance_manager: ProvenanceManager = provenance_manager_classes[ + args.type + ][wf_type](context, db_context, workflows) await provenance_manager.create_archive( outdir=args.outdir, filename=args.name, diff --git a/streamflow/provenance/__init__.py b/streamflow/provenance/__init__.py index f7e2f6338..673ed3455 100644 --- a/streamflow/provenance/__init__.py +++ b/streamflow/provenance/__init__.py @@ -1,3 +1,3 @@ from streamflow.provenance.run_crate import CWLRunCrateProvenanceManager -prov_classes = {"run_crate": {"cwl": CWLRunCrateProvenanceManager}} +provenance_manager_classes = {"run_crate/cwl": CWLRunCrateProvenanceManager} diff --git a/streamflow/provenance/run_crate.py b/streamflow/provenance/run_crate.py index b1ed3555f..64910148f 100644 --- a/streamflow/provenance/run_crate.py +++ b/streamflow/provenance/run_crate.py @@ -12,6 +12,7 @@ import uuid from abc import ABC, abstractmethod from collections.abc import MutableMapping, MutableSequence +from importlib.resources import files from typing import Any, cast, get_args from zipfile import ZipFile @@ -1424,6 +1425,16 @@ async def get_main_entity(self) -> MutableMapping[str, Any]: ) return main_entity + @classmethod + def get_schema(cls) -> str: + return ( + files(__package__) + .joinpath("schemas") + .joinpath("run_crate") + .joinpath("cwl.json") + .read_text("utf-8") + ) + async def get_property_value( self, name: str, token: Token ) -> MutableMapping[str, Any] | None: diff --git a/streamflow/provenance/schemas/run_crate/cwl.json b/streamflow/provenance/schemas/run_crate/cwl.json new file mode 100644 index 000000000..cf8ab6dfd --- /dev/null +++ b/streamflow/provenance/schemas/run_crate/cwl.json @@ -0,0 +1,7 @@ +{ + "$schema": "https://json-schema.org/draft/2019-09/schema", + "$id": "https://streamflow.di.unito.it/schemas/provenance/run_crate/cwl.json", + "type": "object", + "properties": {}, + "additionalProperties": false +} \ No newline at end of file