Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/strands/multiagent/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -1001,7 +1001,12 @@ def _build_node_input(self, node: GraphNode) -> list[ContentBlock]:
agent_results = node_result.get_agent_results()
for result in agent_results:
agent_name = getattr(result, "agent_name", "Agent")
result_text = str(result)
# Prefer structured_output if available, otherwise fall back to str(result)
structured_output = getattr(result, "structured_output", None)
if structured_output is not None:
result_text = structured_output.model_dump_json()
else:
result_text = str(result)
node_input.append(ContentBlock(text=f" - {agent_name}: {result_text}"))

return node_input
Expand Down
116 changes: 116 additions & 0 deletions tests/strands/multiagent/test_graph.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import asyncio
import time
from typing import Annotated
from unittest.mock import AsyncMock, MagicMock, Mock, call, patch

import pytest
from pydantic import BaseModel, Field

from strands.agent import Agent, AgentResult
from strands.agent.state import AgentState
Expand Down Expand Up @@ -2068,3 +2070,117 @@ def cancel_callback(event):
tru_status = graph.state.status
exp_status = Status.FAILED
assert tru_status == exp_status


@pytest.mark.asyncio
async def test_graph_structured_output_propagation(mock_strands_tracer, mock_use_span):
"""Test that structured_output is properly propagated from dependency nodes.

This test verifies issue #1118: when using structured_output_model, the output
from previous nodes should be propagated correctly to dependent nodes using the
structured output's JSON representation.
"""

class Foo(BaseModel):
"""Test Pydantic model for structured output."""

foo: Annotated[str, Field(min_length=1, max_length=10)]
bar: Annotated[str, Field(min_length=1, max_length=10)]

# Create first agent with structured output
first_agent = create_mock_agent("first_agent", "") # Empty text content
structured_data = Foo(foo="hello", bar="world")
first_agent.return_value.structured_output = structured_data
# Update the mock to return the agent result with structured output
mock_result_with_structured = first_agent.return_value

async def mock_stream_first(*args, **kwargs):
yield {"agent_start": True}
yield {"result": mock_result_with_structured}

first_agent.stream_async = Mock(side_effect=mock_stream_first)

# Create second agent that receives the output
second_agent = create_mock_agent("second_agent", "Processed result")

# Track the input received by second agent
received_inputs = []

async def mock_stream_second(input_data, *args, **kwargs):
received_inputs.append(input_data)
yield {"agent_start": True}
yield {"result": second_agent.return_value}

second_agent.stream_async = Mock(side_effect=mock_stream_second)

# Build graph: first_agent -> second_agent
builder = GraphBuilder()
builder.add_node(first_agent, "first_agent")
builder.add_node(second_agent, "second_agent")
builder.add_edge("first_agent", "second_agent")
builder.set_entry_point("first_agent")
graph = builder.build()

# Execute the graph
result = await graph.invoke_async("Hello, world!")

# Verify execution completed
assert result.status == Status.COMPLETED

# Verify second agent received input
assert len(received_inputs) == 1

# Verify the structured output was propagated as JSON
input_text = str(received_inputs[0])
assert '{"foo":"hello","bar":"world"}' in input_text

mock_strands_tracer.start_multiagent_span.assert_called()
mock_use_span.assert_called_once()


@pytest.mark.asyncio
async def test_graph_structured_output_fallback_to_text(mock_strands_tracer, mock_use_span):
"""Test that when structured_output is None, text output is used.

This ensures backward compatibility with agents that don't use structured output.
"""
# Create first agent WITHOUT structured output (None)
first_agent = create_mock_agent("first_agent", "Plain text response")
assert first_agent.return_value.structured_output is None # Verify it's None

# Create second agent that receives the output
second_agent = create_mock_agent("second_agent", "Processed result")

# Track the input received by second agent
received_inputs = []

async def mock_stream_second(input_data, *args, **kwargs):
received_inputs.append(input_data)
yield {"agent_start": True}
yield {"result": second_agent.return_value}

second_agent.stream_async = Mock(side_effect=mock_stream_second)

# Build graph: first_agent -> second_agent
builder = GraphBuilder()
builder.add_node(first_agent, "first_agent")
builder.add_node(second_agent, "second_agent")
builder.add_edge("first_agent", "second_agent")
builder.set_entry_point("first_agent")
graph = builder.build()

# Execute the graph
result = await graph.invoke_async("Hello, world!")

# Verify execution completed
assert result.status == Status.COMPLETED

# Verify second agent received input
assert len(received_inputs) == 1

# Verify the text output was propagated (not JSON structured output)
input_text = str(received_inputs[0])
assert "Plain text response" in input_text

mock_strands_tracer.start_multiagent_span.assert_called()
mock_use_span.assert_called_once()