From 09ee62869bd6396dfc0650b78fde9b7e20e087ad Mon Sep 17 00:00:00 2001 From: GlassOfWhiskey Date: Mon, 2 Feb 2026 23:16:39 +0100 Subject: [PATCH 1/2] Add guidelines for StreamFlow agents in `AGENTS.md` This commit adds and `AGENTS.md` file to instruct AI agents when implementing new StreamFlow features. --- AGENTS.md | 245 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 245 insertions(+) create mode 100644 AGENTS.md diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 000000000..f657e9b6c --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,245 @@ +# StreamFlow Agent Guidelines + +This document provides essential guidelines for agentic coding agents working on the StreamFlow codebase. + +## Project Overview + +StreamFlow is a container-native Workflow Management System (WMS) written in Python 3 (versions 3.10-3.14). It implements the Common Workflow Language (CWL) standard (v1.0-v1.3) for multi-cloud/HPC hybrid workflow executions. + +**Key Architecture:** +- **Deployment** → **Service** → **Location** (hierarchical execution units) +- Supports multiple connectors: local, docker, kubernetes, ssh, slurm, pbs, singularity, etc. + +## Setup & Installation + +```bash +# Clone and install dependencies +git clone git@github.com:alpha-unito/streamflow.git +cd streamflow +uv sync --all-extras +``` + +## Essential Commands + +### Testing +```bash +# Run all tests +uv run make test + +# Run specific test file +uv run pytest tests/test_file.py + +# Run single test function +uv run pytest tests/test_file.py::test_function_name + +# Run tests with coverage +uv run make testcov + +# Test specific connectors only (all tested in CI) +uv run pytest --deploys local,docker tests/test_remotepath.py +``` + +**Requirements:** Docker (for most connector tests), Singularity/Apptainer, Kubernetes (minikube) + +### Linting & Formatting (REQUIRED BEFORE COMMIT) +```bash +# Check all (must pass before committing) +uv run make format-check flake8 codespell-check + +# Auto-fix formatting +uv run make format codespell + +# Apply pyupgrade for Python 3.10+ compatibility +uv run make pyupgrade +``` + +## Mandatory Agent Behavior + +All agents **MUST** adhere to these non-negotiable rules: + +### Package & Dependency Management (MANDATORY) + +**MUST** obtain explicit user permission before installing packages or updating dependencies. Specify what is being installed/updated, why, and await confirmation before proceeding. + +### Git Commit Requirements (MANDATORY) + +**MUST** follow this sequence before committing: + +1. Propose a commit message following the "Git Commit Message Guidelines" +2. Present it to the user and request confirmation +3. Allow user modifications +4. Proceed **only after** explicit user approval + +### Pre-Commit Checks (MANDATORY) + +**MUST** run `uv run make format-check flake8 codespell-check` and ensure all checks pass before committing. Fix any failures and re-run checks. **MUST NOT** commit if any checks fail. + +## Code Style Guidelines + +**Target:** Python 3.10-3.14 | **Line length:** 88 chars | **Format:** Black + isort | **Exclude:** `streamflow/cwl/antlr` + +### Import Organization +```python +from __future__ import annotations # Always first + +import asyncio +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING, Any + +from typing_extensions import Self # Third-party + +from streamflow.core.context import StreamFlowContext # Local +from streamflow.log_handler import logger + +if TYPE_CHECKING: # Avoid circular imports + from streamflow.core.data import DataManager +``` + +### Type Hints & Async +```python +# Always use type hints +def process_data( + self, + config: MutableMapping[str, Any], + items: MutableSequence[str] +) -> dict[str, Any]: + pass + +# Use Self for classmethods +@classmethod +async def load(cls, context: StreamFlowContext) -> Self: + pass + +# Proper async cleanup +async def close(self) -> None: + try: + await asyncio.gather( + asyncio.create_task(self.manager.close()), + asyncio.create_task(self.scheduler.close()), + ) + except Exception as e: + logger.exception(e) + finally: + await self.database.close() +``` + +### Naming & Error Handling +- **Classes:** `PascalCase` | **Functions:** `snake_case` | **Constants:** `UPPER_SNAKE_CASE` +- **Private:** `_method_name` | **Type vars:** `_KT`, `_VT` + +```python +# Use custom exceptions from streamflow.core.exception +from streamflow.core.exception import WorkflowExecutionException +from streamflow.log_handler import logger + +try: + result = await process() +except SpecificException as e: + logger.exception(e) + raise WorkflowExecutionException(f"Failed: {e}") from e +``` + +**Available exceptions:** `ProcessorTypeError`, `WorkflowException`, `WorkflowDefinitionException`, `WorkflowExecutionException`, `WorkflowProvenanceException`, `FailureHandlingException`, `InvalidPluginException` + +### Documentation (American English, reStructuredText) +```python +def process_workflow(workflow: Workflow, config: dict[str, Any]) -> bool: + """ + Process a workflow with the given configuration. + + :param workflow: The workflow to process + :param config: Configuration dictionary for processing + :returns: True if processing succeeded, False otherwise + :raises WorkflowExecutionException: If workflow processing fails + """ + pass +``` + +### Testing (REQUIRED for new features/bugfixes) +```python +# Use pytest with async support +async def test_workflow_execution(context: StreamFlowContext) -> None: + """Test basic workflow execution.""" + workflow = await build_workflow(context) + result = await workflow.execute() + assert result.status == "completed" +``` + +**Coverage:** https://app.codecov.io/gh/alpha-unito/streamflow + +## Git Commit Message Guidelines + +**Format:** +``` +(): + + +``` + +**Types:** `Add`, `Fix`, `Refactor`, `Update`, `Remove`, `Bump`, `Docs`, `Test`, `Chore` + +**Rules:** +- **Subject:** Imperative mood, capitalize, no period, max 50 chars +- **Scope (optional):** Module/component (e.g., `cwl`, `deployment`, `scheduling`) +- **Body (required):** Explain *what* and *why* (not *how*), wrap at 72 chars, separate with blank line, include issue refs (e.g., `Fixes #123`). Exception: trivial changes like typo fixes. +- **Language:** American English + +**Examples:** +``` +Add restore method to DataManager + +Implement restore method to enable workflow recovery from checkpoints. +This allows jobs to resume from the last completed step. + +Fix SSH connector authentication timeout (Fixes #931) + +Increase default timeout for SSH authentication from 5s to 30s to handle +slow networks and high-latency connections. + +Bump kubernetes-asyncio from 33.3.0 to 34.3.3 +``` + +## Common Workflows + +**Adding a feature:** +1. Write tests first in `tests/` +2. Implement feature with type hints and docstrings +3. Run `uv run make format` to auto-format +4. Run `uv run make format-check flake8 codespell-check` +5. Run `uv run pytest` to verify tests pass +6. Update docs if needed +7. Commit with proper message + +**Fixing a bug:** +1. Add regression test in `tests/` +2. Fix the bug +3. Follow linting/formatting guidelines +4. Verify with tests +5. Commit with proper message + +## Key Project Structure + +``` +streamflow/ +├── core/ # Abstractions (context, deployment, exception, workflow) +├── cwl/ # CWL implementation (v1.0-v1.3) +├── deployment/ # Connectors (docker, k8s, ssh, slurm, pbs, singularity) +├── workflow/ # Workflow execution engine +├── data/ # Data management +├── persistence/ # Database (SQLite) +├── scheduling/ # Scheduling policies +├── recovery/ # Checkpointing/fault tolerance +└── ext/ # Plugin system +tests/ # Pytest test suite +docs/ # Sphinx documentation +``` + +## Quick Reference + +**Extension Points:** Connector, BindingFilter, CWLDockerTranslator, Scheduler, Database, DataManager, CheckpointManager, FailureManager + +**CWL Conformance:** `./cwl-conformance-test.sh` (supports VERSION, DOCKER, EXCLUDE env vars) + +**Documentation:** `uv run make html` | Update checksum: `cd docs && uv run make checksum` + +**Resources:** [Website](https://streamflow.di.unito.it/) | [Docs](https://streamflow.di.unito.it/documentation/0.2/) | [GitHub](https://github.com/alpha-unito/streamflow) | [Contributing](CONTRIBUTING.md) From 0fa77922f48dedd5163ca8646be885f1bd8f4104 Mon Sep 17 00:00:00 2001 From: GlassOfWhiskey Date: Sun, 22 Feb 2026 16:49:00 +0100 Subject: [PATCH 2/2] Add StreamFlow benchmark suite for remotepath module Add comprehensive benchmark suite to monitor performance of StreamFlow remotepath operations across different deployment connectors (local, Docker, Kubernetes, Singularity, SSH). This enables tracking performance improvements and regressions in critical code paths. Features: - 16 benchmark tests covering file queries, I/O, filesystem operations, and directory traversal - Support for all StreamFlow connectors with deployment selection flags (`--local`, `--remote`, `--deploys`) - Proper async handling with setup/teardown isolation - Resource leak prevention using root directory cleanup pattern - GitHub Actions CI integration for automated benchmarking - Clean benchmark output grouped by deployment type New files: - `benchmark/conftest.py`: pytest configuration with deployment parametrization and fixture management - `benchmark/remotepath.py`: 16 benchmark tests for StreamFlowPath operations (exists, mkdir, read/write, glob, walk, etc.) - `benchmark/utils.py`: Helper functions for async benchmarking and test file generation Code changes: - `streamflow/data/remotepath.py`: Add `_make_child_relpath()` method to support path joining operations needed by benchmarks --- .github/workflows/ci-tests.yaml | 51 +++ .gitignore | 3 +- Makefile | 19 +- benchmark/__init__.py | 0 benchmark/conftest.py | 194 +++++++++++ benchmark/test_remotepath.py | 548 ++++++++++++++++++++++++++++++++ benchmark/utils.py | 169 ++++++++++ pyproject.toml | 1 + tox.ini | 9 +- uv.lock | 26 ++ 10 files changed, 1011 insertions(+), 9 deletions(-) create mode 100644 benchmark/__init__.py create mode 100644 benchmark/conftest.py create mode 100644 benchmark/test_remotepath.py create mode 100644 benchmark/utils.py diff --git a/.github/workflows/ci-tests.yaml b/.github/workflows/ci-tests.yaml index 809ee129d..243d1d525 100644 --- a/.github/workflows/ci-tests.yaml +++ b/.github/workflows/ci-tests.yaml @@ -10,6 +10,57 @@ concurrency: group: build-${{ github.event.pull_request.number || github.ref }} cancel-in-progress: true jobs: + benchmarks: + name: "StreamFlow benchmarks" + strategy: + matrix: + on: ["ubuntu-24.04"] + python: ["3.10", "3.11", "3.12", "3.13", "3.14"] + include: + - on: "macos-15-intel" + python: "3.14" + runs-on: ${{ matrix.on }} + env: + TOXENV: ${{ format('py{0}-benchmark', matrix.python) }} + steps: + - uses: actions/checkout@v6 + - uses: astral-sh/setup-uv@v7 + with: + version: "0.9.16" + - uses: actions/setup-python@v6 + with: + python-version: ${{ matrix.python }} + - uses: actions/setup-node@v6 + with: + node-version: "24" + - name: "Install Docker (MacOs X)" + uses: docker/setup-docker-action@v4 + env: + LIMA_START_ARGS: '--vm-type=vz --mount-type=virtiofs --mount /private/var/folders:w' + if: ${{ startsWith(matrix.on, 'macos-') }} + - uses: docker/setup-qemu-action@v3 + if: ${{ startsWith(matrix.on, 'ubuntu-') }} + - name: "Install Apptainer" + uses: eWaterCycle/setup-apptainer@v2 + with: + apptainer-version: 1.4.2 + if: ${{ startsWith(matrix.on, 'ubuntu-') }} + - name: "Install KinD" + uses: helm/kind-action@v1.14.0 + with: + config: .github/kind/config.yaml + kubectl_version: v1.35.0 + version: v0.31.0 + if: ${{ startsWith(matrix.on, 'ubuntu-') }} + - name: "Configure Calico on KinD" + run: | + kubectl apply -f https://docs.projectcalico.org/v3.25/manifests/calico.yaml + kubectl -n kube-system set env daemonset/calico-node FELIX_IGNORELOOSERPF=true + if: ${{ startsWith(matrix.on, 'ubuntu-') }} + - name: "Install Tox" + run: uv tool install tox --with tox-uv + - name: "Run StreamFlow benchmarks via Tox" + run: tox code-ql-check: name: "StreamFlow CodeQL check" runs-on: ubuntu-24.04 diff --git a/.gitignore b/.gitignore index 7a79b6bf3..c76e05cee 100644 --- a/.gitignore +++ b/.gitignore @@ -131,7 +131,8 @@ dmypy.json .DS_Store # StreamFlow -.streamflow +.benchmarks/ +.streamflow/ #SQLite *.db-shm diff --git a/Makefile b/Makefile index b62091007..7a9ca1083 100644 --- a/Makefile +++ b/Makefile @@ -1,3 +1,8 @@ +.PHONY: benchmark codespell codespell-check coverage-report flake8 format format-check pyupgrade test testcov typing + +benchmark: + python -m pytest benchmark -rs ${PYTEST_EXTRA} + codespell: codespell -w $(shell git ls-files | grep -v streamflow/cwl/antlr) @@ -11,24 +16,24 @@ coverage-report: testcov coverage report flake8: - flake8 --exclude streamflow/cwl/antlr streamflow tests + flake8 --exclude streamflow/cwl/antlr streamflow tests benchmark format: - isort streamflow tests - black --target-version py310 streamflow tests + isort streamflow tests benchmark + black --target-version py310 streamflow tests benchmark format-check: - isort --check-only streamflow tests - black --target-version py310 --diff --check streamflow tests + isort --check-only streamflow tests benchmark + black --target-version py310 --diff --check streamflow tests benchmark pyupgrade: pyupgrade --py3-only --py310-plus $(shell git ls-files | grep .py | grep -v streamflow/cwl/antlr) test: - python -m pytest -rs ${PYTEST_EXTRA} + python -m pytest tests -rs ${PYTEST_EXTRA} testcov: - python -m pytest -rs --cov --junitxml=junit.xml -o junit_family=legacy --cov-report= ${PYTEST_EXTRA} + python -m pytest tests -rs --cov --junitxml=junit.xml -o junit_family=legacy --cov-report= ${PYTEST_EXTRA} typing: mypy streamflow tests \ No newline at end of file diff --git a/benchmark/__init__.py b/benchmark/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/benchmark/conftest.py b/benchmark/conftest.py new file mode 100644 index 000000000..0a2592e1a --- /dev/null +++ b/benchmark/conftest.py @@ -0,0 +1,194 @@ +from __future__ import annotations + +import argparse +import os +import platform +import tempfile +from collections.abc import AsyncGenerator, Callable, Collection +from contextlib import suppress +from typing import Any, cast + +import pytest +import pytest_asyncio + +from streamflow.core.context import StreamFlowContext +from streamflow.core.deployment import ExecutionLocation +from streamflow.main import build_context +from tests.utils.deployment import get_deployment_config, get_location + + +def all_deployment_types() -> list[str]: + """Get all deployment types based on platform.""" + deployments = ["local", "docker"] + if platform.system() == "Linux": + deployments.extend(["kubernetes", "singularity", "ssh"]) + return deployments + + +def csvtype(choices: Collection[str]) -> Callable[[str], list[str]]: + """Return a function that splits and checks comma-separated values.""" + + def splitarg(arg: str) -> list[str]: + values = arg.split(",") + for value in values: + if value not in choices: + raise argparse.ArgumentTypeError( + "invalid choice: {!r} (choose from {})".format( + value, ", ".join(map(repr, choices)) + ) + ) + return values + + return splitarg + + +def pytest_addoption(parser: pytest.Parser) -> None: + """Add command line options for benchmarks.""" + group = parser.getgroup("benchmark deployment") + group.addoption( + "--deploys", + type=csvtype(all_deployment_types()), + default=None, + help="List of deployments to benchmark. Use comma as delimiter e.g. --deploys " + "local,docker. Defaults to all available deployments if no deployment flag is " + "specified. Cannot be used with --local or --remote.", + ) + group.addoption( + "--local", + action="store_true", + default=False, + help="Run benchmarks only on the local connector. " + "Cannot be used with --deploys or --remote.", + ) + group.addoption( + "--remote", + action="store_true", + default=False, + help="Run benchmarks on all connectors except local. " + "Cannot be used with --deploys or --local.", + ) + + +def pytest_configure(config: pytest.Config) -> None: + """Configure pytest-benchmark and add custom markers.""" + config.option.benchmark_sort = "name" + config.option.benchmark_columns = ["mean", "min", "max", "stddev"] + config.option.benchmark_name = "short" + config.option.benchmark_group_by = "param:deployment" + + +def pytest_collection_modifyitems(items: list[pytest.Item]) -> None: + """Remove [connector] suffix from test names for cleaner benchmark display.""" + for item in items: + if "[" in item.name and hasattr(item, "callspec"): + base_name = item.name.split("[")[0] + params = item.name.split("[")[1].split("]")[0] + item.name = ( + f"{base_name}[{params.split('-')[1]}]" if "-" in params else base_name + ) + item._nodeid = item.nodeid.replace(f"[{item.callspec.id}]", "") + + +def pytest_generate_tests(metafunc: pytest.Metafunc) -> None: + """Generate deployment parametrization based on platform and deployment flags.""" + if "deployment" in metafunc.fixturenames: + deploys = metafunc.config.getoption("--deploys") + local_flag = metafunc.config.getoption("--local") + remote_flag = metafunc.config.getoption("--remote") + + flags_set = sum([deploys is not None, local_flag, remote_flag]) + if flags_set > 1: + raise pytest.UsageError( + "Only one of --deploys, --local, or --remote can be specified" + ) + + if local_flag: + deployments = ["local"] + elif remote_flag: + all_deploys = all_deployment_types() + deployments = [d for d in all_deploys if d != "local"] + elif deploys is not None: + deployments = deploys + else: + deployments = all_deployment_types() + + metafunc.parametrize("deployment", deployments, scope="session") + + +@pytest_asyncio.fixture(scope="session") +async def location(context: StreamFlowContext, deployment: str) -> ExecutionLocation: + """ + Get execution location for deployment type. + + Gracefully skip if deployment is not available. + """ + try: + return await get_location(context, deployment) + except Exception as e: + pytest.skip(f"Deployment {deployment} not available: {e}") + + +@pytest.fixture(scope="session") +def connector_type(context: StreamFlowContext, location: ExecutionLocation) -> str: + """ + Get connector type for the deployment. + + Returns the type string from the deployment config (e.g., 'local', 'docker', 'ssh'). + """ + from streamflow.deployment.manager import DefaultDeploymentManager + + manager = cast(DefaultDeploymentManager, context.deployment_manager) + config = manager.config_map.get(location.deployment) + return config.type if config else "unknown" + + +@pytest.fixture(scope="function") +def benchmark_tmpdir(location: ExecutionLocation) -> str: + """ + Create isolated temp directory for each benchmark. + + For local deployments, uses /tmp/streamflow-benchmark. + For remote deployments, uses /tmp directly since the container + might not have /tmp/streamflow-benchmark. + """ + if location.local: + tmpdir = os.path.join(tempfile.gettempdir(), "streamflow-benchmark") + os.makedirs(tmpdir, exist_ok=True) + return tmpdir + else: + return "/tmp" + + +@pytest_asyncio.fixture(scope="session") +async def context( + request: pytest.FixtureRequest, +) -> AsyncGenerator[StreamFlowContext, Any]: + """Create StreamFlowContext for benchmarks (replicates tests/conftest.py).""" + _context = build_context( + { + "database": {"type": "default", "config": {"connection": ":memory:"}}, + "path": os.getcwd(), + }, + ) + # Determine which deployments to deploy based on flags + deploys = request.config.getoption("--deploys") + local_flag = request.config.getoption("--local") + remote_flag = request.config.getoption("--remote") + + if local_flag: + deployments = ["local"] + elif remote_flag: + deployments = [d for d in all_deployment_types() if d != "local"] + elif deploys is not None: + deployments = deploys + else: + deployments = all_deployment_types() + + for deployment_t in deployments: + with suppress(Exception): + config = await get_deployment_config(_context, deployment_t) + await _context.deployment_manager.deploy(config) + + yield _context + await _context.deployment_manager.undeploy_all() + await _context.close() diff --git a/benchmark/test_remotepath.py b/benchmark/test_remotepath.py new file mode 100644 index 000000000..118b194e7 --- /dev/null +++ b/benchmark/test_remotepath.py @@ -0,0 +1,548 @@ +from __future__ import annotations + +import pytest + +from benchmark.utils import ( + FILE_SIZE_BYTES, + benchmark_async, + create_directory_structure, + create_test_file, +) +from streamflow.core.context import StreamFlowContext +from streamflow.core.deployment import ExecutionLocation +from streamflow.core.utils import random_name +from streamflow.data.remotepath import StreamFlowPath + +# ============================================================================ +# FILE QUERY OPERATIONS +# ============================================================================ + + +def test_exists( + benchmark, + location: ExecutionLocation, + connector_type: str, + context: StreamFlowContext, + benchmark_tmpdir: str, +) -> None: + """Benchmark path.exists() operation.""" + path = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + + async def setup(): + await create_test_file(path) + + async def target(): + return await path.exists() + + async def teardown(): + await path.rmtree() + + assert benchmark_async(benchmark, target, setup, teardown) is True + + +def test_is_dir( + benchmark, + location: ExecutionLocation, + connector_type: str, + context: StreamFlowContext, + benchmark_tmpdir: str, +) -> None: + """Benchmark path.is_dir() operation.""" + path = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + + async def setup(): + await path.mkdir(parents=True, exist_ok=True) + + async def target(): + return await path.is_dir() + + async def teardown(): + await path.rmtree() + + assert benchmark_async(benchmark, target, setup, teardown) is True + + +def test_is_file( + benchmark, + location: ExecutionLocation, + connector_type: str, + context: StreamFlowContext, + benchmark_tmpdir: str, +) -> None: + """Benchmark path.is_file() operation.""" + path = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + + async def setup(): + await create_test_file(path) + + async def target(): + return await path.is_file() + + async def teardown(): + await path.rmtree() + + assert benchmark_async(benchmark, target, setup, teardown) is True + + +def test_is_symlink( + benchmark, + location: ExecutionLocation, + connector_type: str, + context: StreamFlowContext, + benchmark_tmpdir: str, +) -> None: + """Benchmark path.is_symlink() operation.""" + src = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + link = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + + async def setup(): + await create_test_file(src) + await link.symlink_to(src) + + async def target(): + return await link.is_symlink() + + async def teardown(): + await link.rmtree() + await src.rmtree() + + assert benchmark_async(benchmark, target, setup, teardown) is True + + +def test_size( + benchmark, + location: ExecutionLocation, + connector_type: str, + context: StreamFlowContext, + benchmark_tmpdir: str, +) -> None: + """Benchmark path.size() on 100KB file.""" + path = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + + async def setup(): + await create_test_file(path) + + async def target(): + return await path.size() + + async def teardown(): + await path.rmtree() + + assert benchmark_async(benchmark, target, setup, teardown) == FILE_SIZE_BYTES + + +# ============================================================================ +# I/O OPERATIONS +# ============================================================================ + + +def test_checksum_small( + benchmark, + location: ExecutionLocation, + connector_type: str, + context: StreamFlowContext, + benchmark_tmpdir: str, +) -> None: + """Benchmark path.checksum() with 100KB file.""" + path = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + + async def setup(): + await create_test_file(path, 2**10) + + async def target(): + return await path.checksum() + + async def teardown(): + await path.rmtree() + + assert benchmark_async(benchmark, target, setup, teardown) is not None + + +def test_checksum_large( + benchmark, + location: ExecutionLocation, + connector_type: str, + context: StreamFlowContext, + benchmark_tmpdir: str, +) -> None: + """Benchmark path.checksum() with 100KB file.""" + path = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + + async def setup(): + await create_test_file(path) + + async def target(): + return await path.checksum() + + async def teardown(): + await path.rmtree() + + assert benchmark_async(benchmark, target, setup, teardown) is not None + + +def test_read_text( + benchmark, + location: ExecutionLocation, + connector_type: str, + context: StreamFlowContext, + benchmark_tmpdir: str, +) -> None: + """Benchmark path.read_text() with 100KB file.""" + path = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + + async def setup(): + await create_test_file(path) + + async def target(): + return await path.read_text() + + async def teardown(): + await path.rmtree() + + assert len(benchmark_async(benchmark, target, setup, teardown)) == FILE_SIZE_BYTES + + +def test_write_text( + benchmark, + location: ExecutionLocation, + connector_type: str, + context: StreamFlowContext, + benchmark_tmpdir: str, +) -> None: + """Benchmark path.write_text() with 100KB file.""" + from benchmark.utils import generate_file_content + + # Create root directory for all iterations + root_dir = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + + # Generate content once (not benchmarked) + content = generate_file_content(FILE_SIZE_BYTES) + + async def setup(): + await root_dir.mkdir() + + async def target(): + # Create unique file for each iteration + path = root_dir / random_name() + await path.write_text(content) + + async def teardown(): + # Cleanup all files at once + await root_dir.rmtree() + + benchmark_async(benchmark, target, setup, teardown) + + +# ============================================================================ +# FILESYSTEM OPERATIONS +# ============================================================================ + + +def test_hardlink_to( + benchmark, + location: ExecutionLocation, + connector_type: str, + context: StreamFlowContext, + benchmark_tmpdir: str, +) -> None: + """Benchmark path.hardlink_to() operation.""" + # Create root directory for all iterations + root_dir = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + + src = root_dir / "source_file" + + async def setup(): + await root_dir.mkdir() + await create_test_file(src) + + async def target(): + # Create unique link for each iteration + link = root_dir / random_name() + await link.hardlink_to(src) + + async def teardown(): + # Cleanup all files at once + await root_dir.rmtree() + + benchmark_async(benchmark, target, setup, teardown) + + +def test_mkdir( + benchmark, + location: ExecutionLocation, + connector_type: str, + context: StreamFlowContext, + benchmark_tmpdir: str, +) -> None: + """Benchmark path.mkdir() operation.""" + # Create root directory for all iterations + root_dir = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + + async def setup(): + await root_dir.mkdir() + + async def target(): + # Create unique directory for each iteration + path = root_dir / random_name() + await path.mkdir(mode=0o777) + + async def teardown(): + # Cleanup all directories at once + await root_dir.rmtree() + + benchmark_async(benchmark, target, setup, teardown) + + +def test_mkdir_parents( + benchmark, + location: ExecutionLocation, + connector_type: str, + context: StreamFlowContext, + benchmark_tmpdir: str, +) -> None: + """Benchmark path.mkdir(parents=True) operation.""" + # Create root directory for all iterations + root_dir = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + + async def setup(): + await root_dir.mkdir() + + async def target(): + # Create unique nested path for each iteration + path = root_dir / random_name() / "nested" / "deep" / "directory" + await path.mkdir(mode=0o777, parents=True) + + async def teardown(): + # Cleanup all nested directories at once + await root_dir.rmtree() + + benchmark_async(benchmark, target, setup, teardown) + + +def test_resolve( + benchmark, + location: ExecutionLocation, + connector_type: str, + context: StreamFlowContext, + benchmark_tmpdir: str, +) -> None: + """Benchmark path.resolve() on symbolic link.""" + src = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + link = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + + async def setup(): + await create_test_file(src) + await link.symlink_to(src) + + async def target(): + return await link.resolve() + + async def teardown(): + await link.rmtree() + await src.rmtree() + + assert benchmark_async(benchmark, target, setup, teardown) is not None + + +def test_rmtree( + benchmark, + location: ExecutionLocation, + connector_type: str, + context: StreamFlowContext, + benchmark_tmpdir: str, +) -> None: + """Benchmark path.rmtree() on 10 files in 3 dirs.""" + path = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + + async def setup(): + await create_directory_structure(path) + + async def target(): + await path.rmtree() + + async def teardown(): + # Verify deletion succeeded + assert not await path.exists() + + benchmark_async(benchmark, target, setup, teardown) + + +def test_symlink_to( + benchmark, + location: ExecutionLocation, + connector_type: str, + context: StreamFlowContext, + benchmark_tmpdir: str, +) -> None: + """Benchmark path.symlink_to() operation.""" + # Create root directory for all iterations + root_dir = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + + src = root_dir / "source_file" + + async def setup(): + await root_dir.mkdir() + await create_test_file(src) + + async def target(): + # Create unique link for each iteration inside root_dir + link = root_dir / random_name() + await link.symlink_to(src) + + async def teardown(): + # Cleanup all links and source + await root_dir.rmtree() + + benchmark_async(benchmark, target, setup, teardown) + + +# ============================================================================ +# DIRECTORY TRAVERSAL +# ============================================================================ + + +@pytest.mark.parametrize("pattern", ["**/*.txt", "*", "**/*"]) +def test_glob( + benchmark, + pattern: str, + location: ExecutionLocation, + connector_type: str, + context: StreamFlowContext, + benchmark_tmpdir: str, +) -> None: + """Benchmark path.glob(pattern) iteration with different patterns.""" + path = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + + async def setup(): + await create_directory_structure(path) + + async def target(): + results = [] + async for p in path.glob(pattern): + results.append(p) + return results + + async def teardown(): + await path.rmtree() + + assert len(benchmark_async(benchmark, target, setup, teardown)) > 0 + + +def test_walk( + benchmark, + location: ExecutionLocation, + connector_type: str, + context: StreamFlowContext, + benchmark_tmpdir: str, +) -> None: + """Benchmark path.walk() iteration on 10 files in 3 dirs.""" + path = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + + async def setup(): + await create_directory_structure(path) + + async def target(): + results = [] + async for dirpath, dirnames, filenames in path.walk(): + results.append((dirpath, dirnames, filenames)) + return results + + async def teardown(): + await path.rmtree() + + assert len(benchmark_async(benchmark, target, setup, teardown)) > 0 diff --git a/benchmark/utils.py b/benchmark/utils.py new file mode 100644 index 000000000..b9773c69b --- /dev/null +++ b/benchmark/utils.py @@ -0,0 +1,169 @@ +from __future__ import annotations + +import asyncio +import random +import string +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from collections.abc import Callable + + from streamflow.data.remotepath import StreamFlowPath + +# Constants for small/fast test scenario +FILE_SIZE_BYTES = 102400 # 100 KB +NUM_FILES = 10 +NUM_SUBDIRS = 3 +DIR_DEPTH = 2 + + +def benchmark_async( + benchmark: Any, + target_func: Callable, + setup_func: Callable | None = None, + teardown_func: Callable | None = None, +) -> Any: + """ + Wrapper to run async functions in pytest-benchmark with proper setup/teardown. + + pytest-benchmark doesn't natively support async functions, + so we need to run them in the event loop. This wrapper ensures that: + - setup_func runs once before benchmarking + - target_func is benchmarked (repeated iterations) + - teardown_func runs once after benchmarking + + Note: setup and teardown happen outside the benchmark timing. + + :param benchmark: pytest-benchmark fixture + :param target_func: Async function to benchmark (called multiple times) + :param setup_func: Optional async setup function (called once before timing) + :param teardown_func: Optional async teardown function (called once after timing) + :return: Result of the last target_func call + """ + loop = asyncio.get_event_loop() + last_result = None + + # Run setup once before benchmarking + if setup_func: + loop.run_until_complete(setup_func()) + + # Define the sync wrapper for the benchmark target + def sync_wrapper() -> Any: + nonlocal last_result + last_result = loop.run_until_complete(target_func()) + return last_result + + # Run the benchmark with configured iterations and rounds for statistical significance + benchmark.pedantic(sync_wrapper, iterations=100, rounds=10, warmup_rounds=1) + + # Run teardown once after benchmarking + if teardown_func: + loop.run_until_complete(teardown_func()) + + return last_result + + +def calculate_overhead_percent(baseline_time: float, measured_time: float) -> float: + """ + Calculate overhead percentage compared to baseline. + + :param baseline_time: Baseline time (usually local deployment) + :param measured_time: Measured time for the operation + :return: Overhead percentage + """ + if baseline_time <= 0: + return 0 + return ((measured_time - baseline_time) / baseline_time) * 100 + + +def calculate_throughput_items_per_sec(num_items: int, time_seconds: float) -> float: + """ + Calculate items/s throughput. + + :param num_items: Number of items processed + :param time_seconds: Time taken in seconds + :return: Throughput in items/s + """ + return num_items / time_seconds if time_seconds > 0 else 0 + + +def calculate_throughput_mb_per_sec(size_bytes: int, time_seconds: float) -> float: + """ + Calculate MB/s throughput. + + :param size_bytes: Size of data processed + :param time_seconds: Time taken in seconds + :return: Throughput in MB/s + """ + mb = size_bytes / (1024 * 1024) + return mb / time_seconds if time_seconds > 0 else 0 + + +async def create_directory_structure( + base_path: StreamFlowPath, + num_files: int = NUM_FILES, + num_subdirs: int = NUM_SUBDIRS, +) -> dict[str, Any]: + """ + Create directory with files distributed across subdirectories. + + Structure: 10 files in 3 subdirectories (3-4 files per subdir) + test_dir/ + subdir_0/ + file_0.txt + file_1.txt + file_2.txt + subdir_1/ + file_3.txt + ... + + :param base_path: Base directory path + :param num_files: Total number of files to create + :param num_subdirs: Number of subdirectories + :return: Metadata dict with num_files, num_dirs, total_size_bytes, file_paths + """ + await base_path.mkdir(parents=True, exist_ok=True) + + file_paths = [] + files_per_dir = num_files // num_subdirs + + for i in range(num_subdirs): + subdir = base_path / f"subdir_{i}" + await subdir.mkdir(parents=True, exist_ok=True) + + for j in range(files_per_dir): + file_path = subdir / f"file_{i * files_per_dir + j}.txt" + await create_test_file(file_path) + file_paths.append(file_path) + + return { + "num_files": num_files, + "num_dirs": num_subdirs, + "total_size_bytes": num_files * FILE_SIZE_BYTES, + "file_paths": file_paths, + } + + +async def create_test_file( + path: StreamFlowPath, size_bytes: int = FILE_SIZE_BYTES +) -> None: + """ + Create a test file with specified size. + + :param path: Path where to create the file + :param size_bytes: Size of file content in bytes + """ + content = generate_file_content(size_bytes) + await path.write_text(content) + + +def generate_file_content(size_bytes: int = FILE_SIZE_BYTES) -> str: + """ + Generate random ASCII text content of specified size. + + :param size_bytes: Size of content to generate in bytes + :return: Random text string + """ + return "".join( + random.choices(string.ascii_letters + string.digits + "\n", k=size_bytes) + ) diff --git a/pyproject.toml b/pyproject.toml index c30c0d025..1db996cc4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -95,6 +95,7 @@ test = [ "cwltool==3.1.20260315121657", "pytest==9.0.2", "pytest-asyncio==1.3.0", + "pytest-benchmark==5.2.1", "pytest-cov==7.1.0", "pytest-xdist==3.8.0" ] diff --git a/tox.ini b/tox.ini index b1efaf991..f5158045a 100644 --- a/tox.ini +++ b/tox.ini @@ -2,6 +2,7 @@ envlist = bandit lint + py3.{10,11,12,13,14}-benchmark py3.{10,11,12,13,14}-unit skip_missing_interpreters = True @@ -9,22 +10,28 @@ skip_missing_interpreters = True asyncio_default_fixture_loop_scope = session asyncio_default_test_loop_scope = session asyncio_mode = strict -testpaths = tests +testpaths = + benchmark + tests [testenv] allowlist_externals = make commands = + py3.{10,11,12,13,14}-benchmark: make benchmark py3.{10,11,12,13,14}-unit: make coverage-report coverage.xml PYTEST_EXTRA={posargs} dependency_groups = + py3.{10,11,12,13,14}-benchmark: test py3.{10,11,12,13,14}-unit: test extras = report description = + py3.{10,11,12,13,14}-benchmark: Run the StreamFlow benchmarks py3.{10,11,12,13,14}-unit: Run the unit tests passenv = CI GITHUB_* runner = uv-venv-lock-runner setenv = + py3.{10,11,12,13,14}-benchmark: LC_ALL = C.UTF-8 py3.{10,11,12,13,14}-unit: LC_ALL = C.UTF-8 uv_sync_flags = --no-dev diff --git a/uv.lock b/uv.lock index a44299453..8b8b9c371 100644 --- a/uv.lock +++ b/uv.lock @@ -2474,6 +2474,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/8c/c7/7bb2e321574b10df20cbde462a94e2b71d05f9bbda251ef27d104668306a/psutil-7.2.2-cp37-abi3-win_arm64.whl", hash = "sha256:8c233660f575a5a89e6d4cb65d9f938126312bca76d8fe087b947b3a1aaac9ee", size = 134617, upload-time = "2026-01-28T18:15:36.514Z" }, ] +[[package]] +name = "py-cpuinfo" +version = "9.0.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/37/a8/d832f7293ebb21690860d2e01d8115e5ff6f2ae8bbdc953f0eb0fa4bd2c7/py-cpuinfo-9.0.0.tar.gz", hash = "sha256:3cdbbf3fac90dc6f118bfd64384f309edeadd902d7c8fb17f02ffa1fc3f49690", size = 104716, upload-time = "2022-10-25T20:38:06.303Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e0/a9/023730ba63db1e494a271cb018dcd361bd2c917ba7004c3e49d5daf795a2/py_cpuinfo-9.0.0-py3-none-any.whl", hash = "sha256:859625bc251f64e21f077d099d4162689c762b5d6a4c3c97553d56241c9674d5", size = 22335, upload-time = "2022-10-25T20:38:27.636Z" }, +] + [[package]] name = "pycodestyle" version = "2.14.0" @@ -2572,6 +2581,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/e5/35/f8b19922b6a25bc0880171a2f1a003eaeb93657475193ab516fd87cac9da/pytest_asyncio-1.3.0-py3-none-any.whl", hash = "sha256:611e26147c7f77640e6d0a92a38ed17c3e9848063698d5c93d5aa7aa11cebff5", size = 15075, upload-time = "2025-11-10T16:07:45.537Z" }, ] +[[package]] +name = "pytest-benchmark" +version = "5.2.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "py-cpuinfo" }, + { name = "pytest" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/b8/fd/237def5e4b775a6a1a98c5f6a5b01f6c75d1f3af740b3665b7f538aea14b/pytest_benchmark-5.2.1.tar.gz", hash = "sha256:56dc1455bda7ccb540aa671c496dafc8187d2769f278e5f137689476805b6f9d", size = 339278, upload-time = "2025-11-04T14:51:33.543Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/9b/75/02cb44852083794606c6b1b3f21df517c8bcf4ad1e88c6e0c5b8adb98da3/pytest_benchmark-5.2.1-py3-none-any.whl", hash = "sha256:a6e18fe0df2155e9d993db6ba03bdf85324794035ad986553787024ca59e8db9", size = 44727, upload-time = "2025-11-04T14:51:31.532Z" }, +] + [[package]] name = "pytest-cov" version = "7.1.0" @@ -3348,6 +3370,7 @@ dev = [ { name = "pandas-stubs" }, { name = "pytest" }, { name = "pytest-asyncio" }, + { name = "pytest-benchmark" }, { name = "pytest-cov" }, { name = "pytest-xdist" }, { name = "pyupgrade" }, @@ -3379,6 +3402,7 @@ test = [ { name = "cwltool" }, { name = "pytest" }, { name = "pytest-asyncio" }, + { name = "pytest-benchmark" }, { name = "pytest-cov" }, { name = "pytest-xdist" }, ] @@ -3431,6 +3455,7 @@ dev = [ { name = "pandas-stubs", specifier = "==2.3.3.260113" }, { name = "pytest", specifier = "==9.0.2" }, { name = "pytest-asyncio", specifier = "==1.3.0" }, + { name = "pytest-benchmark", specifier = "==5.2.1" }, { name = "pytest-cov", specifier = "==7.1.0" }, { name = "pytest-xdist", specifier = "==3.8.0" }, { name = "pyupgrade", specifier = "==3.21.2" }, @@ -3462,6 +3487,7 @@ test = [ { name = "cwltool", specifier = "==3.1.20260315121657" }, { name = "pytest", specifier = "==9.0.2" }, { name = "pytest-asyncio", specifier = "==1.3.0" }, + { name = "pytest-benchmark", specifier = "==5.2.1" }, { name = "pytest-cov", specifier = "==7.1.0" }, { name = "pytest-xdist", specifier = "==3.8.0" }, ]