diff --git a/.github/workflows/claude-code-review.yml b/.github/workflows/claude-code-review.yml new file mode 100644 index 000000000..f04e92439 --- /dev/null +++ b/.github/workflows/claude-code-review.yml @@ -0,0 +1,43 @@ +name: Claude Code Review + +on: + pull_request: + types: [opened, synchronize, ready_for_review, reopened] + # Optional: Only run on specific file changes + # paths: + # - "src/**/*.ts" + # - "src/**/*.tsx" + # - "src/**/*.js" + # - "src/**/*.jsx" + +jobs: + claude-review: + # Optional: Filter by PR author + # if: | + # github.event.pull_request.user.login == 'external-contributor' || + # github.event.pull_request.user.login == 'new-developer' || + # github.event.pull_request.author_association == 'FIRST_TIME_CONTRIBUTOR' + + runs-on: ubuntu-latest + permissions: + contents: read + pull-requests: write + issues: read + id-token: write + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + with: + fetch-depth: 1 + + - name: Run Claude Code Review + id: claude-review + uses: anthropics/claude-code-action@v1 + with: + claude_code_oauth_token: ${{ secrets.CLAUDE_CODE_OAUTH_TOKEN }} + plugin_marketplaces: 'https://github.com/anthropics/claude-code.git' + plugins: 'code-review@claude-code-plugins' + prompt: '/code-review:code-review ${{ github.repository }}/pull/${{ github.event.pull_request.number }}' + # See https://github.com/anthropics/claude-code-action/blob/main/docs/usage.md + # or https://code.claude.com/docs/en/cli-reference for available options diff --git a/.github/workflows/claude.yml b/.github/workflows/claude.yml new file mode 100644 index 000000000..4c9215930 --- /dev/null +++ b/.github/workflows/claude.yml @@ -0,0 +1,49 @@ +name: Claude Code + +on: + issue_comment: + types: [created] + pull_request_review_comment: + types: [created] + issues: + types: [opened, assigned] + pull_request_review: + types: [submitted] + +jobs: + claude: + if: | + (github.event_name == 'issue_comment' && contains(github.event.comment.body, '@claude')) || + (github.event_name == 'pull_request_review_comment' && contains(github.event.comment.body, '@claude')) || + (github.event_name == 'pull_request_review' && contains(github.event.review.body, '@claude')) || + (github.event_name == 'issues' && (contains(github.event.issue.body, '@claude') || contains(github.event.issue.title, '@claude'))) + runs-on: ubuntu-latest + permissions: + contents: read + pull-requests: write + issues: write + id-token: write + actions: read # Required for Claude to read CI results on PRs + steps: + - name: Checkout repository + uses: actions/checkout@v4 + with: + fetch-depth: 1 + + - name: Run Claude Code + id: claude + uses: anthropics/claude-code-action@v1 + with: + claude_code_oauth_token: ${{ secrets.CLAUDE_CODE_OAUTH_TOKEN }} + + # This is an optional setting that allows Claude to read CI results on PRs + additional_permissions: | + actions: read + + # Optional: Give a custom prompt to Claude. If this is not specified, Claude will perform the instructions specified in the comment that tagged it. + # prompt: 'Update the pull request description to include a summary of changes.' 
+ + # Optional: Add claude_args to customize behavior and configuration + # See https://github.com/anthropics/claude-code-action/blob/main/docs/usage.md + # or https://code.claude.com/docs/en/cli-reference for available options + # claude_args: '--allowed-tools Bash(gh pr:*)' diff --git a/README.md b/README.md index 41bd2b779..1c4eb35a7 100644 --- a/README.md +++ b/README.md @@ -131,7 +131,7 @@ Executes parallel extraction of text content from both the CV PDF and job offer """ inputs = { cv_pdf = "PDF", job_offer_pdf = "PDF" } output = "Dynamic" -parallels = [ +branches = [ { pipe = "extract_cv_text", result = "cv_pages" }, { pipe = "extract_job_offer_text", result = "job_offer_pages" }, ] diff --git a/docs/home/6-build-reliable-ai-workflows/pipes/pipe-controllers/PipeParallel.md b/docs/home/6-build-reliable-ai-workflows/pipes/pipe-controllers/PipeParallel.md index 9bff1cb53..f3243188e 100644 --- a/docs/home/6-build-reliable-ai-workflows/pipes/pipe-controllers/PipeParallel.md +++ b/docs/home/6-build-reliable-ai-workflows/pipes/pipe-controllers/PipeParallel.md @@ -26,13 +26,13 @@ You must use `add_each_output`, `combined_output`, or both. | `description` | string | A description of the parallel operation. | Yes | | `inputs` | dictionary | The input concept(s) for the parallel operation, as a dictionary mapping input names to concept codes. | Yes | | `output` | string | The output concept produced by the parallel operation. | Yes | -| `parallels` | array of tables| An array defining the pipes to run in parallel. Each table is a sub-pipe definition. | Yes | +| `branches` | array of tables| An array defining the pipes to run in parallel. Each table is a sub-pipe definition. | Yes | | `add_each_output` | boolean | If `true`, adds the output of each parallel pipe to the working memory individually. Defaults to `true`. | No | -| `combined_output` | string | The name of a concept to use for a single, combined output object. The structure of this concept must have fields that match the `result` names from the `parallels` array. | No | +| `combined_output` | string | The name of a concept to use for a single, combined output object. The structure of this concept must have fields that match the `result` names from the `branches` array. 
| No | ### Parallel Step Configuration -Each entry in the `parallels` array is a table with the following keys: +Each entry in the `branches` array is a table with the following keys: | Key | Type | Description | Required | | -------- | ------ | ---------------------------------------------------------------------------------------- | -------- | @@ -67,7 +67,7 @@ inputs = { description = "ProductDescription" } output = "ProductAnalysis" # This name is for the combined output add_each_output = true combined_output = "ProductAnalysis" -parallels = [ +branches = [ { pipe = "extract_features", result = "features" }, { pipe = "analyze_sentiment", result = "sentiment" }, ] diff --git a/docs/home/9-tools/pipe-builder.md b/docs/home/9-tools/pipe-builder.md index a0efc748c..e7ab37a5b 100644 --- a/docs/home/9-tools/pipe-builder.md +++ b/docs/home/9-tools/pipe-builder.md @@ -15,7 +15,7 @@ And generates: - **Domain concepts** - Data structures for your workflow (e.g., `CVAnalysis`, `InterviewQuestion`) - **Pipe operators** - LLM calls, extractions, image generation steps -- **Pipe controllers** - Sequences, batches, parallels, conditions to orchestrate the flow +- **Pipe controllers** - Sequences, batches, parallel branches, conditions to orchestrate the flow - **A complete bundle** - Ready to validate and run ## How It Works diff --git a/pipelex/builder/builder.plx b/pipelex/builder/builder.plx index aba5d53ab..043a7f18a 100644 --- a/pipelex/builder/builder.plx +++ b/pipelex/builder/builder.plx @@ -222,7 +222,7 @@ Shape of the contract for PipeOperator is: - steps: List of sub-pipes to execute sequentially. Each step has: pipe (name of the pipe to execute), result (variable name). **PipeParallel:** -- parallels: List of sub-pipes to execute concurrently. +- branches: List of sub-pipes to execute concurrently. - add_each_output: Boolean - include individual outputs in combined result. - combined_output: Optional ConceptCode (PascalCase) for combined structure. diff --git a/pipelex/builder/builder_loop.py b/pipelex/builder/builder_loop.py index e3586c50d..afd69a6d6 100644 --- a/pipelex/builder/builder_loop.py +++ b/pipelex/builder/builder_loop.py @@ -313,7 +313,7 @@ def _prune_unreachable_specs(self, pipelex_bundle_spec: PipelexBundleSpec) -> Pi if isinstance(pipe_spec, PipeSequenceSpec): sub_pipe_codes = [step.pipe_code for step in pipe_spec.steps] elif isinstance(pipe_spec, PipeParallelSpec): - sub_pipe_codes = [parallel.pipe_code for parallel in pipe_spec.parallels] + sub_pipe_codes = [branch.pipe_code for branch in pipe_spec.branches] elif isinstance(pipe_spec, PipeBatchSpec): sub_pipe_codes = [pipe_spec.branch_pipe_code] elif isinstance(pipe_spec, PipeConditionSpec): diff --git a/pipelex/builder/pipe/pipe_parallel_spec.py b/pipelex/builder/pipe/pipe_parallel_spec.py index 216689cd6..6890bc16d 100644 --- a/pipelex/builder/pipe/pipe_parallel_spec.py +++ b/pipelex/builder/pipe/pipe_parallel_spec.py @@ -23,16 +23,16 @@ class PipeParallelSpec(PipeSpec): and their outputs can be combined or kept separate. Validation Rules: - 1. Parallels list must not be empty. - 2. Each parallel step must be a valid SubPipeSpec. + 1. Branches list must not be empty. + 2. Each branch must be a valid SubPipeSpec. 3. combined_output, when specified, must be a valid ConceptCode in PascalCase. - 4. Pipe codes in parallels must reference existing pipes (snake_case). + 4. Pipe codes in branches must reference existing pipes (snake_case). 
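+
+    Example of a valid `branches` value (mirrors the docs example; pipe names are illustrative):
+
+        branches = [
+            { pipe = "extract_features", result = "features" },
+            { pipe = "analyze_sentiment", result = "sentiment" },
+        ]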
""" type: Literal["PipeParallel"] = "PipeParallel" pipe_category: Literal["PipeController"] = "PipeController" - parallels: list[SubPipeSpec] = Field(description="List of SubPipeSpec instances to execute concurrently.") + branches: list[SubPipeSpec] = Field(description="List of SubPipeSpec instances to execute concurrently.") add_each_output: bool = Field(description="Whether to include individual pipe outputs in the combined result.") combined_output: str | None = Field( default=None, @@ -74,7 +74,7 @@ def rendered_pretty(self, title: str | None = None, depth: int = 0) -> PrettyPri # Add parallel branches as a table parallel_group.renderables.append(Text()) # Blank line - parallels_table = Table( + branches_table = Table( title="Parallel Branches:", title_justify="left", title_style="not italic", @@ -84,28 +84,28 @@ def rendered_pretty(self, title: str | None = None, depth: int = 0) -> PrettyPri show_lines=True, border_style="dim", ) - parallels_table.add_column("Branch", style="dim", width=6, justify="right") - parallels_table.add_column("Pipe", style="red") - parallels_table.add_column("Result name", style="cyan") + branches_table.add_column("Branch", style="dim", width=6, justify="right") + branches_table.add_column("Pipe", style="red") + branches_table.add_column("Result name", style="cyan") - for idx, parallel in enumerate(self.parallels, start=1): - parallels_table.add_row(str(idx), parallel.pipe_code, parallel.result) + for idx, branch in enumerate(self.branches, start=1): + branches_table.add_row(str(idx), branch.pipe_code, branch.result) - parallel_group.renderables.append(parallels_table) + parallel_group.renderables.append(branches_table) return parallel_group @override def to_blueprint(self) -> PipeParallelBlueprint: base_blueprint = super().to_blueprint() - core_parallels = [parallel.to_blueprint() for parallel in self.parallels] + core_branches = [branch.to_blueprint() for branch in self.branches] return PipeParallelBlueprint( description=base_blueprint.description, inputs=base_blueprint.inputs, output=base_blueprint.output, type=self.type, pipe_category=self.pipe_category, - parallels=core_parallels, + branches=core_branches, add_each_output=self.add_each_output, combined_output=self.combined_output, ) diff --git a/pipelex/cli/agent_cli/commands/pipe_cmd.py b/pipelex/cli/agent_cli/commands/pipe_cmd.py index d41276195..b641b8868 100644 --- a/pipelex/cli/agent_cli/commands/pipe_cmd.py +++ b/pipelex/cli/agent_cli/commands/pipe_cmd.py @@ -129,13 +129,13 @@ def _add_type_specific_fields(pipe_spec: PipeSpec, pipe_table: tomlkit.TOMLDocum pipe_table.add("add_each_output", pipe_spec.add_each_output) if pipe_spec.combined_output: pipe_table.add("combined_output", pipe_spec.combined_output) - parallels_array = tomlkit.array() - for parallel in pipe_spec.parallels: - parallel_inline = tomlkit.inline_table() - parallel_inline.append("pipe", parallel.pipe_code) - parallel_inline.append("result", parallel.result) - parallels_array.append(parallel_inline) - pipe_table.add("parallels", parallels_array) + branches_array = tomlkit.array() + for branch in pipe_spec.branches: + branch_inline = tomlkit.inline_table() + branch_inline.append("pipe", branch.pipe_code) + branch_inline.append("result", branch.result) + branches_array.append(branch_inline) + pipe_table.add("branches", branches_array) elif isinstance(pipe_spec, PipeConditionSpec): pipe_table.add("expression", pipe_spec.jinja2_expression_template) @@ -189,7 +189,7 @@ def _parse_pipe_spec_from_json(pipe_type: str, spec_data: dict[str, 
Any]) -> Pip # Add type to spec_data if not present spec_data["type"] = pipe_type - # Handle steps/parallels conversion - need to convert pipe to pipe_code + # Handle steps/branches conversion - need to convert pipe to pipe_code if "steps" in spec_data: converted_steps = [] for step in spec_data["steps"]: @@ -198,13 +198,13 @@ def _parse_pipe_spec_from_json(pipe_type: str, spec_data: dict[str, Any]) -> Pip converted_steps.append(step) spec_data["steps"] = converted_steps - if "parallels" in spec_data: - converted_parallels = [] - for parallel in spec_data["parallels"]: - if "pipe" in parallel and "pipe_code" not in parallel: - parallel["pipe_code"] = parallel.pop("pipe") - converted_parallels.append(parallel) - spec_data["parallels"] = converted_parallels + if "branches" in spec_data: + converted_branches = [] + for branch in spec_data["branches"]: + if "pipe" in branch and "pipe_code" not in branch: + branch["pipe_code"] = branch.pop("pipe") + converted_branches.append(branch) + spec_data["branches"] = converted_branches # Handle expression -> jinja2_expression_template for PipeCondition if pipe_type == "PipeCondition" and "expression" in spec_data: diff --git a/pipelex/core/stuffs/stuff_factory.py b/pipelex/core/stuffs/stuff_factory.py index 53e86c3c7..0bb90aff3 100644 --- a/pipelex/core/stuffs/stuff_factory.py +++ b/pipelex/core/stuffs/stuff_factory.py @@ -110,7 +110,6 @@ def combine_stuffs( stuff_contents: dict[str, StuffContent], name: str | None = None, ) -> Stuff: - # TODO: Add unit tests for this method """Combine a dictionary of stuffs into a single stuff.""" the_subclass = get_class_registry().get_required_subclass(name=concept.structure_class_name, base_class=StuffContent) try: diff --git a/pipelex/graph/graph_tracer.py b/pipelex/graph/graph_tracer.py index 175f87d68..125273799 100644 --- a/pipelex/graph/graph_tracer.py +++ b/pipelex/graph/graph_tracer.py @@ -47,7 +47,7 @@ def __init__( self.metrics: dict[str, float] = {} self.error: ErrorSpec | None = None self.input_specs: list[IOSpec] = input_specs or [] - self.output_spec: IOSpec | None = None + self.output_specs: list[IOSpec] = [] def to_node_spec(self) -> NodeSpec: """Convert to immutable NodeSpec.""" @@ -59,9 +59,7 @@ def to_node_spec(self) -> NodeSpec: ) # Build NodeIOSpec from captured input/output specs - outputs: list[IOSpec] = [] - if self.output_spec is not None: - outputs = [self.output_spec] + outputs = list(self.output_specs) node_io = NodeIOSpec( inputs=self.input_specs, @@ -110,6 +108,11 @@ def __init__(self) -> None: # The batch_controller_node_id is tracked to ensure BATCH_AGGREGATE edges target the correct node # (the PipeBatch), not a parent controller that may later register as producer of the same stuff self._batch_aggregate_map: dict[str, tuple[str | None, list[tuple[str, int]]]] = {} + # Maps combined_stuff_code -> (parallel_controller_node_id, [(branch_stuff_code, branch_producer_node_id)]) + # Used to create PARALLEL_COMBINE edges from branch outputs to combined output + # The branch_producer_node_id is snapshotted at registration time, before register_controller_output + # overrides _stuff_producer_map to point branch stuff codes to the controller node + self._parallel_combine_map: dict[str, tuple[str, list[tuple[str, str]]]] = {} @property def is_active(self) -> bool: @@ -139,6 +142,7 @@ def setup( self._stuff_producer_map = {} self._batch_item_map = {} self._batch_aggregate_map = {} + self._parallel_combine_map = {} return GraphContext( graph_id=graph_id, @@ -164,6 +168,7 @@ def teardown(self) -> 
GraphSpec | None: self._generate_data_edges() self._generate_batch_item_edges() self._generate_batch_aggregate_edges() + self._generate_parallel_combine_edges() self._is_active = False @@ -187,6 +192,7 @@ def teardown(self) -> GraphSpec | None: self._stuff_producer_map = {} self._batch_item_map = {} self._batch_aggregate_map = {} + self._parallel_combine_map = {} return graph @@ -294,6 +300,26 @@ def _generate_batch_aggregate_edges(self) -> None: target_stuff_digest=output_list_stuff_code, ) + def _generate_parallel_combine_edges(self) -> None: + """Generate PARALLEL_COMBINE edges from branch output stuff nodes to the combined output stuff node. + + For each registered parallel combine, create edges from each branch output + to the combined output, showing how individual branch results are merged. + + Uses snapshotted branch producer node IDs captured during register_parallel_combine, + before register_controller_output overrides _stuff_producer_map. + """ + for combined_stuff_code, (parallel_controller_node_id, branch_entries) in self._parallel_combine_map.items(): + for branch_stuff_code, branch_producer_id in branch_entries: + if branch_producer_id != parallel_controller_node_id: + self.add_edge( + source_node_id=branch_producer_id, + target_node_id=parallel_controller_node_id, + edge_kind=EdgeKind.PARALLEL_COMBINE, + source_stuff_digest=branch_stuff_code, + target_stuff_digest=combined_stuff_code, + ) + @override def register_batch_item_extraction( self, @@ -348,6 +374,32 @@ def register_batch_aggregation( # Note: We keep the first batch_controller_node_id registered for this output list # (all items for the same output list should come from the same batch controller) + @override + def register_parallel_combine( + self, + combined_stuff_code: str, + branch_stuff_codes: list[str], + parallel_controller_node_id: str, + ) -> None: + """Register that branch outputs are combined into a single output in PipeParallel. + + Args: + combined_stuff_code: The stuff_code of the combined output. + branch_stuff_codes: The stuff_codes of the individual branch outputs. + parallel_controller_node_id: The node_id of the PipeParallel controller. + """ + if not self._is_active: + return + # Snapshot the current branch producers from _stuff_producer_map before + # register_controller_output overrides them to point to the controller node. + # This must be called BEFORE _register_branch_outputs_with_graph_tracer. 
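+        # Illustrative result of the loop below (hypothetical digests and node ids):
+        #   branch_entries == [("3f9a12", "node_analyze_tone"), ("7c2b88", "node_analyze_length")]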
+ branch_entries: list[tuple[str, str]] = [] + for branch_code in branch_stuff_codes: + producer_id = self._stuff_producer_map.get(branch_code) + if producer_id: + branch_entries.append((branch_code, producer_id)) + self._parallel_combine_map[combined_stuff_code] = (parallel_controller_node_id, branch_entries) + @override def on_pipe_start( self, @@ -422,10 +474,45 @@ def on_pipe_end_success( # Store output spec and register in producer map for data flow tracking if output_spec is not None: - node_data.output_spec = output_spec - # Register this node as the producer of this stuff_code (digest) - if output_spec.digest: - self._stuff_producer_map[output_spec.digest] = node_id + # Skip pass-through outputs: if the output digest matches one of the node's + # input digests, the output is just the unchanged input flowing through + # (e.g., PipeParallel with add_each_output where main_stuff is the original input) + input_digests = {spec.digest for spec in node_data.input_specs if spec.digest is not None} + if output_spec.digest in input_digests: + # Pass-through: don't register as output or producer + pass + else: + node_data.output_specs.append(output_spec) + # Register this node as the producer of this stuff_code (digest) + if output_spec.digest: + self._stuff_producer_map[output_spec.digest] = node_id + + @override + def register_controller_output( + self, + node_id: str, + output_spec: IOSpec, + ) -> None: + """Register an additional output for a controller node. + + This allows controllers like PipeParallel to explicitly register their + branch outputs, overriding sub-pipe registrations in _stuff_producer_map + so that DATA edges flow from the controller to downstream consumers. + + Args: + node_id: The controller node ID. + output_spec: The IOSpec describing the output. + """ + if not self._is_active: + return + + node_data = self._nodes.get(node_id) + if node_data is None: + return + + node_data.output_specs.append(output_spec) + if output_spec.digest: + self._stuff_producer_map[output_spec.digest] = node_id @override def on_pipe_end_error( diff --git a/pipelex/graph/graph_tracer_manager.py b/pipelex/graph/graph_tracer_manager.py index f2cdf238b..3b4d770df 100644 --- a/pipelex/graph/graph_tracer_manager.py +++ b/pipelex/graph/graph_tracer_manager.py @@ -298,6 +298,27 @@ def add_edge( label=label, ) + def register_controller_output( + self, + graph_id: str, + node_id: str, + output_spec: IOSpec, + ) -> None: + """Register an additional output for a controller node. + + Args: + graph_id: The graph identifier. + node_id: The controller node ID. + output_spec: The IOSpec describing the output. + """ + tracer = self._get_tracer(graph_id) + if tracer is None: + return + tracer.register_controller_output( + node_id=node_id, + output_spec=output_spec, + ) + def register_batch_item_extraction( self, graph_id: str, @@ -353,3 +374,27 @@ def register_batch_aggregation( item_index=item_index, batch_controller_node_id=batch_controller_node_id, ) + + def register_parallel_combine( + self, + graph_id: str, + combined_stuff_code: str, + branch_stuff_codes: list[str], + parallel_controller_node_id: str, + ) -> None: + """Register that branch outputs are combined into a single output in PipeParallel. + + Args: + graph_id: The graph identifier. + combined_stuff_code: The stuff_code of the combined output. + branch_stuff_codes: The stuff_codes of the individual branch outputs. + parallel_controller_node_id: The node_id of the PipeParallel controller. 
+ """ + tracer = self._get_tracer(graph_id) + if tracer is None: + return + tracer.register_parallel_combine( + combined_stuff_code=combined_stuff_code, + branch_stuff_codes=branch_stuff_codes, + parallel_controller_node_id=parallel_controller_node_id, + ) diff --git a/pipelex/graph/graph_tracer_protocol.py b/pipelex/graph/graph_tracer_protocol.py index 213e342cd..adbb217a8 100644 --- a/pipelex/graph/graph_tracer_protocol.py +++ b/pipelex/graph/graph_tracer_protocol.py @@ -126,6 +126,22 @@ def add_edge( """ ... + def register_controller_output( + self, + node_id: str, + output_spec: IOSpec, + ) -> None: + """Register an additional output for a controller node. + + This allows controllers like PipeParallel to explicitly register their + branch outputs so that DATA edges flow from the controller to downstream consumers. + + Args: + node_id: The controller node ID. + output_spec: The IOSpec describing the output. + """ + ... + def register_batch_item_extraction( self, list_stuff_code: str, @@ -163,6 +179,24 @@ def register_batch_aggregation( """ ... + def register_parallel_combine( + self, + combined_stuff_code: str, + branch_stuff_codes: list[str], + parallel_controller_node_id: str, + ) -> None: + """Register that branch outputs are combined into a single output in PipeParallel. + + Creates PARALLEL_COMBINE edges from each branch output stuff node + to the combined output stuff node. + + Args: + combined_stuff_code: The stuff_code of the combined output. + branch_stuff_codes: The stuff_codes of the individual branch outputs. + parallel_controller_node_id: The node_id of the PipeParallel controller. + """ + ... + class GraphTracerNoOp(GraphTracerProtocol): """No-operation implementation of GraphTracerProtocol. @@ -235,6 +269,14 @@ def add_edge( ) -> None: pass + @override + def register_controller_output( + self, + node_id: str, + output_spec: IOSpec, + ) -> None: + pass + @override def register_batch_item_extraction( self, @@ -254,3 +296,12 @@ def register_batch_aggregation( batch_controller_node_id: str | None = None, ) -> None: pass + + @override + def register_parallel_combine( + self, + combined_stuff_code: str, + branch_stuff_codes: list[str], + parallel_controller_node_id: str, + ) -> None: + pass diff --git a/pipelex/graph/graphspec.py b/pipelex/graph/graphspec.py index a7d4c440c..66892c827 100644 --- a/pipelex/graph/graphspec.py +++ b/pipelex/graph/graphspec.py @@ -49,13 +49,21 @@ class EdgeKind(StrEnum): SELECTED_OUTCOME = "selected_outcome" BATCH_ITEM = "batch_item" # list → item extraction during batch iteration BATCH_AGGREGATE = "batch_aggregate" # items → output list aggregation + PARALLEL_COMBINE = "parallel_combine" # branch outputs → combined output in PipeParallel @property def is_data(self) -> bool: match self: case EdgeKind.DATA: return True - case EdgeKind.CONTROL | EdgeKind.CONTAINS | EdgeKind.SELECTED_OUTCOME | EdgeKind.BATCH_ITEM | EdgeKind.BATCH_AGGREGATE: + case ( + EdgeKind.CONTROL + | EdgeKind.CONTAINS + | EdgeKind.SELECTED_OUTCOME + | EdgeKind.BATCH_ITEM + | EdgeKind.BATCH_AGGREGATE + | EdgeKind.PARALLEL_COMBINE + ): return False @property @@ -63,7 +71,14 @@ def is_contains(self) -> bool: match self: case EdgeKind.CONTAINS: return True - case EdgeKind.CONTROL | EdgeKind.DATA | EdgeKind.SELECTED_OUTCOME | EdgeKind.BATCH_ITEM | EdgeKind.BATCH_AGGREGATE: + case ( + EdgeKind.CONTROL + | EdgeKind.DATA + | EdgeKind.SELECTED_OUTCOME + | EdgeKind.BATCH_ITEM + | EdgeKind.BATCH_AGGREGATE + | EdgeKind.PARALLEL_COMBINE + ): return False @property @@ -71,7 +86,7 @@ def 
is_selected_outcome(self) -> bool: match self: case EdgeKind.SELECTED_OUTCOME: return True - case EdgeKind.CONTROL | EdgeKind.DATA | EdgeKind.CONTAINS | EdgeKind.BATCH_ITEM | EdgeKind.BATCH_AGGREGATE: + case EdgeKind.CONTROL | EdgeKind.DATA | EdgeKind.CONTAINS | EdgeKind.BATCH_ITEM | EdgeKind.BATCH_AGGREGATE | EdgeKind.PARALLEL_COMBINE: return False @property @@ -79,7 +94,14 @@ def is_batch_item(self) -> bool: match self: case EdgeKind.BATCH_ITEM: return True - case EdgeKind.CONTROL | EdgeKind.DATA | EdgeKind.CONTAINS | EdgeKind.SELECTED_OUTCOME | EdgeKind.BATCH_AGGREGATE: + case ( + EdgeKind.CONTROL + | EdgeKind.DATA + | EdgeKind.CONTAINS + | EdgeKind.SELECTED_OUTCOME + | EdgeKind.BATCH_AGGREGATE + | EdgeKind.PARALLEL_COMBINE + ): return False @property @@ -87,7 +109,15 @@ def is_batch_aggregate(self) -> bool: match self: case EdgeKind.BATCH_AGGREGATE: return True - case EdgeKind.CONTROL | EdgeKind.DATA | EdgeKind.CONTAINS | EdgeKind.SELECTED_OUTCOME | EdgeKind.BATCH_ITEM: + case EdgeKind.CONTROL | EdgeKind.DATA | EdgeKind.CONTAINS | EdgeKind.SELECTED_OUTCOME | EdgeKind.BATCH_ITEM | EdgeKind.PARALLEL_COMBINE: + return False + + @property + def is_parallel_combine(self) -> bool: + match self: + case EdgeKind.PARALLEL_COMBINE: + return True + case EdgeKind.CONTROL | EdgeKind.DATA | EdgeKind.CONTAINS | EdgeKind.SELECTED_OUTCOME | EdgeKind.BATCH_ITEM | EdgeKind.BATCH_AGGREGATE: return False diff --git a/pipelex/graph/mermaidflow/mermaidflow_factory.py b/pipelex/graph/mermaidflow/mermaidflow_factory.py index b84a723fb..7a3159c05 100644 --- a/pipelex/graph/mermaidflow/mermaidflow_factory.py +++ b/pipelex/graph/mermaidflow/mermaidflow_factory.py @@ -13,6 +13,7 @@ from pipelex.graph.graph_analysis import GraphAnalysis from pipelex.graph.graph_config import GraphConfig from pipelex.graph.graphspec import ( + EdgeSpec, GraphSpec, NodeKind, NodeSpec, @@ -125,6 +126,23 @@ def make_from_graphspec( # This allows batch item stuffs to be placed inside their consumer's subgraph rendered_orphan_stuffs: set[str] = set() + # Build mapping of controller node_id → {digest: (name, concept)} for parallel_combine + # target stuffs. These are outputs of parallel controllers and should be rendered + # inside the controller's subgraph rather than as orphans at top level. + # We collect the stuff info from controller node outputs directly, because these + # stuffs may not be in stuff_registry (which skips controller nodes). 
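+        # Illustrative shape (hypothetical ids): {"node_pgc_parallel": {"ab12": ("tone_result", "PgcToneResult")}}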
+ controller_output_stuffs: dict[str, dict[str, tuple[str, str | None]]] = {} + for edge in graph.edges: + if edge.kind.is_parallel_combine and edge.target_stuff_digest: + controller_output_stuffs.setdefault(edge.target, {})[edge.target_stuff_digest] = ("", None) + # Resolve names and concepts from the controller nodes' outputs + for controller_id, digest_map in controller_output_stuffs.items(): + controller_node = analysis.nodes_by_id.get(controller_id) + if controller_node: + for output_spec in controller_node.node_io.outputs: + if output_spec.digest and output_spec.digest in digest_map: + digest_map[output_spec.digest] = (output_spec.name, output_spec.concept) + # Render pipe nodes and their produced stuff within controller subgraphs lines.append("") lines.append(" %% Pipe and stuff nodes within controller subgraphs") @@ -141,6 +159,7 @@ def make_from_graphspec( subgraph_depths=subgraph_depths, show_stuff_codes=show_stuff_codes, rendered_orphan_stuffs=rendered_orphan_stuffs, + controller_output_stuffs=controller_output_stuffs, ) lines.extend(node_lines) @@ -199,6 +218,15 @@ def make_from_graphspec( ) lines.append(stuff_line) + # Build supplementary stuff info from all nodes (including controllers) + # This is needed for batch_aggregate target_stuff_digest which may not be in stuff_registry + # (GraphAnalysis.stuff_registry skips controller outputs) + all_stuff_info: dict[str, tuple[str, str | None]] = {} + for node in graph.nodes: + for output_spec in node.node_io.outputs: + if output_spec.digest and output_spec.digest not in all_stuff_info: + all_stuff_info[output_spec.digest] = (output_spec.name, output_spec.concept) + # Render edges: producer -> stuff lines.append("") lines.append(" %% Data flow edges: producer -> stuff -> consumer") @@ -220,26 +248,24 @@ def make_from_graphspec( lines.append(f" {cons_stuff_mermaid_id} --> {consumer_mermaid_id}") # Render batch edges (BATCH_ITEM and BATCH_AGGREGATE) with dashed styling + # These edges connect stuff-to-stuff (not node-to-node) because their source/target + # are controllers rendered as Mermaid subgraphs, not nodes. batch_item_edges = [edge for edge in graph.edges if edge.kind.is_batch_item] batch_aggregate_edges = [edge for edge in graph.edges if edge.kind.is_batch_aggregate] if batch_item_edges or batch_aggregate_edges: lines.append("") lines.append(" %% Batch edges: list-item relationships") + cls._render_dashed_edges(batch_item_edges, lines, stuff_id_mapping, all_stuff_info, show_stuff_codes) + cls._render_dashed_edges(batch_aggregate_edges, lines, stuff_id_mapping, all_stuff_info, show_stuff_codes) - for edge in batch_item_edges: - source_mermaid_id = id_mapping.get(edge.source) - target_mermaid_id = id_mapping.get(edge.target) - if source_mermaid_id and target_mermaid_id: - label = edge.label or "" - lines.append(f' {source_mermaid_id} -."{label}".-> {target_mermaid_id}') - - for edge in batch_aggregate_edges: - source_mermaid_id = id_mapping.get(edge.source) - target_mermaid_id = id_mapping.get(edge.target) - if source_mermaid_id and target_mermaid_id: - label = edge.label or "" - lines.append(f' {source_mermaid_id} -."{label}".-> {target_mermaid_id}') + # Render parallel combine edges (branch outputs → combined output) with dashed styling + # Same approach: use stuff digests to connect stuff-to-stuff. 
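+        # e.g. emits (illustrative digests): stuff_ab12 -.-> stuff_cd34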
+ parallel_combine_edges = [edge for edge in graph.edges if edge.kind.is_parallel_combine] + if parallel_combine_edges: + lines.append("") + lines.append(" %% Parallel combine edges: branch outputs → combined output") + cls._render_dashed_edges(parallel_combine_edges, lines, stuff_id_mapping, all_stuff_info, show_stuff_codes) # Style definitions lines.append("") @@ -381,6 +407,65 @@ def _render_stuff_node( return f'{indent}{stuff_mermaid_id}(["{label}"]):::stuff' + @classmethod + def _render_dashed_edges( + cls, + edges: list[EdgeSpec], + lines: list[str], + stuff_id_mapping: dict[str, str], + all_stuff_info: dict[str, tuple[str, str | None]], + show_stuff_codes: bool, + ) -> None: + """Render dashed edges between stuff nodes, resolving missing stuff nodes on the fly. + + This handles BATCH_ITEM, BATCH_AGGREGATE, and PARALLEL_COMBINE edges which all share + the same rendering logic: look up source/target stuff IDs, render any missing stuff + nodes from all_stuff_info, and emit a dashed arrow with an optional label. + + Args: + edges: The edges to render as dashed arrows. + lines: The mermaid output lines list (mutated). + stuff_id_mapping: Map to store/retrieve stuff mermaid IDs (mutated). + all_stuff_info: Supplementary stuff info from all nodes including controllers. + show_stuff_codes: Whether to show digest in stuff labels. + """ + for edge in edges: + source_sid = stuff_id_mapping.get(edge.source_stuff_digest) if edge.source_stuff_digest else None + target_sid = stuff_id_mapping.get(edge.target_stuff_digest) if edge.target_stuff_digest else None + # Render missing stuff nodes on the fly + if not source_sid and edge.source_stuff_digest and edge.source_stuff_digest in all_stuff_info: + name, concept = all_stuff_info[edge.source_stuff_digest] + lines.append( + cls._render_stuff_node( + digest=edge.source_stuff_digest, + name=name, + concept=concept, + stuff_id_mapping=stuff_id_mapping, + show_stuff_codes=show_stuff_codes, + indent=" ", + ) + ) + source_sid = stuff_id_mapping.get(edge.source_stuff_digest) + if not target_sid and edge.target_stuff_digest and edge.target_stuff_digest in all_stuff_info: + name, concept = all_stuff_info[edge.target_stuff_digest] + lines.append( + cls._render_stuff_node( + digest=edge.target_stuff_digest, + name=name, + concept=concept, + stuff_id_mapping=stuff_id_mapping, + show_stuff_codes=show_stuff_codes, + indent=" ", + ) + ) + target_sid = stuff_id_mapping.get(edge.target_stuff_digest) + if source_sid and target_sid: + label = edge.label or "" + if label: + lines.append(f' {source_sid} -."{label}".-> {target_sid}') + else: + lines.append(f" {source_sid} -.-> {target_sid}") + @classmethod def _render_subgraph_recursive( cls, @@ -395,6 +480,7 @@ def _render_subgraph_recursive( subgraph_depths: dict[str, int], show_stuff_codes: bool, rendered_orphan_stuffs: set[str], + controller_output_stuffs: dict[str, dict[str, tuple[str, str | None]]], indent_level: int = 1, depth: int = 0, ) -> list[str]: @@ -403,6 +489,8 @@ def _render_subgraph_recursive( This renders both pipe nodes and their produced stuff nodes inside subgraphs. Orphan stuffs (no producer) consumed by leaf nodes are also rendered inside the same subgraph as their consumer, enabling proper placement of batch item stuffs. + Controller output stuffs (e.g., parallel_combine targets) are rendered inside + their controller's subgraph. Args: node_id: The node to render. @@ -416,6 +504,7 @@ def _render_subgraph_recursive( subgraph_depths: Map to track subgraph IDs and their depths (mutated). 
show_stuff_codes: Whether to show digest in stuff labels. rendered_orphan_stuffs: Set of orphan stuff digests already rendered (mutated). + controller_output_stuffs: Map of controller node_id to {digest: (name, concept)} for stuffs to render inside. indent_level: Current indentation level. depth: Current depth in the subgraph hierarchy (for coloring). @@ -464,11 +553,26 @@ def _render_subgraph_recursive( subgraph_depths=subgraph_depths, show_stuff_codes=show_stuff_codes, rendered_orphan_stuffs=rendered_orphan_stuffs, + controller_output_stuffs=controller_output_stuffs, indent_level=indent_level + 1, depth=depth + 1, ) lines.extend(child_lines) + # Render controller output stuffs (e.g., parallel_combine targets) inside the subgraph + for digest, (name, concept) in sorted(controller_output_stuffs.get(node_id, {}).items(), key=lambda item: item[1][0]): + if digest not in stuff_id_mapping: + stuff_line = cls._render_stuff_node( + digest=digest, + name=name, + concept=concept, + stuff_id_mapping=stuff_id_mapping, + show_stuff_codes=show_stuff_codes, + indent=indent + " ", + ) + lines.append(stuff_line) + rendered_orphan_stuffs.add(digest) + lines.append(f"{indent}end") else: # Leaf node - render as simple node diff --git a/pipelex/graph/reactflow/templates/_scripts.js.jinja2 b/pipelex/graph/reactflow/templates/_scripts.js.jinja2 index 5a8d550a1..ecb59bdc7 100644 --- a/pipelex/graph/reactflow/templates/_scripts.js.jinja2 +++ b/pipelex/graph/reactflow/templates/_scripts.js.jinja2 @@ -435,6 +435,33 @@ function buildDataflowGraph(graphspec, analysis) { // batch_item: batch_controller → stuff_item, batch_aggregate: stuff_item → batch_controller // (showBatchController is declared earlier in the function) + // Create PARALLEL_COMBINE edges from GraphSpec + // These show branch outputs flowing into the combined output + for (const edge of graphspec.edges) { + if (edge.kind !== 'parallel_combine') continue; + + if (!edge.source_stuff_digest || !edge.target_stuff_digest) continue; + const sourceId = `stuff_${edge.source_stuff_digest}`; + const targetId = `stuff_${edge.target_stuff_digest}`; + + edges.push({ + id: edge.id, + source: sourceId, + target: targetId, + type: {{ edge_type | tojson }}, + animated: false, + style: { + stroke: 'var(--color-parallel-combine)', + strokeWidth: 2, + strokeDasharray: '5,5', + }, + markerEnd: { + type: MarkerType?.ArrowClosed || 'arrowclosed', + color: 'var(--color-parallel-combine)', + }, + }); + } + for (const edge of graphspec.edges) { if (edge.kind !== 'batch_item' && edge.kind !== 'batch_aggregate') { continue; diff --git a/pipelex/graph/reactflow/templates/_styles.css.jinja2 b/pipelex/graph/reactflow/templates/_styles.css.jinja2 index f75fddf7e..4b33a060e 100644 --- a/pipelex/graph/reactflow/templates/_styles.css.jinja2 +++ b/pipelex/graph/reactflow/templates/_styles.css.jinja2 @@ -27,6 +27,7 @@ --color-edge: #3b82f6; --color-batch-item: #a855f7; --color-batch-aggregate: #22c55e; + --color-parallel-combine: #c084fc; --font-sans: 'Inter', -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif; --font-mono: 'JetBrains Mono', 'Monaco', 'Menlo', monospace; --radius-sm: 4px; @@ -66,6 +67,7 @@ --color-edge: #3b82f6; --color-batch-item: #9333ea; --color-batch-aggregate: #16a34a; + --color-parallel-combine: #a855f7; --shadow-sm: 0 1px 2px rgba(0, 0, 0, 0.05); --shadow-md: 0 4px 12px rgba(0, 0, 0, 0.1); --shadow-lg: 0 8px 24px rgba(0, 0, 0, 0.15); @@ -100,6 +102,7 @@ --color-edge: #3b82f6; --color-batch-item: #9333ea; --color-batch-aggregate: #16a34a; + 
--color-parallel-combine: #a855f7; --shadow-sm: 0 1px 2px rgba(0, 0, 0, 0.05); --shadow-md: 0 4px 12px rgba(0, 0, 0, 0.1); --shadow-lg: 0 8px 24px rgba(0, 0, 0, 0.15); @@ -127,6 +130,7 @@ --color-edge: #FFFACD; --color-batch-item: #bd93f9; --color-batch-aggregate: #50fa7b; + --color-parallel-combine: #d6a4ff; /* Status colors */ --color-success: #50FA7B; /* Bright Green */ diff --git a/pipelex/graph/reactflow/viewspec_transformer.py b/pipelex/graph/reactflow/viewspec_transformer.py index 0af3ba5ad..e391dffbc 100644 --- a/pipelex/graph/reactflow/viewspec_transformer.py +++ b/pipelex/graph/reactflow/viewspec_transformer.py @@ -63,6 +63,8 @@ def _map_edge_kind_to_view_type(kind: EdgeKind) -> str: return "batch_item" case EdgeKind.BATCH_AGGREGATE: return "batch_aggregate" + case EdgeKind.PARALLEL_COMBINE: + return "parallel_combine" def _build_node_label(node_spec: Any) -> str: diff --git a/pipelex/pipe_controllers/parallel/pipe_parallel.py b/pipelex/pipe_controllers/parallel/pipe_parallel.py index 90d5453e6..5dcc78ff5 100644 --- a/pipelex/pipe_controllers/parallel/pipe_parallel.py +++ b/pipelex/pipe_controllers/parallel/pipe_parallel.py @@ -13,6 +13,8 @@ from pipelex.core.pipes.inputs.input_stuff_specs_factory import InputStuffSpecsFactory from pipelex.core.pipes.pipe_output import PipeOutput from pipelex.core.stuffs.stuff_factory import StuffFactory +from pipelex.graph.graph_tracer_manager import GraphTracerManager +from pipelex.graph.graphspec import IOSpec from pipelex.hub import get_required_pipe from pipelex.libraries.pipe.exceptions import PipeNotFoundError from pipelex.pipe_controllers.pipe_controller import PipeController @@ -189,6 +191,21 @@ async def _live_run_controller_pipe( name=output_name, ) + # Register parallel combine edges BEFORE register_branch_outputs, because + # register_parallel_combine snapshots the original branch producers from + # _stuff_producer_map before register_controller_output overrides them + self._register_parallel_combine_with_graph_tracer( + job_metadata=job_metadata, + combined_stuff=combined_output_stuff, + branch_stuffs=output_stuffs, + ) + + # Register branch outputs with graph tracer so DATA edges flow from PipeParallel to downstream consumers + self._register_branch_outputs_with_graph_tracer( + job_metadata=job_metadata, + output_stuffs=output_stuffs, + ) + return PipeOutput( working_memory=working_memory, pipeline_run_id=job_metadata.pipeline_run_id, @@ -261,11 +278,94 @@ async def _dry_run_controller_pipe( stuff=combined_output_stuff, name=output_name, ) + + # Register parallel combine edges BEFORE register_branch_outputs, because + # register_parallel_combine snapshots the original branch producers from + # _stuff_producer_map before register_controller_output overrides them + self._register_parallel_combine_with_graph_tracer( + job_metadata=job_metadata, + combined_stuff=combined_output_stuff, + branch_stuffs=output_stuffs, + ) + + # Register branch outputs with graph tracer so DATA edges flow from PipeParallel to downstream consumers + self._register_branch_outputs_with_graph_tracer( + job_metadata=job_metadata, + output_stuffs=output_stuffs, + ) + return PipeOutput( working_memory=working_memory, pipeline_run_id=job_metadata.pipeline_run_id, ) + def _register_branch_outputs_with_graph_tracer( + self, + job_metadata: JobMetadata, + output_stuffs: dict[str, "Stuff"], + ) -> None: + """Register branch outputs with the graph tracer. 
+ + This re-registers each branch output's stuff_code as produced by the PipeParallel + node, overriding the sub-pipe's registration so that DATA edges flow from + PipeParallel to downstream consumers. + + Args: + job_metadata: The job metadata containing graph context. + output_stuffs: Mapping of output_name to the branch output Stuff. + """ + graph_context = job_metadata.graph_context + if graph_context is None: + return + tracer_manager = GraphTracerManager.get_instance() + if tracer_manager is None or graph_context.parent_node_id is None: + return + for output_name_key, output_stuff in output_stuffs.items(): + output_spec = IOSpec( + name=output_name_key, + concept=output_stuff.concept.code, + content_type=output_stuff.content.content_type, + digest=output_stuff.stuff_code, + data=output_stuff.content.smart_dump() if graph_context.data_inclusion.stuff_json_content else None, + data_text=output_stuff.content.rendered_pretty_text() if graph_context.data_inclusion.stuff_text_content else None, + data_html=output_stuff.content.rendered_pretty_html() if graph_context.data_inclusion.stuff_html_content else None, + ) + tracer_manager.register_controller_output( + graph_id=graph_context.graph_id, + node_id=graph_context.parent_node_id, + output_spec=output_spec, + ) + + def _register_parallel_combine_with_graph_tracer( + self, + job_metadata: JobMetadata, + combined_stuff: "Stuff", + branch_stuffs: dict[str, "Stuff"], + ) -> None: + """Register parallel combine edges (branch outputs → combined output). + + Creates PARALLEL_COMBINE edges showing how individual branch results + are merged into the combined output. + + Args: + job_metadata: The job metadata containing graph context. + combined_stuff: The combined output Stuff. + branch_stuffs: Mapping of output_name to the branch output Stuff. 
+ """ + graph_context = job_metadata.graph_context + if graph_context is None: + return + tracer_manager = GraphTracerManager.get_instance() + if tracer_manager is None or graph_context.parent_node_id is None: + return + branch_stuff_codes = [stuff.stuff_code for stuff in branch_stuffs.values()] + tracer_manager.register_parallel_combine( + graph_id=graph_context.graph_id, + combined_stuff_code=combined_stuff.stuff_code, + branch_stuff_codes=branch_stuff_codes, + parallel_controller_node_id=graph_context.parent_node_id, + ) + @override async def _validate_before_run( self, job_metadata: JobMetadata, working_memory: WorkingMemory, pipe_run_params: PipeRunParams, output_name: str | None = None diff --git a/pipelex/pipe_controllers/parallel/pipe_parallel_blueprint.py b/pipelex/pipe_controllers/parallel/pipe_parallel_blueprint.py index 576277c96..a1c6f6886 100644 --- a/pipelex/pipe_controllers/parallel/pipe_parallel_blueprint.py +++ b/pipelex/pipe_controllers/parallel/pipe_parallel_blueprint.py @@ -12,7 +12,7 @@ class PipeParallelBlueprint(PipeBlueprint): type: Literal["PipeParallel"] = "PipeParallel" pipe_category: Literal["PipeController"] = "PipeController" - parallels: list[SubPipeBlueprint] + branches: list[SubPipeBlueprint] add_each_output: bool = False combined_output: str | None = None @@ -20,7 +20,7 @@ class PipeParallelBlueprint(PipeBlueprint): @override def pipe_dependencies(self) -> set[str]: """Return the set of pipe codes from the parallel branches.""" - return {parallel.pipe for parallel in self.parallels} + return {branch.pipe for branch in self.branches} @field_validator("combined_output", mode="before") @classmethod diff --git a/pipelex/pipe_controllers/parallel/pipe_parallel_factory.py b/pipelex/pipe_controllers/parallel/pipe_parallel_factory.py index a1a19c8a6..4e421c5b0 100644 --- a/pipelex/pipe_controllers/parallel/pipe_parallel_factory.py +++ b/pipelex/pipe_controllers/parallel/pipe_parallel_factory.py @@ -31,7 +31,7 @@ def make( blueprint: PipeParallelBlueprint, ) -> PipeParallel: parallel_sub_pipes: list[SubPipe] = [] - for sub_pipe_blueprint in blueprint.parallels: + for sub_pipe_blueprint in blueprint.branches: if not sub_pipe_blueprint.result: msg = f"Unexpected error in pipe '{pipe_code}': PipeParallel requires a result specified for each parallel sub pipe" raise PipeParallelFactoryError(message=msg) diff --git a/pipelex/pipelex.toml b/pipelex/pipelex.toml index 0254b537d..3b0dcd7ff 100644 --- a/pipelex/pipelex.toml +++ b/pipelex/pipelex.toml @@ -432,6 +432,7 @@ llm_to_structure = "model_to_structure" llm_skill = "llm_talent" img_gen_skill = "img_gen_talent" extract_skill = "extract_talent" +parallels = "branches" #################################################################################################### diff --git a/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/parallel_graph_3branch.plx b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/parallel_graph_3branch.plx new file mode 100644 index 000000000..d1fe6c478 --- /dev/null +++ b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/parallel_graph_3branch.plx @@ -0,0 +1,82 @@ +domain = "test_parallel_graph_3branch" +description = "Test 3-branch PipeParallel with selective downstream consumption" +main_pipe = "pg3_sequence" + +[concept.Pg3ToneResult] +description = "Result of tone analysis" +refines = "Text" + +[concept.Pg3LengthResult] +description = "Result of length analysis" +refines = "Text" + +[concept.Pg3StyleResult] +description = "Result of style analysis" +refines = "Text" + 
+[concept.Pg3CombinedResult] +description = "Combined results from 3-branch parallel analysis" + +[pipe.pg3_sequence] +type = "PipeSequence" +description = "Run 3-branch parallel analysis then selectively consume 2 of 3 branch outputs" +inputs = { input_text = "Text" } +output = "Text" +steps = [ + { pipe = "pg3_parallel", result = "full_combo" }, + { pipe = "pg3_refine_tone", result = "refined_tone" }, + { pipe = "pg3_refine_length", result = "refined_length" }, +] + +[pipe.pg3_parallel] +type = "PipeParallel" +description = "Analyze tone, length, and style in parallel with combined output" +inputs = { input_text = "Text" } +output = "Pg3CombinedResult" +add_each_output = true +combined_output = "Pg3CombinedResult" +branches = [ + { pipe = "pg3_analyze_tone", result = "tone_result" }, + { pipe = "pg3_analyze_length", result = "length_result" }, + { pipe = "pg3_analyze_style", result = "style_result" }, +] + +[pipe.pg3_analyze_tone] +type = "PipeLLM" +description = "Analyze the tone of the text" +inputs = { input_text = "Text" } +output = "Pg3ToneResult" +model = "$testing-text" +prompt = "Describe the tone of: @input_text.text" + +[pipe.pg3_analyze_length] +type = "PipeLLM" +description = "Analyze the length of the text" +inputs = { input_text = "Text" } +output = "Pg3LengthResult" +model = "$testing-text" +prompt = "Describe the length characteristics of: @input_text.text" + +[pipe.pg3_analyze_style] +type = "PipeLLM" +description = "Analyze the writing style of the text" +inputs = { input_text = "Text" } +output = "Pg3StyleResult" +model = "$testing-text" +prompt = "Describe the writing style of: @input_text.text" + +[pipe.pg3_refine_tone] +type = "PipeLLM" +description = "Refine the tone analysis" +inputs = { tone_result = "Pg3ToneResult" } +output = "Text" +model = "$testing-text" +prompt = "Refine and elaborate on this tone analysis: @tone_result.text" + +[pipe.pg3_refine_length] +type = "PipeLLM" +description = "Refine the length analysis" +inputs = { length_result = "Pg3LengthResult" } +output = "Text" +model = "$testing-text" +prompt = "Refine and elaborate on this length analysis: @length_result.text" diff --git a/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/parallel_graph_add_each.plx b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/parallel_graph_add_each.plx new file mode 100644 index 000000000..bb5e18060 --- /dev/null +++ b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/parallel_graph_add_each.plx @@ -0,0 +1,59 @@ +domain = "test_parallel_graph_add_each" +description = "Test PipeParallel with add_each_output for graph edge verification" +main_pipe = "parallel_then_consume" + +[concept.ShortSummary] +description = "A brief one-sentence summary" +refines = "Text" + +[concept.DetailedSummary] +description = "A detailed multi-sentence summary" +refines = "Text" + +[pipe.parallel_then_consume] +type = "PipeSequence" +description = "Run parallel summaries then consume one downstream" +inputs = { input_text = "Text" } +output = "Text" +steps = [ + { pipe = "parallel_summarize", result = "..." 
}, + { pipe = "combine_summaries" }, +] + +[pipe.parallel_summarize] +type = "PipeParallel" +description = "Generate short and detailed summaries in parallel" +inputs = { input_text = "Text" } +output = "Text" +add_each_output = true +branches = [ + { pipe = "summarize_short", result = "short_summary" }, + { pipe = "summarize_detailed", result = "detailed_summary" }, +] + +[pipe.summarize_short] +type = "PipeLLM" +description = "Generate a short one-sentence summary" +inputs = { input_text = "Text" } +output = "ShortSummary" +model = "$testing-text" +prompt = "Summarize in one sentence: @input_text.text" + +[pipe.summarize_detailed] +type = "PipeLLM" +description = "Generate a detailed summary" +inputs = { input_text = "Text" } +output = "DetailedSummary" +model = "$testing-text" +prompt = "Write a detailed summary of: @input_text.text" + +[pipe.combine_summaries] +type = "PipeLLM" +description = "Combine short and detailed summaries into a final result" +inputs = { short_summary = "ShortSummary", detailed_summary = "DetailedSummary" } +output = "Text" +model = "$testing-text" +prompt = """Combine these two summaries into a final result: + +Short: @short_summary.text +Detailed: @detailed_summary.text""" diff --git a/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/parallel_graph_combined.plx b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/parallel_graph_combined.plx new file mode 100644 index 000000000..6212ae0be --- /dev/null +++ b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/parallel_graph_combined.plx @@ -0,0 +1,60 @@ +domain = "test_parallel_graph_combined" +description = "Test PipeParallel with combined_output wrapped in PipeSequence with follow-up consumer" +main_pipe = "pgc_analysis_then_summarize" + +[concept.PgcToneResult] +description = "Result of tone analysis" +refines = "Text" + +[concept.PgcLengthResult] +description = "Result of length analysis" +refines = "Text" + +[concept.PgcCombinedResult] +description = "Combined results from parallel analysis" + +[pipe.pgc_analysis_then_summarize] +type = "PipeSequence" +description = "Run parallel analysis then summarize the combined result" +inputs = { input_text = "Text" } +output = "Text" +steps = [ + { pipe = "pgc_parallel_analysis", result = "pgc_combined_result" }, + { pipe = "pgc_summarize_combined" }, +] + +[pipe.pgc_parallel_analysis] +type = "PipeParallel" +description = "Analyze tone and length in parallel with combined output" +inputs = { input_text = "Text" } +output = "PgcCombinedResult" +add_each_output = true +combined_output = "PgcCombinedResult" +branches = [ + { pipe = "pgc_analyze_tone", result = "tone_result" }, + { pipe = "pgc_analyze_length", result = "length_result" }, +] + +[pipe.pgc_analyze_tone] +type = "PipeLLM" +description = "Analyze the tone of the text" +inputs = { input_text = "Text" } +output = "PgcToneResult" +model = "$testing-text" +prompt = "Describe the tone of: @input_text.text" + +[pipe.pgc_analyze_length] +type = "PipeLLM" +description = "Analyze the length of the text" +inputs = { input_text = "Text" } +output = "PgcLengthResult" +model = "$testing-text" +prompt = "Describe the length characteristics of: @input_text.text" + +[pipe.pgc_summarize_combined] +type = "PipeLLM" +description = "Summarize the combined parallel analysis result" +inputs = { pgc_combined_result = "PgcCombinedResult" } +output = "Text" +model = "$testing-text" +prompt = "Summarize the following analysis: @pgc_combined_result" diff --git 
a/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/parallel_graph_models.py b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/parallel_graph_models.py new file mode 100644 index 000000000..b073ebaa4 --- /dev/null +++ b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/parallel_graph_models.py @@ -0,0 +1,19 @@ +from pydantic import Field + +from pipelex.core.stuffs.structured_content import StructuredContent +from pipelex.core.stuffs.text_content import TextContent + + +class PgcCombinedResult(StructuredContent): + """Combined results from parallel analysis branches.""" + + tone_result: TextContent = Field(..., description="Result of tone analysis") + length_result: TextContent = Field(..., description="Result of length analysis") + + +class Pg3CombinedResult(StructuredContent): + """Combined results from 3-branch parallel analysis.""" + + tone_result: TextContent = Field(..., description="Result of tone analysis") + length_result: TextContent = Field(..., description="Result of length analysis") + style_result: TextContent = Field(..., description="Result of style analysis") diff --git a/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/test_data.py b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/test_data.py new file mode 100644 index 000000000..cfd77ed2b --- /dev/null +++ b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/test_data.py @@ -0,0 +1,106 @@ +"""Test data for PipeParallel graph tests.""" + +from typing import ClassVar + + +class ParallelCombinedGraphExpectationsBase: + """Base class for PipeParallel graph expectations with combined_output.""" + + PARALLEL_PIPE_CODE: ClassVar[str] + EXPECTED_PIPE_CODES: ClassVar[set[str]] + EXPECTED_NODE_COUNTS: ClassVar[dict[str, int]] + EXPECTED_EDGE_COUNTS: ClassVar[dict[str, int]] + + +class ParallelAddEachGraphExpectations: + """Expected structure for the parallel_graph_add_each graph.""" + + # Expected node pipe_codes + EXPECTED_PIPE_CODES: ClassVar[set[str]] = { + "parallel_then_consume", # PipeSequence (outer controller) + "parallel_summarize", # PipeParallel (parallel controller) + "summarize_short", # PipeLLM (branch 1) + "summarize_detailed", # PipeLLM (branch 2) + "combine_summaries", # PipeLLM (downstream consumer) + } + + # Expected number of nodes per pipe_code + EXPECTED_NODE_COUNTS: ClassVar[dict[str, int]] = { + "parallel_then_consume": 1, + "parallel_summarize": 1, + "summarize_short": 1, + "summarize_detailed": 1, + "combine_summaries": 1, + } + + # Expected number of edges by kind + EXPECTED_EDGE_COUNTS: ClassVar[dict[str, int]] = { + "contains": 4, # sequence->parallel, sequence->combine, parallel->short, parallel->detailed + "data": 2, # parallel->combine (short_summary), parallel->combine (detailed_summary) + } + + +class ParallelCombinedGraphExpectations(ParallelCombinedGraphExpectationsBase): + """Expected structure for the parallel_graph_combined graph (PipeSequence wrapping PipeParallel with combined_output).""" + + PARALLEL_PIPE_CODE: ClassVar[str] = "pgc_parallel_analysis" + + # Expected node pipe_codes + EXPECTED_PIPE_CODES: ClassVar[set[str]] = { + "pgc_analysis_then_summarize", # PipeSequence (outer controller) + "pgc_parallel_analysis", # PipeParallel (parallel controller with combined_output) + "pgc_analyze_tone", # PipeLLM (branch 1) + "pgc_analyze_length", # PipeLLM (branch 2) + "pgc_summarize_combined", # PipeLLM (downstream consumer of combined result) + } + + # Expected number of nodes per pipe_code + EXPECTED_NODE_COUNTS: ClassVar[dict[str, int]] = { + 
"pgc_analysis_then_summarize": 1, + "pgc_parallel_analysis": 1, + "pgc_analyze_tone": 1, + "pgc_analyze_length": 1, + "pgc_summarize_combined": 1, + } + + # Expected number of edges by kind + EXPECTED_EDGE_COUNTS: ClassVar[dict[str, int]] = { + "contains": 4, # sequence->parallel, sequence->summarize_combined, parallel->tone, parallel->length + "parallel_combine": 2, # tone_result->combined, length_result->combined + "data": 1, # parallel->summarize_combined (combined result) + } + + +class Parallel3BranchGraphExpectations(ParallelCombinedGraphExpectationsBase): + """Expected structure for the parallel_graph_3branch graph (3-branch PipeParallel with selective consumption).""" + + PARALLEL_PIPE_CODE: ClassVar[str] = "pg3_parallel" + + # Expected node pipe_codes + EXPECTED_PIPE_CODES: ClassVar[set[str]] = { + "pg3_sequence", # PipeSequence (outer controller) + "pg3_parallel", # PipeParallel (3-branch parallel with combined_output) + "pg3_analyze_tone", # PipeLLM (branch 1) + "pg3_analyze_length", # PipeLLM (branch 2) + "pg3_analyze_style", # PipeLLM (branch 3 - unused downstream) + "pg3_refine_tone", # PipeLLM (consumes tone_result) + "pg3_refine_length", # PipeLLM (consumes length_result) + } + + # Expected number of nodes per pipe_code + EXPECTED_NODE_COUNTS: ClassVar[dict[str, int]] = { + "pg3_sequence": 1, + "pg3_parallel": 1, + "pg3_analyze_tone": 1, + "pg3_analyze_length": 1, + "pg3_analyze_style": 1, + "pg3_refine_tone": 1, + "pg3_refine_length": 1, + } + + # Expected number of edges by kind + EXPECTED_EDGE_COUNTS: ClassVar[dict[str, int]] = { + "contains": 6, # sequence->parallel, sequence->refine_tone, sequence->refine_length, parallel->tone, parallel->length, parallel->style + "parallel_combine": 3, # tone->combined, length->combined, style->combined + "data": 2, # parallel->refine_tone (tone_result), parallel->refine_length (length_result) + } diff --git a/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/test_pipe_parallel_graph.py b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/test_pipe_parallel_graph.py new file mode 100644 index 000000000..de8fc63e8 --- /dev/null +++ b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/test_pipe_parallel_graph.py @@ -0,0 +1,296 @@ +"""E2E test for PipeParallel with graph tracing to verify DATA edges from controller to consumers.""" + +from collections import Counter +from pathlib import Path + +import pytest + +from pipelex import log, pretty_print +from pipelex.config import get_config +from pipelex.core.stuffs.text_content import TextContent +from pipelex.graph.graph_factory import generate_graph_outputs +from pipelex.graph.graphspec import GraphSpec, NodeSpec +from pipelex.pipe_run.pipe_run_mode import PipeRunMode +from pipelex.pipeline.execute import execute_pipeline +from pipelex.tools.misc.file_utils import get_incremental_directory_path, save_text_to_path +from tests.conftest import TEST_OUTPUTS_DIR +from tests.e2e.pipelex.pipes.pipe_controller.pipe_parallel.test_data import ( + Parallel3BranchGraphExpectations, + ParallelAddEachGraphExpectations, + ParallelCombinedGraphExpectations, + ParallelCombinedGraphExpectationsBase, +) + + +def _get_next_output_folder(subfolder: str) -> Path: + """Get the next numbered output folder for parallel graph outputs.""" + base_dir = str(Path(TEST_OUTPUTS_DIR) / f"pipe_parallel_graph_{subfolder}") + return Path(get_incremental_directory_path(base_dir, "run")) + + +@pytest.mark.dry_runnable +@pytest.mark.llm +@pytest.mark.inference +@pytest.mark.asyncio(loop_scope="class") +class 
+class TestPipeParallelGraph:
+    """E2E tests for PipeParallel graph generation with correct DATA edges."""
+
+    async def test_parallel_add_each_output_graph(self, pipe_run_mode: PipeRunMode):
+        """Verify PipeParallel with add_each_output generates correct DATA edges.
+
+        This test runs a PipeSequence containing:
+        1. PipeParallel (add_each_output=true) that produces short_summary and detailed_summary
+        2. A downstream PipeLLM (combine_summaries) that consumes both branch outputs
+
+        Expected: DATA edges flow from PipeParallel to combine_summaries (not from sub-pipes).
+        """
+        # Build config with graph tracing and all graph outputs enabled
+        base_config = get_config().pipelex.pipeline_execution_config
+        exec_config = base_config.with_graph_config_overrides(
+            generate_graph=True,
+            force_include_full_data=False,
+        )
+        graph_config = exec_config.graph_config.model_copy(
+            update={
+                "graphs_inclusion": exec_config.graph_config.graphs_inclusion.model_copy(
+                    update={
+                        "graphspec_json": True,
+                        "mermaidflow_html": True,
+                        "reactflow_html": True,
+                    }
+                )
+            }
+        )
+        exec_config = exec_config.model_copy(update={"graph_config": graph_config})
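+        # NOTE: model_copy(update=...) produces modified copies, so the shared base
+        # execution config obtained from get_config() is never mutated by these overrides.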
+
+        # Run pipeline with input text
+        pipe_output = await execute_pipeline(
+            pipe_code="parallel_then_consume",
+            library_dirs=["tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel"],
+            inputs={
+                "input_text": TextContent(text="The quick brown fox jumps over the lazy dog. This is a sample text for testing parallel processing.")
+            },
+            pipe_run_mode=pipe_run_mode,
+            execution_config=exec_config,
+        )
+
+        # Basic assertions
+        assert pipe_output is not None
+        assert pipe_output.working_memory is not None
+        assert pipe_output.main_stuff is not None
+
+        # Verify graph was generated
+        graph_spec = pipe_output.graph_spec
+        assert graph_spec is not None, "GraphSpec should be populated when generate_graph=True"
+        assert isinstance(graph_spec, GraphSpec)
+        assert len(graph_spec.nodes) > 0, "Graph should have nodes"
+        assert len(graph_spec.edges) > 0, "Graph should have edges"
+
+        log.info(f"Parallel add_each graph: {len(graph_spec.nodes)} nodes, {len(graph_spec.edges)} edges")
+
+        # Build node lookup
+        nodes_by_id: dict[str, NodeSpec] = {node.node_id: node for node in graph_spec.nodes}
+        nodes_by_pipe_code: dict[str, list[NodeSpec]] = {}
+        for node in graph_spec.nodes:
+            if node.pipe_code:
+                nodes_by_pipe_code.setdefault(node.pipe_code, []).append(node)
+
+        # 1. Verify all expected pipe_codes exist
+        actual_pipe_codes = set(nodes_by_pipe_code.keys())
+        assert actual_pipe_codes == ParallelAddEachGraphExpectations.EXPECTED_PIPE_CODES, (
+            f"Unexpected pipe codes. Expected: {ParallelAddEachGraphExpectations.EXPECTED_PIPE_CODES}, Got: {actual_pipe_codes}"
+        )
+
+        # 2. Verify node counts per pipe_code
+        for pipe_code, expected_count in ParallelAddEachGraphExpectations.EXPECTED_NODE_COUNTS.items():
+            actual_count = len(nodes_by_pipe_code.get(pipe_code, []))
+            assert actual_count == expected_count, f"Expected {expected_count} nodes for pipe_code '{pipe_code}', got {actual_count}"
+
+        # 3. Verify edge counts by kind
+        actual_edge_counts = Counter(str(edge.kind) for edge in graph_spec.edges)
+        for kind, expected_count in ParallelAddEachGraphExpectations.EXPECTED_EDGE_COUNTS.items():
+            actual_count = actual_edge_counts.get(kind, 0)
+            assert actual_count == expected_count, f"Expected {expected_count} edges of kind '{kind}', got {actual_count}"
+
+        # 4. Verify DATA edges source from PipeParallel, not from sub-pipes
+        parallel_node = nodes_by_pipe_code["parallel_summarize"][0]
+        combine_node = nodes_by_pipe_code["combine_summaries"][0]
+        data_edges = [edge for edge in graph_spec.edges if edge.kind.is_data]
+
+        for edge in data_edges:
+            # DATA edges targeting combine_summaries should come from PipeParallel
+            if edge.target == combine_node.node_id:
+                assert edge.source == parallel_node.node_id, (
+                    f"DATA edge to combine_summaries should come from PipeParallel '{parallel_node.node_id}', "
+                    f"but comes from '{edge.source}' (pipe_code: '{nodes_by_id[edge.source].pipe_code}')"
+                )
+
+        # 5. Verify PipeParallel node has output specs for both branch outputs
+        assert len(parallel_node.node_io.outputs) >= 2, (
+            f"PipeParallel should have at least 2 output specs (branch outputs), got {len(parallel_node.node_io.outputs)}"
+        )
+        output_names = {output.name for output in parallel_node.node_io.outputs}
+        assert "short_summary" in output_names, "PipeParallel should have 'short_summary' output"
+        assert "detailed_summary" in output_names, "PipeParallel should have 'detailed_summary' output"
+
+        # 6. Verify containment: sub-pipes are inside PipeParallel
+        contains_edges = [edge for edge in graph_spec.edges if edge.kind.is_contains]
+        parallel_children = {edge.target for edge in contains_edges if edge.source == parallel_node.node_id}
+        branch_pipe_codes = {"summarize_short", "summarize_detailed"}
+        branch_node_ids = {node.node_id for pipe_code in branch_pipe_codes for node in nodes_by_pipe_code.get(pipe_code, [])}
+        assert branch_node_ids.issubset(parallel_children), (
+            f"Branch nodes should be children of PipeParallel. Branch IDs: {branch_node_ids}, Parallel children: {parallel_children}"
+        )
+
+        # Generate and save graph outputs
+        graph_outputs = await generate_graph_outputs(
+            graph_spec=graph_spec,
+            graph_config=graph_config,
+            pipe_code="parallel_then_consume",
+        )
+
+        output_dir = _get_next_output_folder("add_each")
+        if graph_outputs.graphspec_json:
+            save_text_to_path(graph_outputs.graphspec_json, str(output_dir / "graph.json"))
+        if graph_outputs.mermaidflow_html:
+            save_text_to_path(graph_outputs.mermaidflow_html, str(output_dir / "mermaidflow.html"))
+        if graph_outputs.reactflow_html:
+            save_text_to_path(graph_outputs.reactflow_html, str(output_dir / "reactflow.html"))
+
+        pretty_print(
+            {
+                "graph_id": graph_spec.graph_id,
+                "nodes": len(graph_spec.nodes),
+                "edges": len(graph_spec.edges),
+                "edges_by_kind": dict(actual_edge_counts),
+                "output_dir": str(output_dir),
+            },
+            title="Parallel Add Each Graph Outputs",
+        )
+
+        log.info("Structural validation passed: DATA edges correctly source from PipeParallel")
+
+    @pytest.mark.parametrize(
+        ("pipe_code", "expectations_class"),
+        [
+            ("pgc_analysis_then_summarize", ParallelCombinedGraphExpectations),
+            ("pg3_sequence", Parallel3BranchGraphExpectations),
+        ],
+    )
+    async def test_parallel_combined_output_graph(
+        self,
+        pipe_run_mode: PipeRunMode,
+        pipe_code: str,
+        expectations_class: type[ParallelCombinedGraphExpectationsBase],
+    ):
+        """Verify PipeParallel with combined_output generates correct graph structure.
+
+        Parametrized with:
+        - pgc_analysis_then_summarize: 2-branch PipeParallel wrapped in PipeSequence with follow-up consumer
+        - pg3_sequence: 3-branch PipeParallel with selective downstream consumption (1 branch unused)
+        """
+        # Build config with graph tracing
+        base_config = get_config().pipelex.pipeline_execution_config
+        exec_config = base_config.with_graph_config_overrides(
+            generate_graph=True,
+            force_include_full_data=False,
+        )
+        graph_config = exec_config.graph_config.model_copy(
+            update={
+                "graphs_inclusion": exec_config.graph_config.graphs_inclusion.model_copy(
+                    update={
+                        "graphspec_json": True,
+                        "reactflow_html": True,
+                    }
+                )
+            }
+        )
+        exec_config = exec_config.model_copy(update={"graph_config": graph_config})
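+        # NOTE: only graphspec_json and reactflow_html are enabled here (the add_each
+        # test above also enables mermaidflow_html); the save calls below are guarded,
+        # so absent outputs are simply skipped.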
+
+        # Run pipeline
+        pipe_output = await execute_pipeline(
+            pipe_code=pipe_code,
+            library_dirs=["tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel"],
+            inputs={"input_text": TextContent(text="Hello world, this is a test document for parallel analysis.")},
+            pipe_run_mode=pipe_run_mode,
+            execution_config=exec_config,
+        )
+
+        assert pipe_output is not None
+        assert pipe_output.main_stuff is not None
+
+        # Verify graph
+        graph_spec = pipe_output.graph_spec
+        assert graph_spec is not None
+        assert isinstance(graph_spec, GraphSpec)
+
+        log.info(f"Parallel combined graph ({pipe_code}): {len(graph_spec.nodes)} nodes, {len(graph_spec.edges)} edges")
+
+        # Build node lookup
+        nodes_by_pipe_code: dict[str, list[NodeSpec]] = {}
+        for node in graph_spec.nodes:
+            if node.pipe_code:
+                nodes_by_pipe_code.setdefault(node.pipe_code, []).append(node)
+
+        # 1. Verify all expected pipe_codes exist
+        actual_pipe_codes = set(nodes_by_pipe_code.keys())
+        assert actual_pipe_codes == expectations_class.EXPECTED_PIPE_CODES, (
+            f"Unexpected pipe codes. Expected: {expectations_class.EXPECTED_PIPE_CODES}, Got: {actual_pipe_codes}"
+        )
+
+        # 2. Verify node counts per pipe_code
+        for node_pipe_code, expected_count in expectations_class.EXPECTED_NODE_COUNTS.items():
+            actual_count = len(nodes_by_pipe_code.get(node_pipe_code, []))
+            assert actual_count == expected_count, f"Expected {expected_count} nodes for pipe_code '{node_pipe_code}', got {actual_count}"
+
+        # 3. Verify edge counts by kind
+        actual_edge_counts = Counter(str(edge.kind) for edge in graph_spec.edges)
+        for kind, expected_count in expectations_class.EXPECTED_EDGE_COUNTS.items():
+            actual_count = actual_edge_counts.get(kind, 0)
+            assert actual_count == expected_count, f"Expected {expected_count} edges of kind '{kind}', got {actual_count}"
+
+        # 4. Verify PARALLEL_COMBINE edges connect branch producers to the PipeParallel node
+        parallel_pipe_code = expectations_class.PARALLEL_PIPE_CODE
+        parallel_node = nodes_by_pipe_code[parallel_pipe_code][0]
+        parallel_combine_edges = [edge for edge in graph_spec.edges if edge.kind.is_parallel_combine]
+        expected_combine_count = expectations_class.EXPECTED_EDGE_COUNTS.get("parallel_combine", 0)
+        assert len(parallel_combine_edges) == expected_combine_count, (
+            f"Expected {expected_combine_count} PARALLEL_COMBINE edges, got {len(parallel_combine_edges)}"
+        )
+        for edge in parallel_combine_edges:
+            assert edge.target == parallel_node.node_id, (
+                f"PARALLEL_COMBINE edge target should be PipeParallel '{parallel_node.node_id}', got '{edge.target}'"
+            )
+            assert edge.source_stuff_digest is not None, "PARALLEL_COMBINE edge should have source_stuff_digest"
+            assert edge.target_stuff_digest is not None, "PARALLEL_COMBINE edge should have target_stuff_digest"
+
+        # Generate and save graph outputs
+        graph_outputs = await generate_graph_outputs(
+            graph_spec=graph_spec,
+            graph_config=graph_config,
+            pipe_code=pipe_code,
+        )
+
+        output_dir = _get_next_output_folder(pipe_code)
+        if graph_outputs.graphspec_json:
+            save_text_to_path(graph_outputs.graphspec_json, str(output_dir / "graph.json"))
+        if graph_outputs.mermaidflow_html:
+            save_text_to_path(graph_outputs.mermaidflow_html, str(output_dir / "mermaidflow.html"))
+        if graph_outputs.mermaidflow_mmd:
+            save_text_to_path(graph_outputs.mermaidflow_mmd, str(output_dir / "mermaidflow.mmd"))
+        if graph_outputs.reactflow_html:
+            save_text_to_path(graph_outputs.reactflow_html, str(output_dir / "reactflow.html"))
+
+        pretty_print(
+            {
+                "graph_id": graph_spec.graph_id,
+                "nodes": len(graph_spec.nodes),
+                "edges": len(graph_spec.edges),
+                "edges_by_kind": dict(actual_edge_counts),
+                "parallel_outputs": [output.name for output in parallel_node.node_io.outputs],
+                "output_dir": str(output_dir),
+            },
+            title=f"Parallel Combined Graph Outputs ({pipe_code})",
+        )
+
+        log.info(f"Structural validation passed: {pipe_code} combined_output graph is correct")
diff --git a/tests/e2e/pipelex/pipes/pipe_operators/pipe_compose/cv_job_match.plx b/tests/e2e/pipelex/pipes/pipe_operators/pipe_compose/cv_job_match.plx
index e87487e43..818c82f9a 100644
--- a/tests/e2e/pipelex/pipes/pipe_operators/pipe_compose/cv_job_match.plx
+++ b/tests/e2e/pipelex/pipes/pipe_operators/pipe_compose/cv_job_match.plx
@@ -29,7 +29,7 @@ type = "PipeParallel"
 description = "Extracts text content from both the CV and job offer PDFs concurrently"
 inputs = { cv_pdf = "Document", job_offer_pdf = "Document" }
 output = "Page[]"
-parallels = [
+branches = [
     { pipe = "extract_cv", result = "cv_pages" },
     { pipe = "extract_job_offer", result = "job_offer_pages" },
 ]
@@ -54,7 +54,7 @@ type = "PipeParallel"
 description = "Analyzes both the CV and job offer documents concurrently to extract structured information"
 inputs = { cv_pages = "Page", job_offer_pages = "Page" }
 output = "Text"
-parallels = [
+branches = [
     { pipe = "analyze_cv", result = "cv_analysis" },
     { pipe = "analyze_job_offer", result = "job_requirements" },
 ]
diff --git a/tests/integration/pipelex/pipes/controller/pipe_parallel/pipe_parallel_1.plx b/tests/integration/pipelex/pipes/controller/pipe_parallel/pipe_parallel_1.plx
index 3c4cf42dd..d3b928bfc 100644
--- a/tests/integration/pipelex/pipes/controller/pipe_parallel/pipe_parallel_1.plx
+++ b/tests/integration/pipelex/pipes/controller/pipe_parallel/pipe_parallel_1.plx
@@ -15,7 +15,7 @@ inputs = { document = "DocumentInput" }
document = "DocumentInput" } output = "CombinedAnalysis" add_each_output = true combined_output = "CombinedAnalysis" -parallels = [ +branches = [ { pipe = "analyze_length", result = "length_result" }, { pipe = "analyze_content", result = "content_result" }, ] diff --git a/tests/integration/pipelex/pipes/controller/pipe_parallel/test_pipe_parallel_simple.py b/tests/integration/pipelex/pipes/controller/pipe_parallel/test_pipe_parallel_simple.py index 2ec240177..5d572009c 100644 --- a/tests/integration/pipelex/pipes/controller/pipe_parallel/test_pipe_parallel_simple.py +++ b/tests/integration/pipelex/pipes/controller/pipe_parallel/test_pipe_parallel_simple.py @@ -32,7 +32,7 @@ async def test_parallel_text_analysis( description="Parallel text analysis pipeline", inputs={"input_text": f"{SpecialDomain.NATIVE}.{NativeConceptCode.TEXT}"}, output=f"{SpecialDomain.NATIVE}.{NativeConceptCode.TEXT}", - parallels=[ + branches=[ SubPipeBlueprint(pipe="analyze_sentiment", result="sentiment_result"), SubPipeBlueprint(pipe="count_words", result="word_count_result"), SubPipeBlueprint(pipe="extract_keywords", result="keywords_result"), @@ -151,7 +151,7 @@ async def test_parallel_short_text_analysis( description="Parallel text analysis pipeline for short text", inputs={"input_text": f"{SpecialDomain.NATIVE}.{NativeConceptCode.TEXT}"}, output=f"{SpecialDomain.NATIVE}.{NativeConceptCode.TEXT}", - parallels=[ + branches=[ SubPipeBlueprint(pipe="analyze_sentiment", result="sentiment_result"), SubPipeBlueprint(pipe="count_words", result="word_count_result"), SubPipeBlueprint(pipe="extract_keywords", result="keywords_result"), diff --git a/tests/integration/pipelex/pipes/controller/pipe_parallel/test_pipe_parallel_validation.py b/tests/integration/pipelex/pipes/controller/pipe_parallel/test_pipe_parallel_validation.py index 1db6fa6d5..1ddd7e105 100644 --- a/tests/integration/pipelex/pipes/controller/pipe_parallel/test_pipe_parallel_validation.py +++ b/tests/integration/pipelex/pipes/controller/pipe_parallel/test_pipe_parallel_validation.py @@ -71,7 +71,7 @@ def test_pipe_parallel_with_real_pipe_structure(self, load_empty_library: Callab "context": concept_2.code, }, output=ConceptFactory.make_concept_ref_with_domain(domain_code=domain_code, concept_code=concept_3.code), - parallels=[SubPipeBlueprint(pipe=real_pipe.code, result="analysis_result")], + branches=[SubPipeBlueprint(pipe=real_pipe.code, result="analysis_result")], add_each_output=True, combined_output=None, ) @@ -123,7 +123,7 @@ def test_pipe_parallel_creation(self, load_empty_library: Callable[[], None]): description="Basic parallel pipe for testing", inputs={"input_var": concept_1.concept_ref}, output=ConceptFactory.make_concept_ref_with_domain(domain_code=domain_code, concept_code=concept_3.code), - parallels=[SubPipeBlueprint(pipe="test_pipe_1", result="result_1")], + branches=[SubPipeBlueprint(pipe="test_pipe_1", result="result_1")], add_each_output=True, combined_output=None, ) @@ -178,7 +178,7 @@ def test_pipe_parallel_needed_inputs_structure(self, load_empty_library: Callabl "context": concept_2.concept_ref, }, output=ConceptFactory.make_concept_ref_with_domain(domain_code=domain_code, concept_code=concept_3.code), - parallels=[], # No sub-pipes to avoid dependency issues + branches=[], # No sub-pipes to avoid dependency issues add_each_output=True, combined_output=None, ) diff --git a/tests/integration/pipelex/pipes/test_bracket_notation_controllers.py b/tests/integration/pipelex/pipes/test_bracket_notation_controllers.py index 
index ed128ce67..dee8c2082 100644
--- a/tests/integration/pipelex/pipes/test_bracket_notation_controllers.py
+++ b/tests/integration/pipelex/pipes/test_bracket_notation_controllers.py
@@ -39,7 +39,7 @@ def test_pipe_parallel_with_bracket_notation(self, load_empty_library: Callable[
             description="Process items in parallel",
             inputs={"data": "DataItem[2]"},
             output="ProcessedData",
-            parallels=[],
+            branches=[],
             add_each_output=True,
         )
diff --git a/tests/unit/pipelex/builder/pipe/pipe_controller/pipe_parallel/test_data.py b/tests/unit/pipelex/builder/pipe/pipe_controller/pipe_parallel/test_data.py
index 97d9a0696..12a3c424b 100644
--- a/tests/unit/pipelex/builder/pipe/pipe_controller/pipe_parallel/test_data.py
+++ b/tests/unit/pipelex/builder/pipe/pipe_controller/pipe_parallel/test_data.py
@@ -14,7 +14,7 @@ class PipeParallelTestCases:
         description="Run pipes in parallel",
         inputs={"data": "Data"},
         output="Results",
-        parallels=[
+        branches=[
            SubPipeSpec(pipe_code="analyze_data", result="analysis"),
            SubPipeSpec(pipe_code="transform_data", result="transformed"),
            SubPipeSpec(pipe_code="validate_data", result="validation"),
@@ -25,7 +25,7 @@ class PipeParallelTestCases:
         description="Run pipes in parallel",
         inputs={"data": "Data"},
         output="Results",
-        parallels=[
+        branches=[
            SubPipeBlueprint(pipe="analyze_data", result="analysis"),
            SubPipeBlueprint(pipe="transform_data", result="transformed"),
            SubPipeBlueprint(pipe="validate_data", result="validation"),
@@ -43,7 +43,7 @@ class PipeParallelTestCases:
         description="Parallel with combined output",
         inputs={"input": "Input"},
         output="CombinedResult",
-        parallels=[
+        branches=[
            SubPipeSpec(pipe_code="pipe1", result="result1"),
            SubPipeSpec(pipe_code="pipe2", result="result2"),
         ],
@@ -54,7 +54,7 @@
         description="Parallel with combined output",
         inputs={"input": "Input"},
         output="CombinedResult",
-        parallels=[
+        branches=[
            SubPipeBlueprint(pipe="pipe1", result="result1"),
            SubPipeBlueprint(pipe="pipe2", result="result2"),
         ],
@@ -71,7 +71,7 @@ class PipeParallelTestCases:
         description="Parallel with combined output",
         inputs={"input": "Input"},
         output="CombinedResult",
-        parallels=[
+        branches=[
            SubPipeSpec(pipe_code="pipe1", result="result1"),
            SubPipeSpec(pipe_code="pipe2", result="result2"),
         ],
@@ -82,7 +82,7 @@
         description="Parallel with combined output",
         inputs={"input": "Input"},
         output="CombinedResult",
-        parallels=[
+        branches=[
            SubPipeBlueprint(pipe="pipe1", result="result1"),
            SubPipeBlueprint(pipe="pipe2", result="result2"),
         ],
diff --git a/tests/unit/pipelex/core/bundles/test_data_pipe_sorter.py b/tests/unit/pipelex/core/bundles/test_data_pipe_sorter.py
index a0e56fc68..3dae376d5 100644
--- a/tests/unit/pipelex/core/bundles/test_data_pipe_sorter.py
+++ b/tests/unit/pipelex/core/bundles/test_data_pipe_sorter.py
@@ -53,7 +53,7 @@ class PipeSorterTestCases:
         description="D depends on B and C",
         inputs={},
         output="Text",
-        parallels=[
+        branches=[
             SubPipeBlueprint(pipe="pipe_b", result="result_b"),
             SubPipeBlueprint(pipe="pipe_c", result="result_c"),
         ],
diff --git a/tests/unit/pipelex/core/stuffs/test_stuff_factory_combine_stuffs.py b/tests/unit/pipelex/core/stuffs/test_stuff_factory_combine_stuffs.py
new file mode 100644
index 000000000..e79d1f169
--- /dev/null
+++ b/tests/unit/pipelex/core/stuffs/test_stuff_factory_combine_stuffs.py
@@ -0,0 +1,164 @@
+import os
+from pathlib import Path
+from typing import TYPE_CHECKING, Callable
+
+import pytest
+from pydantic import Field
+
+from pipelex.core.concepts.concept_factory import ConceptFactory
+from pipelex.core.stuffs.exceptions import StuffFactoryError
+from pipelex.core.stuffs.structured_content import StructuredContent
+from pipelex.core.stuffs.stuff_factory import StuffFactory
+from pipelex.core.stuffs.text_content import TextContent
+from pipelex.hub import get_concept_library
+from pipelex.system.registries.class_registry_utils import ClassRegistryUtils
+
+if TYPE_CHECKING:
+    from pipelex.core.stuffs.stuff_content import StuffContent
+
+
+class SentimentAndWordCount(StructuredContent):
+    """A structured content combining sentiment and word count results."""
+
+    sentiment_result: TextContent = Field(description="Sentiment analysis result")
+    word_count_result: TextContent = Field(description="Word count result")
+
+
+class SingleFieldContent(StructuredContent):
+    """A structured content with a single field."""
+
+    summary: TextContent = Field(description="Summary text")
+
+
+DOMAIN_CODE = "test_combine"
+
+
+@pytest.fixture(scope="class")
+def setup_combine_concepts(load_test_library: Callable[[list[Path]], None]):
+    """Register structured content classes and create concepts for combine_stuffs tests."""
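+    # NOTE: registering this module's StructuredContent subclasses lets the concepts
+    # created below resolve their structure_class_name values ("SentimentAndWordCount",
+    # "SingleFieldContent") through the class registry.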
+    load_test_library([Path(__file__).parent])
+    ClassRegistryUtils.register_classes_in_file(
+        file_path=os.path.join(os.path.dirname(__file__), "test_stuff_factory_combine_stuffs.py"),
+        base_class=StructuredContent,
+        is_include_imported=False,
+    )
+
+    concept_library = get_concept_library()
+
+    concept_sentiment_and_word_count = ConceptFactory.make(
+        concept_code="SentimentAndWordCount",
+        domain_code=DOMAIN_CODE,
+        description="Combined sentiment and word count",
+        structure_class_name="SentimentAndWordCount",
+    )
+    concept_library.add_new_concept(concept=concept_sentiment_and_word_count)
+
+    concept_single_field = ConceptFactory.make(
+        concept_code="SingleFieldContent",
+        domain_code=DOMAIN_CODE,
+        description="Single field content",
+        structure_class_name="SingleFieldContent",
+    )
+    concept_library.add_new_concept(concept=concept_single_field)
+
+    yield
+
+    concept_library.remove_concepts_by_concept_refs(
+        concept_refs=[
+            f"{DOMAIN_CODE}.SentimentAndWordCount",
+            f"{DOMAIN_CODE}.SingleFieldContent",
+        ]
+    )
+
+
+@pytest.mark.usefixtures("setup_combine_concepts")
+class TestStuffFactoryCombineStuffs:
+    """Tests for StuffFactory.combine_stuffs method."""
+
+    def test_combine_two_text_contents(self):
+        """Test combining two TextContent fields into a StructuredContent stuff."""
+        concept = get_concept_library().get_required_concept(concept_ref=f"{DOMAIN_CODE}.SentimentAndWordCount")
+
+        stuff_contents: dict[str, StuffContent] = {
+            "sentiment_result": TextContent(text="positive"),
+            "word_count_result": TextContent(text="42"),
+        }
+
+        result = StuffFactory.combine_stuffs(
+            concept=concept,
+            stuff_contents=stuff_contents,
+            name="combined_analysis",
+        )
+
+        assert result.stuff_name == "combined_analysis"
+        assert isinstance(result.content, SentimentAndWordCount)
+        assert result.content.sentiment_result.text == "positive"
+        assert result.content.word_count_result.text == "42"
+        assert result.concept.code == "SentimentAndWordCount"
+        assert result.concept.domain_code == DOMAIN_CODE
+
+    def test_combine_single_field(self):
+        """Test combining a single TextContent field."""
+        concept = get_concept_library().get_required_concept(concept_ref=f"{DOMAIN_CODE}.SingleFieldContent")
+
+        stuff_contents: dict[str, StuffContent] = {
+            "summary": TextContent(text="This is a summary"),
+        }
+
+        result = StuffFactory.combine_stuffs(
+            concept=concept,
+            stuff_contents=stuff_contents,
+            name="single_field_stuff",
+        )
+
+        assert isinstance(result.content, SingleFieldContent)
+        assert result.content.summary.text == "This is a summary"
+
+    def test_combine_without_name_auto_generates(self):
+        """Test that omitting the name parameter still produces a valid Stuff."""
+        concept = get_concept_library().get_required_concept(concept_ref=f"{DOMAIN_CODE}.SingleFieldContent")
+
+        stuff_contents: dict[str, StuffContent] = {
+            "summary": TextContent(text="auto-named"),
+        }
+
+        result = StuffFactory.combine_stuffs(
+            concept=concept,
+            stuff_contents=stuff_contents,
+        )
+
+        assert result.stuff_name is not None
+        assert len(result.stuff_name) > 0
+        assert isinstance(result.content, SingleFieldContent)
+
+    def test_combine_with_missing_field_raises_error(self):
+        """Test that missing a required field raises StuffFactoryError."""
+        concept = get_concept_library().get_required_concept(concept_ref=f"{DOMAIN_CODE}.SentimentAndWordCount")
+
+        stuff_contents: dict[str, StuffContent] = {
+            "sentiment_result": TextContent(text="positive"),
+            # missing word_count_result
+        }
+
+        with pytest.raises(StuffFactoryError, match="Error combining stuffs"):
+            StuffFactory.combine_stuffs(
+                concept=concept,
+                stuff_contents=stuff_contents,
+                name="incomplete",
+            )
+
+    def test_combine_with_wrong_content_type_raises_error(self):
+        """Test that passing wrong content type for a field raises StuffFactoryError."""
+        concept = get_concept_library().get_required_concept(concept_ref=f"{DOMAIN_CODE}.SentimentAndWordCount")
+
+        stuff_contents: dict[str, StuffContent] = {
+            "sentiment_result": TextContent(text="positive"),
+            "word_count_result": "not_a_stuff_content",  # type: ignore[dict-item]
+        }
+
+        with pytest.raises(StuffFactoryError, match="Error combining stuffs"):
+            StuffFactory.combine_stuffs(
+                concept=concept,
+                stuff_contents=stuff_contents,
+                name="wrong_type",
+            )
diff --git a/tests/unit/pipelex/core/test_data/pipes/controllers/parallel/pipe_parallel.py b/tests/unit/pipelex/core/test_data/pipes/controllers/parallel/pipe_parallel.py
index 3ab345bcd..3c880382b 100644
--- a/tests/unit/pipelex/core/test_data/pipes/controllers/parallel/pipe_parallel.py
+++ b/tests/unit/pipelex/core/test_data/pipes/controllers/parallel/pipe_parallel.py
@@ -14,7 +14,7 @@ type = "PipeParallel"
 description = "PipeParallel example in PIPE_PARALLEL_TEST_CASES"
 output = "ProcessedData"
-parallels = [
+branches = [
     { pipe = "process_a", result = "result_a" },
     { pipe = "process_b", result = "result_b" },
 ]
@@ -29,7 +29,7 @@ type="PipeParallel",
     description="PipeParallel example in PIPE_PARALLEL_TEST_CASES",
     output="ProcessedData",
-    parallels=[
+    branches=[
         SubPipeBlueprint(pipe="process_a", result="result_a"),
         SubPipeBlueprint(pipe="process_b", result="result_b"),
     ],
diff --git a/tests/unit/pipelex/graph/test_dashed_edge_rendering.py b/tests/unit/pipelex/graph/test_dashed_edge_rendering.py
new file mode 100644
index 000000000..eccf4f7a8
--- /dev/null
+++ b/tests/unit/pipelex/graph/test_dashed_edge_rendering.py
@@ -0,0 +1,279 @@
+import re
+from datetime import datetime, timezone
+from typing import Any, ClassVar
+
+import pytest
+
+from pipelex.graph.graphspec import (
+    EdgeKind,
+    EdgeSpec,
+    GraphSpec,
+    IOSpec,
+    NodeIOSpec,
+    NodeKind,
+    NodeSpec,
+    NodeStatus,
+    PipelineRef,
+)
+from pipelex.graph.mermaidflow.mermaidflow_factory import MermaidflowFactory
+
+from .conftest import make_graph_config
+
+
+class TestDashedEdgeRendering:
+    """Tests for dashed-edge rendering logic across BATCH_ITEM, BATCH_AGGREGATE, and PARALLEL_COMBINE edge kinds."""
+
+    GRAPH_ID: ClassVar[str] = "dashed_edge_test:001"
+    CREATED_AT: ClassVar[datetime] = datetime(2024, 1, 15, 10, 30, 0, tzinfo=timezone.utc)
+
+    def _make_graph(
+        self,
+        nodes: list[dict[str, Any]],
+        edges: list[dict[str, Any]] | None = None,
+    ) -> GraphSpec:
+        """Helper to create a GraphSpec with nodes and edges."""
+        node_specs: list[NodeSpec] = []
+        for node_dict in nodes:
+            node_specs.append(NodeSpec(**node_dict))
+
+        edge_specs: list[EdgeSpec] = []
+        if edges:
+            for edge_dict in edges:
+                edge_specs.append(EdgeSpec(**edge_dict))
+
+        return GraphSpec(
+            graph_id=self.GRAPH_ID,
+            created_at=self.CREATED_AT,
+            pipeline_ref=PipelineRef(),
+            nodes=node_specs,
+            edges=edge_specs,
+        )
+
+    def _extract_dashed_edges(self, mermaid_code: str) -> list[str]:
+        """Extract all dashed-edge lines from mermaid code.
+
+        Returns:
+            Lines containing dashed arrows (-.-> or -."label".->).
+        """
+        return [line.strip() for line in mermaid_code.split("\n") if ".->" in line]
+
+    def _build_controller_graph_with_dashed_edge(
+        self,
+        edge_kind: EdgeKind,
+        edge_label: str | None = None,
+    ) -> GraphSpec:
+        """Build a graph with a controller, two children, and a dashed edge between their stuffs.
+
+        The controller contains two child pipes. The dashed edge connects
+        source_stuff from child_a to target_stuff owned by the controller (for aggregate/combine)
+        or child_b (for batch_item).
+
+        Args:
+            edge_kind: The kind of dashed edge to create.
+            edge_label: Optional label for the dashed edge.
+
+        Returns:
+            A GraphSpec with the dashed-edge scenario.
+        """
+        controller = {
+            "node_id": "ctrl_1",
+            "kind": NodeKind.CONTROLLER,
+            "pipe_code": "batch_ctrl",
+            "status": NodeStatus.SUCCEEDED,
+            "node_io": NodeIOSpec(
+                inputs=[],
+                outputs=[IOSpec(name="ctrl_output", concept="OutputList", digest="ctrl_out_digest")],
+            ),
+        }
+        child_a = {
+            "node_id": "child_a",
+            "kind": NodeKind.OPERATOR,
+            "pipe_code": "pipe_a",
+            "status": NodeStatus.SUCCEEDED,
+            "node_io": NodeIOSpec(
+                inputs=[],
+                outputs=[IOSpec(name="source_stuff", concept="Text", digest="source_digest")],
+            ),
+        }
+        child_b = {
+            "node_id": "child_b",
+            "kind": NodeKind.OPERATOR,
+            "pipe_code": "pipe_b",
+            "status": NodeStatus.SUCCEEDED,
+            "node_io": NodeIOSpec(
+                inputs=[IOSpec(name="target_stuff", concept="Text", digest="target_digest")],
+                outputs=[],
+            ),
+        }
+        contains_a = {
+            "edge_id": "edge_contains_a",
+            "source": "ctrl_1",
+            "target": "child_a",
+            "kind": EdgeKind.CONTAINS,
+        }
+        contains_b = {
+            "edge_id": "edge_contains_b",
+            "source": "ctrl_1",
+            "target": "child_b",
+            "kind": EdgeKind.CONTAINS,
+        }
+
+        # For BATCH_AGGREGATE and PARALLEL_COMBINE, target is the controller's output stuff
+        # For BATCH_ITEM, target is child_b's input stuff
+        target_stuff_digest: str
+        match edge_kind:
+            case EdgeKind.BATCH_ITEM:
+                target_stuff_digest = "target_digest"
+            case EdgeKind.BATCH_AGGREGATE | EdgeKind.PARALLEL_COMBINE:
+                target_stuff_digest = "ctrl_out_digest"
+            case EdgeKind.CONTROL | EdgeKind.DATA | EdgeKind.CONTAINS | EdgeKind.SELECTED_OUTCOME:
+                msg = f"Unexpected edge kind for dashed edge test: {edge_kind}"
+                raise ValueError(msg)
+
+        dashed_edge: dict[str, Any] = {
+            "edge_id": "edge_dashed",
+            "source": "child_a",
+            "target": "ctrl_1",
+            "kind": edge_kind,
+            "source_stuff_digest": "source_digest",
+            "target_stuff_digest": target_stuff_digest,
+        }
+        if edge_label:
+            dashed_edge["label"] = edge_label
+
+        return self._make_graph(
+            nodes=[controller, child_a, child_b],
+            edges=[contains_a, contains_b, dashed_edge],
+        )
+
+    @pytest.mark.parametrize(
+        ("topic", "edge_kind"),
+        [
+            ("BATCH_ITEM", EdgeKind.BATCH_ITEM),
+            ("BATCH_AGGREGATE", EdgeKind.BATCH_AGGREGATE),
+            ("PARALLEL_COMBINE", EdgeKind.PARALLEL_COMBINE),
+        ],
+    )
+    def test_dashed_edge_rendered_for_each_kind(self, topic: str, edge_kind: EdgeKind) -> None:
+        """Verify that each dashed-edge kind produces at least one dashed arrow."""
+        graph = self._build_controller_graph_with_dashed_edge(edge_kind=edge_kind)
+        graph_config = make_graph_config()
+        result = MermaidflowFactory.make_from_graphspec(graph, graph_config)
+
+        dashed_lines = self._extract_dashed_edges(result.mermaid_code)
+        assert len(dashed_lines) >= 1, f"Expected at least one dashed edge for {topic}, got none"
+
+    @pytest.mark.parametrize(
+        ("topic", "edge_kind"),
+        [
+            ("BATCH_ITEM", EdgeKind.BATCH_ITEM),
+            ("BATCH_AGGREGATE", EdgeKind.BATCH_AGGREGATE),
+            ("PARALLEL_COMBINE", EdgeKind.PARALLEL_COMBINE),
+        ],
+    )
+    def test_dashed_edge_with_label(self, topic: str, edge_kind: EdgeKind) -> None:
+        """Verify that labeled dashed edges include the label in the mermaid syntax."""
+        graph = self._build_controller_graph_with_dashed_edge(edge_kind=edge_kind, edge_label="my_label")
+        graph_config = make_graph_config()
+        result = MermaidflowFactory.make_from_graphspec(graph, graph_config)
+
+        dashed_lines = self._extract_dashed_edges(result.mermaid_code)
+        labeled = [line for line in dashed_lines if "my_label" in line]
+        assert len(labeled) >= 1, f"Expected a labeled dashed edge for {topic}, got: {dashed_lines}"
+
+    @pytest.mark.parametrize(
+        ("topic", "edge_kind"),
+        [
+            ("BATCH_ITEM", EdgeKind.BATCH_ITEM),
+            ("BATCH_AGGREGATE", EdgeKind.BATCH_AGGREGATE),
+            ("PARALLEL_COMBINE", EdgeKind.PARALLEL_COMBINE),
+        ],
+    )
+    def test_dashed_edge_without_label(self, topic: str, edge_kind: EdgeKind) -> None:
+        """Verify that unlabeled dashed edges use plain dashed arrow syntax."""
+        graph = self._build_controller_graph_with_dashed_edge(edge_kind=edge_kind)
+        graph_config = make_graph_config()
+        result = MermaidflowFactory.make_from_graphspec(graph, graph_config)
+
+        dashed_lines = self._extract_dashed_edges(result.mermaid_code)
+        # Unlabeled edges use `-.->` without a label string
+        plain_dashed = [line for line in dashed_lines if ".->" in line and '-."' not in line]
+        assert len(plain_dashed) >= 1, f"Expected a plain dashed edge for {topic}, got: {dashed_lines}"
+
+    def test_all_edge_kinds_use_same_dashed_syntax(self) -> None:
+        """Verify that all three dashed-edge kinds produce structurally identical dashed arrow syntax.
+
+        This test catches divergence if one copy of the logic is modified but not the others.
+ """ + results_by_kind: dict[str, list[str]] = {} + for edge_kind in (EdgeKind.BATCH_ITEM, EdgeKind.BATCH_AGGREGATE, EdgeKind.PARALLEL_COMBINE): + graph = self._build_controller_graph_with_dashed_edge(edge_kind=edge_kind, edge_label="test_label") + graph_config = make_graph_config() + result = MermaidflowFactory.make_from_graphspec(graph, graph_config) + + dashed_lines = self._extract_dashed_edges(result.mermaid_code) + # Extract just the arrow operator from each line (e.g., `-."test_label".->` or `-.->`) + # by replacing stuff IDs (s_XXX) with a placeholder + normalized = [re.sub(r"s_[a-f0-9]+", "ID", line) for line in dashed_lines] + results_by_kind[edge_kind] = normalized + + # All three should produce the same normalized patterns + kinds = list(results_by_kind.keys()) + for index_kind in range(1, len(kinds)): + assert results_by_kind[kinds[0]] == results_by_kind[kinds[index_kind]], ( + f"Dashed edge syntax differs between {kinds[0]} and {kinds[index_kind]}: " + f"{results_by_kind[kinds[0]]} vs {results_by_kind[kinds[index_kind]]}" + ) + + def test_missing_stuff_resolved_on_the_fly(self) -> None: + """Verify that stuff nodes not in the normal stuff_registry get rendered on-the-fly for dashed edges. + + Creates a scenario where the target stuff only exists on the controller's output + (not registered through normal pipe IOSpec), so it must be resolved from all_stuff_info. + """ + controller = { + "node_id": "ctrl_1", + "kind": NodeKind.CONTROLLER, + "pipe_code": "batch_ctrl", + "status": NodeStatus.SUCCEEDED, + "node_io": NodeIOSpec( + inputs=[], + outputs=[IOSpec(name="aggregated_output", concept="OutputList", digest="agg_digest")], + ), + } + child = { + "node_id": "child_1", + "kind": NodeKind.OPERATOR, + "pipe_code": "child_pipe", + "status": NodeStatus.SUCCEEDED, + "node_io": NodeIOSpec( + inputs=[], + outputs=[IOSpec(name="item_output", concept="Text", digest="item_digest")], + ), + } + contains = { + "edge_id": "edge_contains", + "source": "ctrl_1", + "target": "child_1", + "kind": EdgeKind.CONTAINS, + } + aggregate_edge = { + "edge_id": "edge_agg", + "source": "child_1", + "target": "ctrl_1", + "kind": EdgeKind.BATCH_AGGREGATE, + "source_stuff_digest": "item_digest", + "target_stuff_digest": "agg_digest", + } + graph = self._make_graph( + nodes=[controller, child], + edges=[contains, aggregate_edge], + ) + graph_config = make_graph_config() + result = MermaidflowFactory.make_from_graphspec(graph, graph_config) + + # The aggregated_output stuff should be rendered (resolved on the fly) + assert "aggregated_output" in result.mermaid_code + # And there should be a dashed edge connecting them + dashed_lines = self._extract_dashed_edges(result.mermaid_code) + assert len(dashed_lines) >= 1, "Expected a dashed edge for aggregate, got none" diff --git a/tests/unit/pipelex/graph/test_graph_tracer.py b/tests/unit/pipelex/graph/test_graph_tracer.py index 50bdeb7eb..82fb0b975 100644 --- a/tests/unit/pipelex/graph/test_graph_tracer.py +++ b/tests/unit/pipelex/graph/test_graph_tracer.py @@ -872,3 +872,258 @@ def test_batch_aggregate_edges_contain_stuff_digests(self) -> None: edge = batch_aggregate_edges[0] assert edge.source_stuff_digest == "item_result_digest" assert edge.target_stuff_digest == "output_list_digest" + + def test_register_controller_output(self) -> None: + """Test that register_controller_output adds to output_specs and _stuff_producer_map. + + When a controller explicitly registers outputs, DATA edges should go from + the controller node to consumers of those outputs. 
+ """ + tracer = GraphTracer() + context = tracer.setup(graph_id="controller-output-test", data_inclusion=make_defaulted_data_inclusion_config()) + + started_at = datetime.now(timezone.utc) + + # Controller node (e.g., PipeParallel) + controller_id, ctrl_ctx = tracer.on_pipe_start( + graph_context=context, + pipe_code="my_parallel", + pipe_type="PipeParallel", + node_kind=NodeKind.CONTROLLER, + started_at=started_at, + input_specs=[IOSpec(name="input_text", concept="Text", digest="input_digest")], + ) + + # Branch 1: produces output with digest "branch_output_1" + branch1_id, _ = tracer.on_pipe_start( + graph_context=ctrl_ctx, + pipe_code="branch_pipe_1", + pipe_type="PipeLLM", + node_kind=NodeKind.OPERATOR, + started_at=started_at + timedelta(milliseconds=10), + input_specs=[IOSpec(name="input_text", concept="Text", digest="input_digest")], + ) + tracer.on_pipe_end_success( + node_id=branch1_id, + ended_at=started_at + timedelta(milliseconds=50), + output_spec=IOSpec(name="short_summary", concept="Text", digest="branch_output_1"), + ) + + # Branch 2: produces output with digest "branch_output_2" + branch2_id, _ = tracer.on_pipe_start( + graph_context=ctrl_ctx, + pipe_code="branch_pipe_2", + pipe_type="PipeLLM", + node_kind=NodeKind.OPERATOR, + started_at=started_at + timedelta(milliseconds=10), + input_specs=[IOSpec(name="input_text", concept="Text", digest="input_digest")], + ) + tracer.on_pipe_end_success( + node_id=branch2_id, + ended_at=started_at + timedelta(milliseconds=50), + output_spec=IOSpec(name="long_summary", concept="Text", digest="branch_output_2"), + ) + + # Controller registers branch outputs (overriding sub-pipe registrations) + tracer.register_controller_output( + node_id=controller_id, + output_spec=IOSpec(name="short_summary", concept="Text", digest="branch_output_1"), + ) + tracer.register_controller_output( + node_id=controller_id, + output_spec=IOSpec(name="long_summary", concept="Text", digest="branch_output_2"), + ) + + # Consumer pipe that uses branch_output_1 + consumer_id, _ = tracer.on_pipe_start( + graph_context=context, + pipe_code="consumer_pipe", + pipe_type="PipeLLM", + node_kind=NodeKind.OPERATOR, + started_at=started_at + timedelta(milliseconds=60), + input_specs=[IOSpec(name="summary", concept="Text", digest="branch_output_1")], + ) + tracer.on_pipe_end_success( + node_id=consumer_id, + ended_at=started_at + timedelta(milliseconds=100), + ) + + # End controller + tracer.on_pipe_end_success( + node_id=controller_id, + ended_at=started_at + timedelta(milliseconds=110), + ) + + graph_spec = tracer.teardown() + + assert graph_spec is not None + + # Verify controller node has 2 output specs + controller_node = next(node for node in graph_spec.nodes if node.node_id == controller_id) + assert len(controller_node.node_io.outputs) == 2 + output_names = {output.name for output in controller_node.node_io.outputs} + assert output_names == {"short_summary", "long_summary"} + + # Verify DATA edge goes from controller (not branch) to consumer + data_edges = [edge for edge in graph_spec.edges if edge.kind.is_data] + controller_to_consumer = [edge for edge in data_edges if edge.target == consumer_id] + assert len(controller_to_consumer) == 1 + assert controller_to_consumer[0].source == controller_id + + def test_passthrough_output_skipped(self) -> None: + """Test that on_pipe_end_success skips output registration when output matches an input. 
+
+        When a controller's main_stuff is unchanged from one of its inputs (pass-through),
+        the output should not be registered to avoid corrupting data edges.
+        """
+        tracer = GraphTracer()
+        context = tracer.setup(graph_id="passthrough-test", data_inclusion=make_defaulted_data_inclusion_config())
+
+        started_at = datetime.now(timezone.utc)
+
+        # Producer pipe creates stuff with digest "original_stuff"
+        producer_id, _ = tracer.on_pipe_start(
+            graph_context=context,
+            pipe_code="producer",
+            pipe_type="PipeLLM",
+            node_kind=NodeKind.OPERATOR,
+            started_at=started_at,
+        )
+        tracer.on_pipe_end_success(
+            node_id=producer_id,
+            ended_at=started_at + timedelta(milliseconds=50),
+            output_spec=IOSpec(name="output", concept="Text", digest="original_stuff"),
+        )
+
+        # Controller consumes "original_stuff" and its main_stuff is the same
+        controller_id, _ctrl_ctx = tracer.on_pipe_start(
+            graph_context=context,
+            pipe_code="my_parallel",
+            pipe_type="PipeParallel",
+            node_kind=NodeKind.CONTROLLER,
+            started_at=started_at + timedelta(milliseconds=60),
+            input_specs=[IOSpec(name="input_text", concept="Text", digest="original_stuff")],
+        )
+
+        # Controller ends with the same digest as its input (pass-through)
+        tracer.on_pipe_end_success(
+            node_id=controller_id,
+            ended_at=started_at + timedelta(milliseconds=100),
+            output_spec=IOSpec(name="input_text", concept="Text", digest="original_stuff"),
+        )
+
+        # Consumer should still get the edge from the original producer, not the controller
+        consumer_id, _ = tracer.on_pipe_start(
+            graph_context=context,
+            pipe_code="consumer",
+            pipe_type="PipeLLM",
+            node_kind=NodeKind.OPERATOR,
+            started_at=started_at + timedelta(milliseconds=110),
+            input_specs=[IOSpec(name="input", concept="Text", digest="original_stuff")],
+        )
+        tracer.on_pipe_end_success(
+            node_id=consumer_id,
+            ended_at=started_at + timedelta(milliseconds=150),
+        )
+
+        graph_spec = tracer.teardown()
+
+        assert graph_spec is not None
+
+        # Controller should have NO outputs (pass-through was skipped)
+        controller_node = next(node for node in graph_spec.nodes if node.node_id == controller_id)
+        assert len(controller_node.node_io.outputs) == 0
+
+        # DATA edges should go from producer to both controller (as input) and consumer
+        # The controller does NOT steal the producer registration (pass-through skipped)
+        data_edges = [edge for edge in graph_spec.edges if edge.kind.is_data]
+        assert len(data_edges) == 2
+        assert all(edge.source == producer_id for edge in data_edges)
+        targets = {edge.target for edge in data_edges}
+        assert targets == {controller_id, consumer_id}
+
+    def test_multiple_output_specs(self) -> None:
+        """Test that a node can have multiple outputs via register_controller_output.
+
+        All registered outputs should produce correct DATA edges to their consumers.
+ """ + tracer = GraphTracer() + context = tracer.setup(graph_id="multi-output-test", data_inclusion=make_defaulted_data_inclusion_config()) + + started_at = datetime.now(timezone.utc) + + # Controller with multiple outputs + controller_id, _ = tracer.on_pipe_start( + graph_context=context, + pipe_code="multi_output_pipe", + pipe_type="PipeParallel", + node_kind=NodeKind.CONTROLLER, + started_at=started_at, + ) + + # Register three different outputs + tracer.register_controller_output( + node_id=controller_id, + output_spec=IOSpec(name="output_a", concept="Text", digest="digest_a"), + ) + tracer.register_controller_output( + node_id=controller_id, + output_spec=IOSpec(name="output_b", concept="Text", digest="digest_b"), + ) + tracer.register_controller_output( + node_id=controller_id, + output_spec=IOSpec(name="output_c", concept="Text", digest="digest_c"), + ) + + tracer.on_pipe_end_success( + node_id=controller_id, + ended_at=started_at + timedelta(milliseconds=100), + ) + + # Consumer A reads digest_a + consumer_a_id, _ = tracer.on_pipe_start( + graph_context=context, + pipe_code="consumer_a", + pipe_type="PipeLLM", + node_kind=NodeKind.OPERATOR, + started_at=started_at + timedelta(milliseconds=110), + input_specs=[IOSpec(name="input", concept="Text", digest="digest_a")], + ) + tracer.on_pipe_end_success(node_id=consumer_a_id, ended_at=started_at + timedelta(milliseconds=120)) + + # Consumer B reads digest_b + consumer_b_id, _ = tracer.on_pipe_start( + graph_context=context, + pipe_code="consumer_b", + pipe_type="PipeLLM", + node_kind=NodeKind.OPERATOR, + started_at=started_at + timedelta(milliseconds=130), + input_specs=[IOSpec(name="input", concept="Text", digest="digest_b")], + ) + tracer.on_pipe_end_success(node_id=consumer_b_id, ended_at=started_at + timedelta(milliseconds=140)) + + # Consumer C reads digest_c + consumer_c_id, _ = tracer.on_pipe_start( + graph_context=context, + pipe_code="consumer_c", + pipe_type="PipeLLM", + node_kind=NodeKind.OPERATOR, + started_at=started_at + timedelta(milliseconds=150), + input_specs=[IOSpec(name="input", concept="Text", digest="digest_c")], + ) + tracer.on_pipe_end_success(node_id=consumer_c_id, ended_at=started_at + timedelta(milliseconds=160)) + + graph_spec = tracer.teardown() + + assert graph_spec is not None + + # Controller should have 3 output specs + controller_node = next(node for node in graph_spec.nodes if node.node_id == controller_id) + assert len(controller_node.node_io.outputs) == 3 + + # 3 DATA edges: controller -> consumer_a, controller -> consumer_b, controller -> consumer_c + data_edges = [edge for edge in graph_spec.edges if edge.kind.is_data] + assert len(data_edges) == 3 + assert all(edge.source == controller_id for edge in data_edges) + targets = {edge.target for edge in data_edges} + assert targets == {consumer_a_id, consumer_b_id, consumer_c_id} diff --git a/tests/unit/pipelex/graph/test_mermaidflow.py b/tests/unit/pipelex/graph/test_mermaidflow.py index 599b4d693..c875dcf39 100644 --- a/tests/unit/pipelex/graph/test_mermaidflow.py +++ b/tests/unit/pipelex/graph/test_mermaidflow.py @@ -386,3 +386,89 @@ def test_subgraph_depth_coloring(self) -> None: # Should have multiple subgraphs with different colors assert "subgraph" in result.mermaid_code assert "style sg_" in result.mermaid_code # Subgraph styling + + def test_parallel_combine_stuff_rendered_inside_controller_subgraph(self) -> None: + """Test that PARALLEL_COMBINE target stuffs are rendered inside the controller's subgraph.""" + parallel_ctrl = { + "node_id": 
"parallel_ctrl", + "kind": NodeKind.CONTROLLER, + "pipe_code": "parallel_controller", + "status": NodeStatus.SUCCEEDED, + "node_io": NodeIOSpec( + inputs=[], + outputs=[IOSpec(name="combined_output", concept="MergedText", digest="combined_digest_001")], + ), + } + branch_a = { + "node_id": "branch_a", + "kind": NodeKind.OPERATOR, + "pipe_code": "branch_a_pipe", + "status": NodeStatus.SUCCEEDED, + "node_io": NodeIOSpec( + inputs=[], + outputs=[IOSpec(name="branch_a_out", concept="Text", digest="branch_a_digest")], + ), + } + branch_b = { + "node_id": "branch_b", + "kind": NodeKind.OPERATOR, + "pipe_code": "branch_b_pipe", + "status": NodeStatus.SUCCEEDED, + "node_io": NodeIOSpec( + inputs=[], + outputs=[IOSpec(name="branch_b_out", concept="Text", digest="branch_b_digest")], + ), + } + contains_a = { + "edge_id": "edge_contains_a", + "source": "parallel_ctrl", + "target": "branch_a", + "kind": EdgeKind.CONTAINS, + } + contains_b = { + "edge_id": "edge_contains_b", + "source": "parallel_ctrl", + "target": "branch_b", + "kind": EdgeKind.CONTAINS, + } + combine_a = { + "edge_id": "edge_combine_a", + "source": "branch_a", + "target": "parallel_ctrl", + "kind": EdgeKind.PARALLEL_COMBINE, + "source_stuff_digest": "branch_a_digest", + "target_stuff_digest": "combined_digest_001", + } + combine_b = { + "edge_id": "edge_combine_b", + "source": "branch_b", + "target": "parallel_ctrl", + "kind": EdgeKind.PARALLEL_COMBINE, + "source_stuff_digest": "branch_b_digest", + "target_stuff_digest": "combined_digest_001", + } + graph = self._make_graph( + nodes=[parallel_ctrl, branch_a, branch_b], + edges=[contains_a, contains_b, combine_a, combine_b], + ) + graph_config = make_graph_config() + result = MermaidflowFactory.make_from_graphspec(graph, graph_config) + + # The combined output stuff should appear inside the controller's subgraph + # (between subgraph ... 
+        lines = result.mermaid_code.split("\n")
+        subgraph_start_idx = None
+        subgraph_end_idx = None
+        for index_line, line in enumerate(lines):
+            if "subgraph" in line and "parallel_controller" in line:
+                subgraph_start_idx = index_line
+            if subgraph_start_idx is not None and subgraph_end_idx is None and line.strip() == "end":
+                subgraph_end_idx = index_line
+                break
+
+        assert subgraph_start_idx is not None, "Controller subgraph not found"
+        assert subgraph_end_idx is not None, "Controller subgraph end not found"
+
+        subgraph_content = "\n".join(lines[subgraph_start_idx : subgraph_end_idx + 1])
+        assert "combined_output" in subgraph_content, "Combined output stuff should be inside the controller subgraph"
+        assert ":::stuff" in subgraph_content, "Combined output stuff should have :::stuff class styling"
diff --git a/tests/unit/pipelex/pipe_controllers/parallel/data.py b/tests/unit/pipelex/pipe_controllers/parallel/data.py
index bdbfda0f9..9d65b0263 100644
--- a/tests/unit/pipelex/pipe_controllers/parallel/data.py
+++ b/tests/unit/pipelex/pipe_controllers/parallel/data.py
@@ -14,7 +14,7 @@ class PipeParallelInputTestCases:
            description="Test case: valid_with_add_each_output",
            inputs={"data": "native.Text"},
            output="native.Text",
-           parallels=[
+           branches=[
                SubPipeBlueprint(pipe="process_a", result="result_a"),
                SubPipeBlueprint(pipe="process_b", result="result_b"),
            ],
@@ -28,7 +28,7 @@ class PipeParallelInputTestCases:
            description="Test case: valid_with_combined_output",
            inputs={"data": "native.Text"},
            output="native.Text",
-           parallels=[
+           branches=[
                SubPipeBlueprint(pipe="analyze_1", result="analysis_1"),
                SubPipeBlueprint(pipe="analyze_2", result="analysis_2"),
            ],
@@ -42,7 +42,7 @@ class PipeParallelInputTestCases:
            description="Test case: valid_with_both_output_options",
            inputs={"data": "native.Text"},
            output="native.Text",
-           parallels=[
+           branches=[
                SubPipeBlueprint(pipe="compute_x", result="x"),
                SubPipeBlueprint(pipe="compute_y", result="y"),
            ],
@@ -52,12 +52,12 @@
     )
 
     VALID_THREE_PARALLELS: ClassVar[tuple[str, PipeParallelBlueprint]] = (
-        "valid_three_parallels",
+        "valid_three_branches",
         PipeParallelBlueprint(
            description="Test case: valid_three_parallels",
            inputs={"input_data": "native.Text"},
            output="native.Text",
-           parallels=[
+           branches=[
                SubPipeBlueprint(pipe="branch_1", result="result_1"),
                SubPipeBlueprint(pipe="branch_2", result="result_2"),
                SubPipeBlueprint(pipe="branch_3", result="result_3"),
@@ -72,7 +72,7 @@ class PipeParallelInputTestCases:
            description="Test case: valid_multiple_inputs",
            inputs={"text_data": "native.Text", "image_data": "native.Image"},
            output="native.Text",
-           parallels=[
+           branches=[
                SubPipeBlueprint(pipe="process_text", result="text_result"),
                SubPipeBlueprint(pipe="process_image", result="image_result"),
            ],
@@ -96,7 +96,7 @@ class PipeParallelInputTestCases:
            "description": "Test case: no_output_options",
            "inputs": {"data": "native.Text"},
            "output": "native.Text",
-           "parallels": [
+           "branches": [
                {"pipe": "process_a", "result": "result_a"},
                {"pipe": "process_b", "result": "result_b"},
            ],
diff --git a/tests/unit/pipelex/pipe_controllers/parallel/test_pipe_parallel_blueprint.py b/tests/unit/pipelex/pipe_controllers/parallel/test_pipe_parallel_blueprint.py
index 3574cffac..24373dfcb 100644
--- a/tests/unit/pipelex/pipe_controllers/parallel/test_pipe_parallel_blueprint.py
+++ b/tests/unit/pipelex/pipe_controllers/parallel/test_pipe_parallel_blueprint.py
@@ -11,7 +11,7 @@ def test_pipe_dependencies_correct(self):
description="lorem ipsum", inputs={"data": "Text"}, output="Text", - parallels=[ + branches=[ SubPipeBlueprint(pipe="process_a", result="result_a"), SubPipeBlueprint(pipe="process_b", result="result_b"), ], @@ -23,7 +23,7 @@ def test_pipe_dependencies_correct(self): description="lorem ipsum", inputs={"data": "Text"}, output="Text", - parallels=[ + branches=[ SubPipeBlueprint(pipe="step1", result="result1"), SubPipeBlueprint(pipe="step2", result="result2"), SubPipeBlueprint(pipe="step3", result="result3"), @@ -37,7 +37,7 @@ def test_validate_combined_output_correct(self): description="lorem ipsum", inputs={"data": "Text"}, output="Text", - parallels=[SubPipeBlueprint(pipe="process", result="result")], + branches=[SubPipeBlueprint(pipe="process", result="result")], combined_output="Text", ) assert blueprint.combined_output == "Text" @@ -46,7 +46,7 @@ def test_validate_combined_output_correct(self): description="lorem ipsum", inputs={"data": "Text"}, output="Text", - parallels=[SubPipeBlueprint(pipe="process", result="result")], + branches=[SubPipeBlueprint(pipe="process", result="result")], combined_output="Number", ) assert blueprint.combined_output == "Number" @@ -57,7 +57,7 @@ def test_validate_combined_output_incorrect(self): description="lorem ipsum", inputs={"data": "Text"}, output="Text", - parallels=[SubPipeBlueprint(pipe="process", result="result")], + branches=[SubPipeBlueprint(pipe="process", result="result")], combined_output="InvalidConcept!", ) assert "Combined output 'InvalidConcept!' is not a valid concept string or code" in str(exc_info.value) @@ -67,7 +67,7 @@ def test_validate_output_options_correct(self): description="lorem ipsum", inputs={"data": "Text"}, output="Text", - parallels=[SubPipeBlueprint(pipe="process", result="result")], + branches=[SubPipeBlueprint(pipe="process", result="result")], add_each_output=True, ) assert blueprint.add_each_output is True @@ -76,7 +76,7 @@ def test_validate_output_options_correct(self): description="lorem ipsum", inputs={"data": "Text"}, output="Text", - parallels=[SubPipeBlueprint(pipe="process", result="result")], + branches=[SubPipeBlueprint(pipe="process", result="result")], combined_output="Text", ) assert blueprint.combined_output == "Text" @@ -85,7 +85,7 @@ def test_validate_output_options_correct(self): description="lorem ipsum", inputs={"data": "Text"}, output="Text", - parallels=[SubPipeBlueprint(pipe="process", result="result")], + branches=[SubPipeBlueprint(pipe="process", result="result")], add_each_output=True, combined_output="Text", ) @@ -98,7 +98,7 @@ def test_validate_output_options_incorrect(self): description="lorem ipsum", inputs={"data": "Text"}, output="Text", - parallels=[SubPipeBlueprint(pipe="process", result="result")], + branches=[SubPipeBlueprint(pipe="process", result="result")], add_each_output=False, combined_output=None, )