From a164088f8ecbab11af55afba8ffe7b79b6fca390 Mon Sep 17 00:00:00 2001 From: GlassOfWhiskey Date: Sat, 6 Dec 2025 23:25:56 +0100 Subject: [PATCH] Improve type hints This commit improves type hints in the code according to `mypy` suggestions. --- pyproject.toml | 4 +- streamflow/core/asyncache.py | 10 +- streamflow/core/data.py | 8 +- streamflow/core/workflow.py | 46 +- streamflow/cwl/command.py | 87 ++-- streamflow/cwl/hardware.py | 17 +- streamflow/cwl/processor.py | 108 +++-- streamflow/cwl/step.py | 114 +++-- streamflow/cwl/token.py | 4 +- streamflow/cwl/transformer.py | 8 +- streamflow/cwl/translator.py | 415 +++++++++++------- streamflow/cwl/utils.py | 225 ++++++---- streamflow/deployment/aiotarstream.py | 42 +- streamflow/deployment/connector/kubernetes.py | 68 ++- streamflow/deployment/connector/ssh.py | 2 +- streamflow/deployment/filter/matching.py | 4 +- streamflow/deployment/manager.py | 2 +- streamflow/provenance/run_crate.py | 12 +- streamflow/recovery/failure_manager.py | 2 +- streamflow/recovery/policy/recovery.py | 3 +- streamflow/scheduling/scheduler.py | 8 +- streamflow/workflow/executor.py | 6 +- tests/conftest.py | 6 +- tests/cwl-conformance/conftest.py | 9 +- tests/test_binding_filter.py | 2 +- tests/test_connector.py | 8 +- tests/test_cwl_execution.py | 4 +- tests/test_cwl_persistence.py | 6 +- tests/test_cwl_provenance.py | 4 +- tests/test_persistence.py | 10 +- tests/test_recovery.py | 31 +- tests/test_remotepath.py | 4 +- tests/test_report.py | 3 +- tests/test_scheduler.py | 12 +- tests/test_schema.py | 7 +- tests/test_translator.py | 11 +- tests/utils/connector.py | 5 +- tests/utils/utils.py | 8 +- tests/utils/workflow.py | 16 +- uv.lock | 22 +- 40 files changed, 776 insertions(+), 587 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 1359409ef..ff8aca914 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,7 +35,7 @@ dependencies = [ "asyncssh==2.22.0", "bcrypt==5.0.0", "cachetools==6.2.6", - "cwl-utils==0.40", + "cwl-utils @ git+https://github.com/common-workflow-language/cwl-utils.git@main", "importlib-metadata==8.7.1", "Jinja2==3.1.6", "jsonschema==4.26.0", @@ -182,4 +182,4 @@ strict = true [[tool.mypy.overrides]] module = "streamflow.cwl.antlr.*" -ignore_errors = true +ignore_errors = true \ No newline at end of file diff --git a/streamflow/core/asyncache.py b/streamflow/core/asyncache.py index ae91f47af..c1ab8630d 100644 --- a/streamflow/core/asyncache.py +++ b/streamflow/core/asyncache.py @@ -15,7 +15,7 @@ __all__ = ["cached", "cachedmethod"] from collections.abc import Callable, MutableMapping -from contextlib import AbstractAsyncContextManager +from contextlib import AbstractAsyncContextManager, suppress from typing import Any, TypeVar from cachetools import keys as cache_keys @@ -60,10 +60,8 @@ async def wrapper(*args, **kwargs): f"argument type {type(obj).__name__} uses identity hashing (cache miss risk)." ) v = await func(*args, **kwargs) - try: + with suppress(ValueError): cache[k] = v - except ValueError: - pass # value too large return v else: @@ -131,10 +129,8 @@ async def wrapper(self, *args, **kwargs): f"argument type {type(obj).__name__} uses identity hashing (cache miss risk)." ) v = await method(self, *args, **kwargs) - try: + with suppress(ValueError): c[k] = v - except ValueError: - pass # value too large return v else: diff --git a/streamflow/core/data.py b/streamflow/core/data.py index 4227f758e..6aac3ea13 100644 --- a/streamflow/core/data.py +++ b/streamflow/core/data.py @@ -4,13 +4,11 @@ from abc import ABC, abstractmethod from collections.abc import MutableSequence from enum import Enum -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any, AnyStr from streamflow.core.context import SchemaEntity if TYPE_CHECKING: - from typing import Any - from streamflow.core.context import StreamFlowContext from streamflow.core.deployment import ExecutionLocation @@ -120,7 +118,7 @@ def __init__(self, stream: Any): async def close(self) -> None: ... @abstractmethod - async def read(self, size: int | None = None): ... + async def read(self, size: int | None = None) -> AnyStr: ... @abstractmethod - async def write(self, data: Any): ... + async def write(self, data: AnyStr): ... diff --git a/streamflow/core/workflow.py b/streamflow/core/workflow.py index f30cd1725..dd09f5ce5 100644 --- a/streamflow/core/workflow.py +++ b/streamflow/core/workflow.py @@ -6,7 +6,7 @@ from abc import ABC, abstractmethod from collections.abc import MutableMapping, MutableSequence from enum import IntEnum -from typing import TYPE_CHECKING, TypeVar, cast +from typing import TYPE_CHECKING, TypeVar, cast, overload from typing_extensions import Self @@ -379,7 +379,7 @@ def add_input_port(self, name: str, port: Port) -> None: def add_output_port(self, name: str, port: Port) -> None: self._add_port(name, port, DependencyType.OUTPUT) - def get_input_port(self, name: str | None = None) -> Port | None: + def get_input_port(self, name: str | None = None) -> Port: if name is None: if len(self.input_ports) == 1: return self.workflow.ports.get(next(iter(self.input_ports.values()))) @@ -387,16 +387,18 @@ def get_input_port(self, name: str | None = None) -> Port | None: raise WorkflowExecutionException( f"Cannot retrieve default input port as step {self.name} contains multiple input ports." ) - return ( - self.workflow.ports.get(self.input_ports[name]) - if name in self.input_ports - else None - ) + else: + if name in self.input_ports: + return self.workflow.ports[self.input_ports[name]] + else: + raise WorkflowExecutionException( + f"Cannot retrieve input port {name} from step {self.name}" + ) def get_input_ports(self) -> MutableMapping[str, Port]: return {k: self.workflow.ports[v] for k, v in self.input_ports.items()} - def get_output_port(self, name: str | None = None) -> Port | None: + def get_output_port(self, name: str | None = None) -> Port: if name is None: if len(self.output_ports) == 1: return self.workflow.ports.get(next(iter(self.output_ports.values()))) @@ -404,11 +406,13 @@ def get_output_port(self, name: str | None = None) -> Port | None: raise WorkflowExecutionException( f"Cannot retrieve default output port as step {self.name} contains multiple output ports." ) - return ( - self.workflow.ports.get(self.output_ports[name]) - if name in self.output_ports - else None - ) + else: + if name in self.output_ports: + return self.workflow.ports[self.output_ports[name]] + else: + raise WorkflowExecutionException( + f"Cannot retrieve output port {name} from step {self.name}" + ) def get_output_ports(self) -> MutableMapping[str, Port]: return {k: self.workflow.ports[v] for k, v in self.output_ports.items()} @@ -527,7 +531,7 @@ async def _load( ) -> Self: return cls(tag=row["tag"], value=row["value"], recoverable=row["recoverable"]) - async def _save_value(self, context: StreamFlowContext): + async def _save_value(self, context: StreamFlowContext) -> Any: return self.value async def get_weight(self, context: StreamFlowContext) -> int: @@ -561,7 +565,7 @@ def recoverable(self, recoverable: bool) -> None: ) self._recoverable = recoverable - def retag(self, tag: str) -> Token: + def retag(self, tag: str) -> Self: return self.__class__(tag=tag, value=self.value, recoverable=self._recoverable) async def save( @@ -580,7 +584,7 @@ async def save( except TypeError as e: raise WorkflowExecutionException from e - def update(self, value: Any) -> Token: + def update(self, value: Any) -> Self: return self.__class__(tag=self.tag, value=value, recoverable=self._recoverable) @@ -620,6 +624,16 @@ async def _save_additional_params( ) -> MutableMapping[str, Any]: return {"config": self.config, "output_ports": self.output_ports} + if TYPE_CHECKING: + + @overload + def create_port(self) -> Port: ... + + @overload + def create_port( + self, cls: type[P] = ..., name: str | None = ..., **kwargs + ) -> P: ... + def create_port(self, cls: type[P] = Port, name: str | None = None, **kwargs) -> P: if name is None: name = str(uuid.uuid4()) diff --git a/streamflow/cwl/command.py b/streamflow/cwl/command.py index 6780a4fa8..00f9eb6b7 100644 --- a/streamflow/cwl/command.py +++ b/streamflow/cwl/command.py @@ -8,11 +8,12 @@ import shlex import time from asyncio.subprocess import STDOUT -from collections.abc import MutableMapping, MutableSequence +from collections.abc import MutableMapping, MutableSequence, Sequence from decimal import Decimal from types import ModuleType from typing import Any, cast +import cwl_utils.types from ruamel.yaml import RoundTripRepresenter from ruamel.yaml.scalarfloat import ScalarFloat from typing_extensions import Self @@ -76,7 +77,7 @@ def _adjust_cwl_output( for v in value ] case MutableMapping(): - if utils.get_token_class(value) in ["File", "Directory"]: + if cwl_utils.types.is_file_or_directory(value): if (path := utils.get_path_from_token(value)) is None: raise WorkflowExecutionException( f"Job {job_name} cannot retrieve output. " @@ -118,7 +119,7 @@ def _adjust_input( match input_: case MutableMapping(): # Process the input if it is a file or an object - if (token_class := utils.get_token_class(input_)) in ("File", "Directory"): + if cwl_utils.types.is_file_or_directory(input_): if (path := utils.get_path_from_token(input_)) is None: raise WorkflowExecutionException( f"Job {job_name} cannot adjust the input. " @@ -127,25 +128,30 @@ def _adjust_input( elif path == src_path: input_["path"] = dst_path dirname, basename = path_processor.split(dst_path) - input_["dirname"] = dirname input_["basename"] = basename input_["location"] = f"file://{dst_path}" - if token_class == "File": # nosec + if cwl_utils.types.is_file(input_): # nosec + input_["dirname"] = dirname nameroot, nameext = path_processor.splitext(basename) input_["nameroot"] = nameroot input_["nameext"] = nameext - for ins in input_.get("listing", ()): - _adjust_input( - job_name, - ins, - path_processor, - ins["path"], - path_processor.join( - dst_path, path_processor.basename(ins["path"]) - ), - ) + else: + for ins in input_.get("listing", ()): + _adjust_input( + job_name, + ins, + path_processor, + ins["path"], + path_processor.join( + dst_path, path_processor.basename(ins["path"]) + ), + ) return True - elif src_path.startswith(path) and "listing" in input_: + elif ( + src_path.startswith(path) + and cwl_utils.types.is_directory(input_) + and "listing" in input_ + ): for inp in input_["listing"]: if _adjust_input( job_name, inp, path_processor, src_path, dst_path @@ -273,7 +279,10 @@ def _get_value_for_command(token: Any, item_separator: str | None) -> Any: return item_separator.join([_get_value_repr(v) for v in value]) return value or None case MutableMapping(): - if (path := utils.get_path_from_token(token)) is not None: + if ( + cwl_utils.types.is_file_or_directory(token) + and (path := utils.get_path_from_token(token)) is not None + ): return path else: raise WorkflowExecutionException( @@ -368,10 +377,7 @@ async def _prepare_work_dir( # If listing is a dictionary, it could be a File, a Directory, a Dirent or some other object case MutableMapping(): # If it is a File or Directory element, put the corresponding file in the output directory - if (listing_class := utils.get_token_class(listing)) in [ - "File", - "Directory", - ]: + if cwl_utils.types.is_file_or_directory(listing): # If a compatible source location exists, simply transfer data if (src_path := utils.get_path_from_token(listing)) is not None and ( selected_location := await _get_source_location( @@ -399,7 +405,7 @@ async def _prepare_work_dir( dst_path=dst_path, ) # Otherwise create a File or a Directory in the remote path - elif is_literal_file(listing_class, listing, options.job.name): + elif is_literal_file(listing, options.job.name): if dst_path is None: dst_path = ( path_processor.join(base_path, listing["basename"]) @@ -410,7 +416,7 @@ async def _prepare_work_dir( dst_path = path_processor.join( dst_path, path_processor.basename(src_path) ) - if listing_class == "Directory": + if cwl_utils.types.is_directory(listing): await utils.create_remote_directory( context=context, locations=locations, @@ -439,7 +445,10 @@ async def _prepare_work_dir( context=context, locations=locations, content=( - listing["contents"] if "contents" in listing else "" + listing["contents"] + if cwl_utils.types.is_file(listing) + and "contents" in listing + else "" ), path=dst_path, relpath=path_processor.relpath(dst_path, base_path), @@ -450,7 +459,7 @@ async def _prepare_work_dir( f"Impossible to copy the {src_path} file in the working directory: No data locations found" ) # If `secondaryFiles` is present, recursively process secondary files - if "secondaryFiles" in listing: + if cwl_utils.types.is_file(listing) and "secondaryFiles" in listing: await asyncio.gather( *( asyncio.create_task( @@ -601,12 +610,12 @@ def __init__( base_command: MutableSequence[str] | None = None, environment: MutableMapping[str, str] | None = None, expression_lib: MutableSequence[str] | None = None, - failure_codes: MutableSequence[int] | None = None, + failure_codes: Sequence[int] | None = None, full_js: bool = False, initial_work_dir: str | MutableSequence[Any] | None = None, inplace_update: bool = False, is_shell_command: bool = False, - success_codes: MutableSequence[int] | None = None, + success_codes: Sequence[int] | None = None, step_stderr: str | None = None, step_stdin: str | None = None, step_stdout: str | None = None, @@ -706,7 +715,9 @@ async def _load( ) def _get_executable_command( - self, context: MutableMapping[str, Any], inputs: MutableMapping[str, Token] + self, + context: cwl_utils.types.CWLParameterContext, + inputs: MutableMapping[str, Token], ) -> MutableSequence[str]: command = [] options = CWLCommandOptions( @@ -980,11 +991,11 @@ class CWLCommandOptions(CommandOptions): def __init__( self, - context: MutableMapping[str, Any], + context: cwl_utils.types.CWLParameterContext, expression_lib: MutableSequence[str] | None = None, full_js: bool = False, ): - self.context: MutableMapping[str, Any] = context + self.context: cwl_utils.types.CWLParameterContext = context self.expression_lib: MutableSequence[str] | None = expression_lib self.full_js: bool = full_js @@ -1077,8 +1088,10 @@ def bind( if isinstance(self.position, str) and not self.position.isnumeric(): position = utils.eval_expression( expression=self.position, - context=cast(dict[str, Any], options.context) - | {"self": get_token_value(token) if token else None}, + context=options.context + | cwl_utils.types.CWLParameterContext( + self=get_token_value(token) if token else None + ), full_js=options.full_js, expression_lib=options.expression_lib, ) @@ -1212,8 +1225,7 @@ def _update_options( "The `options` argument must be a `CWLCommandOptions` instance" ) return CWLCommandOptions( - context=cast(dict[str, Any], options.context) - | {"inputs": {self.name: get_token_value(token)}}, + context=options.context | {"inputs": {self.name: get_token_value(token)}}, expression_lib=options.expression_lib, full_js=options.full_js, ) @@ -1229,8 +1241,7 @@ def _update_options( ) value = get_token_value(token) return CWLCommandOptions( - context=cast(dict[str, Any], options.context) - | {"inputs": {self.name: value}, "self": value}, + context=options.context | {"inputs": {self.name: value}, "self": value}, expression_lib=options.expression_lib, full_js=options.full_js, ) @@ -1358,7 +1369,7 @@ class InitialWorkDirOptions: def __init__( self, absolute_initial_workdir_allowed: bool, - context: MutableMapping[str, Any], + context: cwl_utils.types.CWLParameterContext, expression_lib: MutableSequence[str] | None, inplace_update: bool, full_js: bool, @@ -1366,7 +1377,7 @@ def __init__( step: Step, ): self.absolute_initial_workdir_allowed: bool = absolute_initial_workdir_allowed - self.context: MutableMapping[str, Any] = context + self.context: cwl_utils.types.CWLParameterContext = context self.expression_lib: MutableSequence[str] | None = expression_lib self.inplace_update: bool = inplace_update self.full_js: bool = full_js diff --git a/streamflow/cwl/hardware.py b/streamflow/cwl/hardware.py index 0c51cbc3d..25f9db1a9 100644 --- a/streamflow/cwl/hardware.py +++ b/streamflow/cwl/hardware.py @@ -5,6 +5,7 @@ from collections.abc import MutableMapping, MutableSequence from typing import TYPE_CHECKING, Any +import cwl_utils.types from typing_extensions import Self from streamflow.core.context import StreamFlowContext @@ -67,7 +68,7 @@ async def _save_additional_params( } def _process_requirement( - self, requirement: str | float, context: MutableMapping[str, Any] + self, requirement: str | float, context: cwl_utils.types.CWLParameterContext ) -> float: return math.ceil( eval_expression( @@ -78,22 +79,22 @@ def _process_requirement( ) ) - def get_cores(self, context: MutableMapping[str, Any]) -> float: + def get_cores(self, context: cwl_utils.types.CWLParameterContext) -> float: return self._process_requirement(self.cores, context) - def get_memory(self, context: MutableMapping[str, Any]) -> float: + def get_memory(self, context: cwl_utils.types.CWLParameterContext) -> float: return self._process_requirement(self.memory, context) - def get_outdir_size(self, context: MutableMapping[str, Any]) -> float: + def get_outdir_size(self, context: cwl_utils.types.CWLParameterContext) -> float: return self._process_requirement(self.outdir, context) - def get_tmpdir_size(self, context: MutableMapping[str, Any]) -> float: + def get_tmpdir_size(self, context: cwl_utils.types.CWLParameterContext) -> float: return self._process_requirement(self.tmpdir, context) def eval(self, job: Job) -> Hardware: - context = { - "inputs": {name: get_token_value(t) for name, t in job.inputs.items()} - } + context = cwl_utils.types.CWLParameterContext( + inputs={name: get_token_value(t) for name, t in job.inputs.items()} + ) return Hardware( cores=self.get_cores(context), memory=self.get_memory(context), diff --git a/streamflow/cwl/processor.py b/streamflow/cwl/processor.py index 9938dda53..5ba081cf2 100644 --- a/streamflow/cwl/processor.py +++ b/streamflow/cwl/processor.py @@ -93,7 +93,7 @@ def _check_token_type( async def _fill_context( - context: MutableMapping[str, Any], + context: cwl_utils.types.CWLParameterContext, command_output: asyncio.Future[CommandOutput], output_eval: str, full_js: bool, @@ -253,7 +253,10 @@ async def _process_file_token( load_contents=self.load_contents, load_listing=self.load_listing, ) - # Process secondary files + # Process secondary + sf_map: MutableMapping[ + str, cwl_utils.types.CWLFileType | cwl_utils.types.CWLDirectoryType + ] if token_value.get("secondaryFiles"): initial_paths = [ utils.get_path_from_token(sf) @@ -284,7 +287,7 @@ async def _process_file_token( else: sf_map = {} if self.secondary_files: - sf_context = cast(dict[str, Any], context) | {"self": token_value} + sf_context = context | {"self": token_value} await utils.process_secondary_files( context=self.workflow.context, cwl_version=cwl_workflow.cwl_version, @@ -333,7 +336,7 @@ async def _save_additional_params( async def process(self, inputs: MutableMapping[str, Token], token: Token) -> Token: # Process file token - if utils.get_token_class(token.value) in ["File", "Directory"]: + if cwl_utils.types.is_file_or_directory(token.value): token = token.update(await self._process_file_token(inputs, token.value)) # Check type _check_token_type( @@ -408,11 +411,7 @@ async def _load( full_js=row["full_js"], glob=row["glob"], load_contents=row["load_contents"], - load_listing=( - LoadListing(row["load_listing"]) - if row["load_listing"] is not None - else None - ), + load_listing=LoadListing(row["load_listing"]), optional=row["optional"], output_eval=row["output_eval"], secondary_files=[ @@ -427,61 +426,56 @@ async def _build_token( self, job: Job, connector: Connector | None, - context: MutableMapping[str, Any], + context: cwl_utils.types.CWLParameterContext, token_value: Any, recoverable: bool, ) -> Token: match token_value: case MutableMapping(): - match utils.get_token_class(token_value): - case "File" | "Directory": - connector = self._get_connector(connector, job) - locations = await self._get_locations(connector, job) - # Register path - await utils.register_data( - context=self.workflow.context, - connector=connector, - locations=locations, - token_value=token_value, - base_path=( - self.target.workdir - if self.target - else job.output_directory - ), + if cwl_utils.types.is_file_or_directory(token_value): + connector = self._get_connector(connector, job) + locations = await self._get_locations(connector, job) + # Register path + await utils.register_data( + context=self.workflow.context, + connector=connector, + locations=locations, + token_value=token_value, + base_path=( + self.target.workdir if self.target else job.output_directory + ), + ) + # Process file format + if cwl_utils.types.is_file(token_value) and self.file_format: + context |= {"self": token_value} + token_value["format"] = utils.eval_expression( + expression=self.file_format, + context=context, + full_js=self.full_js, + expression_lib=self.expression_lib, ) - # Process file format - if self.file_format: - context |= {"self": token_value} - token_value["format"] = utils.eval_expression( - expression=self.file_format, - context=context, - full_js=self.full_js, - expression_lib=self.expression_lib, - ) - return CWLFileToken( - value=token_value, - tag=get_tag(job.inputs.values()), - recoverable=recoverable, + return CWLFileToken( + value=token_value, + tag=get_tag(job.inputs.values()), + recoverable=recoverable, + ) + else: + token_tasks = { + k: asyncio.create_task( + self._build_token(job, connector, context, v, recoverable) ) - case _: - token_tasks = { - k: asyncio.create_task( - self._build_token( - job, connector, context, v, recoverable - ) + for k, v in token_value.items() + } + return ObjectToken( + value=dict( + zip( + token_tasks.keys(), + await asyncio.gather(*token_tasks.values()), + strict=True, ) - for k, v in token_value.items() - } - return ObjectToken( - value=dict( - zip( - token_tasks.keys(), - await asyncio.gather(*token_tasks.values()), - strict=True, - ) - ), - tag=get_tag(job.inputs.values()), - ) + ), + tag=get_tag(job.inputs.values()), + ) case MutableSequence(): return ListToken( value=await asyncio.gather( @@ -508,7 +502,7 @@ async def _process_command_output( job: Job, command_output: asyncio.Future[CommandOutput], connector: Connector | None, - context: MutableMapping[str, Any], + context: cwl_utils.types.CWLParameterContext, ) -> MutableMapping[str, Any]: connector = self._get_connector(connector, job) locations = await self._get_locations(connector, job) diff --git a/streamflow/cwl/step.py b/streamflow/cwl/step.py index 04c218f44..7362d7d4f 100644 --- a/streamflow/cwl/step.py +++ b/streamflow/cwl/step.py @@ -7,6 +7,7 @@ from collections.abc import MutableMapping, MutableSequence from typing import Any, cast +import cwl_utils.types from typing_extensions import Self from streamflow.core.context import StreamFlowContext @@ -106,7 +107,7 @@ def __init__( self.full_js: bool = full_js async def _eval(self, inputs: MutableMapping[str, Token]) -> bool: - context = utils.build_context(inputs) + context = utils.build_context(inputs=inputs) condition = utils.eval_expression( expression=self.expression, context=context, @@ -185,7 +186,7 @@ async def _load( class CWLLoopConditionalStep(CWLConditionalStep): async def _eval(self, inputs: MutableMapping[str, Token]) -> bool: - context = utils.build_context(inputs) + context = utils.build_context(inputs=inputs) condition = utils.eval_expression( expression=self.expression, context=context, @@ -310,7 +311,7 @@ def _is_recoverable(self, job: Job) -> bool: if isinstance(self.recoverable, bool): recoverable_ = self.recoverable else: - context = utils.build_context(job.inputs) + context = utils.build_context(inputs=job.inputs) recoverable_ = utils.eval_expression( expression=self.recoverable, context=context, @@ -507,7 +508,7 @@ async def _transfer_value(self, job: Job, token_value: Any) -> Any: ) ) case MutableMapping(): - if utils.get_token_class(token_value) in ["File", "Directory"]: + if cwl_utils.types.is_file_or_directory(token_value): return await self._update_file_token(job, token_value) else: return dict( @@ -530,10 +531,12 @@ async def _transfer_value(self, job: Job, token_value: Any) -> Any: async def _update_listing( self, job: Job, - token_value: MutableMapping[str, Any], + token_value: cwl_utils.types.CWLDirectoryType, dst_path: StreamFlowPath | None = None, src_location: DataLocation | None = None, - ) -> MutableSequence[MutableMapping[str, Any]]: + ) -> MutableSequence[ + cwl_utils.types.CWLFileType | cwl_utils.types.CWLDirectoryType + ]: existing, tasks = [], [] for element in token_value["listing"]: if src_location and self.workflow.context.data_manager.get_data_locations( @@ -567,10 +570,9 @@ async def _update_listing( async def _update_file_token( self, job: Job, - token_value: MutableMapping[str, Any], + token_value: cwl_utils.types.CWLFileType | cwl_utils.types.CWLDirectoryType, dst_path: StreamFlowPath | None = None, - ) -> MutableMapping[str, Any]: - token_class = utils.get_token_class(token_value) + ) -> cwl_utils.types.CWLFileType | cwl_utils.types.CWLDirectoryType: # Get destination coordinates dst_connector = self.workflow.context.scheduler.get_connector(job.name) dst_locations = self.workflow.context.scheduler.get_locations(job.name) @@ -625,16 +627,20 @@ async def _update_file_token( ) except FileExistsError: pass - # Transform token value - new_token_value = { - "class": token_class, - "path": str(filepath), - "location": "file://" + str(filepath), - "basename": filepath.name, - "dirname": str(filepath.parent), - } + new_token_value: ( + cwl_utils.types.CWLFileType | cwl_utils.types.CWLDirectoryType + ) # If token contains a file - if token_class == "File": # nosec + if cwl_utils.types.is_file(token_value): # nosec + new_token_value = cwl_utils.types.CWLFileType( + **{ + "class": "File", + "path": str(filepath), + "location": "file://" + str(filepath), + "basename": filepath.name, + "dirname": str(filepath.parent), + } + ) # Retrieve symbolic link data locations data_locations = self.workflow.context.data_manager.get_data_locations( path=str(filepath), @@ -692,10 +698,19 @@ async def _update_file_token( ) ) # If token contains a directory, propagate listing if present - elif token_class == "Directory" and "listing" in token_value: # nosec - new_token_value["listing"] = await self._update_listing( - job, token_value, filepath, selected_location + else: + new_token_value = cwl_utils.types.CWLDirectoryType( + **{ + "class": "Directory", + "path": str(filepath), + "location": "file://" + str(filepath), + "basename": filepath.name, + } ) + if "listing" in token_value: # nosec + new_token_value["listing"] = await self._update_listing( + job, token_value, filepath, selected_location + ) return new_token_value # Otherwise, create elements remotely else: @@ -704,7 +719,7 @@ async def _update_file_token( unique=not self.prefix_path, path=dst_path or dst_dir ) as filepath: # If the token contains a directory, simply create it - if token_class == "Directory": # nosec + if cwl_utils.types.is_directory(token_value): # nosec await utils.create_remote_directory( context=self.workflow.context, locations=dst_locations, @@ -754,41 +769,44 @@ async def _update_file_token( connector=dst_connector, cwl_version=cast(CWLWorkflow, self.workflow).cwl_version, locations=dst_locations, - token_class=token_class, + token_class=utils.get_token_class(token_value), filepath=str(filepath), load_contents="contents" in token_value, load_listing=LoadListing.no_listing, ) - if ( - "checksum" in token_value - and new_token_value["checksum"] != token_value["checksum"] - ): - raise WorkflowExecutionException( - "Error creating file {} with path {} in locations {}.".format( - token_value["path"], - new_token_value["path"], - [str(loc) for loc in dst_locations], + if cwl_utils.types.is_file(token_value): + if ( + "checksum" in token_value + and cast(cwl_utils.types.CWLFileType, new_token_value)["checksum"] + != token_value["checksum"] + ): + raise WorkflowExecutionException( + "Error creating file {} with path {} in locations {}.".format( + token_value["path"], + new_token_value["path"], + [str(loc) for loc in dst_locations], + ) ) - ) - # Check secondary files - if "secondaryFiles" in token_value: - new_token_value["secondaryFiles"] = await asyncio.gather( - *( - asyncio.create_task( - self._update_file_token( - job=job, - token_value=element, - dst_path=filepath.parent, + # Check secondary files + if "secondaryFiles" in token_value: + cast(cwl_utils.types.CWLFileType, new_token_value)[ + "secondaryFiles" + ] = await asyncio.gather( + *( + asyncio.create_task( + self._update_file_token( + job=job, + token_value=element, + dst_path=filepath.parent, + ) ) + for element in token_value["secondaryFiles"] ) - for element in token_value["secondaryFiles"] ) - ) - # If listing is specified, recursively process its contents - if "listing" in token_value: - new_token_value["listing"] = await self._update_listing( - job, token_value, filepath + if cwl_utils.types.is_directory(token_value) and "listing" in token_value: + cast(cwl_utils.types.CWLDirectoryType, new_token_value)["listing"] = ( + await self._update_listing(job, token_value, filepath) ) # Return the new token value return new_token_value diff --git a/streamflow/cwl/token.py b/streamflow/cwl/token.py index 2333d7305..d6201a249 100644 --- a/streamflow/cwl/token.py +++ b/streamflow/cwl/token.py @@ -20,10 +20,10 @@ async def _get_file_token_weight(context: StreamFlowContext, value: Any) -> int: ) if data_locations: data_location = next(iter(data_locations)) - path = StreamFlowPath( + sf_path = StreamFlowPath( data_location.path, context=context, location=data_location.location ) - weight = await (await path.resolve()).size() + weight = await (await sf_path.resolve()).size() if "secondaryFiles" in value: weight += sum( await asyncio.gather( diff --git a/streamflow/cwl/transformer.py b/streamflow/cwl/transformer.py index 9cb5e0986..7438e5b44 100644 --- a/streamflow/cwl/transformer.py +++ b/streamflow/cwl/transformer.py @@ -484,8 +484,7 @@ async def transform( new_inputs[output_name] = await self.processor.process( new_inputs, new_inputs[output_name] ) - context = utils.build_context(new_inputs) - context |= {"self": context["inputs"].get(output_name)} + context = utils.build_context(inputs=inputs, self=inputs.get(output_name)) return { output_name: await build_token( cwl_version=cast(CWLWorkflow, self.workflow).cwl_version, @@ -583,16 +582,13 @@ async def transform( if self.loop_source_port else None ) - context = cast(dict[str, Any], utils.build_context(loop_inputs)) | { - "self": get_token_value(self_token) - } return { self.get_output_name(): await build_token( cwl_version=cast(CWLWorkflow, self.workflow).cwl_version, inputs=inputs, token_value=utils.eval_expression( expression=self.value_from, - context=context, + context=utils.build_context(inputs=loop_inputs, self=self_token), full_js=self.full_js, expression_lib=self.expression_lib, ), diff --git a/streamflow/cwl/translator.py b/streamflow/cwl/translator.py index dbee725ac..16d1d25a9 100644 --- a/streamflow/cwl/translator.py +++ b/streamflow/cwl/translator.py @@ -5,12 +5,13 @@ import os import posixpath import urllib.parse -from collections.abc import MutableMapping, MutableSequence +from collections.abc import Mapping, MutableMapping, MutableSequence, Sequence from pathlib import Path, PurePosixPath -from typing import Any, cast, get_args +from typing import Any, cast import cwl_utils.parser import cwl_utils.parser.utils +import cwl_utils.types from schema_salad.exceptions import ValidationException from streamflow.config.config import WorkflowConfig @@ -33,7 +34,7 @@ UnionCommandOutputProcessor, UnionTokenProcessor, ) -from streamflow.core.utils import random_name +from streamflow.core.utils import flatten_list, random_name from streamflow.core.workflow import ( CommandTokenProcessor, Port, @@ -126,17 +127,17 @@ def _adjust_default_ports( transformer_prefix: str, dependent_ports: MutableSequence[str] | None = None, ) -> None: - dependent_ports = dependent_ports or {} + dependent_ports = dependent_ports or [] if filtered_ports := { port_name: port for port_name, port in input_ports.items() if port_name not in default_ports.keys() and port_name not in dependent_ports }: for default_name in default_ports.keys(): - transformer = workflow.steps.get( + transformer = workflow.steps[ posixpath.join(step_name, default_name) + f"-{transformer_prefix}-default-transformer" - ) + ] for port_name, port in filtered_ports.items(): transformer.add_input_port( ( @@ -183,8 +184,9 @@ def _create_command( for a in (cwl_element.arguments or []) ], success_codes=cwl_element.successCodes, - failure_codes=(cwl_element.permanentFailCodes or []).extend( - cwl_element.permanentFailCodes or [] + failure_codes=( + (cwl_element.permanentFailCodes or []) + + (cwl_element.temporaryFailCodes or []) ) or None, is_shell_command=is_shell_command, @@ -230,9 +232,7 @@ def _create_command_output_processor( | cwl_utils.parser.InputSchema | cwl_utils.parser.OutputSchema | MutableSequence[ - str, - cwl_utils.parser.OutputSchema, - cwl_utils.parser.InputSchema, + str | cwl_utils.parser.OutputSchema | cwl_utils.parser.InputSchema, ] ), cwl_element: ( @@ -247,7 +247,7 @@ def _create_command_output_processor( single: bool = True, ) -> CommandOutputProcessor: # Array type: -> single is False - if isinstance(port_type, get_args(cwl_utils.parser.ArraySchema)): + if isinstance(port_type, cwl_utils.parser.ArraySchema): return _create_command_output_processor( port_name=port_name, workflow=workflow, @@ -261,7 +261,7 @@ def _create_command_output_processor( single=False, ) # Enum type: -> create command output processor - elif isinstance(port_type, get_args(cwl_utils.parser.EnumSchema)): + elif isinstance(port_type, cwl_utils.parser.EnumSchema): # Process InlineJavascriptRequirement requirements = context["hints"] | context["requirements"] expression_lib, full_js = _process_javascript_requirement(requirements) @@ -273,9 +273,7 @@ def _create_command_output_processor( else: enum_prefix = cwl_name_prefix # Return OutputProcessor - if isinstance( - cwl_element, get_args(cwl_utils.parser.ExpressionToolOutputParameter) - ): + if isinstance(cwl_element, cwl_utils.parser.ExpressionToolOutputParameter): return CWLExpressionToolOutputProcessor( name=port_name, workflow=workflow, @@ -307,7 +305,7 @@ def _create_command_output_processor( optional=optional, ) # Record type: -> ObjectCommandOutputProcessor - elif isinstance(port_type, get_args(cwl_utils.parser.RecordSchema)): + elif isinstance(port_type, cwl_utils.parser.RecordSchema): # Process InlineJavascriptRequirement requirements = context["hints"] | context["requirements"] expression_lib, full_js = _process_javascript_requirement(requirements) @@ -315,6 +313,7 @@ def _create_command_output_processor( if (type_name := getattr(port_type, "name", port_name)).startswith("_:"): type_name = port_name record_name_prefix = utils.get_name(posixpath.sep, posixpath.sep, type_name) + output_binding = _get_output_binding(cwl_element) return CWLObjectCommandOutputProcessor( name=port_name, workflow=workflow, @@ -340,11 +339,7 @@ def _create_command_output_processor( }, expression_lib=expression_lib, full_js=full_js, - output_eval=( - cwl_element.outputBinding.outputEval - if getattr(cwl_element, "outputBinding", None) - else None - ), + output_eval=output_binding.outputEval if output_binding else None, single=single, ) elif isinstance(port_type, MutableSequence): @@ -420,7 +415,7 @@ def _create_command_output_processor( else: return processors[0] # Complex type -> Extract from schema definitions and propagate - elif "#" in port_type: + elif isinstance(port_type, str) and "#" in port_type: return _create_command_output_processor( port_name=port_name, workflow=workflow, @@ -460,7 +455,7 @@ def _get_command_token_processor( if token_type == "int" # nosec else "double" if token_type == "float" else token_type # nosec ) - if isinstance(binding, get_args(cwl_utils.parser.CommandLineBinding)): + if isinstance(binding, cwl_utils.parser.CommandLineBinding): return CWLCommandTokenProcessor( name=input_name, processor=processor, @@ -480,7 +475,7 @@ def _get_command_token_processor( expression=binding, token_type=( token_type.save() - if isinstance(token_type, get_args(cwl_utils.parser.Saveable)) + if isinstance(token_type, cwl_utils.parser.Saveable) else token_type ), ) @@ -493,10 +488,10 @@ def _get_command_token_processor_from_input( is_shell_command: bool = False, schema_def_types: MutableMapping[str, Any] | None = None, ) -> CommandTokenProcessor: - processor = None + processor: CommandTokenProcessor | None = None command_line_binding = cwl_element.inputBinding # Array type: -> CWLMapCommandToken - if isinstance(port_type, get_args(cwl_utils.parser.ArraySchema)): + if isinstance(port_type, cwl_utils.parser.ArraySchema): processor = CWLMapCommandTokenProcessor( name=input_name, processor=_get_command_token_processor_from_input( @@ -508,7 +503,7 @@ def _get_command_token_processor_from_input( ), ) # Enum type: -> substitute the type with string and reprocess - elif isinstance(port_type, get_args(cwl_utils.parser.EnumSchema)): + elif isinstance(port_type, cwl_utils.parser.EnumSchema): return _get_command_token_processor_from_input( cwl_element=cwl_element, port_type="string", @@ -517,12 +512,12 @@ def _get_command_token_processor_from_input( schema_def_types=schema_def_types, ) # Object type: -> CWLObjectCommandToken - elif isinstance(port_type, get_args(cwl_utils.parser.RecordSchema)): + elif isinstance(port_type, cwl_utils.parser.RecordSchema): if (type_name := getattr(port_type, "name", input_name)).startswith("_:"): type_name = input_name record_name_prefix = utils.get_name(posixpath.sep, posixpath.sep, type_name) processors = {} - for el in port_type.fields: + for el in port_type.fields or []: key = utils.get_name("", record_name_prefix, el.name) processors[key] = _get_command_token_processor_from_input( cwl_element=el, @@ -633,6 +628,7 @@ def _create_list_merger( input_port_name = _get_source_name(input_port_name) combinator.add_input_port(input_port_name, port) combinator.combinator.add_item(input_port_name) + transformer: Transformer match pick_value: case "first_non_null": combinator.add_output_port(output_port_name, workflow.create_port()) @@ -774,14 +770,14 @@ def _create_nested_size_tag( def _create_residual_combinator( workflow: Workflow, step_name: str, - inner_combinator: Combinator, + inner_combinator: Combinator | None, inner_inputs: MutableSequence[str], input_ports: MutableMapping[str, Port], ) -> Combinator: dot_product_combinator = DotProductCombinator( workflow=workflow, name=step_name + "-dot-product-combinator" ) - if inner_combinator: + if inner_combinator is not None: dot_product_combinator.add_combinator( inner_combinator, inner_combinator.get_items(recursive=True) ) @@ -797,10 +793,17 @@ def _create_residual_combinator( def _create_token_processor( port_name: str, workflow: CWLWorkflow, - port_type: Any, + port_type: ( + cwl_utils.parser.utils.InputTypeSchemas + | cwl_utils.parser.utils.OutputTypeSchemas + | None + ), cwl_element: ( - cwl_utils.parser.InputParameter - | cwl_utils.parser.OutputParameter + cwl_utils.parser.CommandInputParameter + | cwl_utils.parser.WorkflowInputParameter + | cwl_utils.parser.CommandOutputParameter + | cwl_utils.parser.ExpressionToolOutputParameter + | cwl_utils.parser.WorkflowOutputParameter | cwl_utils.parser.WorkflowStepInput ), cwl_name_prefix: str, @@ -812,7 +815,7 @@ def _create_token_processor( only_propagate_secondary_files: bool = True, ) -> TokenProcessor: # Array type: -> MapTokenProcessor - if isinstance(port_type, get_args(cwl_utils.parser.ArraySchema)): + if isinstance(port_type, cwl_utils.parser.ArraySchema): return _create_token_processor_optional( processor=MapTokenProcessor( name=port_name, @@ -833,7 +836,7 @@ def _create_token_processor( optional=optional, ) # Enum type: -> create output processor - elif isinstance(port_type, get_args(cwl_utils.parser.EnumSchema)): + elif isinstance(port_type, cwl_utils.parser.EnumSchema): # Process InlineJavascriptRequirement requirements = context["hints"] | context["requirements"] expression_lib, full_js = _process_javascript_requirement(requirements) @@ -859,7 +862,7 @@ def _create_token_processor( full_js=full_js, ) # Record type: -> ObjectTokenProcessor - elif isinstance(port_type, get_args(cwl_utils.parser.RecordSchema)): + elif isinstance(port_type, cwl_utils.parser.RecordSchema): if (type_name := getattr(port_type, "name", port_name)).startswith("_:"): type_name = cwl_name_prefix record_name_prefix = utils.get_name(posixpath.sep, posixpath.sep, type_name) @@ -884,7 +887,7 @@ def _create_token_processor( force_deep_listing=force_deep_listing, only_propagate_secondary_files=only_propagate_secondary_files, ) - for port_type in port_type.fields + for port_type in port_type.fields or [] }, ), optional=optional, @@ -965,7 +968,7 @@ def _create_token_processor( else: return processors[0] # Complex type -> Extract from schema definitions and propagate - elif "#" in port_type: + elif isinstance(port_type, str) and "#" in port_type: return _create_token_processor( port_name=port_name, workflow=workflow, @@ -979,7 +982,7 @@ def _create_token_processor( only_propagate_secondary_files=only_propagate_secondary_files, ) # Simple type -> Create typed processor - else: + elif isinstance(port_type, str): return _create_token_processor_optional( processor=_create_token_processor_base( port_name=port_name, @@ -993,15 +996,22 @@ def _create_token_processor( ), optional=optional, ) + else: + raise WorkflowDefinitionException( + f"Unexpected type {port_type} for port {port_name}" + ) def _create_token_processor_base( port_name: str, workflow: CWLWorkflow, - port_type: Any, + port_type: str | MutableSequence[str], cwl_element: ( - cwl_utils.parser.InputParameter - | cwl_utils.parser.OutputParameter + cwl_utils.parser.CommandInputParameter + | cwl_utils.parser.WorkflowInputParameter + | cwl_utils.parser.CommandOutputParameter + | cwl_utils.parser.ExpressionToolOutputParameter + | cwl_utils.parser.WorkflowOutputParameter | cwl_utils.parser.WorkflowStepInput ), context: MutableMapping[str, Any], @@ -1080,7 +1090,9 @@ def _create_token_transformer( name: str, port_name: str, workflow: CWLWorkflow, - cwl_element: cwl_utils.parser.InputParameter, + cwl_element: ( + cwl_utils.parser.CommandInputParameter | cwl_utils.parser.WorkflowInputParameter + ), cwl_name_prefix: str, schema_def_types: MutableMapping[str, Any], context: MutableMapping[str, Any], @@ -1157,45 +1169,82 @@ def _get_hardware_requirement( def _get_load_contents( port_description: ( - cwl_utils.parser.InputParameter - | cwl_utils.parser.OutputParameter + cwl_utils.parser.CommandInputParameter + | cwl_utils.parser.WorkflowInputParameter + | cwl_utils.parser.CommandOutputParameter + | cwl_utils.parser.ExpressionToolOutputParameter + | cwl_utils.parser.OutputRecordField + | cwl_utils.parser.WorkflowOutputParameter | cwl_utils.parser.WorkflowStepInput ), only_input: bool = False, ) -> bool | None: - if getattr(port_description, "loadContents", None) is not None: + if ( + isinstance(port_description, cwl_utils.parser.LoadContents) + and port_description.loadContents is not None + ): return port_description.loadContents elif ( - getattr(port_description, "inputBinding", None) - and port_description.inputBinding.loadContents is not None + isinstance( + port_description, + cwl_utils.parser.CommandInputParameter + | cwl_utils.parser.WorkflowInputParameter, + ) + and (input_binding := port_description.inputBinding) is not None + and not isinstance(input_binding, cwl_utils.parser.cwl_v1_0.InputBinding) + and input_binding.loadContents is not None ): - return port_description.inputBinding.loadContents + return input_binding.loadContents elif ( - getattr(port_description, "outputBinding", None) - and port_description.outputBinding.loadContents is not None + ( + isinstance( + port_description, + cwl_utils.parser.CommandOutputParameter + | cwl_utils.parser.OutputRecordField + | cwl_utils.parser.ExpressionToolOutputParameter, + ) + ) + and (output_binding := _get_output_binding(port_description)) is not None + and output_binding.loadContents is not None and not only_input ): - return port_description.outputBinding.loadContents + return output_binding.loadContents else: return None def _get_load_listing( port_description: ( - cwl_utils.parser.InputParameter - | cwl_utils.parser.OutputParameter + cwl_utils.parser.CommandInputParameter + | cwl_utils.parser.WorkflowInputParameter + | cwl_utils.parser.CommandOutputParameter + | cwl_utils.parser.ExpressionToolOutputParameter + | cwl_utils.parser.OutputRecordField + | cwl_utils.parser.WorkflowOutputParameter | cwl_utils.parser.WorkflowStepInput ), context: MutableMapping[str, Any], ) -> LoadListing: requirements = context["hints"] | context["requirements"] - if getattr(port_description, "loadListing", None): + if ( + isinstance(port_description, cwl_utils.parser.LoadContents) + and port_description.loadListing is not None + ): return LoadListing[port_description.loadListing] elif ( - getattr(port_description, "outputBinding", None) - and getattr(port_description.outputBinding, "loadListing", None) is not None + ( + isinstance( + port_description, + cwl_utils.parser.CommandOutputParameter + | cwl_utils.parser.OutputRecordField + | cwl_utils.parser.ExpressionToolOutputParameter, + ) + ) + and (output_binding := _get_output_binding(port_description)) is not None + and not isinstance(output_binding, cwl_utils.parser.cwl_v1_0.OutputBinding) + and output_binding.loadListing is not None ): - return LoadListing[port_description.outputBinding.loadListing] + return LoadListing[output_binding.loadListing] elif ( "LoadListingRequirement" in requirements and requirements["LoadListingRequirement"].loadListing @@ -1217,9 +1266,7 @@ def _get_loop( return { "loop": loop.loop, "outputMethod": ( - loop.outputMethod == "all_iterations" - if loop.outputMethod == "all" - else "last_iteration" + "all_iterations" if loop.outputMethod == "all" else "last_iteration" ), "when": loop.loopWhen, } @@ -1233,6 +1280,26 @@ def _get_loop( return None +def _get_output_binding( + cwl_element: ( + cwl_utils.parser.CommandOutputParameter + | cwl_utils.parser.OutputRecordField + | cwl_utils.parser.ExpressionToolOutputParameter + ), +) -> cwl_utils.parser.CommandOutputBinding: + return ( + cwl_element.outputBinding + if isinstance( + cwl_element, + cwl_utils.parser.CommandOutputParameter + | cwl_utils.parser.CommandOutputRecordField + | cwl_utils.parser.cwl_v1_0.OutputRecordField + | cwl_utils.parser.cwl_v1_0.ExpressionToolOutputParameter, + ) + else None + ) + + def _get_path(element_id: str) -> str: path = element_id if "#" in path: @@ -1260,24 +1327,33 @@ def _get_schema_def_types( def _get_secondary_files( - cwl_element, default_required: bool + cwl_element: ( + str + | cwl_utils.parser.SecondaryFileSchema + | Sequence[str | cwl_utils.parser.SecondaryFileSchema] + | None + ), + default_required: bool, ) -> MutableSequence[SecondaryFile]: - if not cwl_element: + if cwl_element is None: return [] - secondary_files = [] - for sf in cwl_element: - if isinstance(sf, str): - secondary_files.append(SecondaryFile(pattern=sf, required=default_required)) - elif isinstance(sf, get_args(cwl_utils.parser.SecondaryFileSchema)): - secondary_files.append( - SecondaryFile( - pattern=sf.pattern, - required=( - sf.required if sf.required is not None else default_required - ), - ) + elif isinstance(cwl_element, str): + return [SecondaryFile(pattern=cwl_element, required=default_required)] + elif isinstance(cwl_element, cwl_utils.parser.SecondaryFileSchema): + return [ + SecondaryFile( + pattern=cwl_element.pattern, + required=( + cwl_element.required + if cwl_element.required is not None + else default_required + ), ) - return secondary_files + ] + else: + return flatten_list( + [_get_secondary_files(sf, default_required) for sf in cwl_element] + ) def _inject_value(value: Any) -> Any: @@ -1288,51 +1364,51 @@ def _inject_value(value: Any) -> Any: return value else: return {k: _inject_value(v) for k, v in value.items()} - elif isinstance(value, get_args(cwl_utils.parser.File)): - dict_value = {"class": value.class_} + elif isinstance(value, cwl_utils.parser.File): + file_dict = cwl_utils.types.CWLFileType(**{"class": "File"}) if value.basename is not None: - dict_value["basename"] = value.basename + file_dict["basename"] = value.basename if value.checksum is not None: - dict_value["checksum"] = value.checksum + file_dict["checksum"] = value.checksum if value.contents is not None: - dict_value["contents"] = value.contents + file_dict["contents"] = value.contents if value.dirname is not None: - dict_value["dirname"] = value.dirname + file_dict["dirname"] = value.dirname if value.format is not None: - dict_value["format"] = value.format + file_dict["format"] = value.format if value.location is not None: - dict_value["location"] = value.location + file_dict["location"] = value.location if value.nameext is not None: - dict_value["nameext"] = value.nameext + file_dict["nameext"] = value.nameext if value.nameroot is not None: - dict_value["nameroot"] = value.nameroot + file_dict["nameroot"] = value.nameroot if value.path is not None: - dict_value["path"] = value.path + file_dict["path"] = value.path if value.secondaryFiles is not None: - dict_value["secondaryFiles"] = [ + file_dict["secondaryFiles"] = [ _inject_value(sf) for sf in value.secondaryFiles ] if value.size is not None: - dict_value["size"] = value.size - return dict_value - elif isinstance(value, get_args(cwl_utils.parser.Directory)): - dict_value = {"class": value.class_} + file_dict["size"] = value.size + return file_dict + elif isinstance(value, cwl_utils.parser.Directory): + directory_dict = cwl_utils.types.CWLDirectoryType(**{"class": "Directory"}) if value.basename is not None: - dict_value["basename"] = value.basename + directory_dict["basename"] = value.basename if value.listing is not None: - dict_value["listing"] = [_inject_value(sf) for sf in value.listing] + directory_dict["listing"] = [_inject_value(sf) for sf in value.listing] if value.location is not None: - dict_value["location"] = value.location + directory_dict["location"] = value.location if value.path is not None: - dict_value["path"] = value.path - return dict_value - elif isinstance(value, get_args(cwl_utils.parser.Dirent)): - dict_value = {"entry": value.entry} + directory_dict["path"] = value.path + return directory_dict + elif isinstance(value, cwl_utils.parser.Dirent): + dirent_dict = cwl_utils.types.DirentType(entry=value.entry) if value.entryname is not None: - dict_value["entryname"] = value.entryname + dirent_dict["entryname"] = value.entryname if value.writable is not None: - dict_value["writable"] = value.writable - return dict_value + dirent_dict["writable"] = value.writable + return dirent_dict else: return value @@ -1343,9 +1419,7 @@ def _is_optional_port( | cwl_utils.parser.InputSchema | cwl_utils.parser.OutputSchema | MutableSequence[ - str, - cwl_utils.parser.OutputSchema, - cwl_utils.parser.InputSchema, + str | cwl_utils.parser.OutputSchema | cwl_utils.parser.InputSchema ] ), ) -> bool: @@ -1373,9 +1447,9 @@ def _process_docker_image( ) -> str: # Retrieve image if docker_requirement.dockerPull is not None: - return cast(str, docker_requirement.dockerPull) + return docker_requirement.dockerPull elif docker_requirement.dockerImageId is not None: - return cast(str, docker_requirement.dockerImageId) + return docker_requirement.dockerImageId else: raise WorkflowDefinitionException( "DockerRequirements without `dockerPull` or `dockerImageId` are not supported yet" @@ -1458,7 +1532,7 @@ def _process_loop_transformers( def _process_transformers( step_name: str, input_ports: MutableMapping[str, Port], - transformers: MutableMapping[str, Transformer], + transformers: Mapping[str, Transformer], input_dependencies: MutableMapping[str, set[str]], ) -> MutableMapping[str, Port]: new_input_ports = {} @@ -1514,9 +1588,7 @@ def create_command_output_processor_base( requirements = context["hints"] | context["requirements"] expression_lib, full_js = _process_javascript_requirement(requirements) # Create OutputProcessor - if isinstance( - cwl_element, get_args(cwl_utils.parser.ExpressionToolOutputParameter) - ): + if isinstance(cwl_element, cwl_utils.parser.ExpressionToolOutputParameter): return CWLExpressionToolOutputProcessor( name=port_name, workflow=workflow, @@ -1611,7 +1683,7 @@ def __init__( self.deployment_map: MutableMapping[str, DeployStep] = {} self.gather_map: MutableMapping[str, str] = {} self.input_ports: MutableMapping[str, Port] = {} - self.output_ports: MutableMapping[str, str | Port] = {} + self.output_ports: MutableMapping[str, Port] = {} self.scatter: MutableMapping[str, Any] = {} self.workflow_config: WorkflowConfig = workflow_config @@ -1678,7 +1750,10 @@ def _get_input_port( self, workflow: Workflow, cwl_element: cwl_utils.parser.Process, - element_input: cwl_utils.parser.InputParameter, + element_input: ( + cwl_utils.parser.CommandInputParameter + | cwl_utils.parser.WorkflowInputParameter + ), global_name: str, port_name: str, default_ports: MutableMapping[str, Port], @@ -1693,7 +1768,7 @@ def _get_input_port( # Insert default port transformer_suffix = ( "-wf-default-transformer" - if isinstance(cwl_element, get_args(cwl_utils.parser.Workflow)) + if isinstance(cwl_element, cwl_utils.parser.Workflow) else "-cmd-default-transformer" ) input_port = self._handle_default_port( @@ -1757,7 +1832,7 @@ def _inject_input( global_name: str, port_name: str, port: Port, - output_directory: str, + output_directory: str | None, value: Any, ) -> None: # Retrieve the DeployStep for the port target @@ -1782,9 +1857,15 @@ def _inject_input( job_prefix=f"{global_name}-injector", connector_ports={target.deployment.name: deploy_step.get_output_port()}, binding_config=binding_config, - input_directory=target.workdir or self.output_directory, - output_directory=target.workdir or self.output_directory, - tmp_directory=target.workdir or self.output_directory, + input_directory=( + target.workdir if target.workdir is not None else self.output_directory + ), + output_directory=( + target.workdir if target.workdir is not None else self.output_directory + ), + tmp_directory=( + target.workdir if target.workdir is not None else self.output_directory + ), ) # Create a CWLInputInjector step to process the input injector_step = workflow.create_step( @@ -1803,14 +1884,14 @@ def _inject_input( workflow.input_ports[port_name] = input_port.name def _inject_inputs(self, workflow: Workflow) -> None: - output_directory = None - if self.cwl_inputs: - # Compute output directory path - output_directory = os.path.dirname(self.cwl_inputs_path) + # Compute output directory path + output_directory = ( + os.path.dirname(self.cwl_inputs_path) if self.cwl_inputs else None + ) # Compute suffix default_suffix = ( "-wf-default-transformer" - if isinstance(self.cwl_definition, get_args(cwl_utils.parser.Workflow)) + if isinstance(self.cwl_definition, cwl_utils.parser.Workflow) else "-cmd-default-transformer" ) # Process externally provided inputs @@ -1844,6 +1925,7 @@ def _recursive_translate( cwl_element: ( cwl_utils.parser.CommandLineTool | cwl_utils.parser.ExpressionTool + | cwl_utils.parser.Operation | cwl_utils.parser.Workflow | cwl_utils.parser.WorkflowStep ), @@ -1864,19 +1946,23 @@ def _recursive_translate( ) for requirement in cwl_element.requirements or []: if requirement.class_ == "Loop": - if not isinstance(cwl_element, get_args(cwl_utils.parser.WorkflowStep)): + if not isinstance(cwl_element, cwl_utils.parser.WorkflowStep): raise WorkflowDefinitionException( "The `cwltool:Loop` clause is not compatible " f"with the `{cwl_element.__class__.__name__}` class." ) - if ( - cast(cwl_utils.parser.ScatterWorkflowStep, cwl_element).scatter - is not None - ): + if cwl_element.scatter is not None: raise WorkflowDefinitionException( "The `cwltool:Loop` clause is not compatible with the `scatter` directive." ) - if cwl_element.when is not None: + if ( + not isinstance( + cwl_element, + cwl_utils.parser.cwl_v1_0.WorkflowStep + | cwl_utils.parser.cwl_v1_1.WorkflowStep, + ) + and cwl_element.when is not None + ): raise WorkflowDefinitionException( "The `cwltool:Loop` clause is not compatible with the `when` directive." ) @@ -1895,7 +1981,7 @@ def _recursive_translate( requirement.class_ ] = requirement # Dispatch element - if isinstance(cwl_element, get_args(cwl_utils.parser.Workflow)): + if isinstance(cwl_element, cwl_utils.parser.Workflow): self._translate_workflow( workflow=workflow, cwl_element=cwl_element, @@ -1903,7 +1989,7 @@ def _recursive_translate( name_prefix=name_prefix, cwl_name_prefix=cwl_name_prefix, ) - elif isinstance(cwl_element, get_args(cwl_utils.parser.WorkflowStep)): + elif isinstance(cwl_element, cwl_utils.parser.WorkflowStep): self._translate_workflow_step( workflow=workflow, cwl_element=cwl_element, @@ -1911,7 +1997,7 @@ def _recursive_translate( name_prefix=name_prefix, cwl_name_prefix=cwl_name_prefix, ) - elif isinstance(cwl_element, get_args(cwl_utils.parser.CommandLineTool)): + elif isinstance(cwl_element, cwl_utils.parser.CommandLineTool): self._translate_command_line_tool( workflow=workflow, cwl_element=cwl_element, @@ -1919,7 +2005,7 @@ def _recursive_translate( name_prefix=name_prefix, cwl_name_prefix=cwl_name_prefix, ) - elif isinstance(cwl_element, get_args(cwl_utils.parser.ExpressionTool)): + elif isinstance(cwl_element, cwl_utils.parser.ExpressionTool): self._translate_command_line_tool( workflow=workflow, cwl_element=cwl_element, @@ -2047,7 +2133,7 @@ def _translate_command_line_tool( ) # Add the output port as an input of the schedule step schedule_step.add_input_port(port_name, token_transformer.get_output_port()) - if isinstance(cwl_element, get_args(cwl_utils.parser.CommandLineTool)): + if isinstance(cwl_element, cwl_utils.parser.CommandLineTool): # Create a TransferStep transfer_step = workflow.create_step( cls=CWLTransferStep, @@ -2060,7 +2146,7 @@ def _translate_command_line_tool( transfer_step.add_output_port(port_name, workflow.create_port()) # Connect the transfer step with the ExecuteStep step.add_input_port(port_name, transfer_step.get_output_port(port_name)) - elif isinstance(cwl_element, get_args(cwl_utils.parser.ExpressionTool)): + elif isinstance(cwl_element, cwl_utils.parser.ExpressionTool): # Connect the token transformer step with the ExecuteStep step.add_input_port(port_name, token_transformer.get_output_port()) # Store input port and token transformer @@ -2096,12 +2182,11 @@ def _translate_command_line_tool( port_target = None # In CWL <= v1.2, ExpressionTool output is never type-checked if isinstance( - cwl_element, get_args(cwl_utils.parser.ExpressionTool) - ) and context["version"] in [ - "v1.0", - "v1.1", - "v1.2", - ]: + cwl_element, + cwl_utils.parser.cwl_v1_0.ExpressionTool + | cwl_utils.parser.cwl_v1_1.ExpressionTool + | cwl_utils.parser.cwl_v1_2.ExpressionTool, + ): if isinstance(element_output.type_, MutableSequence): port_type = element_output.type_ if "null" not in port_type: @@ -2150,7 +2235,7 @@ def _translate_command_line_tool( port=output_port, output_processor=output_processor, ) - if isinstance(cwl_element, get_args(cwl_utils.parser.CommandLineTool)): + if isinstance(cwl_element, cwl_utils.parser.CommandLineTool): # Process command step.command = _create_command( cwl_element=cwl_element, @@ -2162,7 +2247,7 @@ def _translate_command_line_tool( # Process ToolTimeLimit if "ToolTimeLimit" in requirements: step.command.time_limit = requirements["ToolTimeLimit"].timelimit - elif isinstance(cwl_element, get_args(cwl_utils.parser.ExpressionTool)): + elif isinstance(cwl_element, cwl_utils.parser.ExpressionTool): step.command = CWLExpressionCommand(step, cwl_element.expression) # Add JS requirements step.command.expression_lib = expression_lib @@ -2238,11 +2323,7 @@ def _translate_workflow( full_js=full_js, expression_lib=expression_lib, ), - resolve_dependencies( - expression=secondary_file.required, - full_js=full_js, - expression_lib=expression_lib, - ), + secondary_file.required, ) input_dependencies[global_name] = set.union( {global_name}, {posixpath.join(step_name, d) for d in local_deps} @@ -2265,7 +2346,11 @@ def _translate_workflow( link_merge = element_output.linkMerge pick_value = ( None - if context["version"] in ["v1.0", "v1.1"] + if isinstance( + element_output, + cwl_utils.parser.cwl_v1_0.WorkflowOutputParameter + | cwl_utils.parser.cwl_v1_1.WorkflowOutputParameter, + ) else element_output.pickValue ) # If outputSource element is a list, the output element can depend on multiple ports @@ -2368,7 +2453,7 @@ def _translate_workflow_step( # Extract JavaScript requirements expression_lib, full_js = _process_javascript_requirement(requirements) # Find scatter elements - if isinstance(cwl_element, get_args(cwl_utils.parser.ScatterWorkflowStep)): + if isinstance(cwl_element, cwl_utils.parser.ScatterWorkflowStep): if isinstance(cwl_element.scatter, str): scatter_inputs = [ utils.get_name(step_name, cwl_step_name, cwl_element.scatter) @@ -2396,7 +2481,7 @@ def _translate_workflow_step( context=context, ) # If the inner command is a workflow, check if `SubworkflowFeatureRequirement` is defined - if isinstance(cwl_element, get_args(cwl_utils.parser.Workflow)): + if isinstance(cwl_element, cwl_utils.parser.Workflow): if "SubworkflowFeatureRequirement" not in requirements: raise WorkflowDefinitionException( "Workflow contains embedded workflow but " @@ -2564,10 +2649,16 @@ def _translate_workflow_step( # Save input ports in the global map self.input_ports |= input_ports # Process condition - conditional_step = None cwl_condition = ( - None if context["version"] in ["v1.0", "v1.1"] else cwl_element.when + None + if isinstance( + cwl_element, + cwl_utils.parser.cwl_v1_0.WorkflowStep + | cwl_utils.parser.cwl_v1_1.WorkflowStep, + ) + else cwl_element.when ) + conditional_step: CWLConditionalStep | None if cwl_condition is not None: if loop is not None: # Create loop conditional step @@ -2599,6 +2690,8 @@ def _translate_workflow_step( conditional_step.add_output_port( port_name, self.input_ports[global_name] ) + else: + conditional_step = None # Process outputs external_output_ports = {} internal_output_ports = {} @@ -2686,8 +2779,8 @@ def _translate_workflow_step( port_name, external_output_ports[global_name] ) # Add skip ports if there is a condition without a loop - if cwl_condition and loop is None: - cast(CWLConditionalStep, conditional_step).add_skip_port( + if conditional_step is not None and loop is None: + conditional_step.add_skip_port( port_name, internal_output_ports[global_name] ) # Process loop outputs @@ -2848,7 +2941,7 @@ def _translate_workflow_step_input( context: MutableMapping[str, Any], element_id: str, element_input: cwl_utils.parser.WorkflowStepInput, - element_source: str, + element_source: str | Sequence[str] | None, name_prefix: str, cwl_name_prefix: str, requirements: MutableMapping[str, Any], @@ -3043,13 +3136,13 @@ def translate(self) -> Workflow: relpath=os.path.basename(path), ) if self.cwl_inputs: - path = self.cwl_inputs_path + inputs_path = self.cwl_inputs_path self.context.data_manager.register_path( location=ExecutionLocation( deployment=deployment_name, local=True, name="__LOCAL__" ), - path=path, - relpath=os.path.basename(path), + path=inputs_path, + relpath=os.path.basename(cast(str, inputs_path)), ) # Build workflow graph self._recursive_translate( diff --git a/streamflow/cwl/utils.py b/streamflow/cwl/utils.py index e26e18fff..b37d8a71e 100644 --- a/streamflow/cwl/utils.py +++ b/streamflow/cwl/utils.py @@ -14,6 +14,7 @@ import cwl_utils.expression import cwl_utils.parser import cwl_utils.parser.utils +import cwl_utils.types from cwl_utils.parser.cwl_v1_2_utils import CONTENT_LIMIT from typing_extensions import Self @@ -133,7 +134,7 @@ async def _get_listing( dirpath: StreamFlowPath, load_contents: bool, recursive: bool, -) -> MutableSequence[MutableMapping[str, Any]]: +) -> MutableSequence[cwl_utils.types.CWLFileType | cwl_utils.types.CWLDirectoryType]: listing_tokens = {} for location in locations: loc_path = StreamFlowPath(dirpath, context=context, location=location) @@ -183,37 +184,42 @@ async def _process_secondary_file( connector: Connector, cwl_version: str, locations: MutableSequence[ExecutionLocation], - secondary_file: Any, - token_value: MutableMapping[str, Any], + secondary_file: ( + str | cwl_utils.types.CWLFileType | cwl_utils.types.CWLDirectoryType | None + ), + token_value: cwl_utils.types.CWLFileType | cwl_utils.types.CWLDirectoryType, from_expression: bool, - existing_sf: MutableMapping[str, Any], + existing_sf: MutableMapping[ + str, cwl_utils.types.CWLFileType | cwl_utils.types.CWLDirectoryType + ], load_contents: bool, load_listing: LoadListing | None, only_retrieve_from_token: bool, -) -> MutableMapping[str, Any] | None: +) -> cwl_utils.types.CWLFileType | cwl_utils.types.CWLDirectoryType | None: match secondary_file: # If value is None, simply return None case None: return None # If value is a dictionary, simply append it to the list case MutableMapping(): - filepath = get_path_from_token(secondary_file) - for location in locations: - if await ( - path := StreamFlowPath(filepath, context=context, location=location) - ).exists(): - return await get_file_token( - context=context, - connector=connector, - cwl_version=cwl_version, - locations=locations, - token_class=get_token_class(secondary_file), - filepath=filepath, - file_format=secondary_file.get("format"), - basename=secondary_file.get("basename"), - load_contents=load_contents, - load_listing=load_listing, - ) + if cwl_utils.types.is_file_or_directory(secondary_file): + filepath = get_path_from_token(secondary_file) + for location in locations: + if await StreamFlowPath( + filepath, context=context, location=location + ).exists(): + return await get_file_token( + context=context, + connector=connector, + cwl_version=cwl_version, + locations=locations, + token_class=get_token_class(secondary_file), + filepath=filepath, + file_format=secondary_file.get("format"), + basename=secondary_file.get("basename"), + load_contents=load_contents, + load_listing=load_listing, + ) return None # If value is a string case str(): @@ -362,10 +368,13 @@ def build_context( output_directory: str | None = None, tmp_directory: str | None = None, hardware: Hardware | None = None, -) -> MutableMapping[str, Any]: - context = {"inputs": {}, "self": None, "runtime": {}} - for name, token in inputs.items(): - context["inputs"][name] = get_token_value(token) + self: Token | None = None, +) -> cwl_utils.types.CWLParameterContext: + context = cwl_utils.types.CWLParameterContext( + inputs={name: get_token_value(token) for name, token in inputs.items()}, + self=get_token_value(self) if self is not None else None, + runtime=cwl_utils.types.CWLRuntimeParameterContext(), + ) if output_directory: context["runtime"]["outdir"] = output_directory if tmp_directory: @@ -524,7 +533,7 @@ async def build_token( async def build_token_value( context: StreamFlowContext, cwl_version: str, - js_context: MutableMapping[str, Any], + js_context: cwl_utils.types.CWLParameterContext, full_js: bool, expression_lib: MutableSequence[str] | None, secondary_files: MutableSequence[SecondaryFile] | None, @@ -555,13 +564,13 @@ async def build_token_value( ) ) return await asyncio.gather(*value_tasks) - elif isinstance(token_value, MutableMapping) and ( - token_class := get_token_class(token_value) - ) in ["File", "Directory"]: + elif cwl_utils.types.is_file_or_directory(token_value): path_processor = get_path_processor(connector) # Process secondary files in token value - sf_map = {} - if "secondaryFiles" in token_value: + sf_map: MutableMapping[ + str, cwl_utils.types.CWLFileType | cwl_utils.types.CWLDirectoryType + ] = {} + if cwl_utils.types.is_file(token_value) and "secondaryFiles" in token_value: sf_tasks = [] for sf in token_value.get("secondaryFiles", []): sf_token_class = get_token_class(sf) @@ -580,7 +589,7 @@ async def build_token_value( file_format=sf.get("format"), basename=sf.get("basename"), contents=sf.get("contents"), - is_literal=is_literal_file(sf_token_class, sf), + is_literal=is_literal_file(sf), load_contents=load_contents, load_listing=load_listing, ) @@ -590,14 +599,14 @@ async def build_token_value( get_path_from_token(sf): sf for sf in await asyncio.gather(*sf_tasks) } # Get filepath - is_literal = is_literal_file(token_class, token_value) + is_literal = is_literal_file(token_value) if (filepath := get_path_from_token(token_value)) is not None: token_value = await get_file_token( context=context, connector=connector, cwl_version=cwl_version, locations=locations, - token_class=token_class, + token_class=get_token_class(token_value), filepath=filepath, file_format=token_value.get("format"), basename=token_value.get("basename"), @@ -606,13 +615,13 @@ async def build_token_value( load_listing=load_listing, ) # If there is only a 'contents' field, propagate the parameter - elif "contents" in token_value: + elif cwl_utils.types.is_file(token_value) and "contents" in token_value: token_value = await get_file_token( context=context, connector=connector, cwl_version=cwl_version, locations=locations, - token_class=token_class, + token_class=get_token_class(token_value), filepath=path_processor.join( js_context["runtime"]["outdir"], token_value.get("basename", random_name()), @@ -624,7 +633,7 @@ async def build_token_value( load_listing=load_listing, ) # If there is only a 'listing' field, build a folder token and process all the listing entries recursively - elif "listing" in token_value: + elif cwl_utils.types.is_directory(token_value) and "listing" in token_value: filepath = js_context["runtime"]["outdir"] if "basename" in token_value: filepath = path_processor.join(filepath, token_value["basename"]) @@ -660,7 +669,7 @@ async def build_token_value( connector=connector, cwl_version=cwl_version, locations=locations, - token_class=token_class, + token_class=get_token_class(token_value), filepath=filepath, file_format=token_value.get("format"), basename=token_value.get("basename"), @@ -676,7 +685,7 @@ async def build_token_value( cwl_version=cwl_version, secondary_files=secondary_files, sf_map=sf_map, - js_context=cast(dict[str, Any], js_context) | {"self": token_value}, + js_context=js_context | {"self": token_value}, full_js=full_js, expression_lib=expression_lib, connector=connector, @@ -711,7 +720,7 @@ async def create_remote_directory( def eval_expression( expression: Any, - context: MutableMapping[str, Any], + context: cwl_utils.types.CWLParameterContext, full_js: bool = False, expression_lib: MutableSequence[str] | None = None, timeout: int | None = None, @@ -801,18 +810,21 @@ async def get_file_token( is_literal: bool = False, load_contents: bool = False, load_listing: LoadListing | None = None, -) -> MutableMapping[str, Any]: +) -> cwl_utils.types.CWLFileType | cwl_utils.types.CWLDirectoryType: path_processor = get_path_processor(connector) basename = basename or path_processor.basename(filepath) file_location = "".join(["file://", urllib.parse.quote(filepath)]) - token = { - "class": token_class, - "location": file_location, - "basename": basename, - "path": filepath, - "dirname": path_processor.dirname(filepath), - } + token: cwl_utils.types.CWLFileType | cwl_utils.types.CWLDirectoryType if token_class == "File": # nosec + token = cwl_utils.types.CWLFileType( + **{ + "class": "File", + "location": file_location, + "basename": basename, + "path": filepath, + "dirname": path_processor.dirname(filepath), + } + ) if file_format: token["format"] = file_format token["nameroot"], token["nameext"] = path_processor.splitext(basename) @@ -839,25 +851,34 @@ async def get_file_token( raise WorkflowExecutionException(f"File {filepath} does not exist") if contents and not load_contents: token["contents"] = contents - elif ( - token_class == "Directory" # nosec - and load_listing != LoadListing.no_listing - and not is_literal - ): - for location in locations: - if await ( - path := StreamFlowPath(filepath, context=context, location=location) - ).exists(): - token["listing"] = await _get_listing( - context=context, - connector=connector, - cwl_version=cwl_version, - locations=locations, - dirpath=path, - load_contents=load_contents, - recursive=load_listing == LoadListing.deep_listing, - ) - break + elif token_class == "Directory": # nosec + token = cwl_utils.types.CWLDirectoryType( + **{ + "class": "Directory", + "location": file_location, + "basename": basename, + "path": filepath, + } + ) + if load_listing != LoadListing.no_listing and not is_literal: + for location in locations: + if await ( + path := StreamFlowPath(filepath, context=context, location=location) + ).exists(): + token["listing"] = await _get_listing( + context=context, + connector=connector, + cwl_version=cwl_version, + locations=locations, + dirpath=path, + load_contents=load_contents, + recursive=load_listing == LoadListing.deep_listing, + ) + break + else: + raise WorkflowExecutionException( + f"Received token with class {token_class}: expected `File` or `Directory`" + ) return token @@ -878,7 +899,9 @@ def get_name( ) -def get_path_from_token(token_value: MutableMapping[str, Any]) -> str | None: +def get_path_from_token( + token_value: cwl_utils.types.CWLFileType | cwl_utils.types.CWLDirectoryType, +) -> str | None: location = token_value.get("location", token_value.get("path")) if location and "://" in location: scheme = urllib.parse.urlsplit(location).scheme @@ -921,17 +944,18 @@ def is_expression(expression: Any) -> bool: def is_literal_file( - file_class: str, file_value: MutableMapping[str, Any], job_name: str | None = None + file_value: cwl_utils.types.CWLFileType | cwl_utils.types.CWLDirectoryType, + job_name: str | None = None, ) -> bool: if not ("path" in file_value or "location" in file_value): job_msg = f"Job {job_name} cannot process the file. " if job_name else "" - if file_class == "File" and ("contents" not in file_value): + if cwl_utils.types.is_file(file_value) and ("contents" not in file_value): # The file can have the `contents` field without the `basename` field # but cannot have the `basename` field without the `contents` field raise WorkflowDefinitionException( f"{job_msg}Anonymous file object must have 'contents' and 'basename' fields." ) - if file_class == "Directory" and ( + if cwl_utils.types.is_directory(file_value) and ( "listing" not in file_value or "basename" not in file_value ): raise WorkflowDefinitionException( @@ -973,7 +997,7 @@ def process_embedded_tool( ) inner_cwl_name_prefix = ( step_name - if context["version"] == "v1.0" + if isinstance(cwl_element, cwl_utils.parser.cwl_v1_0.WorkflowStep) else posixpath.join(cwl_step_name, "run") ) else: @@ -1010,7 +1034,7 @@ async def process_secondary_files( cwl_version: str, secondary_files: MutableSequence[SecondaryFile], sf_map: MutableMapping[str, Any], - js_context: MutableMapping[str, Any], + js_context: cwl_utils.types.CWLParameterContext, full_js: bool, expression_lib: MutableSequence[str] | None, connector: Connector, @@ -1115,7 +1139,13 @@ async def register_data( connector: Connector, locations: MutableSequence[ExecutionLocation], base_path: str | None, - token_value: MutableSequence[MutableMapping[str, Any]] | MutableMapping[str, Any], + token_value: ( + cwl_utils.types.CWLFileType + | cwl_utils.types.CWLDirectoryType + | MutableSequence[ + cwl_utils.types.CWLFileType | cwl_utils.types.CWLDirectoryType + ] + ), ) -> None: # If `token_value` is a list, process every item independently if isinstance(token_value, MutableSequence): @@ -1128,7 +1158,7 @@ async def register_data( ) ) # Otherwise, if token value is a dictionary and it refers to a File or a Directory, register the path - elif get_token_class(token_value) in ["File", "Directory"]: + elif cwl_utils.types.is_file_or_directory(token_value): path_processor = get_path_processor(connector) # Extract paths from token paths = [] @@ -1147,7 +1177,9 @@ async def register_data( elif "secondaryFiles" in token_value: paths.extend( sf_path - for sf in token_value["secondaryFiles"] + for sf in cast(cwl_utils.types.CWLFileType, token_value)[ + "secondaryFiles" + ] if (sf_path := get_path_from_token(sf)) ) # Register paths to the `DataManager` @@ -1262,7 +1294,9 @@ def resolve_dependencies( """ if is_expression(expression): context_key = context_key or "inputs" - context: MutableMapping[str, Any] = {"inputs": {}, "self": {}, "runtime": {}} + context = cwl_utils.types.CWLParameterContext( + inputs={}, self=None, runtime=cwl_utils.types.CWLRuntimeParameterContext() + ) engine = DependencyResolver(context_key) cwl_utils.expression.interpolate( expression, @@ -1371,14 +1405,14 @@ async def update_file_token( connector: Connector, cwl_version: str, location: ExecutionLocation, - token_value: MutableMapping[str, Any], + token_value: cwl_utils.types.CWLFileType | cwl_utils.types.CWLDirectoryType, load_contents: bool | None, load_listing: LoadListing | None = None, -) -> MutableMapping[str, Any]: +) -> cwl_utils.types.CWLFileType | cwl_utils.types.CWLDirectoryType: if path := get_path_from_token(token_value): filepath = StreamFlowPath(path, context=context, location=location) # Process contents - if get_token_class(token_value) == "File" and load_contents is not None: + if cwl_utils.types.is_file(token_value) and load_contents is not None: if load_contents and "contents" not in token_value: token_value |= { "contents": await _get_contents( @@ -1388,17 +1422,15 @@ async def update_file_token( ) } elif not load_contents and "contents" in token_value: - token_value = { - k: token_value[k] for k in token_value if k != "contents" - } + token_value = token_value.copy() + token_value.pop("contents") # Process listings - if get_token_class(token_value) == "Directory" and load_listing is not None: + if cwl_utils.types.is_directory(token_value) and load_listing is not None: # If load listing is set to `no_listing`, remove the listing entries in present if load_listing == LoadListing.no_listing: if "listing" in token_value: - token_value = { - k: token_value[k] for k in token_value if k != "listing" - } + token_value = token_value.copy() + token_value.pop("listing") # If listing is not present or if the token needs a deep listing, process directory contents elif ( "listing" not in token_value or load_listing == LoadListing.deep_listing @@ -1416,12 +1448,17 @@ async def update_file_token( } # If load listing is set to `shallow_listing`, remove the deep listing entries if present elif load_listing == LoadListing.shallow_listing: - token_value |= { - "listing": [ - {k: v[k] for k in v if k != "listing"} - for v in token_value["listing"] - ] - } + listing: MutableSequence[ + cwl_utils.types.CWLFileType | cwl_utils.types.CWLDirectoryType + ] = [] + for v in token_value["listing"]: + if cwl_utils.types.is_directory(v) and "listing" in v: + v = v.copy() + v.pop("listing") + listing.append(v) + else: + listing.append(v) + token_value["listing"] = listing return token_value diff --git a/streamflow/deployment/aiotarstream.py b/streamflow/deployment/aiotarstream.py index 8338ade87..3e28ce859 100644 --- a/streamflow/deployment/aiotarstream.py +++ b/streamflow/deployment/aiotarstream.py @@ -13,8 +13,9 @@ import time from abc import ABC from builtins import open as bltn_open +from collections.abc import Iterable from types import TracebackType -from typing import TYPE_CHECKING, Any, Literal, Protocol, TypeAlias +from typing import IO, TYPE_CHECKING, Literal, Protocol, TypeAlias from typing_extensions import Self @@ -197,7 +198,7 @@ async def read(self, size: int | None = None) -> bytes: self.pos += len(buf) return buf - async def write(self, data: Any) -> None: + async def write(self, data: bytes) -> None: if self.cmp is None: await self._init_compression() self.crc = self.zlib.crc32(data, self.crc) @@ -294,7 +295,7 @@ async def read(self, size: int | None = None) -> bytes: self.position += length return tarfile.NUL * length - async def write(self, data: Any) -> None: + async def write(self, data: bytes) -> None: raise NotImplementedError @@ -318,7 +319,7 @@ async def read(self, size: int | None = None) -> bytes: def tell(self) -> int: return self.position - async def write(self, data: Any) -> None: + async def write(self, data: bytes) -> None: await self.stream.write(data) self.position += len(data) @@ -334,7 +335,7 @@ async def seek(self, offset: int) -> None: elif offset < self.position: raise tarfile.ReadError("Cannot seek backward with streams") - async def write(self, data: Any) -> None: + async def write(self, data: bytes) -> None: raise NotImplementedError @@ -357,7 +358,7 @@ def _proc_builtin(self, tarstream: AioTarStream) -> Self: ) return self - async def _proc_gnulong(self, tarstream): + async def _proc_gnulong(self, tarstream: AioTarStream) -> Self: buf = await tarstream.stream.read(self._block(self.size)) try: next = await self.fromtarfile(tarstream) @@ -370,7 +371,9 @@ async def _proc_gnulong(self, tarstream): next.linkname = tarfile.nts(buf, tarstream.encoding, tarstream.errors) return next - async def _proc_gnusparse_10(self, next, pax_headers, tarstream): + async def _proc_gnusparse_10( + self, next: Self, pax_headers: dict[str, str], tarstream: AioTarStream + ) -> None: sparse = [] buf = await tarstream.stream.read(tarfile.BLOCKSIZE) fields, buf = buf.split(b"\n", 1) @@ -383,7 +386,7 @@ async def _proc_gnusparse_10(self, next, pax_headers, tarstream): next.offset_data = tarstream.stream.tell() next.sparse = list(zip(sparse[::2], sparse[1::2], strict=True)) - async def _proc_member(self, tarstream): + async def _proc_member(self, tarstream: AioTarStream) -> Self: if self.type in (tarfile.GNUTYPE_LONGNAME, tarfile.GNUTYPE_LONGLINK): return await self._proc_gnulong(tarstream) elif self.type == tarfile.GNUTYPE_SPARSE: @@ -393,7 +396,7 @@ async def _proc_member(self, tarstream): else: return self._proc_builtin(tarstream) - async def _proc_pax(self, tarstream): + async def _proc_pax(self, tarstream: AioTarStream) -> Self: buf = await tarstream.stream.read(self._block(self.size)) if self.type == tarfile.XGLTYPE: pax_headers = tarstream.pax_headers @@ -535,7 +538,7 @@ def __init__( self.errorlevel = errorlevel self.copybufsize = copybufsize self.closed = False - self.members = [] + self.members: list[AioTarInfo] = [] self._loaded = False self.inodes = {} self._unames = {} # Cached mappings of uid -> uname @@ -877,7 +880,13 @@ async def extract( else: self._dbg(1, "tarfile: %s" % e) - async def extractall(self, path=".", members=None, *, numeric_owner=False): + async def extractall( + self, + path: StrOrBytesPath = ".", + members: Iterable[AioTarInfo] | None = None, + *, + numeric_owner: bool = False, + ): directories = [] for tarinfo in members or self: if tarinfo.isdir(): @@ -934,7 +943,12 @@ async def getmembers(self) -> list[AioTarInfo]: async def getnames(self) -> list[str]: return [tarinfo.name for tarinfo in await self.getmembers()] - def gettarinfo(self, name=None, arcname=None, fileobj=None): + def gettarinfo( + self, + name: StrOrBytesPath | None = None, + arcname: str | None = None, + fileobj: IO[bytes] | None = None, + ) -> AioTarInfo: self._check("awx") if fileobj is not None: name = fileobj.name @@ -1006,7 +1020,9 @@ def gettarinfo(self, name=None, arcname=None, fileobj=None): tarinfo.devminor = os.minor(statres.st_rdev) return tarinfo - def list(self, verbose=True, *, members=None): + def list( + self, verbose: bool = True, *, members: Iterable[AioTarInfo] = None + ) -> None: self._check() if members is None: members = self diff --git a/streamflow/deployment/connector/kubernetes.py b/streamflow/deployment/connector/kubernetes.py index 64b50dcd3..09ba740d3 100644 --- a/streamflow/deployment/connector/kubernetes.py +++ b/streamflow/deployment/connector/kubernetes.py @@ -38,12 +38,12 @@ ) from streamflow.core.scheduling import AvailableLocation from streamflow.core.utils import get_option -from streamflow.deployment.aiotarstream import BaseStreamWrapper from streamflow.deployment.connector.base import ( BaseConnector, copy_remote_to_remote, copy_same_connector, ) +from streamflow.deployment.stream import BaseStreamWrapper from streamflow.log_handler import logger SERVICE_NAMESPACE_FILENAME = "/var/run/secrets/kubernetes.io/serviceaccount/namespace" @@ -135,8 +135,8 @@ async def write(self, data: Any) -> None: class KubernetesResponseWrapperContextManager(AsyncContextManager[StreamWrapper]): - def __init__(self, coro) -> None: - self.coro = coro + def __init__(self, coro: Awaitable[str]): + self.coro: Awaitable[str] = coro self.response: KubernetesResponseWrapper | None = None async def __aenter__(self) -> KubernetesResponseWrapper: @@ -387,20 +387,17 @@ async def get_stream_reader( ) -> AsyncContextManager[StreamWrapper]: pod, container = location.name.split(":") return KubernetesResponseWrapperContextManager( - coro=cast( - Coroutine, - self.client_ws.connect_get_namespaced_pod_exec( - name=pod, - namespace=self.namespace or "default", - container=container, - command=command, - stderr=True, - stdin=False, - stdout=True, - tty=False, - _preload_content=False, - ), - ) + coro=self.client_ws.connect_get_namespaced_pod_exec( + name=pod, + namespace=self.namespace or "default", + container=container, + command=command, + stderr=True, + stdin=False, + stdout=True, + tty=False, + _preload_content=False, + ), ) async def get_stream_writer( @@ -437,7 +434,7 @@ async def run( timeout: int | None = None, job_name: str | None = None, ) -> tuple[str, int] | None: - command = utils.create_command( + cmd = utils.create_command( self.__class__.__name__, command, environment, @@ -449,32 +446,29 @@ async def run( if logger.isEnabledFor(logging.DEBUG): logger.debug( "EXECUTING command {command} on {location} {job}".format( - command=command, + command=cmd, location=location, job=f"for job {job_name}" if job_name else "", ) ) - command = ( + cmd = ( ["sh", "-c"] + [f"{k}={v}" for k, v in location.environment.items()] - + [utils.encode_command(command)] + + [utils.encode_command(cmd)] ) pod, container = location.name.split(":") # noinspection PyUnresolvedReferences result = await asyncio.wait_for( - cast( - Awaitable, - self.client_ws.connect_get_namespaced_pod_exec( - name=pod, - namespace=self.namespace or "default", - container=container, - command=command, - stderr=True, - stdin=False, - stdout=True, - tty=False, - _preload_content=not capture_output, - ), + self.client_ws.connect_get_namespaced_pod_exec( + name=pod, + namespace=self.namespace or "default", + container=container, + command=cmd, + stderr=True, + stdin=False, + stdout=True, + tty=False, + _preload_content=not capture_output, ), timeout=timeout, ) @@ -615,9 +609,9 @@ async def _is_ready(self, k8s_object: Any) -> bool: ) else: max_unavailable = 0 - return ready_replicas >= replicas - max_unavailable + return bool(ready_replicas >= replicas - max_unavailable) elif kind == "PersistentVolumeClaim": - return k8s_object.status.phase == "Bound" + return bool(k8s_object.status.phase == "Bound") elif kind == "Service": if k8s_object.spec.type == "ExternalName": return True @@ -645,7 +639,7 @@ async def _is_ready(self, k8s_object: Any) -> bool: ) if str(max_unavailable).endswith("%"): max_unavailable = ceil(replicas * (int(max_unavailable[:-1]) / 100)) - return ( + return bool( int(k8s_object.status.number_ready or 0) >= replicas - max_unavailable ) diff --git a/streamflow/deployment/connector/ssh.py b/streamflow/deployment/connector/ssh.py index e354f4810..8543587c6 100644 --- a/streamflow/deployment/connector/ssh.py +++ b/streamflow/deployment/connector/ssh.py @@ -442,7 +442,7 @@ def __init__( dataTransferConnection ) self.nodes: MutableMapping[str, SSHConfig] = { - n.hostname: n for n in [self._get_config(n) for n in nodes] + n.hostname: n for n in [self._get_config(n) for n in nodes] if n is not None } self.hardware: MutableMapping[str, Hardware] = {} self._cls_context: type[SSHContext] = SSHContext diff --git a/streamflow/deployment/filter/matching.py b/streamflow/deployment/filter/matching.py index 4c0caebfd..36924a5c9 100644 --- a/streamflow/deployment/filter/matching.py +++ b/streamflow/deployment/filter/matching.py @@ -73,7 +73,9 @@ def __init__( filters: MutableSequence[ MutableMapping[ str, - MutableMapping[str, str] | MutableSequence[MutableMapping[str, str]], + str + | MutableMapping[str, str] + | MutableSequence[MutableMapping[str, str]], ] ], ) -> None: diff --git a/streamflow/deployment/manager.py b/streamflow/deployment/manager.py index 791ff58a4..2a3237443 100644 --- a/streamflow/deployment/manager.py +++ b/streamflow/deployment/manager.py @@ -165,7 +165,7 @@ async def deploy(self, deployment_config: DeploymentConfig) -> None: self.dependency_graph[deployment_name].add(deployment_name) def get_connector(self, deployment_name: str) -> Connector | None: - return self.deployments_map.get(deployment_name, None) + return self.deployments_map.get(deployment_name) @classmethod def get_schema(cls) -> str: diff --git a/streamflow/provenance/run_crate.py b/streamflow/provenance/run_crate.py index 1d543afbb..7a408d0ca 100644 --- a/streamflow/provenance/run_crate.py +++ b/streamflow/provenance/run_crate.py @@ -12,7 +12,7 @@ import uuid from abc import ABC, abstractmethod from collections.abc import Container, MutableMapping, MutableSequence -from typing import Any, cast, get_args +from typing import Any, cast from zipfile import ZipFile import cwl_utils.parser @@ -217,13 +217,13 @@ def _process_cwl_type( jsonld_param["valueRequired"] = "False" else: _process_cwl_type(t, jsonld_param, cwl_param) - elif isinstance(cwl_type, get_args(cwl_utils.parser.ArraySchema)): + elif isinstance(cwl_type, cwl_utils.parser.ArraySchema): jsonld_param["multipleValues"] = "True" _process_cwl_type(cwl_type.items, jsonld_param, cwl_param) - elif isinstance(cwl_type, get_args(cwl_utils.parser.EnumSchema)): + elif isinstance(cwl_type, cwl_utils.parser.EnumSchema): jsonld_param["additionalType"] = "Text" jsonld_param["valuePattern"] = "|".join(cwl_type.symbols) - elif isinstance(cwl_type, get_args(cwl_utils.parser.RecordSchema)): + elif isinstance(cwl_type, cwl_utils.parser.RecordSchema): jsonld_param["additionalType"] = "PropertyValue" jsonld_param["multipleValues"] = "True" @@ -1111,7 +1111,7 @@ def _get_step( step_name=step_name, context={"version": version}, ) - if isinstance(embedded_tool, get_args(cwl_utils.parser.Workflow)): + if isinstance(embedded_tool, cwl_utils.parser.Workflow): work_example = self._get_workflow( cwl_prefix=cwl_prefix, prefix=step_name, @@ -1423,7 +1423,7 @@ async def get_main_entity(self) -> MutableMapping[str, Any]: self._add_params(cwl_prefix, posixpath.sep, main_entity, self.cwl_definition) # Add steps if present if ( - isinstance(self.cwl_definition, get_args(cwl_utils.parser.Workflow)) + isinstance(self.cwl_definition, cwl_utils.parser.Workflow) and len(self.cwl_definition.steps) > 0 ): main_entity["step"] = [] diff --git a/streamflow/recovery/failure_manager.py b/streamflow/recovery/failure_manager.py index ccf2eb9bf..f344d2e08 100644 --- a/streamflow/recovery/failure_manager.py +++ b/streamflow/recovery/failure_manager.py @@ -145,7 +145,7 @@ def get_schema(cls) -> str: ) def get_request(self, job_name: str) -> RetryRequest: - pass + return RetryRequest() async def recover(self, job: Job, step: Step, exception: BaseException) -> None: if logger.isEnabledFor(logging.WARNING): diff --git a/streamflow/recovery/policy/recovery.py b/streamflow/recovery/policy/recovery.py index cd254e778..3d3b948c8 100644 --- a/streamflow/recovery/policy/recovery.py +++ b/streamflow/recovery/policy/recovery.py @@ -7,7 +7,7 @@ from typing import cast from streamflow.core.exception import FailureHandlingException -from streamflow.core.recovery import RecoveryPolicy +from streamflow.core.recovery import RecoveryPolicy, TokenAvailability from streamflow.core.utils import get_tag from streamflow.core.workflow import Job, Step, Token, Workflow from streamflow.log_handler import logger @@ -15,7 +15,6 @@ from streamflow.recovery.utils import ( GraphMapper, ProvenanceGraph, - TokenAvailability, create_graph_mapper, ) from streamflow.workflow.executor import StreamFlowExecutor diff --git a/streamflow/scheduling/scheduler.py b/streamflow/scheduling/scheduler.py index c14959285..d4a898e07 100644 --- a/streamflow/scheduling/scheduler.py +++ b/streamflow/scheduling/scheduler.py @@ -170,7 +170,11 @@ async def _free_resources( self.hardware_locations[loc.name] = ( self.hardware_locations[loc.name] - job_hardware ) + storage_usage - if locations := [loc.wraps for loc in locations if loc.stacked]: + if locations := [ + loc.wraps + for loc in locations + if loc.stacked and loc.wraps is not None + ]: conn = cast(ConnectorWrapper, conn).connector for execution_loc in locations: job_hardware = await utils.bind_mount_point( @@ -531,7 +535,7 @@ async def schedule( hardware_requirement: HardwareRequirement | None, ) -> None: job_context = JobContext(job) - targets = list(binding_config.targets) + targets: MutableSequence[Target] = list(binding_config.targets) for f in (self._get_binding_filter(f) for f in binding_config.filters): targets = await f.get_targets(job, targets) wait_tasks = [ diff --git a/streamflow/workflow/executor.py b/streamflow/workflow/executor.py index e89908337..e5fd3e68c 100644 --- a/streamflow/workflow/executor.py +++ b/streamflow/workflow/executor.py @@ -3,6 +3,7 @@ import asyncio import time from collections.abc import MutableMapping, MutableSequence +from contextlib import suppress from typing import TYPE_CHECKING, overload from streamflow.core import utils @@ -35,9 +36,8 @@ async def _handle_exception(self, task: asyncio.Task[None]) -> None: ... async def _handle_exception(self, task: asyncio.Task[Token | None]) -> Token | None: try: - return await task - except asyncio.CancelledError: - pass + with suppress(asyncio.CancelledError): + return await task except Exception as exc: logger.exception(exc) if not self.closed: diff --git a/tests/conftest.py b/tests/conftest.py index 946439c5c..2a9e6c00f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -175,7 +175,9 @@ def object_to_dict(obj: Any) -> MutableMapping[str, Any]: } -def are_equals(elem1, elem2, obj_compared=None): +def are_equals( + elem1: Any, elem2: Any, obj_compared: MutableSequence[Any] | None = None +) -> bool: """ The function return True if the elems are the same, otherwise False The param obj_compared is useful to break a circul reference inside the objects @@ -191,7 +193,7 @@ def are_equals(elem1, elem2, obj_compared=None): return True if is_primitive_type(elem1): - return elem1 == elem2 + return bool(elem1 == elem2) if isinstance(elem1, Collection) and not isinstance(elem1, dict): if len(elem1) != len(elem2): diff --git a/tests/cwl-conformance/conftest.py b/tests/cwl-conformance/conftest.py index cf4b06755..077bc8a26 100644 --- a/tests/cwl-conformance/conftest.py +++ b/tests/cwl-conformance/conftest.py @@ -19,10 +19,13 @@ def pytest_cwl_execute_test( args = [ "--streamflow-file", os.path.join(this_directory, "streamflow.yml"), - "--outdir", - config.outdir, - processfile, ] + + if config.outdir is not None: + args.extend(["--outdir", config.outdir]) + + args.append(processfile) + if jobfile is not None: args.append(jobfile) diff --git a/tests/test_binding_filter.py b/tests/test_binding_filter.py index 59d287401..6106767b8 100644 --- a/tests/test_binding_filter.py +++ b/tests/test_binding_filter.py @@ -8,7 +8,7 @@ WorkflowExecutionException, ) from streamflow.core.workflow import Job, Token -from streamflow.deployment.filter import MatchingBindingFilter +from streamflow.deployment.filter.matching import MatchingBindingFilter from streamflow.workflow.token import ListToken, ObjectToken from tests.utils.workflow import BaseFileToken, random_job_name diff --git a/tests/test_connector.py b/tests/test_connector.py index 31bd37618..8cc7812c2 100644 --- a/tests/test_connector.py +++ b/tests/test_connector.py @@ -4,7 +4,7 @@ import logging import os import re -from collections.abc import Callable, MutableSequence +from collections.abc import Callable, Iterable, MutableSequence from typing import Any import pytest @@ -14,7 +14,7 @@ from streamflow.core.context import StreamFlowContext from streamflow.core.deployment import Connector, ExecutionLocation from streamflow.core.exception import WorkflowExecutionException -from streamflow.deployment.connector import SSHConnector +from streamflow.deployment.connector.ssh import SSHConnector from streamflow.deployment.future import FutureConnector from tests.conftest import get_class_callables from tests.utils.connector import ( @@ -145,7 +145,7 @@ async def test_future_connector_multiple_request_fail( @pytest.mark.asyncio async def test_ssh_connector_channel_open_error( caplog: LogCaptureFixture, - chosen_deployment_types: MutableSequence[str], + chosen_deployment_types: Iterable[str], context: StreamFlowContext, ) -> None: """ @@ -168,7 +168,7 @@ async def test_ssh_connector_channel_open_error( @pytest.mark.asyncio async def test_ssh_connector_multiple_request_fail( - chosen_deployment_types: MutableSequence[str], context: StreamFlowContext + chosen_deployment_types: Iterable[str], context: StreamFlowContext ) -> None: """Test SSHConnector with multiple requests but the deployment fails""" if "ssh" not in chosen_deployment_types: diff --git a/tests/test_cwl_execution.py b/tests/test_cwl_execution.py index 7fc6dabf0..482ce2803 100644 --- a/tests/test_cwl_execution.py +++ b/tests/test_cwl_execution.py @@ -2,7 +2,7 @@ import itertools import os import posixpath -from collections.abc import MutableSequence +from collections.abc import Iterable from typing import cast import pytest @@ -211,7 +211,7 @@ async def test_initial_workdir( @pytest.mark.asyncio @pytest.mark.parametrize("file_type", ("file", "directory")) async def test_creating_file( - chosen_deployment_types: MutableSequence[str], + chosen_deployment_types: Iterable[str], context: StreamFlowContext, file_type: str, ) -> None: diff --git a/tests/test_cwl_persistence.py b/tests/test_cwl_persistence.py index 5e1e8c4c0..f1fc231f1 100644 --- a/tests/test_cwl_persistence.py +++ b/tests/test_cwl_persistence.py @@ -18,6 +18,7 @@ NullTokenProcessor, ObjectTokenProcessor, PopCommandOutputProcessor, + TokenProcessor, UnionCommandOutputProcessor, UnionTokenProcessor, ) @@ -34,7 +35,6 @@ from streamflow.cwl.processor import ( CWLCommandOutputProcessor, CWLExpressionToolOutputProcessor, - CWLFileToken, CWLObjectCommandOutputProcessor, CWLTokenProcessor, ) @@ -49,6 +49,7 @@ CWLScheduleStep, CWLTransferStep, ) +from streamflow.cwl.token import CWLFileToken from streamflow.cwl.transformer import ( AllNonNullTransformer, CloneTransformer, @@ -214,6 +215,7 @@ async def test_cwl_command(context: StreamFlowContext, processor_t: str) -> None ) job_port = workflow.create_port(JobPort) await workflow.save(context) + processors: MutableSequence[CommandTokenProcessor] | None match processor_t: case "none": processors = None @@ -462,7 +464,7 @@ async def test_cwl_token_transformer( if workflow.format_graph is None: workflow.format_graph = Graph() await workflow.save(context) - processor = None + processor: TokenProcessor match processor_t: case "primitive": processor = _create_cwl_token_processor(port.name, workflow) diff --git a/tests/test_cwl_provenance.py b/tests/test_cwl_provenance.py index 9adb44ad8..a52eb5efc 100644 --- a/tests/test_cwl_provenance.py +++ b/tests/test_cwl_provenance.py @@ -53,12 +53,12 @@ TerminationToken, ) from streamflow.workflow.transformer import ManyToOneTransformer -from tests.test_provenance import ( +from tests.utils.cwl import get_cwl_parser +from tests.utils.utils import ( create_and_run_step, inject_tokens, verify_dependency_tokens, ) -from tests.utils.cwl import get_cwl_parser from tests.utils.workflow import ( CWL_VERSION, create_deploy_step, diff --git a/tests/test_persistence.py b/tests/test_persistence.py index 9a03df1e2..832188b6e 100644 --- a/tests/test_persistence.py +++ b/tests/test_persistence.py @@ -47,7 +47,7 @@ async def _load( row: MutableMapping[str, Any], loading_context: DatabaseLoadingContext, ) -> Self: - return DummyHardwareRequirement() + return cls() async def _save_additional_params( self, context: StreamFlowContext @@ -122,7 +122,7 @@ async def test_deploy_step(context: StreamFlowContext) -> None: @pytest.mark.asyncio async def test_schedule_step(context: StreamFlowContext) -> None: """Test saving and loading ScheduleStep from database""" - workflow, (job_port,) = await create_workflow(context, type_="default", num_port=1) + workflow, (job_port,) = await create_workflow(context, type_=Workflow, num_port=1) binding_config = get_full_instantiation( BindingConfig, targets=[ @@ -167,7 +167,7 @@ async def test_schedule_step(context: StreamFlowContext) -> None: @pytest.mark.asyncio async def test_execute_step(context: StreamFlowContext) -> None: """Test saving and loading ExecuteStep from database""" - workflow, (job_port,) = await create_workflow(context, type_="default", num_port=1) + workflow, (job_port,) = await create_workflow(context, type_=Workflow, num_port=1) await workflow.save(context) step = get_full_instantiation( @@ -183,7 +183,7 @@ async def test_execute_step(context: StreamFlowContext) -> None: @pytest.mark.asyncio async def test_gather_step(context: StreamFlowContext) -> None: """Test saving and loading GatherStep from database""" - workflow, (port,) = await create_workflow(context, type_="default", num_port=1) + workflow, (port,) = await create_workflow(context, type_=Workflow, num_port=1) await workflow.save(context) step = get_full_instantiation( @@ -200,7 +200,7 @@ async def test_gather_step(context: StreamFlowContext) -> None: @pytest.mark.asyncio async def test_scatter_step(context: StreamFlowContext) -> None: """Test saving and loading ScatterStep from database""" - workflow, (port,) = await create_workflow(context, type_="default", num_port=1) + workflow, (port,) = await create_workflow(context, type_=Workflow, num_port=1) await workflow.save(context) step = get_full_instantiation( diff --git a/tests/test_recovery.py b/tests/test_recovery.py index 092b576c5..9e1c31e67 100644 --- a/tests/test_recovery.py +++ b/tests/test_recovery.py @@ -4,9 +4,10 @@ import posixpath import tempfile import uuid -from collections.abc import AsyncGenerator, MutableMapping, MutableSequence +from collections.abc import AsyncGenerator, Iterable from typing import Any +import cwl_utils.types import pytest import pytest_asyncio @@ -75,7 +76,7 @@ async def _assert_token_result( async def _create_file( context: StreamFlowContext, location: ExecutionLocation -) -> MutableMapping[str, Any]: +) -> cwl_utils.types.CWLFileType: path = StreamFlowPath( tempfile.gettempdir() if location.local else "/tmp", utils.random_name(), @@ -84,18 +85,20 @@ async def _create_file( ) await path.write_text("StreamFlow fault tolerance") path = await path.resolve() - return { - "basename": os.path.basename(path), - "checksum": f"sha1${await path.checksum()}", - "class": "File", - "path": str(path), - "size": await path.size(), - } + return cwl_utils.types.CWLFileType( + **{ + "basename": os.path.basename(path), + "checksum": f"sha1${await path.checksum()}", + "class": "File", + "path": str(path), + "size": await path.size(), + } + ) @pytest_asyncio.fixture(scope="module") async def fault_tolerant_context( - chosen_deployment_types: MutableSequence[str], + chosen_deployment_types: Iterable[str], ) -> AsyncGenerator[StreamFlowContext, Any]: _context = build_context( { @@ -140,7 +143,7 @@ async def test_execute( token_type: str, ) -> None: deployment_t = "local-fs-volatile" - workflow = next(iter(await create_workflow(fault_tolerant_context, num_port=0))) + workflow, _ = await create_workflow(fault_tolerant_context, num_port=0) translator = RecoveryTranslator(workflow) deployment_config = await get_deployment_config( fault_tolerant_context, deployment_t @@ -148,6 +151,7 @@ async def test_execute( execution_location = await get_location(fault_tolerant_context, deployment_t) translator.deployment_configs = {deployment_config.name: deployment_config} input_ports = {} + token_value: cwl_utils.types.CWLOutputType if token_type == "primitive": token_value = 100 elif token_type == "file": @@ -452,7 +456,7 @@ async def test_resume_scatter_step(context: StreamFlowContext) -> None: async def test_scatter(fault_tolerant_context: StreamFlowContext) -> None: num_of_failures = 1 deployment_t = "local-fs-volatile" - workflow = next(iter(await create_workflow(fault_tolerant_context, num_port=0))) + workflow, _ = await create_workflow(fault_tolerant_context, num_port=0) translator = RecoveryTranslator(workflow) deployment_config = await get_deployment_config( fault_tolerant_context, deployment_t @@ -550,7 +554,7 @@ async def test_synchro(fault_tolerant_context: StreamFlowContext) -> None: num_of_failures = 1 token_t = "file" deployment_t = "local-fs-volatile" - workflow = next(iter(await create_workflow(fault_tolerant_context, num_port=0))) + workflow, _ = await create_workflow(fault_tolerant_context, num_port=0) translator = RecoveryTranslator(workflow) deployment_config = await get_deployment_config( fault_tolerant_context, deployment_t @@ -558,6 +562,7 @@ async def test_synchro(fault_tolerant_context: StreamFlowContext) -> None: execution_location = await get_location(fault_tolerant_context, deployment_t) translator.deployment_configs = {deployment_config.name: deployment_config} input_ports = {} + token_value: cwl_utils.types.CWLOutputType if token_t == "default": token_value = 100 elif token_t == "file": diff --git a/tests/test_remotepath.py b/tests/test_remotepath.py index 64481fcc0..12958c94a 100644 --- a/tests/test_remotepath.py +++ b/tests/test_remotepath.py @@ -1,7 +1,7 @@ from __future__ import annotations import tempfile -from collections.abc import MutableSequence +from collections.abc import Iterable import pytest import pytest_asyncio @@ -178,7 +178,7 @@ async def test_glob( @pytest.mark.asyncio async def test_mkdir_failure( - chosen_deployment_types: MutableSequence[str], context: StreamFlowContext + chosen_deployment_types: Iterable[str], context: StreamFlowContext ) -> None: """Test on `mkdir` function failure""" if "docker" not in chosen_deployment_types: diff --git a/tests/test_report.py b/tests/test_report.py index 1de22a3af..004db0b08 100644 --- a/tests/test_report.py +++ b/tests/test_report.py @@ -1,5 +1,6 @@ import hashlib import os +from collections.abc import AsyncGenerator from tempfile import TemporaryDirectory import pytest @@ -13,7 +14,7 @@ @pytest_asyncio.fixture(scope="session") -async def context() -> StreamFlowContext: +async def context() -> AsyncGenerator[StreamFlowContext]: _context = build_context( { "database": { diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 0efd4efac..c6daed23c 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -2,7 +2,7 @@ import asyncio import os -from collections.abc import Callable, MutableSequence +from collections.abc import Callable, Iterable, MutableSequence from typing import cast import pytest @@ -93,7 +93,7 @@ def service(context: StreamFlowContext, deployment: str) -> str | None: @pytest.mark.asyncio async def test_bind_volumes( - chosen_deployment_types: MutableSequence[str], context: StreamFlowContext + chosen_deployment_types: Iterable[str], context: StreamFlowContext ) -> None: """Test the binding of volumes in stacked locations""" for deployment in ["docker", "local"]: @@ -155,7 +155,7 @@ async def test_bind_volumes( @pytest.mark.asyncio async def test_binding_filter( - chosen_deployment_types: MutableSequence[str], context: StreamFlowContext + chosen_deployment_types: Iterable[str], context: StreamFlowContext ) -> None: """Test Binding Filter using a job with two targets both free. With the CustomBindingFilter the scheduling will choose the second target""" for deployment in ["docker", "local"]: @@ -280,7 +280,7 @@ def test_hardware() -> None: @pytest.mark.asyncio async def test_multi_env( - chosen_deployment_types: MutableSequence[str], context: StreamFlowContext + chosen_deployment_types: Iterable[str], context: StreamFlowContext ) -> None: """Test scheduling two jobs on two different environments.""" for deployment in ["docker", "local"]: @@ -335,7 +335,7 @@ async def test_multi_env( @pytest.mark.asyncio async def test_multi_targets_one_job( - chosen_deployment_types: MutableSequence[str], context: StreamFlowContext + chosen_deployment_types: Iterable[str], context: StreamFlowContext ) -> None: """Test scheduling one jobs with two targets: Local and Docker Image. The job will be scheduled in the first""" for deployment in ["docker", "local"]: @@ -392,7 +392,7 @@ async def test_multi_targets_one_job( @pytest.mark.asyncio async def test_multi_targets_two_jobs( - chosen_deployment_types: MutableSequence[str], context: StreamFlowContext + chosen_deployment_types: Iterable[str], context: StreamFlowContext ) -> None: """ Test scheduling two jobs with two same targets: Local and Docker Image. diff --git a/tests/test_schema.py b/tests/test_schema.py index 9d76170a1..00b9a67ad 100644 --- a/tests/test_schema.py +++ b/tests/test_schema.py @@ -8,9 +8,10 @@ from streamflow.config.validator import SfValidator from streamflow.core.exception import WorkflowDefinitionException from streamflow.main import build_context -from streamflow.persistence import SqliteDatabase -from streamflow.recovery import DefaultCheckpointManager, DefaultFailureManager -from streamflow.scheduling import DefaultScheduler +from streamflow.persistence.sqlite import SqliteDatabase +from streamflow.recovery.checkpoint_manager import DefaultCheckpointManager +from streamflow.recovery.failure_manager import DefaultFailureManager +from streamflow.scheduling.scheduler import DefaultScheduler from tests.utils.data import CustomDataManager from tests.utils.deployment import CustomDeploymentManager from tests.utils.utils import InjectPlugin diff --git a/tests/test_translator.py b/tests/test_translator.py index c128913a8..60f369e9d 100644 --- a/tests/test_translator.py +++ b/tests/test_translator.py @@ -5,7 +5,7 @@ import posixpath import random import tempfile -from collections.abc import MutableMapping, MutableSequence +from collections.abc import Iterable, MutableMapping from pathlib import PurePosixPath from typing import Any, cast @@ -22,7 +22,7 @@ from streamflow.core.deployment import Target from streamflow.core.exception import WorkflowDefinitionException from streamflow.core.utils import compare_tags -from streamflow.core.workflow import Token +from streamflow.core.workflow import Token, Workflow from streamflow.cwl.runner import main from streamflow.cwl.step import CWLTransferStep from streamflow.cwl.token import CWLFileToken @@ -65,7 +65,7 @@ def _get_streamflow_config() -> MutableMapping[str, Any]: } -def _get_workflow_config(streamflow_config) -> WorkflowConfig: +def _get_workflow_config(streamflow_config: MutableMapping[str, Any]) -> WorkflowConfig: SfValidator().validate(streamflow_config) return WorkflowConfig( list(streamflow_config["workflows"].keys())[0], streamflow_config @@ -78,7 +78,7 @@ def _get_workflow_config(streamflow_config) -> WorkflowConfig: itertools.product(("File", "Directory"), ("literal", "concrete")), ) async def test_inject_remote_input( - chosen_deployment_types: MutableSequence[str], + chosen_deployment_types: Iterable[str], context: StreamFlowContext, file_kind: str, file_type: str, @@ -268,6 +268,7 @@ async def test_inject_remote_input( JobPort, injector_schedule_step.get_output_port("__job__"), ).get_job(port_name) + assert job is not None # Check output tokens of input injector step output_tokens = input_injector_step.get_output_port(port_name).token_list @@ -374,7 +375,7 @@ async def test_gather_order(context: StreamFlowContext) -> None: value_type = "primitive" output_name = "test_out" workflow, (input_port, output_port) = await create_workflow( - context, type_="default", num_port=2 + context, type_=Workflow, num_port=2 ) translator = RecoveryTranslator(workflow) translator.deployment_configs = { diff --git a/tests/utils/connector.py b/tests/utils/connector.py index 438ac3b38..e5a8034e1 100644 --- a/tests/utils/connector.py +++ b/tests/utils/connector.py @@ -14,10 +14,11 @@ from streamflow.core.data import StreamWrapper from streamflow.core.deployment import Connector, ExecutionLocation from streamflow.core.scheduling import AvailableLocation, Hardware -from streamflow.deployment.connector import LocalConnector, SSHConnector from streamflow.deployment.connector.base import BaseConnector +from streamflow.deployment.connector.local import LocalConnector from streamflow.deployment.connector.ssh import ( SSHConfig, + SSHConnector, SSHContext, get_param_from_file, parse_hostname, @@ -253,7 +254,7 @@ async def run( self, location: ExecutionLocation, command: MutableSequence[str], - environment: MutableMapping[str, str] = None, + environment: MutableMapping[str, str] | None = None, workdir: str | None = None, stdin: int | str | None = None, stdout: int | str = asyncio.subprocess.STDOUT, diff --git a/tests/utils/utils.py b/tests/utils/utils.py index df378f1a8..7ef75daab 100644 --- a/tests/utils/utils.py +++ b/tests/utils/utils.py @@ -241,7 +241,7 @@ async def duplicate_elements( async def inject_tokens( - token_list: MutableSequence[Token], + token_list: MutableSequence[T], in_port: Port, context: StreamFlowContext, save_input_token: bool = True, @@ -275,9 +275,9 @@ async def verify_dependency_tokens( token: Token, port: Port, context: StreamFlowContext, - expected_depender: MutableSequence[Token] | None = None, - expected_dependee: MutableSequence[Token] | None = None, - alternative_expected_dependee: MutableSequence[Token] | None = None, + expected_depender: MutableSequence[T] | None = None, + expected_dependee: MutableSequence[T] | None = None, + alternative_expected_dependee: MutableSequence[T] | None = None, ) -> None: loading_context = DefaultDatabaseLoadingContext() expected_depender = expected_depender or [] diff --git a/tests/utils/workflow.py b/tests/utils/workflow.py index 56c58dbcc..81a7cf0c1 100644 --- a/tests/utils/workflow.py +++ b/tests/utils/workflow.py @@ -6,8 +6,9 @@ import posixpath from collections.abc import Iterable, MutableMapping, MutableSequence from pathlib import PurePath -from typing import TYPE_CHECKING, Any, cast +from typing import TYPE_CHECKING, Any, TypeVar, cast +import cwl_utils.types from typing_extensions import Self from streamflow.core import utils @@ -37,7 +38,7 @@ Token, Workflow, ) -from streamflow.cwl.utils import get_token_class, search_in_parent_locations +from streamflow.cwl.utils import search_in_parent_locations from streamflow.cwl.workflow import CWLWorkflow from streamflow.data.remotepath import StreamFlowPath from streamflow.deployment.utils import get_path_processor @@ -68,6 +69,8 @@ if TYPE_CHECKING: from streamflow.core.context import StreamFlowContext + W = TypeVar("W", bound=Workflow) + CWL_VERSION = "v1.2" @@ -197,10 +200,11 @@ def create_schedule_step( async def create_workflow( context: StreamFlowContext, num_port: int = 2, - type_: str = "cwl", + type_: type[W] = CWLWorkflow, save: bool = True, ) -> tuple[Workflow, tuple[Port, ...]]: - if type_ == "cwl": + workflow: Workflow + if type_ == CWLWorkflow: workflow = CWLWorkflow( context=context, name=utils.random_name(), @@ -336,7 +340,7 @@ async def build_token( ), ) case MutableMapping(): - if get_token_class(token_value) in ["File", "Directory"]: + if cwl_utils.types.is_file_or_directory(token_value): connector = context.scheduler.get_connector(job.name) locations = context.scheduler.get_locations(job.name) relpath = ( @@ -910,7 +914,7 @@ def _get_deploy_step(self, deployment_name: str) -> DeployStep: def _get_schedule_step( self, cls: type[ScheduleStep], - binding_config: BindingConfig, + binding_config: BindingConfig | None, deployment_names: MutableSequence[str], step_name: str, workflow: Workflow, diff --git a/uv.lock b/uv.lock index c5fd71969..c5d59cc14 100644 --- a/uv.lock +++ b/uv.lock @@ -810,8 +810,8 @@ wheels = [ [[package]] name = "cwl-utils" -version = "0.40" -source = { registry = "https://pypi.org/simple" } +version = "0.41" +source = { git = "https://github.com/common-workflow-language/cwl-utils.git?rev=main#c39b6e6daba9cd681615417c8847888ec224647f" } dependencies = [ { name = "cwl-upgrader" }, { name = "packaging" }, @@ -820,10 +820,6 @@ dependencies = [ { name = "ruamel-yaml" }, { name = "schema-salad" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/7f/00/80a18397dd81fc39ad5b61af227becebc9d537bcb679d14731023b0bee4a/cwl_utils-0.40.tar.gz", hash = "sha256:fb836fe71617e10cfefb74cfcb2ab7b4a6e36b36cebf2f04b3fb43e15bc74751", size = 365488, upload-time = "2025-09-09T20:53:57.952Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/63/4b/ccab2a5ca9e0b6553810b85c06387e60fc9443cec3c987e3a062705bd225/cwl_utils-0.40-py3-none-any.whl", hash = "sha256:f6688cd3b78b826af2aa5518b31d8d7ba784914d0a7a5784266c615093e1e94b", size = 437060, upload-time = "2025-09-09T20:53:56.035Z" }, -] [[package]] name = "cwltest" @@ -1813,11 +1809,11 @@ wheels = [ [[package]] name = "narwhals" -version = "2.13.0" +version = "2.14.0" source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/89/ea/f82ef99ced4d03c33bb314c9b84a08a0a86c448aaa11ffd6256b99538aa5/narwhals-2.13.0.tar.gz", hash = "sha256:ee94c97f4cf7cfeebbeca8d274784df8b3d7fd3f955ce418af998d405576fdd9", size = 594555, upload-time = "2025-12-01T13:54:05.329Z" } +sdist = { url = "https://files.pythonhosted.org/packages/4a/84/897fe7b6406d436ef312e57e5a1a13b4a5e7e36d1844e8d934ce8880e3d3/narwhals-2.14.0.tar.gz", hash = "sha256:98be155c3599db4d5c211e565c3190c398c87e7bf5b3cdb157dece67641946e0", size = 600648, upload-time = "2025-12-16T11:29:13.458Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/87/0d/1861d1599571974b15b025e12b142d8e6b42ad66c8a07a89cb0fc21f1e03/narwhals-2.13.0-py3-none-any.whl", hash = "sha256:9b795523c179ca78204e3be53726da374168f906e38de2ff174c2363baaaf481", size = 426407, upload-time = "2025-12-01T13:54:03.861Z" }, + { url = "https://files.pythonhosted.org/packages/79/3e/b8ecc67e178919671695f64374a7ba916cf0adbf86efedc6054f38b5b8ae/narwhals-2.14.0-py3-none-any.whl", hash = "sha256:b56796c9a00179bd757d15282c540024e1d5c910b19b8c9944d836566c030acf", size = 430788, upload-time = "2025-12-16T11:29:11.699Z" }, ] [[package]] @@ -3255,7 +3251,7 @@ requires-dist = [ { name = "asyncssh", specifier = "==2.22.0" }, { name = "bcrypt", specifier = "==5.0.0" }, { name = "cachetools", specifier = "==6.2.6" }, - { name = "cwl-utils", specifier = "==0.40" }, + { name = "cwl-utils", git = "https://github.com/common-workflow-language/cwl-utils.git?rev=main" }, { name = "importlib-metadata", specifier = "==8.7.1" }, { name = "jinja2", specifier = "==3.1.6" }, { name = "jsonschema", specifier = "==4.26.0" }, @@ -3455,11 +3451,11 @@ wheels = [ [[package]] name = "tzdata" -version = "2025.2" +version = "2025.3" source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/95/32/1a225d6164441be760d75c2c42e2780dc0873fe382da3e98a2e1e48361e5/tzdata-2025.2.tar.gz", hash = "sha256:b60a638fcc0daffadf82fe0f57e53d06bdec2f36c4df66280ae79bce6bd6f2b9", size = 196380, upload-time = "2025-03-23T13:54:43.652Z" } +sdist = { url = "https://files.pythonhosted.org/packages/5e/a7/c202b344c5ca7daf398f3b8a477eeb205cf3b6f32e7ec3a6bac0629ca975/tzdata-2025.3.tar.gz", hash = "sha256:de39c2ca5dc7b0344f2eba86f49d614019d29f060fc4ebc8a417896a620b56a7", size = 196772, upload-time = "2025-12-13T17:45:35.667Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/5c/23/c7abc0ca0a1526a0774eca151daeb8de62ec457e77262b66b359c3c7679e/tzdata-2025.2-py2.py3-none-any.whl", hash = "sha256:1a403fada01ff9221ca8044d701868fa132215d84beb92242d9acd2147f667a8", size = 347839, upload-time = "2025-03-23T13:54:41.845Z" }, + { url = "https://files.pythonhosted.org/packages/c7/b0/003792df09decd6849a5e39c28b513c06e84436a54440380862b5aeff25d/tzdata-2025.3-py2.py3-none-any.whl", hash = "sha256:06a47e5700f3081aab02b2e513160914ff0694bce9947d6b76ebd6bf57cfc5d1", size = 348521, upload-time = "2025-12-13T17:45:33.889Z" }, ] [[package]]