Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ Executes parallel extraction of text content from both the CV PDF and job offer
"""
inputs = { cv_pdf = "PDF", job_offer_pdf = "PDF" }
output = "Dynamic"
parallels = [
branches = [
{ pipe = "extract_cv_text", result = "cv_pages" },
{ pipe = "extract_job_offer_text", result = "job_offer_pages" },
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ You must use `add_each_output`, `combined_output`, or both.
| `description` | string | A description of the parallel operation. | Yes |
| `inputs` | dictionary | The input concept(s) for the parallel operation, as a dictionary mapping input names to concept codes. | Yes |
| `output` | string | The output concept produced by the parallel operation. | Yes |
| `parallels` | array of tables| An array defining the pipes to run in parallel. Each table is a sub-pipe definition. | Yes |
| `branches` | array of tables| An array defining the pipes to run in parallel. Each table is a sub-pipe definition. | Yes |
| `add_each_output` | boolean | If `true`, adds the output of each parallel pipe to the working memory individually. Defaults to `true`. | No |
| `combined_output` | string | The name of a concept to use for a single, combined output object. The structure of this concept must have fields that match the `result` names from the `parallels` array. | No |
| `combined_output` | string | The name of a concept to use for a single, combined output object. The structure of this concept must have fields that match the `result` names from the `branches` array. | No |

### Parallel Step Configuration

Each entry in the `parallels` array is a table with the following keys:
Each entry in the `branches` array is a table with the following keys:

| Key | Type | Description | Required |
| -------- | ------ | ---------------------------------------------------------------------------------------- | -------- |
Expand Down Expand Up @@ -67,7 +67,7 @@ inputs = { description = "ProductDescription" }
output = "ProductAnalysis" # This name is for the combined output
add_each_output = true
combined_output = "ProductAnalysis"
parallels = [
branches = [
{ pipe = "extract_features", result = "features" },
{ pipe = "analyze_sentiment", result = "sentiment" },
]
Expand Down
2 changes: 1 addition & 1 deletion docs/home/9-tools/pipe-builder.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ And generates:

- **Domain concepts** - Data structures for your workflow (e.g., `CVAnalysis`, `InterviewQuestion`)
- **Pipe operators** - LLM calls, extractions, image generation steps
- **Pipe controllers** - Sequences, batches, parallels, conditions to orchestrate the flow
- **Pipe controllers** - Sequences, batches, parallel branches, conditions to orchestrate the flow
- **A complete bundle** - Ready to validate and run

## How It Works
Expand Down
2 changes: 1 addition & 1 deletion pipelex/builder/builder.plx
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ Shape of the contract for PipeOperator is:
- steps: List of sub-pipes to execute sequentially. Each step has: pipe (name of the pipe to execute), result (variable name).

**PipeParallel:**
- parallels: List of sub-pipes to execute concurrently.
- branches: List of sub-pipes to execute concurrently.
- add_each_output: Boolean - include individual outputs in combined result.
- combined_output: Optional ConceptCode (PascalCase) for combined structure.

Expand Down
2 changes: 1 addition & 1 deletion pipelex/builder/builder_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ def _prune_unreachable_specs(self, pipelex_bundle_spec: PipelexBundleSpec) -> Pi
if isinstance(pipe_spec, PipeSequenceSpec):
sub_pipe_codes = [step.pipe_code for step in pipe_spec.steps]
elif isinstance(pipe_spec, PipeParallelSpec):
sub_pipe_codes = [parallel.pipe_code for parallel in pipe_spec.parallels]
sub_pipe_codes = [branch.pipe_code for branch in pipe_spec.branches]
elif isinstance(pipe_spec, PipeBatchSpec):
sub_pipe_codes = [pipe_spec.branch_pipe_code]
elif isinstance(pipe_spec, PipeConditionSpec):
Expand Down
26 changes: 13 additions & 13 deletions pipelex/builder/pipe/pipe_parallel_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@ class PipeParallelSpec(PipeSpec):
and their outputs can be combined or kept separate.

Validation Rules:
1. Parallels list must not be empty.
2. Each parallel step must be a valid SubPipeSpec.
1. Branches list must not be empty.
2. Each branch must be a valid SubPipeSpec.
3. combined_output, when specified, must be a valid ConceptCode in PascalCase.
4. Pipe codes in parallels must reference existing pipes (snake_case).
4. Pipe codes in branches must reference existing pipes (snake_case).

"""

