From 30b424310c33654a7b75db06f9279a698b0bede2 Mon Sep 17 00:00:00 2001 From: 1872-svg <1872-svg@users.noreply.github.com> Date: Wed, 11 Mar 2026 12:41:07 +0000 Subject: [PATCH] feat: streaming output, /verbose mode, fork agent, and bugfixes - Stream-json output with real-time Telegram message editing - /verbose command to show tool calls in streaming message - Sliding window display (last 8 tool lines, 15 text lines) to stay within 4096 limit - Newline separation between successive assistant message turns - Fork agent (haiku) for parallel queries when main agent is busy - /kill command to stop active agent - Fix duplicate messages from edit/sendMessage race condition - Fix verbose final message lost when edit fails silently - Fix verbose fallback duplicate on short text-only responses Co-Authored-By: Claude Opus 4.6 --- src/commands/telegram.ts | 216 +++++++++++++++++++- src/runner.ts | 411 ++++++++++++++++++++++++++++----------- 2 files changed, 512 insertions(+), 115 deletions(-) diff --git a/src/commands/telegram.ts b/src/commands/telegram.ts index 94b5793e..f47440bd 100644 --- a/src/commands/telegram.ts +++ b/src/commands/telegram.ts @@ -1,4 +1,4 @@ -import { ensureProjectClaudeMd, run, runUserMessage } from "../runner"; +import { ensureProjectClaudeMd, run, runUserMessage, runFork, killActive, isMainBusy } from "../runner"; import { getSettings, loadSettings } from "../config"; import { resetSession } from "../sessions"; import { transcribeAudioToText } from "../whisper"; @@ -300,6 +300,128 @@ async function sendTyping(token: string, chatId: number, threadId?: number): Pro }).catch(() => {}); } +// Chat IDs with verbose tool display enabled +const verboseChats = new Set(); + +/** + * Build a streaming callback using editMessageText. + * On first chunk: send a placeholder message to get message_id. + * On subsequent chunks (throttled): edit that message with accumulated plain text. + * In verbose mode, tool call/result lines appear above the text response. + */ +function makeStreamCallback( + token: string, + chatId: number, + threadId: number | undefined, + options: { intervalMs?: number; verbose?: boolean } = {} +): { onChunk: (text: string) => void; onToolEvent: (line: string) => void; waitForStreamMsg: () => Promise } { + const { intervalMs = 500, verbose = false } = options; + let textAcc = ""; + const toolLines: string[] = []; + let lastSentAt = 0; + let timer: ReturnType | null = null; + let streamMsgId: number | null = null; + let initPromise: Promise | null = null; + let finalized = false; + + const getDisplay = () => { + const MAX_TOOL_LINES = 8; + const MAX_TEXT_LINES = 15; + let toolPart: string; + if (toolLines.length > MAX_TOOL_LINES) { + const shown = toolLines.slice(-MAX_TOOL_LINES); + toolPart = `[...${toolLines.length - MAX_TOOL_LINES} earlier]\n` + shown.join("\n"); + } else { + toolPart = toolLines.join("\n"); + } + let textPart = textAcc; + const textLines = textPart.split("\n"); + if (textLines.length > MAX_TEXT_LINES) { + textPart = `[...]\n` + textLines.slice(-MAX_TEXT_LINES).join("\n"); + } + return toolPart + (textPart ? (toolPart ? "\n\n" : "") + textPart : ""); + }; + + const editStream = () => { + if (!streamMsgId || finalized) return; + let display: string; + if (verbose) { + display = getDisplay(); + } else { + // Keep last N lines of text for streaming preview + const lines = textAcc.split("\n"); + display = lines.length > 30 ? `[...]\n${lines.slice(-30).join("\n")}` : textAcc; + } + if (!display) return; + callApi(token, "editMessageText", { + chat_id: chatId, + message_id: streamMsgId, + text: display.slice(0, 4096), + }).catch(() => {}); + }; + + const flush = async () => { + const display = verbose ? getDisplay() : textAcc; + if (!display) return; + lastSentAt = Date.now(); + + if (!streamMsgId && !initPromise) { + initPromise = (async () => { + try { + const res = await callApi<{ ok: boolean; result: { message_id: number } }>( + token, "sendMessage", { + chat_id: chatId, + text: "⏳", + ...(threadId ? { message_thread_id: threadId } : {}), + } + ); + if (res.ok) { + streamMsgId = res.result.message_id; + editStream(); + } + } catch {} + })(); + await initPromise; + } else { + if (initPromise) await initPromise; + editStream(); + } + }; + + const onChunk = (text: string) => { + textAcc += text; + const now = Date.now(); + if (now - lastSentAt >= intervalMs) { + if (timer) { clearTimeout(timer); timer = null; } + flush(); + } else if (!timer) { + timer = setTimeout(() => { timer = null; flush(); }, intervalMs - (now - lastSentAt)); + } + }; + + const onToolEvent = (line: string) => { + if (!verbose) return; + toolLines.push(line); + // Use same throttle logic as onChunk to avoid spamming the API + const now = Date.now(); + if (now - lastSentAt >= intervalMs) { + if (timer) { clearTimeout(timer); timer = null; } + flush(); + } else if (!timer) { + timer = setTimeout(() => { timer = null; flush(); }, intervalMs - (now - lastSentAt)); + } + }; + + const waitForStreamMsg = async (): Promise<{ msgId: number | null; hadToolLines: boolean }> => { + if (timer) { clearTimeout(timer); timer = null; } + if (initPromise) await initPromise; + finalized = true; + return { msgId: streamMsgId, hadToolLines: toolLines.length > 0 }; + }; + + return { onChunk, onToolEvent, waitForStreamMsg }; +} + function extractReactionDirective(text: string): { cleanedText: string; reactionEmoji: string | null } { let reactionEmoji: string | null = null; const cleanedText = text @@ -516,6 +638,47 @@ async function handleMessage(message: TelegramMessage): Promise { return; } + if (command === "/kill") { + const killed = killActive(); + await sendMessage(config.token, chatId, killed ? "Killed active agent." : "No active agent running.", threadId); + return; + } + + if (command === "/verbose") { + if (verboseChats.has(chatId)) { + verboseChats.delete(chatId); + await sendMessage(config.token, chatId, "Verbose mode off.", threadId); + } else { + verboseChats.add(chatId); + await sendMessage(config.token, chatId, "Verbose mode on — tool calls will be shown.", threadId); + } + return; + } + + if (command === "/fork") { + const forkPrompt = text.replace(/^\/fork\s*/i, "").trim(); + if (!forkPrompt) { + await sendMessage(config.token, chatId, "Usage: /fork ", threadId); + return; + } + const typingInterval = setInterval(() => sendTyping(config.token, chatId, threadId), 4000); + try { + await sendTyping(config.token, chatId, threadId); + const senderLabel = message.from?.username ?? String(userId ?? "unknown"); + const result = await runFork(`[Telegram from ${senderLabel}]\nMessage: ${forkPrompt}`); + if (result.exitCode !== 0) { + await sendMessage(config.token, chatId, `Fork error (exit ${result.exitCode}): ${result.stderr || "Unknown error"}`, threadId); + } else { + await sendMessage(config.token, chatId, result.stdout || "(empty response)", threadId); + } + } catch (err) { + await sendMessage(config.token, chatId, `Fork error: ${err instanceof Error ? err.message : String(err)}`, threadId); + } finally { + clearInterval(typingInterval); + } + return; + } + // Secretary: detect reply to a bot alert message → treat as custom reply const replyToMsgId = message.reply_to_message?.message_id; if (replyToMsgId && text && botId && message.reply_to_message?.from?.id === botId) { @@ -598,10 +761,30 @@ async function handleMessage(message: TelegramMessage): Promise { ); } const prefixedPrompt = promptParts.join("\n"); - const result = await runUserMessage("telegram", prefixedPrompt); + const busy = isMainBusy(); + const verbose = verboseChats.has(chatId); + let result; + let streamMsgId: number | null = null; + let hadToolLines = false; + if (busy) { + result = await runFork(prefixedPrompt); + } else { + const stream = makeStreamCallback(config.token, chatId, threadId, { verbose }); + result = await runUserMessage("telegram", prefixedPrompt, stream.onChunk, stream.onToolEvent); + const streamResult = await stream.waitForStreamMsg(); + streamMsgId = streamResult.msgId; + hadToolLines = streamResult.hadToolLines; + } if (result.exitCode !== 0) { - await sendMessage(config.token, chatId, `Error (exit ${result.exitCode}): ${result.stderr || "Unknown error"}`, threadId); + const errText = `Error (exit ${result.exitCode}): ${result.stderr || "Unknown error"}`; + if (streamMsgId) { + await callApi(config.token, "editMessageText", { + chat_id: chatId, message_id: streamMsgId, text: errText, + }).catch(() => sendMessage(config.token, chatId, errText, threadId)); + } else { + await sendMessage(config.token, chatId, errText, threadId); + } } else { const { cleanedText, reactionEmoji } = extractReactionDirective(result.stdout || ""); if (reactionEmoji) { @@ -609,7 +792,32 @@ async function handleMessage(message: TelegramMessage): Promise { console.error(`[Telegram] Failed to send reaction for ${label}: ${err instanceof Error ? err.message : err}`); }); } - await sendMessage(config.token, chatId, cleanedText || "(empty response)", threadId); + const finalText = cleanedText || "(empty response)"; + if (streamMsgId) { + // Edit the streaming message with final formatted HTML. + // editStream() already set the message to the correct plain text content, + // so if all edits fail (e.g. "message is not modified"), do NOT send a new + // message — the user already sees the correct content and a sendMessage + // would create a duplicate. + const html = markdownToTelegramHtml(normalizeTelegramText(finalText)); + await callApi(config.token, "editMessageText", { + chat_id: chatId, message_id: streamMsgId, + text: html.slice(0, 4096), parse_mode: "HTML", + }).catch(() => callApi(config.token, "editMessageText", { + chat_id: chatId, message_id: streamMsgId, + text: finalText.slice(0, 4096), + }).catch(() => { + // If all edits fail and the stream message has tool output (verbose), + // send the final response as a new message. But if there were no tool + // lines, the stream message already shows the correct text — "not + // modified" just means it's already right, so don't send a duplicate. + if (verbose && hadToolLines) { + return sendMessage(config.token, chatId, finalText, threadId); + } + })); + } else { + await sendMessage(config.token, chatId, finalText, threadId); + } } } catch (err) { const errMsg = err instanceof Error ? err.message : String(err); diff --git a/src/runner.ts b/src/runner.ts index 93355dc1..3a488fae 100644 --- a/src/runner.ts +++ b/src/runner.ts @@ -33,6 +33,25 @@ function enqueue(fn: () => Promise): Promise { return task; } +// Active process tracking — allows kill from outside +let activeProc: ReturnType | null = null; + +/** Kill the currently running claude subprocess. Returns true if something was killed. */ +export function killActive(): boolean { + if (!activeProc) return false; + try { activeProc.kill(); } catch {} + activeProc = null; + return true; +} + +// Tracks whether the main serial queue is currently executing +let mainRunning = false; + +/** True while the main agent is processing a task (excludes fork). */ +export function isMainBusy(): boolean { + return mainRunning; +} + function extractRateLimitMessage(stdout: string, stderr: string): string | null { const candidates = [stdout, stderr]; for (const text of candidates) { @@ -88,11 +107,13 @@ async function runClaudeOnce( env: buildChildEnv(baseEnv, model, api), }); + activeProc = proc; const [rawStdout, stderr] = await Promise.all([ new Response(proc.stdout).text(), new Response(proc.stderr).text(), ]); await proc.exited; + if (activeProc === proc) activeProc = null; return { rawStdout, @@ -101,6 +122,139 @@ async function runClaudeOnce( }; } +function formatToolCallSummary(name: string, input: Record): string { + const s = (v: unknown, max = 50) => String(v ?? "").slice(0, max); + switch (name) { + case "Write": + case "Edit": + case "Read": return `${name}(${s(input.file_path)})`; + case "Bash": return `Bash(${s(input.command, 60)})`; + case "Grep": return `Grep(${s(input.pattern)} in ${s(input.path ?? ".")})`; + case "Glob": return `Glob(${s(input.pattern)})`; + case "WebSearch": return `WebSearch(${s(input.query)})`; + case "WebFetch": return `WebFetch(${s(input.url, 60)})`; + default: return `${name}(...)`; + } +} + +function extractToolResultText(content: unknown): string { + if (typeof content === "string") return content; + if (Array.isArray(content)) { + return (content as Array<{ type?: string; text?: string }>) + .filter(b => b.type === "text") + .map(b => b.text ?? "") + .join(""); + } + return String(content ?? ""); +} + +/** + * Run claude with --output-format stream-json, emitting text chunks via onChunk + * and tool call/result lines via onToolEvent as they arrive. + * Session ID and final result come from the result event. + */ +async function runClaudeStreaming( + baseArgs: string[], + model: string, + api: string, + baseEnv: Record, + onChunk?: (text: string) => void, + onToolEvent?: (line: string) => void +): Promise<{ result: string; stderr: string; exitCode: number; sessionId?: string; isRateLimit: boolean }> { + const args = [...baseArgs]; + const normalizedModel = model.trim().toLowerCase(); + if (model.trim() && normalizedModel !== "glm") args.push("--model", model.trim()); + + const proc = Bun.spawn(args, { + stdout: "pipe", + stderr: "pipe", + env: buildChildEnv(baseEnv, model, api), + }); + + activeProc = proc; + const stderrPromise = new Response(proc.stderr).text(); + + let finalResult = ""; + let sessionId: string | undefined; + let isRateLimit = false; + let delivered = ""; // text already sent to onChunk for the current message + let lastMsgId = ""; // reset delivered tracking when a new assistant message starts + const pendingToolCalls = new Map(); // tool_use_id → tool name + + const reader = proc.stdout.getReader(); + const decoder = new TextDecoder(); + let buf = ""; + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + buf += decoder.decode(value, { stream: true }); + + let nl: number; + while ((nl = buf.indexOf("\n")) !== -1) { + const line = buf.slice(0, nl).trim(); + buf = buf.slice(nl + 1); + if (!line) continue; + + try { + const event = JSON.parse(line); + + if (event.type === "assistant" && event.message?.content) { + const msgId: string = event.message.id ?? ""; + if (msgId !== lastMsgId) { + // Insert newline separator between assistant messages so text + // from successive turns doesn't merge onto one line. + if (onChunk && delivered) onChunk("\n"); + delivered = ""; + lastMsgId = msgId; + } + let full = ""; + for (const block of event.message.content) { + if (block.type === "text" && typeof block.text === "string") { + full += block.text; + } else if (block.type === "tool_use" && onToolEvent) { + pendingToolCalls.set(block.id, block.name); + onToolEvent(`● ${formatToolCallSummary(block.name, block.input ?? {})}`); + } + } + if (onChunk && full.length > delivered.length) { + onChunk(full.slice(delivered.length)); + delivered = full; + } + } + + if (event.type === "user" && onToolEvent) { + for (const block of event.message?.content ?? []) { + if (block.type === "tool_result") { + const name = pendingToolCalls.get(block.tool_use_id) ?? "?"; + pendingToolCalls.delete(block.tool_use_id); + const text = extractToolResultText(block.content); + const firstLine = text.split("\n")[0].slice(0, 80); + const summary = block.is_error ? `Error: ${firstLine}` : (firstLine || "done"); + onToolEvent(` ⎿ [${name}] ${summary}`); + } + } + } + + if (event.type === "result") { + sessionId = event.session_id; + finalResult = typeof event.result === "string" ? event.result : finalResult; + isRateLimit = RATE_LIMIT_PATTERN.test(finalResult); + } + } catch {} + } + } + + await proc.exited; + if (activeProc === proc) activeProc = null; + + const stderr = await stderrPromise; + // Also check stderr for rate limit signals + if (!isRateLimit) isRateLimit = RATE_LIMIT_PATTERN.test(stderr); + + return { result: finalResult, stderr, exitCode: proc.exitCode ?? 1, sessionId, isRateLimit }; +} + const PROJECT_DIR = process.cwd(); const DIR_SCOPE_PROMPT = [ @@ -223,127 +377,99 @@ export async function loadHeartbeatPromptTemplate(): Promise { return ""; } -async function execClaude(name: string, prompt: string): Promise { - await mkdir(LOGS_DIR, { recursive: true }); - - const existing = await getSession(); - const isNew = !existing; - const timestamp = new Date().toISOString().replace(/[:.]/g, "-"); - const logFile = join(LOGS_DIR, `${name}-${timestamp}.log`); - - const { security, model, api, fallback } = getSettings(); - const primaryConfig: ModelConfig = { model, api }; - const fallbackConfig: ModelConfig = { - model: fallback?.model ?? "", - api: fallback?.api ?? "", - }; - const securityArgs = buildSecurityArgs(security); - - console.log( - `[${new Date().toLocaleTimeString()}] Running: ${name} (${isNew ? "new session" : `resume ${existing.sessionId.slice(0, 8)}`}, security: ${security.level})` - ); - - // New session: use json output to capture Claude's session_id - // Resumed session: use text output with --resume - const outputFormat = isNew ? "json" : "text"; - const args = ["claude", "-p", prompt, "--output-format", outputFormat, ...securityArgs]; - - if (!isNew) { - args.push("--resume", existing.sessionId); - } - - // Build the appended system prompt: prompt files + directory scoping - // This is passed on EVERY invocation (not just new sessions) because - // --append-system-prompt does not persist across --resume. - const promptContent = await loadPrompts(); - const appendParts: string[] = [ - "You are running inside ClaudeClaw.", - ]; - if (promptContent) appendParts.push(promptContent); +async function execClaude(name: string, prompt: string, onChunk?: (text: string) => void, onToolEvent?: (line: string) => void): Promise { + mainRunning = true; + try { + await mkdir(LOGS_DIR, { recursive: true }); + + const existing = await getSession(); + const isNew = !existing; + const timestamp = new Date().toISOString().replace(/[:.]/g, "-"); + const logFile = join(LOGS_DIR, `${name}-${timestamp}.log`); + + const { security, model, api, fallback } = getSettings(); + const primaryConfig: ModelConfig = { model, api }; + const fallbackConfig: ModelConfig = { + model: fallback?.model ?? "", + api: fallback?.api ?? "", + }; + const securityArgs = buildSecurityArgs(security); + + console.log( + `[${new Date().toLocaleTimeString()}] Running: ${name} (${isNew ? "new session" : `resume ${existing.sessionId.slice(0, 8)}`}, security: ${security.level})` + ); - // Load the project's CLAUDE.md if it exists - if (existsSync(PROJECT_CLAUDE_MD)) { - try { - const claudeMd = await Bun.file(PROJECT_CLAUDE_MD).text(); - if (claudeMd.trim()) appendParts.push(claudeMd.trim()); - } catch (e) { - console.error(`[${new Date().toLocaleTimeString()}] Failed to read project CLAUDE.md:`, e); + // Always use stream-json — session_id comes from the result event for both new and resumed + // --verbose is required by claude when using stream-json with --print + const args = ["claude", "-p", prompt, "--output-format", "stream-json", "--verbose", ...securityArgs]; + if (!isNew) args.push("--resume", existing.sessionId); + + // Build the appended system prompt (re-sent every turn since --append-system-prompt doesn't persist) + const promptContent = await loadPrompts(); + const appendParts: string[] = ["You are running inside ClaudeClaw."]; + if (promptContent) appendParts.push(promptContent); + + if (existsSync(PROJECT_CLAUDE_MD)) { + try { + const claudeMd = await Bun.file(PROJECT_CLAUDE_MD).text(); + if (claudeMd.trim()) appendParts.push(claudeMd.trim()); + } catch (e) { + console.error(`[${new Date().toLocaleTimeString()}] Failed to read project CLAUDE.md:`, e); + } } - } - if (security.level !== "unrestricted") appendParts.push(DIR_SCOPE_PROMPT); - if (appendParts.length > 0) { - args.push("--append-system-prompt", appendParts.join("\n\n")); - } + if (security.level !== "unrestricted") appendParts.push(DIR_SCOPE_PROMPT); + if (appendParts.length > 0) args.push("--append-system-prompt", appendParts.join("\n\n")); - // Strip CLAUDECODE env var so child claude processes don't think they're nested - const { CLAUDECODE: _, ...cleanEnv } = process.env; - const baseEnv = { ...cleanEnv } as Record; + const { CLAUDECODE: _, ...cleanEnv } = process.env; + const baseEnv = { ...cleanEnv } as Record; - let exec = await runClaudeOnce(args, primaryConfig.model, primaryConfig.api, baseEnv); - const primaryRateLimit = extractRateLimitMessage(exec.rawStdout, exec.stderr); - let usedFallback = false; + let exec = await runClaudeStreaming(args, primaryConfig.model, primaryConfig.api, baseEnv, onChunk, onToolEvent); + let usedFallback = false; - if (primaryRateLimit && hasModelConfig(fallbackConfig) && !sameModelConfig(primaryConfig, fallbackConfig)) { - console.warn( - `[${new Date().toLocaleTimeString()}] Claude limit reached; retrying with fallback${fallbackConfig.model ? ` (${fallbackConfig.model})` : ""}...` - ); - exec = await runClaudeOnce(args, fallbackConfig.model, fallbackConfig.api, baseEnv); - usedFallback = true; - } - - const rawStdout = exec.rawStdout; - const stderr = exec.stderr; - const exitCode = exec.exitCode; - let stdout = rawStdout; - let sessionId = existing?.sessionId ?? "unknown"; - const rateLimitMessage = extractRateLimitMessage(rawStdout, stderr); - - if (rateLimitMessage) { - stdout = rateLimitMessage; - } - - // For new sessions, parse the JSON to extract session_id and result text - if (!rateLimitMessage && isNew && exitCode === 0) { - try { - const json = JSON.parse(rawStdout); - sessionId = json.session_id; - stdout = json.result ?? ""; - // Save the real session ID from Claude Code - await createSession(sessionId); - console.log(`[${new Date().toLocaleTimeString()}] Session created: ${sessionId}`); - } catch (e) { - console.error(`[${new Date().toLocaleTimeString()}] Failed to parse session from Claude output:`, e); + if (exec.isRateLimit && hasModelConfig(fallbackConfig) && !sameModelConfig(primaryConfig, fallbackConfig)) { + console.warn( + `[${new Date().toLocaleTimeString()}] Claude limit reached; retrying with fallback${fallbackConfig.model ? ` (${fallbackConfig.model})` : ""}...` + ); + exec = await runClaudeStreaming(args, fallbackConfig.model, fallbackConfig.api, baseEnv, onChunk, onToolEvent); + usedFallback = true; } - } - const result: RunResult = { - stdout, - stderr, - exitCode, - }; - - const output = [ - `# ${name}`, - `Date: ${new Date().toISOString()}`, - `Session: ${sessionId} (${isNew ? "new" : "resumed"})`, - `Model config: ${usedFallback ? "fallback" : "primary"}`, - `Prompt: ${prompt}`, - `Exit code: ${result.exitCode}`, - "", - "## Output", - stdout, - ...(stderr ? ["## Stderr", stderr] : []), - ].join("\n"); + const { result: stdout, stderr, exitCode, sessionId: streamedSessionId } = exec; + let sessionId = streamedSessionId ?? existing?.sessionId ?? "unknown"; - await Bun.write(logFile, output); - console.log(`[${new Date().toLocaleTimeString()}] Done: ${name} → ${logFile}`); + // Persist session ID — works for both new and resumed (stream-json always emits it) + if (streamedSessionId && (isNew || streamedSessionId !== existing?.sessionId)) { + await createSession(streamedSessionId); + if (isNew) console.log(`[${new Date().toLocaleTimeString()}] Session created: ${streamedSessionId}`); + } - return result; + const result: RunResult = { stdout, stderr, exitCode }; + + const output = [ + `# ${name}`, + `Date: ${new Date().toISOString()}`, + `Session: ${sessionId} (${isNew ? "new" : "resumed"})`, + `Model config: ${usedFallback ? "fallback" : "primary"}`, + `Prompt: ${prompt}`, + `Exit code: ${exitCode}`, + "", + "## Output", + stdout, + ...(stderr ? ["## Stderr", stderr] : []), + ].join("\n"); + + await Bun.write(logFile, output); + console.log(`[${new Date().toLocaleTimeString()}] Done: ${name} → ${logFile}`); + + return result; + } finally { + mainRunning = false; + } } -export async function run(name: string, prompt: string): Promise { - return enqueue(() => execClaude(name, prompt)); +export async function run(name: string, prompt: string, onChunk?: (text: string) => void, onToolEvent?: (line: string) => void): Promise { + return enqueue(() => execClaude(name, prompt, onChunk, onToolEvent)); } function prefixUserMessageWithClock(prompt: string): string { @@ -357,8 +483,71 @@ function prefixUserMessageWithClock(prompt: string): string { } } -export async function runUserMessage(name: string, prompt: string): Promise { - return run(name, prefixUserMessageWithClock(prompt)); +export async function runUserMessage(name: string, prompt: string, onChunk?: (text: string) => void, onToolEvent?: (line: string) => void): Promise { + return run(name, prefixUserMessageWithClock(prompt), onChunk, onToolEvent); +} + +// Path where Claude Code stores session JSONL transcripts for this project +const CLAUDE_SESSIONS_DIR = join( + process.env.HOME ?? "/root", + ".claude", + "projects", + PROJECT_DIR.replace(/\//g, "-") +); + +const FORK_SYSTEM_PROMPT = [ + "You are a FORK AGENT — a fast, lightweight watcher running in parallel with the main agent.", + "", + "SPEED IS YOUR PRIORITY. Be brief. Answer in 1-3 sentences. No preamble, no padding.", + "Do NOT over-analyze. Do NOT think through edge cases. Just answer and stop.", + "", + "Your job: answer quick questions and peek at the main agent's progress via its session transcript.", + "", + "DENY immediately (one sentence explanation) any request that would take more than ~30 seconds:", + "• Compiling / building anything (kernels, projects, binaries)", + "• Downloads or network fetches", + "• Fuzzing, long analysis, heavy computations", + "• Anything that would block you and prevent monitoring/killing the main agent", + "", + "ALLOW:", + "• Reading files (especially JSONL transcripts to report main agent progress)", + "• Short factual answers", + "• Reporting on what the main agent is currently doing", + "", + `Main session info lives at: /project/.claude/claudeclaw/session.json`, + `Session JSONL transcripts dir: ${CLAUDE_SESSIONS_DIR}`, + "To peek at main agent progress: read session.json for the session ID, then read the .jsonl file in the transcripts dir.", + "Each JSONL line is a turn. The last few lines show what the main agent is currently doing.", +].join("\n"); + +const FORK_MODEL = "claude-haiku-4-5-20251001"; + +/** Run a fork agent — parallel, does NOT touch the main serial queue or main session. */ +export async function runFork(prompt: string): Promise { + const { api } = getSettings(); + + const args = [ + "claude", "-p", prompt, + "--output-format", "json", + "--dangerously-skip-permissions", + "--model", FORK_MODEL, + "--append-system-prompt", FORK_SYSTEM_PROMPT, + ]; + + const { CLAUDECODE: _, ...cleanEnv } = process.env; + const baseEnv = { ...cleanEnv } as Record; + + const exec = await runClaudeOnce(args, FORK_MODEL, api, baseEnv); + + let stdout = exec.rawStdout; + if (exec.exitCode === 0) { + try { + const json = JSON.parse(exec.rawStdout); + stdout = json.result ?? exec.rawStdout; + } catch {} + } + + return { stdout, stderr: exec.stderr, exitCode: exec.exitCode }; } /**