From 3bb0a9b4d83d90f3b21830bc29d4b140b73afcb3 Mon Sep 17 00:00:00 2001 From: LanderOtto Date: Wed, 4 Mar 2026 12:11:55 +0100 Subject: [PATCH 1/3] This commit fixes the `listing` field in a `Directory` input is populated according to the CWL standard. Specifically, when `loadListing` is missing in the `WorkflowInputParameter`, the default is `no_listing`. --- streamflow/cwl/utils.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/streamflow/cwl/utils.py b/streamflow/cwl/utils.py index 98f5cc6bf..651bfd52b 100644 --- a/streamflow/cwl/utils.py +++ b/streamflow/cwl/utils.py @@ -1387,10 +1387,11 @@ async def update_file_token( elif not load_contents and "contents" in new_token_value: del new_token_value["contents"] # Process listings - if get_token_class(new_token_value) == "Directory" and load_listing is not None: + if get_token_class(new_token_value) == "Directory": # If load listing is set to `no_listing`, remove the listing entries in present - if load_listing == LoadListing.no_listing and "listing" in new_token_value: - del new_token_value["listing"] + if load_listing is None or load_listing == LoadListing.no_listing: + if "listing" in new_token_value: + del new_token_value["listing"] # If listing is not present or if the token needs a deep listing, process directory contents elif ( "listing" not in new_token_value From e5da7b9589bbef17ab09cce0a8880754338bdf2f Mon Sep 17 00:00:00 2001 From: LanderOtto Date: Wed, 4 Mar 2026 16:42:52 +0100 Subject: [PATCH 2/3] Fix `InitialWorkDir` --- streamflow/cwl/command.py | 1 - streamflow/cwl/step.py | 7 +++++-- streamflow/cwl/utils.py | 4 ++-- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/streamflow/cwl/command.py b/streamflow/cwl/command.py index 63fde0b9a..dd28d517e 100644 --- a/streamflow/cwl/command.py +++ b/streamflow/cwl/command.py @@ -796,7 +796,6 @@ async def execute(self, job: Job) -> CWLCommandOutput: inputs=job.inputs, streamflow_context=self.step.workflow.context, token_value=context["inputs"][key], - job=job, ) ) for key in job.inputs.keys() diff --git a/streamflow/cwl/step.py b/streamflow/cwl/step.py index 7636ee792..014158231 100644 --- a/streamflow/cwl/step.py +++ b/streamflow/cwl/step.py @@ -570,7 +570,10 @@ async def _update_file_token( token_value: MutableMapping[str, Any], dst_path: StreamFlowPath | None = None, ) -> MutableMapping[str, Any]: - token_class = utils.get_token_class(token_value) + if (token_class := utils.get_token_class(token_value)) is None: + raise WorkflowExecutionException( + f"Job {job.name} is processing a token which is not a file." + ) # Get destination coordinates dst_connector = self.workflow.context.scheduler.get_connector(job.name) dst_locations = self.workflow.context.scheduler.get_locations(job.name) @@ -626,7 +629,7 @@ async def _update_file_token( except FileExistsError: pass # Transform token value - new_token_value = { + new_token_value: MutableMapping[str, Any] = { "class": token_class, "path": str(filepath), "location": "file://" + str(filepath), diff --git a/streamflow/cwl/utils.py b/streamflow/cwl/utils.py index 651bfd52b..7e9ecdaaa 100644 --- a/streamflow/cwl/utils.py +++ b/streamflow/cwl/utils.py @@ -198,8 +198,8 @@ async def _process_secondary_file( case MutableMapping(): filepath = get_path_from_token(secondary_file) for location in locations: - if await ( - path := StreamFlowPath(filepath, context=context, location=location) + if await StreamFlowPath( + filepath, context=context, location=location ).exists(): return await get_file_token( context=context, From e36a003c8b430552dc0b91a70811d56ca100b30f Mon Sep 17 00:00:00 2001 From: LanderOtto Date: Wed, 4 Mar 2026 17:42:08 +0100 Subject: [PATCH 3/3] Fix listing of symbolic link --- streamflow/cwl/step.py | 4 ++-- streamflow/cwl/utils.py | 6 ++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/streamflow/cwl/step.py b/streamflow/cwl/step.py index 014158231..44b8ba37e 100644 --- a/streamflow/cwl/step.py +++ b/streamflow/cwl/step.py @@ -541,7 +541,7 @@ async def _update_listing( deployment=src_location.deployment, location_name=src_location.name, ): - # adjust the path + # Adjust the path existing.append( utils.remap_token_value( path_processor=get_path_processor( @@ -572,7 +572,7 @@ async def _update_file_token( ) -> MutableMapping[str, Any]: if (token_class := utils.get_token_class(token_value)) is None: raise WorkflowExecutionException( - f"Job {job.name} is processing a token which is not a file." + f"Step {self.name} (job {job.name}) is processing an invalid token." ) # Get destination coordinates dst_connector = self.workflow.context.scheduler.get_connector(job.name) diff --git a/streamflow/cwl/utils.py b/streamflow/cwl/utils.py index 7e9ecdaaa..e0d5fcf01 100644 --- a/streamflow/cwl/utils.py +++ b/streamflow/cwl/utils.py @@ -138,7 +138,7 @@ async def _get_listing( loc_path = StreamFlowPath(dirpath, context=context, location=location) async for dirpath, dirnames, filenames in loc_path.walk(follow_symlinks=True): for dirname in dirnames: - directory = dirpath / dirname + directory = loc_path / dirname if str(directory) not in listing_tokens: load_listing = ( LoadListing.deep_listing @@ -158,7 +158,7 @@ async def _get_listing( ) ) for filename in filenames: - file = dirpath / filename + file = loc_path / filename if str(file) not in listing_tokens: listing_tokens[str(file)] = asyncio.create_task( get_file_token( # nosec @@ -1218,6 +1218,8 @@ def remap_token_value( old_dir=old_dir, new_dir=new_dir, ) + if "dirname" in value: + value["dirname"] = new_dir if "secondaryFiles" in value: value["secondaryFiles"] = [ remap_token_value(path_processor, old_dir, new_dir, sf)