type: Literal["PipeParallel"] = "PipeParallel"
pipe_category: Literal["PipeController"] = "PipeController"
parallels: list[SubPipeSpec] = Field(description="List of SubPipeSpec instances to execute concurrently.")
branches: list[SubPipeSpec] = Field(description="List of SubPipeSpec instances to execute concurrently.")
add_each_output: bool = Field(description="Whether to include individual pipe outputs in the combined result.")
combined_output: str | None = Field(
default=None,
Expand Down Expand Up @@ -74,7 +74,7 @@ def rendered_pretty(self, title: str | None = None, depth: int = 0) -> PrettyPri

# Add parallel branches as a table
parallel_group.renderables.append(Text()) # Blank line
parallels_table = Table(
branches_table = Table(
title="Parallel Branches:",
title_justify="left",
title_style="not italic",
Expand All @@ -84,28 +84,28 @@ def rendered_pretty(self, title: str | None = None, depth: int = 0) -> PrettyPri
show_lines=True,
border_style="dim",
)
parallels_table.add_column("Branch", style="dim", width=6, justify="right")
parallels_table.add_column("Pipe", style="red")
parallels_table.add_column("Result name", style="cyan")
branches_table.add_column("Branch", style="dim", width=6, justify="right")
branches_table.add_column("Pipe", style="red")
branches_table.add_column("Result name", style="cyan")

for idx, parallel in enumerate(self.parallels, start=1):
parallels_table.add_row(str(idx), parallel.pipe_code, parallel.result)
for idx, branch in enumerate(self.branches, start=1):
branches_table.add_row(str(idx), branch.pipe_code, branch.result)

parallel_group.renderables.append(parallels_table)
parallel_group.renderables.append(branches_table)

return parallel_group

@override
def to_blueprint(self) -> PipeParallelBlueprint:
base_blueprint = super().to_blueprint()
core_parallels = [parallel.to_blueprint() for parallel in self.parallels]
core_branches = [branch.to_blueprint() for branch in self.branches]
return PipeParallelBlueprint(
description=base_blueprint.description,
inputs=base_blueprint.inputs,
output=base_blueprint.output,
type=self.type,
pipe_category=self.pipe_category,
parallels=core_parallels,
branches=core_branches,
add_each_output=self.add_each_output,
combined_output=self.combined_output,
)
30 changes: 15 additions & 15 deletions pipelex/cli/agent_cli/commands/pipe_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,13 @@ def _add_type_specific_fields(pipe_spec: PipeSpec, pipe_table: tomlkit.TOMLDocum
pipe_table.add("add_each_output", pipe_spec.add_each_output)
if pipe_spec.combined_output:
pipe_table.add("combined_output", pipe_spec.combined_output)
parallels_array = tomlkit.array()
for parallel in pipe_spec.parallels:
parallel_inline = tomlkit.inline_table()
parallel_inline.append("pipe", parallel.pipe_code)
parallel_inline.append("result", parallel.result)
parallels_array.append(parallel_inline)
pipe_table.add("parallels", parallels_array)
branches_array = tomlkit.array()
for branch in pipe_spec.branches:
branch_inline = tomlkit.inline_table()
branch_inline.append("pipe", branch.pipe_code)
branch_inline.append("result", branch.result)
branches_array.append(branch_inline)
pipe_table.add("branches", branches_array)

elif isinstance(pipe_spec, PipeConditionSpec):
pipe_table.add("expression", pipe_spec.jinja2_expression_template)
Expand Down Expand Up @@ -189,7 +189,7 @@ def _parse_pipe_spec_from_json(pipe_type: str, spec_data: dict[str, Any]) -> Pip
# Add type to spec_data if not present
spec_data["type"] = pipe_type

# Handle steps/parallels conversion - need to convert pipe to pipe_code
# Handle steps/branches conversion - need to convert pipe to pipe_code
if "steps" in spec_data:
converted_steps = []
for step in spec_data["steps"]:
Expand All @@ -198,13 +198,13 @@ def _parse_pipe_spec_from_json(pipe_type: str, spec_data: dict[str, Any]) -> Pip
converted_steps.append(step)
spec_data["steps"] = converted_steps

if "parallels" in spec_data:
converted_parallels = []
for parallel in spec_data["parallels"]:
if "pipe" in parallel and "pipe_code" not in parallel:
parallel["pipe_code"] = parallel.pop("pipe")
converted_parallels.append(parallel)
spec_data["parallels"] = converted_parallels
if "branches" in spec_data:
converted_branches = []
for branch in spec_data["branches"]:
if "pipe" in branch and "pipe_code" not in branch:
branch["pipe_code"] = branch.pop("pipe")
converted_branches.append(branch)
spec_data["branches"] = converted_branches

# Handle expression -> jinja2_expression_template for PipeCondition
if pipe_type == "PipeCondition" and "expression" in spec_data:
Expand Down
1 change: 0 additions & 1 deletion pipelex/core/stuffs/stuff_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ def combine_stuffs(
stuff_contents: dict[str, StuffContent],
name: str | None = None,
) -> Stuff:
# TODO: Add unit tests for this method
"""Combine a dictionary of stuffs into a single stuff."""
the_subclass = get_class_registry().get_required_subclass(name=concept.structure_class_name, base_class=StuffContent)
try:
Expand Down
103 changes: 95 additions & 8 deletions pipelex/graph/graph_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def __init__(
self.metrics: dict[str, float] = {}
self.error: ErrorSpec | None = None
self.input_specs: list[IOSpec] = input_specs or []
self.output_spec: IOSpec | None = None
self.output_specs: list[IOSpec] = []

def to_node_spec(self) -> NodeSpec:
"""Convert to immutable NodeSpec."""
Expand All @@ -59,9 +59,7 @@ def to_node_spec(self) -> NodeSpec:
)

# Build NodeIOSpec from captured input/output specs
outputs: list[IOSpec] = []
if self.output_spec is not None:
outputs = [self.output_spec]
outputs = list(self.output_specs)

node_io = NodeIOSpec(
inputs=self.input_specs,
Expand Down Expand Up @@ -110,6 +108,11 @@ def __init__(self) -> None:
# The batch_controller_node_id is tracked to ensure BATCH_AGGREGATE edges target the correct node
# (the PipeBatch), not a parent controller that may later register as producer of the same stuff
self._batch_aggregate_map: dict[str, tuple[str | None, list[tuple[str, int]]]] = {}
# Maps combined_stuff_code -> (parallel_controller_node_id, [(branch_stuff_code, branch_producer_node_id)])
# Used to create PARALLEL_COMBINE edges from branch outputs to combined output
# The branch_producer_node_id is snapshotted at registration time, before register_controller_output
# overrides _stuff_producer_map to point branch stuff codes to the controller node
self._parallel_combine_map: dict[str, tuple[str, list[tuple[str, str]]]] = {}

@property
def is_active(self) -> bool:
Expand Down Expand Up @@ -139,6 +142,7 @@ def setup(
self._stuff_producer_map = {}
self._batch_item_map = {}
self._batch_aggregate_map = {}
self._parallel_combine_map = {}

return GraphContext(
graph_id=graph_id,
Expand All @@ -164,6 +168,7 @@ def teardown(self) -> GraphSpec | None:
self._generate_data_edges()
self._generate_batch_item_edges()
self._generate_batch_aggregate_edges()
self._generate_parallel_combine_edges()

self._is_active = False

Expand All @@ -187,6 +192,7 @@ def teardown(self) -> GraphSpec | None:
self._stuff_producer_map = {}
self._batch_item_map = {}
self._batch_aggregate_map = {}
self._parallel_combine_map = {}

return graph

Expand Down Expand Up @@ -294,6 +300,26 @@ def _generate_batch_aggregate_edges(self) -> None:
target_stuff_digest=output_list_stuff_code,
)

def _generate_parallel_combine_edges(self) -> None:
"""Generate PARALLEL_COMBINE edges from branch output stuff nodes to the combined output stuff node.

For each registered parallel combine, create edges from each branch output
to the combined output, showing how individual branch results are merged.

Uses snapshotted branch producer node IDs captured during register_parallel_combine,
before register_controller_output overrides _stuff_producer_map.
"""
for combined_stuff_code, (parallel_controller_node_id, branch_entries) in self._parallel_combine_map.items():
for branch_stuff_code, branch_producer_id in branch_entries:
if branch_producer_id != parallel_controller_node_id:
self.add_edge(
source_node_id=branch_producer_id,
target_node_id=parallel_controller_node_id,
edge_kind=EdgeKind.PARALLEL_COMBINE,
source_stuff_digest=branch_stuff_code,
target_stuff_digest=combined_stuff_code,
)

@override
def register_batch_item_extraction(
self,
Expand Down Expand Up @@ -348,6 +374,32 @@ def register_batch_aggregation(
# Note: We keep the first batch_controller_node_id registered for this output list
# (all items for the same output list should come from the same batch controller)

@override
def register_parallel_combine(
self,
combined_stuff_code: str,
branch_stuff_codes: list[str],
parallel_controller_node_id: str,
) -> None:
"""Register that branch outputs are combined into a single output in PipeParallel.

Args:
combined_stuff_code: The stuff_code of the combined output.
branch_stuff_codes: The stuff_codes of the individual branch outputs.
parallel_controller_node_id: The node_id of the PipeParallel controller.
"""
if not self._is_active:
return
# Snapshot the current branch producers from _stuff_producer_map before
# register_controller_output overrides them to point to the controller node.
# This must be called BEFORE _register_branch_outputs_with_graph_tracer.
branch_entries: list[tuple[str, str]] = []
for branch_code in branch_stuff_codes:
producer_id = self._stuff_producer_map.get(branch_code)
if producer_id:
branch_entries.append((branch_code, producer_id))
self._parallel_combine_map[combined_stuff_code] = (parallel_controller_node_id, branch_entries)

@override
def on_pipe_start(
self,
Expand Down Expand Up @@ -422,10 +474,45 @@ def on_pipe_end_success(

# Store output spec and register in producer map for data flow tracking
if output_spec is not None:
node_data.output_spec = output_spec
# Register this node as the producer of this stuff_code (digest)
if output_spec.digest:
self._stuff_producer_map[output_spec.digest] = node_id
# Skip pass-through outputs: if the output digest matches one of the node's
# input digests, the output is just the unchanged input flowing through
# (e.g., PipeParallel with add_each_output where main_stuff is the original input)
input_digests = {spec.digest for spec in node_data.input_specs if spec.digest is not None}
if output_spec.digest in input_digests:
# Pass-through: don't register as output or producer
pass
else:
node_data.output_specs.append(output_spec)
# Register this node as the producer of this stuff_code (digest)
if output_spec.digest:
self._stuff_producer_map[output_spec.digest] = node_id

@override
def register_controller_output(
self,
node_id: str,
output_spec: IOSpec,
) -> None:
"""Register an additional output for a controller node.

This allows controllers like PipeParallel to explicitly register their
branch outputs, overriding sub-pipe registrations in _stuff_producer_map
so that DATA edges flow from the controller to downstream consumers.

Args:
node_id: The controller node ID.
output_spec: The IOSpec describing the output.
"""
if not self._is_active:
return

node_data = self._nodes.get(node_id)
if node_data is None:
return

node_data.output_specs.append(output_spec)
if output_spec.digest:
self._stuff_producer_map[output_spec.digest] = node_id

@override
def on_pipe_end_error(
Expand Down
Loading
Loading