diff --git a/.flake8 b/.flake8 index 304311f..bf68eed 100644 --- a/.flake8 +++ b/.flake8 @@ -2,3 +2,4 @@ max-line-length = 88 max-complexity = 10 ignore = W391,F401 +exclude=*.pyi diff --git a/Makefile b/Makefile index 11d6bee..70b6d72 100644 --- a/Makefile +++ b/Makefile @@ -2,4 +2,4 @@ format: poetry run black pipe_framework && poetry run isort pipe_framework && docformatter -r --in-place --force-wrap pipe_framework/* lint: - poetry run flake8 pipe_framework && poetry run isort --check-only + poetry run flake8 pipe_framework && poetry run isort --check-only . diff --git a/pipe_framework/core/__init__.py b/pipe_framework/core/__init__.py new file mode 100644 index 0000000..0011ad1 --- /dev/null +++ b/pipe_framework/core/__init__.py @@ -0,0 +1 @@ +# TODO: provide way to pass data from store with underscore notation. e.g.: _.reason diff --git a/pipe_framework/core/base.py b/pipe_framework/core/base.py new file mode 100644 index 0000000..13ffb5a --- /dev/null +++ b/pipe_framework/core/base.py @@ -0,0 +1,131 @@ +from __future__ import annotations + +from typing import ( + Callable, + Generic, + Iterable, + Iterator, + NamedTuple, + Protocol, + Tuple, + TypeVar, +) + +from core.runner import run_simple + +StateType = TypeVar("StateType") +DataType = TypeVar("DataType", covariant=True) +# Runner = Callable[["Pipe"[StateType]], StateType] + + +class Runner(Generic[StateType], Protocol): + def __call__(self, pipe: "Pipe"[StateType]) -> StateType: + ... + + +class Hooks(NamedTuple): + before: Callable + after: Callable + interrupt: Callable[..., bool] + + +class Step(Generic[StateType], Protocol): + def __call__(self, state: StateType) -> StateType: + ... + + +class Pipe(Generic[StateType], Iterable, Protocol): + hooks: Hooks + state: StateType + steps: Tuple[Step[StateType], ...] + run: Runner[StateType] + + def __init__( + self, + steps: Tuple[Step[StateType], ...], + state: StateType, + hooks: Hooks, + runner: Runner[StateType], + ) -> None: + ... + + def __call__(self) -> StateType: + ... + + def __iter__(self) -> Iterator[Step[StateType]]: + ... + + def __len__(self) -> int: + ... + + def add(self, step: Step) -> "Pipe": + ... + + def get_state(self) -> StateType: + ... + + def set_state(self, state: StateType) -> "Pipe": + ... + + def set_runner(self, runner: Runner[StateType]) -> "Pipe": + ... + + def get_runner(self) -> Runner[StateType]: + ... + + def handle_interrupt(self) -> bool: + ... + + +class SimplePipe(Generic[StateType], Pipe[StateType]): + def __init__( + self, + steps: Tuple[Step[StateType], ...], + state: StateType, + hooks, + runner: Runner[StateType] = run_simple, + ): + self.steps = steps + self.set_runner(runner) + self.set_hooks(hooks) + self.state = state + + def set_steps(self, steps): + map(self.add, steps) + + return self + + def get_state(self): + return self.state + + def set_state(self, state: StateType): + self.state = state + + def add(self, step): + self.steps = self.steps + (step,) + + return self + + def set_runner(self, runner: Runner[StateType]): + self.run = runner + + def set_hooks(self, hooks): + self.hooks = hooks + + return self + + def get_runner(self): + return self.run + + def handle_interrupt(self): + # TODO: figure out how to handle this + return self.hooks.interrupt() + + def __iter__(self): + return iter(self.steps) + + def __len__(self): + return len(self.steps) + + def __call__(self): + return self.run(self) diff --git a/pipe_framework/core/decorators.py b/pipe_framework/core/decorators.py new file mode 100644 index 0000000..6e6920e --- /dev/null +++ b/pipe_framework/core/decorators.py @@ -0,0 +1,15 @@ +from functools import wraps +from typing import Callable + + +def step(fn: Callable) -> Callable: + """Decorator to convert function to step. + + :param step_fn: function to convert :return: pipe step + """ + + @wraps(fn) + def step(*args, **kwargs): + return fn(*args, **kwargs) + + return step diff --git a/pipe_framework/core/factories.py b/pipe_framework/core/factories.py new file mode 100644 index 0000000..45caa11 --- /dev/null +++ b/pipe_framework/core/factories.py @@ -0,0 +1,27 @@ +from typing import Generic, Tuple, TypeVar + +from pipe_framework.core.base import Hooks, SimplePipe, Step + + +def create_hooks(*, before=None, after=None, interrupt=None): + bypass = lambda x: x + + return Hooks( + before=before or bypass, + after=after or bypass, + interrupt=interrupt or (lambda s: False), + ) + + +T = TypeVar("T") + + +def create_simple_pipe( + callables: Tuple[Step[T], ...], state: T, hooks: Hooks | None = None +) -> SimplePipe[T]: + if hooks is None: + hooks = create_hooks() + + _pipe = SimplePipe(callables, state, hooks) + + return _pipe diff --git a/pipe_framework/core/runner.py b/pipe_framework/core/runner.py new file mode 100644 index 0000000..009d789 --- /dev/null +++ b/pipe_framework/core/runner.py @@ -0,0 +1,29 @@ +from typing import TypeVar + +from core.base import Runner, SimplePipe + +T = TypeVar("T") + + +class __run_simple_factory(Runner[T]): + """Run the pipe. + + :param pipe: The pipe to run. :param hooks: The hooks to wrap the + pipe with. :return: The result of the pipe. + """ + + def __call__(self, pipe: SimplePipe[T]) -> T: + # get initial state for pipe + pipe.set_state(pipe.hooks.before(pipe.get_state())) + + for step in pipe: + result = step(pipe.get_state()) + pipe.set_state(result) + + if pipe.hooks.interrupt(pipe.get_state()): + break + + return pipe.hooks.after(pipe.get_state()) + + +run_simple = __run_simple_factory() diff --git a/pipe_framework/core/tests/conftest.py b/pipe_framework/core/tests/conftest.py new file mode 100644 index 0000000..96994d5 --- /dev/null +++ b/pipe_framework/core/tests/conftest.py @@ -0,0 +1,46 @@ +from typing import List, TypedDict + +import pytest +from core.base import SimplePipe + +from pipe_framework.core.factories import create_simple_pipe + + +class TestState(TypedDict): + x: int + y: int + operations: List[str] + result: int + + +def callable_one(state: TestState) -> TestState: + state["x"] = 1 + state["y"] = 1 + state["operations"] = ["add"] + + return state + + +def callable_two(state): + if "add" in state["operations"]: + state["result"] = state["x"] + state["y"] + + return state + + +test_state: TestState = {"x": 0, "y": 0, "operations": [], "result": 0} + +adder_pipe = create_simple_pipe((callable_one, callable_two), state=test_state) +adder_pipe_with_hooks = create_simple_pipe( + (callable_one, callable_two), state=test_state +) + + +@pytest.fixture(scope="module") +def pipe_instance() -> SimplePipe[TestState]: + return adder_pipe + + +@pytest.fixture(scope="module") +def expected_result(): + return {"x": 1, "y": 1, "operations": ["add"], "result": 2} diff --git a/pipe_framework/core/tests/test_simple.py b/pipe_framework/core/tests/test_simple.py new file mode 100644 index 0000000..913cc2e --- /dev/null +++ b/pipe_framework/core/tests/test_simple.py @@ -0,0 +1,8 @@ +from core.base import SimplePipe +from core.tests.conftest import TestState + + +def test_simple(pipe_instance: SimplePipe[TestState], expected_result): + result = pipe_instance() + + assert result == expected_result diff --git a/pipe_framework/pipe/__init__.py b/pipe_framework/pipe/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/pipe_framework/pipe/core/__init__.py b/pipe_framework/pipe/core/__init__.py deleted file mode 100644 index db78cf8..0000000 --- a/pipe_framework/pipe/core/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -"""Core package for Pipe Framework. - -Contains all basic logic for pipes and steps. -""" diff --git a/pipe_framework/pipe/core/base.py b/pipe_framework/pipe/core/base.py deleted file mode 100644 index 7ffdb24..0000000 --- a/pipe_framework/pipe/core/base.py +++ /dev/null @@ -1,186 +0,0 @@ -import re -import typing as t - -import valideer as V -from frozendict import frozendict -from pipe.core.exceptions import StepExecutionException, StepValidationException -from rich.console import Console - - -class Step: - """Base class providing basic functionality for all steps related classes.""" - - _available_methods = ("extract", "transform", "load") - - # field for store validation schema - required_fields: dict = {} - - def __and__(self, other: "Step") -> "Step": - """Overriding boolean AND operation for merging steps""" - - def run(self, store: frozendict) -> frozendict: - - try: - result_a = self.obj_a.run(store) - result_b = self.obj_b.run(store) - except Exception: - return store - - return store.copy(**dict(obj_a=result_a, obj_b=result_b)) - - return Step.factory(run, "AndStep", obj_a=self, obj_b=other)() - - def __or__(self, other: "Step") -> "Step": - """Overriding boolean OR operation for merging steps:""" - - def run(self, store: frozendict) -> frozendict: - - try: - result = self.obj_a.run(store) - except Exception as e: - store = store.copy(**{"exception": e}) - result = self.obj_b.run(store) - - return result - - return Step.factory(run, "OrStep", obj_a=self, obj_b=other)() - - def _parse_dynamic_fields(self) -> None: - """Processes fields in validation config which should be taken from - step instance.""" - dynamic_config = {} - keys = list(self.required_fields.keys()) - - for key in keys: - if (key.startswith("+{") or key.startswith("{")) and key.endswith("}"): - variable_name = re.sub(r"\{|\}", "", key) - dynamic_config.update( - { - getattr( - self, variable_name.replace("+", "") - ): self.required_fields.get(key) - } - ) - del self.required_fields[key] - - self.required_fields = dict(**self.required_fields, **dynamic_config) - - def validate(self, store: frozendict) -> frozendict: - """Validates store according to `Step.required_fields` field.""" - self._parse_dynamic_fields() - - validator = V.parse(self.required_fields) - try: - adapted = validator.validate(store) - except V.ValidationError as e: - raise StepValidationException( - f"Validation for step {self.__class__.__name__} \ - failed with error \n{e.message}" - ) - - return store.copy(**adapted) - - @classmethod - def factory(cls, run_method: t.Callable, name: str = "", **arguments) -> type: - """Step factory, creates step with `run_method` provided.""" - return type(name, (cls,), dict(run=run_method, **arguments)) - - def run(self, store: frozendict) -> frozendict: - """Method which provide ability to run any step.""" - - if self.required_fields is not None: - store = self.validate(store) - - for method in self._available_methods: - if hasattr(self, method): - return getattr(self, method)(store) - - raise StepExecutionException( - f"You should define one of this methods \ - {','.join(self._available_methods)}" - ) - - -class BasePipe: - """Base class for all pipes, implements running logic and inspection of - pipe state on every step.""" - - # Flag which show, should pipe print its state every step - __inspection_mode: bool - - def __init__(self, initial: t.Mapping, inspection: bool = False): - """:param initial: Initial store state.""" - self.__inspection_mode = inspection - self.store = self.before_pipe(frozendict(initial)) - - def set_inspection(self, enable: bool = True) -> bool: - """Sets inspection mode.""" - self.__inspection_mode = enable - - return self.__inspection_mode - - @staticmethod - def __print_step(step: Step, store: frozendict) -> None: - """Prints passed step and store to the console.""" - console = Console() - - console.log( - "Current step is -> ", step.__class__.__name__, f"({step.__module__})" - ) - console.log(f"{step.__class__.__name__} STORE STATE") - console.print(store.__dict__, overflow="fold") - console.log("\n\n") - - def _run_pipe(self, pipe: t.Iterable[Step]) -> t.Union[None, t.Any]: - """Protected method to run subpipe declared in schema (schema can be - different depending on pipe type) - """ - - for item in pipe: - - if self.__inspection_mode: - self.__print_step(item, self.store) - - intermediate_store = item.run(self.store) - - if self.interrupt(intermediate_store): - return self.after_pipe(intermediate_store) - - self.store = intermediate_store - - return self.after_pipe(self.store) - - def before_pipe(self, store: frozendict) -> frozendict: - """Hook for running custom pipe (or anything) before every pipe - execution. - """ - return store - - def after_pipe(self, store: frozendict) -> frozendict: - """Hook for running custom pipe (or anything) after every pipe - execution. - """ - return store - - def interrupt(self, store: frozendict) -> bool: - """Interruption hook which could be overridden, allow all subclassed - pipes set one condition, which will be respected after any step was - run. If method returns true, pipe will not be finished and will return - value returned by step immediately (respects after_pipe hook) - - :param store: :return: - """ - return False - - def __str__(self) -> str: - return self.__class__.__name__ - - -class NamedPipe(BasePipe): - """Simple pipe structure to interact with named pipes.""" - - pipe_schema: t.Dict[str, t.Iterable[Step]] - - def run_pipe(self, name: str): - pipe_to_run = self.pipe_schema.get(name, ()) - return self._run_pipe(pipe_to_run) diff --git a/pipe_framework/pipe/core/decorators.py b/pipe_framework/pipe/core/decorators.py deleted file mode 100644 index 99bbb69..0000000 --- a/pipe_framework/pipe/core/decorators.py +++ /dev/null @@ -1,15 +0,0 @@ -import typing as t - - -def configure(config: dict) -> t.Callable: - """Configures Step class with values from `config variable` TODO: candidate - for deprecation? - """ - - def decorator(wrapped: t.Callable) -> t.Callable: - for key, value in config.items(): - setattr(wrapped, key, value) - - return wrapped - - return decorator diff --git a/pipe_framework/pipe/core/exceptions.py b/pipe_framework/pipe/core/exceptions.py deleted file mode 100644 index 0614bd7..0000000 --- a/pipe_framework/pipe/core/exceptions.py +++ /dev/null @@ -1,26 +0,0 @@ -class PipeException(Exception): - pass - - -class ExtractorException(Exception): - pass - - -class LoaderException(Exception): - pass - - -class TransformerException(Exception): - pass - - -class StepInitializationException(Exception): - pass - - -class StepExecutionException(Exception): - pass - - -class StepValidationException(Exception): - pass diff --git a/pipe_framework/pipe/generics/__init__.py b/pipe_framework/pipe/generics/__init__.py deleted file mode 100644 index db31e0e..0000000 --- a/pipe_framework/pipe/generics/__init__.py +++ /dev/null @@ -1 +0,0 @@ -import pipe.generics.template as template diff --git a/pipe_framework/pipe/generics/db/__init__.py b/pipe_framework/pipe/generics/db/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/pipe_framework/pipe/generics/db/exceptions.py b/pipe_framework/pipe/generics/db/exceptions.py deleted file mode 100644 index a05b64b..0000000 --- a/pipe_framework/pipe/generics/db/exceptions.py +++ /dev/null @@ -1,2 +0,0 @@ -class DatabaseException(Exception): - pass diff --git a/pipe_framework/pipe/generics/db/orator_orm/__init__.py b/pipe_framework/pipe/generics/db/orator_orm/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/pipe_framework/pipe/generics/db/orator_orm/extract.py b/pipe_framework/pipe/generics/db/orator_orm/extract.py deleted file mode 100644 index 2b57be4..0000000 --- a/pipe_framework/pipe/generics/db/orator_orm/extract.py +++ /dev/null @@ -1,27 +0,0 @@ -from frozendict import frozendict -from pipe.core.base import Step -from pipe.generics.db.exceptions import DatabaseException -from pipe.generics.db.orator_orm.mixins import DatabaseBaseMixin, ReadMixin - - -class EDBReadBase(Step, DatabaseBaseMixin, ReadMixin): - """Base step for extracting data from database. Requires configuration for - connecting to the database. - - """ - - def extract(self, store: frozendict) -> frozendict: - pk = store.get(self.pk_field, False) - - result = self.select(pk=pk) if pk else self.select() - - if not pk and result is not None: - store = store.copy( - **{f"{self.table_name}_list": [dict(item) for item in result]} - ) - elif result is not None: - store = store.copy(**{f"{self.table_name}_item": dict(result)}) - else: - raise DatabaseException(f"Result for table {self.table_name} is empty") - - return store diff --git a/pipe_framework/pipe/generics/db/orator_orm/load.py b/pipe_framework/pipe/generics/db/orator_orm/load.py deleted file mode 100644 index e596a6b..0000000 --- a/pipe_framework/pipe/generics/db/orator_orm/load.py +++ /dev/null @@ -1,42 +0,0 @@ -import valideer -from frozendict import frozendict -from pipe.core.base import Step -from pipe.generics.db.orator_orm.mixins import ( - CreateUpdateMixin, - DatabaseBaseMixin, - DeleteMixin, -) - - -class LDBInsertUpdateBase(Step, DatabaseBaseMixin, CreateUpdateMixin): - """Loader for inserting or updating into database tables.""" - - required_fields = { - "+{data_field}": valideer.Type(dict), - "{pk_field}": valideer.Type((int, str)), - } - - def load(self, store: frozendict) -> frozendict: - # TODO: Something with update or insert checking is wrong, and I didn't figured out what yet - data_to_load: dict = store.get(self.data_field, {}) - update: bool = self.pk_field in data_to_load - - result = self.update(data_to_load) if update else self.insert(data_to_load) - store = store.copy( - **{f'{self.table_name}_{"update" if update else "insert"}': result} - ) - - return store - - -class LDatabaseDeleteBase(Step, DatabaseBaseMixin, DeleteMixin): - """Loader for deleting from database tables.""" - - required_fields = {"+{pk_field}": valideer.Type((int, str))} - - def load(self, store: frozendict) -> frozendict: - pk_to_delete = store.get(self.pk_field) - - self.delete(pk_to_delete) - - return store diff --git a/pipe_framework/pipe/generics/db/orator_orm/mixins.py b/pipe_framework/pipe/generics/db/orator_orm/mixins.py deleted file mode 100644 index 11de4a6..0000000 --- a/pipe_framework/pipe/generics/db/orator_orm/mixins.py +++ /dev/null @@ -1,135 +0,0 @@ -import copy -import typing as t - -from orator import DatabaseManager -from orator.query import QueryBuilder -from pipe.core.exceptions import StepInitializationException - - -class DatabaseBaseMixin: - """Generic mixin for all Steps related to Database.""" - - connection_config: t.Dict[str, str] - __db: t.Optional[DatabaseManager] = None - query: t.Optional[QueryBuilder] = None - data_field: t.Optional[str] = None - table_name: t.Optional[str] = None - pk_field: t.Optional[str] = None - - def __init__( - self, - table_name: t.Optional[str] = None, - data_field: t.Optional[str] = None, - pk_field: str = "id", - where: t.Optional[tuple] = None, - join: t.Optional[tuple] = None, - select: t.Optional[tuple] = None, - ): - - self.data_field = data_field if data_field is not None else self.data_field - self.table_name = table_name if table_name is not None else self.table_name - self.pk_field = pk_field if pk_field is not None else self.pk_field - - if self.table_name is None: - raise StepInitializationException("`table_name` is missing") - - self.where_clause = where - self.join_clause = join - self.select_clause = select - - def set_table(self, table_name: str) -> QueryBuilder: - """:param table_name: - - :return: Orator Query builder - """ - self.query = self.__db.table(table_name) - - return self.query - - def set_select(self, select: t.Optional[tuple] = None) -> QueryBuilder: - """Sets columns for selecting. See Orator docs for detailed info.""" - if select is not None: - return self.query.select(*select) - - def set_where(self, where: t.Optional[tuple] = None) -> QueryBuilder: - """Sets where clause. See Orator docs for detailed info.""" - if where is not None: - return self.query.where(*where) - - def set_join(self, _join: t.Optional[tuple] = None) -> QueryBuilder: - """Sets join clause. See Orator docs for detailed info.""" - if _join is not None: - return self.query.join(*_join) - - def create_connection(self) -> None: - """Creates connection to database if it is None.""" - if self.__db is None: - self.__db = DatabaseManager(self.connection_config) - - def clear_connection(self): - """Clears connection.""" - self.__db.disconnect() - - -class CreateUpdateMixin: - def insert(self, data: t.Dict) -> int: - """Inserts data into a table.""" - self.create_connection() - return self.set_table(self.table_name).insert_get_id(data) - - def update(self, data: t.Dict) -> int: - """Updates data in the table.""" - self.create_connection() - pk = copy.deepcopy(data).pop(self.pk_field) - - self.set_table(self.table_name) - - if pk is not None: - self.set_where((self.pk_field, "=", pk, "and")) - - self.set_where(self.where_clause) - self.set_join(self.join_clause) - - return self.query.update(data) - - -class ReadMixin: - """Small mixin which implements simplest 'select' operation for extracting.""" - - def select(self, pk: t.Optional[int] = None) -> t.Union[t.Mapping, list]: - """Returns list of the objects from database or just one object, if - 'pk' param is presented. - - :param pk: - """ - self.create_connection() - self.set_table(self.table_name) - self.set_select(self.select_clause) - - if pk is not None: - self.set_where((self.pk_field, "=", pk, "and")) - - self.set_where(self.where_clause) - self.set_join(self.join_clause) - - if pk is not None: - return self.query.first() - else: - return list(self.query.get()) - - -class DeleteMixin: - def delete(self, pk: t.Optional[int] = None) -> int: - """Deletes object by a 'pk' or by a where clause if presented.""" - self.create_connection() - - self.set_table(self.table_name) - - if self.where_clause is not None: - self.set_where(self.where_clause) - else: - self.set_where((self.pk_field, "=", pk, "and")) - - self.set_join(self.join_clause) - - return self.query.delete() diff --git a/pipe_framework/pipe/generics/helpers.py b/pipe_framework/pipe/generics/helpers.py deleted file mode 100644 index 4fcd5e2..0000000 --- a/pipe_framework/pipe/generics/helpers.py +++ /dev/null @@ -1,32 +0,0 @@ -import typing as t -from dataclasses import dataclass - -from frozendict import frozendict -from pipe.core.base import Step - - -@dataclass -class TPutDefaults(Step): - """Helper transformers, which puts values from `defaults` into `Store`, to - specific `field_name`""" - - defaults: dict - field_name: str - - def transform(self, store: frozendict) -> frozendict: - return store.copy( - **{self.field_name: dict(**self.defaults, **store.get(self.field_name))} - ) - - -@dataclass -class TLambda(Step): - """Step for small transformations of a store. - - Useful for cases where writing specific step is an overengineering - """ - - lambda_: t.Optional[t.Callable] = None - - def transform(self, store: frozendict) -> frozendict: - return self.lambda_(store) diff --git a/pipe_framework/pipe/generics/template/__init__.py b/pipe_framework/pipe/generics/template/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/pipe_framework/pipe/generics/template/transform.py b/pipe_framework/pipe/generics/template/transform.py deleted file mode 100644 index 2e31ee7..0000000 --- a/pipe_framework/pipe/generics/template/transform.py +++ /dev/null @@ -1,39 +0,0 @@ -import typing as t - -import jinja2 -import valideer as V -from frozendict import frozendict -from pipe.core.base import Step - - -class TTemplateResponseReady(Step): - required_fields = {"+{context_field}": V.Type((str, dict))} - - context_field: str = "context" - template_folder: t.Optional[str] = None - template_name: t.Optional[str] = None - - def __init__(self, template_name="", **options): - """Setting Jinja2 environment you can provide any options you can find - in Jinja2 documentation. - - By default we setting only loader and autoescape, but you can - rewrite it too. - """ - - loader = options.get("loader", jinja2.FileSystemLoader(self.template_folder)) - autoescape = options.get( - "autoescape", jinja2.select_autoescape(["html", "xml"]) - ) - - self.environ = jinja2.Environment( - loader=loader, autoescape=autoescape, **options - ) - self.template_name = template_name - - def transform(self, store: frozendict) -> frozendict: - context = store.get(self.context_field, {}) - template = self.environ.get_template(self.template_name) - - status = store.get("status", 200) - return frozendict(template=template.render(**context), status=status) diff --git a/pipe_framework/pipe/server/__init__.py b/pipe_framework/pipe/server/__init__.py deleted file mode 100644 index 5e73416..0000000 --- a/pipe_framework/pipe/server/__init__.py +++ /dev/null @@ -1,104 +0,0 @@ -"""WSGI App for http related Pipes.""" -import typing as t - -from frozendict import frozendict -from pipe.server.pipe import HTTPPipe -from pipe.server.wrappers import PipeRequest, PipeResponse -from werkzeug.exceptions import HTTPException -from werkzeug.middleware.shared_data import SharedDataMiddleware -from werkzeug.routing import Map, Rule -from werkzeug.serving import run_simple - - -class AppException(Exception): - pass - - -class App: - """Main WSGI app wrapper which run pipes by request method.""" - - __map: Map = Map() - __pipes: frozendict = frozendict() - __static_serving: bool = False - __static_folder: t.Optional[str] = None - __static_url: t.Optional[str] = None - __inspection_mode: bool = False - - @staticmethod - def __make_endpoint(pipe: HTTPPipe): - return pipe.__name__ - - def route(self, route: str): - """Decorator for adding pipe as a handler for a route. - - :param route: Werkzeug formatted route :type route: string - """ - - def decorator(pipe): - endpoint = self.__make_endpoint(pipe) - if endpoint in self.__pipes: - raise AppException( - "Route rewrites previously added route, please use hooks if you want to run " - "more then one pipe" - ) - - self.__map.add(Rule(route, endpoint=endpoint)) - self.__pipes = self.__pipes.copy(**{endpoint: pipe}) - - return decorator - - def wsgi_app(self, environ, start_response): - """Main WSGI app, see werkzeug documentation for more.""" - request = PipeRequest(environ) - adapter = self.__map.bind_to_environ(environ) - - try: - endpoint, values = adapter.match() - except HTTPException as e: - return e(environ, start_response) - - pipe = self.__pipes.get(endpoint) - result = pipe(request, values, self.__inspection_mode).run_pipe() - - if isinstance(result, PipeResponse): - return result(environ, start_response) - - response = PipeResponse(status=204) - - return response(environ, start_response) - - def __call__(self, environ, start_response): - if not self.__static_serving: - return self.wsgi_app(environ, start_response) - app_with_static = SharedDataMiddleware( - self.wsgi_app, {self.__static_url: self.__static_folder} - ) - return app_with_static(environ, start_response) - - def run( - self, - host: str = "127.0.0.1", - port: int = 8000, - use_inspection: bool = False, - static_folder: t.Optional[str] = None, - static_url: str = "/static", - *args, - **kwargs - ): - """Method for running application, actually pretty similar to the Flask - run method. - - """ - - if static_folder is not None: - self.__static_serving = True - self.__static_folder = static_folder - self.__static_url = static_url - - if use_inspection: - self.__inspection_mode = True - - run_simple(host, port, self, *args, **kwargs) - - -app = App() diff --git a/pipe_framework/pipe/server/http/__init__.py b/pipe_framework/pipe/server/http/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/pipe_framework/pipe/server/http/exceptions.py b/pipe_framework/pipe/server/http/exceptions.py deleted file mode 100644 index 6b0c194..0000000 --- a/pipe_framework/pipe/server/http/exceptions.py +++ /dev/null @@ -1,5 +0,0 @@ -from pipe.core.exceptions import ExtractorException - - -class EFormDataException(ExtractorException): - pass diff --git a/pipe_framework/pipe/server/http/extract.py b/pipe_framework/pipe/server/http/extract.py deleted file mode 100644 index c78bbea..0000000 --- a/pipe_framework/pipe/server/http/extract.py +++ /dev/null @@ -1,55 +0,0 @@ -import valideer -from frozendict import frozendict -from pipe.core.base import Step -from pipe.core.exceptions import ExtractorException -from pipe.server.http.exceptions import EFormDataException -from pipe.server.wrappers import PipeRequest - - -class EFormData(Step): - """Generic extractor for form data from PipeRequest.""" - - required_fields = {"+{request_field}": valideer.Type(PipeRequest)} - - request_field = "request" - method: str = "POST" - - def extract(self, store: frozendict): - request = store.get(self.request_field) - if request.method != self.method: - raise EFormDataException("Invalid request method") - store = store.copy(**{"form": dict(request.form)}) - return store - - -class EQueryStringData(Step): - """Generic extractor for data from query string which you can find after ? - - sign in URL - """ - - required_fields = {"+{request_field}": valideer.Type(PipeRequest)} - - request_field = "request" - - def extract(self, store: frozendict): - request = store.get(self.request_field) - store = store.copy(**request.args) - return store - - -class EJsonBody(Step): - """Generic extractor for data which came in JSON format.""" - - required_fields = {"+{request_field}": valideer.Type(PipeRequest)} - - request_field = "request" - - def extract(self, store: frozendict): - request = store.get(self.request_field) - - if request.json is None: - raise ExtractorException("JSON is missing from request") - - store = store.copy(**{"json": request.json}) - return store diff --git a/pipe_framework/pipe/server/http/load.py b/pipe_framework/pipe/server/http/load.py deleted file mode 100644 index 36429eb..0000000 --- a/pipe_framework/pipe/server/http/load.py +++ /dev/null @@ -1,65 +0,0 @@ -import typing as t -from dataclasses import dataclass - -import valideer -from frozendict import frozendict -from pipe.core.base import Step -from pipe.server.wrappers import make_response - - -@dataclass -class LJsonResponse(Step): - """Creates JSON response from field in 'data_field' property.""" - - required_fields = {"+{data_field}": valideer.Type((list, dict))} - - data_field: str = "response" - status: int = 200 - - def load(self, store: frozendict): - return make_response( - store.get(self.data_field), is_json=True, status=self.status - ) - - -@dataclass -class LResponse(Step): - """Sends plain response from datafield, with status from field status.""" - - required_fields = { - "+{data_field}": valideer.Type((str, list, dict)), - "{status_field}": valideer.Type(int), - } - - data_field: str = "response" - status_field: str = "status" - headers: t.Optional[dict] = None - status: t.Optional[int] = None - - def load(self, store: frozendict): - if self.status is None: - self.status = store.get(self.status_field, 200) - - return make_response( - store.get(self.data_field), status=self.status, headers=self.headers - ) - - -class LNotFound(Step): - def load(self, store: frozendict): - return make_response(f'object not found: {store.get("exception")}', status=404) - - -class LServerError(Step): - def load(self, store: frozendict): - return make_response(f'server error: {store.get("exception")}', status=500) - - -class LUnauthorized(Step): - def load(self, store: frozendict): - return make_response(f'unauthorized: {store.get("exception")}', status=401) - - -class LBadRequest(Step): - def load(self, store: frozendict): - return make_response(f'bad request: {store.get("exception")}', status=400) diff --git a/pipe_framework/pipe/server/http/transform.py b/pipe_framework/pipe/server/http/transform.py deleted file mode 100644 index 73698f9..0000000 --- a/pipe_framework/pipe/server/http/transform.py +++ /dev/null @@ -1,32 +0,0 @@ -import typing as t -from collections import defaultdict -from dataclasses import dataclass - -import valideer -from frozendict import frozendict -from pipe.core.base import Step - - -@dataclass -class TJsonResponseReady(Step): - """Converts object from a 'data_field' for a simpliest API - representation.""" - - required_fields = {"+{data_field}": valideer.Type((dict, list))} - - data_field: str - - def transform(self, store: frozendict) -> frozendict: - response_data: t.Union[list, dict] = store.get(self.data_field) - - result: defaultdict = defaultdict() - - if isinstance(response_data, list): - result["count"] = len(response_data) - result["data"] = response_data - else: - result = response_data - - store = store.copy(**{"response": result}) - - return store diff --git a/pipe_framework/pipe/server/pipe.py b/pipe_framework/pipe/server/pipe.py deleted file mode 100644 index 06cb24c..0000000 --- a/pipe_framework/pipe/server/pipe.py +++ /dev/null @@ -1,43 +0,0 @@ -import typing as t - -from frozendict import frozendict -from pipe.core.base import BasePipe, Step -from pipe.server.wrappers import PipeRequest, PipeResponse, make_response - - -class HTTPPipe(BasePipe): - """Pipe structure for the `server` package.""" - - pipe_schema: t.Dict[str, t.Dict[str, t.Iterable[Step]]] = {} - - def __init__(self, request, initial, *args, **kwargs): - self.__request = request - - super(HTTPPipe, self).__init__( - dict(request=request, **initial), *args, **kwargs - ) - - @property - def request(self) -> PipeRequest: - """Getter for request object.""" - return self.__request - - def interrupt(self, store) -> bool: - # If some step returned response, we should interrupt `pipe` execution - return issubclass(store.__class__, PipeResponse) or isinstance( - store, PipeResponse - ) - - def run_pipe(self) -> frozendict: - """The main method. Takes data and pass through pipe. Handles request - and response. - - :raises: PipeException - """ - pipe_to_run = self.pipe_schema.get(self.request.method, None) - - if pipe_to_run is None: - return make_response("method isn't supported", status=400) - - self._run_pipe(pipe_to_run.get("in", ())) - return self._run_pipe(pipe_to_run.get("out", ())) diff --git a/pipe_framework/pipe/server/wrappers.py b/pipe_framework/pipe/server/wrappers.py deleted file mode 100644 index 9b4845a..0000000 --- a/pipe_framework/pipe/server/wrappers.py +++ /dev/null @@ -1,33 +0,0 @@ -import json -from datetime import datetime - -from werkzeug.wrappers import ( - BaseRequest, - BaseResponse, - CommonRequestDescriptorsMixin, - CommonResponseDescriptorsMixin, -) -from werkzeug.wrappers.json import JSONMixin - - -class PipeRequest(BaseRequest, JSONMixin, CommonRequestDescriptorsMixin): - pass - - -class PipeResponse(BaseResponse, JSONMixin, CommonResponseDescriptorsMixin): - pass - - -def make_response(data, is_json: bool = False, *args, **kwargs) -> PipeResponse: - """Makes WSGI Response from `data` argument.""" - if not is_json: - return PipeResponse(data, *args, **kwargs) - data = json.dumps(data, cls=PipeJsonEncoder) - return PipeResponse(data, content_type="application/json", *args, **kwargs) - - -class PipeJsonEncoder(json.JSONEncoder): - def default(self, obj): - # I'm sure you know what this is about - if isinstance(obj, datetime): - return str(obj) diff --git a/poetry.lock b/poetry.lock index a0b9778..c5e2e1e 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,3 +1,25 @@ +[[package]] +name = "atomicwrites" +version = "1.4.0" +description = "Atomic file writes." +category = "main" +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" + +[[package]] +name = "attrs" +version = "21.4.0" +description = "Classes Without Boilerplate" +category = "main" +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" + +[package.extras] +dev = ["coverage[toml] (>=5.0.2)", "hypothesis", "pympler", "pytest (>=4.3.0)", "six", "mypy", "pytest-mypy-plugins", "zope.interface", "furo", "sphinx", "sphinx-notfound-page", "pre-commit", "cloudpickle"] +docs = ["furo", "sphinx", "zope.interface", "sphinx-notfound-page"] +tests = ["coverage[toml] (>=5.0.2)", "hypothesis", "pympler", "pytest (>=4.3.0)", "six", "mypy", "pytest-mypy-plugins", "zope.interface", "cloudpickle"] +tests_no_zope = ["coverage[toml] (>=5.0.2)", "hypothesis", "pympler", "pytest (>=4.3.0)", "six", "mypy", "pytest-mypy-plugins", "cloudpickle"] + [[package]] name = "backpack" version = "0.1" @@ -142,6 +164,14 @@ category = "main" optional = false python-versions = "*" +[[package]] +name = "iniconfig" +version = "1.1.1" +description = "iniconfig: brain-dead simple config-ini parsing" +category = "main" +optional = false +python-versions = "*" + [[package]] name = "isort" version = "5.10.1" @@ -248,6 +278,17 @@ pgsql = ["psycopg2 (>=2.7,<3.0)"] mysql-python = ["PyMySQL (>=0.7,<0.8)"] mysql = ["mysqlclient (>=1.3,<2.0)"] +[[package]] +name = "packaging" +version = "21.3" +description = "Core utilities for Python packages" +category = "main" +optional = false +python-versions = ">=3.6" + +[package.dependencies] +pyparsing = ">=2.0.2,<3.0.5 || >3.0.5" + [[package]] name = "pastel" version = "0.1.1" @@ -289,6 +330,26 @@ python-versions = ">=3.7" docs = ["Sphinx (>=4)", "furo (>=2021.7.5b38)", "proselint (>=0.10.2)", "sphinx-autodoc-typehints (>=1.12)"] test = ["appdirs (==1.4.4)", "pytest (>=6)", "pytest-cov (>=2.7)", "pytest-mock (>=3.6)"] +[[package]] +name = "pluggy" +version = "1.0.0" +description = "plugin and hook calling mechanisms for python" +category = "main" +optional = false +python-versions = ">=3.6" + +[package.extras] +dev = ["pre-commit", "tox"] +testing = ["pytest", "pytest-benchmark"] + +[[package]] +name = "py" +version = "1.11.0" +description = "library with cross-python path, ini-parsing, io, code, log facilities" +category = "main" +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" + [[package]] name = "pyaml" version = "16.12.2" @@ -332,6 +393,38 @@ category = "main" optional = false python-versions = "*" +[[package]] +name = "pyparsing" +version = "3.0.7" +description = "Python parsing module" +category = "main" +optional = false +python-versions = ">=3.6" + +[package.extras] +diagrams = ["jinja2", "railroad-diagrams"] + +[[package]] +name = "pytest" +version = "7.1.1" +description = "pytest: simple powerful testing with Python" +category = "main" +optional = false +python-versions = ">=3.7" + +[package.dependencies] +atomicwrites = {version = ">=1.0", markers = "sys_platform == \"win32\""} +attrs = ">=19.2.0" +colorama = {version = "*", markers = "sys_platform == \"win32\""} +iniconfig = "*" +packaging = "*" +pluggy = ">=0.12,<2.0" +py = ">=1.8.2" +tomli = ">=1.0.0" + +[package.extras] +testing = ["argcomplete", "hypothesis (>=3.56)", "mock", "nose", "pygments (>=2.7.2)", "requests", "xmlschema"] + [[package]] name = "python-dateutil" version = "2.8.2" @@ -412,7 +505,7 @@ python-versions = "*" name = "tomli" version = "2.0.1" description = "A lil' TOML parser" -category = "dev" +category = "main" optional = false python-versions = ">=3.7" @@ -477,9 +570,17 @@ python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,>=2.7" [metadata] lock-version = "1.1" python-versions = "^3.9" -content-hash = "0b733288bfabadb249f2dcab4997e05b35f09f5944eb159fa3658e2fbace6f8f" +content-hash = "641cc7d6cdbd0b45abef94d4b29081fabab2b7dd3e87c0a365dfcc6eb8530aee" [metadata.files] +atomicwrites = [ + {file = "atomicwrites-1.4.0-py2.py3-none-any.whl", hash = "sha256:6d1784dea7c0c8d4a5172b6c620f40b6e4cbfdf96d783691f2e1302a7b88e197"}, + {file = "atomicwrites-1.4.0.tar.gz", hash = "sha256:ae70396ad1a434f9c7046fd2dd196fc04b12f9e91ffb859164193be8b6168a7a"}, +] +attrs = [ + {file = "attrs-21.4.0-py2.py3-none-any.whl", hash = "sha256:2d27e3784d7a565d36ab851fe94887c5eccd6a463168875832a1be79c82828b4"}, + {file = "attrs-21.4.0.tar.gz", hash = "sha256:626ba8234211db98e869df76230a137c4c40a12d72445c45d5f5b716f076e2fd"}, +] backpack = [ {file = "backpack-0.1.tar.gz", hash = "sha256:0162cf7b34c810ba4ddbbd32a1e5e804ef96fcf2fea5ce2848aa4950770d3893"}, ] @@ -548,6 +649,10 @@ frozendict = [ inflection = [ {file = "inflection-0.3.1.tar.gz", hash = "sha256:18ea7fb7a7d152853386523def08736aa8c32636b047ade55f7578c4edeb16ca"}, ] +iniconfig = [ + {file = "iniconfig-1.1.1-py2.py3-none-any.whl", hash = "sha256:011e24c64b7f47f6ebd835bb12a743f2fbe9a26d4cecaa7f53bc4f35ee9da8b3"}, + {file = "iniconfig-1.1.1.tar.gz", hash = "sha256:bc3af051d7d14b2ee5ef9969666def0cd1a000e121eaea580d4a313df4b37f32"}, +] isort = [ {file = "isort-5.10.1-py3-none-any.whl", hash = "sha256:6f62d78e2f89b4500b080fe3a81690850cd254227f27f75c3a0c491a1f351ba7"}, {file = "isort-5.10.1.tar.gz", hash = "sha256:e8443a5e7a020e9d7f97f1d7d9cd17c88bcb3bc7e218bf9cf5095fe550be2951"}, @@ -674,6 +779,10 @@ orator = [ {file = "orator-0.9.9-py2.py3-none-any.whl", hash = "sha256:d9ab5bcc8a630ec449d856a4b4cf7dead9f68b1835dd04d62e7f115b96c3360f"}, {file = "orator-0.9.9.tar.gz", hash = "sha256:6fe7830c40f20e77929b80b741a3b9f2145634b5f411176ecad0e761fef26f55"}, ] +packaging = [ + {file = "packaging-21.3-py3-none-any.whl", hash = "sha256:ef103e05f519cdc783ae24ea4e2e0f508a9c99b2d4969652eed6a2e1ea5bd522"}, + {file = "packaging-21.3.tar.gz", hash = "sha256:dd47c42927d89ab911e606518907cc2d3a1f38bbd026385970643f9c5b8ecfeb"}, +] pastel = [ {file = "pastel-0.1.1-py2.py3-none-any.whl", hash = "sha256:a904e1659512cc9880a028f66de77cc813a4c32f7ceb68725cbc8afad57ef7ef"}, {file = "pastel-0.1.1.tar.gz", hash = "sha256:bf3b1901b2442ea0d8ab9a390594e5b0c9584709d543a3113506fe8b28cbace3"}, @@ -698,6 +807,14 @@ platformdirs = [ {file = "platformdirs-2.5.1-py3-none-any.whl", hash = "sha256:bcae7cab893c2d310a711b70b24efb93334febe65f8de776ee320b517471e227"}, {file = "platformdirs-2.5.1.tar.gz", hash = "sha256:7535e70dfa32e84d4b34996ea99c5e432fa29a708d0f4e394bbcb2a8faa4f16d"}, ] +pluggy = [ + {file = "pluggy-1.0.0-py2.py3-none-any.whl", hash = "sha256:74134bbf457f031a36d68416e1509f34bd5ccc019f0bcc952c7b909d06b37bd3"}, + {file = "pluggy-1.0.0.tar.gz", hash = "sha256:4224373bacce55f955a878bf9cfa763c1e360858e330072059e10bad68531159"}, +] +py = [ + {file = "py-1.11.0-py2.py3-none-any.whl", hash = "sha256:607c53218732647dff4acdfcd50cb62615cedf612e72d1724fb1a0cc6405b378"}, + {file = "py-1.11.0.tar.gz", hash = "sha256:51c75c4126074b472f746a24399ad32f6053d1b34b68d2fa41e558e6f4a98719"}, +] pyaml = [ {file = "pyaml-16.12.2-py2.py3-none-any.whl", hash = "sha256:8b70b7cc2afa55dd89d2a31ef8abe4cfb3daa100cd63564f8fd97e6cac6cff5b"}, {file = "pyaml-16.12.2.tar.gz", hash = "sha256:b865e4f53a85f4d8a092e7701f759a3237fb3ee8a928627401914aafadc00907"}, @@ -718,6 +835,14 @@ pylev = [ {file = "pylev-1.4.0-py2.py3-none-any.whl", hash = "sha256:7b2e2aa7b00e05bb3f7650eb506fc89f474f70493271a35c242d9a92188ad3dd"}, {file = "pylev-1.4.0.tar.gz", hash = "sha256:9e77e941042ad3a4cc305dcdf2b2dec1aec2fbe3dd9015d2698ad02b173006d1"}, ] +pyparsing = [ + {file = "pyparsing-3.0.7-py3-none-any.whl", hash = "sha256:a6c06a88f252e6c322f65faf8f418b16213b51bdfaece0524c1c1bc30c63c484"}, + {file = "pyparsing-3.0.7.tar.gz", hash = "sha256:18ee9022775d270c55187733956460083db60b37d0d0fb357445f3094eed3eea"}, +] +pytest = [ + {file = "pytest-7.1.1-py3-none-any.whl", hash = "sha256:92f723789a8fdd7180b6b06483874feca4c48a5c76968e03bb3e7f806a1869ea"}, + {file = "pytest-7.1.1.tar.gz", hash = "sha256:841132caef6b1ad17a9afde46dc4f6cfa59a05f9555aae5151f73bdf2820ca63"}, +] python-dateutil = [ {file = "python-dateutil-2.8.2.tar.gz", hash = "sha256:0123cacc1627ae19ddf3c27a5de5bd67ee4586fbdd6440d9748f8abb483d3e86"}, {file = "python_dateutil-2.8.2-py2.py3-none-any.whl", hash = "sha256:961d03dc3453ebbc59dbdea9e4e11c5651520a876d0f4db161e8674aae935da9"}, diff --git a/pyproject.toml b/pyproject.toml index 3947ccb..d4f286a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,7 @@ isort = "^5.10.1" flake8 = "^4.0.1" mypy = "^0.942" docformatter = "^1.4" +pytest = "^7.1.1" [tool.poetry.scripts] my-script = "poetry.console:run" @@ -56,3 +57,10 @@ no_site_packages = true [build-system] requires = ["poetry-core>=1.0.0"] build-backend = "poetry.core.masonry.api" + +[tool.pyright] +venv = "pipe-framework-uiqW28C9-py3.10" +include = ["pipe_framework"] +exclude = ["pipe_framework/pipe_alpha"] +stubPath = "typings/" +executionEnvironments = [{ root = "pipe_framework" }]