diff --git a/Cargo.lock b/Cargo.lock index 8b5bf8604..945796092 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8259,6 +8259,7 @@ dependencies = [ "dirs", "emojis", "fastembed", + "flate2", "futures", "hex", "ignore", diff --git a/Cargo.toml b/Cargo.toml index d2f24f0c3..fd73aa72b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,9 @@ fastembed = "4" base64 = "0.22" hex = "0.4" +# Compression +flate2 = "1" + # Logging and tracing tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/interface/src/api/client.ts b/interface/src/api/client.ts index e78808eea..3a031c0fd 100644 --- a/interface/src/api/client.ts +++ b/interface/src/api/client.ts @@ -53,6 +53,7 @@ export interface WorkerStartedEvent { channel_id: string | null; worker_id: string; task: string; + worker_type?: string; } export interface WorkerStatusEvent { @@ -69,6 +70,7 @@ export interface WorkerCompletedEvent { channel_id: string | null; worker_id: string; result: string; + success?: boolean; } export interface BranchStartedEvent { @@ -94,6 +96,7 @@ export interface ToolStartedEvent { process_type: ProcessType; process_id: string; tool_name: string; + args: string; } export interface ToolCompletedEvent { @@ -103,6 +106,7 @@ export interface ToolCompletedEvent { process_type: ProcessType; process_id: string; tool_name: string; + result: string; } export type ApiEvent = @@ -193,6 +197,49 @@ export interface StatusBlockSnapshot { /** channel_id -> StatusBlockSnapshot */ export type ChannelStatusResponse = Record; +// --- Workers API types --- + +export type ActionContent = + | { type: "text"; text: string } + | { type: "tool_call"; id: string; name: string; args: string }; + +export type TranscriptStep = + | { type: "action"; content: ActionContent[] } + | { type: "tool_result"; call_id: string; name: string; text: string }; + +export interface WorkerRunInfo { + id: string; + task: string; + status: string; + worker_type: string; + channel_id: string | null; + 
channel_name: string | null; + started_at: string; + completed_at: string | null; + has_transcript: boolean; + live_status: string | null; + tool_calls: number; +} + +export interface WorkerDetailResponse { + id: string; + task: string; + result: string | null; + status: string; + worker_type: string; + channel_id: string | null; + channel_name: string | null; + started_at: string; + completed_at: string | null; + transcript: TranscriptStep[] | null; + tool_calls: number; +} + +export interface WorkerListResponse { + workers: WorkerRunInfo[]; + total: number; +} + export interface AgentInfo { id: string; display_name?: string; @@ -1076,6 +1123,15 @@ export const api = { return fetchJson(`/channels/messages?${params}`); }, channelStatus: () => fetchJson("/channels/status"), + workersList: (agentId: string, params: { limit?: number; offset?: number; status?: string } = {}) => { + const search = new URLSearchParams({ agent_id: agentId }); + if (params.limit) search.set("limit", String(params.limit)); + if (params.offset) search.set("offset", String(params.offset)); + if (params.status) search.set("status", params.status); + return fetchJson(`/agents/workers?${search}`); + }, + workerDetail: (agentId: string, workerId: string) => + fetchJson(`/agents/workers/detail?agent_id=${encodeURIComponent(agentId)}&worker_id=${encodeURIComponent(workerId)}`), agentMemories: (agentId: string, params: MemoriesListParams = {}) => { const search = new URLSearchParams({ agent_id: agentId }); if (params.limit) search.set("limit", String(params.limit)); diff --git a/interface/src/components/WebChatPanel.tsx b/interface/src/components/WebChatPanel.tsx index 747e0de5a..1e322c9e9 100644 --- a/interface/src/components/WebChatPanel.tsx +++ b/interface/src/components/WebChatPanel.tsx @@ -1,4 +1,4 @@ -import {useEffect, useRef, useState} from "react"; +import {useEffect, useMemo, useRef, useState} from "react"; import { useWebChat, getPortalChatSessionId, @@ -174,14 +174,55 @@ export function 
WebChatPanel({agentId}: WebChatPanelProps) { useWebChat(agentId); const {liveStates} = useLiveContext(); const [input, setInput] = useState(""); + const [sseMessages, setSseMessages] = useState<{id: string; role: "assistant"; content: string}[]>([]); const messagesEndRef = useRef(null); const sessionId = getPortalChatSessionId(agentId); const activeWorkers = Object.values(liveStates[sessionId]?.workers ?? {}); const hasActiveWorkers = activeWorkers.length > 0; + // Pick up assistant messages from the global SSE stream that arrived + // after the webchat request SSE closed (e.g. worker completion retriggers). + const timeline = liveStates[sessionId]?.timeline; + const seenIdsRef = useRef(new Set()); + useEffect(() => { + if (!timeline) return; + // Seed seen IDs from webchat messages so we don't duplicate + for (const m of messages) seenIdsRef.current.add(m.id); + + const newMessages: {id: string; role: "assistant"; content: string}[] = []; + for (const item of timeline) { + if ( + item.type === "message" && + item.role === "assistant" && + !seenIdsRef.current.has(item.id) + ) { + seenIdsRef.current.add(item.id); + newMessages.push({ + id: item.id, + role: "assistant", + content: item.content, + }); + } + } + if (newMessages.length > 0) { + setSseMessages((prev) => [...prev, ...newMessages]); + } + }, [timeline, messages]); + + // Clear SSE messages when a new webchat send starts (they'll be in history on next load) + useEffect(() => { + if (isStreaming) setSseMessages([]); + }, [isStreaming]); + + const allMessages = useMemo(() => { + const messageIds = new Set(messages.map((message) => message.id)); + const dedupedSse = sseMessages.filter((message) => !messageIds.has(message.id)); + return [...messages, ...dedupedSse]; + }, [messages, sseMessages]); + useEffect(() => { messagesEndRef.current?.scrollIntoView({behavior: "smooth"}); - }, [messages.length, isStreaming, toolActivity.length, activeWorkers.length]); + }, [allMessages.length, isStreaming, 
toolActivity.length, activeWorkers.length]); const handleSubmit = () => { const trimmed = input.trim(); @@ -201,7 +242,7 @@ export function WebChatPanel({agentId}: WebChatPanelProps) { )} - {messages.length === 0 && !isStreaming && ( + {allMessages.length === 0 && !isStreaming && (

Start a conversation with {agentId} @@ -209,7 +250,7 @@ export function WebChatPanel({agentId}: WebChatPanelProps) {

)} - {messages.map((message) => ( + {allMessages.map((message) => (
{message.role === "user" ? (
@@ -225,18 +266,18 @@ export function WebChatPanel({agentId}: WebChatPanelProps) {
))} - {/* Streaming state */} - {isStreaming && - messages[messages.length - 1]?.role !== "assistant" && ( + {/* Streaming state */} + {isStreaming && + allMessages[allMessages.length - 1]?.role !== "assistant" && (
{toolActivity.length === 0 && }
)} - {/* Inline tool activity during streaming assistant message */} - {isStreaming && - messages[messages.length - 1]?.role === "assistant" && + {/* Inline tool activity during streaming assistant message */} + {isStreaming && + allMessages[allMessages.length - 1]?.role === "assistant" && toolActivity.length > 0 && ( )} diff --git a/interface/src/hooks/useLiveContext.tsx b/interface/src/hooks/useLiveContext.tsx index dd987dc90..e64c7a765 100644 --- a/interface/src/hooks/useLiveContext.tsx +++ b/interface/src/hooks/useLiveContext.tsx @@ -1,8 +1,8 @@ import { createContext, useContext, useCallback, useRef, useState, useMemo, type ReactNode } from "react"; import { useQuery, useQueryClient } from "@tanstack/react-query"; -import { api, type AgentMessageEvent, type ChannelInfo } from "@/api/client"; +import { api, type AgentMessageEvent, type ChannelInfo, type ToolStartedEvent, type ToolCompletedEvent, type WorkerStatusEvent, type TranscriptStep } from "@/api/client"; import { useEventSource, type ConnectionState } from "@/hooks/useEventSource"; -import { useChannelLiveState, type ChannelLiveState } from "@/hooks/useChannelLiveState"; +import { useChannelLiveState, type ChannelLiveState, type ActiveWorker } from "@/hooks/useChannelLiveState"; interface LiveContextValue { liveStates: Record; @@ -12,6 +12,12 @@ interface LiveContextValue { loadOlderMessages: (channelId: string) => void; /** Set of edge IDs ("from->to") with recent message activity */ activeLinks: Set; + /** Flat map of all active workers across all channels, keyed by worker_id. */ + activeWorkers: Record; + /** Monotonically increasing counter, bumped on every worker lifecycle SSE event. */ + workerEventVersion: number; + /** Live transcript steps for running workers, keyed by worker_id. Built from SSE tool events. 
*/ + liveTranscripts: Record; } const LiveContext = createContext({ @@ -21,6 +27,9 @@ const LiveContext = createContext({ hasData: false, loadOlderMessages: () => {}, activeLinks: new Set(), + activeWorkers: {}, + workerEventVersion: 0, + liveTranscripts: {}, }); export function useLiveContext() { @@ -42,6 +51,32 @@ export function LiveContextProvider({ children }: { children: ReactNode }) { const channels = channelsData?.channels ?? []; const { liveStates, handlers: channelHandlers, syncStatusSnapshot, loadOlderMessages } = useChannelLiveState(channels); + // Flat active workers map + event version counter for the workers tab. + // This is a separate piece of state from channel liveStates so the workers + // tab can react to SSE events without scanning all channels. + const [workerEventVersion, setWorkerEventVersion] = useState(0); + const bumpWorkerVersion = useCallback(() => setWorkerEventVersion((v) => v + 1), []); + + // Live transcript accumulator: builds TranscriptStep[] from SSE tool events + // for running workers. Cleared when worker completes. 
+ const [liveTranscripts, setLiveTranscripts] = useState>({}); + + // Derive flat active workers from channel live states + const pendingToolCallIdsRef = useRef>>({}); + + const activeWorkers = useMemo(() => { + const channelAgentIds = new Map(channels.map((channel) => [channel.id, channel.agent_id])); + const map: Record = {}; + for (const [channelId, state] of Object.entries(liveStates)) { + const channelAgentId = channelAgentIds.get(channelId); + if (!channelAgentId) continue; + for (const [workerId, worker] of Object.entries(state.workers)) { + map[workerId] = { ...worker, channelId, agentId: channelAgentId }; + } + } + return map; + }, [liveStates, channels]); + // Track recently active link edges const [activeLinks, setActiveLinks] = useState>(new Set()); const timersRef = useRef>>(new Map()); @@ -85,14 +120,110 @@ export function LiveContextProvider({ children }: { children: ReactNode }) { [markEdgeActive], ); + // Wrap channel worker handlers to also bump the worker event version + // and accumulate live transcript steps from SSE events. + const wrappedWorkerStarted = useCallback((data: unknown) => { + channelHandlers.worker_started(data); + const event = data as { worker_id: string }; + setLiveTranscripts((prev) => ({ ...prev, [event.worker_id]: [] })); + delete pendingToolCallIdsRef.current[event.worker_id]; + bumpWorkerVersion(); + }, [channelHandlers, bumpWorkerVersion]); + + const wrappedWorkerStatus = useCallback((data: unknown) => { + channelHandlers.worker_status(data); + const event = data as WorkerStatusEvent; + // Push status text as an action step in the live transcript + if (event.status && event.status !== "starting" && event.status !== "running") { + setLiveTranscripts((prev) => { + const steps = prev[event.worker_id] ?? 
[]; + const step: TranscriptStep = { + type: "action", + content: [{ type: "text", text: event.status }], + }; + return { ...prev, [event.worker_id]: [...steps, step] }; + }); + } + bumpWorkerVersion(); + }, [channelHandlers, bumpWorkerVersion]); + + const wrappedWorkerCompleted = useCallback((data: unknown) => { + channelHandlers.worker_completed(data); + const event = data as { worker_id: string }; + delete pendingToolCallIdsRef.current[event.worker_id]; + bumpWorkerVersion(); + }, [channelHandlers, bumpWorkerVersion]); + + const wrappedToolStarted = useCallback((data: unknown) => { + channelHandlers.tool_started(data); + const event = data as ToolStartedEvent; + if (event.process_type === "worker") { + const callId = crypto.randomUUID(); + const pendingByTool = pendingToolCallIdsRef.current[event.process_id] ?? {}; + const queue = pendingByTool[event.tool_name] ?? []; + pendingByTool[event.tool_name] = [...queue, callId]; + pendingToolCallIdsRef.current[event.process_id] = pendingByTool; + setLiveTranscripts((prev) => { + const steps = prev[event.process_id] ?? []; + const step: TranscriptStep = { + type: "action", + content: [{ + type: "tool_call", + id: callId, + name: event.tool_name, + args: event.args || "", + }], + }; + return { ...prev, [event.process_id]: [...steps, step] }; + }); + bumpWorkerVersion(); + } + }, [channelHandlers, bumpWorkerVersion]); + + const wrappedToolCompleted = useCallback((data: unknown) => { + channelHandlers.tool_completed(data); + const event = data as ToolCompletedEvent; + if (event.process_type === "worker") { + const pendingByTool = pendingToolCallIdsRef.current[event.process_id]; + const queue = pendingByTool?.[event.tool_name] ?? 
[]; + const [callId, ...rest] = queue; + if (pendingByTool) { + if (rest.length > 0) { + pendingByTool[event.tool_name] = rest; + } else { + delete pendingByTool[event.tool_name]; + } + if (Object.keys(pendingByTool).length === 0) { + delete pendingToolCallIdsRef.current[event.process_id]; + } + } + setLiveTranscripts((prev) => { + const steps = prev[event.process_id] ?? []; + const step: TranscriptStep = { + type: "tool_result", + call_id: callId ?? `${event.process_id}:${event.tool_name}:${steps.length}`, + name: event.tool_name, + text: event.result || "", + }; + return { ...prev, [event.process_id]: [...steps, step] }; + }); + bumpWorkerVersion(); + } + }, [channelHandlers, bumpWorkerVersion]); + // Merge channel handlers with agent message handlers const handlers = useMemo( () => ({ ...channelHandlers, + worker_started: wrappedWorkerStarted, + worker_status: wrappedWorkerStatus, + worker_completed: wrappedWorkerCompleted, + tool_started: wrappedToolStarted, + tool_completed: wrappedToolCompleted, agent_message_sent: handleAgentMessage, agent_message_received: handleAgentMessage, }), - [channelHandlers, handleAgentMessage], + [channelHandlers, wrappedWorkerStarted, wrappedWorkerStatus, wrappedWorkerCompleted, wrappedToolStarted, wrappedToolCompleted, handleAgentMessage], ); const onReconnect = useCallback(() => { @@ -111,7 +242,7 @@ export function LiveContextProvider({ children }: { children: ReactNode }) { const hasData = channels.length > 0 || channelsData !== undefined; return ( - + {children} ); diff --git a/interface/src/router.tsx b/interface/src/router.tsx index 1c6950f05..c48ab7593 100644 --- a/interface/src/router.tsx +++ b/interface/src/router.tsx @@ -20,6 +20,7 @@ import {AgentConfig} from "@/routes/AgentConfig"; import {AgentCron} from "@/routes/AgentCron"; import {AgentIngest} from "@/routes/AgentIngest"; import {AgentSkills} from "@/routes/AgentSkills"; +import {AgentWorkers} from "@/routes/AgentWorkers"; import {AgentChat} from 
"@/routes/AgentChat"; import {Settings} from "@/routes/Settings"; import {useLiveContext} from "@/hooks/useLiveContext"; @@ -187,15 +188,16 @@ const agentIngestRoute = createRoute({ const agentWorkersRoute = createRoute({ getParentRoute: () => rootRoute, path: "/agents/$agentId/workers", + validateSearch: (search: Record): {worker?: string} => ({ + worker: typeof search.worker === "string" ? search.worker : undefined, + }), component: function AgentWorkersPage() { const {agentId} = agentWorkersRoute.useParams(); return (
-
-

- Workers control interface coming soon -

+
+
); diff --git a/interface/src/routes/AgentWorkers.tsx b/interface/src/routes/AgentWorkers.tsx new file mode 100644 index 000000000..f52718f90 --- /dev/null +++ b/interface/src/routes/AgentWorkers.tsx @@ -0,0 +1,629 @@ +import {useState, useMemo, useEffect, useCallback, useRef} from "react"; +import {useQuery, useQueryClient} from "@tanstack/react-query"; +import {useNavigate, useSearch} from "@tanstack/react-router"; +import {motion} from "framer-motion"; +import {Markdown} from "@/components/Markdown"; +import { + api, + type WorkerRunInfo, + type WorkerDetailResponse, + type TranscriptStep, + type ActionContent, +} from "@/api/client"; +import {Badge} from "@/ui/Badge"; +import {formatTimeAgo, formatDuration} from "@/lib/format"; +import {LiveDuration} from "@/components/LiveDuration"; +import {useLiveContext} from "@/hooks/useLiveContext"; +import {cx} from "@/ui/utils"; + +const STATUS_FILTERS = ["all", "running", "done", "failed"] as const; +type StatusFilter = (typeof STATUS_FILTERS)[number]; + +const KNOWN_STATUSES = new Set(["running", "done", "failed"]); + +function normalizeStatus(status: string): string { + if (KNOWN_STATUSES.has(status)) return status; + // Legacy rows where set_status text overwrote the state enum. + // If it has a completed_at it finished, otherwise it was interrupted. + return "failed"; +} + +function statusBadgeVariant(status: string) { + switch (status) { + case "running": + return "amber" as const; + case "failed": + return "red" as const; + default: + return "outline" as const; + } +} + +function workerTypeBadgeVariant(workerType: string) { + return workerType === "opencode" ? 
("accent" as const) : ("outline" as const); +} + +function durationBetween(start: string, end: string | null): string { + if (!end) return ""; + const seconds = Math.floor( + (new Date(end).getTime() - new Date(start).getTime()) / 1000, + ); + return formatDuration(seconds); +} + +export function AgentWorkers({agentId}: {agentId: string}) { + const [statusFilter, setStatusFilter] = useState("all"); + const [search, setSearch] = useState(""); + const queryClient = useQueryClient(); + const navigate = useNavigate(); + const routeSearch = useSearch({strict: false}) as {worker?: string}; + const selectedWorkerId = routeSearch.worker ?? null; + const {activeWorkers, workerEventVersion, liveTranscripts} = useLiveContext(); + + // Invalidate worker queries when SSE events fire + const prevVersion = useRef(workerEventVersion); + useEffect(() => { + if (workerEventVersion !== prevVersion.current) { + prevVersion.current = workerEventVersion; + queryClient.invalidateQueries({queryKey: ["workers", agentId]}); + if (selectedWorkerId) { + queryClient.invalidateQueries({ + queryKey: ["worker-detail", agentId, selectedWorkerId], + }); + } + } + }, [workerEventVersion, agentId, selectedWorkerId, queryClient]); + + // List query + const {data: listData} = useQuery({ + queryKey: ["workers", agentId, statusFilter], + queryFn: () => + api.workersList(agentId, { + limit: 200, + status: statusFilter === "all" ? undefined : statusFilter, + }), + refetchInterval: 10_000, + }); + + // Detail query (only when a worker is selected). + // Returns null instead of throwing on 404 — the worker may not be in the DB + // yet while it's still visible via SSE state. + const {data: detailData} = useQuery({ + queryKey: ["worker-detail", agentId, selectedWorkerId], + queryFn: () => + selectedWorkerId + ? api.workerDetail(agentId, selectedWorkerId).catch(() => null) + : Promise.resolve(null), + enabled: !!selectedWorkerId, + }); + + const workers = listData?.workers ?? 
[]; + const total = listData?.total ?? 0; + const scopedActiveWorkers = useMemo(() => { + const entries = Object.entries(activeWorkers).filter( + ([, worker]) => worker.agentId === agentId, + ); + return Object.fromEntries(entries); + }, [activeWorkers, agentId]); + + // Merge live SSE state onto the API-returned list. + // Workers that exist in SSE state but haven't hit the DB yet + // are synthesized and prepended so they appear instantly. + const mergedWorkers: WorkerRunInfo[] = useMemo(() => { + const dbIds = new Set(workers.map((w) => w.id)); + + // Overlay live state onto existing DB rows + const merged = workers.map((worker) => { + const live = scopedActiveWorkers[worker.id]; + if (!live) return worker; + return { + ...worker, + status: "running", + live_status: live.status, + tool_calls: live.toolCalls, + }; + }); + + // Synthesize entries for workers only known via SSE (not in DB yet) + const synthetic: WorkerRunInfo[] = Object.values(scopedActiveWorkers) + .filter((w) => !dbIds.has(w.id)) + .map((live) => ({ + id: live.id, + task: live.task, + status: "running", + worker_type: "builtin", + channel_id: live.channelId ?? null, + channel_name: null, + started_at: new Date(live.startedAt).toISOString(), + completed_at: null, + has_transcript: false, + live_status: live.status, + tool_calls: live.toolCalls, + })); + + return [...synthetic, ...merged]; + }, [workers, scopedActiveWorkers]); + + // Client-side task text search filter + const filteredWorkers = useMemo(() => { + if (!search.trim()) return mergedWorkers; + const term = search.toLowerCase(); + return mergedWorkers.filter((w) => w.task.toLowerCase().includes(term)); + }, [mergedWorkers, search]); + + // Build detail view: prefer DB data, fall back to synthesized live state. + // Running workers that haven't hit the DB yet still get a full detail view + // from SSE state + live transcript. + const mergedDetail: WorkerDetailResponse | null = useMemo(() => { + const live = selectedWorkerId ? 
scopedActiveWorkers[selectedWorkerId] : null; + + if (detailData) { + // DB data exists — overlay live status if worker is still running + if (!live) return detailData; + return { ...detailData, status: "running" }; + } + + // No DB data yet — synthesize from SSE state + if (!live) return null; + return { + id: live.id, + task: live.task, + result: null, + status: "running", + worker_type: "builtin", + channel_id: live.channelId ?? null, + channel_name: null, + started_at: new Date(live.startedAt).toISOString(), + completed_at: null, + transcript: null, + tool_calls: live.toolCalls, + }; + }, [detailData, scopedActiveWorkers, selectedWorkerId]); + + const selectWorker = useCallback( + (workerId: string | null) => { + navigate({ + to: `/agents/${agentId}/workers`, + search: workerId ? {worker: workerId} : {}, + replace: true, + } as any); + }, + [navigate, agentId], + ); + + return ( +
+ {/* Left column: worker list */} +
+ {/* Toolbar */} +
+ setSearch(e.target.value)} + className="h-7 flex-1 rounded-md border border-app-line/50 bg-app-input px-2.5 text-xs text-ink placeholder:text-ink-faint focus:border-accent/50 focus:outline-none" + /> + {total} +
+ + {/* Status filter pills */} +
+ {STATUS_FILTERS.map((filter) => ( + + ))} +
+ + {/* Worker list */} +
+ {filteredWorkers.length === 0 ? ( +
+

No workers found

+
+ ) : ( + filteredWorkers.map((worker) => ( + selectWorker(worker.id)} + /> + )) + )} +
+
+ + {/* Right column: detail view */} +
+ {selectedWorkerId && mergedDetail ? ( + + ) : ( +
+

+ Select a worker to view details +

+
+ )} +
+
+ ); +} + +interface LiveWorker { + id: string; + task: string; + status: string; + startedAt: number; + toolCalls: number; + currentTool: string | null; +} + +function WorkerCard({ + worker, + liveWorker, + selected, + onClick, +}: { + worker: WorkerRunInfo; + liveWorker?: LiveWorker; + selected: boolean; + onClick: () => void; +}) { + const isRunning = worker.status === "running" || !!liveWorker; + const toolCalls = liveWorker?.toolCalls ?? worker.tool_calls; + + return ( + + ); +} + +function WorkerDetail({ + detail, + liveWorker, + liveTranscript, +}: { + detail: WorkerDetailResponse; + liveWorker?: LiveWorker; + liveTranscript?: TranscriptStep[]; +}) { + const isRunning = detail.status === "running" || !!liveWorker; + const duration = durationBetween(detail.started_at, detail.completed_at); + const displayStatus = liveWorker?.status; + const currentTool = liveWorker?.currentTool; + const toolCalls = liveWorker?.toolCalls ?? detail.tool_calls ?? 0; + // Use persisted transcript if available, otherwise fall back to live SSE transcript. + // Strip the final action step if it duplicates the result text shown above. + const rawTranscript = detail.transcript ?? (isRunning ? liveTranscript : null); + const transcript = useMemo(() => { + if (!rawTranscript || !detail.result) return rawTranscript; + const last = rawTranscript[rawTranscript.length - 1]; + if ( + last?.type === "action" && + last.content.length === 1 && + last.content[0].type === "text" && + last.content[0].text.trim() === detail.result.trim() + ) { + return rawTranscript.slice(0, -1); + } + return rawTranscript; + }, [rawTranscript, detail.result]); + const transcriptRef = useRef(null); + + // Auto-scroll to latest transcript step for running workers + useEffect(() => { + if (isRunning && transcriptRef.current) { + transcriptRef.current.scrollTop = transcriptRef.current.scrollHeight; + } + }, [isRunning, transcript?.length]); + + return ( +
+ {/* Header */} +
+
+ +
+ {isRunning && detail.channel_id && ( + + )} + + {detail.worker_type} + + + {isRunning && ( + + )} + {isRunning ? "running" : normalizeStatus(detail.status)} + +
+
+
+ {detail.channel_name && {detail.channel_name}} + {isRunning ? ( + + Running for{" "} + + + ) : ( + duration && {duration} + )} + {!isRunning && {formatTimeAgo(detail.started_at)}} + {toolCalls > 0 && ( + {toolCalls} tool calls + )} +
+ {/* Live status bar for running workers */} + {isRunning && (currentTool || displayStatus) && ( +
+ {currentTool ? ( + + Running {currentTool}... + + ) : displayStatus ? ( + {displayStatus} + ) : null} +
+ )} +
+ + {/* Content */} +
+ {/* Result section */} + {detail.result && ( +
+

+ Result +

+
+ {detail.result} +
+
+ )} + + {/* Transcript section */} + {transcript && transcript.length > 0 ? ( +
+

+ {isRunning ? "Live Transcript" : "Transcript"} +

+
+ {transcript.map((step, index) => ( + + + + ))} + {isRunning && currentTool && ( +
+ + Running {currentTool}... +
+ )} +
+
+ ) : isRunning ? ( +
+
+

Waiting for first tool call...

+
+ ) : ( +
+ Full transcript not available for this worker +
+ )} +
+
+ ); +} + +function TaskText({text}: {text: string}) { + const [expanded, setExpanded] = useState(false); + + return ( + + ); +} + +function TranscriptStepView({step}: {step: TranscriptStep}) { + if (step.type === "action") { + return ( +
+ {step.content.map((content, index) => ( + + ))} +
+ ); + } + + return ; +} + +function CancelWorkerButton({ + channelId, + workerId, +}: { + channelId: string; + workerId: string; +}) { + const [cancelling, setCancelling] = useState(false); + + return ( + + ); +} + +function ActionContentView({content}: {content: ActionContent}) { + if (content.type === "text") { + return ( +
+ {content.text} +
+ ); + } + + return ; +} + +function ToolCallView({ + content, +}: { + content: Extract; +}) { + const [expanded, setExpanded] = useState(false); + + return ( +
+ + {expanded && ( +
+					{content.args}
+				
+ )} +
+ ); +} + +function ToolResultView({ + step, +}: { + step: Extract; +}) { + const [expanded, setExpanded] = useState(false); + const isLong = step.text.length > 300; + const displayText = + isLong && !expanded ? step.text.slice(0, 300) + "..." : step.text; + + return ( +
+
+ + {step.name && ( + + {step.name} + + )} +
+
+				{displayText}
+			
+ {isLong && ( + + )} +
+ ); +} diff --git a/interface/src/routes/ChannelDetail.tsx b/interface/src/routes/ChannelDetail.tsx index e5ba17ed3..b953227b5 100644 --- a/interface/src/routes/ChannelDetail.tsx +++ b/interface/src/routes/ChannelDetail.tsx @@ -70,18 +70,25 @@ function LiveBranchRunItem({ item, live, channelId }: { item: TimelineBranchRun; ); } -function LiveWorkerRunItem({ item, live, channelId }: { item: TimelineWorkerRun; live: ActiveWorker; channelId: string }) { +function LiveWorkerRunItem({ item, live, channelId, agentId }: { item: TimelineWorkerRun; live: ActiveWorker; channelId: string; agentId: string }) { return (
{formatTimestamp(new Date(item.started_at).getTime())}
-
+
Worker - {item.task} + + {item.task} + { api.cancelProcess(channelId, "worker", item.id).catch(console.warn); }} />
@@ -137,48 +144,37 @@ function BranchRunItem({ item }: { item: TimelineBranchRun }) { ); } -function WorkerRunItem({ item }: { item: TimelineWorkerRun }) { - const [expanded, setExpanded] = useState(false); - +function WorkerRunItem({ item, agentId }: { item: TimelineWorkerRun; agentId: string }) { return (
{formatTimestamp(new Date(item.started_at).getTime())}
- - {expanded && item.result && ( -
-
- {item.result} -
-
- )} +
); } -function TimelineEntry({ item, liveWorkers, liveBranches, channelId }: { +function TimelineEntry({ item, liveWorkers, liveBranches, channelId, agentId }: { item: TimelineItem; liveWorkers: Record; liveBranches: Record; channelId: string; + agentId: string; }) { switch (item.type) { case "message": @@ -210,8 +206,8 @@ function TimelineEntry({ item, liveWorkers, liveBranches, channelId }: { } case "worker_run": { const live = liveWorkers[item.id]; - if (live) return ; - return ; + if (live) return ; + return ; } } } @@ -339,6 +335,7 @@ export function ChannelDetail({ agentId, channelId, channel, liveState, onLoadMo liveWorkers={workers} liveBranches={branches} channelId={channelId} + agentId={agentId} /> )) )} diff --git a/migrations/20260223000001_worker_transcript.sql b/migrations/20260223000001_worker_transcript.sql new file mode 100644 index 000000000..774b733d6 --- /dev/null +++ b/migrations/20260223000001_worker_transcript.sql @@ -0,0 +1,6 @@ +-- Add transcript storage and worker metadata to worker_runs. +ALTER TABLE worker_runs ADD COLUMN worker_type TEXT NOT NULL DEFAULT 'builtin'; +ALTER TABLE worker_runs ADD COLUMN agent_id TEXT; +ALTER TABLE worker_runs ADD COLUMN transcript BLOB; + +CREATE INDEX idx_worker_runs_agent ON worker_runs(agent_id, started_at); diff --git a/migrations/20260224000001_worker_tool_calls.sql b/migrations/20260224000001_worker_tool_calls.sql new file mode 100644 index 000000000..311ab28aa --- /dev/null +++ b/migrations/20260224000001_worker_tool_calls.sql @@ -0,0 +1 @@ +ALTER TABLE worker_runs ADD COLUMN tool_calls INTEGER NOT NULL DEFAULT 0; diff --git a/prompts/en/channel.md.j2 b/prompts/en/channel.md.j2 index 1528a8859..e4aa0a0c3 100644 --- a/prompts/en/channel.md.j2 +++ b/prompts/en/channel.md.j2 @@ -44,7 +44,9 @@ You are able to write code or do work extremely fast inside a worker, never say You have three paths for getting things done. Choosing the right one matters. -**Branch** — for thinking and memory. 
Branch when you need to recall, save, or forget something from long-term memory, reason through a complex decision, figure out what instructions to give a worker, or retrieve transcript context from another channel. Branches have your full conversation context and access to the memory system (recall, save, and delete) plus cross-channel transcript recall (`channel_recall`). They return a conclusion. You never see the working. Branch often — it's cheap and keeps you responsive. +**Branch** — for thinking and memory. Branch when you need to recall, save, or forget something from long-term memory, reason through a complex decision, figure out what instructions to give a worker, or retrieve transcript context from another channel. Branches have your full conversation context and access to the memory system (recall, save, and delete), cross-channel transcript recall (`channel_recall`), and worker transcript inspection (`worker_inspect`). They return a conclusion. You never see the working. Branch often — it's cheap and keeps you responsive. + +Use `worker_inspect` in a branch when you need to verify what a worker actually did — what tools it called, what results it got, what sources it checked. Useful when a worker returns a thin or unexpected result, or when the user asks "what did you actually do?" **Worker** — for doing. Workers have task tools (see Worker Capabilities section below). They do NOT have your conversation context or access to memories — they only know what you tell them in the task description, so be specific. Two flavors: @@ -59,6 +61,8 @@ The key distinction: branches think, workers do, you talk. Never use a worker fo When an interactive worker is active and the user's message is directed at that work, route the message to the worker instead of spawning a new one. +**Cancel** — for stopping work. Use `cancel` when a worker is stuck, taking too long, working on the wrong thing, or the user asks you to stop it. 
You can cancel workers and branches by their ID (visible in the status block and in spawn confirmations). Don't let a runaway worker burn tokens — if something looks wrong, cancel it and start fresh. + ## When To Stay Silent You have a `skip` tool. Use it. Not every message needs a response from you. diff --git a/prompts/en/tools/worker_inspect_description.md.j2 b/prompts/en/tools/worker_inspect_description.md.j2 new file mode 100644 index 000000000..8ab06504e --- /dev/null +++ b/prompts/en/tools/worker_inspect_description.md.j2 @@ -0,0 +1 @@ +Inspect a worker's full execution transcript to see exactly what it did — every tool call, result, and reasoning step. Use without a worker_id to list recent workers. Use with a worker_id to retrieve the full transcript. Useful for verifying worker output, debugging incomplete results, or understanding what sources/steps a worker used. \ No newline at end of file diff --git a/src/agent/channel.rs b/src/agent/channel.rs index 7bb716a59..c30430585 100644 --- a/src/agent/channel.rs +++ b/src/agent/channel.rs @@ -74,9 +74,13 @@ impl ChannelState { if let Some(handle) = handle { handle.abort(); + // Mark the DB row as cancelled since the abort prevents WorkerComplete from firing + self.process_run_logger + .log_worker_completed(worker_id, "Worker cancelled", false); Ok(()) } else if removed { - // Worker was in active_workers but had no handle (shouldn't happen, but handle gracefully) + self.process_run_logger + .log_worker_completed(worker_id, "Worker cancelled", false); Ok(()) } else { Err(format!("Worker {worker_id} not found")) @@ -1442,15 +1446,44 @@ impl Channel { } else if replied { tracing::debug!(channel_id = %self.id, "channel turn replied via tool (fallback suppressed)"); } else if is_retrigger { - // On retrigger turns, suppress fallback text. The LLM should - // use the reply tool explicitly if it has something to say, or - // the skip tool if not. 
Raw text output from retriggers is - // almost always internal acknowledgment, not a real response. - tracing::debug!( - channel_id = %self.id, - response_len = response.len(), - "retrigger turn fallback suppressed (LLM did not use reply/skip tool)" - ); + // On retrigger turns the LLM should use the reply tool, but + // some models return the result as raw text instead. Send it + // as a fallback so the user still gets the worker/branch output. + let text = response.trim(); + if !text.is_empty() { + tracing::info!( + channel_id = %self.id, + response_len = text.len(), + "retrigger produced text without reply tool, sending as fallback" + ); + let extracted = extract_reply_from_tool_syntax(text); + let source = self + .conversation_id + .as_deref() + .and_then(|conversation_id| conversation_id.split(':').next()) + .unwrap_or("unknown"); + let final_text = crate::tools::reply::normalize_discord_mention_tokens( + extracted.as_deref().unwrap_or(text), + source, + ); + if !final_text.is_empty() { + self.state + .conversation_logger + .log_bot_message(&self.state.channel_id, &final_text); + if let Err(error) = self + .response_tx + .send(OutboundResponse::Text(final_text)) + .await + { + tracing::error!(%error, channel_id = %self.id, "failed to send retrigger fallback reply"); + } + } + } else { + tracing::debug!( + channel_id = %self.id, + "retrigger turn produced no text and no reply tool call" + ); + } } else { // If the LLM returned text without using the reply tool, send it // directly. Some models respond with text instead of tool calls. @@ -1584,9 +1617,16 @@ impl Channel { worker_id, channel_id, task, + worker_type, .. } => { - run_logger.log_worker_started(channel_id.as_ref(), *worker_id, task); + run_logger.log_worker_started( + channel_id.as_ref(), + *worker_id, + task, + worker_type, + &self.deps.agent_id, + ); } ProcessEvent::WorkerStatus { worker_id, status, .. @@ -1597,9 +1637,10 @@ impl Channel { worker_id, result, notify, + success, .. 
} => { - run_logger.log_worker_completed(*worker_id, result); + run_logger.log_worker_completed(*worker_id, result, *success); let mut workers = self.state.active_workers.write().await; workers.remove(worker_id); @@ -1610,7 +1651,7 @@ impl Channel { if *notify { let mut history = self.state.history.write().await; - let worker_message = format!("[Worker completed]: {result}"); + let worker_message = format!("[Worker {worker_id} completed]: {result}"); history.push(rig::message::Message::from(worker_message)); should_retrigger = true; } @@ -1865,6 +1906,8 @@ async fn spawn_branch( state.deps.memory_search.clone(), state.conversation_logger.clone(), state.channel_store.clone(), + crate::conversation::ProcessRunLogger::new(state.deps.sqlite_pool.clone()), + &state.deps.agent_id, ); let branch_max_turns = **state.deps.runtime_config.branch_max_turns.load(); @@ -2041,6 +2084,7 @@ pub async fn spawn_worker_from_state( worker_id, channel_id: Some(state.channel_id.clone()), task: task.clone(), + worker_type: "builtin".into(), }) .ok(); @@ -2140,6 +2184,7 @@ pub async fn spawn_opencode_worker_from_state( worker_id, channel_id: Some(state.channel_id.clone()), task: opencode_task, + worker_type: "opencode".into(), }) .ok(); @@ -2174,11 +2219,11 @@ where .with_label_values(&[&*agent_id]) .inc(); - let (result_text, notify) = match future.await { - Ok(text) => (text, true), + let (result_text, notify, success) = match future.await { + Ok(text) => (text, true, true), Err(error) => { tracing::error!(worker_id = %worker_id, %error, "worker failed"); - (format!("Worker failed: {error}"), true) + (format!("Worker failed: {error}"), true, false) } }; #[cfg(feature = "metrics")] @@ -2200,6 +2245,7 @@ where channel_id, result: result_text, notify, + success, }); }) } diff --git a/src/agent/ingestion.rs b/src/agent/ingestion.rs index 506baa45c..cdfa77844 100644 --- a/src/agent/ingestion.rs +++ b/src/agent/ingestion.rs @@ -475,6 +475,8 @@ async fn process_chunk( 
deps.memory_search.clone(), conversation_logger, channel_store, + crate::conversation::ProcessRunLogger::new(deps.sqlite_pool.clone()), + &deps.agent_id, ); let agent = AgentBuilder::new(model) diff --git a/src/agent/worker.rs b/src/agent/worker.rs index 6f941d7f9..097753b3e 100644 --- a/src/agent/worker.rs +++ b/src/agent/worker.rs @@ -221,6 +221,7 @@ impl Worker { // Fresh history for the worker (no channel context) let mut history = Vec::new(); + let mut compacted_history = Vec::new(); // Run the initial task in segments with compaction checkpoints let mut prompt = self.task.clone(); @@ -271,7 +272,8 @@ impl Worker { }); } - self.maybe_compact_history(&mut history).await; + self.maybe_compact_history(&mut compacted_history, &mut history) + .await; prompt = "Continue where you left off. Do not repeat completed work.".into(); self.hook .send_status(format!("working (segment {segments_run})")); @@ -287,6 +289,7 @@ impl Worker { self.state = WorkerState::Failed; self.hook.send_status("cancelled"); self.write_failure_log(&history, &format!("cancelled: {reason}")); + self.persist_transcript(&compacted_history, &history); tracing::info!(worker_id = %self.id, %reason, "worker cancelled"); return Ok(format!("Worker cancelled: {reason}")); } @@ -296,6 +299,7 @@ impl Worker { self.state = WorkerState::Failed; self.hook.send_status("failed"); self.write_failure_log(&history, &format!("context overflow after {MAX_OVERFLOW_RETRIES} compaction attempts: {error}")); + self.persist_transcript(&compacted_history, &history); tracing::error!(worker_id = %self.id, %error, "worker context overflow unrecoverable"); return Err(crate::error::AgentError::Other(error.into()).into()); } @@ -307,7 +311,8 @@ impl Worker { "context overflow, compacting and retrying" ); self.hook.send_status("compacting (overflow recovery)"); - self.force_compact_history(&mut history).await; + self.force_compact_history(&mut compacted_history, &mut history) + .await; prompt = "Continue where you left off. 
Do not repeat completed work. \ Your previous attempt exceeded the context limit, so older history \ has been compacted." @@ -317,6 +322,7 @@ impl Worker { self.state = WorkerState::Failed; self.hook.send_status("failed"); self.write_failure_log(&history, &error.to_string()); + self.persist_transcript(&compacted_history, &history); tracing::error!(worker_id = %self.id, %error, "worker LLM call failed"); return Err(crate::error::AgentError::Other(error.into()).into()); } @@ -333,7 +339,8 @@ impl Worker { self.hook.send_status("processing follow-up"); // Compact before follow-up if needed - self.maybe_compact_history(&mut history).await; + self.maybe_compact_history(&mut compacted_history, &mut history) + .await; let mut follow_up_prompt = follow_up.clone(); let mut follow_up_overflow_retries = 0; @@ -360,7 +367,8 @@ impl Worker { "follow-up context overflow, compacting and retrying" ); self.hook.send_status("compacting (overflow recovery)"); - self.force_compact_history(&mut history).await; + self.force_compact_history(&mut compacted_history, &mut history) + .await; let prompt_engine = self.deps.runtime_config.prompts.load(); let overflow_msg = prompt_engine.render_system_worker_overflow()?; follow_up_prompt = format!("{follow_up}\n\n{overflow_msg}"); @@ -393,6 +401,9 @@ impl Worker { self.write_success_log(&history); } + // Persist transcript blob (fire-and-forget) + self.persist_transcript(&compacted_history, &history); + tracing::info!(worker_id = %self.id, "worker completed"); Ok(result) } @@ -402,7 +413,11 @@ impl Worker { /// Workers don't have a full Compactor instance — they do inline compaction /// by summarizing older tool calls and results into a condensed recap. /// No LLM call, just programmatic truncation with a summary marker. 
- async fn maybe_compact_history(&self, history: &mut Vec) { + async fn maybe_compact_history( + &self, + compacted_history: &mut Vec, + history: &mut Vec, + ) { let context_window = **self.deps.runtime_config.context_window.load(); let estimated = estimate_history_tokens(history); let usage = estimated as f32 / context_window as f32; @@ -411,7 +426,7 @@ impl Worker { return; } - self.compact_history(history, 0.50, "worker history compacted") + self.compact_history(compacted_history, history, 0.50, "worker history compacted") .await; } @@ -420,8 +435,13 @@ impl Worker { /// Unlike `maybe_compact_history`, this always fires regardless of current /// usage and removes 75% of messages. Used when the provider has already /// rejected the request for exceeding context limits. - async fn force_compact_history(&self, history: &mut Vec) { + async fn force_compact_history( + &self, + compacted_history: &mut Vec, + history: &mut Vec, + ) { self.compact_history( + compacted_history, history, 0.75, "worker history force-compacted (overflow recovery)", @@ -432,6 +452,7 @@ impl Worker { /// Compact worker history by removing a fraction of the oldest messages. async fn compact_history( &self, + compacted_history: &mut Vec, history: &mut Vec, fraction: f32, log_message: &str, @@ -449,6 +470,7 @@ impl Worker { .max(1) .min(total.saturating_sub(2)); let removed: Vec = history.drain(..remove_count).collect(); + compacted_history.extend(removed.iter().cloned()); let recap = build_worker_recap(&removed); let prompt_engine = self.deps.runtime_config.prompts.load(); @@ -470,6 +492,47 @@ impl Worker { ); } + /// Persist the compressed transcript blob to worker_runs. Fire-and-forget. 
+ fn persist_transcript( + &self, + compacted_history: &[rig::message::Message], + history: &[rig::message::Message], + ) { + let mut full_history = compacted_history.to_vec(); + full_history.extend(history.iter().cloned()); + let transcript_blob = + crate::conversation::worker_transcript::serialize_transcript(&full_history); + let pool = self.deps.sqlite_pool.clone(); + let worker_id = self.id.to_string(); + + // Count tool calls from the Rig history (each ToolCall in an Assistant message) + let tool_calls: i64 = full_history + .iter() + .filter_map(|message| match message { + rig::message::Message::Assistant { content, .. } => Some( + content + .iter() + .filter(|c| matches!(c, rig::message::AssistantContent::ToolCall(_))) + .count() as i64, + ), + _ => None, + }) + .sum(); + + tokio::spawn(async move { + if let Err(error) = + sqlx::query("UPDATE worker_runs SET transcript = ?, tool_calls = ? WHERE id = ?") + .bind(&transcript_blob) + .bind(tool_calls) + .bind(&worker_id) + .execute(&pool) + .await + { + tracing::warn!(%error, worker_id, "failed to persist worker transcript"); + } + }); + } + /// Check if worker is in a terminal state. pub fn is_done(&self) -> bool { matches!(self.state, WorkerState::Done | WorkerState::Failed) @@ -672,7 +735,8 @@ fn build_worker_recap(messages: &[rig::message::Message]) -> String { rig::message::Message::Assistant { content, .. 
} => { for item in content.iter() { if let rig::message::AssistantContent::ToolCall(tc) = item { - let args = tc.function.arguments.to_string(); + let args = + crate::tools::truncate_output(&tc.function.arguments.to_string(), 200); recap.push_str(&format!("- Called `{}` ({args})\n", tc.function.name)); } if let rig::message::AssistantContent::Text(t) = item @@ -687,7 +751,8 @@ fn build_worker_recap(messages: &[rig::message::Message]) -> String { if let rig::message::UserContent::ToolResult(tr) = item { for c in tr.content.iter() { if let rig::message::ToolResultContent::Text(t) = c { - recap.push_str(&format!(" Result: {}\n", t.text)); + let truncated = crate::tools::truncate_output(&t.text, 200); + recap.push_str(&format!(" Result: {truncated}\n")); } } } diff --git a/src/api.rs b/src/api.rs index e8a13c91a..3c4d38170 100644 --- a/src/api.rs +++ b/src/api.rs @@ -23,6 +23,7 @@ mod skills; mod state; mod system; mod webchat; +mod workers; pub use server::start_http_server; pub use state::{AgentInfo, ApiEvent, ApiState}; diff --git a/src/api/agents.rs b/src/api/agents.rs index c139c4ffe..154fe6d5d 100644 --- a/src/api/agents.rs +++ b/src/api/agents.rs @@ -730,11 +730,13 @@ pub(super) async fn create_agent( let conversation_logger = crate::conversation::history::ConversationLogger::new(db.sqlite.clone()); let channel_store = crate::conversation::ChannelStore::new(db.sqlite.clone()); - + let run_logger = crate::conversation::ProcessRunLogger::new(db.sqlite.clone()); let cortex_tool_server = crate::tools::create_cortex_chat_tool_server( memory_search.clone(), conversation_logger, channel_store, + run_logger, + &deps.agent_id, browser_config, agent_config.screenshot_dir(), brave_search_key, diff --git a/src/api/server.rs b/src/api/server.rs index 9cf29495f..6e0c248c4 100644 --- a/src/api/server.rs +++ b/src/api/server.rs @@ -3,7 +3,7 @@ use super::state::ApiState; use super::{ agents, bindings, channels, config, cortex, cron, ingest, links, mcp, memories, messaging, - 
models, providers, settings, skills, system, webchat, + models, providers, settings, skills, system, webchat, workers, }; use axum::Json; @@ -88,6 +88,8 @@ pub async fn start_http_server( ) .route("/channels/messages", get(channels::channel_messages)) .route("/channels/status", get(channels::channel_status)) + .route("/agents/workers", get(workers::list_workers)) + .route("/agents/workers/detail", get(workers::worker_detail)) .route("/agents/memories", get(memories::list_memories)) .route("/agents/memories/search", get(memories::search_memories)) .route("/agents/memories/graph", get(memories::memory_graph)) diff --git a/src/api/state.rs b/src/api/state.rs index 03914f079..0b4408480 100644 --- a/src/api/state.rs +++ b/src/api/state.rs @@ -137,6 +137,7 @@ pub enum ApiEvent { channel_id: Option, worker_id: String, task: String, + worker_type: String, }, /// A worker's status changed. WorkerStatusUpdate { @@ -151,6 +152,7 @@ pub enum ApiEvent { channel_id: Option, worker_id: String, result: String, + success: bool, }, /// A branch was started. BranchStarted { @@ -173,6 +175,7 @@ pub enum ApiEvent { process_type: String, process_id: String, tool_name: String, + args: String, }, /// A tool call completed on a process. ToolCompleted { @@ -181,6 +184,7 @@ pub enum ApiEvent { process_type: String, process_id: String, tool_name: String, + result: String, }, /// Configuration was reloaded (skills, identity, etc.). ConfigReloaded, @@ -289,6 +293,7 @@ impl ApiState { worker_id, channel_id, task, + worker_type, .. } => { api_tx @@ -297,6 +302,7 @@ impl ApiState { channel_id: channel_id.as_deref().map(|s| s.to_string()), worker_id: worker_id.to_string(), task: task.clone(), + worker_type: worker_type.clone(), }) .ok(); } @@ -334,6 +340,7 @@ impl ApiState { worker_id, channel_id, result, + success, .. 
} => { api_tx @@ -342,6 +349,7 @@ impl ApiState { channel_id: channel_id.as_deref().map(|s| s.to_string()), worker_id: worker_id.to_string(), result: result.clone(), + success: *success, }) .ok(); } @@ -364,6 +372,7 @@ impl ApiState { process_id, channel_id, tool_name, + args, .. } => { let (process_type, id_str) = process_id_info(process_id); @@ -374,6 +383,7 @@ impl ApiState { process_type, process_id: id_str, tool_name: tool_name.clone(), + args: args.clone(), }) .ok(); } @@ -381,6 +391,7 @@ impl ApiState { process_id, channel_id, tool_name, + result, .. } => { let (process_type, id_str) = process_id_info(process_id); @@ -391,6 +402,7 @@ impl ApiState { process_type, process_id: id_str, tool_name: tool_name.clone(), + result: result.clone(), }) .ok(); } diff --git a/src/api/workers.rs b/src/api/workers.rs new file mode 100644 index 000000000..39fcc8d73 --- /dev/null +++ b/src/api/workers.rs @@ -0,0 +1,180 @@ +//! Workers API endpoints: list and detail views for worker runs. + +use super::state::ApiState; + +use crate::conversation::history::ProcessRunLogger; +use crate::conversation::worker_transcript; + +use axum::Json; +use axum::extract::{Query, State}; +use axum::http::StatusCode; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; + +#[derive(Deserialize)] +pub(super) struct WorkerListQuery { + agent_id: String, + #[serde(default = "default_limit")] + limit: i64, + #[serde(default)] + offset: i64, + status: Option, +} + +fn default_limit() -> i64 { + 50 +} + +#[derive(Serialize)] +pub(super) struct WorkerListResponse { + workers: Vec, + total: i64, +} + +#[derive(Serialize)] +pub(super) struct WorkerListItem { + id: String, + task: String, + status: String, + worker_type: String, + channel_id: Option, + channel_name: Option, + started_at: String, + completed_at: Option, + has_transcript: bool, + /// Live status text from StatusBlock (running workers only). + live_status: Option, + /// Total tool calls. 
From DB for completed workers, from StatusBlock for running. + tool_calls: i64, +} + +#[derive(Deserialize)] +pub(super) struct WorkerDetailQuery { + agent_id: String, + worker_id: String, +} + +#[derive(Serialize)] +pub(super) struct WorkerDetailResponse { + id: String, + task: String, + result: Option, + status: String, + worker_type: String, + channel_id: Option, + channel_name: Option, + started_at: String, + completed_at: Option, + transcript: Option>, + tool_calls: i64, +} + +/// List worker runs for an agent, with live status merged from StatusBlocks. +pub(super) async fn list_workers( + State(state): State>, + Query(query): Query, +) -> Result, StatusCode> { + let pools = state.agent_pools.load(); + let pool = pools.get(&query.agent_id).ok_or(StatusCode::NOT_FOUND)?; + let logger = ProcessRunLogger::new(pool.clone()); + + let limit = query.limit.clamp(1, 200); + let offset = query.offset.max(0); + let (rows, total) = logger + .list_worker_runs(&query.agent_id, limit, offset, query.status.as_deref()) + .await + .map_err(|error| { + tracing::warn!(%error, "failed to list worker runs"); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + // Build a live status lookup from all channel StatusBlocks + let live_statuses = { + let blocks = state.channel_status_blocks.read().await; + let mut map = std::collections::HashMap::new(); + for (_channel_id, status_block) in blocks.iter() { + let block = status_block.read().await; + for worker in &block.active_workers { + map.insert( + worker.id.to_string(), + (worker.status.clone(), worker.tool_calls), + ); + } + } + map + }; + + let workers = rows + .into_iter() + .map(|row| { + let (live_status, live_tool_calls) = live_statuses + .get(&row.id) + .map(|(status, calls)| (Some(status.clone()), *calls as i64)) + .unwrap_or((None, 0)); + + // Use live tool call count for running workers, DB count for completed + let tool_calls = if row.status == "running" && live_tool_calls > 0 { + live_tool_calls + } else { + row.tool_calls + }; 
+ + WorkerListItem { + id: row.id, + task: row.task, + status: row.status, + worker_type: row.worker_type, + channel_id: row.channel_id, + channel_name: row.channel_name, + started_at: row.started_at, + completed_at: row.completed_at, + has_transcript: row.has_transcript, + live_status, + tool_calls, + } + }) + .collect(); + + Ok(Json(WorkerListResponse { workers, total })) +} + +/// Get full detail for a single worker run, including decompressed transcript. +pub(super) async fn worker_detail( + State(state): State>, + Query(query): Query, +) -> Result, StatusCode> { + let pools = state.agent_pools.load(); + let pool = pools.get(&query.agent_id).ok_or(StatusCode::NOT_FOUND)?; + let logger = ProcessRunLogger::new(pool.clone()); + + let detail = logger + .get_worker_detail(&query.agent_id, &query.worker_id) + .await + .map_err(|error| { + tracing::warn!(%error, worker_id = %query.worker_id, "failed to load worker detail"); + StatusCode::INTERNAL_SERVER_ERROR + })? + .ok_or(StatusCode::NOT_FOUND)?; + + let transcript = detail.transcript_blob.as_deref().and_then(|blob| { + worker_transcript::deserialize_transcript(blob) + .map_err(|error| { + tracing::warn!(%error, worker_id = %query.worker_id, "failed to decompress transcript"); + }) + .ok() + }); + + Ok(Json(WorkerDetailResponse { + id: detail.id, + task: detail.task, + result: detail.result, + status: detail.status, + worker_type: detail.worker_type, + channel_id: detail.channel_id, + channel_name: detail.channel_name, + started_at: detail.started_at, + completed_at: detail.completed_at, + transcript, + tool_calls: detail.tool_calls, + })) +} diff --git a/src/conversation.rs b/src/conversation.rs index 0097a7d42..bbfc5fac9 100644 --- a/src/conversation.rs +++ b/src/conversation.rs @@ -3,6 +3,10 @@ pub mod channels; pub mod context; pub mod history; +pub mod worker_transcript; pub use channels::ChannelStore; -pub use history::{ConversationLogger, ProcessRunLogger, TimelineItem}; +pub use history::{ + 
ConversationLogger, ProcessRunLogger, TimelineItem, WorkerDetailRow, WorkerRunRow, +}; +pub use worker_transcript::{ActionContent, TranscriptStep}; diff --git a/src/conversation/history.rs b/src/conversation/history.rs index 4c62659d0..9f0db80da 100644 --- a/src/conversation/history.rs +++ b/src/conversation/history.rs @@ -280,19 +280,26 @@ impl ProcessRunLogger { channel_id: Option<&ChannelId>, worker_id: WorkerId, task: &str, + worker_type: &str, + agent_id: &crate::AgentId, ) { let pool = self.pool.clone(); let id = worker_id.to_string(); let channel_id = channel_id.map(|c| c.to_string()); let task = task.to_string(); + let worker_type = worker_type.to_string(); + let agent_id = agent_id.to_string(); tokio::spawn(async move { if let Err(error) = sqlx::query( - "INSERT OR IGNORE INTO worker_runs (id, channel_id, task) VALUES (?, ?, ?)", + "INSERT OR IGNORE INTO worker_runs (id, channel_id, task, worker_type, agent_id) \ + VALUES (?, ?, ?, ?, ?)", ) .bind(&id) .bind(&channel_id) .bind(&task) + .bind(&worker_type) + .bind(&agent_id) .execute(&pool) .await { @@ -302,34 +309,29 @@ impl ProcessRunLogger { } /// Update a worker's status. Fire-and-forget. - pub fn log_worker_status(&self, worker_id: WorkerId, status: &str) { - let pool = self.pool.clone(); - let id = worker_id.to_string(); - let status = status.to_string(); - - tokio::spawn(async move { - if let Err(error) = sqlx::query("UPDATE worker_runs SET status = ? WHERE id = ?") - .bind(&status) - .bind(&id) - .execute(&pool) - .await - { - tracing::warn!(%error, worker_id = %id, "failed to persist worker status"); - } - }); + /// Worker status text updates are transient — they're available via the + /// in-memory StatusBlock for live workers and don't need to be persisted. + /// The `status` column is reserved for the state enum (running/done/failed). + pub fn log_worker_status(&self, _worker_id: WorkerId, _status: &str) { + // Intentionally a no-op. 
Status text was previously written to the + // `status` column, overwriting the state enum with free-text like + // "Searching for weather in Germany" which broke badge rendering + // and status filtering. } /// Record a worker completing with its result. Fire-and-forget. - pub fn log_worker_completed(&self, worker_id: WorkerId, result: &str) { + pub fn log_worker_completed(&self, worker_id: WorkerId, result: &str, success: bool) { let pool = self.pool.clone(); let id = worker_id.to_string(); let result = result.to_string(); + let status = if success { "done" } else { "failed" }; tokio::spawn(async move { if let Err(error) = sqlx::query( - "UPDATE worker_runs SET result = ?, status = 'done', completed_at = CURRENT_TIMESTAMP WHERE id = ?" + "UPDATE worker_runs SET result = ?, status = ?, completed_at = CURRENT_TIMESTAMP WHERE id = ?" ) .bind(&result) + .bind(status) .bind(&id) .execute(&pool) .await @@ -438,4 +440,161 @@ impl ProcessRunLogger { items.reverse(); Ok(items) } + + /// List worker runs for an agent, ordered by most recent first. + /// Does NOT include the transcript blob — that's fetched separately via `get_worker_detail`. 
+ pub async fn list_worker_runs( + &self, + agent_id: &str, + limit: i64, + offset: i64, + status_filter: Option<&str>, + ) -> crate::error::Result<(Vec, i64)> { + let (count_where_clause, list_where_clause, has_status_filter) = if status_filter.is_some() + { + ( + "WHERE w.agent_id = ?1 AND w.status = ?2", + "WHERE w.agent_id = ?1 AND w.status = ?4", + true, + ) + } else { + ("WHERE w.agent_id = ?1", "WHERE w.agent_id = ?1", false) + }; + + let count_query = + format!("SELECT COUNT(*) as total FROM worker_runs w {count_where_clause}"); + let list_query = format!( + "SELECT w.id, w.task, w.status, w.worker_type, w.channel_id, w.started_at, \ + w.completed_at, w.transcript IS NOT NULL as has_transcript, \ + w.tool_calls, c.display_name as channel_name \ + FROM worker_runs w \ + LEFT JOIN channels c ON w.channel_id = c.id \ + {list_where_clause} \ + ORDER BY w.started_at DESC \ + LIMIT ?2 OFFSET ?3" + ); + + let mut count_q = sqlx::query(&count_query).bind(agent_id); + let mut list_q = sqlx::query(&list_query) + .bind(agent_id) + .bind(limit) + .bind(offset); + + if has_status_filter { + let filter = status_filter.unwrap_or(""); + count_q = count_q.bind(filter); + list_q = list_q.bind(filter); + } + + let total: i64 = count_q + .fetch_one(&self.pool) + .await + .map(|row| row.try_get("total").unwrap_or(0)) + .map_err(|e| anyhow::anyhow!(e))?; + + let rows = list_q + .fetch_all(&self.pool) + .await + .map_err(|e| anyhow::anyhow!(e))?; + + let items = rows + .into_iter() + .map(|row| WorkerRunRow { + id: row.try_get("id").unwrap_or_default(), + task: row.try_get("task").unwrap_or_default(), + status: row.try_get("status").unwrap_or_default(), + worker_type: row + .try_get("worker_type") + .unwrap_or_else(|_| "builtin".into()), + channel_id: row.try_get("channel_id").ok(), + channel_name: row.try_get("channel_name").ok(), + started_at: row + .try_get::, _>("started_at") + .map(|t| t.to_rfc3339()) + .unwrap_or_default(), + completed_at: row + .try_get::, 
_>("completed_at") + .ok() + .map(|t| t.to_rfc3339()), + has_transcript: row.try_get::("has_transcript").unwrap_or(false), + tool_calls: row.try_get::("tool_calls").unwrap_or(0), + }) + .collect(); + + Ok((items, total)) + } + + /// Get full detail for a single worker run, including the compressed transcript blob. + pub async fn get_worker_detail( + &self, + agent_id: &str, + worker_id: &str, + ) -> crate::error::Result> { + let row = sqlx::query( + "SELECT w.id, w.task, w.result, w.status, w.worker_type, w.channel_id, \ + w.started_at, w.completed_at, w.transcript, w.tool_calls, \ + c.display_name as channel_name \ + FROM worker_runs w \ + LEFT JOIN channels c ON w.channel_id = c.id \ + WHERE w.agent_id = ? AND w.id = ?", + ) + .bind(agent_id) + .bind(worker_id) + .fetch_optional(&self.pool) + .await + .map_err(|e| anyhow::anyhow!(e))?; + + Ok(row.map(|row| WorkerDetailRow { + id: row.try_get("id").unwrap_or_default(), + task: row.try_get("task").unwrap_or_default(), + result: row.try_get("result").ok(), + status: row.try_get("status").unwrap_or_default(), + worker_type: row + .try_get("worker_type") + .unwrap_or_else(|_| "builtin".into()), + channel_id: row.try_get("channel_id").ok(), + channel_name: row.try_get("channel_name").ok(), + started_at: row + .try_get::, _>("started_at") + .map(|t| t.to_rfc3339()) + .unwrap_or_default(), + completed_at: row + .try_get::, _>("completed_at") + .ok() + .map(|t| t.to_rfc3339()), + transcript_blob: row.try_get("transcript").ok(), + tool_calls: row.try_get::("tool_calls").unwrap_or(0), + })) + } +} + +/// A worker run row without the transcript blob (for list queries). 
+#[derive(Debug, Clone, Serialize)]
+pub struct WorkerRunRow {
+    pub id: String,
+    pub task: String,
+    pub status: String,
+    pub worker_type: String,
+    pub channel_id: Option<String>,
+    pub channel_name: Option<String>,
+    pub started_at: String,
+    pub completed_at: Option<String>,
+    pub has_transcript: bool,
+    pub tool_calls: i64,
+}
+
+/// A worker run row with full detail including the transcript blob.
+#[derive(Debug, Clone)]
+pub struct WorkerDetailRow {
+    pub id: String,
+    pub task: String,
+    pub result: Option<String>,
+    pub status: String,
+    pub worker_type: String,
+    pub channel_id: Option<String>,
+    pub channel_name: Option<String>,
+    pub started_at: String,
+    pub completed_at: Option<String>,
+    pub transcript_blob: Option<Vec<u8>>,
+    pub tool_calls: i64,
 }
diff --git a/src/conversation/worker_transcript.rs b/src/conversation/worker_transcript.rs
new file mode 100644
index 000000000..e58da56c9
--- /dev/null
+++ b/src/conversation/worker_transcript.rs
@@ -0,0 +1,149 @@
+//! Worker transcript serialization and compression.
+//!
+//! Converts a Rig `Vec<Message>` history into a flat `Vec<TranscriptStep>`,
+//! then serializes to gzipped JSON for compact storage on the `worker_runs` row.
+
+use crate::tools::{MAX_TOOL_OUTPUT_BYTES, truncate_output};
+
+use flate2::Compression;
+use flate2::read::GzDecoder;
+use flate2::write::GzEncoder;
+use serde::{Deserialize, Serialize};
+use std::io::{Read, Write};
+
+/// Maximum byte length for tool call arguments in transcripts.
+const MAX_TOOL_ARGS_BYTES: usize = 2_000;
+
+/// A single step in a worker transcript.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(tag = "type", rename_all = "snake_case")]
+pub enum TranscriptStep {
+    /// Agent reasoning and/or tool calls.
+    Action { content: Vec<ActionContent> },
+    /// Tool execution result.
+    ToolResult {
+        call_id: String,
+        name: String,
+        text: String,
+    },
+}
+
+/// Content within an action step.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(tag = "type", rename_all = "snake_case")]
+pub enum ActionContent {
+    Text {
+        text: String,
+    },
+    ToolCall {
+        id: String,
+        name: String,
+        args: String,
+    },
+}
+
+/// Convert a Rig message history to transcript steps, serialize as JSON, and gzip compress.
+pub fn serialize_transcript(history: &[rig::message::Message]) -> Vec<u8> {
+    let steps = convert_history(history);
+    let json = serde_json::to_vec(&steps).unwrap_or_default();
+
+    let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
+    encoder.write_all(&json).ok();
+    encoder.finish().unwrap_or_default()
+}
+
+/// Decompress and deserialize a gzipped transcript blob.
+pub fn deserialize_transcript(blob: &[u8]) -> anyhow::Result<Vec<TranscriptStep>> {
+    let mut decoder = GzDecoder::new(blob);
+    let mut json = Vec::new();
+    decoder.read_to_end(&mut json)?;
+    let steps: Vec<TranscriptStep> = serde_json::from_slice(&json)?;
+    Ok(steps)
+}
+
+/// Convert Rig `Vec<Message>` to `Vec<TranscriptStep>`.
+fn convert_history(history: &[rig::message::Message]) -> Vec<TranscriptStep> {
+    let mut steps = Vec::new();
+
+    for message in history {
+        match message {
+            rig::message::Message::Assistant { content, .. } => {
+                let mut parts = Vec::new();
+                for item in content.iter() {
+                    match item {
+                        rig::message::AssistantContent::Text(text) => {
+                            if !text.text.is_empty() {
+                                parts.push(ActionContent::Text {
+                                    text: text.text.clone(),
+                                });
+                            }
+                        }
+                        rig::message::AssistantContent::ToolCall(tool_call) => {
+                            let args_str = tool_call.function.arguments.to_string();
+                            let args = if args_str.len() > MAX_TOOL_ARGS_BYTES {
+                                truncate_output(&args_str, MAX_TOOL_ARGS_BYTES)
+                            } else {
+                                args_str
+                            };
+                            parts.push(ActionContent::ToolCall {
+                                id: tool_call.id.clone(),
+                                name: tool_call.function.name.clone(),
+                                args,
+                            });
+                        }
+                        _ => {}
+                    }
+                }
+                if !parts.is_empty() {
+                    steps.push(TranscriptStep::Action { content: parts });
+                }
+            }
+            rig::message::Message::User { content } => {
+                for item in content.iter() {
+                    match item {
+                        rig::message::UserContent::ToolResult(tool_result) => {
+                            let call_id = tool_result
+                                .call_id
+                                .clone()
+                                .unwrap_or_else(|| tool_result.id.clone());
+
+                            let text = tool_result
+                                .content
+                                .iter()
+                                .filter_map(|c| {
+                                    if let rig::message::ToolResultContent::Text(t) = c {
+                                        Some(t.text.as_str())
+                                    } else {
+                                        None
+                                    }
+                                })
+                                .collect::<Vec<_>>()
+                                .join("\n");
+
+                            let truncated = truncate_output(&text, MAX_TOOL_OUTPUT_BYTES);
+
+                            steps.push(TranscriptStep::ToolResult {
+                                call_id,
+                                name: String::new(),
+                                text: truncated,
+                            });
+                        }
+                        rig::message::UserContent::Text(text) => {
+                            // Skip compaction markers and system-injected messages
+                            if !text.text.is_empty() && !text.text.starts_with("[System:") {
+                                steps.push(TranscriptStep::Action {
+                                    content: vec![ActionContent::Text {
+                                        text: text.text.clone(),
+                                    }],
+                                });
+                            }
+                        }
+                        _ => {}
+                    }
+                }
+            }
+        }
+    }
+
+    steps
+}
diff --git a/src/hooks/spacebot.rs b/src/hooks/spacebot.rs
index 6f867f9c4..04a13f97a 100644
--- a/src/hooks/spacebot.rs
+++ b/src/hooks/spacebot.rs
@@ -40,7 +40,7 @@ impl SpacebotHook {
             process_id: self.process_id.clone(),
             status: status.into(),
         };
-        let _ = self.event_tx.send(event);
+
self.event_tx.send(event).ok(); } /// Check a string against the leak pattern set. @@ -192,14 +192,16 @@ where }; } - // Send event without blocking + // Send event without blocking. Truncate args to keep broadcast payloads bounded. + let capped_args = crate::tools::truncate_output(args, 2_000); let event = ProcessEvent::ToolStarted { agent_id: self.agent_id.clone(), process_id: self.process_id.clone(), channel_id: self.channel_id.clone(), tool_name: tool_name.to_string(), + args: capped_args, }; - let _ = self.event_tx.send(event); + self.event_tx.send(event).ok(); tracing::debug!( process_id = %self.process_id, @@ -251,7 +253,7 @@ where tool_name: tool_name.to_string(), result: capped_result, }; - let _ = self.event_tx.send(event); + self.event_tx.send(event).ok(); tracing::debug!( process_id = %self.process_id, diff --git a/src/lib.rs b/src/lib.rs index 572b84415..0e28ff0a3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -116,6 +116,7 @@ pub enum ProcessEvent { worker_id: WorkerId, channel_id: Option, task: String, + worker_type: String, }, WorkerStatus { agent_id: AgentId, @@ -129,12 +130,14 @@ pub enum ProcessEvent { channel_id: Option, result: String, notify: bool, + success: bool, }, ToolStarted { agent_id: AgentId, process_id: ProcessId, channel_id: Option, tool_name: String, + args: String, }, ToolCompleted { agent_id: AgentId, diff --git a/src/main.rs b/src/main.rs index 451f89b93..e1eec905c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1773,10 +1773,13 @@ async fn initialize_agents( let conversation_logger = spacebot::conversation::history::ConversationLogger::new(agent.db.sqlite.clone()); let channel_store = spacebot::conversation::ChannelStore::new(agent.db.sqlite.clone()); + let run_logger = spacebot::conversation::ProcessRunLogger::new(agent.db.sqlite.clone()); let tool_server = spacebot::tools::create_cortex_chat_tool_server( agent.deps.memory_search.clone(), conversation_logger, channel_store, + run_logger, + &agent.deps.agent_id, browser_config, 
agent.config.screenshot_dir(), brave_search_key, diff --git a/src/prompts/text.rs b/src/prompts/text.rs index 3e0518b45..dbed2c89e 100644 --- a/src/prompts/text.rs +++ b/src/prompts/text.rs @@ -159,6 +159,9 @@ fn lookup(lang: &str, key: &str) -> &'static str { ("en", "tools/channel_recall") => { include_str!("../../prompts/en/tools/channel_recall_description.md.j2") } + ("en", "tools/worker_inspect") => { + include_str!("../../prompts/en/tools/worker_inspect_description.md.j2") + } ("en", "tools/send_file") => { include_str!("../../prompts/en/tools/send_file_description.md.j2") } diff --git a/src/tools.rs b/src/tools.rs index 5dab9da1a..7d8d58690 100644 --- a/src/tools.rs +++ b/src/tools.rs @@ -45,6 +45,7 @@ pub mod shell; pub mod skip; pub mod spawn_worker; pub mod web_search; +pub mod worker_inspect; pub use branch_tool::{BranchArgs, BranchError, BranchOutput, BranchTool}; pub use browser::{ @@ -88,6 +89,9 @@ pub use shell::{ShellArgs, ShellError, ShellOutput, ShellResult, ShellTool}; pub use skip::{SkipArgs, SkipError, SkipFlag, SkipOutput, SkipTool, new_skip_flag}; pub use spawn_worker::{SpawnWorkerArgs, SpawnWorkerError, SpawnWorkerOutput, SpawnWorkerTool}; pub use web_search::{SearchResult, WebSearchArgs, WebSearchError, WebSearchOutput, WebSearchTool}; +pub use worker_inspect::{ + WorkerInspectArgs, WorkerInspectError, WorkerInspectOutput, WorkerInspectTool, +}; use crate::agent::channel::ChannelState; use crate::config::{BrowserConfig, RuntimeConfig}; @@ -338,12 +342,15 @@ pub fn create_branch_tool_server( memory_search: Arc, conversation_logger: crate::conversation::history::ConversationLogger, channel_store: crate::conversation::ChannelStore, + run_logger: crate::conversation::history::ProcessRunLogger, + agent_id: &str, ) -> ToolServerHandle { ToolServer::new() .tool(MemorySaveTool::new(memory_search.clone())) .tool(MemoryRecallTool::new(memory_search.clone())) .tool(MemoryDeleteTool::new(memory_search)) .tool(ChannelRecallTool::new(conversation_logger, 
channel_store)) + .tool(WorkerInspectTool::new(run_logger, agent_id.to_string())) .run() } @@ -413,6 +420,8 @@ pub fn create_cortex_chat_tool_server( memory_search: Arc, conversation_logger: crate::conversation::history::ConversationLogger, channel_store: crate::conversation::ChannelStore, + run_logger: crate::conversation::history::ProcessRunLogger, + agent_id: &str, browser_config: BrowserConfig, screenshot_dir: PathBuf, brave_search_key: Option, @@ -424,6 +433,7 @@ pub fn create_cortex_chat_tool_server( .tool(MemoryRecallTool::new(memory_search.clone())) .tool(MemoryDeleteTool::new(memory_search)) .tool(ChannelRecallTool::new(conversation_logger, channel_store)) + .tool(WorkerInspectTool::new(run_logger, agent_id.to_string())) .tool(ShellTool::new(workspace.clone(), sandbox.clone())) .tool(FileTool::new(workspace.clone())) .tool(ExecTool::new(workspace, sandbox)); diff --git a/src/tools/worker_inspect.rs b/src/tools/worker_inspect.rs new file mode 100644 index 000000000..960d52972 --- /dev/null +++ b/src/tools/worker_inspect.rs @@ -0,0 +1,230 @@ +//! Worker transcript inspection tool for branches. +//! +//! Allows a branch to retrieve the full transcript of a completed worker run, +//! or list recent worker runs to find the right one. + +use crate::conversation::history::ProcessRunLogger; +use crate::conversation::worker_transcript; + +use rig::completion::ToolDefinition; +use rig::tool::Tool; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +/// Tool for inspecting worker run transcripts. 
+#[derive(Debug, Clone)]
+pub struct WorkerInspectTool {
+    run_logger: ProcessRunLogger,
+    agent_id: String,
+}
+
+impl WorkerInspectTool {
+    pub fn new(run_logger: ProcessRunLogger, agent_id: String) -> Self {
+        Self {
+            run_logger,
+            agent_id,
+        }
+    }
+}
+
+fn truncate_utf8(text: &str, max_bytes: usize) -> String {
+    if text.len() <= max_bytes {
+        return text.to_string();
+    }
+
+    let boundary = text.floor_char_boundary(max_bytes);
+    format!("{}...", &text[..boundary])
+}
+
+#[derive(Debug, thiserror::Error)]
+#[error("Worker inspect failed: {0}")]
+pub struct WorkerInspectError(String);
+
+#[derive(Debug, Deserialize, JsonSchema)]
+pub struct WorkerInspectArgs {
+    /// The worker ID to inspect. Omit to list recent worker runs.
+    #[serde(default)]
+    pub worker_id: Option<String>,
+    /// Maximum number of worker runs to list (default 10, max 50). Only used when listing.
+    #[serde(default = "default_list_limit")]
+    pub limit: i64,
+}
+
+fn default_list_limit() -> i64 {
+    10
+}
+
+#[derive(Debug, Serialize)]
+pub struct WorkerInspectOutput {
+    pub action: String,
+    pub summary: String,
+}
+
+impl Tool for WorkerInspectTool {
+    const NAME: &'static str = "worker_inspect";
+
+    type Error = WorkerInspectError;
+    type Args = WorkerInspectArgs;
+    type Output = WorkerInspectOutput;
+
+    async fn definition(&self, _prompt: String) -> ToolDefinition {
+        ToolDefinition {
+            name: Self::NAME.to_string(),
+            description: crate::prompts::text::get("tools/worker_inspect").to_string(),
+            parameters: serde_json::json!({
+                "type": "object",
+                "properties": {
+                    "worker_id": {
+                        "type": "string",
+                        "description": "UUID of the worker run to inspect. Omit to list recent workers."
+                    },
+                    "limit": {
+                        "type": "integer",
+                        "minimum": 1,
+                        "maximum": 50,
+                        "default": 10,
+                        "description": "Number of recent workers to list (1-50). Only used when worker_id is omitted."
+                    }
+                }
+            }),
+        }
+    }
+
+    async fn call(&self, args: Self::Args) -> Result<Self::Output, Self::Error> {
+        let Some(worker_id) = args.worker_id else {
+            return self.list_workers(args.limit).await;
+        };
+
+        let detail = self
+            .run_logger
+            .get_worker_detail(&self.agent_id, &worker_id)
+            .await
+            .map_err(|e| WorkerInspectError(format!("Failed to query worker: {e}")))?
+            .ok_or_else(|| WorkerInspectError(format!("No worker found with ID {worker_id}")))?;
+
+        let mut summary = format!(
+            "## Worker {}\n\n**Task:** {}\n**Status:** {}\n**Started:** {}\n",
+            detail.id, detail.task, detail.status, detail.started_at,
+        );
+
+        if let Some(completed_at) = &detail.completed_at {
+            summary.push_str(&format!("**Completed:** {completed_at}\n"));
+        }
+
+        if let Some(result) = &detail.result {
+            summary.push_str(&format!("\n### Result\n\n{result}\n"));
+        }
+
+        if let Some(blob) = &detail.transcript_blob {
+            match worker_transcript::deserialize_transcript(blob) {
+                Ok(steps) => {
+                    summary.push_str(&format!("\n### Transcript ({} steps)\n\n", steps.len()));
+                    for step in &steps {
+                        match step {
+                            worker_transcript::TranscriptStep::Action { content } => {
+                                for item in content {
+                                    match item {
+                                        worker_transcript::ActionContent::Text { text } => {
+                                            summary.push_str(&format!("**Agent:** {text}\n\n"));
+                                        }
+                                        worker_transcript::ActionContent::ToolCall {
+                                            name,
+                                            args,
+                                            ..
+                                        } => {
+                                            summary.push_str(&format!(
+                                                "**Tool call:** `{name}`\n```\n{args}\n```\n\n"
+                                            ));
+                                        }
+                                    }
+                                }
+                            }
+                            worker_transcript::TranscriptStep::ToolResult {
+                                name, text, ..
+                            } => {
+                                let label = if name.is_empty() { "tool" } else { name };
+                                let display = if text.len() > 500 {
+                                    format!(
+                                        "{}\n[truncated, {} bytes total]",
+                                        truncate_utf8(text, 500),
+                                        text.len()
+                                    )
+                                } else {
+                                    text.clone()
+                                };
+                                summary.push_str(&format!(
+                                    "**Result ({label}):**\n```\n{display}\n```\n\n"
+                                ));
+                            }
+                        }
+                    }
+                }
+                Err(error) => {
+                    summary.push_str(&format!(
+                        "\n*Transcript could not be decompressed: {error}*\n"
+                    ));
+                }
+            }
+        } else {
+            summary.push_str("\n*No transcript available for this worker.*\n");
+        }
+
+        Ok(WorkerInspectOutput {
+            action: "inspect".to_string(),
+            summary,
+        })
+    }
+}
+
+impl WorkerInspectTool {
+    async fn list_workers(&self, limit: i64) -> Result<WorkerInspectOutput, WorkerInspectError> {
+        let limit = limit.clamp(1, 50);
+        let (rows, total) = self
+            .run_logger
+            .list_worker_runs(&self.agent_id, limit, 0, None)
+            .await
+            .map_err(|e| WorkerInspectError(format!("Failed to list workers: {e}")))?;
+
+        if rows.is_empty() {
+            return Ok(WorkerInspectOutput {
+                action: "list".to_string(),
+                summary: "No worker runs found.".to_string(),
+            });
+        }
+
+        let mut summary = format!("## Recent Workers ({} of {total})\n\n", rows.len());
+
+        for row in &rows {
+            let status_marker = match row.status.as_str() {
+                "running" => "[running]",
+                "done" => "[done]",
+                "failed" => "[failed]",
+                _ => "[-]",
+            };
+            summary.push_str(&format!(
+                "{status_marker} `{}` — {} ({})\n",
+                row.id, row.task, row.status,
+            ));
+            if let Some(channel) = &row.channel_name {
+                summary.push_str(&format!("  Channel: {channel}\n"));
+            }
+            summary.push_str(&format!(
+                "  Started: {} | {} tool calls{}\n\n",
+                row.started_at,
+                row.tool_calls,
+                if row.has_transcript {
+                    " | transcript available"
+                } else {
+                    ""
+                },
+            ));
+        }
+
+        summary.push_str("Use `worker_inspect` with a `worker_id` to view the full transcript.");
+
+        Ok(WorkerInspectOutput {
+            action: "list".to_string(),
+            summary,
+        })
+    }
+}
diff --git a/tests/context_dump.rs b/tests/context_dump.rs
index 30780f261..a455ae62f 100644
---
a/tests/context_dump.rs +++ b/tests/context_dump.rs @@ -274,10 +274,13 @@ async fn dump_branch_context() { let conversation_logger = spacebot::conversation::ConversationLogger::new(deps.sqlite_pool.clone()); let channel_store = spacebot::conversation::ChannelStore::new(deps.sqlite_pool.clone()); + let run_logger = spacebot::conversation::ProcessRunLogger::new(deps.sqlite_pool.clone()); let branch_tool_server = spacebot::tools::create_branch_tool_server( deps.memory_search.clone(), conversation_logger, channel_store, + run_logger, + "test-agent", ); let tool_defs = branch_tool_server @@ -459,10 +462,13 @@ async fn dump_all_contexts() { let branch_prompt = prompt_engine .render_branch_prompt(&instance_dir, &workspace_dir) .expect("failed to render branch prompt"); + let run_logger = spacebot::conversation::ProcessRunLogger::new(deps.sqlite_pool.clone()); let branch_tool_server = spacebot::tools::create_branch_tool_server( deps.memory_search.clone(), conversation_logger, channel_store, + run_logger, + "test-agent", ); let branch_tool_defs = branch_tool_server.get_tool_defs(None).await.unwrap(); let branch_tools_text = format_tool_defs(&branch_tool_defs);