Description
Initial Checks
- I confirm that I'm using the latest version of Pydantic AI
- I confirm that I searched for my issue in https://github.com/pydantic/pydantic-ai/issues before opening this issue
Description
Sometimes, it seems that TEXT_MESSAGE_CONTENT events are sent after a TEXT_MESSAGE_END event. This causes problems when using the AG-UI client, as we receive the error: Cannot send 'TEXT_MESSAGE_CONTENT' event: No active text message found with ID '4e3fcf08-eff9-4594-843f-45dad275a301'. Start a text message with 'TEXT_MESSAGE_START' first.
I have no idea what is causing this, as it behaves like a race condition, and I haven't been able to reproduce it reliably.
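Since the problem is intermittent, a small checker over the raw stream may help catch it when it does occur. The following is only a minimal sketch: `find_orphaned_content` is a hypothetical helper, and it assumes each chunk is a single `data: {...}` line whose JSON carries `type` and `messageId` fields.

```python
import json
from typing import Iterable


def find_orphaned_content(sse_lines: Iterable[str]) -> list[str]:
    """Return message IDs of TEXT_MESSAGE_CONTENT events seen outside an open message.

    Hypothetical helper: assumes every non-empty line is `data: <json>` and that
    the JSON uses the `type` and `messageId` keys.
    """
    open_ids: set[str] = set()   # messages that have started but not yet ended
    orphaned: list[str] = []
    for line in sse_lines:
        payload = line.strip().removeprefix("data: ")
        if not payload:
            continue  # skip blank separator lines
        event = json.loads(payload)
        event_type = event.get("type")
        message_id = event.get("messageId")
        if event_type == "TEXT_MESSAGE_START":
            open_ids.add(message_id)
        elif event_type == "TEXT_MESSAGE_END":
            open_ids.discard(message_id)
        elif event_type == "TEXT_MESSAGE_CONTENT" and message_id not in open_ids:
            orphaned.append(message_id)
    return orphaned


# A content event arriving after the end event is reported as orphaned.
sample = [
    'data: {"type":"TEXT_MESSAGE_START","messageId":"m1","role":"assistant"}',
    'data: {"type":"TEXT_MESSAGE_CONTENT","messageId":"m1","delta":"Hi"}',
    'data: {"type":"TEXT_MESSAGE_END","messageId":"m1"}',
    'data: {"type":"TEXT_MESSAGE_CONTENT","messageId":"m1","delta":"!"}',
]
print(find_orphaned_content(sample))  # ['m1']
```

Logging the result of a check like this alongside each run could help narrow down when the out-of-order events appear.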
I have managed to work around this with the following patch on our end. At the very least I haven't seen the bug since 😆, but there may still be some race condition where it can happen.
# Imports assumed to come from the ag-ui-protocol package.
from collections.abc import AsyncIterator

from ag_ui.core import (
    TextMessageChunkEvent,
    TextMessageContentEvent,
    TextMessageEndEvent,
    TextMessageStartEvent,
)
from ag_ui.encoder import EventEncoder


async def patch_agent_stream(run_stream: AsyncIterator[str]) -> AsyncIterator[str]:
    """
    Work around a bug in pydantic_ai where TEXT_MESSAGE_CONTENT events are orphaned.
    This fixes the bug by converting orphaned TEXT_MESSAGE_CONTENT events to TEXT_MESSAGE_CHUNK events.
    """
    last_text_message_start_event: TextMessageStartEvent | None = None
    last_text_message_end_event: TextMessageEndEvent | None = None
    encoder = EventEncoder()
    async for chunk in run_stream:
        # Strip only the leading SSE "data: " prefix, so that any "data: "
        # inside the message text itself is left untouched.
        chunk_without_data = chunk.removeprefix("data: ").strip()
        if '"type":"TEXT_MESSAGE_START"' in chunk_without_data:
            last_text_message_start_event = TextMessageStartEvent.model_validate_json(
                chunk_without_data
            )
            last_text_message_end_event = None
            yield chunk
        elif '"type":"TEXT_MESSAGE_END"' in chunk_without_data:
            last_text_message_end_event = TextMessageEndEvent.model_validate_json(
                chunk_without_data
            )
            last_text_message_start_event = None
            yield chunk
        elif '"type":"TEXT_MESSAGE_CONTENT"' in chunk_without_data:
            # Check if this is an orphaned TEXT_MESSAGE_CONTENT
            # (comes without TEXT_MESSAGE_START or after TEXT_MESSAGE_END)
            is_inside_message = (
                last_text_message_start_event is not None and last_text_message_end_event is None
            )
            if not is_inside_message:
                # Convert orphaned TEXT_MESSAGE_CONTENT to TEXT_MESSAGE_CHUNK
                content_event = TextMessageContentEvent.model_validate_json(chunk_without_data)
                chunk_event = TextMessageChunkEvent(
                    message_id=content_event.message_id,
                    delta=content_event.delta,
                    timestamp=content_event.timestamp,
                )
                yield encoder.encode(chunk_event)
            else:
                # Inside a message, let it through normally
                yield chunk
        else:
            yield chunk
Example Code
agent = await get_agent()
event_stream = patch_agent_stream(
    run_agui(
        agent,
        run_input,
        accept=accept,
    )
)
return StreamingResponse(
    event_stream,
    media_type=accept,
    headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)
Python, Pydantic AI & LLM client version
Python 3.11.6
Pydantic AI 1.0.15