diff --git a/streamflow/cwl/command.py b/streamflow/cwl/command.py index 63fde0b9a..dd28d517e 100644 --- a/streamflow/cwl/command.py +++ b/streamflow/cwl/command.py @@ -796,7 +796,6 @@ async def execute(self, job: Job) -> CWLCommandOutput: inputs=job.inputs, streamflow_context=self.step.workflow.context, token_value=context["inputs"][key], - job=job, ) ) for key in job.inputs.keys() diff --git a/streamflow/cwl/step.py b/streamflow/cwl/step.py index 7636ee792..44b8ba37e 100644 --- a/streamflow/cwl/step.py +++ b/streamflow/cwl/step.py @@ -541,7 +541,7 @@ async def _update_listing( deployment=src_location.deployment, location_name=src_location.name, ): - # adjust the path + # Adjust the path existing.append( utils.remap_token_value( path_processor=get_path_processor( @@ -570,7 +570,10 @@ async def _update_file_token( token_value: MutableMapping[str, Any], dst_path: StreamFlowPath | None = None, ) -> MutableMapping[str, Any]: - token_class = utils.get_token_class(token_value) + if (token_class := utils.get_token_class(token_value)) is None: + raise WorkflowExecutionException( + f"Step {self.name} (job {job.name}) is processing an invalid token." + ) # Get destination coordinates dst_connector = self.workflow.context.scheduler.get_connector(job.name) dst_locations = self.workflow.context.scheduler.get_locations(job.name) @@ -626,7 +629,7 @@ async def _update_file_token( except FileExistsError: pass # Transform token value - new_token_value = { + new_token_value: MutableMapping[str, Any] = { "class": token_class, "path": str(filepath), "location": "file://" + str(filepath), diff --git a/streamflow/cwl/utils.py b/streamflow/cwl/utils.py index 98f5cc6bf..e0d5fcf01 100644 --- a/streamflow/cwl/utils.py +++ b/streamflow/cwl/utils.py @@ -138,7 +138,7 @@ async def _get_listing( loc_path = StreamFlowPath(dirpath, context=context, location=location) async for dirpath, dirnames, filenames in loc_path.walk(follow_symlinks=True): for dirname in dirnames: - directory = dirpath / dirname + directory = loc_path / dirname if str(directory) not in listing_tokens: load_listing = ( LoadListing.deep_listing @@ -158,7 +158,7 @@ async def _get_listing( ) ) for filename in filenames: - file = dirpath / filename + file = loc_path / filename if str(file) not in listing_tokens: listing_tokens[str(file)] = asyncio.create_task( get_file_token( # nosec @@ -198,8 +198,8 @@ async def _process_secondary_file( case MutableMapping(): filepath = get_path_from_token(secondary_file) for location in locations: - if await ( - path := StreamFlowPath(filepath, context=context, location=location) + if await StreamFlowPath( + filepath, context=context, location=location ).exists(): return await get_file_token( context=context, @@ -1218,6 +1218,8 @@ def remap_token_value( old_dir=old_dir, new_dir=new_dir, ) + if "dirname" in value: + value["dirname"] = new_dir if "secondaryFiles" in value: value["secondaryFiles"] = [ remap_token_value(path_processor, old_dir, new_dir, sf) @@ -1387,10 +1389,11 @@ async def update_file_token( elif not load_contents and "contents" in new_token_value: del new_token_value["contents"] # Process listings - if get_token_class(new_token_value) == "Directory" and load_listing is not None: + if get_token_class(new_token_value) == "Directory": # If load listing is set to `no_listing`, remove the listing entries in present - if load_listing == LoadListing.no_listing and "listing" in new_token_value: - del new_token_value["listing"] + if load_listing is None or load_listing == LoadListing.no_listing: + if "listing" in new_token_value: + del new_token_value["listing"] # If listing is not present or if the token needs a deep listing, process directory contents elif ( "listing" not in new_token_value