From 143f2730d5ee6c5b741f694a7f776256ed9ec9e8 Mon Sep 17 00:00:00 2001 From: Louis Choquel Date: Wed, 11 Feb 2026 15:35:40 +0100 Subject: [PATCH 1/8] Rename `parallels` field to `branches` on PipeParallel models MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The docs already use "branch" terminology extensively; this aligns the field name with the conceptual model. A migration mapping entry (`parallels → branches`) is added so users get a helpful error on the old name. Co-Authored-By: Claude Opus 4.6 --- README.md | 2 +- .../pipes/pipe-controllers/PipeParallel.md | 8 ++--- docs/home/9-tools/pipe-builder.md | 2 +- pipelex/builder/builder.plx | 2 +- pipelex/builder/builder_loop.py | 2 +- pipelex/builder/pipe/pipe_parallel_spec.py | 26 ++++++++-------- pipelex/cli/agent_cli/commands/pipe_cmd.py | 30 +++++++++---------- .../parallel/pipe_parallel_blueprint.py | 4 +-- .../parallel/pipe_parallel_factory.py | 2 +- pipelex/pipelex.toml | 1 + .../pipe_compose/cv_job_match.plx | 4 +-- .../pipe_parallel/pipe_parallel_1.plx | 2 +- .../test_pipe_parallel_simple.py | 4 +-- .../test_pipe_parallel_validation.py | 6 ++-- .../test_bracket_notation_controllers.py | 2 +- .../pipe_parallel/test_data.py | 12 ++++---- .../core/bundles/test_data_pipe_sorter.py | 2 +- .../controllers/parallel/pipe_parallel.py | 4 +-- .../pipelex/pipe_controllers/parallel/data.py | 14 ++++----- .../parallel/test_pipe_parallel_blueprint.py | 18 +++++------ 20 files changed, 74 insertions(+), 73 deletions(-) diff --git a/README.md b/README.md index 41bd2b779..1c4eb35a7 100644 --- a/README.md +++ b/README.md @@ -131,7 +131,7 @@ Executes parallel extraction of text content from both the CV PDF and job offer """ inputs = { cv_pdf = "PDF", job_offer_pdf = "PDF" } output = "Dynamic" -parallels = [ +branches = [ { pipe = "extract_cv_text", result = "cv_pages" }, { pipe = "extract_job_offer_text", result = "job_offer_pages" }, ] diff --git a/docs/home/6-build-reliable-ai-workflows/pipes/pipe-controllers/PipeParallel.md b/docs/home/6-build-reliable-ai-workflows/pipes/pipe-controllers/PipeParallel.md index 9bff1cb53..f3243188e 100644 --- a/docs/home/6-build-reliable-ai-workflows/pipes/pipe-controllers/PipeParallel.md +++ b/docs/home/6-build-reliable-ai-workflows/pipes/pipe-controllers/PipeParallel.md @@ -26,13 +26,13 @@ You must use `add_each_output`, `combined_output`, or both. | `description` | string | A description of the parallel operation. | Yes | | `inputs` | dictionary | The input concept(s) for the parallel operation, as a dictionary mapping input names to concept codes. | Yes | | `output` | string | The output concept produced by the parallel operation. | Yes | -| `parallels` | array of tables| An array defining the pipes to run in parallel. Each table is a sub-pipe definition. | Yes | +| `branches` | array of tables| An array defining the pipes to run in parallel. Each table is a sub-pipe definition. | Yes | | `add_each_output` | boolean | If `true`, adds the output of each parallel pipe to the working memory individually. Defaults to `true`. | No | -| `combined_output` | string | The name of a concept to use for a single, combined output object. The structure of this concept must have fields that match the `result` names from the `parallels` array. | No | +| `combined_output` | string | The name of a concept to use for a single, combined output object. The structure of this concept must have fields that match the `result` names from the `branches` array. 
| No | ### Parallel Step Configuration -Each entry in the `parallels` array is a table with the following keys: +Each entry in the `branches` array is a table with the following keys: | Key | Type | Description | Required | | -------- | ------ | ---------------------------------------------------------------------------------------- | -------- | @@ -67,7 +67,7 @@ inputs = { description = "ProductDescription" } output = "ProductAnalysis" # This name is for the combined output add_each_output = true combined_output = "ProductAnalysis" -parallels = [ +branches = [ { pipe = "extract_features", result = "features" }, { pipe = "analyze_sentiment", result = "sentiment" }, ] diff --git a/docs/home/9-tools/pipe-builder.md b/docs/home/9-tools/pipe-builder.md index a0efc748c..e7ab37a5b 100644 --- a/docs/home/9-tools/pipe-builder.md +++ b/docs/home/9-tools/pipe-builder.md @@ -15,7 +15,7 @@ And generates: - **Domain concepts** - Data structures for your workflow (e.g., `CVAnalysis`, `InterviewQuestion`) - **Pipe operators** - LLM calls, extractions, image generation steps -- **Pipe controllers** - Sequences, batches, parallels, conditions to orchestrate the flow +- **Pipe controllers** - Sequences, batches, parallel branches, conditions to orchestrate the flow - **A complete bundle** - Ready to validate and run ## How It Works diff --git a/pipelex/builder/builder.plx b/pipelex/builder/builder.plx index aba5d53ab..043a7f18a 100644 --- a/pipelex/builder/builder.plx +++ b/pipelex/builder/builder.plx @@ -222,7 +222,7 @@ Shape of the contract for PipeOperator is: - steps: List of sub-pipes to execute sequentially. Each step has: pipe (name of the pipe to execute), result (variable name). **PipeParallel:** -- parallels: List of sub-pipes to execute concurrently. +- branches: List of sub-pipes to execute concurrently. - add_each_output: Boolean - include individual outputs in combined result. - combined_output: Optional ConceptCode (PascalCase) for combined structure. diff --git a/pipelex/builder/builder_loop.py b/pipelex/builder/builder_loop.py index e3586c50d..afd69a6d6 100644 --- a/pipelex/builder/builder_loop.py +++ b/pipelex/builder/builder_loop.py @@ -313,7 +313,7 @@ def _prune_unreachable_specs(self, pipelex_bundle_spec: PipelexBundleSpec) -> Pi if isinstance(pipe_spec, PipeSequenceSpec): sub_pipe_codes = [step.pipe_code for step in pipe_spec.steps] elif isinstance(pipe_spec, PipeParallelSpec): - sub_pipe_codes = [parallel.pipe_code for parallel in pipe_spec.parallels] + sub_pipe_codes = [branch.pipe_code for branch in pipe_spec.branches] elif isinstance(pipe_spec, PipeBatchSpec): sub_pipe_codes = [pipe_spec.branch_pipe_code] elif isinstance(pipe_spec, PipeConditionSpec): diff --git a/pipelex/builder/pipe/pipe_parallel_spec.py b/pipelex/builder/pipe/pipe_parallel_spec.py index 216689cd6..6890bc16d 100644 --- a/pipelex/builder/pipe/pipe_parallel_spec.py +++ b/pipelex/builder/pipe/pipe_parallel_spec.py @@ -23,16 +23,16 @@ class PipeParallelSpec(PipeSpec): and their outputs can be combined or kept separate. Validation Rules: - 1. Parallels list must not be empty. - 2. Each parallel step must be a valid SubPipeSpec. + 1. Branches list must not be empty. + 2. Each branch must be a valid SubPipeSpec. 3. combined_output, when specified, must be a valid ConceptCode in PascalCase. - 4. Pipe codes in parallels must reference existing pipes (snake_case). + 4. Pipe codes in branches must reference existing pipes (snake_case). 
""" type: Literal["PipeParallel"] = "PipeParallel" pipe_category: Literal["PipeController"] = "PipeController" - parallels: list[SubPipeSpec] = Field(description="List of SubPipeSpec instances to execute concurrently.") + branches: list[SubPipeSpec] = Field(description="List of SubPipeSpec instances to execute concurrently.") add_each_output: bool = Field(description="Whether to include individual pipe outputs in the combined result.") combined_output: str | None = Field( default=None, @@ -74,7 +74,7 @@ def rendered_pretty(self, title: str | None = None, depth: int = 0) -> PrettyPri # Add parallel branches as a table parallel_group.renderables.append(Text()) # Blank line - parallels_table = Table( + branches_table = Table( title="Parallel Branches:", title_justify="left", title_style="not italic", @@ -84,28 +84,28 @@ def rendered_pretty(self, title: str | None = None, depth: int = 0) -> PrettyPri show_lines=True, border_style="dim", ) - parallels_table.add_column("Branch", style="dim", width=6, justify="right") - parallels_table.add_column("Pipe", style="red") - parallels_table.add_column("Result name", style="cyan") + branches_table.add_column("Branch", style="dim", width=6, justify="right") + branches_table.add_column("Pipe", style="red") + branches_table.add_column("Result name", style="cyan") - for idx, parallel in enumerate(self.parallels, start=1): - parallels_table.add_row(str(idx), parallel.pipe_code, parallel.result) + for idx, branch in enumerate(self.branches, start=1): + branches_table.add_row(str(idx), branch.pipe_code, branch.result) - parallel_group.renderables.append(parallels_table) + parallel_group.renderables.append(branches_table) return parallel_group @override def to_blueprint(self) -> PipeParallelBlueprint: base_blueprint = super().to_blueprint() - core_parallels = [parallel.to_blueprint() for parallel in self.parallels] + core_branches = [branch.to_blueprint() for branch in self.branches] return PipeParallelBlueprint( description=base_blueprint.description, inputs=base_blueprint.inputs, output=base_blueprint.output, type=self.type, pipe_category=self.pipe_category, - parallels=core_parallels, + branches=core_branches, add_each_output=self.add_each_output, combined_output=self.combined_output, ) diff --git a/pipelex/cli/agent_cli/commands/pipe_cmd.py b/pipelex/cli/agent_cli/commands/pipe_cmd.py index d41276195..b641b8868 100644 --- a/pipelex/cli/agent_cli/commands/pipe_cmd.py +++ b/pipelex/cli/agent_cli/commands/pipe_cmd.py @@ -129,13 +129,13 @@ def _add_type_specific_fields(pipe_spec: PipeSpec, pipe_table: tomlkit.TOMLDocum pipe_table.add("add_each_output", pipe_spec.add_each_output) if pipe_spec.combined_output: pipe_table.add("combined_output", pipe_spec.combined_output) - parallels_array = tomlkit.array() - for parallel in pipe_spec.parallels: - parallel_inline = tomlkit.inline_table() - parallel_inline.append("pipe", parallel.pipe_code) - parallel_inline.append("result", parallel.result) - parallels_array.append(parallel_inline) - pipe_table.add("parallels", parallels_array) + branches_array = tomlkit.array() + for branch in pipe_spec.branches: + branch_inline = tomlkit.inline_table() + branch_inline.append("pipe", branch.pipe_code) + branch_inline.append("result", branch.result) + branches_array.append(branch_inline) + pipe_table.add("branches", branches_array) elif isinstance(pipe_spec, PipeConditionSpec): pipe_table.add("expression", pipe_spec.jinja2_expression_template) @@ -189,7 +189,7 @@ def _parse_pipe_spec_from_json(pipe_type: str, spec_data: dict[str, 
Any]) -> Pip # Add type to spec_data if not present spec_data["type"] = pipe_type - # Handle steps/parallels conversion - need to convert pipe to pipe_code + # Handle steps/branches conversion - need to convert pipe to pipe_code if "steps" in spec_data: converted_steps = [] for step in spec_data["steps"]: @@ -198,13 +198,13 @@ def _parse_pipe_spec_from_json(pipe_type: str, spec_data: dict[str, Any]) -> Pip converted_steps.append(step) spec_data["steps"] = converted_steps - if "parallels" in spec_data: - converted_parallels = [] - for parallel in spec_data["parallels"]: - if "pipe" in parallel and "pipe_code" not in parallel: - parallel["pipe_code"] = parallel.pop("pipe") - converted_parallels.append(parallel) - spec_data["parallels"] = converted_parallels + if "branches" in spec_data: + converted_branches = [] + for branch in spec_data["branches"]: + if "pipe" in branch and "pipe_code" not in branch: + branch["pipe_code"] = branch.pop("pipe") + converted_branches.append(branch) + spec_data["branches"] = converted_branches # Handle expression -> jinja2_expression_template for PipeCondition if pipe_type == "PipeCondition" and "expression" in spec_data: diff --git a/pipelex/pipe_controllers/parallel/pipe_parallel_blueprint.py b/pipelex/pipe_controllers/parallel/pipe_parallel_blueprint.py index 576277c96..a1c6f6886 100644 --- a/pipelex/pipe_controllers/parallel/pipe_parallel_blueprint.py +++ b/pipelex/pipe_controllers/parallel/pipe_parallel_blueprint.py @@ -12,7 +12,7 @@ class PipeParallelBlueprint(PipeBlueprint): type: Literal["PipeParallel"] = "PipeParallel" pipe_category: Literal["PipeController"] = "PipeController" - parallels: list[SubPipeBlueprint] + branches: list[SubPipeBlueprint] add_each_output: bool = False combined_output: str | None = None @@ -20,7 +20,7 @@ class PipeParallelBlueprint(PipeBlueprint): @override def pipe_dependencies(self) -> set[str]: """Return the set of pipe codes from the parallel branches.""" - return {parallel.pipe for parallel in self.parallels} + return {branch.pipe for branch in self.branches} @field_validator("combined_output", mode="before") @classmethod diff --git a/pipelex/pipe_controllers/parallel/pipe_parallel_factory.py b/pipelex/pipe_controllers/parallel/pipe_parallel_factory.py index a1a19c8a6..4e421c5b0 100644 --- a/pipelex/pipe_controllers/parallel/pipe_parallel_factory.py +++ b/pipelex/pipe_controllers/parallel/pipe_parallel_factory.py @@ -31,7 +31,7 @@ def make( blueprint: PipeParallelBlueprint, ) -> PipeParallel: parallel_sub_pipes: list[SubPipe] = [] - for sub_pipe_blueprint in blueprint.parallels: + for sub_pipe_blueprint in blueprint.branches: if not sub_pipe_blueprint.result: msg = f"Unexpected error in pipe '{pipe_code}': PipeParallel requires a result specified for each parallel sub pipe" raise PipeParallelFactoryError(message=msg) diff --git a/pipelex/pipelex.toml b/pipelex/pipelex.toml index 0254b537d..3b0dcd7ff 100644 --- a/pipelex/pipelex.toml +++ b/pipelex/pipelex.toml @@ -432,6 +432,7 @@ llm_to_structure = "model_to_structure" llm_skill = "llm_talent" img_gen_skill = "img_gen_talent" extract_skill = "extract_talent" +parallels = "branches" #################################################################################################### diff --git a/tests/e2e/pipelex/pipes/pipe_operators/pipe_compose/cv_job_match.plx b/tests/e2e/pipelex/pipes/pipe_operators/pipe_compose/cv_job_match.plx index e87487e43..818c82f9a 100644 --- a/tests/e2e/pipelex/pipes/pipe_operators/pipe_compose/cv_job_match.plx +++ 
b/tests/e2e/pipelex/pipes/pipe_operators/pipe_compose/cv_job_match.plx @@ -29,7 +29,7 @@ type = "PipeParallel" description = "Extracts text content from both the CV and job offer PDFs concurrently" inputs = { cv_pdf = "Document", job_offer_pdf = "Document" } output = "Page[]" -parallels = [ +branches = [ { pipe = "extract_cv", result = "cv_pages" }, { pipe = "extract_job_offer", result = "job_offer_pages" }, ] @@ -54,7 +54,7 @@ type = "PipeParallel" description = "Analyzes both the CV and job offer documents concurrently to extract structured information" inputs = { cv_pages = "Page", job_offer_pages = "Page" } output = "Text" -parallels = [ +branches = [ { pipe = "analyze_cv", result = "cv_analysis" }, { pipe = "analyze_job_offer", result = "job_requirements" }, ] diff --git a/tests/integration/pipelex/pipes/controller/pipe_parallel/pipe_parallel_1.plx b/tests/integration/pipelex/pipes/controller/pipe_parallel/pipe_parallel_1.plx index 3c4cf42dd..d3b928bfc 100644 --- a/tests/integration/pipelex/pipes/controller/pipe_parallel/pipe_parallel_1.plx +++ b/tests/integration/pipelex/pipes/controller/pipe_parallel/pipe_parallel_1.plx @@ -15,7 +15,7 @@ inputs = { document = "DocumentInput" } output = "CombinedAnalysis" add_each_output = true combined_output = "CombinedAnalysis" -parallels = [ +branches = [ { pipe = "analyze_length", result = "length_result" }, { pipe = "analyze_content", result = "content_result" }, ] diff --git a/tests/integration/pipelex/pipes/controller/pipe_parallel/test_pipe_parallel_simple.py b/tests/integration/pipelex/pipes/controller/pipe_parallel/test_pipe_parallel_simple.py index 2ec240177..5d572009c 100644 --- a/tests/integration/pipelex/pipes/controller/pipe_parallel/test_pipe_parallel_simple.py +++ b/tests/integration/pipelex/pipes/controller/pipe_parallel/test_pipe_parallel_simple.py @@ -32,7 +32,7 @@ async def test_parallel_text_analysis( description="Parallel text analysis pipeline", inputs={"input_text": f"{SpecialDomain.NATIVE}.{NativeConceptCode.TEXT}"}, output=f"{SpecialDomain.NATIVE}.{NativeConceptCode.TEXT}", - parallels=[ + branches=[ SubPipeBlueprint(pipe="analyze_sentiment", result="sentiment_result"), SubPipeBlueprint(pipe="count_words", result="word_count_result"), SubPipeBlueprint(pipe="extract_keywords", result="keywords_result"), @@ -151,7 +151,7 @@ async def test_parallel_short_text_analysis( description="Parallel text analysis pipeline for short text", inputs={"input_text": f"{SpecialDomain.NATIVE}.{NativeConceptCode.TEXT}"}, output=f"{SpecialDomain.NATIVE}.{NativeConceptCode.TEXT}", - parallels=[ + branches=[ SubPipeBlueprint(pipe="analyze_sentiment", result="sentiment_result"), SubPipeBlueprint(pipe="count_words", result="word_count_result"), SubPipeBlueprint(pipe="extract_keywords", result="keywords_result"), diff --git a/tests/integration/pipelex/pipes/controller/pipe_parallel/test_pipe_parallel_validation.py b/tests/integration/pipelex/pipes/controller/pipe_parallel/test_pipe_parallel_validation.py index 1db6fa6d5..1ddd7e105 100644 --- a/tests/integration/pipelex/pipes/controller/pipe_parallel/test_pipe_parallel_validation.py +++ b/tests/integration/pipelex/pipes/controller/pipe_parallel/test_pipe_parallel_validation.py @@ -71,7 +71,7 @@ def test_pipe_parallel_with_real_pipe_structure(self, load_empty_library: Callab "context": concept_2.code, }, output=ConceptFactory.make_concept_ref_with_domain(domain_code=domain_code, concept_code=concept_3.code), - parallels=[SubPipeBlueprint(pipe=real_pipe.code, result="analysis_result")], + 
branches=[SubPipeBlueprint(pipe=real_pipe.code, result="analysis_result")], add_each_output=True, combined_output=None, ) @@ -123,7 +123,7 @@ def test_pipe_parallel_creation(self, load_empty_library: Callable[[], None]): description="Basic parallel pipe for testing", inputs={"input_var": concept_1.concept_ref}, output=ConceptFactory.make_concept_ref_with_domain(domain_code=domain_code, concept_code=concept_3.code), - parallels=[SubPipeBlueprint(pipe="test_pipe_1", result="result_1")], + branches=[SubPipeBlueprint(pipe="test_pipe_1", result="result_1")], add_each_output=True, combined_output=None, ) @@ -178,7 +178,7 @@ def test_pipe_parallel_needed_inputs_structure(self, load_empty_library: Callabl "context": concept_2.concept_ref, }, output=ConceptFactory.make_concept_ref_with_domain(domain_code=domain_code, concept_code=concept_3.code), - parallels=[], # No sub-pipes to avoid dependency issues + branches=[], # No sub-pipes to avoid dependency issues add_each_output=True, combined_output=None, ) diff --git a/tests/integration/pipelex/pipes/test_bracket_notation_controllers.py b/tests/integration/pipelex/pipes/test_bracket_notation_controllers.py index ed128ce67..dee8c2082 100644 --- a/tests/integration/pipelex/pipes/test_bracket_notation_controllers.py +++ b/tests/integration/pipelex/pipes/test_bracket_notation_controllers.py @@ -39,7 +39,7 @@ def test_pipe_parallel_with_bracket_notation(self, load_empty_library: Callable[ description="Process items in parallel", inputs={"data": "DataItem[2]"}, output="ProcessedData", - parallels=[], + branches=[], add_each_output=True, ) diff --git a/tests/unit/pipelex/builder/pipe/pipe_controller/pipe_parallel/test_data.py b/tests/unit/pipelex/builder/pipe/pipe_controller/pipe_parallel/test_data.py index 97d9a0696..12a3c424b 100644 --- a/tests/unit/pipelex/builder/pipe/pipe_controller/pipe_parallel/test_data.py +++ b/tests/unit/pipelex/builder/pipe/pipe_controller/pipe_parallel/test_data.py @@ -14,7 +14,7 @@ class PipeParallelTestCases: description="Run pipes in parallel", inputs={"data": "Data"}, output="Results", - parallels=[ + branches=[ SubPipeSpec(pipe_code="analyze_data", result="analysis"), SubPipeSpec(pipe_code="transform_data", result="transformed"), SubPipeSpec(pipe_code="validate_data", result="validation"), @@ -25,7 +25,7 @@ class PipeParallelTestCases: description="Run pipes in parallel", inputs={"data": "Data"}, output="Results", - parallels=[ + branches=[ SubPipeBlueprint(pipe="analyze_data", result="analysis"), SubPipeBlueprint(pipe="transform_data", result="transformed"), SubPipeBlueprint(pipe="validate_data", result="validation"), @@ -43,7 +43,7 @@ class PipeParallelTestCases: description="Parallel with combined output", inputs={"input": "Input"}, output="CombinedResult", - parallels=[ + branches=[ SubPipeSpec(pipe_code="pipe1", result="result1"), SubPipeSpec(pipe_code="pipe2", result="result2"), ], @@ -54,7 +54,7 @@ class PipeParallelTestCases: description="Parallel with combined output", inputs={"input": "Input"}, output="CombinedResult", - parallels=[ + branches=[ SubPipeBlueprint(pipe="pipe1", result="result1"), SubPipeBlueprint(pipe="pipe2", result="result2"), ], @@ -71,7 +71,7 @@ class PipeParallelTestCases: description="Parallel with combined output", inputs={"input": "Input"}, output="CombinedResult", - parallels=[ + branches=[ SubPipeSpec(pipe_code="pipe1", result="result1"), SubPipeSpec(pipe_code="pipe2", result="result2"), ], @@ -82,7 +82,7 @@ class PipeParallelTestCases: description="Parallel with combined output", 
inputs={"input": "Input"}, output="CombinedResult", - parallels=[ + branches=[ SubPipeBlueprint(pipe="pipe1", result="result1"), SubPipeBlueprint(pipe="pipe2", result="result2"), ], diff --git a/tests/unit/pipelex/core/bundles/test_data_pipe_sorter.py b/tests/unit/pipelex/core/bundles/test_data_pipe_sorter.py index a0e56fc68..3dae376d5 100644 --- a/tests/unit/pipelex/core/bundles/test_data_pipe_sorter.py +++ b/tests/unit/pipelex/core/bundles/test_data_pipe_sorter.py @@ -53,7 +53,7 @@ class PipeSorterTestCases: description="D depends on B and C", inputs={}, output="Text", - parallels=[ + branches=[ SubPipeBlueprint(pipe="pipe_b", result="result_b"), SubPipeBlueprint(pipe="pipe_c", result="result_c"), ], diff --git a/tests/unit/pipelex/core/test_data/pipes/controllers/parallel/pipe_parallel.py b/tests/unit/pipelex/core/test_data/pipes/controllers/parallel/pipe_parallel.py index 3ab345bcd..3c880382b 100644 --- a/tests/unit/pipelex/core/test_data/pipes/controllers/parallel/pipe_parallel.py +++ b/tests/unit/pipelex/core/test_data/pipes/controllers/parallel/pipe_parallel.py @@ -14,7 +14,7 @@ type = "PipeParallel" description = "PipeParallel example in PIPE_PARALLEL_TEST_CASES" output = "ProcessedData" -parallels = [ +branches = [ { pipe = "process_a", result = "result_a" }, { pipe = "process_b", result = "result_b" }, ] @@ -29,7 +29,7 @@ type="PipeParallel", description="PipeParallel example in PIPE_PARALLEL_TEST_CASES", output="ProcessedData", - parallels=[ + branches=[ SubPipeBlueprint(pipe="process_a", result="result_a"), SubPipeBlueprint(pipe="process_b", result="result_b"), ], diff --git a/tests/unit/pipelex/pipe_controllers/parallel/data.py b/tests/unit/pipelex/pipe_controllers/parallel/data.py index bdbfda0f9..9d65b0263 100644 --- a/tests/unit/pipelex/pipe_controllers/parallel/data.py +++ b/tests/unit/pipelex/pipe_controllers/parallel/data.py @@ -14,7 +14,7 @@ class PipeParallelInputTestCases: description="Test case: valid_with_add_each_output", inputs={"data": "native.Text"}, output="native.Text", - parallels=[ + branches=[ SubPipeBlueprint(pipe="process_a", result="result_a"), SubPipeBlueprint(pipe="process_b", result="result_b"), ], @@ -28,7 +28,7 @@ class PipeParallelInputTestCases: description="Test case: valid_with_combined_output", inputs={"data": "native.Text"}, output="native.Text", - parallels=[ + branches=[ SubPipeBlueprint(pipe="analyze_1", result="analysis_1"), SubPipeBlueprint(pipe="analyze_2", result="analysis_2"), ], @@ -42,7 +42,7 @@ class PipeParallelInputTestCases: description="Test case: valid_with_both_output_options", inputs={"data": "native.Text"}, output="native.Text", - parallels=[ + branches=[ SubPipeBlueprint(pipe="compute_x", result="x"), SubPipeBlueprint(pipe="compute_y", result="y"), ], @@ -52,12 +52,12 @@ class PipeParallelInputTestCases: ) VALID_THREE_PARALLELS: ClassVar[tuple[str, PipeParallelBlueprint]] = ( - "valid_three_parallels", + "valid_three_branches", PipeParallelBlueprint( description="Test case: valid_three_parallels", inputs={"input_data": "native.Text"}, output="native.Text", - parallels=[ + branches=[ SubPipeBlueprint(pipe="branch_1", result="result_1"), SubPipeBlueprint(pipe="branch_2", result="result_2"), SubPipeBlueprint(pipe="branch_3", result="result_3"), @@ -72,7 +72,7 @@ class PipeParallelInputTestCases: description="Test case: valid_multiple_inputs", inputs={"text_data": "native.Text", "image_data": "native.Image"}, output="native.Text", - parallels=[ + branches=[ SubPipeBlueprint(pipe="process_text", result="text_result"), 
SubPipeBlueprint(pipe="process_image", result="image_result"), ], @@ -96,7 +96,7 @@ class PipeParallelInputTestCases: "description": "Test case: no_output_options", "inputs": {"data": "native.Text"}, "output": "native.Text", - "parallels": [ + "branches": [ {"pipe": "process_a", "result": "result_a"}, {"pipe": "process_b", "result": "result_b"}, ], diff --git a/tests/unit/pipelex/pipe_controllers/parallel/test_pipe_parallel_blueprint.py b/tests/unit/pipelex/pipe_controllers/parallel/test_pipe_parallel_blueprint.py index 3574cffac..24373dfcb 100644 --- a/tests/unit/pipelex/pipe_controllers/parallel/test_pipe_parallel_blueprint.py +++ b/tests/unit/pipelex/pipe_controllers/parallel/test_pipe_parallel_blueprint.py @@ -11,7 +11,7 @@ def test_pipe_dependencies_correct(self): description="lorem ipsum", inputs={"data": "Text"}, output="Text", - parallels=[ + branches=[ SubPipeBlueprint(pipe="process_a", result="result_a"), SubPipeBlueprint(pipe="process_b", result="result_b"), ], @@ -23,7 +23,7 @@ def test_pipe_dependencies_correct(self): description="lorem ipsum", inputs={"data": "Text"}, output="Text", - parallels=[ + branches=[ SubPipeBlueprint(pipe="step1", result="result1"), SubPipeBlueprint(pipe="step2", result="result2"), SubPipeBlueprint(pipe="step3", result="result3"), @@ -37,7 +37,7 @@ def test_validate_combined_output_correct(self): description="lorem ipsum", inputs={"data": "Text"}, output="Text", - parallels=[SubPipeBlueprint(pipe="process", result="result")], + branches=[SubPipeBlueprint(pipe="process", result="result")], combined_output="Text", ) assert blueprint.combined_output == "Text" @@ -46,7 +46,7 @@ def test_validate_combined_output_correct(self): description="lorem ipsum", inputs={"data": "Text"}, output="Text", - parallels=[SubPipeBlueprint(pipe="process", result="result")], + branches=[SubPipeBlueprint(pipe="process", result="result")], combined_output="Number", ) assert blueprint.combined_output == "Number" @@ -57,7 +57,7 @@ def test_validate_combined_output_incorrect(self): description="lorem ipsum", inputs={"data": "Text"}, output="Text", - parallels=[SubPipeBlueprint(pipe="process", result="result")], + branches=[SubPipeBlueprint(pipe="process", result="result")], combined_output="InvalidConcept!", ) assert "Combined output 'InvalidConcept!' 
is not a valid concept string or code" in str(exc_info.value) @@ -67,7 +67,7 @@ def test_validate_output_options_correct(self): description="lorem ipsum", inputs={"data": "Text"}, output="Text", - parallels=[SubPipeBlueprint(pipe="process", result="result")], + branches=[SubPipeBlueprint(pipe="process", result="result")], add_each_output=True, ) assert blueprint.add_each_output is True @@ -76,7 +76,7 @@ def test_validate_output_options_correct(self): description="lorem ipsum", inputs={"data": "Text"}, output="Text", - parallels=[SubPipeBlueprint(pipe="process", result="result")], + branches=[SubPipeBlueprint(pipe="process", result="result")], combined_output="Text", ) assert blueprint.combined_output == "Text" @@ -85,7 +85,7 @@ def test_validate_output_options_correct(self): description="lorem ipsum", inputs={"data": "Text"}, output="Text", - parallels=[SubPipeBlueprint(pipe="process", result="result")], + branches=[SubPipeBlueprint(pipe="process", result="result")], add_each_output=True, combined_output="Text", ) @@ -98,7 +98,7 @@ def test_validate_output_options_incorrect(self): description="lorem ipsum", inputs={"data": "Text"}, output="Text", - parallels=[SubPipeBlueprint(pipe="process", result="result")], + branches=[SubPipeBlueprint(pipe="process", result="result")], add_each_output=False, combined_output=None, ) From 6b551fb97191100225b35402e13d9263dc98af8b Mon Sep 17 00:00:00 2001 From: Louis Choquel Date: Wed, 11 Feb 2026 16:22:56 +0100 Subject: [PATCH 2/8] Add unit tests for StuffFactory.combine_stuffs and remove TODO Tests cover success cases (multi-field, single-field, auto-generated name) and error cases (missing required field, wrong content type). Co-Authored-By: Claude Opus 4.6 --- pipelex/core/stuffs/stuff_factory.py | 1 - .../test_stuff_factory_combine_stuffs.py | 164 ++++++++++++++++++ 2 files changed, 164 insertions(+), 1 deletion(-) create mode 100644 tests/unit/pipelex/core/stuffs/test_stuff_factory_combine_stuffs.py diff --git a/pipelex/core/stuffs/stuff_factory.py b/pipelex/core/stuffs/stuff_factory.py index 53e86c3c7..0bb90aff3 100644 --- a/pipelex/core/stuffs/stuff_factory.py +++ b/pipelex/core/stuffs/stuff_factory.py @@ -110,7 +110,6 @@ def combine_stuffs( stuff_contents: dict[str, StuffContent], name: str | None = None, ) -> Stuff: - # TODO: Add unit tests for this method """Combine a dictionary of stuffs into a single stuff.""" the_subclass = get_class_registry().get_required_subclass(name=concept.structure_class_name, base_class=StuffContent) try: diff --git a/tests/unit/pipelex/core/stuffs/test_stuff_factory_combine_stuffs.py b/tests/unit/pipelex/core/stuffs/test_stuff_factory_combine_stuffs.py new file mode 100644 index 000000000..e79d1f169 --- /dev/null +++ b/tests/unit/pipelex/core/stuffs/test_stuff_factory_combine_stuffs.py @@ -0,0 +1,164 @@ +import os +from pathlib import Path +from typing import TYPE_CHECKING, Callable + +import pytest +from pydantic import Field + +from pipelex.core.concepts.concept_factory import ConceptFactory +from pipelex.core.stuffs.exceptions import StuffFactoryError +from pipelex.core.stuffs.structured_content import StructuredContent +from pipelex.core.stuffs.stuff_factory import StuffFactory +from pipelex.core.stuffs.text_content import TextContent +from pipelex.hub import get_concept_library +from pipelex.system.registries.class_registry_utils import ClassRegistryUtils + +if TYPE_CHECKING: + from pipelex.core.stuffs.stuff_content import StuffContent + + +class SentimentAndWordCount(StructuredContent): + """A structured 
content combining sentiment and word count results.""" + + sentiment_result: TextContent = Field(description="Sentiment analysis result") + word_count_result: TextContent = Field(description="Word count result") + + +class SingleFieldContent(StructuredContent): + """A structured content with a single field.""" + + summary: TextContent = Field(description="Summary text") + + +DOMAIN_CODE = "test_combine" + + +@pytest.fixture(scope="class") +def setup_combine_concepts(load_test_library: Callable[[list[Path]], None]): + """Register structured content classes and create concepts for combine_stuffs tests.""" + load_test_library([Path(__file__).parent]) + ClassRegistryUtils.register_classes_in_file( + file_path=os.path.join(os.path.dirname(__file__), "test_stuff_factory_combine_stuffs.py"), + base_class=StructuredContent, + is_include_imported=False, + ) + + concept_library = get_concept_library() + + concept_sentiment_and_word_count = ConceptFactory.make( + concept_code="SentimentAndWordCount", + domain_code=DOMAIN_CODE, + description="Combined sentiment and word count", + structure_class_name="SentimentAndWordCount", + ) + concept_library.add_new_concept(concept=concept_sentiment_and_word_count) + + concept_single_field = ConceptFactory.make( + concept_code="SingleFieldContent", + domain_code=DOMAIN_CODE, + description="Single field content", + structure_class_name="SingleFieldContent", + ) + concept_library.add_new_concept(concept=concept_single_field) + + yield + + concept_library.remove_concepts_by_concept_refs( + concept_refs=[ + f"{DOMAIN_CODE}.SentimentAndWordCount", + f"{DOMAIN_CODE}.SingleFieldContent", + ] + ) + + +@pytest.mark.usefixtures("setup_combine_concepts") +class TestStuffFactoryCombineStuffs: + """Tests for StuffFactory.combine_stuffs method.""" + + def test_combine_two_text_contents(self): + """Test combining two TextContent fields into a StructuredContent stuff.""" + concept = get_concept_library().get_required_concept(concept_ref=f"{DOMAIN_CODE}.SentimentAndWordCount") + + stuff_contents: dict[str, StuffContent] = { + "sentiment_result": TextContent(text="positive"), + "word_count_result": TextContent(text="42"), + } + + result = StuffFactory.combine_stuffs( + concept=concept, + stuff_contents=stuff_contents, + name="combined_analysis", + ) + + assert result.stuff_name == "combined_analysis" + assert isinstance(result.content, SentimentAndWordCount) + assert result.content.sentiment_result.text == "positive" + assert result.content.word_count_result.text == "42" + assert result.concept.code == "SentimentAndWordCount" + assert result.concept.domain_code == DOMAIN_CODE + + def test_combine_single_field(self): + """Test combining a single TextContent field.""" + concept = get_concept_library().get_required_concept(concept_ref=f"{DOMAIN_CODE}.SingleFieldContent") + + stuff_contents: dict[str, StuffContent] = { + "summary": TextContent(text="This is a summary"), + } + + result = StuffFactory.combine_stuffs( + concept=concept, + stuff_contents=stuff_contents, + name="single_field_stuff", + ) + + assert isinstance(result.content, SingleFieldContent) + assert result.content.summary.text == "This is a summary" + + def test_combine_without_name_auto_generates(self): + """Test that omitting the name parameter still produces a valid Stuff.""" + concept = get_concept_library().get_required_concept(concept_ref=f"{DOMAIN_CODE}.SingleFieldContent") + + stuff_contents: dict[str, StuffContent] = { + "summary": TextContent(text="auto-named"), + } + + result = StuffFactory.combine_stuffs( + 
concept=concept, + stuff_contents=stuff_contents, + ) + + assert result.stuff_name is not None + assert len(result.stuff_name) > 0 + assert isinstance(result.content, SingleFieldContent) + + def test_combine_with_missing_field_raises_error(self): + """Test that missing a required field raises StuffFactoryError.""" + concept = get_concept_library().get_required_concept(concept_ref=f"{DOMAIN_CODE}.SentimentAndWordCount") + + stuff_contents: dict[str, StuffContent] = { + "sentiment_result": TextContent(text="positive"), + # missing word_count_result + } + + with pytest.raises(StuffFactoryError, match="Error combining stuffs"): + StuffFactory.combine_stuffs( + concept=concept, + stuff_contents=stuff_contents, + name="incomplete", + ) + + def test_combine_with_wrong_content_type_raises_error(self): + """Test that passing wrong content type for a field raises StuffFactoryError.""" + concept = get_concept_library().get_required_concept(concept_ref=f"{DOMAIN_CODE}.SentimentAndWordCount") + + stuff_contents: dict[str, StuffContent] = { + "sentiment_result": TextContent(text="positive"), + "word_count_result": "not_a_stuff_content", # type: ignore[dict-item] + } + + with pytest.raises(StuffFactoryError, match="Error combining stuffs"): + StuffFactory.combine_stuffs( + concept=concept, + stuff_contents=stuff_contents, + name="wrong_type", + ) From f60a5b0993dfd9a13b41e2a47562ce067308d759 Mon Sep 17 00:00:00 2001 From: Louis Choquel Date: Wed, 11 Feb 2026 17:52:55 +0100 Subject: [PATCH 3/8] Add register_controller_output method to GraphTracer and related classes --- pipelex/graph/graph_tracer.py | 49 +++- pipelex/graph/graph_tracer_manager.py | 21 ++ pipelex/graph/graph_tracer_protocol.py | 24 ++ .../parallel/pipe_parallel.py | 51 ++++ .../pipe_parallel/parallel_graph_add_each.plx | 59 ++++ .../pipe_parallel/parallel_graph_combined.plx | 42 +++ .../pipe_parallel/parallel_graph_models.py | 11 + .../pipe_parallel/test_data.py | 54 ++++ .../pipe_parallel/test_pipe_parallel_graph.py | 271 ++++++++++++++++++ tests/unit/pipelex/graph/test_graph_tracer.py | 255 ++++++++++++++++ 10 files changed, 829 insertions(+), 8 deletions(-) create mode 100644 tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/parallel_graph_add_each.plx create mode 100644 tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/parallel_graph_combined.plx create mode 100644 tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/parallel_graph_models.py create mode 100644 tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/test_data.py create mode 100644 tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/test_pipe_parallel_graph.py diff --git a/pipelex/graph/graph_tracer.py b/pipelex/graph/graph_tracer.py index 175f87d68..da0676052 100644 --- a/pipelex/graph/graph_tracer.py +++ b/pipelex/graph/graph_tracer.py @@ -47,7 +47,7 @@ def __init__( self.metrics: dict[str, float] = {} self.error: ErrorSpec | None = None self.input_specs: list[IOSpec] = input_specs or [] - self.output_spec: IOSpec | None = None + self.output_specs: list[IOSpec] = [] def to_node_spec(self) -> NodeSpec: """Convert to immutable NodeSpec.""" @@ -59,9 +59,7 @@ def to_node_spec(self) -> NodeSpec: ) # Build NodeIOSpec from captured input/output specs - outputs: list[IOSpec] = [] - if self.output_spec is not None: - outputs = [self.output_spec] + outputs = list(self.output_specs) node_io = NodeIOSpec( inputs=self.input_specs, @@ -422,10 +420,45 @@ def on_pipe_end_success( # Store output spec and register in producer map for data flow tracking if output_spec is not 
None: - node_data.output_spec = output_spec - # Register this node as the producer of this stuff_code (digest) - if output_spec.digest: - self._stuff_producer_map[output_spec.digest] = node_id + # Skip pass-through outputs: if the output digest matches one of the node's + # input digests, the output is just the unchanged input flowing through + # (e.g., PipeParallel with add_each_output where main_stuff is the original input) + input_digests = {spec.digest for spec in node_data.input_specs if spec.digest is not None} + if output_spec.digest in input_digests: + # Pass-through: don't register as output or producer + pass + else: + node_data.output_specs.append(output_spec) + # Register this node as the producer of this stuff_code (digest) + if output_spec.digest: + self._stuff_producer_map[output_spec.digest] = node_id + + @override + def register_controller_output( + self, + node_id: str, + output_spec: IOSpec, + ) -> None: + """Register an additional output for a controller node. + + This allows controllers like PipeParallel to explicitly register their + branch outputs, overriding sub-pipe registrations in _stuff_producer_map + so that DATA edges flow from the controller to downstream consumers. + + Args: + node_id: The controller node ID. + output_spec: The IOSpec describing the output. + """ + if not self._is_active: + return + + node_data = self._nodes.get(node_id) + if node_data is None: + return + + node_data.output_specs.append(output_spec) + if output_spec.digest: + self._stuff_producer_map[output_spec.digest] = node_id @override def on_pipe_end_error( diff --git a/pipelex/graph/graph_tracer_manager.py b/pipelex/graph/graph_tracer_manager.py index f2cdf238b..4078b1c0c 100644 --- a/pipelex/graph/graph_tracer_manager.py +++ b/pipelex/graph/graph_tracer_manager.py @@ -298,6 +298,27 @@ def add_edge( label=label, ) + def register_controller_output( + self, + graph_id: str, + node_id: str, + output_spec: IOSpec, + ) -> None: + """Register an additional output for a controller node. + + Args: + graph_id: The graph identifier. + node_id: The controller node ID. + output_spec: The IOSpec describing the output. + """ + tracer = self._get_tracer(graph_id) + if tracer is None: + return + tracer.register_controller_output( + node_id=node_id, + output_spec=output_spec, + ) + def register_batch_item_extraction( self, graph_id: str, diff --git a/pipelex/graph/graph_tracer_protocol.py b/pipelex/graph/graph_tracer_protocol.py index 213e342cd..cdf924975 100644 --- a/pipelex/graph/graph_tracer_protocol.py +++ b/pipelex/graph/graph_tracer_protocol.py @@ -126,6 +126,22 @@ def add_edge( """ ... + def register_controller_output( + self, + node_id: str, + output_spec: IOSpec, + ) -> None: + """Register an additional output for a controller node. + + This allows controllers like PipeParallel to explicitly register their + branch outputs so that DATA edges flow from the controller to downstream consumers. + + Args: + node_id: The controller node ID. + output_spec: The IOSpec describing the output. + """ + ... 
+ def register_batch_item_extraction( self, list_stuff_code: str, @@ -235,6 +251,14 @@ def add_edge( ) -> None: pass + @override + def register_controller_output( + self, + node_id: str, + output_spec: IOSpec, + ) -> None: + pass + @override def register_batch_item_extraction( self, diff --git a/pipelex/pipe_controllers/parallel/pipe_parallel.py b/pipelex/pipe_controllers/parallel/pipe_parallel.py index 90d5453e6..16c2fc5d9 100644 --- a/pipelex/pipe_controllers/parallel/pipe_parallel.py +++ b/pipelex/pipe_controllers/parallel/pipe_parallel.py @@ -13,6 +13,8 @@ from pipelex.core.pipes.inputs.input_stuff_specs_factory import InputStuffSpecsFactory from pipelex.core.pipes.pipe_output import PipeOutput from pipelex.core.stuffs.stuff_factory import StuffFactory +from pipelex.graph.graph_tracer_manager import GraphTracerManager +from pipelex.graph.graphspec import IOSpec from pipelex.hub import get_required_pipe from pipelex.libraries.pipe.exceptions import PipeNotFoundError from pipelex.pipe_controllers.pipe_controller import PipeController @@ -178,6 +180,12 @@ async def _live_run_controller_pipe( output_stuff_contents[sub_pipe_output_name] = output_stuff.content log.verbose(f"PipeParallel '{self.code}': output_stuff_contents[{sub_pipe_output_name}]: {output_stuff_contents[sub_pipe_output_name]}") + # Register branch outputs with graph tracer so DATA edges flow from PipeParallel to downstream consumers + self._register_branch_outputs_with_graph_tracer( + job_metadata=job_metadata, + output_stuffs=output_stuffs, + ) + if self.combined_output: combined_output_stuff = StuffFactory.combine_stuffs( concept=self.combined_output, @@ -250,6 +258,12 @@ async def _dry_run_controller_pipe( output_stuffs[sub_pipe_output_name] = output_stuff output_stuff_contents[sub_pipe_output_name] = output_stuff.content + # Register branch outputs with graph tracer so DATA edges flow from PipeParallel to downstream consumers + self._register_branch_outputs_with_graph_tracer( + job_metadata=job_metadata, + output_stuffs=output_stuffs, + ) + # 4. Handle combined output if specified if self.combined_output: combined_output_stuff = StuffFactory.combine_stuffs( @@ -266,6 +280,43 @@ async def _dry_run_controller_pipe( pipeline_run_id=job_metadata.pipeline_run_id, ) + def _register_branch_outputs_with_graph_tracer( + self, + job_metadata: JobMetadata, + output_stuffs: dict[str, "Stuff"], + ) -> None: + """Register branch outputs with the graph tracer. + + This re-registers each branch output's stuff_code as produced by the PipeParallel + node, overriding the sub-pipe's registration so that DATA edges flow from + PipeParallel to downstream consumers. + + Args: + job_metadata: The job metadata containing graph context. + output_stuffs: Mapping of output_name to the branch output Stuff. 
+ """ + graph_context = job_metadata.graph_context + if graph_context is None: + return + tracer_manager = GraphTracerManager.get_instance() + if tracer_manager is None or graph_context.parent_node_id is None: + return + for output_name_key, output_stuff in output_stuffs.items(): + output_spec = IOSpec( + name=output_name_key, + concept=output_stuff.concept.code, + content_type=output_stuff.content.content_type, + digest=output_stuff.stuff_code, + data=output_stuff.content.smart_dump() if graph_context.data_inclusion.stuff_json_content else None, + data_text=output_stuff.content.rendered_pretty_text() if graph_context.data_inclusion.stuff_text_content else None, + data_html=output_stuff.content.rendered_pretty_html() if graph_context.data_inclusion.stuff_html_content else None, + ) + tracer_manager.register_controller_output( + graph_id=graph_context.graph_id, + node_id=graph_context.parent_node_id, + output_spec=output_spec, + ) + @override async def _validate_before_run( self, job_metadata: JobMetadata, working_memory: WorkingMemory, pipe_run_params: PipeRunParams, output_name: str | None = None diff --git a/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/parallel_graph_add_each.plx b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/parallel_graph_add_each.plx new file mode 100644 index 000000000..bb5e18060 --- /dev/null +++ b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/parallel_graph_add_each.plx @@ -0,0 +1,59 @@ +domain = "test_parallel_graph_add_each" +description = "Test PipeParallel with add_each_output for graph edge verification" +main_pipe = "parallel_then_consume" + +[concept.ShortSummary] +description = "A brief one-sentence summary" +refines = "Text" + +[concept.DetailedSummary] +description = "A detailed multi-sentence summary" +refines = "Text" + +[pipe.parallel_then_consume] +type = "PipeSequence" +description = "Run parallel summaries then consume one downstream" +inputs = { input_text = "Text" } +output = "Text" +steps = [ + { pipe = "parallel_summarize", result = "..." 
}, + { pipe = "combine_summaries" }, +] + +[pipe.parallel_summarize] +type = "PipeParallel" +description = "Generate short and detailed summaries in parallel" +inputs = { input_text = "Text" } +output = "Text" +add_each_output = true +branches = [ + { pipe = "summarize_short", result = "short_summary" }, + { pipe = "summarize_detailed", result = "detailed_summary" }, +] + +[pipe.summarize_short] +type = "PipeLLM" +description = "Generate a short one-sentence summary" +inputs = { input_text = "Text" } +output = "ShortSummary" +model = "$testing-text" +prompt = "Summarize in one sentence: @input_text.text" + +[pipe.summarize_detailed] +type = "PipeLLM" +description = "Generate a detailed summary" +inputs = { input_text = "Text" } +output = "DetailedSummary" +model = "$testing-text" +prompt = "Write a detailed summary of: @input_text.text" + +[pipe.combine_summaries] +type = "PipeLLM" +description = "Combine short and detailed summaries into a final result" +inputs = { short_summary = "ShortSummary", detailed_summary = "DetailedSummary" } +output = "Text" +model = "$testing-text" +prompt = """Combine these two summaries into a final result: + +Short: @short_summary.text +Detailed: @detailed_summary.text""" diff --git a/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/parallel_graph_combined.plx b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/parallel_graph_combined.plx new file mode 100644 index 000000000..407092d52 --- /dev/null +++ b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/parallel_graph_combined.plx @@ -0,0 +1,42 @@ +domain = "test_parallel_graph_combined" +description = "Test PipeParallel with combined_output for graph edge verification" +main_pipe = "pgc_parallel_analysis" + +[concept.PgcToneResult] +description = "Result of tone analysis" +refines = "Text" + +[concept.PgcLengthResult] +description = "Result of length analysis" +refines = "Text" + +[concept.PgcCombinedResult] +description = "Combined results from parallel analysis" + +[pipe.pgc_parallel_analysis] +type = "PipeParallel" +description = "Analyze tone and length in parallel with combined output" +inputs = { input_text = "Text" } +output = "PgcCombinedResult" +add_each_output = true +combined_output = "PgcCombinedResult" +branches = [ + { pipe = "pgc_analyze_tone", result = "tone_result" }, + { pipe = "pgc_analyze_length", result = "length_result" }, +] + +[pipe.pgc_analyze_tone] +type = "PipeLLM" +description = "Analyze the tone of the text" +inputs = { input_text = "Text" } +output = "PgcToneResult" +model = "$testing-text" +prompt = "Describe the tone of: @input_text.text" + +[pipe.pgc_analyze_length] +type = "PipeLLM" +description = "Analyze the length of the text" +inputs = { input_text = "Text" } +output = "PgcLengthResult" +model = "$testing-text" +prompt = "Describe the length characteristics of: @input_text.text" diff --git a/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/parallel_graph_models.py b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/parallel_graph_models.py new file mode 100644 index 000000000..341225ff3 --- /dev/null +++ b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/parallel_graph_models.py @@ -0,0 +1,11 @@ +from pydantic import Field + +from pipelex.core.stuffs.structured_content import StructuredContent +from pipelex.core.stuffs.text_content import TextContent + + +class PgcCombinedResult(StructuredContent): + """Combined results from parallel analysis branches.""" + + tone_result: TextContent = Field(..., description="Result of tone analysis") + 
length_result: TextContent = Field(..., description="Result of length analysis") diff --git a/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/test_data.py b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/test_data.py new file mode 100644 index 000000000..0db8eeab4 --- /dev/null +++ b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/test_data.py @@ -0,0 +1,54 @@ +"""Test data for PipeParallel graph tests.""" + +from typing import ClassVar + + +class ParallelAddEachGraphExpectations: + """Expected structure for the parallel_graph_add_each graph.""" + + # Expected node pipe_codes + EXPECTED_PIPE_CODES: ClassVar[set[str]] = { + "parallel_then_consume", # PipeSequence (outer controller) + "parallel_summarize", # PipeParallel (parallel controller) + "summarize_short", # PipeLLM (branch 1) + "summarize_detailed", # PipeLLM (branch 2) + "combine_summaries", # PipeLLM (downstream consumer) + } + + # Expected number of nodes per pipe_code + EXPECTED_NODE_COUNTS: ClassVar[dict[str, int]] = { + "parallel_then_consume": 1, + "parallel_summarize": 1, + "summarize_short": 1, + "summarize_detailed": 1, + "combine_summaries": 1, + } + + # Expected number of edges by kind + EXPECTED_EDGE_COUNTS: ClassVar[dict[str, int]] = { + "contains": 4, # sequence->parallel, sequence->combine, parallel->short, parallel->detailed + "data": 2, # parallel->combine (short_summary), parallel->combine (detailed_summary) + } + + +class ParallelCombinedGraphExpectations: + """Expected structure for the parallel_graph_combined graph.""" + + # Expected node pipe_codes + EXPECTED_PIPE_CODES: ClassVar[set[str]] = { + "pgc_parallel_analysis", # PipeParallel (parallel controller with combined_output) + "pgc_analyze_tone", # PipeLLM (branch 1) + "pgc_analyze_length", # PipeLLM (branch 2) + } + + # Expected number of nodes per pipe_code + EXPECTED_NODE_COUNTS: ClassVar[dict[str, int]] = { + "pgc_parallel_analysis": 1, + "pgc_analyze_tone": 1, + "pgc_analyze_length": 1, + } + + # Expected number of edges by kind + EXPECTED_EDGE_COUNTS: ClassVar[dict[str, int]] = { + "contains": 2, # parallel->tone, parallel->length + } diff --git a/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/test_pipe_parallel_graph.py b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/test_pipe_parallel_graph.py new file mode 100644 index 000000000..141860dd5 --- /dev/null +++ b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/test_pipe_parallel_graph.py @@ -0,0 +1,271 @@ +"""E2E test for PipeParallel with graph tracing to verify DATA edges from controller to consumers.""" + +from collections import Counter +from pathlib import Path + +import pytest + +from pipelex import log, pretty_print +from pipelex.config import get_config +from pipelex.core.stuffs.text_content import TextContent +from pipelex.graph.graph_factory import generate_graph_outputs +from pipelex.graph.graphspec import GraphSpec, NodeSpec +from pipelex.pipe_run.pipe_run_mode import PipeRunMode +from pipelex.pipeline.execute import execute_pipeline +from pipelex.tools.misc.file_utils import get_incremental_directory_path, save_text_to_path +from tests.conftest import TEST_OUTPUTS_DIR +from tests.e2e.pipelex.pipes.pipe_controller.pipe_parallel.test_data import ( + ParallelAddEachGraphExpectations, + ParallelCombinedGraphExpectations, +) + + +def _get_next_output_folder(subfolder: str) -> Path: + """Get the next numbered output folder for parallel graph outputs.""" + base_dir = str(Path(TEST_OUTPUTS_DIR) / f"pipe_parallel_graph_{subfolder}") + return 
Path(get_incremental_directory_path(base_dir, "run")) + + +@pytest.mark.dry_runnable +@pytest.mark.llm +@pytest.mark.inference +@pytest.mark.asyncio(loop_scope="class") +class TestPipeParallelGraph: + """E2E tests for PipeParallel graph generation with correct DATA edges.""" + + async def test_parallel_add_each_output_graph(self, pipe_run_mode: PipeRunMode): + """Verify PipeParallel with add_each_output generates correct DATA edges. + + This test runs a PipeSequence containing: + 1. PipeParallel (add_each_output=true) that produces short_summary and detailed_summary + 2. A downstream PipeLLM (combine_summaries) that consumes both branch outputs + + Expected: DATA edges flow from PipeParallel to combine_summaries (not from sub-pipes). + """ + # Build config with graph tracing and all graph outputs enabled + base_config = get_config().pipelex.pipeline_execution_config + exec_config = base_config.with_graph_config_overrides( + generate_graph=True, + force_include_full_data=False, + ) + graph_config = exec_config.graph_config.model_copy( + update={ + "graphs_inclusion": exec_config.graph_config.graphs_inclusion.model_copy( + update={ + "graphspec_json": True, + "mermaidflow_html": True, + "reactflow_html": True, + } + ) + } + ) + exec_config = exec_config.model_copy(update={"graph_config": graph_config}) + + # Run pipeline with input text + pipe_output = await execute_pipeline( + pipe_code="parallel_then_consume", + library_dirs=["tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel"], + inputs={ + "input_text": TextContent(text="The quick brown fox jumps over the lazy dog. This is a sample text for testing parallel processing.") + }, + pipe_run_mode=pipe_run_mode, + execution_config=exec_config, + ) + + # Basic assertions + assert pipe_output is not None + assert pipe_output.working_memory is not None + assert pipe_output.main_stuff is not None + + # Verify graph was generated + graph_spec = pipe_output.graph_spec + assert graph_spec is not None, "GraphSpec should be populated when generate_graph=True" + assert isinstance(graph_spec, GraphSpec) + assert len(graph_spec.nodes) > 0, "Graph should have nodes" + assert len(graph_spec.edges) > 0, "Graph should have edges" + + log.info(f"Parallel add_each graph: {len(graph_spec.nodes)} nodes, {len(graph_spec.edges)} edges") + + # Build node lookup + nodes_by_id: dict[str, NodeSpec] = {node.node_id: node for node in graph_spec.nodes} + nodes_by_pipe_code: dict[str, list[NodeSpec]] = {} + for node in graph_spec.nodes: + if node.pipe_code: + nodes_by_pipe_code.setdefault(node.pipe_code, []).append(node) + + # 1. Verify all expected pipe_codes exist + actual_pipe_codes = set(nodes_by_pipe_code.keys()) + assert actual_pipe_codes == ParallelAddEachGraphExpectations.EXPECTED_PIPE_CODES, ( + f"Unexpected pipe codes. Expected: {ParallelAddEachGraphExpectations.EXPECTED_PIPE_CODES}, Got: {actual_pipe_codes}" + ) + + # 2. Verify node counts per pipe_code + for pipe_code, expected_count in ParallelAddEachGraphExpectations.EXPECTED_NODE_COUNTS.items(): + actual_count = len(nodes_by_pipe_code.get(pipe_code, [])) + assert actual_count == expected_count, f"Expected {expected_count} nodes for pipe_code '{pipe_code}', got {actual_count}" + + # 3. 
Verify edge counts by kind + actual_edge_counts = Counter(str(edge.kind) for edge in graph_spec.edges) + for kind, expected_count in ParallelAddEachGraphExpectations.EXPECTED_EDGE_COUNTS.items(): + actual_count = actual_edge_counts.get(kind, 0) + assert actual_count == expected_count, f"Expected {expected_count} edges of kind '{kind}', got {actual_count}" + + # 4. Verify DATA edges source from PipeParallel, not from sub-pipes + parallel_node = nodes_by_pipe_code["parallel_summarize"][0] + combine_node = nodes_by_pipe_code["combine_summaries"][0] + data_edges = [edge for edge in graph_spec.edges if edge.kind.is_data] + + for edge in data_edges: + # DATA edges targeting combine_summaries should come from PipeParallel + if edge.target == combine_node.node_id: + assert edge.source == parallel_node.node_id, ( + f"DATA edge to combine_summaries should come from PipeParallel '{parallel_node.node_id}', " + f"but comes from '{edge.source}' (pipe_code: '{nodes_by_id[edge.source].pipe_code}')" + ) + + # 5. Verify PipeParallel node has output specs for both branch outputs + assert len(parallel_node.node_io.outputs) >= 2, ( + f"PipeParallel should have at least 2 output specs (branch outputs), got {len(parallel_node.node_io.outputs)}" + ) + output_names = {output.name for output in parallel_node.node_io.outputs} + assert "short_summary" in output_names, "PipeParallel should have 'short_summary' output" + assert "detailed_summary" in output_names, "PipeParallel should have 'detailed_summary' output" + + # 6. Verify containment: sub-pipes are inside PipeParallel + contains_edges = [edge for edge in graph_spec.edges if edge.kind.is_contains] + parallel_children = {edge.target for edge in contains_edges if edge.source == parallel_node.node_id} + branch_pipe_codes = {"summarize_short", "summarize_detailed"} + branch_node_ids = {node.node_id for pipe_code in branch_pipe_codes for node in nodes_by_pipe_code.get(pipe_code, [])} + assert branch_node_ids.issubset(parallel_children), ( + f"Branch nodes should be children of PipeParallel. Branch IDs: {branch_node_ids}, Parallel children: {parallel_children}" + ) + + # Generate and save graph outputs + graph_outputs = await generate_graph_outputs( + graph_spec=graph_spec, + graph_config=graph_config, + pipe_code="parallel_then_consume", + ) + + output_dir = _get_next_output_folder("add_each") + if graph_outputs.graphspec_json: + save_text_to_path(graph_outputs.graphspec_json, str(output_dir / "graph.json")) + if graph_outputs.mermaidflow_html: + save_text_to_path(graph_outputs.mermaidflow_html, str(output_dir / "mermaidflow.html")) + if graph_outputs.reactflow_html: + save_text_to_path(graph_outputs.reactflow_html, str(output_dir / "reactflow.html")) + + pretty_print( + { + "graph_id": graph_spec.graph_id, + "nodes": len(graph_spec.nodes), + "edges": len(graph_spec.edges), + "edges_by_kind": dict(actual_edge_counts), + "output_dir": str(output_dir), + }, + title="Parallel Add Each Graph Outputs", + ) + + log.info("Structural validation passed: DATA edges correctly source from PipeParallel") + + async def test_parallel_combined_output_graph(self, pipe_run_mode: PipeRunMode): + """Verify PipeParallel with combined_output generates correct graph structure. + + This test runs a PipeParallel with both add_each_output and combined_output. + Expected: PipeParallel node has branch outputs + combined output in its output specs. 
+ """ + # Build config with graph tracing + base_config = get_config().pipelex.pipeline_execution_config + exec_config = base_config.with_graph_config_overrides( + generate_graph=True, + force_include_full_data=False, + ) + graph_config = exec_config.graph_config.model_copy( + update={ + "graphs_inclusion": exec_config.graph_config.graphs_inclusion.model_copy( + update={ + "graphspec_json": True, + "reactflow_html": True, + } + ) + } + ) + exec_config = exec_config.model_copy(update={"graph_config": graph_config}) + + # Run pipeline + pipe_output = await execute_pipeline( + pipe_code="pgc_parallel_analysis", + library_dirs=["tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel"], + inputs={"input_text": TextContent(text="Hello world, this is a test document for parallel analysis.")}, + pipe_run_mode=pipe_run_mode, + execution_config=exec_config, + ) + + assert pipe_output is not None + assert pipe_output.main_stuff is not None + + # Verify graph + graph_spec = pipe_output.graph_spec + assert graph_spec is not None + assert isinstance(graph_spec, GraphSpec) + + log.info(f"Parallel combined graph: {len(graph_spec.nodes)} nodes, {len(graph_spec.edges)} edges") + + # Build node lookup + nodes_by_pipe_code: dict[str, list[NodeSpec]] = {} + for node in graph_spec.nodes: + if node.pipe_code: + nodes_by_pipe_code.setdefault(node.pipe_code, []).append(node) + + # 1. Verify all expected pipe_codes exist + actual_pipe_codes = set(nodes_by_pipe_code.keys()) + assert actual_pipe_codes == ParallelCombinedGraphExpectations.EXPECTED_PIPE_CODES, ( + f"Unexpected pipe codes. Expected: {ParallelCombinedGraphExpectations.EXPECTED_PIPE_CODES}, Got: {actual_pipe_codes}" + ) + + # 2. Verify node counts per pipe_code + for pipe_code, expected_count in ParallelCombinedGraphExpectations.EXPECTED_NODE_COUNTS.items(): + actual_count = len(nodes_by_pipe_code.get(pipe_code, [])) + assert actual_count == expected_count, f"Expected {expected_count} nodes for pipe_code '{pipe_code}', got {actual_count}" + + # 3. Verify edge counts by kind + actual_edge_counts = Counter(str(edge.kind) for edge in graph_spec.edges) + for kind, expected_count in ParallelCombinedGraphExpectations.EXPECTED_EDGE_COUNTS.items(): + actual_count = actual_edge_counts.get(kind, 0) + assert actual_count == expected_count, f"Expected {expected_count} edges of kind '{kind}', got {actual_count}" + + # 4. 
Verify PipeParallel node has outputs (branch outputs + combined output) + parallel_node = nodes_by_pipe_code["pgc_parallel_analysis"][0] + assert len(parallel_node.node_io.outputs) >= 2, ( + f"PipeParallel with combined_output should have at least 2 output specs (branch outputs), got {len(parallel_node.node_io.outputs)}" + ) + output_names = {output.name for output in parallel_node.node_io.outputs} + assert "tone_result" in output_names, "PipeParallel should have 'tone_result' output" + assert "length_result" in output_names, "PipeParallel should have 'length_result' output" + + # Generate and save graph outputs + graph_outputs = await generate_graph_outputs( + graph_spec=graph_spec, + graph_config=graph_config, + pipe_code="pgc_parallel_analysis", + ) + + output_dir = _get_next_output_folder("combined") + if graph_outputs.graphspec_json: + save_text_to_path(graph_outputs.graphspec_json, str(output_dir / "graph.json")) + if graph_outputs.reactflow_html: + save_text_to_path(graph_outputs.reactflow_html, str(output_dir / "reactflow.html")) + + pretty_print( + { + "graph_id": graph_spec.graph_id, + "nodes": len(graph_spec.nodes), + "edges": len(graph_spec.edges), + "edges_by_kind": dict(actual_edge_counts), + "parallel_outputs": [output.name for output in parallel_node.node_io.outputs], + "output_dir": str(output_dir), + }, + title="Parallel Combined Graph Outputs", + ) + + log.info("Structural validation passed: PipeParallel combined_output graph is correct") diff --git a/tests/unit/pipelex/graph/test_graph_tracer.py b/tests/unit/pipelex/graph/test_graph_tracer.py index 50bdeb7eb..82fb0b975 100644 --- a/tests/unit/pipelex/graph/test_graph_tracer.py +++ b/tests/unit/pipelex/graph/test_graph_tracer.py @@ -872,3 +872,258 @@ def test_batch_aggregate_edges_contain_stuff_digests(self) -> None: edge = batch_aggregate_edges[0] assert edge.source_stuff_digest == "item_result_digest" assert edge.target_stuff_digest == "output_list_digest" + + def test_register_controller_output(self) -> None: + """Test that register_controller_output adds to output_specs and _stuff_producer_map. + + When a controller explicitly registers outputs, DATA edges should go from + the controller node to consumers of those outputs. 
+ """ + tracer = GraphTracer() + context = tracer.setup(graph_id="controller-output-test", data_inclusion=make_defaulted_data_inclusion_config()) + + started_at = datetime.now(timezone.utc) + + # Controller node (e.g., PipeParallel) + controller_id, ctrl_ctx = tracer.on_pipe_start( + graph_context=context, + pipe_code="my_parallel", + pipe_type="PipeParallel", + node_kind=NodeKind.CONTROLLER, + started_at=started_at, + input_specs=[IOSpec(name="input_text", concept="Text", digest="input_digest")], + ) + + # Branch 1: produces output with digest "branch_output_1" + branch1_id, _ = tracer.on_pipe_start( + graph_context=ctrl_ctx, + pipe_code="branch_pipe_1", + pipe_type="PipeLLM", + node_kind=NodeKind.OPERATOR, + started_at=started_at + timedelta(milliseconds=10), + input_specs=[IOSpec(name="input_text", concept="Text", digest="input_digest")], + ) + tracer.on_pipe_end_success( + node_id=branch1_id, + ended_at=started_at + timedelta(milliseconds=50), + output_spec=IOSpec(name="short_summary", concept="Text", digest="branch_output_1"), + ) + + # Branch 2: produces output with digest "branch_output_2" + branch2_id, _ = tracer.on_pipe_start( + graph_context=ctrl_ctx, + pipe_code="branch_pipe_2", + pipe_type="PipeLLM", + node_kind=NodeKind.OPERATOR, + started_at=started_at + timedelta(milliseconds=10), + input_specs=[IOSpec(name="input_text", concept="Text", digest="input_digest")], + ) + tracer.on_pipe_end_success( + node_id=branch2_id, + ended_at=started_at + timedelta(milliseconds=50), + output_spec=IOSpec(name="long_summary", concept="Text", digest="branch_output_2"), + ) + + # Controller registers branch outputs (overriding sub-pipe registrations) + tracer.register_controller_output( + node_id=controller_id, + output_spec=IOSpec(name="short_summary", concept="Text", digest="branch_output_1"), + ) + tracer.register_controller_output( + node_id=controller_id, + output_spec=IOSpec(name="long_summary", concept="Text", digest="branch_output_2"), + ) + + # Consumer pipe that uses branch_output_1 + consumer_id, _ = tracer.on_pipe_start( + graph_context=context, + pipe_code="consumer_pipe", + pipe_type="PipeLLM", + node_kind=NodeKind.OPERATOR, + started_at=started_at + timedelta(milliseconds=60), + input_specs=[IOSpec(name="summary", concept="Text", digest="branch_output_1")], + ) + tracer.on_pipe_end_success( + node_id=consumer_id, + ended_at=started_at + timedelta(milliseconds=100), + ) + + # End controller + tracer.on_pipe_end_success( + node_id=controller_id, + ended_at=started_at + timedelta(milliseconds=110), + ) + + graph_spec = tracer.teardown() + + assert graph_spec is not None + + # Verify controller node has 2 output specs + controller_node = next(node for node in graph_spec.nodes if node.node_id == controller_id) + assert len(controller_node.node_io.outputs) == 2 + output_names = {output.name for output in controller_node.node_io.outputs} + assert output_names == {"short_summary", "long_summary"} + + # Verify DATA edge goes from controller (not branch) to consumer + data_edges = [edge for edge in graph_spec.edges if edge.kind.is_data] + controller_to_consumer = [edge for edge in data_edges if edge.target == consumer_id] + assert len(controller_to_consumer) == 1 + assert controller_to_consumer[0].source == controller_id + + def test_passthrough_output_skipped(self) -> None: + """Test that on_pipe_end_success skips output registration when output matches an input. 
+ + When a controller's main_stuff is unchanged from one of its inputs (pass-through), + the output should not be registered to avoid corrupting data edges. + """ + tracer = GraphTracer() + context = tracer.setup(graph_id="passthrough-test", data_inclusion=make_defaulted_data_inclusion_config()) + + started_at = datetime.now(timezone.utc) + + # Producer pipe creates stuff with digest "original_stuff" + producer_id, _ = tracer.on_pipe_start( + graph_context=context, + pipe_code="producer", + pipe_type="PipeLLM", + node_kind=NodeKind.OPERATOR, + started_at=started_at, + ) + tracer.on_pipe_end_success( + node_id=producer_id, + ended_at=started_at + timedelta(milliseconds=50), + output_spec=IOSpec(name="output", concept="Text", digest="original_stuff"), + ) + + # Controller consumes "original_stuff" and its main_stuff is the same + controller_id, _ctrl_ctx = tracer.on_pipe_start( + graph_context=context, + pipe_code="my_parallel", + pipe_type="PipeParallel", + node_kind=NodeKind.CONTROLLER, + started_at=started_at + timedelta(milliseconds=60), + input_specs=[IOSpec(name="input_text", concept="Text", digest="original_stuff")], + ) + + # Controller ends with the same digest as its input (pass-through) + tracer.on_pipe_end_success( + node_id=controller_id, + ended_at=started_at + timedelta(milliseconds=100), + output_spec=IOSpec(name="input_text", concept="Text", digest="original_stuff"), + ) + + # Consumer should still get the edge from the original producer, not the controller + consumer_id, _ = tracer.on_pipe_start( + graph_context=context, + pipe_code="consumer", + pipe_type="PipeLLM", + node_kind=NodeKind.OPERATOR, + started_at=started_at + timedelta(milliseconds=110), + input_specs=[IOSpec(name="input", concept="Text", digest="original_stuff")], + ) + tracer.on_pipe_end_success( + node_id=consumer_id, + ended_at=started_at + timedelta(milliseconds=150), + ) + + graph_spec = tracer.teardown() + + assert graph_spec is not None + + # Controller should have NO outputs (pass-through was skipped) + controller_node = next(node for node in graph_spec.nodes if node.node_id == controller_id) + assert len(controller_node.node_io.outputs) == 0 + + # DATA edges should go from producer to both controller (as input) and consumer + # The controller does NOT steal the producer registration (pass-through skipped) + data_edges = [edge for edge in graph_spec.edges if edge.kind.is_data] + assert len(data_edges) == 2 + assert all(edge.source == producer_id for edge in data_edges) + targets = {edge.target for edge in data_edges} + assert targets == {controller_id, consumer_id} + + def test_multiple_output_specs(self) -> None: + """Test that a node can have multiple outputs via register_controller_output. + + All registered outputs should produce correct DATA edges to their consumers. 
+ """ + tracer = GraphTracer() + context = tracer.setup(graph_id="multi-output-test", data_inclusion=make_defaulted_data_inclusion_config()) + + started_at = datetime.now(timezone.utc) + + # Controller with multiple outputs + controller_id, _ = tracer.on_pipe_start( + graph_context=context, + pipe_code="multi_output_pipe", + pipe_type="PipeParallel", + node_kind=NodeKind.CONTROLLER, + started_at=started_at, + ) + + # Register three different outputs + tracer.register_controller_output( + node_id=controller_id, + output_spec=IOSpec(name="output_a", concept="Text", digest="digest_a"), + ) + tracer.register_controller_output( + node_id=controller_id, + output_spec=IOSpec(name="output_b", concept="Text", digest="digest_b"), + ) + tracer.register_controller_output( + node_id=controller_id, + output_spec=IOSpec(name="output_c", concept="Text", digest="digest_c"), + ) + + tracer.on_pipe_end_success( + node_id=controller_id, + ended_at=started_at + timedelta(milliseconds=100), + ) + + # Consumer A reads digest_a + consumer_a_id, _ = tracer.on_pipe_start( + graph_context=context, + pipe_code="consumer_a", + pipe_type="PipeLLM", + node_kind=NodeKind.OPERATOR, + started_at=started_at + timedelta(milliseconds=110), + input_specs=[IOSpec(name="input", concept="Text", digest="digest_a")], + ) + tracer.on_pipe_end_success(node_id=consumer_a_id, ended_at=started_at + timedelta(milliseconds=120)) + + # Consumer B reads digest_b + consumer_b_id, _ = tracer.on_pipe_start( + graph_context=context, + pipe_code="consumer_b", + pipe_type="PipeLLM", + node_kind=NodeKind.OPERATOR, + started_at=started_at + timedelta(milliseconds=130), + input_specs=[IOSpec(name="input", concept="Text", digest="digest_b")], + ) + tracer.on_pipe_end_success(node_id=consumer_b_id, ended_at=started_at + timedelta(milliseconds=140)) + + # Consumer C reads digest_c + consumer_c_id, _ = tracer.on_pipe_start( + graph_context=context, + pipe_code="consumer_c", + pipe_type="PipeLLM", + node_kind=NodeKind.OPERATOR, + started_at=started_at + timedelta(milliseconds=150), + input_specs=[IOSpec(name="input", concept="Text", digest="digest_c")], + ) + tracer.on_pipe_end_success(node_id=consumer_c_id, ended_at=started_at + timedelta(milliseconds=160)) + + graph_spec = tracer.teardown() + + assert graph_spec is not None + + # Controller should have 3 output specs + controller_node = next(node for node in graph_spec.nodes if node.node_id == controller_id) + assert len(controller_node.node_io.outputs) == 3 + + # 3 DATA edges: controller -> consumer_a, controller -> consumer_b, controller -> consumer_c + data_edges = [edge for edge in graph_spec.edges if edge.kind.is_data] + assert len(data_edges) == 3 + assert all(edge.source == controller_id for edge in data_edges) + targets = {edge.target for edge in data_edges} + assert targets == {consumer_a_id, consumer_b_id, consumer_c_id} From 62f76c80f8608ea2cb52371d3d6dc7ae536f3b02 Mon Sep 17 00:00:00 2001 From: Louis Choquel Date: Wed, 11 Feb 2026 20:32:00 +0100 Subject: [PATCH 4/8] Add PARALLEL_COMBINE edge kind for PipeParallel combined output visualization Introduces a new EdgeKind.PARALLEL_COMBINE to show how individual branch outputs are merged into the combined result in PipeParallel, analogous to BATCH_AGGREGATE for PipeBatch. The edges render as purple dashed lines in ReactFlow and Mermaid views. The graph tracer snapshots original branch producers before register_controller_output overrides the producer map, ensuring correct edge source resolution during teardown. 
Co-Authored-By: Claude Opus 4.6 --- pipelex/graph/graph_tracer.py | 54 ++++++++++++++ pipelex/graph/graph_tracer_manager.py | 24 ++++++ pipelex/graph/graph_tracer_protocol.py | 27 +++++++ pipelex/graph/graphspec.py | 40 ++++++++-- .../graph/mermaidflow/mermaidflow_factory.py | 12 +++ .../reactflow/templates/_scripts.js.jinja2 | 27 +++++++ .../reactflow/templates/_styles.css.jinja2 | 4 + .../graph/reactflow/viewspec_transformer.py | 2 + .../parallel/pipe_parallel.py | 73 ++++++++++++++++--- .../pipe_parallel/test_data.py | 1 + .../pipe_parallel/test_pipe_parallel_graph.py | 10 +++ 11 files changed, 257 insertions(+), 17 deletions(-) diff --git a/pipelex/graph/graph_tracer.py b/pipelex/graph/graph_tracer.py index da0676052..125273799 100644 --- a/pipelex/graph/graph_tracer.py +++ b/pipelex/graph/graph_tracer.py @@ -108,6 +108,11 @@ def __init__(self) -> None: # The batch_controller_node_id is tracked to ensure BATCH_AGGREGATE edges target the correct node # (the PipeBatch), not a parent controller that may later register as producer of the same stuff self._batch_aggregate_map: dict[str, tuple[str | None, list[tuple[str, int]]]] = {} + # Maps combined_stuff_code -> (parallel_controller_node_id, [(branch_stuff_code, branch_producer_node_id)]) + # Used to create PARALLEL_COMBINE edges from branch outputs to combined output + # The branch_producer_node_id is snapshotted at registration time, before register_controller_output + # overrides _stuff_producer_map to point branch stuff codes to the controller node + self._parallel_combine_map: dict[str, tuple[str, list[tuple[str, str]]]] = {} @property def is_active(self) -> bool: @@ -137,6 +142,7 @@ def setup( self._stuff_producer_map = {} self._batch_item_map = {} self._batch_aggregate_map = {} + self._parallel_combine_map = {} return GraphContext( graph_id=graph_id, @@ -162,6 +168,7 @@ def teardown(self) -> GraphSpec | None: self._generate_data_edges() self._generate_batch_item_edges() self._generate_batch_aggregate_edges() + self._generate_parallel_combine_edges() self._is_active = False @@ -185,6 +192,7 @@ def teardown(self) -> GraphSpec | None: self._stuff_producer_map = {} self._batch_item_map = {} self._batch_aggregate_map = {} + self._parallel_combine_map = {} return graph @@ -292,6 +300,26 @@ def _generate_batch_aggregate_edges(self) -> None: target_stuff_digest=output_list_stuff_code, ) + def _generate_parallel_combine_edges(self) -> None: + """Generate PARALLEL_COMBINE edges from branch output stuff nodes to the combined output stuff node. + + For each registered parallel combine, create edges from each branch output + to the combined output, showing how individual branch results are merged. + + Uses snapshotted branch producer node IDs captured during register_parallel_combine, + before register_controller_output overrides _stuff_producer_map. 
+ """ + for combined_stuff_code, (parallel_controller_node_id, branch_entries) in self._parallel_combine_map.items(): + for branch_stuff_code, branch_producer_id in branch_entries: + if branch_producer_id != parallel_controller_node_id: + self.add_edge( + source_node_id=branch_producer_id, + target_node_id=parallel_controller_node_id, + edge_kind=EdgeKind.PARALLEL_COMBINE, + source_stuff_digest=branch_stuff_code, + target_stuff_digest=combined_stuff_code, + ) + @override def register_batch_item_extraction( self, @@ -346,6 +374,32 @@ def register_batch_aggregation( # Note: We keep the first batch_controller_node_id registered for this output list # (all items for the same output list should come from the same batch controller) + @override + def register_parallel_combine( + self, + combined_stuff_code: str, + branch_stuff_codes: list[str], + parallel_controller_node_id: str, + ) -> None: + """Register that branch outputs are combined into a single output in PipeParallel. + + Args: + combined_stuff_code: The stuff_code of the combined output. + branch_stuff_codes: The stuff_codes of the individual branch outputs. + parallel_controller_node_id: The node_id of the PipeParallel controller. + """ + if not self._is_active: + return + # Snapshot the current branch producers from _stuff_producer_map before + # register_controller_output overrides them to point to the controller node. + # This must be called BEFORE _register_branch_outputs_with_graph_tracer. + branch_entries: list[tuple[str, str]] = [] + for branch_code in branch_stuff_codes: + producer_id = self._stuff_producer_map.get(branch_code) + if producer_id: + branch_entries.append((branch_code, producer_id)) + self._parallel_combine_map[combined_stuff_code] = (parallel_controller_node_id, branch_entries) + @override def on_pipe_start( self, diff --git a/pipelex/graph/graph_tracer_manager.py b/pipelex/graph/graph_tracer_manager.py index 4078b1c0c..3b4d770df 100644 --- a/pipelex/graph/graph_tracer_manager.py +++ b/pipelex/graph/graph_tracer_manager.py @@ -374,3 +374,27 @@ def register_batch_aggregation( item_index=item_index, batch_controller_node_id=batch_controller_node_id, ) + + def register_parallel_combine( + self, + graph_id: str, + combined_stuff_code: str, + branch_stuff_codes: list[str], + parallel_controller_node_id: str, + ) -> None: + """Register that branch outputs are combined into a single output in PipeParallel. + + Args: + graph_id: The graph identifier. + combined_stuff_code: The stuff_code of the combined output. + branch_stuff_codes: The stuff_codes of the individual branch outputs. + parallel_controller_node_id: The node_id of the PipeParallel controller. + """ + tracer = self._get_tracer(graph_id) + if tracer is None: + return + tracer.register_parallel_combine( + combined_stuff_code=combined_stuff_code, + branch_stuff_codes=branch_stuff_codes, + parallel_controller_node_id=parallel_controller_node_id, + ) diff --git a/pipelex/graph/graph_tracer_protocol.py b/pipelex/graph/graph_tracer_protocol.py index cdf924975..adbb217a8 100644 --- a/pipelex/graph/graph_tracer_protocol.py +++ b/pipelex/graph/graph_tracer_protocol.py @@ -179,6 +179,24 @@ def register_batch_aggregation( """ ... + def register_parallel_combine( + self, + combined_stuff_code: str, + branch_stuff_codes: list[str], + parallel_controller_node_id: str, + ) -> None: + """Register that branch outputs are combined into a single output in PipeParallel. + + Creates PARALLEL_COMBINE edges from each branch output stuff node + to the combined output stuff node. 
+ + Args: + combined_stuff_code: The stuff_code of the combined output. + branch_stuff_codes: The stuff_codes of the individual branch outputs. + parallel_controller_node_id: The node_id of the PipeParallel controller. + """ + ... + class GraphTracerNoOp(GraphTracerProtocol): """No-operation implementation of GraphTracerProtocol. @@ -278,3 +296,12 @@ def register_batch_aggregation( batch_controller_node_id: str | None = None, ) -> None: pass + + @override + def register_parallel_combine( + self, + combined_stuff_code: str, + branch_stuff_codes: list[str], + parallel_controller_node_id: str, + ) -> None: + pass diff --git a/pipelex/graph/graphspec.py b/pipelex/graph/graphspec.py index a7d4c440c..66892c827 100644 --- a/pipelex/graph/graphspec.py +++ b/pipelex/graph/graphspec.py @@ -49,13 +49,21 @@ class EdgeKind(StrEnum): SELECTED_OUTCOME = "selected_outcome" BATCH_ITEM = "batch_item" # list → item extraction during batch iteration BATCH_AGGREGATE = "batch_aggregate" # items → output list aggregation + PARALLEL_COMBINE = "parallel_combine" # branch outputs → combined output in PipeParallel @property def is_data(self) -> bool: match self: case EdgeKind.DATA: return True - case EdgeKind.CONTROL | EdgeKind.CONTAINS | EdgeKind.SELECTED_OUTCOME | EdgeKind.BATCH_ITEM | EdgeKind.BATCH_AGGREGATE: + case ( + EdgeKind.CONTROL + | EdgeKind.CONTAINS + | EdgeKind.SELECTED_OUTCOME + | EdgeKind.BATCH_ITEM + | EdgeKind.BATCH_AGGREGATE + | EdgeKind.PARALLEL_COMBINE + ): return False @property @@ -63,7 +71,14 @@ def is_contains(self) -> bool: match self: case EdgeKind.CONTAINS: return True - case EdgeKind.CONTROL | EdgeKind.DATA | EdgeKind.SELECTED_OUTCOME | EdgeKind.BATCH_ITEM | EdgeKind.BATCH_AGGREGATE: + case ( + EdgeKind.CONTROL + | EdgeKind.DATA + | EdgeKind.SELECTED_OUTCOME + | EdgeKind.BATCH_ITEM + | EdgeKind.BATCH_AGGREGATE + | EdgeKind.PARALLEL_COMBINE + ): return False @property @@ -71,7 +86,7 @@ def is_selected_outcome(self) -> bool: match self: case EdgeKind.SELECTED_OUTCOME: return True - case EdgeKind.CONTROL | EdgeKind.DATA | EdgeKind.CONTAINS | EdgeKind.BATCH_ITEM | EdgeKind.BATCH_AGGREGATE: + case EdgeKind.CONTROL | EdgeKind.DATA | EdgeKind.CONTAINS | EdgeKind.BATCH_ITEM | EdgeKind.BATCH_AGGREGATE | EdgeKind.PARALLEL_COMBINE: return False @property @@ -79,7 +94,14 @@ def is_batch_item(self) -> bool: match self: case EdgeKind.BATCH_ITEM: return True - case EdgeKind.CONTROL | EdgeKind.DATA | EdgeKind.CONTAINS | EdgeKind.SELECTED_OUTCOME | EdgeKind.BATCH_AGGREGATE: + case ( + EdgeKind.CONTROL + | EdgeKind.DATA + | EdgeKind.CONTAINS + | EdgeKind.SELECTED_OUTCOME + | EdgeKind.BATCH_AGGREGATE + | EdgeKind.PARALLEL_COMBINE + ): return False @property @@ -87,7 +109,15 @@ def is_batch_aggregate(self) -> bool: match self: case EdgeKind.BATCH_AGGREGATE: return True - case EdgeKind.CONTROL | EdgeKind.DATA | EdgeKind.CONTAINS | EdgeKind.SELECTED_OUTCOME | EdgeKind.BATCH_ITEM: + case EdgeKind.CONTROL | EdgeKind.DATA | EdgeKind.CONTAINS | EdgeKind.SELECTED_OUTCOME | EdgeKind.BATCH_ITEM | EdgeKind.PARALLEL_COMBINE: + return False + + @property + def is_parallel_combine(self) -> bool: + match self: + case EdgeKind.PARALLEL_COMBINE: + return True + case EdgeKind.CONTROL | EdgeKind.DATA | EdgeKind.CONTAINS | EdgeKind.SELECTED_OUTCOME | EdgeKind.BATCH_ITEM | EdgeKind.BATCH_AGGREGATE: return False diff --git a/pipelex/graph/mermaidflow/mermaidflow_factory.py b/pipelex/graph/mermaidflow/mermaidflow_factory.py index b84a723fb..311257f55 100644 --- a/pipelex/graph/mermaidflow/mermaidflow_factory.py +++ 
b/pipelex/graph/mermaidflow/mermaidflow_factory.py @@ -241,6 +241,18 @@ def make_from_graphspec( label = edge.label or "" lines.append(f' {source_mermaid_id} -."{label}".-> {target_mermaid_id}') + # Render parallel combine edges (branch outputs → combined output) with dashed styling + parallel_combine_edges = [edge for edge in graph.edges if edge.kind.is_parallel_combine] + if parallel_combine_edges: + lines.append("") + lines.append(" %% Parallel combine edges: branch outputs → combined output") + for edge in parallel_combine_edges: + source_mermaid_id = id_mapping.get(edge.source) + target_mermaid_id = id_mapping.get(edge.target) + if source_mermaid_id and target_mermaid_id: + label = edge.label or "" + lines.append(f' {source_mermaid_id} -."{label}".-> {target_mermaid_id}') + # Style definitions lines.append("") lines.append(" %% Style definitions") diff --git a/pipelex/graph/reactflow/templates/_scripts.js.jinja2 b/pipelex/graph/reactflow/templates/_scripts.js.jinja2 index 5a8d550a1..ecb59bdc7 100644 --- a/pipelex/graph/reactflow/templates/_scripts.js.jinja2 +++ b/pipelex/graph/reactflow/templates/_scripts.js.jinja2 @@ -435,6 +435,33 @@ function buildDataflowGraph(graphspec, analysis) { // batch_item: batch_controller → stuff_item, batch_aggregate: stuff_item → batch_controller // (showBatchController is declared earlier in the function) + // Create PARALLEL_COMBINE edges from GraphSpec + // These show branch outputs flowing into the combined output + for (const edge of graphspec.edges) { + if (edge.kind !== 'parallel_combine') continue; + + if (!edge.source_stuff_digest || !edge.target_stuff_digest) continue; + const sourceId = `stuff_${edge.source_stuff_digest}`; + const targetId = `stuff_${edge.target_stuff_digest}`; + + edges.push({ + id: edge.id, + source: sourceId, + target: targetId, + type: {{ edge_type | tojson }}, + animated: false, + style: { + stroke: 'var(--color-parallel-combine)', + strokeWidth: 2, + strokeDasharray: '5,5', + }, + markerEnd: { + type: MarkerType?.ArrowClosed || 'arrowclosed', + color: 'var(--color-parallel-combine)', + }, + }); + } + for (const edge of graphspec.edges) { if (edge.kind !== 'batch_item' && edge.kind !== 'batch_aggregate') { continue; diff --git a/pipelex/graph/reactflow/templates/_styles.css.jinja2 b/pipelex/graph/reactflow/templates/_styles.css.jinja2 index f75fddf7e..4b33a060e 100644 --- a/pipelex/graph/reactflow/templates/_styles.css.jinja2 +++ b/pipelex/graph/reactflow/templates/_styles.css.jinja2 @@ -27,6 +27,7 @@ --color-edge: #3b82f6; --color-batch-item: #a855f7; --color-batch-aggregate: #22c55e; + --color-parallel-combine: #c084fc; --font-sans: 'Inter', -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif; --font-mono: 'JetBrains Mono', 'Monaco', 'Menlo', monospace; --radius-sm: 4px; @@ -66,6 +67,7 @@ --color-edge: #3b82f6; --color-batch-item: #9333ea; --color-batch-aggregate: #16a34a; + --color-parallel-combine: #a855f7; --shadow-sm: 0 1px 2px rgba(0, 0, 0, 0.05); --shadow-md: 0 4px 12px rgba(0, 0, 0, 0.1); --shadow-lg: 0 8px 24px rgba(0, 0, 0, 0.15); @@ -100,6 +102,7 @@ --color-edge: #3b82f6; --color-batch-item: #9333ea; --color-batch-aggregate: #16a34a; + --color-parallel-combine: #a855f7; --shadow-sm: 0 1px 2px rgba(0, 0, 0, 0.05); --shadow-md: 0 4px 12px rgba(0, 0, 0, 0.1); --shadow-lg: 0 8px 24px rgba(0, 0, 0, 0.15); @@ -127,6 +130,7 @@ --color-edge: #FFFACD; --color-batch-item: #bd93f9; --color-batch-aggregate: #50fa7b; + --color-parallel-combine: #d6a4ff; /* Status colors */ --color-success: #50FA7B; /* Bright Green */ 
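Before moving on to the transformer, it may help to see how a serialized
parallel_combine edge resolves to the stuff_<digest> ids the ReactFlow
script connects. A hedged Python sketch (the kind and digest field names
follow the template code above; the concrete ids are invented, and the
exact shape of the remaining serialized fields is an assumption):

    import json

    edge = json.loads("""
    {
        "id": "edge_7",
        "kind": "parallel_combine",
        "source": "node_analyze_tone",
        "target": "node_parallel",
        "source_stuff_digest": "digest_tone",
        "target_stuff_digest": "digest_combined"
    }
    """)

    if (
        edge["kind"] == "parallel_combine"
        and edge["source_stuff_digest"]
        and edge["target_stuff_digest"]
    ):
        # Mirror the jinja2 script: connect stuff-to-stuff, not node-to-node.
        source_id = f"stuff_{edge['source_stuff_digest']}"
        target_id = f"stuff_{edge['target_stuff_digest']}"
        print(source_id, "->", target_id)  # stuff_digest_tone -> stuff_digest_combined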
diff --git a/pipelex/graph/reactflow/viewspec_transformer.py b/pipelex/graph/reactflow/viewspec_transformer.py index 0af3ba5ad..e391dffbc 100644 --- a/pipelex/graph/reactflow/viewspec_transformer.py +++ b/pipelex/graph/reactflow/viewspec_transformer.py @@ -63,6 +63,8 @@ def _map_edge_kind_to_view_type(kind: EdgeKind) -> str: return "batch_item" case EdgeKind.BATCH_AGGREGATE: return "batch_aggregate" + case EdgeKind.PARALLEL_COMBINE: + return "parallel_combine" def _build_node_label(node_spec: Any) -> str: diff --git a/pipelex/pipe_controllers/parallel/pipe_parallel.py b/pipelex/pipe_controllers/parallel/pipe_parallel.py index 16c2fc5d9..5dcc78ff5 100644 --- a/pipelex/pipe_controllers/parallel/pipe_parallel.py +++ b/pipelex/pipe_controllers/parallel/pipe_parallel.py @@ -180,12 +180,6 @@ async def _live_run_controller_pipe( output_stuff_contents[sub_pipe_output_name] = output_stuff.content log.verbose(f"PipeParallel '{self.code}': output_stuff_contents[{sub_pipe_output_name}]: {output_stuff_contents[sub_pipe_output_name]}") - # Register branch outputs with graph tracer so DATA edges flow from PipeParallel to downstream consumers - self._register_branch_outputs_with_graph_tracer( - job_metadata=job_metadata, - output_stuffs=output_stuffs, - ) - if self.combined_output: combined_output_stuff = StuffFactory.combine_stuffs( concept=self.combined_output, @@ -197,6 +191,21 @@ async def _live_run_controller_pipe( name=output_name, ) + # Register parallel combine edges BEFORE register_branch_outputs, because + # register_parallel_combine snapshots the original branch producers from + # _stuff_producer_map before register_controller_output overrides them + self._register_parallel_combine_with_graph_tracer( + job_metadata=job_metadata, + combined_stuff=combined_output_stuff, + branch_stuffs=output_stuffs, + ) + + # Register branch outputs with graph tracer so DATA edges flow from PipeParallel to downstream consumers + self._register_branch_outputs_with_graph_tracer( + job_metadata=job_metadata, + output_stuffs=output_stuffs, + ) + return PipeOutput( working_memory=working_memory, pipeline_run_id=job_metadata.pipeline_run_id, @@ -258,12 +267,6 @@ async def _dry_run_controller_pipe( output_stuffs[sub_pipe_output_name] = output_stuff output_stuff_contents[sub_pipe_output_name] = output_stuff.content - # Register branch outputs with graph tracer so DATA edges flow from PipeParallel to downstream consumers - self._register_branch_outputs_with_graph_tracer( - job_metadata=job_metadata, - output_stuffs=output_stuffs, - ) - # 4. 
Handle combined output if specified if self.combined_output: combined_output_stuff = StuffFactory.combine_stuffs( @@ -275,6 +278,22 @@ async def _dry_run_controller_pipe( stuff=combined_output_stuff, name=output_name, ) + + # Register parallel combine edges BEFORE register_branch_outputs, because + # register_parallel_combine snapshots the original branch producers from + # _stuff_producer_map before register_controller_output overrides them + self._register_parallel_combine_with_graph_tracer( + job_metadata=job_metadata, + combined_stuff=combined_output_stuff, + branch_stuffs=output_stuffs, + ) + + # Register branch outputs with graph tracer so DATA edges flow from PipeParallel to downstream consumers + self._register_branch_outputs_with_graph_tracer( + job_metadata=job_metadata, + output_stuffs=output_stuffs, + ) + return PipeOutput( working_memory=working_memory, pipeline_run_id=job_metadata.pipeline_run_id, @@ -317,6 +336,36 @@ def _register_branch_outputs_with_graph_tracer( output_spec=output_spec, ) + def _register_parallel_combine_with_graph_tracer( + self, + job_metadata: JobMetadata, + combined_stuff: "Stuff", + branch_stuffs: dict[str, "Stuff"], + ) -> None: + """Register parallel combine edges (branch outputs → combined output). + + Creates PARALLEL_COMBINE edges showing how individual branch results + are merged into the combined output. + + Args: + job_metadata: The job metadata containing graph context. + combined_stuff: The combined output Stuff. + branch_stuffs: Mapping of output_name to the branch output Stuff. + """ + graph_context = job_metadata.graph_context + if graph_context is None: + return + tracer_manager = GraphTracerManager.get_instance() + if tracer_manager is None or graph_context.parent_node_id is None: + return + branch_stuff_codes = [stuff.stuff_code for stuff in branch_stuffs.values()] + tracer_manager.register_parallel_combine( + graph_id=graph_context.graph_id, + combined_stuff_code=combined_stuff.stuff_code, + branch_stuff_codes=branch_stuff_codes, + parallel_controller_node_id=graph_context.parent_node_id, + ) + @override async def _validate_before_run( self, job_metadata: JobMetadata, working_memory: WorkingMemory, pipe_run_params: PipeRunParams, output_name: str | None = None diff --git a/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/test_data.py b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/test_data.py index 0db8eeab4..70609d4ee 100644 --- a/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/test_data.py +++ b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/test_data.py @@ -51,4 +51,5 @@ class ParallelCombinedGraphExpectations: # Expected number of edges by kind EXPECTED_EDGE_COUNTS: ClassVar[dict[str, int]] = { "contains": 2, # parallel->tone, parallel->length + "parallel_combine": 2, # tone_result->combined, length_result->combined } diff --git a/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/test_pipe_parallel_graph.py b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/test_pipe_parallel_graph.py index 141860dd5..19cce9801 100644 --- a/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/test_pipe_parallel_graph.py +++ b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/test_pipe_parallel_graph.py @@ -243,6 +243,16 @@ async def test_parallel_combined_output_graph(self, pipe_run_mode: PipeRunMode): assert "tone_result" in output_names, "PipeParallel should have 'tone_result' output" assert "length_result" in output_names, "PipeParallel should have 'length_result' output" + # 5. 
Verify PARALLEL_COMBINE edges connect branch producers to the PipeParallel node + parallel_combine_edges = [edge for edge in graph_spec.edges if edge.kind.is_parallel_combine] + assert len(parallel_combine_edges) == 2, f"Expected 2 PARALLEL_COMBINE edges (one per branch), got {len(parallel_combine_edges)}" + for edge in parallel_combine_edges: + assert edge.target == parallel_node.node_id, ( + f"PARALLEL_COMBINE edge target should be PipeParallel '{parallel_node.node_id}', got '{edge.target}'" + ) + assert edge.source_stuff_digest is not None, "PARALLEL_COMBINE edge should have source_stuff_digest" + assert edge.target_stuff_digest is not None, "PARALLEL_COMBINE edge should have target_stuff_digest" + # Generate and save graph outputs graph_outputs = await generate_graph_outputs( graph_spec=graph_spec, From ad63b296ab4ef431e5933d16e8e24ea365ad50b9 Mon Sep 17 00:00:00 2001 From: Louis Choquel Date: Wed, 11 Feb 2026 21:11:02 +0100 Subject: [PATCH 5/8] Improve PipeParallel graph E2E tests with PipeSequence wrapper and 3-branch variant Wrap the combined_output test in a PipeSequence with a follow-up PipeLLM that consumes the combined result, making the graph more realistic. Add a new 3-branch PipeParallel test with selective downstream consumption where 2 branches are consumed and 1 is unused. Parametrize the test to cover both variants. Co-Authored-By: Claude Opus 4.6 --- .../pipe_parallel/parallel_graph_3branch.plx | 82 +++++++++++++++++++ .../pipe_parallel/parallel_graph_combined.plx | 22 ++++- .../pipe_parallel/parallel_graph_models.py | 8 ++ .../pipe_parallel/test_data.py | 57 ++++++++++++- .../pipe_parallel/test_pipe_parallel_graph.py | 63 ++++++++------ 5 files changed, 201 insertions(+), 31 deletions(-) create mode 100644 tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/parallel_graph_3branch.plx diff --git a/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/parallel_graph_3branch.plx b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/parallel_graph_3branch.plx new file mode 100644 index 000000000..d1fe6c478 --- /dev/null +++ b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/parallel_graph_3branch.plx @@ -0,0 +1,82 @@ +domain = "test_parallel_graph_3branch" +description = "Test 3-branch PipeParallel with selective downstream consumption" +main_pipe = "pg3_sequence" + +[concept.Pg3ToneResult] +description = "Result of tone analysis" +refines = "Text" + +[concept.Pg3LengthResult] +description = "Result of length analysis" +refines = "Text" + +[concept.Pg3StyleResult] +description = "Result of style analysis" +refines = "Text" + +[concept.Pg3CombinedResult] +description = "Combined results from 3-branch parallel analysis" + +[pipe.pg3_sequence] +type = "PipeSequence" +description = "Run 3-branch parallel analysis then selectively consume 2 of 3 branch outputs" +inputs = { input_text = "Text" } +output = "Text" +steps = [ + { pipe = "pg3_parallel", result = "full_combo" }, + { pipe = "pg3_refine_tone", result = "refined_tone" }, + { pipe = "pg3_refine_length", result = "refined_length" }, +] + +[pipe.pg3_parallel] +type = "PipeParallel" +description = "Analyze tone, length, and style in parallel with combined output" +inputs = { input_text = "Text" } +output = "Pg3CombinedResult" +add_each_output = true +combined_output = "Pg3CombinedResult" +branches = [ + { pipe = "pg3_analyze_tone", result = "tone_result" }, + { pipe = "pg3_analyze_length", result = "length_result" }, + { pipe = "pg3_analyze_style", result = "style_result" }, +] + +[pipe.pg3_analyze_tone] +type = 
"PipeLLM" +description = "Analyze the tone of the text" +inputs = { input_text = "Text" } +output = "Pg3ToneResult" +model = "$testing-text" +prompt = "Describe the tone of: @input_text.text" + +[pipe.pg3_analyze_length] +type = "PipeLLM" +description = "Analyze the length of the text" +inputs = { input_text = "Text" } +output = "Pg3LengthResult" +model = "$testing-text" +prompt = "Describe the length characteristics of: @input_text.text" + +[pipe.pg3_analyze_style] +type = "PipeLLM" +description = "Analyze the writing style of the text" +inputs = { input_text = "Text" } +output = "Pg3StyleResult" +model = "$testing-text" +prompt = "Describe the writing style of: @input_text.text" + +[pipe.pg3_refine_tone] +type = "PipeLLM" +description = "Refine the tone analysis" +inputs = { tone_result = "Pg3ToneResult" } +output = "Text" +model = "$testing-text" +prompt = "Refine and elaborate on this tone analysis: @tone_result.text" + +[pipe.pg3_refine_length] +type = "PipeLLM" +description = "Refine the length analysis" +inputs = { length_result = "Pg3LengthResult" } +output = "Text" +model = "$testing-text" +prompt = "Refine and elaborate on this length analysis: @length_result.text" diff --git a/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/parallel_graph_combined.plx b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/parallel_graph_combined.plx index 407092d52..6212ae0be 100644 --- a/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/parallel_graph_combined.plx +++ b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/parallel_graph_combined.plx @@ -1,6 +1,6 @@ domain = "test_parallel_graph_combined" -description = "Test PipeParallel with combined_output for graph edge verification" -main_pipe = "pgc_parallel_analysis" +description = "Test PipeParallel with combined_output wrapped in PipeSequence with follow-up consumer" +main_pipe = "pgc_analysis_then_summarize" [concept.PgcToneResult] description = "Result of tone analysis" @@ -13,6 +13,16 @@ refines = "Text" [concept.PgcCombinedResult] description = "Combined results from parallel analysis" +[pipe.pgc_analysis_then_summarize] +type = "PipeSequence" +description = "Run parallel analysis then summarize the combined result" +inputs = { input_text = "Text" } +output = "Text" +steps = [ + { pipe = "pgc_parallel_analysis", result = "pgc_combined_result" }, + { pipe = "pgc_summarize_combined" }, +] + [pipe.pgc_parallel_analysis] type = "PipeParallel" description = "Analyze tone and length in parallel with combined output" @@ -40,3 +50,11 @@ inputs = { input_text = "Text" } output = "PgcLengthResult" model = "$testing-text" prompt = "Describe the length characteristics of: @input_text.text" + +[pipe.pgc_summarize_combined] +type = "PipeLLM" +description = "Summarize the combined parallel analysis result" +inputs = { pgc_combined_result = "PgcCombinedResult" } +output = "Text" +model = "$testing-text" +prompt = "Summarize the following analysis: @pgc_combined_result" diff --git a/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/parallel_graph_models.py b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/parallel_graph_models.py index 341225ff3..b073ebaa4 100644 --- a/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/parallel_graph_models.py +++ b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/parallel_graph_models.py @@ -9,3 +9,11 @@ class PgcCombinedResult(StructuredContent): tone_result: TextContent = Field(..., description="Result of tone analysis") length_result: TextContent = Field(..., description="Result of 
length analysis") + + +class Pg3CombinedResult(StructuredContent): + """Combined results from 3-branch parallel analysis.""" + + tone_result: TextContent = Field(..., description="Result of tone analysis") + length_result: TextContent = Field(..., description="Result of length analysis") + style_result: TextContent = Field(..., description="Result of style analysis") diff --git a/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/test_data.py b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/test_data.py index 70609d4ee..cfd77ed2b 100644 --- a/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/test_data.py +++ b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/test_data.py @@ -3,6 +3,15 @@ from typing import ClassVar +class ParallelCombinedGraphExpectationsBase: + """Base class for PipeParallel graph expectations with combined_output.""" + + PARALLEL_PIPE_CODE: ClassVar[str] + EXPECTED_PIPE_CODES: ClassVar[set[str]] + EXPECTED_NODE_COUNTS: ClassVar[dict[str, int]] + EXPECTED_EDGE_COUNTS: ClassVar[dict[str, int]] + + class ParallelAddEachGraphExpectations: """Expected structure for the parallel_graph_add_each graph.""" @@ -31,25 +40,67 @@ class ParallelAddEachGraphExpectations: } -class ParallelCombinedGraphExpectations: - """Expected structure for the parallel_graph_combined graph.""" +class ParallelCombinedGraphExpectations(ParallelCombinedGraphExpectationsBase): + """Expected structure for the parallel_graph_combined graph (PipeSequence wrapping PipeParallel with combined_output).""" + + PARALLEL_PIPE_CODE: ClassVar[str] = "pgc_parallel_analysis" # Expected node pipe_codes EXPECTED_PIPE_CODES: ClassVar[set[str]] = { + "pgc_analysis_then_summarize", # PipeSequence (outer controller) "pgc_parallel_analysis", # PipeParallel (parallel controller with combined_output) "pgc_analyze_tone", # PipeLLM (branch 1) "pgc_analyze_length", # PipeLLM (branch 2) + "pgc_summarize_combined", # PipeLLM (downstream consumer of combined result) } # Expected number of nodes per pipe_code EXPECTED_NODE_COUNTS: ClassVar[dict[str, int]] = { + "pgc_analysis_then_summarize": 1, "pgc_parallel_analysis": 1, "pgc_analyze_tone": 1, "pgc_analyze_length": 1, + "pgc_summarize_combined": 1, } # Expected number of edges by kind EXPECTED_EDGE_COUNTS: ClassVar[dict[str, int]] = { - "contains": 2, # parallel->tone, parallel->length + "contains": 4, # sequence->parallel, sequence->summarize_combined, parallel->tone, parallel->length "parallel_combine": 2, # tone_result->combined, length_result->combined + "data": 1, # parallel->summarize_combined (combined result) + } + + +class Parallel3BranchGraphExpectations(ParallelCombinedGraphExpectationsBase): + """Expected structure for the parallel_graph_3branch graph (3-branch PipeParallel with selective consumption).""" + + PARALLEL_PIPE_CODE: ClassVar[str] = "pg3_parallel" + + # Expected node pipe_codes + EXPECTED_PIPE_CODES: ClassVar[set[str]] = { + "pg3_sequence", # PipeSequence (outer controller) + "pg3_parallel", # PipeParallel (3-branch parallel with combined_output) + "pg3_analyze_tone", # PipeLLM (branch 1) + "pg3_analyze_length", # PipeLLM (branch 2) + "pg3_analyze_style", # PipeLLM (branch 3 - unused downstream) + "pg3_refine_tone", # PipeLLM (consumes tone_result) + "pg3_refine_length", # PipeLLM (consumes length_result) + } + + # Expected number of nodes per pipe_code + EXPECTED_NODE_COUNTS: ClassVar[dict[str, int]] = { + "pg3_sequence": 1, + "pg3_parallel": 1, + "pg3_analyze_tone": 1, + "pg3_analyze_length": 1, + "pg3_analyze_style": 1, + 
"pg3_refine_tone": 1, + "pg3_refine_length": 1, + } + + # Expected number of edges by kind + EXPECTED_EDGE_COUNTS: ClassVar[dict[str, int]] = { + "contains": 6, # sequence->parallel, sequence->refine_tone, sequence->refine_length, parallel->tone, parallel->length, parallel->style + "parallel_combine": 3, # tone->combined, length->combined, style->combined + "data": 2, # parallel->refine_tone (tone_result), parallel->refine_length (length_result) } diff --git a/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/test_pipe_parallel_graph.py b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/test_pipe_parallel_graph.py index 19cce9801..d46f99c46 100644 --- a/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/test_pipe_parallel_graph.py +++ b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/test_pipe_parallel_graph.py @@ -15,8 +15,10 @@ from pipelex.tools.misc.file_utils import get_incremental_directory_path, save_text_to_path from tests.conftest import TEST_OUTPUTS_DIR from tests.e2e.pipelex.pipes.pipe_controller.pipe_parallel.test_data import ( + Parallel3BranchGraphExpectations, ParallelAddEachGraphExpectations, ParallelCombinedGraphExpectations, + ParallelCombinedGraphExpectationsBase, ) @@ -168,11 +170,24 @@ async def test_parallel_add_each_output_graph(self, pipe_run_mode: PipeRunMode): log.info("Structural validation passed: DATA edges correctly source from PipeParallel") - async def test_parallel_combined_output_graph(self, pipe_run_mode: PipeRunMode): + @pytest.mark.parametrize( + ("pipe_code", "expectations_class"), + [ + ("pgc_analysis_then_summarize", ParallelCombinedGraphExpectations), + ("pg3_sequence", Parallel3BranchGraphExpectations), + ], + ) + async def test_parallel_combined_output_graph( + self, + pipe_run_mode: PipeRunMode, + pipe_code: str, + expectations_class: type[ParallelCombinedGraphExpectationsBase], + ): """Verify PipeParallel with combined_output generates correct graph structure. - This test runs a PipeParallel with both add_each_output and combined_output. - Expected: PipeParallel node has branch outputs + combined output in its output specs. + Parametrized with: + - pgc_analysis_then_summarize: 2-branch PipeParallel wrapped in PipeSequence with follow-up consumer + - pg3_sequence: 3-branch PipeParallel with selective downstream consumption (1 branch unused) """ # Build config with graph tracing base_config = get_config().pipelex.pipeline_execution_config @@ -194,7 +209,7 @@ async def test_parallel_combined_output_graph(self, pipe_run_mode: PipeRunMode): # Run pipeline pipe_output = await execute_pipeline( - pipe_code="pgc_parallel_analysis", + pipe_code=pipe_code, library_dirs=["tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel"], inputs={"input_text": TextContent(text="Hello world, this is a test document for parallel analysis.")}, pipe_run_mode=pipe_run_mode, @@ -209,7 +224,7 @@ async def test_parallel_combined_output_graph(self, pipe_run_mode: PipeRunMode): assert graph_spec is not None assert isinstance(graph_spec, GraphSpec) - log.info(f"Parallel combined graph: {len(graph_spec.nodes)} nodes, {len(graph_spec.edges)} edges") + log.info(f"Parallel combined graph ({pipe_code}): {len(graph_spec.nodes)} nodes, {len(graph_spec.edges)} edges") # Build node lookup nodes_by_pipe_code: dict[str, list[NodeSpec]] = {} @@ -219,33 +234,29 @@ async def test_parallel_combined_output_graph(self, pipe_run_mode: PipeRunMode): # 1. 
Verify all expected pipe_codes exist actual_pipe_codes = set(nodes_by_pipe_code.keys()) - assert actual_pipe_codes == ParallelCombinedGraphExpectations.EXPECTED_PIPE_CODES, ( - f"Unexpected pipe codes. Expected: {ParallelCombinedGraphExpectations.EXPECTED_PIPE_CODES}, Got: {actual_pipe_codes}" + assert actual_pipe_codes == expectations_class.EXPECTED_PIPE_CODES, ( + f"Unexpected pipe codes. Expected: {expectations_class.EXPECTED_PIPE_CODES}, Got: {actual_pipe_codes}" ) # 2. Verify node counts per pipe_code - for pipe_code, expected_count in ParallelCombinedGraphExpectations.EXPECTED_NODE_COUNTS.items(): - actual_count = len(nodes_by_pipe_code.get(pipe_code, [])) - assert actual_count == expected_count, f"Expected {expected_count} nodes for pipe_code '{pipe_code}', got {actual_count}" + for node_pipe_code, expected_count in expectations_class.EXPECTED_NODE_COUNTS.items(): + actual_count = len(nodes_by_pipe_code.get(node_pipe_code, [])) + assert actual_count == expected_count, f"Expected {expected_count} nodes for pipe_code '{node_pipe_code}', got {actual_count}" # 3. Verify edge counts by kind actual_edge_counts = Counter(str(edge.kind) for edge in graph_spec.edges) - for kind, expected_count in ParallelCombinedGraphExpectations.EXPECTED_EDGE_COUNTS.items(): + for kind, expected_count in expectations_class.EXPECTED_EDGE_COUNTS.items(): actual_count = actual_edge_counts.get(kind, 0) assert actual_count == expected_count, f"Expected {expected_count} edges of kind '{kind}', got {actual_count}" - # 4. Verify PipeParallel node has outputs (branch outputs + combined output) - parallel_node = nodes_by_pipe_code["pgc_parallel_analysis"][0] - assert len(parallel_node.node_io.outputs) >= 2, ( - f"PipeParallel with combined_output should have at least 2 output specs (branch outputs), got {len(parallel_node.node_io.outputs)}" - ) - output_names = {output.name for output in parallel_node.node_io.outputs} - assert "tone_result" in output_names, "PipeParallel should have 'tone_result' output" - assert "length_result" in output_names, "PipeParallel should have 'length_result' output" - - # 5. Verify PARALLEL_COMBINE edges connect branch producers to the PipeParallel node + # 4. 
Verify PARALLEL_COMBINE edges connect branch producers to the PipeParallel node + parallel_pipe_code = expectations_class.PARALLEL_PIPE_CODE + parallel_node = nodes_by_pipe_code[parallel_pipe_code][0] parallel_combine_edges = [edge for edge in graph_spec.edges if edge.kind.is_parallel_combine] - assert len(parallel_combine_edges) == 2, f"Expected 2 PARALLEL_COMBINE edges (one per branch), got {len(parallel_combine_edges)}" + expected_combine_count = expectations_class.EXPECTED_EDGE_COUNTS.get("parallel_combine", 0) + assert len(parallel_combine_edges) == expected_combine_count, ( + f"Expected {expected_combine_count} PARALLEL_COMBINE edges, got {len(parallel_combine_edges)}" + ) for edge in parallel_combine_edges: assert edge.target == parallel_node.node_id, ( f"PARALLEL_COMBINE edge target should be PipeParallel '{parallel_node.node_id}', got '{edge.target}'" @@ -257,10 +268,10 @@ async def test_parallel_combined_output_graph(self, pipe_run_mode: PipeRunMode): graph_outputs = await generate_graph_outputs( graph_spec=graph_spec, graph_config=graph_config, - pipe_code="pgc_parallel_analysis", + pipe_code=pipe_code, ) - output_dir = _get_next_output_folder("combined") + output_dir = _get_next_output_folder(pipe_code) if graph_outputs.graphspec_json: save_text_to_path(graph_outputs.graphspec_json, str(output_dir / "graph.json")) if graph_outputs.reactflow_html: @@ -275,7 +286,7 @@ async def test_parallel_combined_output_graph(self, pipe_run_mode: PipeRunMode): "parallel_outputs": [output.name for output in parallel_node.node_io.outputs], "output_dir": str(output_dir), }, - title="Parallel Combined Graph Outputs", + title=f"Parallel Combined Graph Outputs ({pipe_code})", ) - log.info("Structural validation passed: PipeParallel combined_output graph is correct") + log.info(f"Structural validation passed: {pipe_code} combined_output graph is correct") From 27f642fbae2d0de040880a1fa4c5468e8d1ef852 Mon Sep 17 00:00:00 2001 From: Louis Choquel Date: Wed, 11 Feb 2026 23:44:55 +0100 Subject: [PATCH 6/8] Fix Mermaid graph rendering for PipeBatch and PipeParallel edges Batch/parallel edges were referencing controller node IDs, but controllers are rendered as Mermaid subgraphs (not nodes), creating phantom auto-generated nodes. Fix by using source_stuff_digest/target_stuff_digest to connect stuff-to-stuff instead, rendering missing stuff nodes on the fly. Also place parallel_combine target stuffs inside their controller's subgraph and use plain dashed arrows (-.->) when edge labels are empty to avoid Mermaid syntax errors. Co-Authored-By: Claude Opus 4.6 --- .../graph/mermaidflow/mermaidflow_factory.py | 165 ++++++++++++++++-- .../pipe_parallel/test_pipe_parallel_graph.py | 4 + 2 files changed, 157 insertions(+), 12 deletions(-) diff --git a/pipelex/graph/mermaidflow/mermaidflow_factory.py b/pipelex/graph/mermaidflow/mermaidflow_factory.py index 311257f55..6823b4425 100644 --- a/pipelex/graph/mermaidflow/mermaidflow_factory.py +++ b/pipelex/graph/mermaidflow/mermaidflow_factory.py @@ -125,6 +125,25 @@ def make_from_graphspec( # This allows batch item stuffs to be placed inside their consumer's subgraph rendered_orphan_stuffs: set[str] = set() + # Build mapping of controller node_id → {digest: (name, concept)} for parallel_combine + # target stuffs. These are outputs of parallel controllers and should be rendered + # inside the controller's subgraph rather than as orphans at top level. 
+ # We collect the stuff info from controller node outputs directly, because these + # stuffs may not be in stuff_registry (which skips controller nodes). + controller_output_stuffs: dict[str, dict[str, tuple[str, str | None]]] = {} + controller_combine_digests: set[str] = set() + for edge in graph.edges: + if edge.kind.is_parallel_combine and edge.target_stuff_digest: + controller_combine_digests.add(edge.target_stuff_digest) + controller_output_stuffs.setdefault(edge.target, {})[edge.target_stuff_digest] = ("", None) + # Resolve names and concepts from the controller nodes' outputs + for controller_id, digest_map in controller_output_stuffs.items(): + controller_node = analysis.nodes_by_id.get(controller_id) + if controller_node: + for output_spec in controller_node.node_io.outputs: + if output_spec.digest and output_spec.digest in digest_map: + digest_map[output_spec.digest] = (output_spec.name, output_spec.concept) + # Render pipe nodes and their produced stuff within controller subgraphs lines.append("") lines.append(" %% Pipe and stuff nodes within controller subgraphs") @@ -141,6 +160,7 @@ def make_from_graphspec( subgraph_depths=subgraph_depths, show_stuff_codes=show_stuff_codes, rendered_orphan_stuffs=rendered_orphan_stuffs, + controller_output_stuffs=controller_output_stuffs, ) lines.extend(node_lines) @@ -199,6 +219,15 @@ def make_from_graphspec( ) lines.append(stuff_line) + # Build supplementary stuff info from all nodes (including controllers) + # This is needed for batch_aggregate target_stuff_digest which may not be in stuff_registry + # (GraphAnalysis.stuff_registry skips controller outputs) + all_stuff_info: dict[str, tuple[str, str | None]] = {} + for node in graph.nodes: + for output_spec in node.node_io.outputs: + if output_spec.digest and output_spec.digest not in all_stuff_info: + all_stuff_info[output_spec.digest] = (output_spec.name, output_spec.concept) + # Render edges: producer -> stuff lines.append("") lines.append(" %% Data flow edges: producer -> stuff -> consumer") @@ -220,6 +249,8 @@ def make_from_graphspec( lines.append(f" {cons_stuff_mermaid_id} --> {consumer_mermaid_id}") # Render batch edges (BATCH_ITEM and BATCH_AGGREGATE) with dashed styling + # These edges connect stuff-to-stuff (not node-to-node) because their source/target + # are controllers rendered as Mermaid subgraphs, not nodes. 
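        # For example, a controller emitted as `subgraph node_batch [...] ... end`
        # makes a poor edge endpoint: an edge such as `node_batch -.-> stuff_x`
        # led Mermaid to auto-generate a phantom node with the same id next to
        # the subgraph, so we route `stuff_<source_digest> -.-> stuff_<target_digest>`
        # instead (ids here are hypothetical).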
batch_item_edges = [edge for edge in graph.edges if edge.kind.is_batch_item] batch_aggregate_edges = [edge for edge in graph.edges if edge.kind.is_batch_aggregate] @@ -228,30 +259,121 @@ def make_from_graphspec( lines.append(" %% Batch edges: list-item relationships") for edge in batch_item_edges: - source_mermaid_id = id_mapping.get(edge.source) - target_mermaid_id = id_mapping.get(edge.target) - if source_mermaid_id and target_mermaid_id: + source_sid = stuff_id_mapping.get(edge.source_stuff_digest) if edge.source_stuff_digest else None + target_sid = stuff_id_mapping.get(edge.target_stuff_digest) if edge.target_stuff_digest else None + # Render missing stuff nodes on the fly + if not source_sid and edge.source_stuff_digest and edge.source_stuff_digest in all_stuff_info: + name, concept = all_stuff_info[edge.source_stuff_digest] + lines.append( + cls._render_stuff_node( + digest=edge.source_stuff_digest, + name=name, + concept=concept, + stuff_id_mapping=stuff_id_mapping, + show_stuff_codes=show_stuff_codes, + indent=" ", + ) + ) + source_sid = stuff_id_mapping.get(edge.source_stuff_digest) + if not target_sid and edge.target_stuff_digest and edge.target_stuff_digest in all_stuff_info: + name, concept = all_stuff_info[edge.target_stuff_digest] + lines.append( + cls._render_stuff_node( + digest=edge.target_stuff_digest, + name=name, + concept=concept, + stuff_id_mapping=stuff_id_mapping, + show_stuff_codes=show_stuff_codes, + indent=" ", + ) + ) + target_sid = stuff_id_mapping.get(edge.target_stuff_digest) + if source_sid and target_sid: label = edge.label or "" - lines.append(f' {source_mermaid_id} -."{label}".-> {target_mermaid_id}') + if label: + lines.append(f' {source_sid} -."{label}".-> {target_sid}') + else: + lines.append(f" {source_sid} -.-> {target_sid}") for edge in batch_aggregate_edges: - source_mermaid_id = id_mapping.get(edge.source) - target_mermaid_id = id_mapping.get(edge.target) - if source_mermaid_id and target_mermaid_id: + source_sid = stuff_id_mapping.get(edge.source_stuff_digest) if edge.source_stuff_digest else None + target_sid = stuff_id_mapping.get(edge.target_stuff_digest) if edge.target_stuff_digest else None + # Render missing stuff nodes on the fly + if not source_sid and edge.source_stuff_digest and edge.source_stuff_digest in all_stuff_info: + name, concept = all_stuff_info[edge.source_stuff_digest] + lines.append( + cls._render_stuff_node( + digest=edge.source_stuff_digest, + name=name, + concept=concept, + stuff_id_mapping=stuff_id_mapping, + show_stuff_codes=show_stuff_codes, + indent=" ", + ) + ) + source_sid = stuff_id_mapping.get(edge.source_stuff_digest) + if not target_sid and edge.target_stuff_digest and edge.target_stuff_digest in all_stuff_info: + name, concept = all_stuff_info[edge.target_stuff_digest] + lines.append( + cls._render_stuff_node( + digest=edge.target_stuff_digest, + name=name, + concept=concept, + stuff_id_mapping=stuff_id_mapping, + show_stuff_codes=show_stuff_codes, + indent=" ", + ) + ) + target_sid = stuff_id_mapping.get(edge.target_stuff_digest) + if source_sid and target_sid: label = edge.label or "" - lines.append(f' {source_mermaid_id} -."{label}".-> {target_mermaid_id}') + if label: + lines.append(f' {source_sid} -."{label}".-> {target_sid}') + else: + lines.append(f" {source_sid} -.-> {target_sid}") # Render parallel combine edges (branch outputs → combined output) with dashed styling + # Same approach: use stuff digests to connect stuff-to-stuff. 
parallel_combine_edges = [edge for edge in graph.edges if edge.kind.is_parallel_combine] if parallel_combine_edges: lines.append("") lines.append(" %% Parallel combine edges: branch outputs → combined output") for edge in parallel_combine_edges: - source_mermaid_id = id_mapping.get(edge.source) - target_mermaid_id = id_mapping.get(edge.target) - if source_mermaid_id and target_mermaid_id: + source_sid = stuff_id_mapping.get(edge.source_stuff_digest) if edge.source_stuff_digest else None + target_sid = stuff_id_mapping.get(edge.target_stuff_digest) if edge.target_stuff_digest else None + # Render missing stuff nodes on the fly + if not source_sid and edge.source_stuff_digest and edge.source_stuff_digest in all_stuff_info: + name, concept = all_stuff_info[edge.source_stuff_digest] + lines.append( + cls._render_stuff_node( + digest=edge.source_stuff_digest, + name=name, + concept=concept, + stuff_id_mapping=stuff_id_mapping, + show_stuff_codes=show_stuff_codes, + indent=" ", + ) + ) + source_sid = stuff_id_mapping.get(edge.source_stuff_digest) + if not target_sid and edge.target_stuff_digest and edge.target_stuff_digest in all_stuff_info: + name, concept = all_stuff_info[edge.target_stuff_digest] + lines.append( + cls._render_stuff_node( + digest=edge.target_stuff_digest, + name=name, + concept=concept, + stuff_id_mapping=stuff_id_mapping, + show_stuff_codes=show_stuff_codes, + indent=" ", + ) + ) + target_sid = stuff_id_mapping.get(edge.target_stuff_digest) + if source_sid and target_sid: label = edge.label or "" - lines.append(f' {source_mermaid_id} -."{label}".-> {target_mermaid_id}') + if label: + lines.append(f' {source_sid} -."{label}".-> {target_sid}') + else: + lines.append(f" {source_sid} -.-> {target_sid}") # Style definitions lines.append("") @@ -407,6 +529,7 @@ def _render_subgraph_recursive( subgraph_depths: dict[str, int], show_stuff_codes: bool, rendered_orphan_stuffs: set[str], + controller_output_stuffs: dict[str, dict[str, tuple[str, str | None]]], indent_level: int = 1, depth: int = 0, ) -> list[str]: @@ -415,6 +538,8 @@ def _render_subgraph_recursive( This renders both pipe nodes and their produced stuff nodes inside subgraphs. Orphan stuffs (no producer) consumed by leaf nodes are also rendered inside the same subgraph as their consumer, enabling proper placement of batch item stuffs. + Controller output stuffs (e.g., parallel_combine targets) are rendered inside + their controller's subgraph. Args: node_id: The node to render. @@ -428,6 +553,7 @@ def _render_subgraph_recursive( subgraph_depths: Map to track subgraph IDs and their depths (mutated). show_stuff_codes: Whether to show digest in stuff labels. rendered_orphan_stuffs: Set of orphan stuff digests already rendered (mutated). + controller_output_stuffs: Map of controller node_id to {digest: (name, concept)} for stuffs to render inside. indent_level: Current indentation level. depth: Current depth in the subgraph hierarchy (for coloring). 
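For reference, the shape of the new `controller_output_stuffs` argument threaded through the recursion, shown with values borrowed from the PARALLEL_COMBINE unit test added later in this series (the controller ID matches that test's fixture):

    # controller node_id -> {stuff digest: (stuff name, concept code)}
    controller_output_stuffs: dict[str, dict[str, tuple[str, str | None]]] = {
        "parallel_ctrl": {
            "combined_digest_001": ("combined_output", "MergedText"),
        },
    }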
@@ -476,11 +602,26 @@ def _render_subgraph_recursive( subgraph_depths=subgraph_depths, show_stuff_codes=show_stuff_codes, rendered_orphan_stuffs=rendered_orphan_stuffs, + controller_output_stuffs=controller_output_stuffs, indent_level=indent_level + 1, depth=depth + 1, ) lines.extend(child_lines) + # Render controller output stuffs (e.g., parallel_combine targets) inside the subgraph + for digest, (name, concept) in sorted(controller_output_stuffs.get(node_id, {}).items(), key=lambda item: item[1][0]): + if digest not in stuff_id_mapping: + stuff_line = cls._render_stuff_node( + digest=digest, + name=name, + concept=concept, + stuff_id_mapping=stuff_id_mapping, + show_stuff_codes=show_stuff_codes, + indent=indent + " ", + ) + lines.append(stuff_line) + rendered_orphan_stuffs.add(digest) + lines.append(f"{indent}end") else: # Leaf node - render as simple node diff --git a/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/test_pipe_parallel_graph.py b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/test_pipe_parallel_graph.py index d46f99c46..de8fc63e8 100644 --- a/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/test_pipe_parallel_graph.py +++ b/tests/e2e/pipelex/pipes/pipe_controller/pipe_parallel/test_pipe_parallel_graph.py @@ -274,6 +274,10 @@ async def test_parallel_combined_output_graph( output_dir = _get_next_output_folder(pipe_code) if graph_outputs.graphspec_json: save_text_to_path(graph_outputs.graphspec_json, str(output_dir / "graph.json")) + if graph_outputs.mermaidflow_html: + save_text_to_path(graph_outputs.mermaidflow_html, str(output_dir / "mermaidflow.html")) + if graph_outputs.mermaidflow_mmd: + save_text_to_path(graph_outputs.mermaidflow_mmd, str(output_dir / "mermaidflow.mmd")) if graph_outputs.reactflow_html: save_text_to_path(graph_outputs.reactflow_html, str(output_dir / "reactflow.html")) From 66456291885664683763354322222805f43985de Mon Sep 17 00:00:00 2001 From: Louis Choquel Date: Thu, 12 Feb 2026 16:08:21 +0100 Subject: [PATCH 7/8] Remove dead controller_combine_digests variable and add PARALLEL_COMBINE unit test The controller_combine_digests set was populated but never read anywhere in the codebase. The controller_output_stuffs dict already tracks the same digests and is the structure actually used for rendering. Added a unit test covering the PARALLEL_COMBINE subgraph rendering path to guard against future regressions. Co-Authored-By: Claude Opus 4.6 --- .../graph/mermaidflow/mermaidflow_factory.py | 2 - tests/unit/pipelex/graph/test_mermaidflow.py | 86 +++++++++++++++++++ 2 files changed, 86 insertions(+), 2 deletions(-) diff --git a/pipelex/graph/mermaidflow/mermaidflow_factory.py b/pipelex/graph/mermaidflow/mermaidflow_factory.py index 6823b4425..6b7f4128a 100644 --- a/pipelex/graph/mermaidflow/mermaidflow_factory.py +++ b/pipelex/graph/mermaidflow/mermaidflow_factory.py @@ -131,10 +131,8 @@ def make_from_graphspec( # We collect the stuff info from controller node outputs directly, because these # stuffs may not be in stuff_registry (which skips controller nodes). 
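The construction is two-pass: PARALLEL_COMBINE edge targets seed placeholder entries, then a second loop resolves names and concepts by matching digests against each controller node's declared outputs. With the fixture values from the unit test below:

    placeholder = ("", None)                      # entry after pass 1 (seeding)
    resolved = ("combined_output", "MergedText")  # same entry after pass 2 (resolution)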
controller_output_stuffs: dict[str, dict[str, tuple[str, str | None]]] = {} - controller_combine_digests: set[str] = set() for edge in graph.edges: if edge.kind.is_parallel_combine and edge.target_stuff_digest: - controller_combine_digests.add(edge.target_stuff_digest) controller_output_stuffs.setdefault(edge.target, {})[edge.target_stuff_digest] = ("", None) # Resolve names and concepts from the controller nodes' outputs for controller_id, digest_map in controller_output_stuffs.items(): diff --git a/tests/unit/pipelex/graph/test_mermaidflow.py b/tests/unit/pipelex/graph/test_mermaidflow.py index 599b4d693..c875dcf39 100644 --- a/tests/unit/pipelex/graph/test_mermaidflow.py +++ b/tests/unit/pipelex/graph/test_mermaidflow.py @@ -386,3 +386,89 @@ def test_subgraph_depth_coloring(self) -> None: # Should have multiple subgraphs with different colors assert "subgraph" in result.mermaid_code assert "style sg_" in result.mermaid_code # Subgraph styling + + def test_parallel_combine_stuff_rendered_inside_controller_subgraph(self) -> None: + """Test that PARALLEL_COMBINE target stuffs are rendered inside the controller's subgraph.""" + parallel_ctrl = { + "node_id": "parallel_ctrl", + "kind": NodeKind.CONTROLLER, + "pipe_code": "parallel_controller", + "status": NodeStatus.SUCCEEDED, + "node_io": NodeIOSpec( + inputs=[], + outputs=[IOSpec(name="combined_output", concept="MergedText", digest="combined_digest_001")], + ), + } + branch_a = { + "node_id": "branch_a", + "kind": NodeKind.OPERATOR, + "pipe_code": "branch_a_pipe", + "status": NodeStatus.SUCCEEDED, + "node_io": NodeIOSpec( + inputs=[], + outputs=[IOSpec(name="branch_a_out", concept="Text", digest="branch_a_digest")], + ), + } + branch_b = { + "node_id": "branch_b", + "kind": NodeKind.OPERATOR, + "pipe_code": "branch_b_pipe", + "status": NodeStatus.SUCCEEDED, + "node_io": NodeIOSpec( + inputs=[], + outputs=[IOSpec(name="branch_b_out", concept="Text", digest="branch_b_digest")], + ), + } + contains_a = { + "edge_id": "edge_contains_a", + "source": "parallel_ctrl", + "target": "branch_a", + "kind": EdgeKind.CONTAINS, + } + contains_b = { + "edge_id": "edge_contains_b", + "source": "parallel_ctrl", + "target": "branch_b", + "kind": EdgeKind.CONTAINS, + } + combine_a = { + "edge_id": "edge_combine_a", + "source": "branch_a", + "target": "parallel_ctrl", + "kind": EdgeKind.PARALLEL_COMBINE, + "source_stuff_digest": "branch_a_digest", + "target_stuff_digest": "combined_digest_001", + } + combine_b = { + "edge_id": "edge_combine_b", + "source": "branch_b", + "target": "parallel_ctrl", + "kind": EdgeKind.PARALLEL_COMBINE, + "source_stuff_digest": "branch_b_digest", + "target_stuff_digest": "combined_digest_001", + } + graph = self._make_graph( + nodes=[parallel_ctrl, branch_a, branch_b], + edges=[contains_a, contains_b, combine_a, combine_b], + ) + graph_config = make_graph_config() + result = MermaidflowFactory.make_from_graphspec(graph, graph_config) + + # The combined output stuff should appear inside the controller's subgraph + # (between subgraph ... 
and end) + lines = result.mermaid_code.split("\n") + subgraph_start_idx = None + subgraph_end_idx = None + for index_line, line in enumerate(lines): + if "subgraph" in line and "parallel_controller" in line: + subgraph_start_idx = index_line + if subgraph_start_idx is not None and subgraph_end_idx is None and line.strip() == "end": + subgraph_end_idx = index_line + break + + assert subgraph_start_idx is not None, "Controller subgraph not found" + assert subgraph_end_idx is not None, "Controller subgraph end not found" + + subgraph_content = "\n".join(lines[subgraph_start_idx : subgraph_end_idx + 1]) + assert "combined_output" in subgraph_content, "Combined output stuff should be inside the controller subgraph" + assert ":::stuff" in subgraph_content, "Combined output stuff should have :::stuff class styling" From 45bfa2ee3d743450ca4a4e7ad4ca9e1dadc81598 Mon Sep 17 00:00:00 2001 From: Louis Choquel Date: Thu, 12 Feb 2026 16:26:54 +0100 Subject: [PATCH 8/8] Extract _render_dashed_edges helper to deduplicate batch/parallel edge rendering The "resolve missing stuff nodes on the fly and render dashed edge" logic was copy-pasted three times for batch_item, batch_aggregate, and parallel_combine edges (~105 lines of duplication). Extract into a single _render_dashed_edges classmethod and add unit tests covering all three edge kinds, label/no-label variants, and structural consistency. Co-Authored-By: Claude Opus 4.6 --- .../graph/mermaidflow/mermaidflow_factory.py | 173 ++++------- .../graph/test_dashed_edge_rendering.py | 279 ++++++++++++++++++ 2 files changed, 342 insertions(+), 110 deletions(-) create mode 100644 tests/unit/pipelex/graph/test_dashed_edge_rendering.py diff --git a/pipelex/graph/mermaidflow/mermaidflow_factory.py b/pipelex/graph/mermaidflow/mermaidflow_factory.py index 6b7f4128a..7a3159c05 100644 --- a/pipelex/graph/mermaidflow/mermaidflow_factory.py +++ b/pipelex/graph/mermaidflow/mermaidflow_factory.py @@ -13,6 +13,7 @@ from pipelex.graph.graph_analysis import GraphAnalysis from pipelex.graph.graph_config import GraphConfig from pipelex.graph.graphspec import ( + EdgeSpec, GraphSpec, NodeKind, NodeSpec, @@ -255,80 +256,8 @@ def make_from_graphspec( if batch_item_edges or batch_aggregate_edges: lines.append("") lines.append(" %% Batch edges: list-item relationships") - - for edge in batch_item_edges: - source_sid = stuff_id_mapping.get(edge.source_stuff_digest) if edge.source_stuff_digest else None - target_sid = stuff_id_mapping.get(edge.target_stuff_digest) if edge.target_stuff_digest else None - # Render missing stuff nodes on the fly - if not source_sid and edge.source_stuff_digest and edge.source_stuff_digest in all_stuff_info: - name, concept = all_stuff_info[edge.source_stuff_digest] - lines.append( - cls._render_stuff_node( - digest=edge.source_stuff_digest, - name=name, - concept=concept, - stuff_id_mapping=stuff_id_mapping, - show_stuff_codes=show_stuff_codes, - indent=" ", - ) - ) - source_sid = stuff_id_mapping.get(edge.source_stuff_digest) - if not target_sid and edge.target_stuff_digest and edge.target_stuff_digest in all_stuff_info: - name, concept = all_stuff_info[edge.target_stuff_digest] - lines.append( - cls._render_stuff_node( - digest=edge.target_stuff_digest, - name=name, - concept=concept, - stuff_id_mapping=stuff_id_mapping, - show_stuff_codes=show_stuff_codes, - indent=" ", - ) - ) - target_sid = stuff_id_mapping.get(edge.target_stuff_digest) - if source_sid and target_sid: - label = edge.label or "" - if label: - lines.append(f' {source_sid} 
-."{label}".-> {target_sid}') - else: - lines.append(f" {source_sid} -.-> {target_sid}") - - for edge in batch_aggregate_edges: - source_sid = stuff_id_mapping.get(edge.source_stuff_digest) if edge.source_stuff_digest else None - target_sid = stuff_id_mapping.get(edge.target_stuff_digest) if edge.target_stuff_digest else None - # Render missing stuff nodes on the fly - if not source_sid and edge.source_stuff_digest and edge.source_stuff_digest in all_stuff_info: - name, concept = all_stuff_info[edge.source_stuff_digest] - lines.append( - cls._render_stuff_node( - digest=edge.source_stuff_digest, - name=name, - concept=concept, - stuff_id_mapping=stuff_id_mapping, - show_stuff_codes=show_stuff_codes, - indent=" ", - ) - ) - source_sid = stuff_id_mapping.get(edge.source_stuff_digest) - if not target_sid and edge.target_stuff_digest and edge.target_stuff_digest in all_stuff_info: - name, concept = all_stuff_info[edge.target_stuff_digest] - lines.append( - cls._render_stuff_node( - digest=edge.target_stuff_digest, - name=name, - concept=concept, - stuff_id_mapping=stuff_id_mapping, - show_stuff_codes=show_stuff_codes, - indent=" ", - ) - ) - target_sid = stuff_id_mapping.get(edge.target_stuff_digest) - if source_sid and target_sid: - label = edge.label or "" - if label: - lines.append(f' {source_sid} -."{label}".-> {target_sid}') - else: - lines.append(f" {source_sid} -.-> {target_sid}") + cls._render_dashed_edges(batch_item_edges, lines, stuff_id_mapping, all_stuff_info, show_stuff_codes) + cls._render_dashed_edges(batch_aggregate_edges, lines, stuff_id_mapping, all_stuff_info, show_stuff_codes) # Render parallel combine edges (branch outputs → combined output) with dashed styling # Same approach: use stuff digests to connect stuff-to-stuff. @@ -336,42 +265,7 @@ def make_from_graphspec( if parallel_combine_edges: lines.append("") lines.append(" %% Parallel combine edges: branch outputs → combined output") - for edge in parallel_combine_edges: - source_sid = stuff_id_mapping.get(edge.source_stuff_digest) if edge.source_stuff_digest else None - target_sid = stuff_id_mapping.get(edge.target_stuff_digest) if edge.target_stuff_digest else None - # Render missing stuff nodes on the fly - if not source_sid and edge.source_stuff_digest and edge.source_stuff_digest in all_stuff_info: - name, concept = all_stuff_info[edge.source_stuff_digest] - lines.append( - cls._render_stuff_node( - digest=edge.source_stuff_digest, - name=name, - concept=concept, - stuff_id_mapping=stuff_id_mapping, - show_stuff_codes=show_stuff_codes, - indent=" ", - ) - ) - source_sid = stuff_id_mapping.get(edge.source_stuff_digest) - if not target_sid and edge.target_stuff_digest and edge.target_stuff_digest in all_stuff_info: - name, concept = all_stuff_info[edge.target_stuff_digest] - lines.append( - cls._render_stuff_node( - digest=edge.target_stuff_digest, - name=name, - concept=concept, - stuff_id_mapping=stuff_id_mapping, - show_stuff_codes=show_stuff_codes, - indent=" ", - ) - ) - target_sid = stuff_id_mapping.get(edge.target_stuff_digest) - if source_sid and target_sid: - label = edge.label or "" - if label: - lines.append(f' {source_sid} -."{label}".-> {target_sid}') - else: - lines.append(f" {source_sid} -.-> {target_sid}") + cls._render_dashed_edges(parallel_combine_edges, lines, stuff_id_mapping, all_stuff_info, show_stuff_codes) # Style definitions lines.append("") @@ -513,6 +407,65 @@ def _render_stuff_node( return f'{indent}{stuff_mermaid_id}(["{label}"]):::stuff' + @classmethod + def _render_dashed_edges( + cls, + 
edges: list[EdgeSpec], + lines: list[str], + stuff_id_mapping: dict[str, str], + all_stuff_info: dict[str, tuple[str, str | None]], + show_stuff_codes: bool, + ) -> None: + """Render dashed edges between stuff nodes, resolving missing stuff nodes on the fly. + + This handles BATCH_ITEM, BATCH_AGGREGATE, and PARALLEL_COMBINE edges which all share + the same rendering logic: look up source/target stuff IDs, render any missing stuff + nodes from all_stuff_info, and emit a dashed arrow with an optional label. + + Args: + edges: The edges to render as dashed arrows. + lines: The mermaid output lines list (mutated). + stuff_id_mapping: Map to store/retrieve stuff mermaid IDs (mutated). + all_stuff_info: Supplementary stuff info from all nodes including controllers. + show_stuff_codes: Whether to show digest in stuff labels. + """ + for edge in edges: + source_sid = stuff_id_mapping.get(edge.source_stuff_digest) if edge.source_stuff_digest else None + target_sid = stuff_id_mapping.get(edge.target_stuff_digest) if edge.target_stuff_digest else None + # Render missing stuff nodes on the fly + if not source_sid and edge.source_stuff_digest and edge.source_stuff_digest in all_stuff_info: + name, concept = all_stuff_info[edge.source_stuff_digest] + lines.append( + cls._render_stuff_node( + digest=edge.source_stuff_digest, + name=name, + concept=concept, + stuff_id_mapping=stuff_id_mapping, + show_stuff_codes=show_stuff_codes, + indent=" ", + ) + ) + source_sid = stuff_id_mapping.get(edge.source_stuff_digest) + if not target_sid and edge.target_stuff_digest and edge.target_stuff_digest in all_stuff_info: + name, concept = all_stuff_info[edge.target_stuff_digest] + lines.append( + cls._render_stuff_node( + digest=edge.target_stuff_digest, + name=name, + concept=concept, + stuff_id_mapping=stuff_id_mapping, + show_stuff_codes=show_stuff_codes, + indent=" ", + ) + ) + target_sid = stuff_id_mapping.get(edge.target_stuff_digest) + if source_sid and target_sid: + label = edge.label or "" + if label: + lines.append(f' {source_sid} -."{label}".-> {target_sid}') + else: + lines.append(f" {source_sid} -.-> {target_sid}") + @classmethod def _render_subgraph_recursive( cls, diff --git a/tests/unit/pipelex/graph/test_dashed_edge_rendering.py b/tests/unit/pipelex/graph/test_dashed_edge_rendering.py new file mode 100644 index 000000000..eccf4f7a8 --- /dev/null +++ b/tests/unit/pipelex/graph/test_dashed_edge_rendering.py @@ -0,0 +1,279 @@ +import re +from datetime import datetime, timezone +from typing import Any, ClassVar + +import pytest + +from pipelex.graph.graphspec import ( + EdgeKind, + EdgeSpec, + GraphSpec, + IOSpec, + NodeIOSpec, + NodeKind, + NodeSpec, + NodeStatus, + PipelineRef, +) +from pipelex.graph.mermaidflow.mermaidflow_factory import MermaidflowFactory + +from .conftest import make_graph_config + + +class TestDashedEdgeRendering: + """Tests for dashed-edge rendering logic across BATCH_ITEM, BATCH_AGGREGATE, and PARALLEL_COMBINE edge kinds.""" + + GRAPH_ID: ClassVar[str] = "dashed_edge_test:001" + CREATED_AT: ClassVar[datetime] = datetime(2024, 1, 15, 10, 30, 0, tzinfo=timezone.utc) + + def _make_graph( + self, + nodes: list[dict[str, Any]], + edges: list[dict[str, Any]] | None = None, + ) -> GraphSpec: + """Helper to create a GraphSpec with nodes and edges.""" + node_specs: list[NodeSpec] = [] + for node_dict in nodes: + node_specs.append(NodeSpec(**node_dict)) + + edge_specs: list[EdgeSpec] = [] + if edges: + for edge_dict in edges: + edge_specs.append(EdgeSpec(**edge_dict)) + + return GraphSpec( + 
graph_id=self.GRAPH_ID, + created_at=self.CREATED_AT, + pipeline_ref=PipelineRef(), + nodes=node_specs, + edges=edge_specs, + ) + + def _extract_dashed_edges(self, mermaid_code: str) -> list[str]: + """Extract all dashed-edge lines from mermaid code. + + Returns: + Lines containing dashed arrows (-.-> or -."label".->). + """ + return [line.strip() for line in mermaid_code.split("\n") if ".->" in line] + + def _build_controller_graph_with_dashed_edge( + self, + edge_kind: EdgeKind, + edge_label: str | None = None, + ) -> GraphSpec: + """Build a graph with a controller, two children, and a dashed edge between their stuffs. + + The controller contains two child pipes. The dashed edge connects + source_stuff from child_a to target_stuff owned by the controller (for aggregate/combine) + or child_b (for batch_item). + + Args: + edge_kind: The kind of dashed edge to create. + edge_label: Optional label for the dashed edge. + + Returns: + A GraphSpec with the dashed-edge scenario. + """ + controller = { + "node_id": "ctrl_1", + "kind": NodeKind.CONTROLLER, + "pipe_code": "batch_ctrl", + "status": NodeStatus.SUCCEEDED, + "node_io": NodeIOSpec( + inputs=[], + outputs=[IOSpec(name="ctrl_output", concept="OutputList", digest="ctrl_out_digest")], + ), + } + child_a = { + "node_id": "child_a", + "kind": NodeKind.OPERATOR, + "pipe_code": "pipe_a", + "status": NodeStatus.SUCCEEDED, + "node_io": NodeIOSpec( + inputs=[], + outputs=[IOSpec(name="source_stuff", concept="Text", digest="source_digest")], + ), + } + child_b = { + "node_id": "child_b", + "kind": NodeKind.OPERATOR, + "pipe_code": "pipe_b", + "status": NodeStatus.SUCCEEDED, + "node_io": NodeIOSpec( + inputs=[IOSpec(name="target_stuff", concept="Text", digest="target_digest")], + outputs=[], + ), + } + contains_a = { + "edge_id": "edge_contains_a", + "source": "ctrl_1", + "target": "child_a", + "kind": EdgeKind.CONTAINS, + } + contains_b = { + "edge_id": "edge_contains_b", + "source": "ctrl_1", + "target": "child_b", + "kind": EdgeKind.CONTAINS, + } + + # For BATCH_AGGREGATE and PARALLEL_COMBINE, target is the controller's output stuff + # For BATCH_ITEM, target is child_b's input stuff + target_stuff_digest: str + match edge_kind: + case EdgeKind.BATCH_ITEM: + target_stuff_digest = "target_digest" + case EdgeKind.BATCH_AGGREGATE | EdgeKind.PARALLEL_COMBINE: + target_stuff_digest = "ctrl_out_digest" + case EdgeKind.CONTROL | EdgeKind.DATA | EdgeKind.CONTAINS | EdgeKind.SELECTED_OUTCOME: + msg = f"Unexpected edge kind for dashed edge test: {edge_kind}" + raise ValueError(msg) + + dashed_edge: dict[str, Any] = { + "edge_id": "edge_dashed", + "source": "child_a", + "target": "ctrl_1", + "kind": edge_kind, + "source_stuff_digest": "source_digest", + "target_stuff_digest": target_stuff_digest, + } + if edge_label: + dashed_edge["label"] = edge_label + + return self._make_graph( + nodes=[controller, child_a, child_b], + edges=[contains_a, contains_b, dashed_edge], + ) + + @pytest.mark.parametrize( + ("topic", "edge_kind"), + [ + ("BATCH_ITEM", EdgeKind.BATCH_ITEM), + ("BATCH_AGGREGATE", EdgeKind.BATCH_AGGREGATE), + ("PARALLEL_COMBINE", EdgeKind.PARALLEL_COMBINE), + ], + ) + def test_dashed_edge_rendered_for_each_kind(self, topic: str, edge_kind: EdgeKind) -> None: + """Verify that each dashed-edge kind produces at least one dashed arrow.""" + graph = self._build_controller_graph_with_dashed_edge(edge_kind=edge_kind) + graph_config = make_graph_config() + result = MermaidflowFactory.make_from_graphspec(graph, graph_config) + + dashed_lines = 
self._extract_dashed_edges(result.mermaid_code) + assert len(dashed_lines) >= 1, f"Expected at least one dashed edge for {topic}, got none" + + @pytest.mark.parametrize( + ("topic", "edge_kind"), + [ + ("BATCH_ITEM", EdgeKind.BATCH_ITEM), + ("BATCH_AGGREGATE", EdgeKind.BATCH_AGGREGATE), + ("PARALLEL_COMBINE", EdgeKind.PARALLEL_COMBINE), + ], + ) + def test_dashed_edge_with_label(self, topic: str, edge_kind: EdgeKind) -> None: + """Verify that labeled dashed edges include the label in the mermaid syntax.""" + graph = self._build_controller_graph_with_dashed_edge(edge_kind=edge_kind, edge_label="my_label") + graph_config = make_graph_config() + result = MermaidflowFactory.make_from_graphspec(graph, graph_config) + + dashed_lines = self._extract_dashed_edges(result.mermaid_code) + labeled = [line for line in dashed_lines if "my_label" in line] + assert len(labeled) >= 1, f"Expected a labeled dashed edge for {topic}, got: {dashed_lines}" + + @pytest.mark.parametrize( + ("topic", "edge_kind"), + [ + ("BATCH_ITEM", EdgeKind.BATCH_ITEM), + ("BATCH_AGGREGATE", EdgeKind.BATCH_AGGREGATE), + ("PARALLEL_COMBINE", EdgeKind.PARALLEL_COMBINE), + ], + ) + def test_dashed_edge_without_label(self, topic: str, edge_kind: EdgeKind) -> None: + """Verify that unlabeled dashed edges use plain dashed arrow syntax.""" + graph = self._build_controller_graph_with_dashed_edge(edge_kind=edge_kind) + graph_config = make_graph_config() + result = MermaidflowFactory.make_from_graphspec(graph, graph_config) + + dashed_lines = self._extract_dashed_edges(result.mermaid_code) + # Unlabeled edges use `-.->` without a label string + plain_dashed = [line for line in dashed_lines if ".->" in line and '-."' not in line] + assert len(plain_dashed) >= 1, f"Expected a plain dashed edge for {topic}, got: {dashed_lines}" + + def test_all_edge_kinds_use_same_dashed_syntax(self) -> None: + """Verify that all three dashed-edge kinds produce structurally identical dashed arrow syntax. + + This test catches divergence if one copy of the logic is modified but not the others. + """ + results_by_kind: dict[str, list[str]] = {} + for edge_kind in (EdgeKind.BATCH_ITEM, EdgeKind.BATCH_AGGREGATE, EdgeKind.PARALLEL_COMBINE): + graph = self._build_controller_graph_with_dashed_edge(edge_kind=edge_kind, edge_label="test_label") + graph_config = make_graph_config() + result = MermaidflowFactory.make_from_graphspec(graph, graph_config) + + dashed_lines = self._extract_dashed_edges(result.mermaid_code) + # Extract just the arrow operator from each line (e.g., `-."test_label".->` or `-.->`) + # by replacing stuff IDs (s_XXX) with a placeholder + normalized = [re.sub(r"s_[a-f0-9]+", "ID", line) for line in dashed_lines] + results_by_kind[edge_kind] = normalized + + # All three should produce the same normalized patterns + kinds = list(results_by_kind.keys()) + for index_kind in range(1, len(kinds)): + assert results_by_kind[kinds[0]] == results_by_kind[kinds[index_kind]], ( + f"Dashed edge syntax differs between {kinds[0]} and {kinds[index_kind]}: " + f"{results_by_kind[kinds[0]]} vs {results_by_kind[kinds[index_kind]]}" + ) + + def test_missing_stuff_resolved_on_the_fly(self) -> None: + """Verify that stuff nodes not in the normal stuff_registry get rendered on-the-fly for dashed edges. + + Creates a scenario where the target stuff only exists on the controller's output + (not registered through normal pipe IOSpec), so it must be resolved from all_stuff_info. 
+ """ + controller = { + "node_id": "ctrl_1", + "kind": NodeKind.CONTROLLER, + "pipe_code": "batch_ctrl", + "status": NodeStatus.SUCCEEDED, + "node_io": NodeIOSpec( + inputs=[], + outputs=[IOSpec(name="aggregated_output", concept="OutputList", digest="agg_digest")], + ), + } + child = { + "node_id": "child_1", + "kind": NodeKind.OPERATOR, + "pipe_code": "child_pipe", + "status": NodeStatus.SUCCEEDED, + "node_io": NodeIOSpec( + inputs=[], + outputs=[IOSpec(name="item_output", concept="Text", digest="item_digest")], + ), + } + contains = { + "edge_id": "edge_contains", + "source": "ctrl_1", + "target": "child_1", + "kind": EdgeKind.CONTAINS, + } + aggregate_edge = { + "edge_id": "edge_agg", + "source": "child_1", + "target": "ctrl_1", + "kind": EdgeKind.BATCH_AGGREGATE, + "source_stuff_digest": "item_digest", + "target_stuff_digest": "agg_digest", + } + graph = self._make_graph( + nodes=[controller, child], + edges=[contains, aggregate_edge], + ) + graph_config = make_graph_config() + result = MermaidflowFactory.make_from_graphspec(graph, graph_config) + + # The aggregated_output stuff should be rendered (resolved on the fly) + assert "aggregated_output" in result.mermaid_code + # And there should be a dashed edge connecting them + dashed_lines = self._extract_dashed_edges(result.mermaid_code) + assert len(dashed_lines) >= 1, "Expected a dashed edge for aggregate, got none"