Skip to content

feat(tool-streams): add tool streams#1713

Open
paul-nechifor wants to merge 2 commits intodevfrom
paul/feat/mcp-streams
Open

feat(tool-streams): add tool streams#1713
paul-nechifor wants to merge 2 commits intodevfrom
paul/feat/mcp-streams

Conversation

@paul-nechifor
Copy link
Copy Markdown
Contributor

@paul-nechifor paul-nechifor commented Mar 30, 2026

Problem

  • Agents don't have a coherent idea of what response is coming from what tool.

Closes DIM-633

Solution

  • Added MCP streaming. Now agents receive updates from one tool in the same tool context.

Breaking Changes

None

How to Test

Contributor License Agreement

  • I have read and approved the CLA.

@paul-nechifor paul-nechifor marked this pull request as draft March 30, 2026 17:47
@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps bot commented Mar 30, 2026

Greptile Summary

This PR introduces a tool streams mechanism that allows long-running skills (follow_person, look_out_for) to push intermediate updates back to the LLM agent in real time, replacing the previous approach of directly calling _agent_spec.add_message(). Updates flow via LCM (/tool_streams) → McpServer subscriber → HTTP SSE endpoint (/mcp/streams) → McpClient SSE thread → agent message queue.

  • ToolStream / ToolStreamEvent — new primitives for publishing update and close events over a shared LCM topic
  • McpServer — new /mcp/streams GET endpoint; subscribes to tool_streams input and fans out to all connected SSE queues (with the previously-noted list-snapshot fix already applied)
  • McpClient — new background SSE thread that reconnects on failure and injects HumanMessage updates into the agent's queue; stop() now closes the client to unblock iter_lines()
  • person_follow.py / perceive_loop_skill.py — migrated from _agent_spec.add_message(HumanMessage(...)) to ToolStream.send() + ToolStream.stop()
  • Integration and unit tests added, covering the full SSE flow and edge cases (double-stop, send-after-stop)

Issues found:

  • _sse_connect has a TOCTOU race: if stop() fires between client creation and the assignment to self._sse_client, the new client is never closed and the SSE thread hangs indefinitely with timeout=None
  • ToolStreamEvent(**data) in _sse_consume is not wrapped in a try/except, so any malformed SSE payload raises TypeError and drops the connection
  • ToolStream.start() is not guarded by the lock or _closed flag, making double-start possible and leaking the first transport
  • McpServer.tool_streams is now a required input with no default, which may break existing blueprints that do not wire this stream

Confidence Score: 3/5

  • Not safe to merge — three P1 defects affect shutdown correctness and transport safety.
  • The SSE TOCTOU race in _sse_connect, the unguarded ToolStreamEvent(**data) constructor, and the unprotected ToolStream.start() are all present-tense defects (wrong behaviour on the changed code paths) that need to be fixed. The tool_streams required input is also a potential breaking change for existing users. Once these are addressed the design is solid.
  • dimos/agents/mcp/mcp_client.py (TOCTOU race + unhandled TypeError) and dimos/agents/mcp/tool_stream.py (unguarded start())

Important Files Changed

Filename Overview
dimos/agents/mcp/tool_stream.py New ToolStream class for publishing skill updates over LCM; start() lacks lock/guard, allowing double-start transport leaks.
dimos/agents/mcp/mcp_client.py Adds SSE listener thread for tool stream updates; TOCTOU race in _sse_connect allows stop() to miss a newly created client, and ToolStreamEvent(**data) is not guarded against malformed payloads.
dimos/agents/mcp/mcp_server.py Adds /mcp/streams SSE endpoint and subscribes tool_streams input to broadcast to all connected SSE clients; list snapshot fix (list(app.state.sse_queues)) already applied for safe iteration.
dimos/agents/mcp/test_tool_stream.py New integration and unit tests for ToolStream; covers SSE flow, agent injection, and double-stop safety.
dimos/agents/skills/person_follow.py Migrates stop-reason notification from direct HumanMessage to ToolStream; adds proper thread join and tool stream cleanup in stop().
dimos/perception/perceive_loop_skill.py Migrates detection notification to ToolStream; creates ToolStream unconditionally even when then is set (transport stays open until _stop_lookout, slightly wasteful but not broken).
dimos/agents/mcp/fixtures/test_tool_stream_agent.json Mock LLM fixture for test_tool_stream_agent; straightforward sequence of canned responses.

Sequence Diagram

sequenceDiagram
    participant Skill as Skill (e.g. follow_person)
    participant TS as ToolStream
    participant LCM as LCM (/tool_streams)
    participant Server as McpServer
    participant SSE as /mcp/streams (SSE)
    participant Client as McpClient (SSE thread)
    participant MQ as McpClient message queue
    participant Agent as LLM Agent

    Skill->>TS: start()
    TS->>LCM: pLCMTransport.start()
    Skill->>TS: send("Update 1")
    TS->>LCM: publish(ToolStreamEvent{type=update})
    LCM->>Server: _on_tool_stream_message(msg)
    Server->>SSE: asyncio.run_coroutine_threadsafe(queue.put(msg))
    SSE-->>Client: SSE line: data: {...}
    Client->>MQ: queue.put(HumanMessage("[Tool stream update...]"))
    MQ->>Agent: _process_message()

    Skill->>TS: stop()
    TS->>LCM: publish(ToolStreamEvent{type=close})
    LCM->>Server: _on_tool_stream_message(msg)
    Server->>SSE: asyncio.run_coroutine_threadsafe(queue.put(msg))
    SSE-->>Client: SSE line: data: {type=close}
    Note over Client: close event ignored by McpClient
    TS->>LCM: pLCMTransport.stop()
Loading

Reviews (2): Last reviewed commit: "feat(tool-streams): add tool streams" | Re-trigger Greptile

@paul-nechifor paul-nechifor force-pushed the paul/feat/mcp-streams branch from 21bb763 to 888cf3f Compare March 30, 2026 23:52
@paul-nechifor paul-nechifor marked this pull request as ready for review March 30, 2026 23:52
@paul-nechifor paul-nechifor force-pushed the paul/feat/mcp-streams branch from 2fc370a to 2b07827 Compare March 31, 2026 02:42
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant