Skip to content

Add OpenPBS tests to the CI/CD#873

Draft
GlassOfWhiskey wants to merge 1 commit into master from
add-pbs-tests
Draft

Add OpenPBS tests to the CI/CD#873
GlassOfWhiskey wants to merge 1 commit into master from
add-pbs-tests

Conversation

@GlassOfWhiskey
Copy link
Copy Markdown
Member

This commit adds a Docker-based OpenPBS cluster to the set of target architectures for CI tests, ensuring that the PBSConnector is properly tested.

@GlassOfWhiskey GlassOfWhiskey force-pushed the add-pbs-tests branch 2 times, most recently from 7021dbc to b84fa6a Compare November 20, 2025 21:02
@codecov
Copy link
Copy Markdown

codecov bot commented Nov 20, 2025

❌ 4 Tests Failed:

Tests completed Failed Passed Skipped
996 4 992 7
View the top 3 failed test(s) by shortest run time
tests/test_connector.py::test_connector_run_command_fails[openpbs]
Stack Traces | 0.063s run time
curr_connector = <streamflow.deployment.connector.queue_manager.PBSConnector object at 0x14d16cc20>
curr_location = <streamflow.core.deployment.ExecutionLocation object at 0x14fac8c80>

    @pytest.mark.asyncio
    async def test_connector_run_command_fails(
        curr_connector: Connector, curr_location: ExecutionLocation
    ) -> None:
        """Test connector run method on a job with an invalid command"""
>       _, returncode = await curr_connector.run(
            curr_location, ["ls -2"], capture_output=True, job_name="job_test"
        )

