diff --git a/streamflow/cwl/processor.py b/streamflow/cwl/processor.py index 193159897..c48444a3d 100644 --- a/streamflow/cwl/processor.py +++ b/streamflow/cwl/processor.py @@ -369,6 +369,7 @@ def __init__( file_format: str | None = None, full_js: bool = False, glob: str | None = None, + inplace_update: bool = False, load_contents: bool = False, load_listing: LoadListing = LoadListing.no_listing, optional: bool = False, @@ -382,6 +383,7 @@ def __init__( self.expression_lib: MutableSequence[str] | None = expression_lib self.file_format: str | None = file_format self.full_js: bool = full_js + self.inplace_update: bool = inplace_update self.glob: str | None = glob self.load_contents: bool = load_contents self.load_listing: LoadListing = load_listing @@ -446,6 +448,7 @@ async def _build_token( base_path=( self.target.workdir if self.target else job.output_directory ), + inplace_update=self.inplace_update, ) # Process file format if self.file_format: diff --git a/streamflow/cwl/translator.py b/streamflow/cwl/translator.py index d7f982b8b..2abfa1022 100644 --- a/streamflow/cwl/translator.py +++ b/streamflow/cwl/translator.py @@ -208,6 +208,9 @@ def _create_command_output_processor_base( glob=cwl_element.get("outputBinding", {}).get( "glob", cwl_element.get("path") ), + inplace_update=context["requirements"] + .get("InplaceUpdateRequirement", {}) + .get("inplaceUpdate", False), load_contents=cwl_element.get( "loadContents", cwl_element.get("outputBinding", {}).get("loadContents", False), @@ -231,6 +234,9 @@ def _create_command_output_processor_base( glob=cwl_element.get("outputBinding", {}).get( "glob", cwl_element.get("path") ), + inplace_update=context["requirements"] + .get("InplaceUpdateRequirement", {}) + .get("inplaceUpdate", False), load_contents=cwl_element.get( "loadContents", cwl_element.get("outputBinding", {}).get("loadContents", False), @@ -296,6 +302,9 @@ def _create_command_output_processor( expression_lib=expression_lib, full_js=full_js, optional=optional, + inplace_update=context["requirements"] + .get("InplaceUpdateRequirement", {}) + .get("inplaceUpdate", False), ) # Record type: -> ObjectCommandOutputProcessor elif port_type["type"] == "record": diff --git a/streamflow/cwl/utils.py b/streamflow/cwl/utils.py index 0f7d57a93..e195a7d92 100644 --- a/streamflow/cwl/utils.py +++ b/streamflow/cwl/utils.py @@ -177,9 +177,14 @@ async def _register_path( path: str, relpath: str, data_type: DataType = DataType.PRIMARY, + inplace_update: bool = False, ) -> DataLocation | None: if real_path := await remotepath.follow_symlink(context, connector, location, path): if real_path != path: + if inplace_update: + for data_loc in context.data_manager.get_data_locations(path=real_path): + if data_loc.deployment != connector.deployment_name: + data_loc.data_type = DataType.INVALID if data_locations := context.data_manager.get_data_locations( path=real_path, deployment=connector.deployment_name ): @@ -810,6 +815,7 @@ async def register_data( locations: MutableSequence[ExecutionLocation], base_path: str | None, token_value: MutableSequence[MutableMapping[str, Any]] | MutableMapping[str, Any], + inplace_update: bool = False, ): # If `token_value` is a list, process every item independently if isinstance(token_value, MutableSequence): @@ -860,6 +866,7 @@ async def register_data( location=location, path=path, relpath=relpath, + inplace_update=inplace_update, ) ) for location in locations