diff --git a/src/strands/multiagent/graph.py b/src/strands/multiagent/graph.py index e87b9592d..49c2c2cbb 100644 --- a/src/strands/multiagent/graph.py +++ b/src/strands/multiagent/graph.py @@ -1001,7 +1001,12 @@ def _build_node_input(self, node: GraphNode) -> list[ContentBlock]: agent_results = node_result.get_agent_results() for result in agent_results: agent_name = getattr(result, "agent_name", "Agent") - result_text = str(result) + # Prefer structured_output if available, otherwise fall back to str(result) + structured_output = getattr(result, "structured_output", None) + if structured_output is not None: + result_text = structured_output.model_dump_json() + else: + result_text = str(result) node_input.append(ContentBlock(text=f" - {agent_name}: {result_text}")) return node_input diff --git a/tests/strands/multiagent/test_graph.py b/tests/strands/multiagent/test_graph.py index 4875d1bec..f73f1a34d 100644 --- a/tests/strands/multiagent/test_graph.py +++ b/tests/strands/multiagent/test_graph.py @@ -1,8 +1,10 @@ import asyncio import time +from typing import Annotated from unittest.mock import AsyncMock, MagicMock, Mock, call, patch import pytest +from pydantic import BaseModel, Field from strands.agent import Agent, AgentResult from strands.agent.state import AgentState @@ -2068,3 +2070,117 @@ def cancel_callback(event): tru_status = graph.state.status exp_status = Status.FAILED assert tru_status == exp_status + + +@pytest.mark.asyncio +async def test_graph_structured_output_propagation(mock_strands_tracer, mock_use_span): + """Test that structured_output is properly propagated from dependency nodes. + + This test verifies issue #1118: when using structured_output_model, the output + from previous nodes should be propagated correctly to dependent nodes using the + structured output's JSON representation. + """ + + class Foo(BaseModel): + """Test Pydantic model for structured output.""" + + foo: Annotated[str, Field(min_length=1, max_length=10)] + bar: Annotated[str, Field(min_length=1, max_length=10)] + + # Create first agent with structured output + first_agent = create_mock_agent("first_agent", "") # Empty text content + structured_data = Foo(foo="hello", bar="world") + first_agent.return_value.structured_output = structured_data + # Update the mock to return the agent result with structured output + mock_result_with_structured = first_agent.return_value + + async def mock_stream_first(*args, **kwargs): + yield {"agent_start": True} + yield {"result": mock_result_with_structured} + + first_agent.stream_async = Mock(side_effect=mock_stream_first) + + # Create second agent that receives the output + second_agent = create_mock_agent("second_agent", "Processed result") + + # Track the input received by second agent + received_inputs = [] + + async def mock_stream_second(input_data, *args, **kwargs): + received_inputs.append(input_data) + yield {"agent_start": True} + yield {"result": second_agent.return_value} + + second_agent.stream_async = Mock(side_effect=mock_stream_second) + + # Build graph: first_agent -> second_agent + builder = GraphBuilder() + builder.add_node(first_agent, "first_agent") + builder.add_node(second_agent, "second_agent") + builder.add_edge("first_agent", "second_agent") + builder.set_entry_point("first_agent") + graph = builder.build() + + # Execute the graph + result = await graph.invoke_async("Hello, world!") + + # Verify execution completed + assert result.status == Status.COMPLETED + + # Verify second agent received input + assert len(received_inputs) == 1 + + # Verify the structured output was propagated as JSON + input_text = str(received_inputs[0]) + assert '{"foo":"hello","bar":"world"}' in input_text + + mock_strands_tracer.start_multiagent_span.assert_called() + mock_use_span.assert_called_once() + + +@pytest.mark.asyncio +async def test_graph_structured_output_fallback_to_text(mock_strands_tracer, mock_use_span): + """Test that when structured_output is None, text output is used. + + This ensures backward compatibility with agents that don't use structured output. + """ + # Create first agent WITHOUT structured output (None) + first_agent = create_mock_agent("first_agent", "Plain text response") + assert first_agent.return_value.structured_output is None # Verify it's None + + # Create second agent that receives the output + second_agent = create_mock_agent("second_agent", "Processed result") + + # Track the input received by second agent + received_inputs = [] + + async def mock_stream_second(input_data, *args, **kwargs): + received_inputs.append(input_data) + yield {"agent_start": True} + yield {"result": second_agent.return_value} + + second_agent.stream_async = Mock(side_effect=mock_stream_second) + + # Build graph: first_agent -> second_agent + builder = GraphBuilder() + builder.add_node(first_agent, "first_agent") + builder.add_node(second_agent, "second_agent") + builder.add_edge("first_agent", "second_agent") + builder.set_entry_point("first_agent") + graph = builder.build() + + # Execute the graph + result = await graph.invoke_async("Hello, world!") + + # Verify execution completed + assert result.status == Status.COMPLETED + + # Verify second agent received input + assert len(received_inputs) == 1 + + # Verify the text output was propagated (not JSON structured output) + input_text = str(received_inputs[0]) + assert "Plain text response" in input_text + + mock_strands_tracer.start_multiagent_span.assert_called() + mock_use_span.assert_called_once()