Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion taskflow/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
__title__ = "Taskflow"
__version__ = "0.3.0"
__version__ = "0.4.0"

from .flow import * # noqa
from .tasks import * # noqa
34 changes: 22 additions & 12 deletions taskflow/flow.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from collections import deque
from copy import deepcopy
from uuid import uuid4

Expand Down Expand Up @@ -63,18 +64,27 @@ def _get_next(self, task):
if self.is_halted:
return None

sub_tasks = task.get_all_tasks()
for sub_task in sub_tasks:
if sub_task == task:
if sub_task.status == BaseTask.STATUS_PENDING:
return task
else:
next_task = self._get_next(sub_task)
if next_task:
return next_task

if task.status == BaseTask.STATUS_COMPLETE and task.next:
return self._get_next(task.next)
stack = deque()
stack.append(task)

while stack:
current_task = stack.pop()
current_status = current_task.status
if current_status != BaseTask.STATUS_PENDING:
if current_status == BaseTask.STATUS_COMPLETE and current_task.next:
# if it has a dependent, add that to the stack
stack.append(current_task.next)

# this task cannot be handled - move on.
continue

if current_task.is_standalone:
return current_task

sub_tasks = current_task.get_all_tasks()
# trying to avoid copying
for task_index in range(len(sub_tasks) - 1, -1, -1):
stack.append(sub_tasks[task_index])

return None

Expand Down
263 changes: 263 additions & 0 deletions taskflow/test/test_flow_get_next.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
"""
Comprehensive test suite for Flow._get_next method.

Covers:
- Simple chained tasks with different states (pending, running, complete, halted)
- Composite tasks with various child states
- Chained composite tasks with different states
"""

from taskflow.flow import Flow
from taskflow.tasks import BaseTask, CompositeTask, Task

from .fixtures import Handlers


def _make_simple_chain(n=3):
"""Create a simple chain: task1 -> task2 -> task3 (or more)."""
tasks = [Task(Handlers.repeat, args=(i,)) for i in range(1, n + 1)]
for i in range(len(tasks) - 1):
tasks[i].then(tasks[i + 1])
return tasks


def _make_flow_with_composite():
"""Create flow: task1 -> task2 -> Composite(task31, task32) -> task4."""
task1 = Task(Handlers.repeat, args=(1,))
task2 = task1.then(Task(Handlers.repeat, args=(2,)))
task31 = Task(Handlers.repeat, args=(31,))
task32 = Task(Handlers.repeat, args=(32,))
task3 = task2.then(CompositeTask(task31, task32))
task4 = task3.then(Task(Handlers.repeat, args=(4,)))
flow = Flow(task1)
return flow, task1, task2, task31, task32, task3, task4


def _reset_task(task, status=BaseTask.STATUS_PENDING, result=None):
"""Reset a task's state."""
task._status = status
task._result = result


class TestGetNextSimpleChain:
"""Tests for _get_next with simple chained tasks (no composites)."""

def test_all_pending_returns_first_task(self):
task1, _, _ = _make_simple_chain()
flow = Flow(task1)
assert flow._get_next(task1) == task1

def test_first_pending_returns_first(self):
task1, task2, task3 = _make_simple_chain()
flow = Flow(task1)
_reset_task(task2, BaseTask.STATUS_PENDING)
_reset_task(task3, BaseTask.STATUS_PENDING)
assert flow._get_next(task1) == task1

def test_first_complete_second_pending_returns_second(self):
task1, task2, _ = _make_simple_chain()
flow = Flow(task1)
_reset_task(task1, BaseTask.STATUS_COMPLETE, 1)
assert flow._get_next(task1) == task2

def test_first_running_returns_none(self):
task1, _, _ = _make_simple_chain()
flow = Flow(task1)
_reset_task(task1, BaseTask.STATUS_RUNNING)
assert flow._get_next(task1) is None

def test_first_halted_returns_none(self):
task1, _, _ = _make_simple_chain()
flow = Flow(task1)
_reset_task(task1, BaseTask.STATUS_HALTED)
assert flow._get_next(task1) is None

def test_second_running_returns_none(self):
task1, task2, _ = _make_simple_chain()
flow = Flow(task1)
_reset_task(task1, BaseTask.STATUS_COMPLETE, 1)
_reset_task(task2, BaseTask.STATUS_RUNNING)
assert flow._get_next(task1) is None

def test_second_halted_returns_none(self):
task1, task2, _ = _make_simple_chain()
flow = Flow(task1)
_reset_task(task1, BaseTask.STATUS_COMPLETE, 1)
_reset_task(task2, BaseTask.STATUS_HALTED)
assert flow._get_next(task1) is None