tests/test_connector.py:95: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../deployment/connector/queue_manager.py:602: in run
    job_id = await self._run_batch_command(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <streamflow.deployment.connector.queue_manager.PBSConnector object at 0x14d16cc20>
command = '#!/bin/sh\n\nls -2 2>&1', environment = {}, job_name = 'job_test'
location = <streamflow.core.deployment.ExecutionLocation object at 0x14fac8c80>
workdir = None, stdin = None, stdout = 'qsub: Bad UID for job execution'
stderr = -2, timeout = None

    async def _run_batch_command(
        self,
        command: str,
        environment: MutableMapping[str, str] | None,
        job_name: str,
        location: ExecutionLocation,
        workdir: str | None = None,
        stdin: int | str | None = None,
        stdout: int | str = asyncio.subprocess.STDOUT,
        stderr: int | str = asyncio.subprocess.STDOUT,
        timeout: int | None = None,
    ) -> str:
        batch_command = ["sh", "-c"]
        if workdir is not None:
            batch_command.extend(["cd", workdir, "&&"])
        resources = (
            {"walltime": utils.format_seconds_to_hhmmss(timeout)} if timeout else {}
        )
        batch_command.extend(
            [
                "echo",
                base64.b64encode(command.encode("utf-8")).decode("utf-8"),
                "|",
                "base64",
                "-d",
                "|",
            ]
        )
        if environment:
            batch_command.extend([f"{k}={v}" for k, v in environment.items()])
        batch_command.extend(
            [
                "qsub",
                get_option(
                    "o",
                    (
                        stdout
                        if stdout != asyncio.subprocess.STDOUT
                        else utils.random_name()
                    ),
                ),
            ]
        )
        if stdin is not None and stdin != asyncio.subprocess.DEVNULL:
            batch_command.append(get_option("i", stdin))
        if stderr != asyncio.subprocess.STDOUT and stderr != stdout:
            batch_command.append(get_option("e", self._format_stream(stderr)))
        if stderr == stdout:
            batch_command.append(get_option("j", "oe"))
        if service := cast(PBSService, self.services.get(location.service)):
            batch_command.extend(
                [
                    get_option("a", service.begin),
                    get_option("A", service.account),
                    get_option("c", service.checkpoint),
                    get_option("C", service.prefix),
                    get_option("m", service.mailOptions),
                    get_option("N", service.jobName),
                    get_option("p", service.priority),
                    get_option("q", service.destination),
                    get_option("r", "y" if service.rerunnable else "n"),
                    get_option("S", service.shellList),
                    get_option("u", service.userList),
                    get_option("v", service.variableList),
                    get_option("V", service.exportAllVariables),
                    get_option(
                        "W",
                        (
                            ",".join(
                                [
                                    f"{k}={v}"
                                    for k, v in service.additionalAttributes.items()
                                ]
                            )
                            if service.additionalAttributes
                            else None
                        ),
                    ),
                ]
            )
        if resources := cast(dict[str, str], service.resources) | resources:
            batch_command.append(
                get_option("l", ",".join([f"{k}={v}" for k, v in resources.items()]))
            )
        batch_command.append("-")
        stdout, returncode = await super().run(
            location=location, command=batch_command, capture_output=True
        )
        if returncode == 0:
            return stdout.strip()
        else:
>           raise WorkflowExecutionException(
                f"Error submitting job {job_name} to PBS: {stdout.strip()}"
            )
E           streamflow.core.exception.WorkflowExecutionException: Error submitting job job_test to PBS: qsub: Bad UID for job execution

.../deployment/connector/queue_manager.py:1077: WorkflowExecutionException
cwl-v1.1-6397014050177074c9ccd0d771577f7fa9f728a3/conformance_tests.cwltest.yaml::conformance_tests::cwltest::yaml::timelimit_invalid_wf
Stack Traces | 6.36s run time
CWL test execution failed. 
expected: {
    "o": "time passed"
}
got: {}
caused by: failed comparison for key 'o': expected: "time passed"
got: null
Test: job: 
  file:.../streamflow/streamflow/cwl-v1.1-6397014050177074c9ccd0d771577f7fa9f728a3/tests/empty.json
output:
  o: time passed
tool: 
  file:.../streamflow/streamflow/cwl-v1.1-6397014050177074c9ccd0d771577f7fa9f728a3/tests/timelimit2-wf.cwl
label: timelimit_invalid_wf
doc: Test that workflow level time limit is not applied to workflow execution 
  time
tags:
- workflow
- timelimit
- inline_javascript
id: 220
line: '2840'
cwl-v1.1-6397014050177074c9ccd0d771577f7fa9f728a3/conformance_tests.cwltest.yaml::conformance_tests::cwltest::yaml::timelimit_zero_unlimited_wf
Stack Traces | 10.2s run time
CWL test execution failed. 
expected: {
    "o": "time passed"
}
got: {}
caused by: failed comparison for key 'o': expected: "time passed"
got: null
Test: job: 
  file:.../streamflow/streamflow/cwl-v1.1-6397014050177074c9ccd0d771577f7fa9f728a3/tests/empty.json
output:
  o: time passed
tool: 
  file:.../streamflow/streamflow/cwl-v1.1-6397014050177074c9ccd0d771577f7fa9f728a3/tests/timelimit3-wf.cwl
label: timelimit_zero_unlimited_wf
doc: Test zero timelimit means no limit in workflow
tags:
- workflow
- timelimit
- inline_javascript
id: 221
line: '2849'
tests/test_connector.py::test_connector_run_command[openpbs]
Stack Traces | 139s run time
context = <streamflow.core.context.StreamFlowContext object at 0x14d92f770>
curr_connector = <streamflow.deployment.connector.queue_manager.PBSConnector object at 0x14d16cc20>
curr_location = <streamflow.core.deployment.ExecutionLocation object at 0x14fac8c80>

    @pytest.mark.asyncio
    async def test_connector_run_command(
        context: StreamFlowContext,
        curr_connector: Connector,
        curr_location: ExecutionLocation,
    ) -> None:
        """Test connector run method"""
>       _, returncode = await curr_connector.run(
            location=curr_location, command=["ls"], capture_output=True, job_name="job_test"
        )

tests/test_connector.py:84: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.../deployment/connector/queue_manager.py:602: in run
    job_id = await self._run_batch_command(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <streamflow.deployment.connector.queue_manager.PBSConnector object at 0x14d16cc20>
command = '#!/bin/sh\n\nls 2>&1', environment = {}, job_name = 'job_test'
location = <streamflow.core.deployment.ExecutionLocation object at 0x14fac8c80>
workdir = None, stdin = None, stdout = 'qsub: Bad UID for job execution'
stderr = -2, timeout = None

    async def _run_batch_command(
        self,
        command: str,
        environment: MutableMapping[str, str] | None,
        job_name: str,
        location: ExecutionLocation,
        workdir: str | None = None,
        stdin: int | str | None = None,
        stdout: int | str = asyncio.subprocess.STDOUT,
        stderr: int | str = asyncio.subprocess.STDOUT,
        timeout: int | None = None,
    ) -> str:
        batch_command = ["sh", "-c"]
        if workdir is not None:
            batch_command.extend(["cd", workdir, "&&"])
        resources = (
            {"walltime": utils.format_seconds_to_hhmmss(timeout)} if timeout else {}
        )
        batch_command.extend(
            [
                "echo",
                base64.b64encode(command.encode("utf-8")).decode("utf-8"),
                "|",
                "base64",
                "-d",
                "|",
            ]
        )
        if environment:
            batch_command.extend([f"{k}={v}" for k, v in environment.items()])
        batch_command.extend(
            [
                "qsub",
                get_option(
                    "o",
                    (
                        stdout
                        if stdout != asyncio.subprocess.STDOUT
                        else utils.random_name()
                    ),
                ),
            ]
        )
        if stdin is not None and stdin != asyncio.subprocess.DEVNULL:
            batch_command.append(get_option("i", stdin))
        if stderr != asyncio.subprocess.STDOUT and stderr != stdout:
            batch_command.append(get_option("e", self._format_stream(stderr)))
        if stderr == stdout:
            batch_command.append(get_option("j", "oe"))
        if service := cast(PBSService, self.services.get(location.service)):
            batch_command.extend(
                [
                    get_option("a", service.begin),
                    get_option("A", service.account),
                    get_option("c", service.checkpoint),
                    get_option("C", service.prefix),
                    get_option("m", service.mailOptions),
                    get_option("N", service.jobName),
                    get_option("p", service.priority),
                    get_option("q", service.destination),
                    get_option("r", "y" if service.rerunnable else "n"),
                    get_option("S", service.shellList),
                    get_option("u", service.userList),
                    get_option("v", service.variableList),
                    get_option("V", service.exportAllVariables),
                    get_option(
                        "W",
                        (
                            ",".join(
                                [
                                    f"{k}={v}"
                                    for k, v in service.additionalAttributes.items()
                                ]
                            )
                            if service.additionalAttributes
                            else None
                        ),
                    ),
                ]
            )
        if resources := cast(dict[str, str], service.resources) | resources:
            batch_command.append(
                get_option("l", ",".join([f"{k}={v}" for k, v in resources.items()]))
            )
        batch_command.append("-")
        stdout, returncode = await super().run(
            location=location, command=batch_command, capture_output=True
        )
        if returncode == 0:
            return stdout.strip()
        else:
>           raise WorkflowExecutionException(
                f"Error submitting job {job_name} to PBS: {stdout.strip()}"
            )
E           streamflow.core.exception.WorkflowExecutionException: Error submitting job job_test to PBS: qsub: Bad UID for job execution

.../deployment/connector/queue_manager.py:1077: WorkflowExecutionException

To view more test analytics, go to the Test Analytics Dashboard
📋 Got 3 mins? Take this short survey to help us improve Test Analytics.

@GlassOfWhiskey GlassOfWhiskey force-pushed the add-pbs-tests branch 11 times, most recently from 6b012a3 to 834a042 Compare November 22, 2025 23:09
@GlassOfWhiskey GlassOfWhiskey force-pushed the master branch 7 times, most recently from b21f362 to e7820f1 Compare January 12, 2026 08:25
@GlassOfWhiskey GlassOfWhiskey force-pushed the master branch 4 times, most recently from 8c05dac to 4a6edc9 Compare January 24, 2026 13:16
@GlassOfWhiskey GlassOfWhiskey force-pushed the add-pbs-tests branch 5 times, most recently from 9e0a0f2 to cd89f18 Compare March 24, 2026 19:25
@GlassOfWhiskey GlassOfWhiskey force-pushed the add-pbs-tests branch 2 times, most recently from 6dbdc9d to 882961f Compare March 24, 2026 21:17
This commit adds a Docker-based OpenPBS cluster to the set of target
architectures for CI tests, ensuring that the `PBSConnector` is properly
tested.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant