diff --git a/app/src/pages/Conversations.tsx b/app/src/pages/Conversations.tsx index 68db0e6ff..fa29cc838 100644 --- a/app/src/pages/Conversations.tsx +++ b/app/src/pages/Conversations.tsx @@ -10,8 +10,12 @@ import UsageLimitModal from '../components/upsell/UsageLimitModal'; import { useUsageState } from '../hooks/useUsageState'; import { chatCancel, + type ChatInferenceStartEvent, + type ChatIterationStartEvent, type ChatSegmentEvent, chatSend, + type ChatSubagentDoneEvent, + type ChatSubagentSpawnedEvent, type ChatToolCallEvent, type ChatToolResultEvent, segmentText, @@ -53,6 +57,19 @@ const AGENTIC_MODEL_ID = 'agentic-v1'; type ToolTimelineEntryStatus = 'running' | 'success' | 'error'; type InputMode = 'text' | 'voice'; type ReplyMode = 'text' | 'voice'; + +/** Tracks the live inference state for a thread's in-flight request. */ +interface InferenceStatus { + /** Current phase: thinking (LLM call), tool_use, subagent. */ + phase: 'thinking' | 'tool_use' | 'subagent'; + /** 1-based iteration index. */ + iteration: number; + maxIterations: number; + /** Active tool name (when phase is tool_use). */ + activeTool?: string; + /** Active sub-agent id (when phase is subagent). */ + activeSubagent?: string; +} const AUTOCOMPLETE_POLL_DEBOUNCE_MS = 320; const AUTOCOMPLETE_MIN_CONTEXT_CHARS = 3; @@ -187,6 +204,9 @@ const Conversations = () => { const [toolTimelineByThread, setToolTimelineByThread] = useState< Record >({}); + const [inferenceStatusByThread, setInferenceStatusByThread] = useState< + Record + >({}); const rustChat = useRustChat(); const defaultChannelType = useAppSelector( state => state.channelConnections?.defaultMessagingChannel ?? 'web' @@ -431,7 +451,32 @@ const Conversations = () => { if (!rustChat || socketStatus !== 'connected') return; const cleanup = subscribeChatEvents({ + onInferenceStart: (event: ChatInferenceStartEvent) => { + setInferenceStatusByThread(prev => ({ + ...prev, + [event.thread_id]: { phase: 'thinking', iteration: 0, maxIterations: 0 }, + })); + }, + onIterationStart: (event: ChatIterationStartEvent) => { + setInferenceStatusByThread(prev => ({ + ...prev, + [event.thread_id]: { + phase: 'thinking', + iteration: event.round, + maxIterations: prev[event.thread_id]?.maxIterations ?? 0, + }, + })); + }, onToolCall: (event: ChatToolCallEvent) => { + // Update inference status to show active tool + setInferenceStatusByThread(prev => ({ + ...prev, + [event.thread_id]: { + ...(prev[event.thread_id] ?? { iteration: event.round, maxIterations: 0 }), + phase: 'tool_use' as const, + activeTool: event.tool_name, + }, + })); const eventKey = `tool_call:${event.thread_id}:${event.request_id ?? 'none'}:${event.round}:${event.tool_name}`; if (!markChatEventSeen(eventKey)) return; @@ -477,6 +522,65 @@ const Conversations = () => { if (!changed) return prev; return { ...prev, [event.thread_id]: nextEntries }; }); + // Tool completed — go back to thinking for the next iteration + setInferenceStatusByThread(prev => { + const current = prev[event.thread_id]; + if (!current) return prev; + return { + ...prev, + [event.thread_id]: { ...current, phase: 'thinking', activeTool: undefined }, + }; + }); + }, + onSubagentSpawned: (event: ChatSubagentSpawnedEvent) => { + setInferenceStatusByThread(prev => ({ + ...prev, + [event.thread_id]: { + ...(prev[event.thread_id] ?? { iteration: event.round, maxIterations: 0 }), + phase: 'subagent' as const, + activeSubagent: event.tool_name, + }, + })); + // Add sub-agent to tool timeline for visual tracking + setToolTimelineByThread(prev => { + const existing = prev[event.thread_id] ?? []; + return { + ...prev, + [event.thread_id]: [ + ...existing, + { + id: `${event.thread_id}:subagent:${event.skill_id}:${event.tool_name}`, + name: `🤖 ${event.tool_name}`, + round: event.round, + status: 'running' as const, + }, + ], + }; + }); + }, + onSubagentDone: (event: ChatSubagentDoneEvent) => { + setToolTimelineByThread(prev => { + const existing = prev[event.thread_id] ?? []; + return { + ...prev, + [event.thread_id]: existing.map(entry => + entry.name === `🤖 ${event.tool_name}` && entry.status === 'running' + ? { + ...entry, + status: (event.success ? 'success' : 'error') as ToolTimelineEntryStatus, + } + : entry + ), + }; + }); + setInferenceStatusByThread(prev => { + const current = prev[event.thread_id]; + if (!current) return prev; + return { + ...prev, + [event.thread_id]: { ...current, phase: 'thinking', activeSubagent: undefined }, + }; + }); }, onSegment: (event: ChatSegmentEvent) => { const eventKey = `segment:${event.thread_id}:${event.request_id}:${event.segment_index}`; @@ -502,6 +606,14 @@ const Conversations = () => { const eventKey = `done:${event.thread_id}:${event.request_id ?? 'none'}`; if (!markChatEventSeen(eventKey)) return; + // Clear inference status — the turn is finished + setInferenceStatusByThread(prev => { + if (!prev[event.thread_id]) return prev; + const next = { ...prev }; + delete next[event.thread_id]; + return next; + }); + // Update tool timeline setToolTimelineByThread(prev => { const existing = prev[event.thread_id] ?? []; @@ -561,6 +673,13 @@ const Conversations = () => { sendingTimeoutRef.current = null; } setIsSending(false); + // Clear inference status on error + setInferenceStatusByThread(prev => { + if (!prev[event.thread_id]) return prev; + const next = { ...prev }; + delete next[event.thread_id]; + return next; + }); setToolTimelineByThread(prev => { const existing = prev[event.thread_id] ?? []; if (existing.length === 0) return prev; @@ -973,6 +1092,9 @@ const Conversations = () => { const selectedThreadToolTimeline = selectedThreadId ? (toolTimelineByThread[selectedThreadId] ?? []) : []; + const selectedInferenceStatus = selectedThreadId + ? (inferenceStatusByThread[selectedThreadId] ?? null) + : null; const inlineCompletionSuffix = getInlineCompletionSuffix(inputValue, inlineSuggestionValue); return ( @@ -1153,6 +1275,23 @@ const Conversations = () => { )} + {/* Inference status indicator */} + {selectedInferenceStatus && ( +
+ + + {selectedInferenceStatus.phase === 'thinking' && + (selectedInferenceStatus.iteration > 0 + ? `Thinking (iteration ${selectedInferenceStatus.iteration})...` + : 'Thinking...')} + {selectedInferenceStatus.phase === 'tool_use' && + `Running ${selectedInferenceStatus.activeTool ?? 'tool'}...`} + {selectedInferenceStatus.phase === 'subagent' && + `Sub-agent ${selectedInferenceStatus.activeSubagent ?? ''} working...`} + +
+ )} + {/* Tool call timeline */} {selectedThreadToolTimeline.length > 0 && (
{selectedThreadToolTimeline.map(entry => ( diff --git a/app/src/services/chatService.ts b/app/src/services/chatService.ts index 4c6d9941f..264ff5738 100644 --- a/app/src/services/chatService.ts +++ b/app/src/services/chatService.ts @@ -6,9 +6,13 @@ * (tool_call, tool_result, chat_done, chat_error) via the web-channel * event bridge in the Rust core. */ +import debug from 'debug'; + import { callCoreRpc } from './coreRpcClient'; import { socketService } from './socketService'; +const chatLog = debug('realtime:chat'); + export interface ChatToolCallEvent { thread_id: string; request_id?: string; @@ -69,9 +73,51 @@ export interface ChatErrorEvent { round: number | null; } +/** Emitted when the agent turn begins (before the first LLM call). */ +export interface ChatInferenceStartEvent { + thread_id: string; + request_id: string; +} + +/** Emitted at the start of each LLM iteration in the tool loop. */ +export interface ChatIterationStartEvent { + thread_id: string; + request_id: string; + /** 1-based iteration index. */ + round: number; + message: string; +} + +/** Emitted when a sub-agent is spawned during tool execution. */ +export interface ChatSubagentSpawnedEvent { + thread_id: string; + request_id: string; + /** Agent definition id (e.g. "researcher"). */ + tool_name: string; + /** Per-spawn task id. */ + skill_id: string; + message: string; + round: number; +} + +/** Emitted when a sub-agent completes or fails. */ +export interface ChatSubagentDoneEvent { + thread_id: string; + request_id: string; + tool_name: string; + skill_id: string; + message: string; + success: boolean; + round: number; +} + export interface ChatEventListeners { + onInferenceStart?: (event: ChatInferenceStartEvent) => void; + onIterationStart?: (event: ChatIterationStartEvent) => void; onToolCall?: (event: ChatToolCallEvent) => void; onToolResult?: (event: ChatToolResultEvent) => void; + onSubagentSpawned?: (event: ChatSubagentSpawnedEvent) => void; + onSubagentDone?: (event: ChatSubagentDoneEvent) => void; onSegment?: (event: ChatSegmentEvent) => void; onDone?: (event: ChatDoneEvent) => void; onError?: (event: ChatErrorEvent) => void; @@ -86,39 +132,169 @@ export function subscribeChatEvents(listeners: ChatEventListeners): () => void { // The core emits aliases for compatibility, but subscribing once avoids // processing the same logical event twice. const EVENTS = { + inferenceStart: 'inference_start', + iterationStart: 'iteration_start', toolCall: 'tool_call', toolResult: 'tool_result', + subagentSpawned: 'subagent_spawned', + subagentCompleted: 'subagent_completed', + subagentFailed: 'subagent_failed', segment: 'chat_segment', done: 'chat_done', error: 'chat_error', } as const; + if (listeners.onInferenceStart) { + const cb = (payload: unknown) => { + const e = payload as ChatInferenceStartEvent; + chatLog('%s thread_id=%s request_id=%s', EVENTS.inferenceStart, e.thread_id, e.request_id); + listeners.onInferenceStart?.(e); + }; + socket.on(EVENTS.inferenceStart, cb); + handlers.push([EVENTS.inferenceStart, cb]); + } + + if (listeners.onIterationStart) { + const cb = (payload: unknown) => { + const e = payload as ChatIterationStartEvent; + chatLog( + '%s thread_id=%s request_id=%s round=%d', + EVENTS.iterationStart, + e.thread_id, + e.request_id, + e.round + ); + listeners.onIterationStart?.(e); + }; + socket.on(EVENTS.iterationStart, cb); + handlers.push([EVENTS.iterationStart, cb]); + } + if (listeners.onToolCall) { - const cb = (payload: unknown) => listeners.onToolCall?.(payload as ChatToolCallEvent); + const cb = (payload: unknown) => { + const e = payload as ChatToolCallEvent; + chatLog( + '%s thread_id=%s request_id=%s round=%d tool=%s', + EVENTS.toolCall, + e.thread_id, + e.request_id, + e.round, + e.tool_name + ); + listeners.onToolCall?.(e); + }; socket.on(EVENTS.toolCall, cb); handlers.push([EVENTS.toolCall, cb]); } if (listeners.onToolResult) { - const cb = (payload: unknown) => listeners.onToolResult?.(payload as ChatToolResultEvent); + const cb = (payload: unknown) => { + const e = payload as ChatToolResultEvent; + chatLog( + '%s thread_id=%s request_id=%s round=%d tool=%s success=%s', + EVENTS.toolResult, + e.thread_id, + e.request_id, + e.round, + e.tool_name, + e.success + ); + listeners.onToolResult?.(e); + }; socket.on(EVENTS.toolResult, cb); handlers.push([EVENTS.toolResult, cb]); } + if (listeners.onSubagentSpawned) { + const cb = (payload: unknown) => { + const e = payload as ChatSubagentSpawnedEvent; + chatLog( + '%s thread_id=%s request_id=%s round=%d agent=%s', + EVENTS.subagentSpawned, + e.thread_id, + e.request_id, + e.round, + e.tool_name + ); + listeners.onSubagentSpawned?.(e); + }; + socket.on(EVENTS.subagentSpawned, cb); + handlers.push([EVENTS.subagentSpawned, cb]); + } + + if (listeners.onSubagentDone) { + const onCompleted = (payload: unknown) => { + const e = payload as ChatSubagentDoneEvent; + chatLog( + '%s thread_id=%s request_id=%s round=%d agent=%s success=%s', + EVENTS.subagentCompleted, + e.thread_id, + e.request_id, + e.round, + e.tool_name, + e.success + ); + listeners.onSubagentDone?.(e); + }; + socket.on(EVENTS.subagentCompleted, onCompleted); + handlers.push([EVENTS.subagentCompleted, onCompleted]); + + const onFailed = (payload: unknown) => { + const e = payload as ChatSubagentDoneEvent; + chatLog( + '%s thread_id=%s request_id=%s round=%d agent=%s success=%s', + EVENTS.subagentFailed, + e.thread_id, + e.request_id, + e.round, + e.tool_name, + e.success + ); + listeners.onSubagentDone?.(e); + }; + socket.on(EVENTS.subagentFailed, onFailed); + handlers.push([EVENTS.subagentFailed, onFailed]); + } + if (listeners.onSegment) { - const cb = (payload: unknown) => listeners.onSegment?.(payload as ChatSegmentEvent); + const cb = (payload: unknown) => { + const e = payload as ChatSegmentEvent; + chatLog( + '%s thread_id=%s request_id=%s segment=%d/%d', + EVENTS.segment, + e.thread_id, + e.request_id, + e.segment_index, + e.segment_total + ); + listeners.onSegment?.(e); + }; socket.on(EVENTS.segment, cb); handlers.push([EVENTS.segment, cb]); } if (listeners.onDone) { - const cb = (payload: unknown) => listeners.onDone?.(payload as ChatDoneEvent); + const cb = (payload: unknown) => { + const e = payload as ChatDoneEvent; + chatLog('%s thread_id=%s request_id=%s', EVENTS.done, e.thread_id, e.request_id); + listeners.onDone?.(e); + }; socket.on(EVENTS.done, cb); handlers.push([EVENTS.done, cb]); } if (listeners.onError) { - const cb = (payload: unknown) => listeners.onError?.(payload as ChatErrorEvent); + const cb = (payload: unknown) => { + const e = payload as ChatErrorEvent; + chatLog( + '%s thread_id=%s request_id=%s error_type=%s', + EVENTS.error, + e.thread_id, + e.request_id, + e.error_type + ); + listeners.onError?.(e); + }; socket.on(EVENTS.error, cb); handlers.push([EVENTS.error, cb]); } diff --git a/src/openhuman/agent/harness/session/builder.rs b/src/openhuman/agent/harness/session/builder.rs index 9553a9678..00f83f41e 100644 --- a/src/openhuman/agent/harness/session/builder.rs +++ b/src/openhuman/agent/harness/session/builder.rs @@ -290,6 +290,7 @@ impl AgentBuilder { session_transcript_path: None, cached_transcript_messages: None, context, + on_progress: None, }) } } diff --git a/src/openhuman/agent/harness/session/runtime.rs b/src/openhuman/agent/harness/session/runtime.rs index 34c29c5c7..f5dda163d 100644 --- a/src/openhuman/agent/harness/session/runtime.rs +++ b/src/openhuman/agent/harness/session/runtime.rs @@ -110,6 +110,18 @@ impl Agent { self.event_channel = channel.into(); } + /// Attach a progress event sender for real-time turn updates. + /// + /// When set, the turn loop emits [`AgentProgress`] events so + /// callers (e.g. the web channel) can surface live tool-call and + /// iteration updates to the UI. Pass `None` to disable. + pub fn set_on_progress( + &mut self, + tx: Option>, + ) { + self.on_progress = tx; + } + /// Clears the agent's conversation history. pub fn clear_history(&mut self) { self.history.clear(); diff --git a/src/openhuman/agent/harness/session/turn.rs b/src/openhuman/agent/harness/session/turn.rs index 0f2fe4bc3..a743ec469 100644 --- a/src/openhuman/agent/harness/session/turn.rs +++ b/src/openhuman/agent/harness/session/turn.rs @@ -25,6 +25,7 @@ use crate::core::event_bus::{publish_global, DomainEvent}; use crate::openhuman::agent::dispatcher::{ParsedToolCall, ToolExecutionResult}; use crate::openhuman::agent::harness; use crate::openhuman::agent::hooks::{self, ToolCallRecord, TurnContext}; +use crate::openhuman::agent::progress::AgentProgress; use crate::openhuman::context::prompt::{ LearnedContextData, PromptContext, PromptTool, RenderedPrompt, }; @@ -44,6 +45,7 @@ impl Agent { /// and returns the final assistant response. pub async fn turn(&mut self, user_message: &str) -> Result { let turn_started = std::time::Instant::now(); + self.emit_progress(AgentProgress::TurnStarted); log::info!("[agent] turn started — awaiting user message processing"); log::info!( "[agent_loop] turn start message_chars={} history_len={} max_tool_iterations={}", @@ -166,6 +168,10 @@ impl Agent { let turn_body = async { for iteration in 0..self.config.max_tool_iterations { + self.emit_progress(AgentProgress::IterationStarted { + iteration: (iteration + 1) as u32, + max_iterations: self.config.max_tool_iterations as u32, + }); log::info!( "[agent_loop] iteration start i={} history_len={}", iteration + 1, @@ -342,6 +348,10 @@ impl Agent { final_text.chars().count() ); + self.emit_progress(AgentProgress::TurnCompleted { + iterations: (iteration + 1) as u32, + }); + self.history .push(ConversationMessage::Chat(ChatMessage::assistant( final_text.clone(), @@ -426,7 +436,7 @@ impl Agent { tool_calls: persisted_tool_calls, }); - let (results, records) = self.execute_tools(&calls).await; + let (results, records) = self.execute_tools(&calls, iteration).await; all_tool_records.extend(records); log::info!( "[agent_loop] tool results complete i={} result_count={}", @@ -524,12 +534,18 @@ impl Agent { pub(super) async fn execute_tool_call( &self, call: &ParsedToolCall, + iteration: usize, ) -> (ToolExecutionResult, ToolCallRecord) { let started = std::time::Instant::now(); publish_global(DomainEvent::ToolExecutionStarted { tool_name: call.name.clone(), session_id: self.event_session_id().to_string(), }); + self.emit_progress(AgentProgress::ToolCallStarted { + tool_name: call.name.clone(), + arguments: call.arguments.clone(), + iteration: (iteration + 1) as u32, + }); log::info!("[agent] executing tool: {}", call.name); log::info!("[agent_loop] tool start name={}", call.name); @@ -612,6 +628,13 @@ impl Agent { success, elapsed_ms, }); + self.emit_progress(AgentProgress::ToolCallCompleted { + tool_name: call.name.clone(), + success, + output_chars: result.chars().count(), + elapsed_ms, + iteration: (iteration + 1) as u32, + }); log::info!( "[agent] tool completed: {} success={} elapsed_ms={}", call.name, @@ -655,11 +678,12 @@ impl Agent { pub(super) async fn execute_tools( &self, calls: &[ParsedToolCall], + iteration: usize, ) -> (Vec, Vec) { let mut results = Vec::with_capacity(calls.len()); let mut records = Vec::with_capacity(calls.len()); for call in calls { - let (exec_result, record) = self.execute_tool_call(call).await; + let (exec_result, record) = self.execute_tool_call(call, iteration).await; results.push(exec_result); records.push(record); } @@ -727,6 +751,13 @@ impl Agent { // History & prompt helpers // ───────────────────────────────────────────────────────────────── + /// Emit a progress event (fire-and-forget) if the sender is set. + fn emit_progress(&self, event: AgentProgress) { + if let Some(ref tx) = self.on_progress { + let _ = tx.try_send(event); + } + } + /// Truncates the conversation history to the configured maximum message count. /// /// System messages are always preserved. Older non-system messages are diff --git a/src/openhuman/agent/harness/session/types.rs b/src/openhuman/agent/harness/session/types.rs index e3f7529c2..05f37f94a 100644 --- a/src/openhuman/agent/harness/session/types.rs +++ b/src/openhuman/agent/harness/session/types.rs @@ -9,6 +9,7 @@ use crate::openhuman::agent::dispatcher::ToolDispatcher; use crate::openhuman::agent::hooks::PostTurnHook; use crate::openhuman::agent::memory_loader::MemoryLoader; +use crate::openhuman::agent::progress::AgentProgress; use crate::openhuman::context::prompt::SystemPromptBuilder; use crate::openhuman::context::ContextManager; use crate::openhuman::memory::Memory; @@ -80,6 +81,11 @@ pub struct Agent { /// session-memory deltas persist across turns. See /// [`crate::openhuman::context`] for the full surface. pub(super) context: ContextManager, + /// Optional progress event sender for real-time turn progress. + /// When set, the turn loop emits [`AgentProgress`] events through + /// this channel so callers (e.g. web channel) can surface live + /// tool-call and iteration updates to the UI. + pub(super) on_progress: Option>, } /// A builder for creating `Agent` instances with custom configuration. diff --git a/src/openhuman/agent/mod.rs b/src/openhuman/agent/mod.rs index 082c47080..f3372f416 100644 --- a/src/openhuman/agent/mod.rs +++ b/src/openhuman/agent/mod.rs @@ -8,6 +8,7 @@ pub mod host_runtime; pub mod memory_loader; pub mod multimodal; pub mod pformat; +pub mod progress; mod schemas; pub mod triage; pub use schemas::{ diff --git a/src/openhuman/agent/progress.rs b/src/openhuman/agent/progress.rs new file mode 100644 index 000000000..3eef692e8 --- /dev/null +++ b/src/openhuman/agent/progress.rs @@ -0,0 +1,70 @@ +//! Real-time progress events emitted during an agent turn. +//! +//! Consumers (e.g. the web channel provider) create an +//! `mpsc::Sender` and attach it to the [`Agent`] via +//! [`Agent::set_on_progress`] before calling [`Agent::run_single`]. +//! The agent's turn loop sends events through this channel as it +//! progresses — tool calls starting/completing, iteration boundaries, +//! sub-agent lifecycle, etc. +//! +//! This is intentionally separate from [`DomainEvent`] (the global +//! broadcast bus) because progress events are **per-request scoped**: +//! they carry no routing info (client_id, thread_id) — the consumer +//! that created the channel already knows those and tags the outgoing +//! socket events accordingly. + +/// A real-time progress event emitted during an agent turn. +#[derive(Debug, Clone)] +pub enum AgentProgress { + /// The turn has started (about to enter the iteration loop). + TurnStarted, + + /// A new LLM iteration is starting. + IterationStarted { + /// 1-based iteration index. + iteration: u32, + /// Maximum iterations configured for this turn. + max_iterations: u32, + }, + + /// The LLM responded and the agent is about to execute a tool. + ToolCallStarted { + tool_name: String, + arguments: serde_json::Value, + /// 1-based iteration index. + iteration: u32, + }, + + /// A tool execution completed (success or failure). + ToolCallCompleted { + tool_name: String, + success: bool, + output_chars: usize, + elapsed_ms: u64, + /// 1-based iteration index. + iteration: u32, + }, + + /// A sub-agent was spawned during tool execution. + SubagentSpawned { agent_id: String, task_id: String }, + + /// A sub-agent completed successfully. + SubagentCompleted { + agent_id: String, + task_id: String, + elapsed_ms: u64, + }, + + /// A sub-agent failed. + SubagentFailed { + agent_id: String, + task_id: String, + error: String, + }, + + /// The turn completed with a final text response. + TurnCompleted { + /// Total iterations used. + iterations: u32, + }, +} diff --git a/src/openhuman/channels/providers/web.rs b/src/openhuman/channels/providers/web.rs index 482687d93..c626b3ca6 100644 --- a/src/openhuman/channels/providers/web.rs +++ b/src/openhuman/channels/providers/web.rs @@ -11,7 +11,6 @@ use crate::core::{ControllerSchema, FieldSchema, TypeSchema}; use crate::openhuman::agent::Agent; use crate::openhuman::config::rpc as config_rpc; use crate::openhuman::config::Config; -use crate::openhuman::providers::ConversationMessage; use crate::rpc::RpcOutcome; use super::presentation; @@ -262,16 +261,22 @@ async fn run_chat_task( )?, }; - let history_before = agent.history().len(); + // Wire up a real-time progress channel so tool calls, iterations, + // and sub-agent events are emitted to the web channel as they happen + // (instead of retroactively after the loop finishes). + let (progress_tx, progress_rx) = tokio::sync::mpsc::channel(64); + agent.set_on_progress(Some(progress_tx)); + spawn_progress_bridge( + progress_rx, + client_id.to_string(), + thread_id.to_string(), + request_id.to_string(), + ); + let result = agent.run_single(message).await.map_err(|e| e.to_string()); - if result.is_ok() { - publish_tool_events_from_history( - client_id, - thread_id, - request_id, - &agent.history()[history_before..], - ); - } + + // Clear the sender so it doesn't hold the channel open across sessions. + agent.set_on_progress(None); { let mut sessions = THREAD_SESSIONS.lock().await; @@ -288,101 +293,198 @@ async fn run_chat_task( result } -fn publish_tool_events_from_history( - client_id: &str, - thread_id: &str, - request_id: &str, - messages: &[ConversationMessage], +/// Spawn a background task that reads [`AgentProgress`] events from the +/// agent turn loop and translates them into [`WebChannelEvent`]s tagged +/// with the correct client/thread/request IDs. The task runs until the +/// sender is dropped (i.e. when the agent turn finishes). +fn spawn_progress_bridge( + mut rx: tokio::sync::mpsc::Receiver, + client_id: String, + thread_id: String, + request_id: String, ) { - let mut round: u32 = 0; - let mut current_round_calls: Vec<(String, String)> = Vec::new(); - - for message in messages { - match message { - ConversationMessage::AssistantToolCalls { tool_calls, .. } => { - round += 1; - current_round_calls.clear(); - for (idx, call) in tool_calls.iter().enumerate() { - let synthetic_id = if call.id.trim().is_empty() { - format!("idx:{idx}") - } else { - call.id.clone() - }; - current_round_calls.push((synthetic_id, call.name.clone())); + use crate::openhuman::agent::progress::AgentProgress; + + tokio::spawn(async move { + let mut round: u32 = 0; + while let Some(event) = rx.recv().await { + match event { + AgentProgress::TurnStarted => { + publish_web_channel_event(WebChannelEvent { + event: "inference_start".to_string(), + client_id: client_id.clone(), + thread_id: thread_id.clone(), + request_id: request_id.clone(), + full_response: None, + message: None, + error_type: None, + tool_name: None, + skill_id: None, + args: None, + output: None, + success: None, + round: None, + reaction_emoji: None, + segment_index: None, + segment_total: None, + }); + } + AgentProgress::IterationStarted { + iteration, + max_iterations, + } => { + round = iteration; + publish_web_channel_event(WebChannelEvent { + event: "iteration_start".to_string(), + client_id: client_id.clone(), + thread_id: thread_id.clone(), + request_id: request_id.clone(), + full_response: None, + message: Some(format!("Iteration {iteration}/{max_iterations}")), + error_type: None, + tool_name: None, + skill_id: None, + args: None, + output: None, + success: None, + round: Some(iteration), + reaction_emoji: None, + segment_index: None, + segment_total: None, + }); + } + AgentProgress::ToolCallStarted { + tool_name, + arguments, + iteration, + } => { publish_web_channel_event(WebChannelEvent { event: "tool_call".to_string(), - client_id: client_id.to_string(), - thread_id: thread_id.to_string(), - request_id: request_id.to_string(), + client_id: client_id.clone(), + thread_id: thread_id.clone(), + request_id: request_id.clone(), full_response: None, message: None, error_type: None, - tool_name: Some(call.name.clone()), + tool_name: Some(tool_name), skill_id: Some("web_channel".to_string()), - args: Some(parse_tool_args(&call.arguments)), + args: Some(arguments), output: None, success: None, - round: Some(round), + round: Some(iteration), reaction_emoji: None, segment_index: None, segment_total: None, }); } - } - ConversationMessage::ToolResults(results) => { - for (idx, result) in results.iter().enumerate() { - let fallback = format!("idx:{idx}"); - let tool_name = current_round_calls - .iter() - .find(|(tool_call_id, _)| tool_call_id == &result.tool_call_id) - .or_else(|| current_round_calls.get(idx)) - .map(|(_, name)| name.clone()) - .unwrap_or_else(|| "unknown".to_string()); - - let success = !result.content.trim_start().starts_with("Error:"); + AgentProgress::ToolCallCompleted { + tool_name, + success, + output_chars, + elapsed_ms, + iteration, + } => { publish_web_channel_event(WebChannelEvent { event: "tool_result".to_string(), - client_id: client_id.to_string(), - thread_id: thread_id.to_string(), - request_id: request_id.to_string(), + client_id: client_id.clone(), + thread_id: thread_id.clone(), + request_id: request_id.clone(), full_response: None, message: None, error_type: None, tool_name: Some(tool_name), skill_id: Some("web_channel".to_string()), - args: Some(Value::Object(Map::from_iter([( - "tool_call_id".to_string(), - Value::String(if result.tool_call_id.is_empty() { - fallback - } else { - result.tool_call_id.clone() - }), - )]))), - output: Some(result.content.clone()), + args: None, + output: Some( + json!({"output_chars": output_chars, "elapsed_ms": elapsed_ms}) + .to_string(), + ), success: Some(success), - round: Some(round.max(1)), + round: Some(iteration), reaction_emoji: None, segment_index: None, segment_total: None, }); } + AgentProgress::SubagentSpawned { agent_id, task_id } => { + publish_web_channel_event(WebChannelEvent { + event: "subagent_spawned".to_string(), + client_id: client_id.clone(), + thread_id: thread_id.clone(), + request_id: request_id.clone(), + full_response: None, + message: Some(format!("Sub-agent '{agent_id}' spawned")), + error_type: None, + tool_name: Some(agent_id), + skill_id: Some(task_id), + args: None, + output: None, + success: None, + round: Some(round), + reaction_emoji: None, + segment_index: None, + segment_total: None, + }); + } + AgentProgress::SubagentCompleted { + agent_id, + task_id, + elapsed_ms, + } => { + publish_web_channel_event(WebChannelEvent { + event: "subagent_completed".to_string(), + client_id: client_id.clone(), + thread_id: thread_id.clone(), + request_id: request_id.clone(), + full_response: None, + message: Some(format!( + "Sub-agent '{agent_id}' completed in {elapsed_ms}ms" + )), + error_type: None, + tool_name: Some(agent_id), + skill_id: Some(task_id), + args: None, + output: None, + success: Some(true), + round: Some(round), + reaction_emoji: None, + segment_index: None, + segment_total: None, + }); + } + AgentProgress::SubagentFailed { + agent_id, + task_id, + error, + } => { + publish_web_channel_event(WebChannelEvent { + event: "subagent_failed".to_string(), + client_id: client_id.clone(), + thread_id: thread_id.clone(), + request_id: request_id.clone(), + full_response: None, + message: Some(error), + error_type: None, + tool_name: Some(agent_id), + skill_id: Some(task_id), + args: None, + output: None, + success: Some(false), + round: Some(round), + reaction_emoji: None, + segment_index: None, + segment_total: None, + }); + } + AgentProgress::TurnCompleted { iterations } => { + log::debug!( + "[web_channel] turn completed after {iterations} iteration(s) \ + client_id={client_id} thread_id={thread_id} request_id={request_id}" + ); + } } - ConversationMessage::Chat(_) => {} } - } -} - -fn parse_tool_args(arguments: &str) -> Value { - if arguments.trim().is_empty() { - return Value::Object(Map::new()); - } - match serde_json::from_str::(arguments) { - Ok(value) => value, - Err(_) => Value::Object(Map::from_iter([( - "raw".to_string(), - Value::String(arguments.to_string()), - )])), - } + }); } fn normalize_model_override(model_override: Option) -> Option { @@ -594,8 +696,7 @@ fn to_json(outcome: RpcOutcome) -> Result #[cfg(test)] mod tests { - use super::{cancel_chat, parse_tool_args, start_chat}; - use serde_json::json; + use super::{cancel_chat, start_chat}; #[tokio::test] async fn start_chat_validates_required_fields() { @@ -627,14 +728,4 @@ mod tests { .expect_err("thread id should be required"); assert!(err.contains("thread_id is required")); } - - #[test] - fn parse_tool_args_handles_json_and_raw_fallback() { - assert_eq!( - parse_tool_args(r#"{"command":"date"}"#), - json!({"command":"date"}) - ); - assert_eq!(parse_tool_args(""), json!({})); - assert_eq!(parse_tool_args("not-json"), json!({"raw":"not-json"})); - } } diff --git a/tests/json_rpc_e2e.rs b/tests/json_rpc_e2e.rs index 1f663a119..7bc807fe0 100644 --- a/tests/json_rpc_e2e.rs +++ b/tests/json_rpc_e2e.rs @@ -441,6 +441,7 @@ async fn post_json_rpc(rpc_base: &str, id: i64, method: &str, params: Value) -> .unwrap_or_else(|e| panic!("json for {method}: {e}")) } +#[allow(dead_code)] async fn read_first_sse_event(events_url: &str) -> Value { let client = reqwest::Client::builder() .timeout(Duration::from_secs(120)) @@ -484,6 +485,54 @@ async fn read_first_sse_event(events_url: &str) -> Value { panic!("SSE stream ended before any event payload"); } +/// Read SSE events until one matches the given `event` field value, skipping +/// progress events (inference_start, iteration_start, etc.) that precede the +/// terminal event. +async fn read_sse_event_by_type(events_url: &str, target_event: &str) -> Value { + let client = reqwest::Client::builder() + .timeout(Duration::from_secs(120)) + .build() + .expect("client"); + let resp = client + .get(events_url) + .send() + .await + .unwrap_or_else(|e| panic!("GET {events_url}: {e}")); + assert!( + resp.status().is_success(), + "SSE HTTP error {} for {}", + resp.status(), + events_url + ); + + let mut stream = resp.bytes_stream(); + let mut buffer = String::new(); + while let Some(item) = stream.next().await { + let chunk = item.unwrap_or_else(|e| panic!("sse stream read failed: {e}")); + let text = std::str::from_utf8(&chunk).unwrap_or(""); + buffer.push_str(text); + while let Some(idx) = buffer.find("\n\n") { + let block = buffer[..idx].to_string(); + buffer = buffer[idx + 2..].to_string(); + let mut data_lines = Vec::new(); + for line in block.lines() { + if let Some(data) = line.strip_prefix("data:") { + data_lines.push(data.trim_start()); + } + } + if !data_lines.is_empty() { + let payload = data_lines.join("\n"); + let value: Value = serde_json::from_str(&payload) + .unwrap_or_else(|e| panic!("invalid sse data json: {e}")); + if value.get("event").and_then(Value::as_str) == Some(target_event) { + return value; + } + } + } + } + panic!("SSE stream ended before receiving '{target_event}' event"); +} + fn assert_no_jsonrpc_error<'a>(v: &'a Value, context: &str) -> &'a Value { if let Some(err) = v.get("error") { panic!("{context}: JSON-RPC error: {err}"); @@ -653,7 +702,8 @@ async fn json_rpc_protocol_auth_and_agent_hello() { let client_id = "e2e-client-1"; let thread_id = "thread-1"; let events_url = format!("{}/events?client_id={}", rpc_base, client_id); - let sse_task = tokio::spawn(async move { read_first_sse_event(&events_url).await }); + let sse_task = + tokio::spawn(async move { read_sse_event_by_type(&events_url, "chat_done").await }); let web_chat = post_json_rpc( &rpc_base,