def test_all_complete_returns_none(self):
task1, task2, task3 = _make_simple_chain()
flow = Flow(task1)
_reset_task(task1, BaseTask.STATUS_COMPLETE, 1)
_reset_task(task2, BaseTask.STATUS_COMPLETE, 2)
_reset_task(task3, BaseTask.STATUS_COMPLETE, 3)
assert flow._get_next(task1) is None

def test_chain_of_four_progression(self):
tasks = _make_simple_chain(4)
task1, task2, task3, task4 = tasks
flow = Flow(task1)

assert flow._get_next(task1) == task1
_reset_task(task1, BaseTask.STATUS_COMPLETE, 1)
assert flow._get_next(task1) == task2
_reset_task(task2, BaseTask.STATUS_COMPLETE, 2)
assert flow._get_next(task1) == task3
_reset_task(task3, BaseTask.STATUS_COMPLETE, 3)
assert flow._get_next(task1) == task4
_reset_task(task4, BaseTask.STATUS_COMPLETE, 4)
assert flow._get_next(task1) is None


class TestGetNextSingleComposite:
"""Tests for _get_next with a single CompositeTask and various child states."""

def test_both_children_pending_returns_first_child(self):
flow, task1, task2, task31, _, _, _ = _make_flow_with_composite()
_reset_task(task1, BaseTask.STATUS_COMPLETE, 1)
_reset_task(task2, BaseTask.STATUS_COMPLETE, 2)
result = flow._get_next(task1)
assert result == task31

def test_first_child_complete_second_pending_returns_second(self):
flow, task1, task2, task31, task32, _, _ = _make_flow_with_composite()
_reset_task(task1, BaseTask.STATUS_COMPLETE, 1)
_reset_task(task2, BaseTask.STATUS_COMPLETE, 2)
_reset_task(task31, BaseTask.STATUS_COMPLETE, 31)
assert flow._get_next(task1) == task32

def test_one_child_running_one_pending_returns_pending(self):
flow, task1, task2, task31, task32, _, _ = _make_flow_with_composite()
_reset_task(task1, BaseTask.STATUS_COMPLETE, 1)
_reset_task(task2, BaseTask.STATUS_COMPLETE, 2)
_reset_task(task31, BaseTask.STATUS_RUNNING)
assert flow._get_next(task1) == task32

def test_one_child_running_one_complete_returns_none(self):
"""Regression: avoids infinite loop when no pending task exists."""
flow, task1, task2, task31, task32, _, _ = _make_flow_with_composite()
_reset_task(task1, BaseTask.STATUS_COMPLETE, 1)
_reset_task(task2, BaseTask.STATUS_COMPLETE, 2)
_reset_task(task31, BaseTask.STATUS_RUNNING)
_reset_task(task32, BaseTask.STATUS_COMPLETE, 32)
assert flow._get_next(task1) is None

def test_both_children_running_returns_none(self):
flow, task1, task2, task31, task32, _, _ = _make_flow_with_composite()
_reset_task(task1, BaseTask.STATUS_COMPLETE, 1)
_reset_task(task2, BaseTask.STATUS_COMPLETE, 2)
_reset_task(task31, BaseTask.STATUS_RUNNING)
_reset_task(task32, BaseTask.STATUS_RUNNING)
assert flow._get_next(task1) is None

def test_one_child_halted_returns_none(self):
flow, task1, task2, task31, task32, _, _ = _make_flow_with_composite()
_reset_task(task1, BaseTask.STATUS_COMPLETE, 1)
_reset_task(task2, BaseTask.STATUS_COMPLETE, 2)
_reset_task(task31, BaseTask.STATUS_HALTED)
assert flow._get_next(task1) is None

def test_both_children_complete_returns_next_in_chain(self):
flow, task1, task2, task31, task32, _, task4 = _make_flow_with_composite()
_reset_task(task1, BaseTask.STATUS_COMPLETE, 1)
_reset_task(task2, BaseTask.STATUS_COMPLETE, 2)
_reset_task(task31, BaseTask.STATUS_COMPLETE, 31)
_reset_task(task32, BaseTask.STATUS_COMPLETE, 32)
assert flow._get_next(task1) == task4

def test_composite_with_three_children_returns_first_pending(self):
task1 = Task(Handlers.repeat, args=(1,))
task2 = task1.then(Task(Handlers.repeat, args=(2,)))
t31 = Task(Handlers.repeat, args=(31,))
t32 = Task(Handlers.repeat, args=(32,))
t33 = Task(Handlers.repeat, args=(33,))
task2.then(CompositeTask(t31, t32, t33))
flow = Flow(task1)
_reset_task(task1, BaseTask.STATUS_COMPLETE, 1)
_reset_task(task2, BaseTask.STATUS_COMPLETE, 2)
assert flow._get_next(task1) == t31
_reset_task(t31, BaseTask.STATUS_COMPLETE, 31)
assert flow._get_next(task1) == t32
_reset_task(t32, BaseTask.STATUS_COMPLETE, 32)
assert flow._get_next(task1) == t33


