|
6 | 6 |
|
7 | 7 | import os |
8 | 8 | import traceback |
9 | | -from typing import Any, Dict, Optional |
| 9 | +import functools |
| 10 | +from typing import Any, Dict, Optional, Callable, Type, Tuple |
10 | 11 |
|
11 | 12 | import yaml |
12 | 13 | from apscheduler.schedulers.background import BackgroundScheduler |
|
22 | 23 | registry = dict() |
23 | 24 |
|
24 | 25 |
|
25 | | -def pipe(*args, **kwargs): |
| 26 | +def pipe(*args, **kwargs) -> Callable: |
| 27 | + def pipe_decorator(f: Callable) -> Callable: |
| 28 | + if 'name' in kwargs: # called with the name argument @pipe(name=...) or as @pipe() |
| 29 | + f_name = kwargs.get('name', f.__name__) |
| 30 | + registry[f_name] = f |
| 31 | + |
| 32 | + @functools.wraps(f) |
| 33 | + def wrapper_pipe(*iargs, **ikwargs) -> Any: |
| 34 | + opts_type: Optional[Type] = None |
| 35 | + if 'opts' in f.__annotations__: |
| 36 | + opts_type = f.__annotations__['opts'] |
| 37 | + |
| 38 | + if opts_type is not None: |
| 39 | + opts_in = ikwargs.pop('opts') |
| 40 | + ikwargs['opts'] = opts_type(**dict(list(zip(opts_in[::2], opts_in[1::2])))) |
| 41 | + |
| 42 | + return f(*iargs, **ikwargs) |
| 43 | + |
| 44 | + return wrapper_pipe |
| 45 | + |
| 46 | + if len(args) == 1 and callable(args[0]): # called without arguments @pipe |
| 47 | + registry[args[0].__name__] = args[0] |
| 48 | + return pipe_decorator(args[0]) |
| 49 | + else: |
| 50 | + return pipe_decorator |
| 51 | + |
| 52 | + |
| 53 | +def pipe_old(*args, **kwargs): |
26 | 54 | """ |
27 | 55 | Register the decorated function in the pyff pipe registry |
28 | 56 | :param name: optional name - if None, use function name |
|
0 commit comments