Add checks for nested tasks #11

@zhanyuanucb

Describe
Add a check for nested tasks, i.e. tasks defined with @task inside another function or pipeline.

To Reproduce
A nested task in a plain function:

from pirlib.pipeline import pipeline
from pirlib.task import task
from pirlib.iotypes import DirectoryPath, FilePath

test_file_path = FilePath("test/file.txt")

def func(inp: FilePath) -> FilePath:
    @task
    def my_task(inp: FilePath) -> FilePath:
        return inp
    return my_task(test_file_path)

func(test_file_path)

See the error

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 5, in func
  File "/pirlib/pirlib/task.py", line 113, in __call__
    return self.instance(self.name)(*args, **kwargs)
  File "/pirlib/pirlib/package.py", line 258, in wrapper
    return func(instance, *args, **kwargs)
  File "/pirlib/pirlib/task.py", line 67, in __call__
    outputs = backend.execute(package, self.name, self.config, inputs=inputs)
  File "/pirlib/pirlib/backends/inproc.py", line 72, in execute
    node_outputs[node.id] = self._execute_node(node, node_inputs)
  File "/pirlib/pirlib/backends/inproc.py", line 88, in _execute_node
    handler = getattr(importlib.import_module(module_name), handler_name)
AttributeError: module '__main__' has no attribute 'my_task'
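
The last frame of the traceback looks the handler up by module name and attribute name. The sketch below does not use pirlib; it only illustrates, in plain Python, why that kind of lookup cannot find a function defined inside another function. The module name `nested_demo` and the functions `outer` and `inner` are made up for the illustration.

```python
import importlib
import sys
import types

# Build a throwaway module (hypothetical name) so the example is self-contained.
demo = types.ModuleType("nested_demo")
exec(
    "def outer():\n"
    "    def inner():\n"
    "        return 'ok'\n"
    "    return inner\n",
    demo.__dict__,
)
sys.modules["nested_demo"] = demo

# Obtain the nested function, then try to resolve functions the same way the
# traceback does: import the module by name and look up an attribute on it.
inner = demo.outer()
module = importlib.import_module(inner.__module__)

top_level_found = hasattr(module, "outer")        # top-level def is a module attribute
nested_found = hasattr(module, inner.__name__)    # nested def only exists while outer() runs

print(top_level_found, nested_found, inner.__qualname__)
```

The nested function only exists as a local of `outer`, so a lookup by `(module name, function name)` has nothing to find, which matches the `AttributeError` above.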

A nested task in a pipeline:

from pirlib.pipeline import pipeline
from pirlib.task import task
from pirlib.iotypes import DirectoryPath, FilePath

test_file_path = FilePath("test/file.txt")

@pipeline
def my_pipeline(inp: FilePath) -> FilePath:
    @task
    def my_task(inp: FilePath) -> FilePath:
        return inp
    return my_task(test_file_path)

my_pipeline(test_file_path)

See the error

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/pirlib/pirlib/pipeline.py", line 88, in __call__
    return self.instance(self.name)(*args, **kwargs)
  File "/pirlib/pirlib/package.py", line 177, in wrapper
    return method(instance, *args, **kwargs)
  File "/pirlib/pirlib/pipeline.py", line 37, in __call__
    package = package_pipeline(self.defn)
  File "/pirlib/pirlib/package.py", line 147, in package_pipeline
    _pipeline_to_graph(definition.func, definition.name, definition.config)
  File "/pirlib/pirlib/package.py", line 164, in _pipeline_to_graph
    return_value = pipeline_func(*args, **kwargs)
  File "/miniconda3/envs/pirlib/lib/python3.8/site-packages/typeguard/__init__.py", line 1033, in wrapper
    retval = func(*args, **kwargs)
  File "<stdin>", line 6, in my_pipeline
  File "/pirlib/pirlib/task.py", line 113, in __call__
    return self.instance(self.name)(*args, **kwargs)
  File "/pirlib/pirlib/package.py", line 268, in wrapper
    inputs=_inspect_inputs(instance.func, args, kwargs),
  File "/pirlib/pirlib/package.py", line 220, in _inspect_inputs
    recurse_hint(add_input, param.name, param.annotation, value)
  File "/pirlib/pirlib/package.py", line 70, in recurse_hint
    return func(prefix, hint, *values)
  File "/pirlib/pirlib/package.py", line 210, in add_input
    source=value.source
AttributeError: 'FilePath' object has no attribute 'source'

Expected behavior
Either support nested tasks, or raise a clear exception when one is detected.
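
One possible way to do the "raise a clear exception" option, as a rough sketch only: Python records `<locals>` in the `__qualname__` of any function defined inside another function, so a decorator can reject such functions at definition time. `ensure_top_level` and `checked_task` are hypothetical names for this illustration and are not part of pirlib; where the check would actually live in pirlib is not something this sketch assumes.

```python
def ensure_top_level(func):
    """Raise TypeError if func was defined inside another function.

    Hypothetical helper: relies on Python putting '<locals>' into the
    __qualname__ of nested function definitions.
    """
    if "<locals>" in func.__qualname__:
        raise TypeError(
            f"task {func.__qualname__!r} is defined inside another function; "
            "define it at module level instead"
        )
    return func


def checked_task(func):
    """Stand-in decorator (not pirlib's @task) that runs the check first."""
    ensure_top_level(func)
    return func


@checked_task
def top_level_task(inp):
    return inp


def func_with_nested_task():
    # Defining the nested task triggers the check immediately.
    @checked_task
    def my_task(inp):
        return inp

    return my_task


try:
    func_with_nested_task()
    nested_error = None
except TypeError as exc:
    nested_error = str(exc)

print(nested_error)
```

With a check like this, both reproductions above would fail when the nested function is decorated, with a message naming the offending function, instead of failing later inside the backend or the packaging code.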

Environment (please complete the following information):

  • OS: macOS
  • Python 3.8