class TestGetNextChainedComposites:
"""Tests for _get_next with multiple composites chained together."""

def test_first_composite_both_pending_returns_first_child(self):
task1 = Task(Handlers.repeat, args=(1,))
task2 = task1.then(Task(Handlers.repeat, args=(2,)))
c1a = Task(Handlers.repeat, args=(101,))
c1b = Task(Handlers.repeat, args=(102,))
comp1 = task2.then(CompositeTask(c1a, c1b))
c2a = Task(Handlers.repeat, args=(201,))
c2b = Task(Handlers.repeat, args=(202,))
comp1.then(CompositeTask(c2a, c2b))
flow = Flow(task1)
_reset_task(task1, BaseTask.STATUS_COMPLETE, 1)
_reset_task(task2, BaseTask.STATUS_COMPLETE, 2)
assert flow._get_next(task1) == c1a

def test_first_composite_complete_second_has_pending_returns_second_composite_child(
self,
):
task1 = Task(Handlers.repeat, args=(1,))
task2 = task1.then(Task(Handlers.repeat, args=(2,)))
c1a = Task(Handlers.repeat, args=(101,))
c1b = Task(Handlers.repeat, args=(102,))
comp1 = task2.then(CompositeTask(c1a, c1b))
c2a = Task(Handlers.repeat, args=(201,))
c2b = Task(Handlers.repeat, args=(202,))
comp1.then(CompositeTask(c2a, c2b))
flow = Flow(task1)
_reset_task(task1, BaseTask.STATUS_COMPLETE, 1)
_reset_task(task2, BaseTask.STATUS_COMPLETE, 2)
_reset_task(c1a, BaseTask.STATUS_COMPLETE, 101)
_reset_task(c1b, BaseTask.STATUS_COMPLETE, 102)
assert flow._get_next(task1) == c2a

def test_first_composite_one_running_second_composite_pending_returns_none(self):
"""First composite blocks; second composite's children not yet reachable."""
task1 = Task(Handlers.repeat, args=(1,))
task2 = task1.then(Task(Handlers.repeat, args=(2,)))
c1a = Task(Handlers.repeat, args=(101,))
c1b = Task(Handlers.repeat, args=(102,))
comp1 = task2.then(CompositeTask(c1a, c1b))
c2a = Task(Handlers.repeat, args=(201,))
c2b = Task(Handlers.repeat, args=(202,))
comp1.then(CompositeTask(c2a, c2b))
flow = Flow(task1)
_reset_task(task1, BaseTask.STATUS_COMPLETE, 1)
_reset_task(task2, BaseTask.STATUS_COMPLETE, 2)
_reset_task(c1a, BaseTask.STATUS_RUNNING)
_reset_task(c1b, BaseTask.STATUS_COMPLETE, 102)
assert flow._get_next(task1) is None

def test_first_composite_one_running_one_pending_returns_pending(self):
task1 = Task(Handlers.repeat, args=(1,))
task2 = task1.then(Task(Handlers.repeat, args=(2,)))
c1a = Task(Handlers.repeat, args=(101,))
c1b = Task(Handlers.repeat, args=(102,))
comp1 = task2.then(CompositeTask(c1a, c1b))
comp1.then(Task(Handlers.repeat, args=(4,)))
flow = Flow(task1)
_reset_task(task1, BaseTask.STATUS_COMPLETE, 1)
_reset_task(task2, BaseTask.STATUS_COMPLETE, 2)
_reset_task(c1a, BaseTask.STATUS_RUNNING)
assert flow._get_next(task1) == c1b

def test_composite_complete_returns_next_task_in_chain(self):
task1 = Task(Handlers.repeat, args=(1,))
task2 = task1.then(Task(Handlers.repeat, args=(2,)))
c1a = Task(Handlers.repeat, args=(101,))
c1b = Task(Handlers.repeat, args=(102,))
comp1 = task2.then(CompositeTask(c1a, c1b))
task4 = comp1.then(Task(Handlers.repeat, args=(4,)))
flow = Flow(task1)
_reset_task(task1, BaseTask.STATUS_COMPLETE, 1)
_reset_task(task2, BaseTask.STATUS_COMPLETE, 2)
_reset_task(c1a, BaseTask.STATUS_COMPLETE, 101)
_reset_task(c1b, BaseTask.STATUS_COMPLETE, 102)
assert flow._get_next(task1) == task4