Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions .github/workflows/runPytest.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
name: Run Pytest

on:
push:
branches:
- "**"
pull_request:
branches:
- "**"

jobs:
test:
runs-on: ubuntu-latest
strategy:
matrix:
os: ["ubuntu-latest", "macos-latest", "windows-latest"]
# python-version: ["3.11", "3.12", "3.13", "3.14"]
python-version: ["3.14"]

steps:
- uses: actions/checkout@v4

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}

- name: Install dependencies
run: |
pip install -e .[test]

- name: Run tests
run: |
pytest tests/ -v

# pytest tests/ -v --cov-report xml --cov=NOCAT
# - name: Upload coverage to Codecov
# uses: codecov/codecov-action@v4
# with:
# fail_ci_if_error: false
# flags: pytest
# files: ./coverage.xml
# token: ${{ secrets.CODECOV_TOKEN }} # required
6 changes: 6 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,9 @@ exclude = [
verbose = 1
quiet = false
color = true

[project.optional-dependencies]
test = [
"pytest",
"pytest-cov",
]
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ geopandas
pvlib
tqdm
glidertools
ipykernel
ipykernel
pytest
139 changes: 126 additions & 13 deletions src/toolbox/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""Pipeline Class"""
"""Pipeline class definition to handle configuration and step execution."""

import yaml
import pandas as pd
Expand All @@ -34,9 +34,24 @@
)

_PIPELINE_LOGGER_NAME = "toolbox.pipeline"
"""Global logger name for the pipeline. Used to create child loggers for steps."""

def _setup_logging(log_file=None, level=logging.INFO):
"""Set up logging for the entire pipeline."""
"""
Set up logging for the entire pipeline.

Parameters
----------
log_file : str, optional
Path to the log file. If provided, logs will be written to this file.
level : int, optional
Logging level (e.g., logging.INFO, logging.DEBUG).

Returns
-------
logging.Logger
Configured logger instance.
"""
logger = logging.getLogger(_PIPELINE_LOGGER_NAME)
logger.setLevel(level)
logger.propagate = False
Expand Down Expand Up @@ -69,14 +84,29 @@ def _setup_logging(log_file=None, level=logging.INFO):

class Pipeline(ConfigMirrorMixin):
"""
Pipeline that manages a sequence of processing steps.

Config-aware pipeline that can:
- Load config YAML into private self._parameters
- Keep global_parameters mirrored to _parameters['pipeline']
- Build, run, and export steps as before

Parameters
----------
ConfigMirrorMixin : Class
Class to handle configuration

"""

def __init__(self, config_path=None):
"""Initialize pipeline with optional config file"""
"""
Initialize pipeline with optional config file.

Parameters
----------
config_path : str, optional
Path to the YAML configuration file.
"""
self.steps = [] # hierarchical step configs
self.graph = Digraph("Pipeline", format="png", graph_attr={"rankdir": "TB"})
self.global_parameters = {} # mirrors _parameters["pipeline"]
Expand All @@ -95,7 +125,16 @@ def __init__(self, config_path=None):
self.logger.info("Pipeline initialised")

def build_steps(self, steps_config, parent_name=None):
"""Recursively build steps from configuration"""
"""
Recursively build steps from configuration.

Parameters
----------
steps_config : list of dict
List of step configurations.
parent_name : str, optional
Name of the parent step, if any.
"""
for step in steps_config:
REQUIRED_STEPS = STEP_DEPENDENCIES.get(step["name"], [])
for required_step in REQUIRED_STEPS:
Expand All @@ -121,7 +160,27 @@ def add_step(
parent_name=None,
run_immediately=False,
):
"""Dynamically adds a step and optionally runs it immediately"""
"""
Dynamically adds a step and optionally runs it immediately.

Parameters
----------
step_name : str
Name of the step to add.
parameters : dict, optional
Parameters for the step.
diagnostics : bool, optional
Whether to enable diagnostics for this step.
parent_name : str, optional
Name of the parent step, if any.
run_immediately : bool, optional
Whether to run the step immediately after adding it.

Raises
------
ValueError
If the step name is not recognized or a specified parent step is not found.
"""
if step_name not in STEP_CLASSES:
raise ValueError(
f"Step '{step_name}' is not recognized or missing @register_step."
Expand Down Expand Up @@ -150,7 +209,16 @@ def add_step(
self._context = self.execute_step(step_config, self._context)

def _find_step(self, steps_list, step_name):
"""Recursively find a step by name"""
"""
Recursively find a step by name.

Parameters
----------
steps_list : list of dict
List of step configurations.
step_name : str
Name of the step to find.
"""
for step in steps_list:
if step["name"] == step_name:
return step
Expand All @@ -160,13 +228,24 @@ def _find_step(self, steps_list, step_name):
return None

def execute_step(self, step_config, _context):
"""Executes a single step"""
"""
Executes a single step.

Parameters
----------
step_config : dict
Configuration for the step to execute.
_context : dict
Current context to pass to the step.
"""
step = create_step(step_config, _context)
self.logger.info(f"Executing: {step.name}")
return step.run()

def run_last_step(self):
"""Runs only the most recently added step"""
"""
Runs only the most recently added step based on the index in self.steps.
"""
if not self.steps:
self.logger.info("No steps to run.")
return
Expand All @@ -175,18 +254,26 @@ def run_last_step(self):
self._context = self.execute_step(last_step, self._context)

def run(self):
"""Runs the entire pipeline"""
"""
Runs the entire pipeline.

If visualisation is specified in the configuration parameters, a visualisation
of the pipeline execution will be generated.
"""
for step in self.steps:
self._context = self.execute_step(step, self._context)

if self.global_parameters.get("visualisation", False):
self.visualise_pipeline()

def visualise_pipeline(self):
"""Generates a visualisation of the pipeline execution"""
"""
Generates a visualisation of the pipeline execution.
"""
self.graph.clear()

def add_to_graph(step_config, parent_name=None, step_order=None):
"""Add a step to the graph, intended for recursive use."""
step_name = step_config["name"]
diagnostics = step_config.get("diagnostics", False)
color = "red" if diagnostics else "black"
Expand All @@ -213,7 +300,14 @@ def add_to_graph(step_config, parent_name=None, step_order=None):
self.graph.render("pipeline_visualisation", view=True)

def generate_config(self):
"""Generate a configuration dictionary from the current pipeline setup"""
"""
Generate a configuration dictionary from the current pipeline setup.

returns
-------
dict
Configuration dictionary of the current pipeline.
"""
cfg = {
"pipeline": self.global_parameters,
"steps": self.steps,
Expand All @@ -223,15 +317,34 @@ def generate_config(self):
return cfg

def export_config(self, output_path="generated_pipeline.yaml"):
"""Write current config to file (respects private _parameters)"""
"""
Write current config to file (respects private _parameters)

parameters
----------
output_path : str
Path to save the exported configuration YAML file.

returns
-------
dict
Configuration dictionary of the current pipeline.
"""
cfg = self.generate_config()
with open(output_path, "w") as f:
yaml.safe_dump(cfg, f, sort_keys=False)
self.logger.info(f"Pipeline config exported → {output_path}")
return cfg

def save_config(self, path="pipeline_config.yaml"):
"""Save the canonical private config (same as manager.save_config)."""
"""
Save the canonical private config (same as manager.save_config).

parameters
----------
path : str
Path to save the exported configuration YAML file.
"""
# ensure _parameters is up to date
self._parameters.update(self.generate_config())
super().save_config(path)
8 changes: 7 additions & 1 deletion src/toolbox/steps/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,20 @@

# Global registries
STEP_CLASSES = {}
"""Dictionary mapping step names to their implementing classes."""
QC_CLASSES = {}
"""Dictionary mapping QC test names to their implementing classes."""
STEP_DEPENDENCIES = {
"QC: Salinity": ["Load OG1"],
}
"""Dictionary of explicit dependencies between steps by name."""


def discover_steps():
"""Dynamically discover and import step modules from the custom directory."""
"""
Dynamically discover and import step modules from the custom directory.
This populates the global STEP_CLASSES and QC_CLASSES registries for use elsewhere.
"""
base_dir = pathlib.Path(__file__).parent.resolve()
custom_dir = base_dir / "custom"
print(f"[Discovery] Scanning for step modules in {custom_dir}")
Expand Down
3 changes: 2 additions & 1 deletion src/toolbox/steps/base_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This module defines the base class for pipeline steps and configurations."""

from toolbox.utils.config_mirror import ConfigMirrorMixin
import warnings
Expand All @@ -21,8 +22,8 @@

warnings.formatwarning = lambda msg, *args, **kwargs: f"{msg}\n"

# Registry of explicitly registered step classes
REGISTERED_STEPS = {}
"""Registry of explicitly registered step classes."""


def register_step(cls):
Expand Down
Loading
Loading