diff --git a/.github/workflows/ci-tests.yaml b/.github/workflows/ci-tests.yaml
index 41394c03c..afe14c42b 100644
--- a/.github/workflows/ci-tests.yaml
+++ b/.github/workflows/ci-tests.yaml
@@ -231,6 +231,8 @@ jobs:
         include:
           - on: "macos-15-intel"
             python: "3.14"
+          - on: "windows-2025"
+            python: "3.14"
     runs-on: ${{ matrix.on }}
     env:
       TOXENV: ${{ format('py{0}-unit', matrix.python) }}
@@ -272,6 +274,62 @@ jobs:
           kubectl apply -f https://docs.projectcalico.org/v3.25/manifests/calico.yaml
           kubectl -n kube-system set env daemonset/calico-node FELIX_IGNORELOOSERPF=true
         if: ${{ startsWith(matrix.on, 'ubuntu-') }}
+      - name: "Install WSL (Windows)"
+        uses: Vampire/setup-wsl@v6
+        with:
+          distribution: Ubuntu-24.04
+          wsl-conf: |
+            [automount]
+            root = /
+            options = "metadata"
+
+            [boot]
+            systemd=true
+          wsl-version: 2
+        if: ${{ startsWith(matrix.on, 'windows-') }}
+      - name: "Install Docker on WSL (Windows)"
+        shell: wsl-bash {0}
+        run: |
+          install -m 0755 -d /etc/apt/keyrings \
+            && curl -fsSL https://download.docker.com/linux/ubuntu/gpg -o /etc/apt/keyrings/docker.asc \
+            && chmod a+r /etc/apt/keyrings/docker.asc
+
+          # Register the Docker apt repository, then install the engine
+          # NOTE(review): repository stanza and apt commands restored from Docker's Ubuntu install guide - confirm
+          cat > /etc/apt/sources.list.d/docker.sources <<EOF
+          Types: deb
+          URIs: https://download.docker.com/linux/ubuntu
+          Suites: $(. /etc/os-release && echo "${UBUNTU_CODENAME:-$VERSION_CODENAME}")
+          Components: stable
+          Signed-By: /etc/apt/keyrings/docker.asc
+          EOF
+
+          apt-get update \
+            && apt-get install -y docker-ce docker-ce-cli containerd.io
+
+          # Make dockerd listen on TCP 2375 (no TLS) in addition to the Unix socket
+          mkdir -p /etc/systemd/system/docker.service.d
+          cat > /etc/systemd/system/docker.service.d/override.conf << EOF
+          [Service]
+          ExecStart=
+          ExecStart=/usr/bin/dockerd -H unix:///var/run/docker.sock -H tcp://0.0.0.0:2375 --containerd=/run/containerd/containerd.sock
+          EOF
+
+          systemctl daemon-reload \
+            && systemctl restart docker \
+            && systemctl status docker \
+            && docker run hello-world
+        if: ${{ startsWith(matrix.on, 'windows-') }}
+      - name: "Configure Docker to use WSL2 (Windows)"
+        run: |
+          docker context create wsl --docker ("host=tcp://$((wsl hostname -I).Split()[0].Trim()):2375")
+          docker context use wsl
+          docker info
+          docker run hello-world
+        if: ${{ startsWith(matrix.on, 'windows-') }}
       - name: "Install Tox"
         run: uv tool install tox --with tox-uv
       - name: "Run StreamFlow tests via Tox"
