diff --git a/.github/workflows/ci-tests.yaml b/.github/workflows/ci-tests.yaml index 809ee129d..243d1d525 100644 --- a/.github/workflows/ci-tests.yaml +++ b/.github/workflows/ci-tests.yaml @@ -10,6 +10,57 @@ concurrency: group: build-${{ github.event.pull_request.number || github.ref }} cancel-in-progress: true jobs: + benchmarks: + name: "StreamFlow benchmarks" + strategy: + matrix: + on: ["ubuntu-24.04"] + python: ["3.10", "3.11", "3.12", "3.13", "3.14"] + include: + - on: "macos-15-intel" + python: "3.14" + runs-on: ${{ matrix.on }} + env: + TOXENV: ${{ format('py{0}-benchmark', matrix.python) }} + steps: + - uses: actions/checkout@v6 + - uses: astral-sh/setup-uv@v7 + with: + version: "0.9.16" + - uses: actions/setup-python@v6 + with: + python-version: ${{ matrix.python }} + - uses: actions/setup-node@v6 + with: + node-version: "24" + - name: "Install Docker (MacOs X)" + uses: docker/setup-docker-action@v4 + env: + LIMA_START_ARGS: '--vm-type=vz --mount-type=virtiofs --mount /private/var/folders:w' + if: ${{ startsWith(matrix.on, 'macos-') }} + - uses: docker/setup-qemu-action@v3 + if: ${{ startsWith(matrix.on, 'ubuntu-') }} + - name: "Install Apptainer" + uses: eWaterCycle/setup-apptainer@v2 + with: + apptainer-version: 1.4.2 + if: ${{ startsWith(matrix.on, 'ubuntu-') }} + - name: "Install KinD" + uses: helm/kind-action@v1.14.0 + with: + config: .github/kind/config.yaml + kubectl_version: v1.35.0 + version: v0.31.0 + if: ${{ startsWith(matrix.on, 'ubuntu-') }} + - name: "Configure Calico on KinD" + run: | + kubectl apply -f https://docs.projectcalico.org/v3.25/manifests/calico.yaml + kubectl -n kube-system set env daemonset/calico-node FELIX_IGNORELOOSERPF=true + if: ${{ startsWith(matrix.on, 'ubuntu-') }} + - name: "Install Tox" + run: uv tool install tox --with tox-uv + - name: "Run StreamFlow benchmarks via Tox" + run: tox code-ql-check: name: "StreamFlow CodeQL check" runs-on: ubuntu-24.04 diff --git a/.gitignore b/.gitignore index 7a79b6bf3..c76e05cee 
100644 --- a/.gitignore +++ b/.gitignore @@ -131,7 +131,8 @@ dmypy.json .DS_Store # StreamFlow -.streamflow +.benchmarks/ +.streamflow/ #SQLite *.db-shm diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 000000000..f657e9b6c --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,245 @@ +# StreamFlow Agent Guidelines + +This document provides essential guidelines for agentic coding agents working on the StreamFlow codebase. + +## Project Overview + +StreamFlow is a container-native Workflow Management System (WMS) written in Python 3 (versions 3.10-3.14). It implements the Common Workflow Language (CWL) standard (v1.0-v1.3) for multi-cloud/HPC hybrid workflow executions. + +**Key Architecture:** +- **Deployment** → **Service** → **Location** (hierarchical execution units) +- Supports multiple connectors: local, docker, kubernetes, ssh, slurm, pbs, singularity, etc. + +## Setup & Installation + +```bash +# Clone and install dependencies +git clone git@github.com:alpha-unito/streamflow.git +cd streamflow +uv sync --all-extras +``` + +## Essential Commands + +### Testing +```bash +# Run all tests +uv run make test + +# Run specific test file +uv run pytest tests/test_file.py + +# Run single test function +uv run pytest tests/test_file.py::test_function_name + +# Run tests with coverage +uv run make testcov + +# Test specific connectors only (all tested in CI) +uv run pytest --deploys local,docker tests/test_remotepath.py +``` + +**Requirements:** Docker (for most connector tests), Singularity/Apptainer, Kubernetes (minikube) + +### Linting & Formatting (REQUIRED BEFORE COMMIT) +```bash +# Check all (must pass before committing) +uv run make format-check flake8 codespell-check + +# Auto-fix formatting +uv run make format codespell + +# Apply pyupgrade for Python 3.10+ compatibility +uv run make pyupgrade +``` + +## Mandatory Agent Behavior + +All agents **MUST** adhere to these non-negotiable rules: + +### Package & Dependency Management (MANDATORY) + +**MUST** obtain 
explicit user permission before installing packages or updating dependencies. Specify what is being installed/updated, why, and await confirmation before proceeding. + +### Git Commit Requirements (MANDATORY) + +**MUST** follow this sequence before committing: + +1. Propose a commit message following the "Git Commit Message Guidelines" +2. Present it to the user and request confirmation +3. Allow user modifications +4. Proceed **only after** explicit user approval + +### Pre-Commit Checks (MANDATORY) + +**MUST** run `uv run make format-check flake8 codespell-check` and ensure all checks pass before committing. Fix any failures and re-run checks. **MUST NOT** commit if any checks fail. + +## Code Style Guidelines + +**Target:** Python 3.10-3.14 | **Line length:** 88 chars | **Format:** Black + isort | **Exclude:** `streamflow/cwl/antlr` + +### Import Organization +```python +from __future__ import annotations # Always first + +import asyncio +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING, Any + +from typing_extensions import Self # Third-party + +from streamflow.core.context import StreamFlowContext # Local +from streamflow.log_handler import logger + +if TYPE_CHECKING: # Avoid circular imports + from streamflow.core.data import DataManager +``` + +### Type Hints & Async +```python +# Always use type hints +def process_data( + self, + config: MutableMapping[str, Any], + items: MutableSequence[str] +) -> dict[str, Any]: + pass + +# Use Self for classmethods +@classmethod +async def load(cls, context: StreamFlowContext) -> Self: + pass + +# Proper async cleanup +async def close(self) -> None: + try: + await asyncio.gather( + asyncio.create_task(self.manager.close()), + asyncio.create_task(self.scheduler.close()), + ) + except Exception as e: + logger.exception(e) + finally: + await self.database.close() +``` + +### Naming & Error Handling +- **Classes:** `PascalCase` | **Functions:** `snake_case` | **Constants:** `UPPER_SNAKE_CASE` +- 
**Private:** `_method_name` | **Type vars:** `_KT`, `_VT` + +```python +# Use custom exceptions from streamflow.core.exception +from streamflow.core.exception import WorkflowExecutionException +from streamflow.log_handler import logger + +try: + result = await process() +except SpecificException as e: + logger.exception(e) + raise WorkflowExecutionException(f"Failed: {e}") from e +``` + +**Available exceptions:** `ProcessorTypeError`, `WorkflowException`, `WorkflowDefinitionException`, `WorkflowExecutionException`, `WorkflowProvenanceException`, `FailureHandlingException`, `InvalidPluginException` + +### Documentation (American English, reStructuredText) +```python +def process_workflow(workflow: Workflow, config: dict[str, Any]) -> bool: + """ + Process a workflow with the given configuration. + + :param workflow: The workflow to process + :param config: Configuration dictionary for processing + :returns: True if processing succeeded, False otherwise + :raises WorkflowExecutionException: If workflow processing fails + """ + pass +``` + +### Testing (REQUIRED for new features/bugfixes) +```python +# Use pytest with async support +async def test_workflow_execution(context: StreamFlowContext) -> None: + """Test basic workflow execution.""" + workflow = await build_workflow(context) + result = await workflow.execute() + assert result.status == "completed" +``` + +**Coverage:** https://app.codecov.io/gh/alpha-unito/streamflow + +## Git Commit Message Guidelines + +**Format:** +``` +(): + + +``` + +**Types:** `Add`, `Fix`, `Refactor`, `Update`, `Remove`, `Bump`, `Docs`, `Test`, `Chore` + +**Rules:** +- **Subject:** Imperative mood, capitalize, no period, max 50 chars +- **Scope (optional):** Module/component (e.g., `cwl`, `deployment`, `scheduling`) +- **Body (required):** Explain *what* and *why* (not *how*), wrap at 72 chars, separate with blank line, include issue refs (e.g., `Fixes #123`). Exception: trivial changes like typo fixes. 
+- **Language:** American English + +**Examples:** +``` +Add restore method to DataManager + +Implement restore method to enable workflow recovery from checkpoints. +This allows jobs to resume from the last completed step. + +Fix SSH connector authentication timeout (Fixes #931) + +Increase default timeout for SSH authentication from 5s to 30s to handle +slow networks and high-latency connections. + +Bump kubernetes-asyncio from 33.3.0 to 34.3.3 +``` + +## Common Workflows + +**Adding a feature:** +1. Write tests first in `tests/` +2. Implement feature with type hints and docstrings +3. Run `uv run make format` to auto-format +4. Run `uv run make format-check flake8 codespell-check` +5. Run `uv run pytest` to verify tests pass +6. Update docs if needed +7. Commit with proper message + +**Fixing a bug:** +1. Add regression test in `tests/` +2. Fix the bug +3. Follow linting/formatting guidelines +4. Verify with tests +5. Commit with proper message + +## Key Project Structure + +``` +streamflow/ +├── core/ # Abstractions (context, deployment, exception, workflow) +├── cwl/ # CWL implementation (v1.0-v1.3) +├── deployment/ # Connectors (docker, k8s, ssh, slurm, pbs, singularity) +├── workflow/ # Workflow execution engine +├── data/ # Data management +├── persistence/ # Database (SQLite) +├── scheduling/ # Scheduling policies +├── recovery/ # Checkpointing/fault tolerance +└── ext/ # Plugin system +tests/ # Pytest test suite +docs/ # Sphinx documentation +``` + +## Quick Reference + +**Extension Points:** Connector, BindingFilter, CWLDockerTranslator, Scheduler, Database, DataManager, CheckpointManager, FailureManager + +**CWL Conformance:** `./cwl-conformance-test.sh` (supports VERSION, DOCKER, EXCLUDE env vars) + +**Documentation:** `uv run make html` | Update checksum: `cd docs && uv run make checksum` + +**Resources:** [Website](https://streamflow.di.unito.it/) | [Docs](https://streamflow.di.unito.it/documentation/0.2/) | 
[GitHub](https://github.com/alpha-unito/streamflow) | [Contributing](CONTRIBUTING.md) diff --git a/Makefile b/Makefile index b62091007..7a9ca1083 100644 --- a/Makefile +++ b/Makefile @@ -1,3 +1,8 @@ +.PHONY: benchmark codespell codespell-check coverage-report flake8 format format-check pyupgrade test testcov typing + +benchmark: + python -m pytest benchmark -rs ${PYTEST_EXTRA} + codespell: codespell -w $(shell git ls-files | grep -v streamflow/cwl/antlr) @@ -11,24 +16,24 @@ coverage-report: testcov coverage report flake8: - flake8 --exclude streamflow/cwl/antlr streamflow tests + flake8 --exclude streamflow/cwl/antlr streamflow tests benchmark format: - isort streamflow tests - black --target-version py310 streamflow tests + isort streamflow tests benchmark + black --target-version py310 streamflow tests benchmark format-check: - isort --check-only streamflow tests - black --target-version py310 --diff --check streamflow tests + isort --check-only streamflow tests benchmark + black --target-version py310 --diff --check streamflow tests benchmark pyupgrade: pyupgrade --py3-only --py310-plus $(shell git ls-files | grep .py | grep -v streamflow/cwl/antlr) test: - python -m pytest -rs ${PYTEST_EXTRA} + python -m pytest tests -rs ${PYTEST_EXTRA} testcov: - python -m pytest -rs --cov --junitxml=junit.xml -o junit_family=legacy --cov-report= ${PYTEST_EXTRA} + python -m pytest tests -rs --cov --junitxml=junit.xml -o junit_family=legacy --cov-report= ${PYTEST_EXTRA} typing: mypy streamflow tests \ No newline at end of file diff --git a/benchmark/__init__.py b/benchmark/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/benchmark/conftest.py b/benchmark/conftest.py new file mode 100644 index 000000000..0a2592e1a --- /dev/null +++ b/benchmark/conftest.py @@ -0,0 +1,194 @@ +from __future__ import annotations + +import argparse +import os +import platform +import tempfile +from collections.abc import AsyncGenerator, Callable, Collection +from contextlib 
import suppress +from typing import Any, cast + +import pytest +import pytest_asyncio + +from streamflow.core.context import StreamFlowContext +from streamflow.core.deployment import ExecutionLocation +from streamflow.main import build_context +from tests.utils.deployment import get_deployment_config, get_location + + +def all_deployment_types() -> list[str]: + """Get all deployment types based on platform.""" + deployments = ["local", "docker"] + if platform.system() == "Linux": + deployments.extend(["kubernetes", "singularity", "ssh"]) + return deployments + + +def csvtype(choices: Collection[str]) -> Callable[[str], list[str]]: + """Return a function that splits and checks comma-separated values.""" + + def splitarg(arg: str) -> list[str]: + values = arg.split(",") + for value in values: + if value not in choices: + raise argparse.ArgumentTypeError( + "invalid choice: {!r} (choose from {})".format( + value, ", ".join(map(repr, choices)) + ) + ) + return values + + return splitarg + + +def pytest_addoption(parser: pytest.Parser) -> None: + """Add command line options for benchmarks.""" + group = parser.getgroup("benchmark deployment") + group.addoption( + "--deploys", + type=csvtype(all_deployment_types()), + default=None, + help="List of deployments to benchmark. Use comma as delimiter e.g. --deploys " + "local,docker. Defaults to all available deployments if no deployment flag is " + "specified. Cannot be used with --local or --remote.", + ) + group.addoption( + "--local", + action="store_true", + default=False, + help="Run benchmarks only on the local connector. " + "Cannot be used with --deploys or --remote.", + ) + group.addoption( + "--remote", + action="store_true", + default=False, + help="Run benchmarks on all connectors except local. 
" + "Cannot be used with --deploys or --local.", + ) + + +def pytest_configure(config: pytest.Config) -> None: + """Configure pytest-benchmark and add custom markers.""" + config.option.benchmark_sort = "name" + config.option.benchmark_columns = ["mean", "min", "max", "stddev"] + config.option.benchmark_name = "short" + config.option.benchmark_group_by = "param:deployment" + + +def pytest_collection_modifyitems(items: list[pytest.Item]) -> None: + """Remove [connector] suffix from test names for cleaner benchmark display.""" + for item in items: + if "[" in item.name and hasattr(item, "callspec"): + base_name = item.name.split("[")[0] + params = item.name.split("[")[1].split("]")[0] + item.name = ( + f"{base_name}[{params.split('-')[1]}]" if "-" in params else base_name + ) + item._nodeid = item.nodeid.replace(f"[{item.callspec.id}]", "") + + +def pytest_generate_tests(metafunc: pytest.Metafunc) -> None: + """Generate deployment parametrization based on platform and deployment flags.""" + if "deployment" in metafunc.fixturenames: + deploys = metafunc.config.getoption("--deploys") + local_flag = metafunc.config.getoption("--local") + remote_flag = metafunc.config.getoption("--remote") + + flags_set = sum([deploys is not None, local_flag, remote_flag]) + if flags_set > 1: + raise pytest.UsageError( + "Only one of --deploys, --local, or --remote can be specified" + ) + + if local_flag: + deployments = ["local"] + elif remote_flag: + all_deploys = all_deployment_types() + deployments = [d for d in all_deploys if d != "local"] + elif deploys is not None: + deployments = deploys + else: + deployments = all_deployment_types() + + metafunc.parametrize("deployment", deployments, scope="session") + + +@pytest_asyncio.fixture(scope="session") +async def location(context: StreamFlowContext, deployment: str) -> ExecutionLocation: + """ + Get execution location for deployment type. + + Gracefully skip if deployment is not available. 
+ """ + try: + return await get_location(context, deployment) + except Exception as e: + pytest.skip(f"Deployment {deployment} not available: {e}") + + +@pytest.fixture(scope="session") +def connector_type(context: StreamFlowContext, location: ExecutionLocation) -> str: + """ + Get connector type for the deployment. + + Returns the type string from the deployment config (e.g., 'local', 'docker', 'ssh'). + """ + from streamflow.deployment.manager import DefaultDeploymentManager + + manager = cast(DefaultDeploymentManager, context.deployment_manager) + config = manager.config_map.get(location.deployment) + return config.type if config else "unknown" + + +@pytest.fixture(scope="function") +def benchmark_tmpdir(location: ExecutionLocation) -> str: + """ + Create isolated temp directory for each benchmark. + + For local deployments, uses /tmp/streamflow-benchmark. + For remote deployments, uses /tmp directly since the container + might not have /tmp/streamflow-benchmark. + """ + if location.local: + tmpdir = os.path.join(tempfile.gettempdir(), "streamflow-benchmark") + os.makedirs(tmpdir, exist_ok=True) + return tmpdir + else: + return "/tmp" + + +@pytest_asyncio.fixture(scope="session") +async def context( + request: pytest.FixtureRequest, +) -> AsyncGenerator[StreamFlowContext, Any]: + """Create StreamFlowContext for benchmarks (replicates tests/conftest.py).""" + _context = build_context( + { + "database": {"type": "default", "config": {"connection": ":memory:"}}, + "path": os.getcwd(), + }, + ) + # Determine which deployments to deploy based on flags + deploys = request.config.getoption("--deploys") + local_flag = request.config.getoption("--local") + remote_flag = request.config.getoption("--remote") + + if local_flag: + deployments = ["local"] + elif remote_flag: + deployments = [d for d in all_deployment_types() if d != "local"] + elif deploys is not None: + deployments = deploys + else: + deployments = all_deployment_types() + + for deployment_t in deployments: 
+ with suppress(Exception): + config = await get_deployment_config(_context, deployment_t) + await _context.deployment_manager.deploy(config) + + yield _context + await _context.deployment_manager.undeploy_all() + await _context.close() diff --git a/benchmark/test_remotepath.py b/benchmark/test_remotepath.py new file mode 100644 index 000000000..118b194e7 --- /dev/null +++ b/benchmark/test_remotepath.py @@ -0,0 +1,548 @@ +from __future__ import annotations + +import pytest + +from benchmark.utils import ( + FILE_SIZE_BYTES, + benchmark_async, + create_directory_structure, + create_test_file, +) +from streamflow.core.context import StreamFlowContext +from streamflow.core.deployment import ExecutionLocation +from streamflow.core.utils import random_name +from streamflow.data.remotepath import StreamFlowPath + +# ============================================================================ +# FILE QUERY OPERATIONS +# ============================================================================ + + +def test_exists( + benchmark, + location: ExecutionLocation, + connector_type: str, + context: StreamFlowContext, + benchmark_tmpdir: str, +) -> None: + """Benchmark path.exists() operation.""" + path = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + + async def setup(): + await create_test_file(path) + + async def target(): + return await path.exists() + + async def teardown(): + await path.rmtree() + + assert benchmark_async(benchmark, target, setup, teardown) is True + + +def test_is_dir( + benchmark, + location: ExecutionLocation, + connector_type: str, + context: StreamFlowContext, + benchmark_tmpdir: str, +) -> None: + """Benchmark path.is_dir() operation.""" + path = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + + async def setup(): + await path.mkdir(parents=True, exist_ok=True) + + async def target(): + return await path.is_dir() + + async def teardown(): + await 
path.rmtree() + + assert benchmark_async(benchmark, target, setup, teardown) is True + + +def test_is_file( + benchmark, + location: ExecutionLocation, + connector_type: str, + context: StreamFlowContext, + benchmark_tmpdir: str, +) -> None: + """Benchmark path.is_file() operation.""" + path = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + + async def setup(): + await create_test_file(path) + + async def target(): + return await path.is_file() + + async def teardown(): + await path.rmtree() + + assert benchmark_async(benchmark, target, setup, teardown) is True + + +def test_is_symlink( + benchmark, + location: ExecutionLocation, + connector_type: str, + context: StreamFlowContext, + benchmark_tmpdir: str, +) -> None: + """Benchmark path.is_symlink() operation.""" + src = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + link = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + + async def setup(): + await create_test_file(src) + await link.symlink_to(src) + + async def target(): + return await link.is_symlink() + + async def teardown(): + await link.rmtree() + await src.rmtree() + + assert benchmark_async(benchmark, target, setup, teardown) is True + + +def test_size( + benchmark, + location: ExecutionLocation, + connector_type: str, + context: StreamFlowContext, + benchmark_tmpdir: str, +) -> None: + """Benchmark path.size() on 100KB file.""" + path = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + + async def setup(): + await create_test_file(path) + + async def target(): + return await path.size() + + async def teardown(): + await path.rmtree() + + assert benchmark_async(benchmark, target, setup, teardown) == FILE_SIZE_BYTES + + +# ============================================================================ +# I/O OPERATIONS +# 
============================================================================ + + +def test_checksum_small( + benchmark, + location: ExecutionLocation, + connector_type: str, + context: StreamFlowContext, + benchmark_tmpdir: str, +) -> None: + """Benchmark path.checksum() with 1KB file.""" + path = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + + async def setup(): + await create_test_file(path, 2**10) + + async def target(): + return await path.checksum() + + async def teardown(): + await path.rmtree() + + assert benchmark_async(benchmark, target, setup, teardown) is not None + + +def test_checksum_large( + benchmark, + location: ExecutionLocation, + connector_type: str, + context: StreamFlowContext, + benchmark_tmpdir: str, +) -> None: + """Benchmark path.checksum() with 100KB file.""" + path = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + + async def setup(): + await create_test_file(path) + + async def target(): + return await path.checksum() + + async def teardown(): + await path.rmtree() + + assert benchmark_async(benchmark, target, setup, teardown) is not None + + +def test_read_text( + benchmark, + location: ExecutionLocation, + connector_type: str, + context: StreamFlowContext, + benchmark_tmpdir: str, +) -> None: + """Benchmark path.read_text() with 100KB file.""" + path = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + + async def setup(): + await create_test_file(path) + + async def target(): + return await path.read_text() + + async def teardown(): + await path.rmtree() + + assert len(benchmark_async(benchmark, target, setup, teardown)) == FILE_SIZE_BYTES + + +def test_write_text( + benchmark, + location: ExecutionLocation, + connector_type: str, + context: StreamFlowContext, + benchmark_tmpdir: str, +) -> None: + """Benchmark path.write_text() with 100KB file.""" + from benchmark.utils import 
generate_file_content + + # Create root directory for all iterations + root_dir = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + + # Generate content once (not benchmarked) + content = generate_file_content(FILE_SIZE_BYTES) + + async def setup(): + await root_dir.mkdir() + + async def target(): + # Create unique file for each iteration + path = root_dir / random_name() + await path.write_text(content) + + async def teardown(): + # Cleanup all files at once + await root_dir.rmtree() + + benchmark_async(benchmark, target, setup, teardown) + + +# ============================================================================ +# FILESYSTEM OPERATIONS +# ============================================================================ + + +def test_hardlink_to( + benchmark, + location: ExecutionLocation, + connector_type: str, + context: StreamFlowContext, + benchmark_tmpdir: str, +) -> None: + """Benchmark path.hardlink_to() operation.""" + # Create root directory for all iterations + root_dir = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + + src = root_dir / "source_file" + + async def setup(): + await root_dir.mkdir() + await create_test_file(src) + + async def target(): + # Create unique link for each iteration + link = root_dir / random_name() + await link.hardlink_to(src) + + async def teardown(): + # Cleanup all files at once + await root_dir.rmtree() + + benchmark_async(benchmark, target, setup, teardown) + + +def test_mkdir( + benchmark, + location: ExecutionLocation, + connector_type: str, + context: StreamFlowContext, + benchmark_tmpdir: str, +) -> None: + """Benchmark path.mkdir() operation.""" + # Create root directory for all iterations + root_dir = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + + async def setup(): + await root_dir.mkdir() + + async def target(): + # Create unique directory for each iteration 
+ path = root_dir / random_name() + await path.mkdir(mode=0o777) + + async def teardown(): + # Cleanup all directories at once + await root_dir.rmtree() + + benchmark_async(benchmark, target, setup, teardown) + + +def test_mkdir_parents( + benchmark, + location: ExecutionLocation, + connector_type: str, + context: StreamFlowContext, + benchmark_tmpdir: str, +) -> None: + """Benchmark path.mkdir(parents=True) operation.""" + # Create root directory for all iterations + root_dir = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + + async def setup(): + await root_dir.mkdir() + + async def target(): + # Create unique nested path for each iteration + path = root_dir / random_name() / "nested" / "deep" / "directory" + await path.mkdir(mode=0o777, parents=True) + + async def teardown(): + # Cleanup all nested directories at once + await root_dir.rmtree() + + benchmark_async(benchmark, target, setup, teardown) + + +def test_resolve( + benchmark, + location: ExecutionLocation, + connector_type: str, + context: StreamFlowContext, + benchmark_tmpdir: str, +) -> None: + """Benchmark path.resolve() on symbolic link.""" + src = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + link = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + + async def setup(): + await create_test_file(src) + await link.symlink_to(src) + + async def target(): + return await link.resolve() + + async def teardown(): + await link.rmtree() + await src.rmtree() + + assert benchmark_async(benchmark, target, setup, teardown) is not None + + +def test_rmtree( + benchmark, + location: ExecutionLocation, + connector_type: str, + context: StreamFlowContext, + benchmark_tmpdir: str, +) -> None: + """Benchmark path.rmtree() on 10 files in 3 dirs.""" + path = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + + async def 
setup(): + await create_directory_structure(path) + + async def target(): + await path.rmtree() + + async def teardown(): + # Verify deletion succeeded + assert not await path.exists() + + benchmark_async(benchmark, target, setup, teardown) + + +def test_symlink_to( + benchmark, + location: ExecutionLocation, + connector_type: str, + context: StreamFlowContext, + benchmark_tmpdir: str, +) -> None: + """Benchmark path.symlink_to() operation.""" + # Create root directory for all iterations + root_dir = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + + src = root_dir / "source_file" + + async def setup(): + await root_dir.mkdir() + await create_test_file(src) + + async def target(): + # Create unique link for each iteration inside root_dir + link = root_dir / random_name() + await link.symlink_to(src) + + async def teardown(): + # Cleanup all links and source + await root_dir.rmtree() + + benchmark_async(benchmark, target, setup, teardown) + + +# ============================================================================ +# DIRECTORY TRAVERSAL +# ============================================================================ + + +@pytest.mark.parametrize("pattern", ["**/*.txt", "*", "**/*"]) +def test_glob( + benchmark, + pattern: str, + location: ExecutionLocation, + connector_type: str, + context: StreamFlowContext, + benchmark_tmpdir: str, +) -> None: + """Benchmark path.glob(pattern) iteration with different patterns.""" + path = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + + async def setup(): + await create_directory_structure(path) + + async def target(): + results = [] + async for p in path.glob(pattern): + results.append(p) + return results + + async def teardown(): + await path.rmtree() + + assert len(benchmark_async(benchmark, target, setup, teardown)) > 0 + + +def test_walk( + benchmark, + location: ExecutionLocation, + connector_type: str, + context: 
StreamFlowContext, + benchmark_tmpdir: str, +) -> None: + """Benchmark path.walk() iteration on 10 files in 3 dirs.""" + path = StreamFlowPath( + benchmark_tmpdir, + random_name(), + context=context, + location=location, + ) + + async def setup(): + await create_directory_structure(path) + + async def target(): + results = [] + async for dirpath, dirnames, filenames in path.walk(): + results.append((dirpath, dirnames, filenames)) + return results + + async def teardown(): + await path.rmtree() + + assert len(benchmark_async(benchmark, target, setup, teardown)) > 0 diff --git a/benchmark/utils.py b/benchmark/utils.py new file mode 100644 index 000000000..b9773c69b --- /dev/null +++ b/benchmark/utils.py @@ -0,0 +1,169 @@ +from __future__ import annotations + +import asyncio +import random +import string +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from collections.abc import Callable + + from streamflow.data.remotepath import StreamFlowPath + +# Constants for small/fast test scenario +FILE_SIZE_BYTES = 102400 # 100 KB +NUM_FILES = 10 +NUM_SUBDIRS = 3 +DIR_DEPTH = 2 + + +def benchmark_async( + benchmark: Any, + target_func: Callable, + setup_func: Callable | None = None, + teardown_func: Callable | None = None, +) -> Any: + """ + Wrapper to run async functions in pytest-benchmark with proper setup/teardown. + + pytest-benchmark doesn't natively support async functions, + so we need to run them in the event loop. This wrapper ensures that: + - setup_func runs once before benchmarking + - target_func is benchmarked (repeated iterations) + - teardown_func runs once after benchmarking + + Note: setup and teardown happen outside the benchmark timing. 
+ + :param benchmark: pytest-benchmark fixture + :param target_func: Async function to benchmark (called multiple times) + :param setup_func: Optional async setup function (called once before timing) + :param teardown_func: Optional async teardown function (called once after timing) + :return: Result of the last target_func call + """ + loop = asyncio.get_event_loop() + last_result = None + + # Run setup once before benchmarking + if setup_func: + loop.run_until_complete(setup_func()) + + # Define the sync wrapper for the benchmark target + def sync_wrapper() -> Any: + nonlocal last_result + last_result = loop.run_until_complete(target_func()) + return last_result + + # Run the benchmark with configured iterations and rounds for statistical significance + benchmark.pedantic(sync_wrapper, iterations=100, rounds=10, warmup_rounds=1) + + # Run teardown once after benchmarking + if teardown_func: + loop.run_until_complete(teardown_func()) + + return last_result + + +def calculate_overhead_percent(baseline_time: float, measured_time: float) -> float: + """ + Calculate overhead percentage compared to baseline. + + :param baseline_time: Baseline time (usually local deployment) + :param measured_time: Measured time for the operation + :return: Overhead percentage + """ + if baseline_time <= 0: + return 0 + return ((measured_time - baseline_time) / baseline_time) * 100 + + +def calculate_throughput_items_per_sec(num_items: int, time_seconds: float) -> float: + """ + Calculate items/s throughput. + + :param num_items: Number of items processed + :param time_seconds: Time taken in seconds + :return: Throughput in items/s + """ + return num_items / time_seconds if time_seconds > 0 else 0 + + +def calculate_throughput_mb_per_sec(size_bytes: int, time_seconds: float) -> float: + """ + Calculate MB/s throughput. 
+ + :param size_bytes: Size of data processed + :param time_seconds: Time taken in seconds + :return: Throughput in MB/s + """ + mb = size_bytes / (1024 * 1024) + return mb / time_seconds if time_seconds > 0 else 0 + + +async def create_directory_structure( + base_path: StreamFlowPath, + num_files: int = NUM_FILES, + num_subdirs: int = NUM_SUBDIRS, +) -> dict[str, Any]: + """ + Create directory with files distributed across subdirectories. + + Structure: 10 files in 3 subdirectories (3-4 files per subdir) + test_dir/ + subdir_0/ + file_0.txt + file_1.txt + file_2.txt + subdir_1/ + file_3.txt + ... + + :param base_path: Base directory path + :param num_files: Total number of files to create + :param num_subdirs: Number of subdirectories + :return: Metadata dict with num_files, num_dirs, total_size_bytes, file_paths + """ + await base_path.mkdir(parents=True, exist_ok=True) + + file_paths = [] + files_per_dir = num_files // num_subdirs + + for i in range(num_subdirs): + subdir = base_path / f"subdir_{i}" + await subdir.mkdir(parents=True, exist_ok=True) + + for j in range(files_per_dir): + file_path = subdir / f"file_{i * files_per_dir + j}.txt" + await create_test_file(file_path) + file_paths.append(file_path) + + return { + "num_files": num_files, + "num_dirs": num_subdirs, + "total_size_bytes": num_files * FILE_SIZE_BYTES, + "file_paths": file_paths, + } + + +async def create_test_file( + path: StreamFlowPath, size_bytes: int = FILE_SIZE_BYTES +) -> None: + """ + Create a test file with specified size. + + :param path: Path where to create the file + :param size_bytes: Size of file content in bytes + """ + content = generate_file_content(size_bytes) + await path.write_text(content) + + +def generate_file_content(size_bytes: int = FILE_SIZE_BYTES) -> str: + """ + Generate random ASCII text content of specified size. 
+ + :param size_bytes: Size of content to generate in bytes + :return: Random text string + """ + return "".join( + random.choices(string.ascii_letters + string.digits + "\n", k=size_bytes) + ) diff --git a/pyproject.toml b/pyproject.toml index c30c0d025..1db996cc4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -95,6 +95,7 @@ test = [ "cwltool==3.1.20260315121657", "pytest==9.0.2", "pytest-asyncio==1.3.0", + "pytest-benchmark==5.2.1", "pytest-cov==7.1.0", "pytest-xdist==3.8.0" ] diff --git a/tox.ini b/tox.ini index b1efaf991..f5158045a 100644 --- a/tox.ini +++ b/tox.ini @@ -2,6 +2,7 @@ envlist = bandit lint + py3.{10,11,12,13,14}-benchmark py3.{10,11,12,13,14}-unit skip_missing_interpreters = True @@ -9,22 +10,28 @@ skip_missing_interpreters = True asyncio_default_fixture_loop_scope = session asyncio_default_test_loop_scope = session asyncio_mode = strict -testpaths = tests +testpaths = + benchmark + tests [testenv] allowlist_externals = make commands = + py3.{10,11,12,13,14}-benchmark: make benchmark py3.{10,11,12,13,14}-unit: make coverage-report coverage.xml PYTEST_EXTRA={posargs} dependency_groups = + py3.{10,11,12,13,14}-benchmark: test py3.{10,11,12,13,14}-unit: test extras = report description = + py3.{10,11,12,13,14}-benchmark: Run the StreamFlow benchmarks py3.{10,11,12,13,14}-unit: Run the unit tests passenv = CI GITHUB_* runner = uv-venv-lock-runner setenv = + py3.{10,11,12,13,14}-benchmark: LC_ALL = C.UTF-8 py3.{10,11,12,13,14}-unit: LC_ALL = C.UTF-8 uv_sync_flags = --no-dev diff --git a/uv.lock b/uv.lock index a44299453..8b8b9c371 100644 --- a/uv.lock +++ b/uv.lock @@ -2474,6 +2474,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/8c/c7/7bb2e321574b10df20cbde462a94e2b71d05f9bbda251ef27d104668306a/psutil-7.2.2-cp37-abi3-win_arm64.whl", hash = "sha256:8c233660f575a5a89e6d4cb65d9f938126312bca76d8fe087b947b3a1aaac9ee", size = 134617, upload-time = "2026-01-28T18:15:36.514Z" }, ] +[[package]] +name = "py-cpuinfo" +version = "9.0.0" 
+source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/37/a8/d832f7293ebb21690860d2e01d8115e5ff6f2ae8bbdc953f0eb0fa4bd2c7/py-cpuinfo-9.0.0.tar.gz", hash = "sha256:3cdbbf3fac90dc6f118bfd64384f309edeadd902d7c8fb17f02ffa1fc3f49690", size = 104716, upload-time = "2022-10-25T20:38:06.303Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e0/a9/023730ba63db1e494a271cb018dcd361bd2c917ba7004c3e49d5daf795a2/py_cpuinfo-9.0.0-py3-none-any.whl", hash = "sha256:859625bc251f64e21f077d099d4162689c762b5d6a4c3c97553d56241c9674d5", size = 22335, upload-time = "2022-10-25T20:38:27.636Z" }, +] + [[package]] name = "pycodestyle" version = "2.14.0" @@ -2572,6 +2581,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/e5/35/f8b19922b6a25bc0880171a2f1a003eaeb93657475193ab516fd87cac9da/pytest_asyncio-1.3.0-py3-none-any.whl", hash = "sha256:611e26147c7f77640e6d0a92a38ed17c3e9848063698d5c93d5aa7aa11cebff5", size = 15075, upload-time = "2025-11-10T16:07:45.537Z" }, ] +[[package]] +name = "pytest-benchmark" +version = "5.2.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "py-cpuinfo" }, + { name = "pytest" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/b8/fd/237def5e4b775a6a1a98c5f6a5b01f6c75d1f3af740b3665b7f538aea14b/pytest_benchmark-5.2.1.tar.gz", hash = "sha256:56dc1455bda7ccb540aa671c496dafc8187d2769f278e5f137689476805b6f9d", size = 339278, upload-time = "2025-11-04T14:51:33.543Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/9b/75/02cb44852083794606c6b1b3f21df517c8bcf4ad1e88c6e0c5b8adb98da3/pytest_benchmark-5.2.1-py3-none-any.whl", hash = "sha256:a6e18fe0df2155e9d993db6ba03bdf85324794035ad986553787024ca59e8db9", size = 44727, upload-time = "2025-11-04T14:51:31.532Z" }, +] + [[package]] name = "pytest-cov" version = "7.1.0" @@ -3348,6 +3370,7 @@ dev = [ { name = "pandas-stubs" }, { name = "pytest" }, { name = "pytest-asyncio" }, + { name = 
"pytest-benchmark" }, { name = "pytest-cov" }, { name = "pytest-xdist" }, { name = "pyupgrade" }, @@ -3379,6 +3402,7 @@ test = [ { name = "cwltool" }, { name = "pytest" }, { name = "pytest-asyncio" }, + { name = "pytest-benchmark" }, { name = "pytest-cov" }, { name = "pytest-xdist" }, ] @@ -3431,6 +3455,7 @@ dev = [ { name = "pandas-stubs", specifier = "==2.3.3.260113" }, { name = "pytest", specifier = "==9.0.2" }, { name = "pytest-asyncio", specifier = "==1.3.0" }, + { name = "pytest-benchmark", specifier = "==5.2.1" }, { name = "pytest-cov", specifier = "==7.1.0" }, { name = "pytest-xdist", specifier = "==3.8.0" }, { name = "pyupgrade", specifier = "==3.21.2" }, @@ -3462,6 +3487,7 @@ test = [ { name = "cwltool", specifier = "==3.1.20260315121657" }, { name = "pytest", specifier = "==9.0.2" }, { name = "pytest-asyncio", specifier = "==1.3.0" }, + { name = "pytest-benchmark", specifier = "==5.2.1" }, { name = "pytest-cov", specifier = "==7.1.0" }, { name = "pytest-xdist", specifier = "==3.8.0" }, ]