From f242f6b10f8c5bf2a02d799d81a127af13323d85 Mon Sep 17 00:00:00 2001 From: Steven Enamakel Date: Sun, 12 Apr 2026 01:54:48 -0700 Subject: [PATCH 1/3] feat(conversations): implement real-time inference status tracking - Added new event listeners for inference start, iteration start, subagent spawning, and completion to track the live state of chat interactions. - Introduced an `InferenceStatus` interface to manage the current phase and active tools/subagents for each thread. - Updated the UI to display inference status indicators, enhancing user experience during chat interactions. - Created a new `progress` module in the Rust backend to emit real-time progress events, allowing for better integration with the web channel. - Refactored the `subscribeChatEvents` function to include new event handlers for managing inference and subagent events, improving clarity and maintainability of the event handling logic. --- app/src/pages/Conversations.tsx | 140 ++++++++++ app/src/services/chatService.ts | 80 ++++++ .../agent/harness/session/builder.rs | 1 + .../agent/harness/session/runtime.rs | 9 + src/openhuman/agent/harness/session/turn.rs | 35 ++- src/openhuman/agent/harness/session/types.rs | 6 + src/openhuman/agent/mod.rs | 1 + src/openhuman/agent/progress.rs | 73 ++++++ src/openhuman/channels/providers/web.rs | 243 +++++++++++++----- 9 files changed, 523 insertions(+), 65 deletions(-) create mode 100644 src/openhuman/agent/progress.rs diff --git a/app/src/pages/Conversations.tsx b/app/src/pages/Conversations.tsx index 68db0e6ff..0a7aaa0b4 100644 --- a/app/src/pages/Conversations.tsx +++ b/app/src/pages/Conversations.tsx @@ -10,7 +10,11 @@ import UsageLimitModal from '../components/upsell/UsageLimitModal'; import { useUsageState } from '../hooks/useUsageState'; import { chatCancel, + type ChatInferenceStartEvent, + type ChatIterationStartEvent, type ChatSegmentEvent, + type ChatSubagentDoneEvent, + type ChatSubagentSpawnedEvent, chatSend, type ChatToolCallEvent, type ChatToolResultEvent, @@ -53,6 +57,19 @@ const AGENTIC_MODEL_ID = 'agentic-v1'; type ToolTimelineEntryStatus = 'running' | 'success' | 'error'; type InputMode = 'text' | 'voice'; type ReplyMode = 'text' | 'voice'; + +/** Tracks the live inference state for a thread's in-flight request. */ +interface InferenceStatus { + /** Current phase: thinking (LLM call), tool_use, subagent. */ + phase: 'thinking' | 'tool_use' | 'subagent'; + /** 1-based iteration index. */ + iteration: number; + maxIterations: number; + /** Active tool name (when phase is tool_use). */ + activeTool?: string; + /** Active sub-agent id (when phase is subagent). */ + activeSubagent?: string; +} const AUTOCOMPLETE_POLL_DEBOUNCE_MS = 320; const AUTOCOMPLETE_MIN_CONTEXT_CHARS = 3; @@ -187,6 +204,9 @@ const Conversations = () => { const [toolTimelineByThread, setToolTimelineByThread] = useState< Record >({}); + const [inferenceStatusByThread, setInferenceStatusByThread] = useState< + Record + >({}); const rustChat = useRustChat(); const defaultChannelType = useAppSelector( state => state.channelConnections?.defaultMessagingChannel ?? 'web' @@ -431,7 +451,36 @@ const Conversations = () => { if (!rustChat || socketStatus !== 'connected') return; const cleanup = subscribeChatEvents({ + onInferenceStart: (event: ChatInferenceStartEvent) => { + setInferenceStatusByThread(prev => ({ + ...prev, + [event.thread_id]: { + phase: 'thinking', + iteration: 0, + maxIterations: 0, + }, + })); + }, + onIterationStart: (event: ChatIterationStartEvent) => { + setInferenceStatusByThread(prev => ({ + ...prev, + [event.thread_id]: { + phase: 'thinking', + iteration: event.round, + maxIterations: prev[event.thread_id]?.maxIterations ?? 0, + }, + })); + }, onToolCall: (event: ChatToolCallEvent) => { + // Update inference status to show active tool + setInferenceStatusByThread(prev => ({ + ...prev, + [event.thread_id]: { + ...(prev[event.thread_id] ?? { iteration: event.round, maxIterations: 0 }), + phase: 'tool_use' as const, + activeTool: event.tool_name, + }, + })); const eventKey = `tool_call:${event.thread_id}:${event.request_id ?? 'none'}:${event.round}:${event.tool_name}`; if (!markChatEventSeen(eventKey)) return; @@ -477,6 +526,62 @@ const Conversations = () => { if (!changed) return prev; return { ...prev, [event.thread_id]: nextEntries }; }); + // Tool completed — go back to thinking for the next iteration + setInferenceStatusByThread(prev => { + const current = prev[event.thread_id]; + if (!current) return prev; + return { + ...prev, + [event.thread_id]: { ...current, phase: 'thinking', activeTool: undefined }, + }; + }); + }, + onSubagentSpawned: (event: ChatSubagentSpawnedEvent) => { + setInferenceStatusByThread(prev => ({ + ...prev, + [event.thread_id]: { + ...(prev[event.thread_id] ?? { iteration: event.round, maxIterations: 0 }), + phase: 'subagent' as const, + activeSubagent: event.tool_name, + }, + })); + // Add sub-agent to tool timeline for visual tracking + setToolTimelineByThread(prev => { + const existing = prev[event.thread_id] ?? []; + return { + ...prev, + [event.thread_id]: [ + ...existing, + { + id: `${event.thread_id}:subagent:${event.skill_id}:${event.tool_name}`, + name: `🤖 ${event.tool_name}`, + round: event.round, + status: 'running' as const, + }, + ], + }; + }); + }, + onSubagentDone: (event: ChatSubagentDoneEvent) => { + setToolTimelineByThread(prev => { + const existing = prev[event.thread_id] ?? []; + return { + ...prev, + [event.thread_id]: existing.map(entry => + entry.name === `🤖 ${event.tool_name}` && entry.status === 'running' + ? { ...entry, status: (event.success ? 'success' : 'error') as ToolTimelineEntryStatus } + : entry + ), + }; + }); + setInferenceStatusByThread(prev => { + const current = prev[event.thread_id]; + if (!current) return prev; + return { + ...prev, + [event.thread_id]: { ...current, phase: 'thinking', activeSubagent: undefined }, + }; + }); }, onSegment: (event: ChatSegmentEvent) => { const eventKey = `segment:${event.thread_id}:${event.request_id}:${event.segment_index}`; @@ -502,6 +607,14 @@ const Conversations = () => { const eventKey = `done:${event.thread_id}:${event.request_id ?? 'none'}`; if (!markChatEventSeen(eventKey)) return; + // Clear inference status — the turn is finished + setInferenceStatusByThread(prev => { + if (!prev[event.thread_id]) return prev; + const next = { ...prev }; + delete next[event.thread_id]; + return next; + }); + // Update tool timeline setToolTimelineByThread(prev => { const existing = prev[event.thread_id] ?? []; @@ -561,6 +674,13 @@ const Conversations = () => { sendingTimeoutRef.current = null; } setIsSending(false); + // Clear inference status on error + setInferenceStatusByThread(prev => { + if (!prev[event.thread_id]) return prev; + const next = { ...prev }; + delete next[event.thread_id]; + return next; + }); setToolTimelineByThread(prev => { const existing = prev[event.thread_id] ?? []; if (existing.length === 0) return prev; @@ -973,6 +1093,9 @@ const Conversations = () => { const selectedThreadToolTimeline = selectedThreadId ? (toolTimelineByThread[selectedThreadId] ?? []) : []; + const selectedInferenceStatus = selectedThreadId + ? inferenceStatusByThread[selectedThreadId] ?? null + : null; const inlineCompletionSuffix = getInlineCompletionSuffix(inputValue, inlineSuggestionValue); return ( @@ -1153,6 +1276,23 @@ const Conversations = () => { )} + {/* Inference status indicator */} + {selectedInferenceStatus && ( +
+ + + {selectedInferenceStatus.phase === 'thinking' && + (selectedInferenceStatus.iteration > 0 + ? `Thinking (iteration ${selectedInferenceStatus.iteration})...` + : 'Thinking...')} + {selectedInferenceStatus.phase === 'tool_use' && + `Running ${selectedInferenceStatus.activeTool ?? 'tool'}...`} + {selectedInferenceStatus.phase === 'subagent' && + `Sub-agent ${selectedInferenceStatus.activeSubagent ?? ''} working...`} + +
+ )} + {/* Tool call timeline */} {selectedThreadToolTimeline.length > 0 && (
{selectedThreadToolTimeline.map(entry => ( diff --git a/app/src/services/chatService.ts b/app/src/services/chatService.ts index 4c6d9941f..2734e86a6 100644 --- a/app/src/services/chatService.ts +++ b/app/src/services/chatService.ts @@ -69,9 +69,51 @@ export interface ChatErrorEvent { round: number | null; } +/** Emitted when the agent turn begins (before the first LLM call). */ +export interface ChatInferenceStartEvent { + thread_id: string; + request_id: string; +} + +/** Emitted at the start of each LLM iteration in the tool loop. */ +export interface ChatIterationStartEvent { + thread_id: string; + request_id: string; + /** 1-based iteration index. */ + round: number; + message: string; +} + +/** Emitted when a sub-agent is spawned during tool execution. */ +export interface ChatSubagentSpawnedEvent { + thread_id: string; + request_id: string; + /** Agent definition id (e.g. "researcher"). */ + tool_name: string; + /** Per-spawn task id. */ + skill_id: string; + message: string; + round: number; +} + +/** Emitted when a sub-agent completes or fails. */ +export interface ChatSubagentDoneEvent { + thread_id: string; + request_id: string; + tool_name: string; + skill_id: string; + message: string; + success: boolean; + round: number; +} + export interface ChatEventListeners { + onInferenceStart?: (event: ChatInferenceStartEvent) => void; + onIterationStart?: (event: ChatIterationStartEvent) => void; onToolCall?: (event: ChatToolCallEvent) => void; onToolResult?: (event: ChatToolResultEvent) => void; + onSubagentSpawned?: (event: ChatSubagentSpawnedEvent) => void; + onSubagentDone?: (event: ChatSubagentDoneEvent) => void; onSegment?: (event: ChatSegmentEvent) => void; onDone?: (event: ChatDoneEvent) => void; onError?: (event: ChatErrorEvent) => void; @@ -86,13 +128,32 @@ export function subscribeChatEvents(listeners: ChatEventListeners): () => void { // The core emits aliases for compatibility, but subscribing once avoids // processing the same logical event twice. const EVENTS = { + inferenceStart: 'inference_start', + iterationStart: 'iteration_start', toolCall: 'tool_call', toolResult: 'tool_result', + subagentSpawned: 'subagent_spawned', + subagentCompleted: 'subagent_completed', + subagentFailed: 'subagent_failed', segment: 'chat_segment', done: 'chat_done', error: 'chat_error', } as const; + if (listeners.onInferenceStart) { + const cb = (payload: unknown) => + listeners.onInferenceStart?.(payload as ChatInferenceStartEvent); + socket.on(EVENTS.inferenceStart, cb); + handlers.push([EVENTS.inferenceStart, cb]); + } + + if (listeners.onIterationStart) { + const cb = (payload: unknown) => + listeners.onIterationStart?.(payload as ChatIterationStartEvent); + socket.on(EVENTS.iterationStart, cb); + handlers.push([EVENTS.iterationStart, cb]); + } + if (listeners.onToolCall) { const cb = (payload: unknown) => listeners.onToolCall?.(payload as ChatToolCallEvent); socket.on(EVENTS.toolCall, cb); @@ -105,6 +166,25 @@ export function subscribeChatEvents(listeners: ChatEventListeners): () => void { handlers.push([EVENTS.toolResult, cb]); } + if (listeners.onSubagentSpawned) { + const cb = (payload: unknown) => + listeners.onSubagentSpawned?.(payload as ChatSubagentSpawnedEvent); + socket.on(EVENTS.subagentSpawned, cb); + handlers.push([EVENTS.subagentSpawned, cb]); + } + + if (listeners.onSubagentDone) { + const onCompleted = (payload: unknown) => + listeners.onSubagentDone?.(payload as ChatSubagentDoneEvent); + socket.on(EVENTS.subagentCompleted, onCompleted); + handlers.push([EVENTS.subagentCompleted, onCompleted]); + + const onFailed = (payload: unknown) => + listeners.onSubagentDone?.(payload as ChatSubagentDoneEvent); + socket.on(EVENTS.subagentFailed, onFailed); + handlers.push([EVENTS.subagentFailed, onFailed]); + } + if (listeners.onSegment) { const cb = (payload: unknown) => listeners.onSegment?.(payload as ChatSegmentEvent); socket.on(EVENTS.segment, cb); diff --git a/src/openhuman/agent/harness/session/builder.rs b/src/openhuman/agent/harness/session/builder.rs index 0c16abba9..b887a060c 100644 --- a/src/openhuman/agent/harness/session/builder.rs +++ b/src/openhuman/agent/harness/session/builder.rs @@ -276,6 +276,7 @@ impl AgentBuilder { .unwrap_or_else(|| "standalone".to_string()), event_channel: self.event_channel.unwrap_or_else(|| "internal".to_string()), context, + on_progress: None, }) } } diff --git a/src/openhuman/agent/harness/session/runtime.rs b/src/openhuman/agent/harness/session/runtime.rs index 34c29c5c7..dcceddc5a 100644 --- a/src/openhuman/agent/harness/session/runtime.rs +++ b/src/openhuman/agent/harness/session/runtime.rs @@ -110,6 +110,15 @@ impl Agent { self.event_channel = channel.into(); } + /// Attach a progress event sender for real-time turn updates. + /// + /// When set, the turn loop emits [`AgentProgress`] events so + /// callers (e.g. the web channel) can surface live tool-call and + /// iteration updates to the UI. Pass `None` to disable. + pub fn set_on_progress(&mut self, tx: Option>) { + self.on_progress = tx; + } + /// Clears the agent's conversation history. pub fn clear_history(&mut self) { self.history.clear(); diff --git a/src/openhuman/agent/harness/session/turn.rs b/src/openhuman/agent/harness/session/turn.rs index a6dba1070..3cb2a0e13 100644 --- a/src/openhuman/agent/harness/session/turn.rs +++ b/src/openhuman/agent/harness/session/turn.rs @@ -24,6 +24,7 @@ use crate::core::event_bus::{publish_global, DomainEvent}; use crate::openhuman::agent::dispatcher::{ParsedToolCall, ToolExecutionResult}; use crate::openhuman::agent::harness; use crate::openhuman::agent::hooks::{self, ToolCallRecord, TurnContext}; +use crate::openhuman::agent::progress::AgentProgress; use crate::openhuman::context::prompt::{ LearnedContextData, PromptContext, PromptTool, RenderedPrompt, }; @@ -43,6 +44,7 @@ impl Agent { /// and returns the final assistant response. pub async fn turn(&mut self, user_message: &str) -> Result { let turn_started = std::time::Instant::now(); + self.emit_progress(AgentProgress::TurnStarted); log::info!("[agent] turn started — awaiting user message processing"); log::info!( "[agent_loop] turn start message_chars={} history_len={} max_tool_iterations={}", @@ -147,6 +149,10 @@ impl Agent { let turn_body = async { for iteration in 0..self.config.max_tool_iterations { + self.emit_progress(AgentProgress::IterationStarted { + iteration: (iteration + 1) as u32, + max_iterations: self.config.max_tool_iterations as u32, + }); log::info!( "[agent_loop] iteration start i={} history_len={}", iteration + 1, @@ -298,6 +304,10 @@ impl Agent { final_text.chars().count() ); + self.emit_progress(AgentProgress::TurnCompleted { + iterations: (iteration + 1) as u32, + }); + self.history .push(ConversationMessage::Chat(ChatMessage::assistant( final_text.clone(), @@ -382,7 +392,7 @@ impl Agent { tool_calls: persisted_tool_calls, }); - let (results, records) = self.execute_tools(&calls).await; + let (results, records) = self.execute_tools(&calls, iteration).await; all_tool_records.extend(records); log::info!( "[agent_loop] tool results complete i={} result_count={}", @@ -465,12 +475,18 @@ impl Agent { pub(super) async fn execute_tool_call( &self, call: &ParsedToolCall, + iteration: usize, ) -> (ToolExecutionResult, ToolCallRecord) { let started = std::time::Instant::now(); publish_global(DomainEvent::ToolExecutionStarted { tool_name: call.name.clone(), session_id: self.event_session_id().to_string(), }); + self.emit_progress(AgentProgress::ToolCallStarted { + tool_name: call.name.clone(), + arguments: call.arguments.clone(), + iteration: (iteration + 1) as u32, + }); log::info!("[agent] executing tool: {}", call.name); log::info!("[agent_loop] tool start name={}", call.name); @@ -553,6 +569,13 @@ impl Agent { success, elapsed_ms, }); + self.emit_progress(AgentProgress::ToolCallCompleted { + tool_name: call.name.clone(), + success, + output_chars: result.chars().count(), + elapsed_ms, + iteration: (iteration + 1) as u32, + }); log::info!( "[agent] tool completed: {} success={} elapsed_ms={}", call.name, @@ -596,11 +619,12 @@ impl Agent { pub(super) async fn execute_tools( &self, calls: &[ParsedToolCall], + iteration: usize, ) -> (Vec, Vec) { let mut results = Vec::with_capacity(calls.len()); let mut records = Vec::with_capacity(calls.len()); for call in calls { - let (exec_result, record) = self.execute_tool_call(call).await; + let (exec_result, record) = self.execute_tool_call(call, iteration).await; results.push(exec_result); records.push(record); } @@ -668,6 +692,13 @@ impl Agent { // History & prompt helpers // ───────────────────────────────────────────────────────────────── + /// Emit a progress event (fire-and-forget) if the sender is set. + fn emit_progress(&self, event: AgentProgress) { + if let Some(ref tx) = self.on_progress { + let _ = tx.try_send(event); + } + } + /// Truncates the conversation history to the configured maximum message count. /// /// System messages are always preserved. Older non-system messages are diff --git a/src/openhuman/agent/harness/session/types.rs b/src/openhuman/agent/harness/session/types.rs index c4581f766..c089402dd 100644 --- a/src/openhuman/agent/harness/session/types.rs +++ b/src/openhuman/agent/harness/session/types.rs @@ -9,6 +9,7 @@ use crate::openhuman::agent::dispatcher::ToolDispatcher; use crate::openhuman::agent::hooks::PostTurnHook; use crate::openhuman::agent::memory_loader::MemoryLoader; +use crate::openhuman::agent::progress::AgentProgress; use crate::openhuman::context::prompt::SystemPromptBuilder; use crate::openhuman::context::ContextManager; use crate::openhuman::memory::Memory; @@ -67,6 +68,11 @@ pub struct Agent { /// session-memory deltas persist across turns. See /// [`crate::openhuman::context`] for the full surface. pub(super) context: ContextManager, + /// Optional progress event sender for real-time turn progress. + /// When set, the turn loop emits [`AgentProgress`] events through + /// this channel so callers (e.g. web channel) can surface live + /// tool-call and iteration updates to the UI. + pub(super) on_progress: Option>, } /// A builder for creating `Agent` instances with custom configuration. diff --git a/src/openhuman/agent/mod.rs b/src/openhuman/agent/mod.rs index bd6c23d2f..f3616fe8f 100644 --- a/src/openhuman/agent/mod.rs +++ b/src/openhuman/agent/mod.rs @@ -7,6 +7,7 @@ pub mod hooks; pub mod host_runtime; pub mod memory_loader; pub mod multimodal; +pub mod progress; mod schemas; pub use schemas::{ all_controller_schemas as all_agent_controller_schemas, diff --git a/src/openhuman/agent/progress.rs b/src/openhuman/agent/progress.rs new file mode 100644 index 000000000..910e2e2ff --- /dev/null +++ b/src/openhuman/agent/progress.rs @@ -0,0 +1,73 @@ +//! Real-time progress events emitted during an agent turn. +//! +//! Consumers (e.g. the web channel provider) create an +//! `mpsc::Sender` and attach it to the [`Agent`] via +//! [`Agent::set_on_progress`] before calling [`Agent::run_single`]. +//! The agent's turn loop sends events through this channel as it +//! progresses — tool calls starting/completing, iteration boundaries, +//! sub-agent lifecycle, etc. +//! +//! This is intentionally separate from [`DomainEvent`] (the global +//! broadcast bus) because progress events are **per-request scoped**: +//! they carry no routing info (client_id, thread_id) — the consumer +//! that created the channel already knows those and tags the outgoing +//! socket events accordingly. + +/// A real-time progress event emitted during an agent turn. +#[derive(Debug, Clone)] +pub enum AgentProgress { + /// The turn has started (about to enter the iteration loop). + TurnStarted, + + /// A new LLM iteration is starting. + IterationStarted { + /// 1-based iteration index. + iteration: u32, + /// Maximum iterations configured for this turn. + max_iterations: u32, + }, + + /// The LLM responded and the agent is about to execute a tool. + ToolCallStarted { + tool_name: String, + arguments: serde_json::Value, + /// 1-based iteration index. + iteration: u32, + }, + + /// A tool execution completed (success or failure). + ToolCallCompleted { + tool_name: String, + success: bool, + output_chars: usize, + elapsed_ms: u64, + /// 1-based iteration index. + iteration: u32, + }, + + /// A sub-agent was spawned during tool execution. + SubagentSpawned { + agent_id: String, + task_id: String, + }, + + /// A sub-agent completed successfully. + SubagentCompleted { + agent_id: String, + task_id: String, + elapsed_ms: u64, + }, + + /// A sub-agent failed. + SubagentFailed { + agent_id: String, + task_id: String, + error: String, + }, + + /// The turn completed with a final text response. + TurnCompleted { + /// Total iterations used. + iterations: u32, + }, +} diff --git a/src/openhuman/channels/providers/web.rs b/src/openhuman/channels/providers/web.rs index 482687d93..180bf912e 100644 --- a/src/openhuman/channels/providers/web.rs +++ b/src/openhuman/channels/providers/web.rs @@ -11,7 +11,6 @@ use crate::core::{ControllerSchema, FieldSchema, TypeSchema}; use crate::openhuman::agent::Agent; use crate::openhuman::config::rpc as config_rpc; use crate::openhuman::config::Config; -use crate::openhuman::providers::ConversationMessage; use crate::rpc::RpcOutcome; use super::presentation; @@ -262,16 +261,22 @@ async fn run_chat_task( )?, }; - let history_before = agent.history().len(); + // Wire up a real-time progress channel so tool calls, iterations, + // and sub-agent events are emitted to the web channel as they happen + // (instead of retroactively after the loop finishes). + let (progress_tx, progress_rx) = tokio::sync::mpsc::channel(64); + agent.set_on_progress(Some(progress_tx)); + spawn_progress_bridge( + progress_rx, + client_id.to_string(), + thread_id.to_string(), + request_id.to_string(), + ); + let result = agent.run_single(message).await.map_err(|e| e.to_string()); - if result.is_ok() { - publish_tool_events_from_history( - client_id, - thread_id, - request_id, - &agent.history()[history_before..], - ); - } + + // Clear the sender so it doesn't hold the channel open across sessions. + agent.set_on_progress(None); { let mut sessions = THREAD_SESSIONS.lock().await; @@ -288,90 +293,202 @@ async fn run_chat_task( result } -fn publish_tool_events_from_history( - client_id: &str, - thread_id: &str, - request_id: &str, - messages: &[ConversationMessage], +/// Spawn a background task that reads [`AgentProgress`] events from the +/// agent turn loop and translates them into [`WebChannelEvent`]s tagged +/// with the correct client/thread/request IDs. The task runs until the +/// sender is dropped (i.e. when the agent turn finishes). +fn spawn_progress_bridge( + mut rx: tokio::sync::mpsc::Receiver, + client_id: String, + thread_id: String, + request_id: String, ) { - let mut round: u32 = 0; - let mut current_round_calls: Vec<(String, String)> = Vec::new(); - - for message in messages { - match message { - ConversationMessage::AssistantToolCalls { tool_calls, .. } => { - round += 1; - current_round_calls.clear(); - for (idx, call) in tool_calls.iter().enumerate() { - let synthetic_id = if call.id.trim().is_empty() { - format!("idx:{idx}") - } else { - call.id.clone() - }; - current_round_calls.push((synthetic_id, call.name.clone())); + use crate::openhuman::agent::progress::AgentProgress; + + tokio::spawn(async move { + let mut round: u32 = 0; + while let Some(event) = rx.recv().await { + match event { + AgentProgress::TurnStarted => { + publish_web_channel_event(WebChannelEvent { + event: "inference_start".to_string(), + client_id: client_id.clone(), + thread_id: thread_id.clone(), + request_id: request_id.clone(), + full_response: None, + message: None, + error_type: None, + tool_name: None, + skill_id: None, + args: None, + output: None, + success: None, + round: None, + reaction_emoji: None, + segment_index: None, + segment_total: None, + }); + } + AgentProgress::IterationStarted { + iteration, + max_iterations, + } => { + round = iteration; + publish_web_channel_event(WebChannelEvent { + event: "iteration_start".to_string(), + client_id: client_id.clone(), + thread_id: thread_id.clone(), + request_id: request_id.clone(), + full_response: None, + message: Some(format!( + "Iteration {iteration}/{max_iterations}" + )), + error_type: None, + tool_name: None, + skill_id: None, + args: None, + output: None, + success: None, + round: Some(iteration), + reaction_emoji: None, + segment_index: None, + segment_total: None, + }); + } + AgentProgress::ToolCallStarted { + tool_name, + arguments, + iteration, + } => { publish_web_channel_event(WebChannelEvent { event: "tool_call".to_string(), - client_id: client_id.to_string(), - thread_id: thread_id.to_string(), - request_id: request_id.to_string(), + client_id: client_id.clone(), + thread_id: thread_id.clone(), + request_id: request_id.clone(), full_response: None, message: None, error_type: None, - tool_name: Some(call.name.clone()), + tool_name: Some(tool_name), skill_id: Some("web_channel".to_string()), - args: Some(parse_tool_args(&call.arguments)), + args: Some(arguments), output: None, success: None, - round: Some(round), + round: Some(iteration), reaction_emoji: None, segment_index: None, segment_total: None, }); } - } - ConversationMessage::ToolResults(results) => { - for (idx, result) in results.iter().enumerate() { - let fallback = format!("idx:{idx}"); - let tool_name = current_round_calls - .iter() - .find(|(tool_call_id, _)| tool_call_id == &result.tool_call_id) - .or_else(|| current_round_calls.get(idx)) - .map(|(_, name)| name.clone()) - .unwrap_or_else(|| "unknown".to_string()); - - let success = !result.content.trim_start().starts_with("Error:"); + AgentProgress::ToolCallCompleted { + tool_name, + success, + output_chars, + elapsed_ms, + iteration, + } => { publish_web_channel_event(WebChannelEvent { event: "tool_result".to_string(), - client_id: client_id.to_string(), - thread_id: thread_id.to_string(), - request_id: request_id.to_string(), + client_id: client_id.clone(), + thread_id: thread_id.clone(), + request_id: request_id.clone(), full_response: None, message: None, error_type: None, tool_name: Some(tool_name), skill_id: Some("web_channel".to_string()), - args: Some(Value::Object(Map::from_iter([( - "tool_call_id".to_string(), - Value::String(if result.tool_call_id.is_empty() { - fallback - } else { - result.tool_call_id.clone() - }), - )]))), - output: Some(result.content.clone()), + args: None, + output: Some(format!( + "{{\"output_chars\":{output_chars},\"elapsed_ms\":{elapsed_ms}}}" + )), success: Some(success), - round: Some(round.max(1)), + round: Some(iteration), + reaction_emoji: None, + segment_index: None, + segment_total: None, + }); + } + AgentProgress::SubagentSpawned { agent_id, task_id } => { + publish_web_channel_event(WebChannelEvent { + event: "subagent_spawned".to_string(), + client_id: client_id.clone(), + thread_id: thread_id.clone(), + request_id: request_id.clone(), + full_response: None, + message: Some(format!("Sub-agent '{agent_id}' spawned")), + error_type: None, + tool_name: Some(agent_id), + skill_id: Some(task_id), + args: None, + output: None, + success: None, + round: Some(round), + reaction_emoji: None, + segment_index: None, + segment_total: None, + }); + } + AgentProgress::SubagentCompleted { + agent_id, + task_id, + elapsed_ms, + } => { + publish_web_channel_event(WebChannelEvent { + event: "subagent_completed".to_string(), + client_id: client_id.clone(), + thread_id: thread_id.clone(), + request_id: request_id.clone(), + full_response: None, + message: Some(format!( + "Sub-agent '{agent_id}' completed in {elapsed_ms}ms" + )), + error_type: None, + tool_name: Some(agent_id), + skill_id: Some(task_id), + args: None, + output: None, + success: Some(true), + round: Some(round), + reaction_emoji: None, + segment_index: None, + segment_total: None, + }); + } + AgentProgress::SubagentFailed { + agent_id, + task_id, + error, + } => { + publish_web_channel_event(WebChannelEvent { + event: "subagent_failed".to_string(), + client_id: client_id.clone(), + thread_id: thread_id.clone(), + request_id: request_id.clone(), + full_response: None, + message: Some(error), + error_type: None, + tool_name: Some(agent_id), + skill_id: Some(task_id), + args: None, + output: None, + success: Some(false), + round: Some(round), reaction_emoji: None, segment_index: None, segment_total: None, }); } + AgentProgress::TurnCompleted { iterations } => { + log::debug!( + "[web_channel] turn completed after {iterations} iteration(s) \ + client_id={client_id} thread_id={thread_id} request_id={request_id}" + ); + } } - ConversationMessage::Chat(_) => {} } - } + }); } + fn parse_tool_args(arguments: &str) -> Value { if arguments.trim().is_empty() { return Value::Object(Map::new()); From 4061bcada9bb9d5eca32dd206bf891d0296882ed Mon Sep 17 00:00:00 2001 From: Steven Enamakel Date: Sun, 12 Apr 2026 01:56:17 -0700 Subject: [PATCH 2/3] style: fix formatting from pre-push hook --- app/src/pages/Conversations.tsx | 15 +++++++-------- src/openhuman/agent/harness/session/runtime.rs | 5 ++++- src/openhuman/agent/progress.rs | 5 +---- src/openhuman/channels/providers/web.rs | 5 +---- 4 files changed, 13 insertions(+), 17 deletions(-) diff --git a/app/src/pages/Conversations.tsx b/app/src/pages/Conversations.tsx index 0a7aaa0b4..fa29cc838 100644 --- a/app/src/pages/Conversations.tsx +++ b/app/src/pages/Conversations.tsx @@ -13,9 +13,9 @@ import { type ChatInferenceStartEvent, type ChatIterationStartEvent, type ChatSegmentEvent, + chatSend, type ChatSubagentDoneEvent, type ChatSubagentSpawnedEvent, - chatSend, type ChatToolCallEvent, type ChatToolResultEvent, segmentText, @@ -454,11 +454,7 @@ const Conversations = () => { onInferenceStart: (event: ChatInferenceStartEvent) => { setInferenceStatusByThread(prev => ({ ...prev, - [event.thread_id]: { - phase: 'thinking', - iteration: 0, - maxIterations: 0, - }, + [event.thread_id]: { phase: 'thinking', iteration: 0, maxIterations: 0 }, })); }, onIterationStart: (event: ChatIterationStartEvent) => { @@ -569,7 +565,10 @@ const Conversations = () => { ...prev, [event.thread_id]: existing.map(entry => entry.name === `🤖 ${event.tool_name}` && entry.status === 'running' - ? { ...entry, status: (event.success ? 'success' : 'error') as ToolTimelineEntryStatus } + ? { + ...entry, + status: (event.success ? 'success' : 'error') as ToolTimelineEntryStatus, + } : entry ), }; @@ -1094,7 +1093,7 @@ const Conversations = () => { ? (toolTimelineByThread[selectedThreadId] ?? []) : []; const selectedInferenceStatus = selectedThreadId - ? inferenceStatusByThread[selectedThreadId] ?? null + ? (inferenceStatusByThread[selectedThreadId] ?? null) : null; const inlineCompletionSuffix = getInlineCompletionSuffix(inputValue, inlineSuggestionValue); diff --git a/src/openhuman/agent/harness/session/runtime.rs b/src/openhuman/agent/harness/session/runtime.rs index dcceddc5a..f5dda163d 100644 --- a/src/openhuman/agent/harness/session/runtime.rs +++ b/src/openhuman/agent/harness/session/runtime.rs @@ -115,7 +115,10 @@ impl Agent { /// When set, the turn loop emits [`AgentProgress`] events so /// callers (e.g. the web channel) can surface live tool-call and /// iteration updates to the UI. Pass `None` to disable. - pub fn set_on_progress(&mut self, tx: Option>) { + pub fn set_on_progress( + &mut self, + tx: Option>, + ) { self.on_progress = tx; } diff --git a/src/openhuman/agent/progress.rs b/src/openhuman/agent/progress.rs index 910e2e2ff..3eef692e8 100644 --- a/src/openhuman/agent/progress.rs +++ b/src/openhuman/agent/progress.rs @@ -46,10 +46,7 @@ pub enum AgentProgress { }, /// A sub-agent was spawned during tool execution. - SubagentSpawned { - agent_id: String, - task_id: String, - }, + SubagentSpawned { agent_id: String, task_id: String }, /// A sub-agent completed successfully. SubagentCompleted { diff --git a/src/openhuman/channels/providers/web.rs b/src/openhuman/channels/providers/web.rs index 180bf912e..c63d4006e 100644 --- a/src/openhuman/channels/providers/web.rs +++ b/src/openhuman/channels/providers/web.rs @@ -340,9 +340,7 @@ fn spawn_progress_bridge( thread_id: thread_id.clone(), request_id: request_id.clone(), full_response: None, - message: Some(format!( - "Iteration {iteration}/{max_iterations}" - )), + message: Some(format!("Iteration {iteration}/{max_iterations}")), error_type: None, tool_name: None, skill_id: None, @@ -488,7 +486,6 @@ fn spawn_progress_bridge( }); } - fn parse_tool_args(arguments: &str) -> Value { if arguments.trim().is_empty() { return Value::Object(Map::new()); From 241f07011a1ff3f3ccf72c41f529f26481b0223b Mon Sep 17 00:00:00 2001 From: Steven Enamakel Date: Sun, 12 Apr 2026 11:36:43 -0700 Subject: [PATCH 3/3] fix(test): read SSE events until chat_done instead of first event The e2e test expected `chat_done` as the first SSE event, but now real-time progress events (inference_start, iteration_start) are emitted before it. Use `read_sse_event_by_type` to skip progress events and wait for the terminal `chat_done` event. --- tests/json_rpc_e2e.rs | 52 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 51 insertions(+), 1 deletion(-) diff --git a/tests/json_rpc_e2e.rs b/tests/json_rpc_e2e.rs index 1f663a119..7bc807fe0 100644 --- a/tests/json_rpc_e2e.rs +++ b/tests/json_rpc_e2e.rs @@ -441,6 +441,7 @@ async fn post_json_rpc(rpc_base: &str, id: i64, method: &str, params: Value) -> .unwrap_or_else(|e| panic!("json for {method}: {e}")) } +#[allow(dead_code)] async fn read_first_sse_event(events_url: &str) -> Value { let client = reqwest::Client::builder() .timeout(Duration::from_secs(120)) @@ -484,6 +485,54 @@ async fn read_first_sse_event(events_url: &str) -> Value { panic!("SSE stream ended before any event payload"); } +/// Read SSE events until one matches the given `event` field value, skipping +/// progress events (inference_start, iteration_start, etc.) that precede the +/// terminal event. +async fn read_sse_event_by_type(events_url: &str, target_event: &str) -> Value { + let client = reqwest::Client::builder() + .timeout(Duration::from_secs(120)) + .build() + .expect("client"); + let resp = client + .get(events_url) + .send() + .await + .unwrap_or_else(|e| panic!("GET {events_url}: {e}")); + assert!( + resp.status().is_success(), + "SSE HTTP error {} for {}", + resp.status(), + events_url + ); + + let mut stream = resp.bytes_stream(); + let mut buffer = String::new(); + while let Some(item) = stream.next().await { + let chunk = item.unwrap_or_else(|e| panic!("sse stream read failed: {e}")); + let text = std::str::from_utf8(&chunk).unwrap_or(""); + buffer.push_str(text); + while let Some(idx) = buffer.find("\n\n") { + let block = buffer[..idx].to_string(); + buffer = buffer[idx + 2..].to_string(); + let mut data_lines = Vec::new(); + for line in block.lines() { + if let Some(data) = line.strip_prefix("data:") { + data_lines.push(data.trim_start()); + } + } + if !data_lines.is_empty() { + let payload = data_lines.join("\n"); + let value: Value = serde_json::from_str(&payload) + .unwrap_or_else(|e| panic!("invalid sse data json: {e}")); + if value.get("event").and_then(Value::as_str) == Some(target_event) { + return value; + } + } + } + } + panic!("SSE stream ended before receiving '{target_event}' event"); +} + fn assert_no_jsonrpc_error<'a>(v: &'a Value, context: &str) -> &'a Value { if let Some(err) = v.get("error") { panic!("{context}: JSON-RPC error: {err}"); @@ -653,7 +702,8 @@ async fn json_rpc_protocol_auth_and_agent_hello() { let client_id = "e2e-client-1"; let thread_id = "thread-1"; let events_url = format!("{}/events?client_id={}", rpc_base, client_id); - let sse_task = tokio::spawn(async move { read_first_sse_event(&events_url).await }); + let sse_task = + tokio::spawn(async move { read_sse_event_by_type(&events_url, "chat_done").await }); let web_chat = post_json_rpc( &rpc_base,