diff --git a/cwl-conformance-test.bat b/cwl-conformance-test.bat
new file mode 100644
index 000000000..fa820321a
--- /dev/null
+++ b/cwl-conformance-test.bat
@@ -0,0 +1,104 @@
+@echo off
+
+rem Version of the standard to test against
+rem Current options: v1.0, v1.1, v1.2, and v1.3
+if "%VERSION%"=="" set "VERSION=v1.2"
+
+rem Which commit of the standard's repo to use
+rem Defaults to the last commit of the main branch
+if "%COMMIT%"=="" set "COMMIT=main"
+
+rem Comma-separated list of test names that should be excluded from execution
+rem Defaults to "docker_entrypoint,modify_file_content"
+if "%EXCLUDE%"=="" set "EXCLUDE=docker_entrypoint,modify_file_content"
+
+rem Name of the CWLDockerTranslator plugin to use for test execution
+rem This parameter allows to test automatic CWL requirements translators
+if "%DOCKER%"=="" set "DOCKER=docker"
+
+rem Additional arguments for the pytest command
+rem Defaults to none
+rem set "PYTEST_EXTRA="
+
+rem The directory where this script resides
+set "SCRIPT_DIRECTORY=%~dp0"
+set "SCRIPT_DIRECTORY=%SCRIPT_DIRECTORY:~0,-1%"
+
+rem Download archive from GitHub
+if "%VERSION%"=="v1.0" (
+    set "REPO=common-workflow-language"
+) else (
+    set "REPO=cwl-%VERSION%"
+)
+
+if not exist "%REPO%-%COMMIT%" (
+    if not exist "%COMMIT%.tar.gz" (
+        echo Downloading %REPO% @ %COMMIT%...
+        pwsh -Command "Invoke-WebRequest -Uri https://github.com/common-workflow-language/%REPO%/archive/%COMMIT%.tar.gz -OutFile %COMMIT%.tar.gz"
+    )
+    tar -xzf "%COMMIT%.tar.gz"
+)
+
+rem Setup environment
+call :venv cwl-conformance-venv
+python -m pip install -U setuptools wheel pip
+python -m pip install -r "%SCRIPT_DIRECTORY%\requirements.txt"
+python -m pip install -r "%SCRIPT_DIRECTORY%\test-requirements.txt"
+if "%VERSION%"=="v1.3" (
+    python -m pip uninstall -y cwl-utils
+    python -m pip install git+https://github.com/common-workflow-language/cwl-utils.git@refs/pull/370/head
+)
+
+rem Set conformance test filename
+if "%VERSION%"=="v1.0" (
+    set "CONFORMANCE_TEST=%SCRIPT_DIRECTORY%\%REPO%-%COMMIT%\%VERSION%\conformance_test_v1.0.yaml"
+) else (
+    set "CONFORMANCE_TEST=%SCRIPT_DIRECTORY%\%REPO%-%COMMIT%\conformance_tests.yaml"
+)
+move "%CONFORMANCE_TEST%" "%CONFORMANCE_TEST:.yaml=.cwltest.yaml%"
+set "CONFORMANCE_TEST=%CONFORMANCE_TEST:.yaml=.cwltest.yaml%"
+
+rem Build command
+set "TEST_COMMAND=python -m pytest "%CONFORMANCE_TEST%" -n auto -rs"
+if not "%EXCLUDE%"=="" (
+    set "TEST_COMMAND=%TEST_COMMAND% --cwl-exclude %EXCLUDE%"
+)
+set "TEST_COMMAND=%TEST_COMMAND% --cov --junitxml=junit.xml -o junit_family=legacy --cov-report= %PYTEST_EXTRA%"
+
+rem Cleanup coverage
+if exist "%SCRIPT_DIRECTORY%\.coverage" del "%SCRIPT_DIRECTORY%\.coverage"
+if exist "%SCRIPT_DIRECTORY%\coverage.xml" del "%SCRIPT_DIRECTORY%\coverage.xml"
+if exist "%SCRIPT_DIRECTORY%\junit.xml" del "%SCRIPT_DIRECTORY%\junit.xml"
+
+rem Run test
+rem Copy the pytest configuration next to the conformance test file (%%~dpI ends with a backslash)
+for %%I in ("%CONFORMANCE_TEST%") do set "CONFORMANCE_DIR=%%~dpI"
+copy "%SCRIPT_DIRECTORY%\tests\cwl-conformance\conftest.py" "%CONFORMANCE_DIR%"
+copy "%SCRIPT_DIRECTORY%\tests\cwl-conformance\streamflow-%DOCKER%.yml" "%CONFORMANCE_DIR%streamflow.yml"
+cmd /c "%TEST_COMMAND%"
+set "RETURN_CODE=%ERRORLEVEL%"
+
+rem Coverage report
+if "%RETURN_CODE%"=="0" (
+    coverage report
+    coverage xml
+)
+
+rem Cleanup
+rd /s /q "%SCRIPT_DIRECTORY%\%REPO%-%COMMIT%"
+rd /s /q "%SCRIPT_DIRECTORY%\cwl-conformance-venv"
+del "%COMMIT%.tar.gz" 2>nul
+
+rem Exit
+exit /b %RETURN_CODE%
+
+:venv
+if not exist "%~1" (
+    where virtualenv >nul 2>&1 && (
+        virtualenv -p python "%~1"
+    ) || (
+        python -m venv "%~1"
+    )
+)
+call "%~1\Scripts\activate.bat"
+goto :eof
diff --git a/streamflow/deployment/aiotarstream.py b/streamflow/deployment/aiotarstream.py
index a4277e555..90909ab37 100644
--- a/streamflow/deployment/aiotarstream.py
+++ b/streamflow/deployment/aiotarstream.py
@@ -1,9 +1,7 @@
 from __future__ import annotations
 
 import copy
