|
6 | 6 |
|
7 | 7 | import os |
8 | 8 | import traceback |
9 | | -from typing import Any, Dict, Optional |
| 9 | +import functools |
| 10 | +from typing import Any, Dict, Optional, Callable, Type, Tuple |
10 | 11 |
|
11 | 12 | import yaml |
12 | 13 | from apscheduler.schedulers.background import BackgroundScheduler |
|
22 | 23 | registry = dict() |
23 | 24 |
|
24 | 25 |
|
25 | | -def pipe(*args, **kwargs): |
| 26 | +def pipe(*args, **kwargs) -> Callable: |
26 | 27 | """ |
27 | | - Register the decorated function in the pyff pipe registry |
28 | | - :param name: optional name - if None, use function name |
| 28 | + A decorator that registers a function as a pipeline in pyFF. Functions decorated *should* have the |
| 29 | + following prototype: |
| 30 | +
|
| 31 | + @pipe |
| 32 | + def foo(req: Plumbing.Request, *opts) |
| 33 | + pass |
29 | 34 | """ |
30 | 35 |
|
31 | | - def deco_none(f): |
32 | | - return f |
| 36 | + def pipe_decorator(f: Callable) -> Callable: |
| 37 | + if 'name' in kwargs: # called with the name argument @pipe(name=...) or as @pipe() |
| 38 | + f_name = kwargs.get('name', f.__name__) |
| 39 | + registry[f_name] = f |
| 40 | + |
| 41 | + @functools.wraps(f) |
| 42 | + def wrapper_pipe(*iargs, **ikwargs) -> Any: |
| 43 | + # the 'opts' parameter gets special treatment: |
| 44 | + # locate the type annotation of 'opts' and if it exists assume it refers to a pydantic dataclass |
| 45 | + # before propagating the call to the wrapped function replace opts with the pydantic dataclass object |
| 46 | + # created from the Tuple provided |
| 47 | + opts_type: Optional[Type] = None |
| 48 | + if 'opts' in f.__annotations__: |
| 49 | + opts_type = f.__annotations__['opts'] |
| 50 | + |
| 51 | + if opts_type is not None: |
| 52 | + opts_in = ikwargs.pop('opts') |
| 53 | + ikwargs['opts'] = opts_type(**dict(list(zip(opts_in[::2], opts_in[1::2])))) |
| 54 | + |
| 55 | + return f(*iargs, **ikwargs) |
33 | 56 |
|
34 | | - def deco_pipe(f): |
35 | | - f_name = kwargs.get('name', f.__name__) |
36 | | - registry[f_name] = f |
37 | | - return f |
| 57 | + return wrapper_pipe |
38 | 58 |
|
39 | | - if 1 == len(args): |
40 | | - f = args[0] |
41 | | - registry[f.__name__] = f |
42 | | - return deco_none |
| 59 | + if len(args) == 1 and callable(args[0]): # called without arguments @pipe |
| 60 | + registry[args[0].__name__] = args[0] |
| 61 | + return pipe_decorator(args[0]) |
43 | 62 | else: |
44 | | - return deco_pipe |
| 63 | + return pipe_decorator |
45 | 64 |
|
46 | 65 |
|
47 | 66 | class PipeException(PyffException): |
|
0 commit comments