From 719e207329db9199abd199fcb8462629adddd6fa Mon Sep 17 00:00:00 2001 From: Daniel Date: Mon, 13 Oct 2025 17:36:35 +0000 Subject: [PATCH 1/2] chore: refactor code to use concerns and contracts --- src/_lib/__init__.py | 34 ----- src/_lib/concerns/base_pipeline.py | 18 +++ src/_lib/concerns/processable_pipeline.py | 95 ++++++++++++ src/_lib/concerns/stageable_pipeline.py | 72 +++++++++ src/_lib/contracts/base_pipeline.py | 11 ++ src/_lib/contracts/pipeline.py | 15 ++ src/_lib/contracts/pipeline_factory.py | 23 +++ src/_lib/contracts/processable_pipeline.py | 36 +++++ .../processor.py} | 8 +- .../stage_interface.py => contracts/stage.py} | 4 +- src/_lib/contracts/stageable_pipeline.py | 30 ++++ src/_lib/pipeline.py | 19 +++ src/_lib/pipeline/__init__.py | 36 ----- src/_lib/pipeline/pipeline.py | 140 ------------------ .../pipeline/pipeline_factory_interface.py | 23 --- src/_lib/pipeline/pipeline_interface.py | 46 ------ src/_lib/pipeline/processors/__init__.py | 0 .../pipeline/processors/chained_processor.py | 41 ----- .../processors/chained_processor_interface.py | 22 --- src/_lib/pipeline/traits/__init__.py | 0 src/_lib/{pipeline => }/pipeline_factory.py | 28 ++-- src/_lib/{pipeline => }/processor.py | 10 +- src/_lib/processors/__init__.py | 13 -- .../processors/chained_processor/__init__.py | 7 - .../chained_processor/chained_pipeline.py | 8 +- .../chained_processor/chained_processor.py | 52 ++++--- .../interruptible_processor/__init__.py | 7 - .../interruptible_pipeline.py | 3 +- .../interruptible_processor.py | 7 +- src/_lib/{pipeline => }/stage.py | 4 +- .../act_as_factory/__init__.py | 0 .../act_as_factory/act_as_factory.py | 0 .../act_as_factory_interface.py | 0 .../traits => support}/clonable/__init__.py | 0 .../traits => support}/clonable/clonable.py | 0 .../clonable/clonable_interface.py | 0 src/_lib/{pipeline => types}/callable_type.py | 0 src/_lib/{pipeline => types}/types.py | 0 src/thecodecrate_pipeline/__init__.py | 16 +- .../processors/__init__.py | 25 +++- src/thecodecrate_pipeline/types/__init__.py | 15 +- tests/test_pipeline.py | 26 ++-- 42 files changed, 437 insertions(+), 457 deletions(-) create mode 100644 src/_lib/concerns/base_pipeline.py create mode 100644 src/_lib/concerns/processable_pipeline.py create mode 100644 src/_lib/concerns/stageable_pipeline.py create mode 100644 src/_lib/contracts/base_pipeline.py create mode 100644 src/_lib/contracts/pipeline.py create mode 100644 src/_lib/contracts/pipeline_factory.py create mode 100644 src/_lib/contracts/processable_pipeline.py rename src/_lib/{pipeline/processor_interface.py => contracts/processor.py} (72%) rename src/_lib/{pipeline/stage_interface.py => contracts/stage.py} (84%) create mode 100644 src/_lib/contracts/stageable_pipeline.py create mode 100644 src/_lib/pipeline.py delete mode 100644 src/_lib/pipeline/__init__.py delete mode 100644 src/_lib/pipeline/pipeline.py delete mode 100644 src/_lib/pipeline/pipeline_factory_interface.py delete mode 100644 src/_lib/pipeline/pipeline_interface.py delete mode 100644 src/_lib/pipeline/processors/__init__.py delete mode 100644 src/_lib/pipeline/processors/chained_processor.py delete mode 100644 src/_lib/pipeline/processors/chained_processor_interface.py delete mode 100644 src/_lib/pipeline/traits/__init__.py rename src/_lib/{pipeline => }/pipeline_factory.py (65%) rename src/_lib/{pipeline => }/processor.py (77%) rename src/_lib/{pipeline => }/stage.py (78%) rename src/_lib/{pipeline/traits => support}/act_as_factory/__init__.py (100%) rename src/_lib/{pipeline/traits 
=> support}/act_as_factory/act_as_factory.py (100%) rename src/_lib/{pipeline/traits => support}/act_as_factory/act_as_factory_interface.py (100%) rename src/_lib/{pipeline/traits => support}/clonable/__init__.py (100%) rename src/_lib/{pipeline/traits => support}/clonable/clonable.py (100%) rename src/_lib/{pipeline/traits => support}/clonable/clonable_interface.py (100%) rename src/_lib/{pipeline => types}/callable_type.py (100%) rename src/_lib/{pipeline => types}/types.py (100%) diff --git a/src/_lib/__init__.py b/src/_lib/__init__.py index af488b1..e69de29 100644 --- a/src/_lib/__init__.py +++ b/src/_lib/__init__.py @@ -1,34 +0,0 @@ -# Version of the package -# DO NOT MODIFY MANUALLY -# This will be updated by `bumpver` command. -# - Make sure to commit all changes first before running `bumpver`. -# - Run `bumpver update --[minor|major|patch]` -__version__ = "1.26.0" - -# Re-exporting symbols -from .pipeline import CallableCollection as CallableCollection -from .pipeline import CallableType as CallableType -from .pipeline import Pipeline as Pipeline -from .pipeline import PipelineFactory as PipelineFactory -from .pipeline import PipelineFactoryInterface as PipelineFactoryInterface -from .pipeline import PipelineInterface as PipelineInterface -from .pipeline import Processor as Processor -from .pipeline import ProcessorInterface as ProcessorInterface -from .pipeline import Stage as Stage -from .pipeline import StageDefinition as StageDefinition -from .pipeline import StageDefinitionCollection as StageDefinitionCollection -from .pipeline import StageInterface as StageInterface -from .pipeline import T_in as T_in -from .pipeline import T_out as T_out -from .pipeline import __all__ as _pipeline_all -from .processors import ChainedPipeline as ChainedPipeline -from .processors import ChainedProcessor as ChainedProcessor -from .processors import InterruptiblePipeline as InterruptiblePipeline -from .processors import InterruptibleProcessor as InterruptibleProcessor -from .processors import __all__ as _processor_all - -# pyright: reportUnsupportedDunderAll=false -__all__ = ( - *_pipeline_all, - *_processor_all, -) diff --git a/src/_lib/concerns/base_pipeline.py b/src/_lib/concerns/base_pipeline.py new file mode 100644 index 0000000..0075dca --- /dev/null +++ b/src/_lib/concerns/base_pipeline.py @@ -0,0 +1,18 @@ +from abc import ABC +from typing import Any, Generic + +from ..support.clonable import Clonable +from ..types.types import T_in, T_out + + +class BasePipeline( + Clonable, + Generic[T_in, T_out], + ABC, +): + def __init__( + self, + *args: Any, + **kwds: Any, + ) -> None: + pass diff --git a/src/_lib/concerns/processable_pipeline.py b/src/_lib/concerns/processable_pipeline.py new file mode 100644 index 0000000..2c1bff2 --- /dev/null +++ b/src/_lib/concerns/processable_pipeline.py @@ -0,0 +1,95 @@ +from abc import ABC +from typing import Any, Optional, Self + +from ..contracts.processable_pipeline import ProcessablePipeline as ImplementsInterface +from ..contracts.processor import Processor as ProcessorContract +from ..processors.chained_processor.chained_processor import ChainedProcessor +from ..types.types import T_in, T_out + + +class ProcessablePipeline( + ImplementsInterface[T_in, T_out], + ABC, +): + processor: Optional[type[ProcessorContract] | ProcessorContract] + processor_instance: Optional[ProcessorContract] + + def __init__( + self, + processor: Optional[type[ProcessorContract] | ProcessorContract] = None, + processor_instance: Optional[ProcessorContract] = None, + *args: 
Any, + **kwds: Any, + ) -> None: + super().__init__(*args, **kwds) # type: ignore + + if not hasattr(self, "processor"): + self.processor = self._get_default_processor() + + if not hasattr(self, "processor_instance"): + self.processor_instance = None + + if processor: + self.processor = processor + + if processor_instance: + self.processor_instance = processor_instance + + if self._should_instantiate_processor(): + self._instantiate_processor() + + async def process(self, payload: T_in, *args: Any, **kwds: Any) -> T_out: + """ + Process the given payload through the pipeline. + """ + if self.processor_instance is None: + raise ValueError("Processor not set") + + return await self.processor_instance.process( + payload=payload, stages=self.get_stages_instances(), *args, **kwds + ) + + async def __call__( + self, + payload: T_in, + /, # Make 'payload' a positional-only parameter + *args: Any, + **kwds: Any, + ) -> T_out: + """ + Processes payload through the pipeline. + """ + return await self.process(payload, *args, **kwds) + + def with_processor( + self, processor: type[ProcessorContract] | ProcessorContract + ) -> Self: + """ + Attachs a processor (class or instance) to the pipeline. + """ + cloned = self.clone({"processor": processor, "processor_instance": None}) + + return cloned._instantiate_processor() + + def get_processor_instance(self) -> Optional[ProcessorContract]: + return self.processor_instance + + def _get_default_processor(self) -> type[ChainedProcessor[T_in, T_out]]: + return ChainedProcessor + + def _should_instantiate_processor(self) -> bool: + return self.processor_instance is None + + def _instantiate_processor(self) -> Self: + if self.processor is None: + raise ValueError("Processor class not set") + + if isinstance(self.processor, type): + self.processor_instance = self.processor() + else: + self.processor_instance = self.processor + + if isinstance(self.processor_instance, type): + raise ValueError("Processor instance could not be created") + + return self diff --git a/src/_lib/concerns/stageable_pipeline.py b/src/_lib/concerns/stageable_pipeline.py new file mode 100644 index 0000000..0018982 --- /dev/null +++ b/src/_lib/concerns/stageable_pipeline.py @@ -0,0 +1,72 @@ +from abc import ABC +from typing import Any, Optional, Self + +from ..contracts.stageable_pipeline import StageablePipeline as ImplementsInterface +from ..types.callable_type import ( + CallableCollection, + CallableType, + StageDefinitionCollection, +) +from ..types.types import T_in, T_out + + +class StageablePipeline( + ImplementsInterface[T_in, T_out], + ABC, +): + stages: StageDefinitionCollection + stages_instances: CallableCollection + + def __init__( + self, + stages: Optional[StageDefinitionCollection] = None, + stages_instances: Optional[CallableCollection] = None, + *args: Any, + **kwds: Any, + ) -> None: + super().__init__(*args, **kwds) # type: ignore + + if not hasattr(self, "stages"): + self.stages = tuple() + + if not hasattr(self, "stages_instances"): + self.stages_instances = tuple() + + if stages: + self.stages = stages + + if stages_instances: + self.stages_instances = stages_instances + + if self._should_instantiate_stages(): + self._instantiate_stages() + + def pipe(self, stage: CallableType) -> Self: + """ + Adds a single stage to the pipeline. + """ + return self.clone({"stages_instances": tuple([*self.stages_instances, stage])}) + + def with_stages(self, stages: StageDefinitionCollection) -> Self: + """ + Adds a collection of stages to the pipeline. 
+ """ + cloned = self.clone({"stages": stages, "stages_instances": []}) + + return cloned._instantiate_stages() + + def get_stages(self) -> StageDefinitionCollection: + return self.stages + + def get_stages_instances(self) -> CallableCollection: + return self.stages_instances + + def _should_instantiate_stages(self) -> bool: + return len(self.stages_instances) == 0 and len(self.stages) > 0 + + def _instantiate_stages(self) -> Self: + self.stages_instances = tuple( + stage() if isinstance(stage, type) else stage for stage in self.stages + ) + + return self diff --git a/src/_lib/contracts/base_pipeline.py b/src/_lib/contracts/base_pipeline.py new file mode 100644 index 0000000..bb5428b --- /dev/null +++ b/src/_lib/contracts/base_pipeline.py @@ -0,0 +1,11 @@ +from typing import Protocol + +from ..support.clonable import ClonableInterface +from ..types.types import T_in, T_out + + +class BasePipeline( + ClonableInterface, + Protocol[T_in, T_out], +): + pass diff --git a/src/_lib/contracts/pipeline.py b/src/_lib/contracts/pipeline.py new file mode 100644 index 0000000..3b3d16c --- /dev/null +++ b/src/_lib/contracts/pipeline.py @@ -0,0 +1,15 @@ +from typing import Protocol + +from ..types.types import T_in, T_out +from .base_pipeline import BasePipeline as BasePipelineContract +from .processable_pipeline import ProcessablePipeline as ProcessablePipelineContract +from .stageable_pipeline import StageablePipeline as StageablePipelineContract + + +class Pipeline( + ProcessablePipelineContract[T_in, T_out], + StageablePipelineContract[T_in, T_out], + BasePipelineContract[T_in, T_out], + Protocol[T_in, T_out], +): + pass diff --git a/src/_lib/contracts/pipeline_factory.py b/src/_lib/contracts/pipeline_factory.py new file mode 100644 index 0000000..b6bbb92 --- /dev/null +++ b/src/_lib/contracts/pipeline_factory.py @@ -0,0 +1,23 @@ +from typing import Any, Optional, Protocol, Self + +from ..support.act_as_factory.act_as_factory_interface import ActAsFactoryInterface +from ..types.callable_type import StageDefinition, StageDefinitionCollection +from ..types.types import T_in, T_out +from .pipeline import Pipeline as PipelineContract + + +class PipelineFactory( + ActAsFactoryInterface[PipelineContract[T_in, T_out]], + Protocol[T_in, T_out], +): + def __init__( + self, + stages: Optional[StageDefinitionCollection] = None, + pipeline_class: Optional[type[PipelineContract[T_in, T_out]]] = None, + *args: Any, + **kwds: Any, + ) -> None: ... + + def add_stage(self, stage: StageDefinition) -> Self: ... + + def with_stages(self, stages: StageDefinitionCollection) -> Self: ... diff --git a/src/_lib/contracts/processable_pipeline.py b/src/_lib/contracts/processable_pipeline.py new file mode 100644 index 0000000..4e7ad9c --- /dev/null +++ b/src/_lib/contracts/processable_pipeline.py @@ -0,0 +1,36 @@ +from typing import Any, Optional, Protocol, Self + +from ..types.types import T_in, T_out +from .base_pipeline import BasePipeline as BasePipelineContract +from .processor import Processor as ProcessorContract +from .stageable_pipeline import StageablePipeline as StageablePipelineContract + + +class ProcessablePipeline( + StageablePipelineContract[T_in, T_out], + BasePipelineContract[T_in, T_out], + Protocol[T_in, T_out], +): + def __init__( + self, + processor: Optional[type[ProcessorContract] | ProcessorContract] = None, + processor_instance: Optional[ProcessorContract] = None, + *args: Any, + **kwds: Any, + ) -> None: ... + + async def process(self, payload: T_in, *args: Any, **kwds: Any) -> T_out: ... 
+ + async def __call__( + self, + payload: T_in, + /, # Make 'payload' a positional-only parameter + *args: Any, + **kwds: Any, + ) -> T_out: ... + + def with_processor( + self, processor: type[ProcessorContract] | ProcessorContract + ) -> Self: ... + + def get_processor_instance(self) -> Optional[ProcessorContract]: ... diff --git a/src/_lib/pipeline/processor_interface.py b/src/_lib/contracts/processor.py similarity index 72% rename from src/_lib/pipeline/processor_interface.py rename to src/_lib/contracts/processor.py index a71a401..cee2a32 100644 --- a/src/_lib/pipeline/processor_interface.py +++ b/src/_lib/contracts/processor.py @@ -1,12 +1,12 @@ from abc import abstractmethod from typing import Any, Awaitable, Protocol -from .callable_type import CallableCollection, CallableType -from .traits.clonable import ClonableInterface -from .types import T_in, T_out +from ..support.clonable.clonable_interface import ClonableInterface +from ..types.callable_type import CallableCollection, CallableType +from ..types.types import T_in, T_out -class ProcessorInterface( +class Processor( ClonableInterface, Protocol[T_in, T_out], ): diff --git a/src/_lib/pipeline/stage_interface.py b/src/_lib/contracts/stage.py similarity index 84% rename from src/_lib/pipeline/stage_interface.py rename to src/_lib/contracts/stage.py index eac9a9f..35212d3 100644 --- a/src/_lib/pipeline/stage_interface.py +++ b/src/_lib/contracts/stage.py @@ -1,10 +1,10 @@ from abc import abstractmethod from typing import Any, Protocol -from .types import T_in, T_out +from ..types.types import T_in, T_out -class StageInterface( +class Stage( Protocol[T_in, T_out], ): @abstractmethod diff --git a/src/_lib/contracts/stageable_pipeline.py b/src/_lib/contracts/stageable_pipeline.py new file mode 100644 index 0000000..3e19ab4 --- /dev/null +++ b/src/_lib/contracts/stageable_pipeline.py @@ -0,0 +1,30 @@ +from typing import Any, Optional, Protocol, Self + +from ..types.callable_type import ( + CallableCollection, + CallableType, + StageDefinitionCollection, +) +from ..types.types import T_in, T_out +from .base_pipeline import BasePipeline as BasePipelineContract + + +class StageablePipeline( + BasePipelineContract[T_in, T_out], + Protocol[T_in, T_out], +): + def __init__( + self, + stages: Optional[StageDefinitionCollection] = None, + stages_instances: Optional[CallableCollection] = None, + *args: Any, + **kwds: Any, + ) -> None: ... + + def pipe(self, stage: CallableType) -> Self: ... + + def with_stages(self, stages: StageDefinitionCollection) -> Self: ... + + def get_stages(self) -> StageDefinitionCollection: ... + + def get_stages_instances(self) -> CallableCollection: ... 
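Taken together, the files under src/_lib/contracts define the pipeline's public surface as typing Protocols. A minimal sketch of coding against the Pipeline contract rather than the concrete class (not part of the patch; `run_twice` is a hypothetical helper name):

```python
# Sketch only: depends on the contracts introduced above.
from _lib.contracts.pipeline import Pipeline as PipelineContract


async def run_twice(pipeline: PipelineContract[int, int], payload: int) -> int:
    # Any object satisfying the protocol (e.g. the concrete Pipeline added below) is accepted.
    return await pipeline.process(await pipeline.process(payload))
```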
diff --git a/src/_lib/pipeline.py b/src/_lib/pipeline.py new file mode 100644 index 0000000..41345a5 --- /dev/null +++ b/src/_lib/pipeline.py @@ -0,0 +1,19 @@ +from typing import TypeVar + +from .concerns.base_pipeline import BasePipeline +from .concerns.processable_pipeline import ProcessablePipeline +from .concerns.stageable_pipeline import StageablePipeline +from .contracts.pipeline import Pipeline as ImplementsInterface +from .types.types import T_in, T_out + + +class Pipeline( + ProcessablePipeline[T_in, T_out], + StageablePipeline[T_in, T_out], + BasePipeline[T_in, T_out], + ImplementsInterface[T_in, T_out], +): + pass + + +TPipeline = TypeVar("TPipeline", bound=Pipeline, infer_variance=True) diff --git a/src/_lib/pipeline/__init__.py b/src/_lib/pipeline/__init__.py deleted file mode 100644 index ac6778e..0000000 --- a/src/_lib/pipeline/__init__.py +++ /dev/null @@ -1,36 +0,0 @@ -from .callable_type import ( - CallableCollection, - CallableType, - StageDefinition, - StageDefinitionCollection, -) -from .pipeline import Pipeline -from .pipeline_factory import PipelineFactory -from .pipeline_factory_interface import PipelineFactoryInterface -from .pipeline_interface import PipelineInterface -from .processor import Processor -from .processor_interface import ProcessorInterface -from .processors.chained_processor import ChainedProcessor -from .processors.chained_processor_interface import ChainedProcessorInterface -from .stage import Stage -from .stage_interface import StageInterface -from .types import T_in, T_out - -__all__ = ( - "Pipeline", - "PipelineInterface", - "Stage", - "StageInterface", - "Processor", - "ProcessorInterface", - "PipelineFactory", - "PipelineFactoryInterface", - "T_in", - "T_out", - "CallableType", - "CallableCollection", - "StageDefinition", - "StageDefinitionCollection", - "ChainedProcessor", - "ChainedProcessorInterface", -) diff --git a/src/_lib/pipeline/pipeline.py b/src/_lib/pipeline/pipeline.py deleted file mode 100644 index d418d46..0000000 --- a/src/_lib/pipeline/pipeline.py +++ /dev/null @@ -1,140 +0,0 @@ -from typing import Any, Optional, Self - -from .callable_type import CallableCollection, CallableType, StageDefinitionCollection -from .pipeline_interface import PipelineInterface as ImplementsInterface -from .processor_interface import ProcessorInterface -from .processors.chained_processor import ChainedProcessor -from .traits.clonable import Clonable -from .types import T_in, T_out - - -class Pipeline( - Clonable, - ImplementsInterface[T_in, T_out], -): - stages: StageDefinitionCollection - stage_instances: CallableCollection - processor: Optional[type[ProcessorInterface] | ProcessorInterface] - processor_instance: Optional[ProcessorInterface] - - def __init__( - self, - stages: Optional[StageDefinitionCollection] = None, - stage_instances: Optional[CallableCollection] = None, - processor: Optional[type[ProcessorInterface] | ProcessorInterface] = None, - processor_instance: Optional[ProcessorInterface] = None, - *args: Any, - **kwds: Any, - ) -> None: - if not hasattr(self, "stages"): - self.stages = tuple() - - if not hasattr(self, "stage_instances"): - self.stage_instances = tuple() - - if not hasattr(self, "processor"): - self.processor = self._get_default_processor() - - if not hasattr(self, "processor_instance"): - self.processor_instance = None - - if stages: - self.stages = stages - - if stage_instances: - self.stage_instances = stage_instances - - if self._should_instantiate_stages(): - self._instantiate_stages() - - if processor: - 
self.processor = processor - - if processor_instance: - self.processor_instance = processor_instance - - if self._should_instantiate_processor(): - self._instantiate_processor() - - async def process(self, payload: T_in, *args: Any, **kwds: Any) -> T_out: - """ - Process the given payload through the pipeline. - """ - if self.processor_instance is None: - raise ValueError("Processor not set") - - return await self.processor_instance.process( - payload=payload, stages=self.stage_instances, *args, **kwds - ) - - def pipe(self, stage: CallableType) -> Self: - """ - Adds a single stage to the pipeline. - """ - return self.clone({"stage_instances": tuple([*self.stage_instances, stage])}) - - async def __call__( - self, - payload: T_in, - /, # Make 'payload' a positional-only parameter - *args: Any, - **kwds: Any, - ) -> T_out: - """ - Processes payload through the pipeline. - """ - return await self.process(payload, *args, **kwds) - - def with_processor( - self, processor: type[ProcessorInterface] | ProcessorInterface - ) -> Self: - """ - Attachs a processor (class or instance) to the pipeline. - """ - cloned = self.clone({"processor": processor, "processor_instance": None}) - - return cloned._instantiate_processor() - - def with_stages(self, stages: StageDefinitionCollection) -> Self: - """ - Adds a collection of stages to the pipeline. - """ - cloned = self.clone({"stages": stages, "stage_instances": []}) - - return cloned._instantiate_stages() - - def get_processor_instance(self) -> Optional[ProcessorInterface]: - return self.processor_instance - - def get_stages(self) -> StageDefinitionCollection: - return self.stages - - def _get_default_processor(self) -> type[ChainedProcessor[T_in, T_out]]: - return ChainedProcessor - - def _should_instantiate_processor(self) -> bool: - return self.processor_instance is None - - def _instantiate_processor(self) -> Self: - if self.processor is None: - raise ValueError("Processor class not set") - - if isinstance(self.processor, type): - self.processor_instance = self.processor() - else: - self.processor_instance = self.processor - - if isinstance(self.processor_instance, type): - raise ValueError("Processor instance could not be created") - - return self - - def _should_instantiate_stages(self) -> bool: - return len(self.stage_instances) == 0 and len(self.stages) > 0 - - def _instantiate_stages(self) -> Self: - self.stage_instances = tuple( - stage() if isinstance(stage, type) else stage for stage in self.stages - ) - - return self diff --git a/src/_lib/pipeline/pipeline_factory_interface.py b/src/_lib/pipeline/pipeline_factory_interface.py deleted file mode 100644 index 6fc594e..0000000 --- a/src/_lib/pipeline/pipeline_factory_interface.py +++ /dev/null @@ -1,23 +0,0 @@ -from typing import Any, Optional, Protocol, Self - -from .callable_type import StageDefinition, StageDefinitionCollection -from .pipeline_interface import PipelineInterface -from .traits.act_as_factory.act_as_factory_interface import ActAsFactoryInterface -from .types import T_in, T_out - - -class PipelineFactoryInterface( - ActAsFactoryInterface[PipelineInterface[T_in, T_out]], - Protocol[T_in, T_out], -): - def __init__( - self, - stages: Optional[StageDefinitionCollection] = None, - pipeline_class: Optional[type[PipelineInterface[T_in, T_out]]] = None, - *args: Any, - **kwds: Any, - ) -> None: ... - - def add_stage(self, stage: StageDefinition) -> Self: ... - - def with_stages(self, stages: StageDefinitionCollection) -> Self: ... 
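The deleted monolithic pipeline.py above carried the with_processor behaviour that now lives in the ProcessablePipeline concern; a minimal usage sketch (not part of the patch, stage lambda and variable names are illustrative only):

```python
# Sketch only: with_processor accepts either a processor class or an instance
# and returns a modified clone of the pipeline.
from thecodecrate_pipeline import Pipeline
from thecodecrate_pipeline.processors import ChainedProcessor

base = Pipeline[int]().pipe(lambda payload: payload + 1)

by_class = base.with_processor(ChainedProcessor)       # pass the class
by_instance = base.with_processor(ChainedProcessor())  # or an instance
```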
diff --git a/src/_lib/pipeline/pipeline_interface.py b/src/_lib/pipeline/pipeline_interface.py deleted file mode 100644 index 977e599..0000000 --- a/src/_lib/pipeline/pipeline_interface.py +++ /dev/null @@ -1,46 +0,0 @@ -from typing import Any, Optional, Protocol, Self, TypeVar - -from .callable_type import CallableCollection, CallableType, StageDefinitionCollection -from .processor_interface import ProcessorInterface -from .traits.clonable import ClonableInterface -from .types import T_in, T_out - - -class PipelineInterface( - ClonableInterface, - Protocol[T_in, T_out], -): - def __init__( - self, - stages: Optional[StageDefinitionCollection] = None, - stage_instances: Optional[CallableCollection] = None, - processor: Optional[type[ProcessorInterface] | ProcessorInterface] = None, - processor_instance: Optional[ProcessorInterface] = None, - *args: Any, - **kwds: Any, - ) -> None: ... - - async def process(self, payload: T_in, *args: Any, **kwds: Any) -> T_out: ... - - def pipe(self, stage: CallableType) -> Self: ... - - async def __call__( - self, - payload: T_in, - /, # Make 'payload' a positional-only parameter - *args: Any, - **kwds: Any, - ) -> T_out: ... - - def with_processor( - self, processor: type[ProcessorInterface] | ProcessorInterface - ) -> Self: ... - - def with_stages(self, stages: StageDefinitionCollection) -> Self: ... - - def get_processor_instance(self) -> Optional[ProcessorInterface]: ... - - def get_stages(self) -> StageDefinitionCollection: ... - - -TPipeline = TypeVar("TPipeline", bound=PipelineInterface, infer_variance=True) diff --git a/src/_lib/pipeline/processors/__init__.py b/src/_lib/pipeline/processors/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/_lib/pipeline/processors/chained_processor.py b/src/_lib/pipeline/processors/chained_processor.py deleted file mode 100644 index 3abc1f3..0000000 --- a/src/_lib/pipeline/processors/chained_processor.py +++ /dev/null @@ -1,41 +0,0 @@ -from typing import Any, cast - -from ..callable_type import CallableCollection -from ..processor import Processor -from ..types import T_in, T_out -from .chained_processor_interface import ( - ChainedProcessorInterface as ImplementsInterface, -) - - -class ChainedProcessor( - Processor[T_in, T_out], - ImplementsInterface[T_in, T_out], -): - async def process( - self, - payload: T_in, - stages: CallableCollection, - *args: Any, - **kwds: Any, - ) -> T_out: - """ - Process the given payload through the provided stages. - - Args: - payload (T_in): The input payload to process. - stages (CallableCollection): The collection of stages to process the payload through. - *args (Any): Additional positional arguments. - **kwds (Any): Additional keyword arguments. - - Returns: - T_out: The processed output. 
- """ - payload_out: Any = payload - - for stage in stages: - payload_out = await self._call( - callable=stage, payload=payload_out, *args, **kwds - ) - - return cast(T_out, payload_out) diff --git a/src/_lib/pipeline/processors/chained_processor_interface.py b/src/_lib/pipeline/processors/chained_processor_interface.py deleted file mode 100644 index 7bdd36a..0000000 --- a/src/_lib/pipeline/processors/chained_processor_interface.py +++ /dev/null @@ -1,22 +0,0 @@ -from typing import Any, Protocol - -from ..callable_type import CallableCollection -from ..processor_interface import ProcessorInterface -from ..types import T_in, T_out - - -class ChainedProcessorInterface( - ProcessorInterface[T_in, T_out], - Protocol[T_in, T_out], -): - """ - A processor that processes the payload through a series of stages. - """ - - async def process( - self, - payload: T_in, - stages: CallableCollection, - *args: Any, - **kwds: Any, - ) -> T_out: ... diff --git a/src/_lib/pipeline/traits/__init__.py b/src/_lib/pipeline/traits/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/_lib/pipeline/pipeline_factory.py b/src/_lib/pipeline_factory.py similarity index 65% rename from src/_lib/pipeline/pipeline_factory.py rename to src/_lib/pipeline_factory.py index 40dddb2..127f5f2 100644 --- a/src/_lib/pipeline/pipeline_factory.py +++ b/src/_lib/pipeline_factory.py @@ -1,27 +1,27 @@ from typing import Any, Optional, Self -from .callable_type import StageDefinition, StageDefinitionCollection +from .contracts.pipeline import Pipeline as PipelineContract +from .contracts.pipeline_factory import PipelineFactory as ImplementsInterface +from .contracts.processor import Processor as ProcessorContract from .pipeline import Pipeline -from .pipeline_factory_interface import PipelineFactoryInterface as ImplementsInterface -from .pipeline_interface import PipelineInterface -from .processor_interface import ProcessorInterface -from .traits.act_as_factory.act_as_factory import ActAsFactory -from .types import T_in, T_out +from .support.act_as_factory.act_as_factory import ActAsFactory +from .types.callable_type import StageDefinition, StageDefinitionCollection +from .types.types import T_in, T_out class PipelineFactory( - ActAsFactory[PipelineInterface[T_in, T_out]], + ActAsFactory[PipelineContract[T_in, T_out]], ImplementsInterface[T_in, T_out], ): stages: StageDefinitionCollection - processor: Optional[type[ProcessorInterface[T_in, T_out]] | ProcessorInterface] - pipeline_class: Optional[type[PipelineInterface[T_in, T_out]]] + processor: Optional[type[ProcessorContract[T_in, T_out]] | ProcessorContract] + pipeline_class: Optional[type[PipelineContract[T_in, T_out]]] def __init__( self, stages: Optional[StageDefinitionCollection] = None, - processor: Optional[type[ProcessorInterface] | ProcessorInterface] = None, - pipeline_class: Optional[type[PipelineInterface[T_in, T_out]]] = None, + processor: Optional[type[ProcessorContract] | ProcessorContract] = None, + pipeline_class: Optional[type[PipelineContract[T_in, T_out]]] = None, *args: Any, **kwds: Any, ) -> None: @@ -60,7 +60,7 @@ def with_stages(self, stages: StageDefinitionCollection) -> Self: return self def with_processor( - self, processor: type[ProcessorInterface] | ProcessorInterface + self, processor: type[ProcessorContract] | ProcessorContract ) -> Self: """ Attachs a processor (class or instance) to the pipeline factory. 
@@ -71,7 +71,7 @@ def with_processor( def _get_default_pipeline_class( self, - ) -> Optional[type[PipelineInterface[T_in, T_out]]]: + ) -> Optional[type[PipelineContract[T_in, T_out]]]: return Pipeline # ActAsFactory @@ -82,7 +82,7 @@ def _definition(self) -> dict[str, Any]: } # ActAsFactory - def _get_target_class(self) -> type[PipelineInterface[T_in, T_out]]: + def _get_target_class(self) -> type[PipelineContract[T_in, T_out]]: if self.pipeline_class is None: raise ValueError("Pipeline class not set in factory.") diff --git a/src/_lib/pipeline/processor.py b/src/_lib/processor.py similarity index 77% rename from src/_lib/pipeline/processor.py rename to src/_lib/processor.py index 8099f92..a31a0a1 100644 --- a/src/_lib/pipeline/processor.py +++ b/src/_lib/processor.py @@ -2,10 +2,10 @@ from abc import abstractmethod from typing import Any -from .callable_type import CallableCollection, CallableType -from .processor_interface import ProcessorInterface as ImplementsInterface -from .traits.clonable import Clonable -from .types import T_in, T_out +from .contracts.processor import Processor as ImplementsInterface +from .support.clonable import Clonable +from .types.callable_type import CallableCollection, CallableType +from .types.types import T_in, T_out class Processor( @@ -42,7 +42,7 @@ async def _call( **kwds: Any, ) -> T_out: """ - Process the given payload. + Do the actual processing of the payload - the "process" method is just an alias to this method. """ result = callable(payload, *args, **kwds) diff --git a/src/_lib/processors/__init__.py b/src/_lib/processors/__init__.py index b61d8c0..e69de29 100644 --- a/src/_lib/processors/__init__.py +++ b/src/_lib/processors/__init__.py @@ -1,13 +0,0 @@ -# Re-exporting symbols -from .chained_processor import ChainedPipeline as ChainedPipeline -from .chained_processor import ChainedProcessor as ChainedProcessor -from .chained_processor import __all__ as _chained_processor_all -from .interruptible_processor import InterruptiblePipeline as InterruptiblePipeline -from .interruptible_processor import InterruptibleProcessor as InterruptibleProcessor -from .interruptible_processor import __all__ as _interruptible_processor_all - -# pyright: reportUnsupportedDunderAll=false -__all__ = ( - *_chained_processor_all, - *_interruptible_processor_all, -) diff --git a/src/_lib/processors/chained_processor/__init__.py b/src/_lib/processors/chained_processor/__init__.py index 26f2618..e69de29 100644 --- a/src/_lib/processors/chained_processor/__init__.py +++ b/src/_lib/processors/chained_processor/__init__.py @@ -1,7 +0,0 @@ -from .chained_pipeline import ChainedPipeline -from .chained_processor import ChainedProcessor - -__all__ = ( - "ChainedPipeline", - "ChainedProcessor", -) diff --git a/src/_lib/processors/chained_processor/chained_pipeline.py b/src/_lib/processors/chained_processor/chained_pipeline.py index 79d995c..91bebba 100644 --- a/src/_lib/processors/chained_processor/chained_pipeline.py +++ b/src/_lib/processors/chained_processor/chained_pipeline.py @@ -24,6 +24,8 @@ def __init__(self, *args: Any, **kwargs: Any): assert await pipeline.process(1) == 5 ``` """ - processor = ChainedProcessor[T_in, T_out]() - - super().__init__(processor=processor, *args, **kwargs) + super().__init__( + processor=ChainedProcessor[T_in, T_out](), + *args, + **kwargs, + ) diff --git a/src/_lib/processors/chained_processor/chained_processor.py b/src/_lib/processors/chained_processor/chained_processor.py index 0bf73f4..e27b36c 100644 --- 
a/src/_lib/processors/chained_processor/chained_processor.py +++ b/src/_lib/processors/chained_processor/chained_processor.py @@ -1,25 +1,37 @@ -from ...pipeline import ChainedProcessor as ChainedProcessorBase -from ...pipeline import T_in, T_out +from typing import Any, cast +from ...processor import Processor +from ...types.callable_type import CallableCollection +from ...types.types import T_in, T_out -class ChainedProcessor(ChainedProcessorBase[T_in, T_out]): - """Default processor. Sequentially processes data through multiple stages. - Example: - ```python - # Create processor - processor = ChainedProcessor[int]() +class ChainedProcessor( + Processor[T_in, T_out], +): + async def process( + self, + payload: T_in, + stages: CallableCollection, + *args: Any, + **kwds: Any, + ) -> T_out: + """ + Process the given payload through the provided stages. - # Stages - result = await processor.process( - payload=5, - stages=( - lambda payload: payload + 1, - lambda payload: payload * 2, - ), - ) + Args: + payload (T_in): The input payload to process. + stages (CallableCollection): The collection of stages to process the payload through. + *args (Any): Additional positional arguments. + **kwds (Any): Additional keyword arguments. - # Assert result - assert result == 12 - ``` - """ + Returns: + T_out: The processed output. + """ + payload_out: Any = payload + + for stage in stages: + payload_out = await self._call( + callable=stage, payload=payload_out, *args, **kwds + ) + + return cast(T_out, payload_out) diff --git a/src/_lib/processors/interruptible_processor/__init__.py b/src/_lib/processors/interruptible_processor/__init__.py index 1e000eb..e69de29 100644 --- a/src/_lib/processors/interruptible_processor/__init__.py +++ b/src/_lib/processors/interruptible_processor/__init__.py @@ -1,7 +0,0 @@ -from .interruptible_pipeline import InterruptiblePipeline -from .interruptible_processor import InterruptibleProcessor - -__all__ = ( - "InterruptiblePipeline", - "InterruptibleProcessor", -) diff --git a/src/_lib/processors/interruptible_processor/interruptible_pipeline.py b/src/_lib/processors/interruptible_processor/interruptible_pipeline.py index 0812110..4e0968e 100644 --- a/src/_lib/processors/interruptible_processor/interruptible_pipeline.py +++ b/src/_lib/processors/interruptible_processor/interruptible_pipeline.py @@ -1,6 +1,7 @@ from typing import Any -from ...pipeline import Pipeline, T_in, T_out +from ...pipeline import Pipeline +from ...types.types import T_in, T_out from .interruptible_processor import CheckCallable, InterruptibleProcessor diff --git a/src/_lib/processors/interruptible_processor/interruptible_processor.py b/src/_lib/processors/interruptible_processor/interruptible_processor.py index 269191c..f58b4b0 100644 --- a/src/_lib/processors/interruptible_processor/interruptible_processor.py +++ b/src/_lib/processors/interruptible_processor/interruptible_processor.py @@ -1,8 +1,11 @@ import inspect from typing import Any, Awaitable, Callable, cast -from ...pipeline import CallableCollection, Processor, T_in, T_out +from ...processor import Processor +from ...types.callable_type import CallableCollection +from ...types.types import T_in, T_out +# Type for a callable that checks a condition on the payload. CheckCallable = Callable[[T_in], bool | Awaitable[bool]] @@ -10,7 +13,7 @@ class InterruptibleProcessor(Processor[T_in, T_out]): """Processor with conditional interruption.""" check: CheckCallable[T_in] - """Callable for processing interruption. Useful for declarative subclassing. 
+ """Callable for processing interruption. Example: ```python diff --git a/src/_lib/pipeline/stage.py b/src/_lib/stage.py similarity index 78% rename from src/_lib/pipeline/stage.py rename to src/_lib/stage.py index f1c24f5..c118c02 100644 --- a/src/_lib/pipeline/stage.py +++ b/src/_lib/stage.py @@ -1,8 +1,8 @@ from abc import abstractmethod from typing import Any -from .stage_interface import StageInterface as ImplementsInterface -from .types import T_in, T_out +from .contracts.stage import Stage as ImplementsInterface +from .types.types import T_in, T_out class Stage( diff --git a/src/_lib/pipeline/traits/act_as_factory/__init__.py b/src/_lib/support/act_as_factory/__init__.py similarity index 100% rename from src/_lib/pipeline/traits/act_as_factory/__init__.py rename to src/_lib/support/act_as_factory/__init__.py diff --git a/src/_lib/pipeline/traits/act_as_factory/act_as_factory.py b/src/_lib/support/act_as_factory/act_as_factory.py similarity index 100% rename from src/_lib/pipeline/traits/act_as_factory/act_as_factory.py rename to src/_lib/support/act_as_factory/act_as_factory.py diff --git a/src/_lib/pipeline/traits/act_as_factory/act_as_factory_interface.py b/src/_lib/support/act_as_factory/act_as_factory_interface.py similarity index 100% rename from src/_lib/pipeline/traits/act_as_factory/act_as_factory_interface.py rename to src/_lib/support/act_as_factory/act_as_factory_interface.py diff --git a/src/_lib/pipeline/traits/clonable/__init__.py b/src/_lib/support/clonable/__init__.py similarity index 100% rename from src/_lib/pipeline/traits/clonable/__init__.py rename to src/_lib/support/clonable/__init__.py diff --git a/src/_lib/pipeline/traits/clonable/clonable.py b/src/_lib/support/clonable/clonable.py similarity index 100% rename from src/_lib/pipeline/traits/clonable/clonable.py rename to src/_lib/support/clonable/clonable.py diff --git a/src/_lib/pipeline/traits/clonable/clonable_interface.py b/src/_lib/support/clonable/clonable_interface.py similarity index 100% rename from src/_lib/pipeline/traits/clonable/clonable_interface.py rename to src/_lib/support/clonable/clonable_interface.py diff --git a/src/_lib/pipeline/callable_type.py b/src/_lib/types/callable_type.py similarity index 100% rename from src/_lib/pipeline/callable_type.py rename to src/_lib/types/callable_type.py diff --git a/src/_lib/pipeline/types.py b/src/_lib/types/types.py similarity index 100% rename from src/_lib/pipeline/types.py rename to src/_lib/types/types.py diff --git a/src/thecodecrate_pipeline/__init__.py b/src/thecodecrate_pipeline/__init__.py index e8f6cfa..6b9c863 100644 --- a/src/thecodecrate_pipeline/__init__.py +++ b/src/thecodecrate_pipeline/__init__.py @@ -6,14 +6,14 @@ __version__ = "1.27.0" # Re-exporting symbols -from _lib import Pipeline as Pipeline -from _lib import PipelineFactory as PipelineFactory -from _lib import PipelineFactoryInterface as PipelineFactoryInterface -from _lib import PipelineInterface as PipelineInterface -from _lib import Processor as Processor -from _lib import ProcessorInterface as ProcessorInterface -from _lib import Stage as Stage -from _lib import StageInterface as StageInterface +from _lib.contracts.pipeline import Pipeline as PipelineInterface +from _lib.contracts.pipeline_factory import PipelineFactory as PipelineFactoryInterface +from _lib.contracts.processor import Processor as ProcessorInterface +from _lib.contracts.stage import Stage as StageInterface +from _lib.pipeline import Pipeline as Pipeline +from _lib.pipeline_factory import PipelineFactory 
as PipelineFactory +from _lib.processor import Processor as Processor +from _lib.stage import Stage as Stage # pyright: reportUnsupportedDunderAll=false __all__ = ( diff --git a/src/thecodecrate_pipeline/processors/__init__.py b/src/thecodecrate_pipeline/processors/__init__.py index 0625641..a9cdcfd 100644 --- a/src/thecodecrate_pipeline/processors/__init__.py +++ b/src/thecodecrate_pipeline/processors/__init__.py @@ -1,11 +1,22 @@ """A collection of processors and their pipelines""" # Re-exporting symbols -from _lib.processors import ChainedPipeline as ChainedPipeline -from _lib.processors import ChainedProcessor as ChainedProcessor -from _lib.processors import InterruptiblePipeline as InterruptiblePipeline -from _lib.processors import InterruptibleProcessor as InterruptibleProcessor -from _lib.processors import __all__ as _processors_all +from _lib.processors.chained_processor.chained_pipeline import ( + ChainedPipeline as ChainedPipeline, +) +from _lib.processors.chained_processor.chained_processor import ( + ChainedProcessor as ChainedProcessor, +) +from _lib.processors.interruptible_processor.interruptible_pipeline import ( + InterruptiblePipeline as InterruptiblePipeline, +) +from _lib.processors.interruptible_processor.interruptible_processor import ( + InterruptibleProcessor as InterruptibleProcessor, +) -# pyright: reportUnsupportedDunderAll=false -__all__ = (*_processors_all,) +__all__ = ( + "ChainedPipeline", + "ChainedProcessor", + "InterruptiblePipeline", + "InterruptibleProcessor", +) diff --git a/src/thecodecrate_pipeline/types/__init__.py b/src/thecodecrate_pipeline/types/__init__.py index 1640622..fdc6d88 100644 --- a/src/thecodecrate_pipeline/types/__init__.py +++ b/src/thecodecrate_pipeline/types/__init__.py @@ -1,14 +1,15 @@ """Library's public types""" # Re-exporting symbols -from _lib import CallableCollection as CallableCollection -from _lib import CallableType as CallableType -from _lib import StageDefinition as StageDefinition -from _lib import StageDefinitionCollection as StageDefinitionCollection -from _lib import T_in as T_in -from _lib import T_out as T_out +from _lib.types.callable_type import CallableCollection as CallableCollection +from _lib.types.callable_type import CallableType as CallableType +from _lib.types.callable_type import StageDefinition as StageDefinition +from _lib.types.callable_type import ( + StageDefinitionCollection as StageDefinitionCollection, +) +from _lib.types.types import T_in as T_in +from _lib.types.types import T_out as T_out -# pyright: reportUnsupportedDunderAll=false __all__ = ( "CallableType", "CallableCollection", diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 9f57021..ae07cb7 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -159,7 +159,7 @@ class MyPipeline(Pipeline[int]): @pytest.mark.asyncio -async def test_declarative_stage_instances(): +async def test_declarative_stages_instances(): pipeline = (Pipeline[int]()).pipe(TimesTwoStage()).pipe(AddOneStage()) def add_seven(payload: int) -> int: @@ -169,7 +169,7 @@ async def sub_three_async(payload: int) -> int: return payload - 3 class MyPipeline(Pipeline[int]): - stage_instances = ( + stages_instances = ( TimesThreeStage(), # stage instance pipeline, # pipeline AddOneStage(), # another stage instance @@ -177,34 +177,34 @@ class MyPipeline(Pipeline[int]): sub_three_async, # async function ) - stage_instances: CallableCollection = ( + stages_instances: CallableCollection = ( add_seven, # function sub_three_async, # async function ) assert 
await MyPipeline().process(5) == 36 - assert await MyPipeline(stage_instances=stage_instances).process(5) == 9 + assert await MyPipeline(stages_instances=stages_instances).process(5) == 9 @pytest.mark.asyncio -async def test_declarative_stage_instances__immutability(): +async def test_declarative_stages_instances__immutability(): def add_seven(payload: int) -> int: return payload + 7 async def sub_three_async(payload: int) -> int: return payload - 3 - stage_instances: CallableCollection = ( + stages_instances: CallableCollection = ( add_seven, # function sub_three_async, # async function ) - pipeline = Pipeline(stage_instances=stage_instances) + pipeline = Pipeline(stages_instances=stages_instances) assert await pipeline.process(5) == 9 # adding more stages should not affect the original pipeline - stage_instances += (TimesTwoStage(),) + stages_instances += (TimesTwoStage(),) assert await pipeline.process(5) == 9 @@ -222,15 +222,15 @@ class MyPipeline(Pipeline[int]): TimesTwoStage(), # stage instance ) - stage_instances: CallableCollection = ( + stages_instances: CallableCollection = ( AddOneStage(), # only instances can be added TimesTwoStage(), # only instances can be added ) assert await MyPipeline().process(5) == 30 assert await MyPipeline(stages=stages).process(5) == 12 - assert await MyPipeline(stages=stage_instances).process(5) == 12 - assert await MyPipeline(stage_instances=stage_instances).process(5) == 12 + assert await MyPipeline(stages=stages_instances).process(5) == 12 + assert await MyPipeline(stages_instances=stages_instances).process(5) == 12 @pytest.mark.asyncio @@ -274,8 +274,8 @@ async def test_method__with_stages__override_current_instances(): new_pipeline = pipeline.with_stages(stages) - assert len(pipeline.stage_instances) == 1 - assert len(new_pipeline.stage_instances) == 2 + assert len(pipeline.get_stages_instances()) == 1 + assert len(new_pipeline.get_stages_instances()) == 2 assert await pipeline.process(5) == 6 assert await new_pipeline.process(5) == 30 From 17a1fcb8467d703c9a65fe20bd8296e1d9861f7d Mon Sep 17 00:00:00 2001 From: Daniel Date: Mon, 13 Oct 2025 17:38:05 +0000 Subject: [PATCH 2/2] feat: bump version 1.27.0 -> 1.28.0 --- pyproject.toml | 4 ++-- src/thecodecrate_pipeline/__init__.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index ddf71c1..04500dc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "thecodecrate-pipeline" -version = "1.27.0" +version = "1.28.0" description = "This package provides a pipeline pattern implementation" readme = "README.md" authors = [{ name = "TheCodeCrate", email = "loureiro.rg@gmail.com" }] @@ -48,7 +48,7 @@ build-backend = "hatchling.build" line-length = 79 [tool.bumpver] -current_version = "1.27.0" +current_version = "1.28.0" version_pattern = "MAJOR.MINOR.PATCH" commit_message = "feat: bump version {old_version} -> {new_version}" tag_message = "{new_version}" diff --git a/src/thecodecrate_pipeline/__init__.py b/src/thecodecrate_pipeline/__init__.py index 6b9c863..ee5cc93 100644 --- a/src/thecodecrate_pipeline/__init__.py +++ b/src/thecodecrate_pipeline/__init__.py @@ -3,7 +3,7 @@ # This will be updated by `bumpver` command. # - Make sure to commit all changes first before running `bumpver`. # - Run `bumpver update --[minor|major|patch]` -__version__ = "1.27.0" +__version__ = "1.28.0" # Re-exporting symbols from _lib.contracts.pipeline import Pipeline as PipelineInterface
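Taken together, the two patches keep the stage/processor workflow intact (aside from the stage_instances -> stages_instances rename exercised in the tests) while the internals move to concerns and contracts. A minimal end-to-end sketch against the bumped 1.28.0 API (not part of the patches; the stage functions are illustrative):

```python
# Sketch only: mirrors the usage exercised by tests/test_pipeline.py above.
import asyncio

from thecodecrate_pipeline import Pipeline


def times_two(payload: int) -> int:
    return payload * 2


async def add_one(payload: int) -> int:
    return payload + 1


async def main() -> None:
    # Stages may be sync or async callables; pipe() returns a new (cloned) pipeline.
    pipeline = Pipeline[int]().pipe(times_two).pipe(add_one)
    assert await pipeline.process(5) == 11


asyncio.run(main())
```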