-import grp
 import os
-import pwd
 import re
 import shutil
 import stat
@@ -21,6 +19,15 @@
 from streamflow.core.data import StreamWrapper
 from streamflow.deployment.stream import BaseStreamWrapper
 
+try:
+    import grp
+except ImportError:
+    grp = None
+try:
+    import pwd
+except ImportError:
+    pwd = None
+
 if TYPE_CHECKING:
     StrOrBytesPath: TypeAlias = str | bytes | os.PathLike[str] | os.PathLike[bytes]
 
diff --git a/streamflow/deployment/connector/container.py b/streamflow/deployment/connector/container.py
index 751fb8efc..b8ae3d46d 100644
--- a/streamflow/deployment/connector/container.py
+++ b/streamflow/deployment/connector/container.py
@@ -7,6 +7,7 @@
 import os
 import posixpath
 import shlex
+import sys
 from abc import ABC, abstractmethod
 from collections.abc import MutableMapping, MutableSequence
 from importlib.resources import files
@@ -728,7 +729,11 @@ async def _populate_instance(self, name: str) -> None:
         )
         # Check if the container user is the current host user
         if self._wraps_local():
-            host_user = os.getuid()
+            if sys.platform == "win32":
+                # Windows does not support the `getuid` function
+                host_user = -1
+            else:
+                host_user = os.getuid()
         else:
             stdout, returncode = await self.connector.run(
                 location=self._get_inner_location(),
diff --git a/streamflow/deployment/connector/local.py b/streamflow/deployment/connector/local.py
index f1bf86c6c..1e3b75c7c 100644
--- a/streamflow/deployment/connector/local.py
+++ b/streamflow/deployment/connector/local.py
@@ -28,8 +28,20 @@ def _local_copy(src: str, dst: str, read_only: bool) -> None:
         try:
             os.symlink(src, dst, target_is_directory=os.path.isdir(src))
         except OSError as e:
-            if not e.errno == errno.EEXIST:
-                raise
+            if e.errno != errno.EEXIST:
+                if sys.platform == "win32" and e.errno == errno.EINVAL:
+                    # Symlink creation failed on Windows (e.g., the process lacks
+                    # the symlink privilege): fall back to a physical copy
+                    if logger.isEnabledFor(logging.WARNING):
+                        logger.warning(
+                            f"Unable to create a symbolic link from {src} to {dst}: {e.strerror}"
+                        )
+                    if os.path.isdir(src):
+                        shutil.copytree(src, dst, dirs_exist_ok=True)
+                    else:
+                        shutil.copy(src, dst)
+                else:
+                    raise
     else:
         if os.path.isdir(src):
             os.makedirs(dst, exist_ok=True)
@@ -164,6 +176,7 @@ async def run(
         timeout: int | None = None,
         job_name: str | None = None,
     ) -> tuple[str, int] | None:
+        # Create command
         command = utils.create_command(
             self.__class__.__name__,
             command,
diff --git a/tests/test_cwl_execution.py b/tests/test_cwl_execution.py
index 7fc6dabf0..d4a48d8d8 100644
--- a/tests/test_cwl_execution.py
+++ b/tests/test_cwl_execution.py
@@ -121,63 +121,65 @@ async def test_initial_workdir(
     # Create input data
     path = None
     try:
-        if file_type == "file":
-            path = StreamFlowPath(
-                "/tmp", "test.txt", context=context, location=execution_location
-            )
-            await path.write_text("Hello world")
-            path = await path.resolve()
-            file_token = CWLFileToken(
-                {
-                    "class": "File",
-                    "path": str(path),
-                }
-            )
-            initial_paths = [str(path)]
-        elif file_type in ("dir", "listing"):
-            path = StreamFlowPath(
-                "/tmp", "land", context=context, location=execution_location
-            )
-            await path.mkdir()
-            path = await path.resolve()
-            await (path / "lvl1").mkdir()
-            await (path / "lvl2").mkdir()
-            await (path / "lvl1" / "text.txt").write_text("Hello dir")
-            file_token = CWLFileToken(
-                {
-                    "class": "Directory",
-                    "path": str(path),
-                    "listing": [
-                        {
-                            "class": "Directory",
-                            "path": str(path / "lvl1"),
-                            "listing": [
-                                {
-                                    "class": "File",
-                                    "path": str(path / "lvl1" / "text.txt"),
-                                }
-                            ],
-                        },
-                        {"class": "Directory", "path": str(path / "lvl2")},
-                    ],
-                }
-            )
-            initial_paths = (
-                [str(path)]
-                if file_type == "dir"
-                else [str(path / "lvl1"), str(path / "lvl2")]
-            )
-        else:
-            raise NotImplementedError(f"Unsupported file type: {file_type}")
+        match file_type:
+            case "file":
+                path = StreamFlowPath(
+                    "/tmp", "test.txt", context=context, location=execution_location
+                )
+                await path.write_text("Hello world")
+                path = await path.resolve()
+                file_token = CWLFileToken(
+                    {
+                        "class": "File",
+                        "path": str(path),
+                    }
+                )
+                initial_paths = [str(path)]
+            case "dir" | "listing":
+                path = StreamFlowPath(
+                    "/tmp", "land", context=context, location=execution_location
+                )
+                await path.mkdir()
+                path = await path.resolve()
+                await (path / "lvl1").mkdir()
+                await (path / "lvl2").mkdir()
+                await (path / "lvl1" / "text.txt").write_text("Hello dir")
+                file_token = CWLFileToken(
+                    {
+                        "class": "Directory",
+                        "path": str(path),
+                        "listing": [
+                            {
+                                "class": "Directory",
+                                "path": str(path / "lvl1"),
+                                "listing": [
+                                    {
+                                        "class": "File",
+                                        "path": str(path / "lvl1" / "text.txt"),
+                                    }
+                                ],
+                            },
+                            {"class": "Directory", "path": str(path / "lvl2")},
+                        ],
+                    }
+                )
+                initial_paths = (
+                    [str(path)]
+                    if file_type == "dir"
+                    else [str(path / "lvl1"), str(path / "lvl2")]
+                )
+            case _:
+                raise NotImplementedError(f"Unsupported file type: {file_type}")
         # Inject input token
-        if token_type == "file":
-            token_value = file_token
-        elif token_type == "list":
-            token_value = ListToken([file_token])
-        elif token_type == "object":
-            token_value = ObjectToken({"a": file_token})
-        else:
-            raise ValueError(f"Invalid token_type: {token_type}")
+        match token_type:
+            case "file":
+                token_value = file_token
+            case "list":
+                token_value = ListToken([file_token])
+            case "object":
+                token_value = ObjectToken({"a": file_token})
+            case _:
+                raise ValueError(f"Invalid token_type: {token_type}")
         await inject_tokens([token_value], in_port, context)
         # Execute workflow
         await workflow.save(context)
diff --git a/tests/test_cwl_loop.py b/tests/test_cwl_loop.py
deleted file mode 100644
index d24fa4784..000000000
--- a/tests/test_cwl_loop.py
+++ /dev/null
@@ -1,267 +0,0 @@
-"""Test the CWL Loop extension.
-
-From https://github.com/common-workflow-language/cwltool/blob/6f0e1d941a61063828aa074bcbaae55bedf05167/tests/test_loop.py).
-"""
-
-import json
-from collections.abc import MutableMapping, MutableSequence
-
-from cwltool.tests.util import get_data
-from pytest import CaptureFixture
-
-from streamflow.cwl.runner import main
-
-
-def test_validate_loop() -> None:
-    """Affirm that a loop workflow validates."""
-    params = [
-        "--validate",
-        get_data("tests/loop-ext/single-var-loop.cwl"),
-    ]
-    assert main(params) == 0
-
-
-def test_validate_loop_fail_scatter() -> None:
-    """Affirm that a loop workflow does not validate if scatter and loop directives are on the same step."""
-    params = [
-        "--validate",
-        get_data("tests/loop-ext/invalid-loop-scatter.cwl"),
-    ]
-    assert main(params) == 1
-
-
-def test_validate_loop_fail_when() -> None:
-    """Affirm that a loop workflow does not validate if when and loop directives are on the same step."""
-    params = [
-        "--validate",
-        get_data("tests/loop-ext/invalid-loop-when.cwl"),
-    ]
-    assert main(params) == 1
-
-
-def test_validate_loop_fail_no_loop_when() -> None:
-    """Affirm that a loop workflow does not validate if no loopWhen directive is specified."""
-    params = [
-        "--validate",
-        get_data("tests/loop-ext/invalid-no-loopWhen.cwl"),
-    ]
-    assert main(params) == 1
-
-
-def test_validate_loop_fail_on_workflow() -> None:
-    """Affirm that a workflow does not validate if it contains a Loop requirement."""
-    params = [
-        "--validate",
-        get_data("tests/loop-ext/invalid-loop-workflow.cwl"),
-    ]
-    assert main(params) == 1
-
-
-def test_validate_loop_fail_on_command_line_tool() -> None:
-    """Affirm that a CommandLineTool does not validate if it contains a Loop requirement."""
-    params = [
-        "--validate",
-        get_data("tests/loop-ext/invalid-loop-command-line-tool.cwl"),
-    ]
-    assert main(params) == 1
-
-
-def test_validate_loop_fail_on_expression_tool() -> None:
-    """Affirm that an ExpressionTool does not validate if it contains a Loop requirement."""
-    params = [
-        "--validate",
-        get_data("tests/loop-ext/invalid-loop-expression-tool.cwl"),
-    ]
-    assert main(params) == 1
-
-
-def test_validate_loop_fail_on_hint() -> None:
-    """Affirm that a loop workflow does not validate if it contains a Loop hint."""
-    params = [
-        "--validate",
-        get_data("tests/loop-ext/invalid-loop-hint.cwl"),
-    ]
-    assert main(params) == 1
-
-
-def test_loop_fail_non_boolean_loop_when() -> None:
-    """Affirm that a loop workflow fails if loopWhen directive returns a non-boolean value."""
-    params = [
-        get_data("tests/loop-ext/invalid-non-boolean-loopWhen.cwl"),
-        get_data("tests/loop-ext/two-vars-loop-job.yml"),
-    ]
-    assert main(params) == 1
-
-
-def test_loop_single_variable(capsys: CaptureFixture[str]) -> None:
-    """Test a simple loop case with a single variable."""
-    params = [
-        get_data("tests/loop-ext/single-var-loop.cwl"),
-        get_data("tests/loop-ext/single-var-loop-job.yml"),
-    ]
-    main(params)
-    expected = {"o1": 10}
-    captured = capsys.readouterr()
-    assert json.loads(captured.out) == expected
-
-
-def test_loop_single_variable_no_iteration(capsys: CaptureFixture[str]) -> None:
-    """Test a simple loop case with a single variable and a false loopWhen condition."""
-    params = [
-        get_data("tests/loop-ext/single-var-loop-no-iteration.cwl"),
-        get_data("tests/loop-ext/single-var-loop-job.yml"),
-    ]
-    main(params)
-    expected = {"o1": None}
-    captured = capsys.readouterr()
-    assert json.loads(captured.out) == expected
-
-
-def test_loop_two_variables(capsys: CaptureFixture[str]) -> None:
-    """Test a loop case with two variables, which are both back-propagated between iterations."""
-    params = [
-        get_data("tests/loop-ext/two-vars-loop.cwl"),
-        get_data("tests/loop-ext/two-vars-loop-job.yml"),
-    ]
-    main(params)
-    expected = {"o1": 10}
-    captured = capsys.readouterr()
-    assert json.loads(captured.out) == expected
-
-
-def test_loop_two_variables_single_backpropagation(capsys: CaptureFixture[str]) -> None:
-    """Test a loop case with two variables, but when only one of them is back-propagated between iterations."""
-    params = [
-        get_data("tests/loop-ext/two-vars-loop-2.cwl"),
-        get_data("tests/loop-ext/two-vars-loop-job.yml"),
-    ]
-    main(params)
-    expected = {"o1": 10}
-    captured = capsys.readouterr()
-    assert json.loads(captured.out) == expected
-
-
-def test_loop_with_all_output_method(capsys: CaptureFixture[str]) -> None:
-    """Test a loop case with outputMethod set to all."""
-    params = [
-        get_data("tests/loop-ext/all-output-loop.cwl"),
-        get_data("tests/loop-ext/single-var-loop-job.yml"),
-    ]
-    main(params)
-    expected = {"o1": [2, 3, 4, 5, 6, 7, 8, 9, 10]}
-    captured = capsys.readouterr()
-    assert json.loads(captured.out) == expected
-
-
-def test_loop_with_all_output_method_no_iteration(capsys: CaptureFixture[str]) -> None:
-    """Test a loop case with outputMethod set to all and a false loopWhen condition."""
-    params = [
-        get_data("tests/loop-ext/all-output-loop-no-iteration.cwl"),
-        get_data("tests/loop-ext/single-var-loop-job.yml"),
-    ]
-    main(params)
-    expected: MutableMapping[str, MutableSequence[int]] = {"o1": []}
-    captured = capsys.readouterr()
-    assert json.loads(captured.out) == expected
-
-
-def test_loop_value_from(capsys: CaptureFixture[str]) -> None:
-    """Test a loop case with a variable generated by a valueFrom directive."""
-    params = [
-        get_data("tests/loop-ext/value-from-loop.cwl"),
-        get_data("tests/loop-ext/two-vars-loop-job.yml"),
-    ]
-    main(params)
-    expected = {"o1": 10}
-    captured = capsys.readouterr()
-    assert json.loads(captured.out) == expected
-
-
-def test_loop_value_from_fail_no_requirement() -> None:
-    """Test that a workflow loop fails if a valueFrom directive is specified without StepInputExpressionRequirement."""
-    params = [
-        get_data("tests/loop-ext/invalid-value-from-loop-no-requirement.cwl"),
-        get_data("tests/loop-ext/two-vars-loop-job.yml"),
-    ]
-    assert main(params) == 1
-
-
-def test_loop_inside_scatter(capsys: CaptureFixture[str]) -> None:
-    """Test a loop subworkflow inside a scatter step."""
-    params = [
-        get_data("tests/loop-ext/loop-inside-scatter.cwl"),
-        get_data("tests/loop-ext/loop-inside-scatter-job.yml"),
-    ]
-    main(params)
-    expected = {"o1": [10, 10, 10, 10, 10]}
-    captured = capsys.readouterr()
-    assert json.loads(captured.out) == expected
-
-
-def test_scatter_inside_loop(capsys: CaptureFixture[str]) -> None:
-    """Test a loop workflow with inside a scatter step."""
-    params = [
-        get_data("tests/loop-ext/scatter-inside-loop.cwl"),
-        get_data("tests/loop-ext/loop-inside-scatter-job.yml"),
-    ]
-    main(params)
-    expected = {"o1": [10, 11, 12, 13, 14]}
-    captured = capsys.readouterr()
-    assert json.loads(captured.out) == expected
-
-
-def test_nested_loops(capsys: CaptureFixture[str]) -> None:
-    """Test a workflow with two nested loops."""
-    params = [
-        get_data("tests/loop-ext/loop-inside-loop.cwl"),
-        get_data("tests/loop-ext/two-vars-loop-job.yml"),
-    ]
-    main(params)
-    expected = {"o1": [2, 3, 4]}
-    captured = capsys.readouterr()
-    assert json.loads(captured.out) == expected
-
-
-def test_nested_loops_all(capsys: CaptureFixture[str]) -> None:
-    """Test a workflow with two nested loops, both with outputMethod set to all."""
-    params = [
-        get_data("tests/loop-ext/loop-inside-loop-all.cwl"),
-        get_data("tests/loop-ext/two-vars-loop-job.yml"),
-    ]
-    main(params)
-    expected = {"o1": [[2], [2, 3], [2, 3, 4]]}
-    captured = capsys.readouterr()
-    assert json.loads(captured.out) == expected
-
-
-def test_multi_source_loop_input(capsys: CaptureFixture[str]) -> None:
-    """Test a loop with two sources, which are selected through a pickValue directive."""
-    params = [
-        get_data("tests/loop-ext/multi-source-loop.cwl"),
-        get_data("tests/loop-ext/single-var-loop-job.yml"),
-    ]
-    main(params)
-    expected = {"o1": [2, 3, 4, 5, 8, 11, 14, 17, 20]}
-    captured = capsys.readouterr()
-    assert json.loads(captured.out) == expected
-
-
-def test_multi_source_loop_input_fail_no_requirement() -> None:
-    """Test that a loop with two sources fails without MultipleInputFeatureRequirement."""
-    params = [
-        get_data("tests/loop-ext/invalid-multi-source-loop-no-requirement.cwl"),
-        get_data("tests/loop-ext/single-var-loop-job.yml"),
-    ]
-    assert main(params) == 1
-
-
-def test_default_value_loop(capsys: CaptureFixture[str]) -> None:
-    """Test a loop whose source has a default value."""
-    params = [
-        get_data("tests/loop-ext/default-value-loop.cwl"),
-        get_data("tests/loop-ext/single-var-loop-job.yml"),
-    ]
-    main(params)
-    expected = {"o1": [8, 11, 14, 17, 20]}
-    captured = capsys.readouterr()
-    assert json.loads(captured.out) == expected
diff --git a/tests/test_data_manager.py b/tests/test_data_manager.py
index e592cb01d..2e7d1f6e6 100644
--- a/tests/test_data_manager.py
+++ b/tests/test_data_manager.py
@@ -80,7 +80,7 @@ async def test_data_locations(
         location=src_location,
     )
     dst_path = StreamFlowPath(
-        tempfile.gettempdir() if src_location.local else "/tmp",
+        tempfile.gettempdir() if dst_location.local else "/tmp",
         utils.random_name(),
         context=context,
         location=dst_location,
diff --git a/tests/test_transfer.py b/tests/test_transfer.py
index c0431f73a..0c97fa571 100644
--- a/tests/test_transfer.py
+++ b/tests/test_transfer.py
@@ -149,7 +149,7 @@ async def test_file_to_entity(
         location=src_location,
     )
     dst_path = StreamFlowPath(
-        tempfile.gettempdir() if src_location.local else "/tmp",
+        tempfile.gettempdir() if dst_location.local else "/tmp",
         utils.random_name(),
         context=context,
         location=dst_location,
diff --git a/tests/test_translator.py b/tests/test_translator.py
index c128913a8..158899c0b 100644
--- a/tests/test_translator.py
+++ b/tests/test_translator.py
@@ -12,7 +12,6 @@
 import cwl_utils.parser
 import cwl_utils.parser.utils
 import pytest
-from cwltool.tests.util import get_data
 
 from streamflow.config.config import WorkflowConfig
 from streamflow.config.validator import SfValidator
@@ -512,12 +511,3 @@ async def test_workdir_inheritance() -> None:
         if binding_config.targets[3].deployment == "local"
         else posixpath.join("/tmp", "streamflow")
     )
-
-
-def test_dot_product_transformer_raises_error() -> None:
-    """Test DotProductSizeTransformer which must raise an exception because the size tokens have different values"""
-    params = [
-        get_data("tests/wf/scatter-wf4.cwl"),
-        _create_file({"inp1": ["one", "two", "extra"], "inp2": ["three", "four"]}),
-    ]
-    assert main(params) == 1