From 417976320111832649221f0b1502553438e78922 Mon Sep 17 00:00:00 2001 From: Love Billingskog Nyberg Date: Mon, 2 Mar 2026 21:59:25 +0100 Subject: [PATCH 01/12] feat: add waiting_user task status to queue Add setWaiting, resume, and getWaitingForUser methods to TaskQueue. Update dequeue, listActive, cancel, and resetStale queries to handle the new waiting_user status. This is the foundation for long-lived streaming sessions where tasks can pause waiting for user input. Co-Authored-By: Claude Opus 4.6 --- src/queue.test.ts | 60 ++++++++++++++++++++++++++++++++++++++++++- src/queue.ts | 65 +++++++++++++++++++++++++++++++---------------- 2 files changed, 102 insertions(+), 23 deletions(-) diff --git a/src/queue.test.ts b/src/queue.test.ts index 0f0d99d..bcd6efa 100644 --- a/src/queue.test.ts +++ b/src/queue.test.ts @@ -203,7 +203,7 @@ describe("TaskQueue", () => { describe("metrics()", () => { it("returns zeroes on empty queue", () => { const m = queue.metrics(); - expect(m.counts).toEqual({ pending: 0, running: 0, completed: 0, failed: 0 }); + expect(m.counts).toEqual({ pending: 0, running: 0, completed: 0, failed: 0, waiting: 0 }); expect(m.avgDurationByRepo).toEqual([]); expect(m.throughput.lastHour).toBe(0); expect(m.throughput.last24h).toBe(0); @@ -285,4 +285,62 @@ describe("TaskQueue", () => { expect(m.errorRate).toBe(0); }); }); + + describe("waiting_user status", () => { + it("setWaiting transitions running task to waiting_user", () => { + const id = queue.enqueue({ userId: "u1", repo: "r1", prompt: "test" }); + queue.dequeue(); // running + queue.setWaiting(id, "waiting_user"); + const task = queue.get(id); + expect(task?.status).toBe("waiting_user"); + }); + + it("resume transitions waiting_user back to running", () => { + const id = queue.enqueue({ userId: "u1", repo: "r1", prompt: "test" }); + queue.dequeue(); + queue.setWaiting(id, "waiting_user"); + queue.resume(id); + const task = queue.get(id); + expect(task?.status).toBe("running"); + }); + + it("dequeue skips waiting_user tasks", () => { + const id = queue.enqueue({ userId: "u1", repo: "r1", prompt: "test" }); + queue.dequeue(); + queue.setWaiting(id, "waiting_user"); + queue.enqueue({ userId: "u1", repo: "r1", prompt: "test2" }); + const next = queue.dequeue(); + expect(next).toBeNull(); + }); + + it("resetStale also resets waiting_user tasks", () => { + const id = queue.enqueue({ userId: "u1", repo: "r1", prompt: "test" }); + queue.dequeue(); + queue.setWaiting(id, "waiting_user"); + const count = queue.resetStale(); + expect(count).toBe(1); + expect(queue.get(id)?.status).toBe("failed"); + }); + + it("listActive includes waiting_user tasks", () => { + const id = queue.enqueue({ userId: "u1", repo: "r1", prompt: "test" }); + queue.dequeue(); + queue.setWaiting(id, "waiting_user"); + const active = queue.listActive(); + expect(active.some(t => t.id === id)).toBe(true); + }); + + it("getWaitingForUser returns waiting_user task for a user", () => { + const id = queue.enqueue({ userId: "u1", repo: "r1", prompt: "test" }); + queue.dequeue(); + queue.setWaiting(id, "waiting_user"); + const waiting = queue.getWaitingForUser("u1"); + expect(waiting?.id).toBe(id); + }); + + it("getWaitingForUser returns null when no waiting tasks", () => { + const waiting = queue.getWaitingForUser("u1"); + expect(waiting).toBeNull(); + }); + }); }); diff --git a/src/queue.ts b/src/queue.ts index 0817af0..43e9961 100644 --- a/src/queue.ts +++ b/src/queue.ts @@ -13,7 +13,7 @@ export interface Task { userId: string; repo: string; prompt: string; - status: "pending" | "running" | "completed" | "failed"; + status: "pending" | "running" | "completed" | "failed" | "waiting_user"; result: string | null; taskType: string | null; priority: number; @@ -75,22 +75,21 @@ export class TaskQueue { } dequeue(): Task | null { - return this.db.transaction(() => { - const row = this.db - .query( - `SELECT * FROM tasks - WHERE status = 'pending' - ORDER BY priority DESC, created_at ASC, rowid ASC - LIMIT 1` - ) - .get() as TaskRow; + const row = this.db + .query( + `SELECT * FROM tasks + WHERE status = 'pending' + AND repo NOT IN (SELECT repo FROM tasks WHERE status IN ('running', 'waiting_user')) + ORDER BY priority DESC, created_at ASC, rowid ASC + LIMIT 1` + ) + .get() as TaskRow; - if (!row) return null; + if (!row) return null; - this.db.run(`UPDATE tasks SET status = 'running' WHERE id = ?`, [row.id]); + this.db.run(`UPDATE tasks SET status = 'running' WHERE id = ?`, [row.id]); - return this.rowToTask({ ...row, status: "running" }); - })(); + return this.rowToTask({ ...row, status: "running" }); } complete(id: string, result: string) { @@ -113,6 +112,27 @@ export class TaskQueue { return row ? this.rowToTask(row) : null; } + setWaiting(id: string, status: "waiting_user") { + this.db.run( + `UPDATE tasks SET status = ? WHERE id = ? AND status = 'running'`, + [status, id] + ); + } + + resume(id: string) { + this.db.run( + `UPDATE tasks SET status = 'running' WHERE id = ? AND status = 'waiting_user'`, + [id] + ); + } + + getWaitingForUser(userId: string): Task | null { + const row = this.db + .query(`SELECT * FROM tasks WHERE user_id = ? AND status = 'waiting_user' ORDER BY created_at DESC LIMIT 1`) + .get(userId) as TaskRow; + return row ? this.rowToTask(row) : null; + } + listByUser(userId: string, limit: number = 10): Task[] { const rows = this.db .query( @@ -122,21 +142,22 @@ export class TaskQueue { return rows.map((r) => this.rowToTask(r)); } - stats(): { pending: number; running: number; completed: number; failed: number } { + stats(): { pending: number; running: number; completed: number; failed: number; waiting: number } { return this.db .query( `SELECT COUNT(*) FILTER (WHERE status = 'pending') as pending, COUNT(*) FILTER (WHERE status = 'running') as running, COUNT(*) FILTER (WHERE status = 'completed') as completed, - COUNT(*) FILTER (WHERE status = 'failed') as failed + COUNT(*) FILTER (WHERE status = 'failed') as failed, + COUNT(*) FILTER (WHERE status = 'waiting_user') as waiting FROM tasks` ) - .get() as { pending: number; running: number; completed: number; failed: number }; + .get() as { pending: number; running: number; completed: number; failed: number; waiting: number }; } metrics(): { - counts: { pending: number; running: number; completed: number; failed: number }; + counts: { pending: number; running: number; completed: number; failed: number; waiting: number }; avgDurationByRepo: { repo: string; avgMs: number; count: number }[]; throughput: { lastHour: number; last24h: number }; errorRate: number; @@ -207,7 +228,7 @@ export class TaskQueue { listActive(limit: number = 20): Task[] { const rows = this.db .query( - `SELECT * FROM tasks WHERE status IN ('running', 'pending') ORDER BY priority DESC, created_at ASC, rowid ASC LIMIT ?` + `SELECT * FROM tasks WHERE status IN ('running', 'pending', 'waiting_user') ORDER BY priority DESC, created_at ASC, rowid ASC LIMIT ?` ) .all(limit) as TaskRow[]; return rows.map((r) => this.rowToTask(r)); @@ -215,7 +236,7 @@ export class TaskQueue { cancel(id: string): boolean { return this.db.run( - `UPDATE tasks SET status = 'failed', result = 'Cancelled', completed_at = ? WHERE id = ? AND status IN ('running', 'pending')`, + `UPDATE tasks SET status = 'failed', result = 'Cancelled', completed_at = ? WHERE id = ? AND status IN ('running', 'pending', 'waiting_user')`, [new Date().toISOString(), id] ).changes > 0; } @@ -235,7 +256,7 @@ export class TaskQueue { resetStale(): number { return this.db.run( - `UPDATE tasks SET status = 'failed', result = 'Interrupted — process restarted', completed_at = ? WHERE status = 'running'`, + `UPDATE tasks SET status = 'failed', result = 'Interrupted — process restarted', completed_at = ? WHERE status IN ('running', 'waiting_user')`, [new Date().toISOString()] ).changes; } @@ -246,7 +267,7 @@ export class TaskQueue { userId: row.user_id, repo: row.repo, prompt: row.prompt, - status: row.status as "pending" | "running" | "completed" | "failed", + status: row.status as "pending" | "running" | "completed" | "failed" | "waiting_user", result: row.result, taskType: row.task_type || null, priority: row.priority ?? 0, From 9e496734f88e014c4b8734d6ac942a6ecc4a95a5 Mon Sep 17 00:00:00 2001 From: Love Billingskog Nyberg Date: Mon, 2 Mar 2026 22:03:15 +0100 Subject: [PATCH 02/12] feat: add sessionId column to tasks for session resume Co-Authored-By: Claude Opus 4.6 --- src/queue.test.ts | 15 +++++++++++++++ src/queue.ts | 11 +++++++++++ 2 files changed, 26 insertions(+) diff --git a/src/queue.test.ts b/src/queue.test.ts index bcd6efa..0c2f588 100644 --- a/src/queue.test.ts +++ b/src/queue.test.ts @@ -343,4 +343,19 @@ describe("TaskQueue", () => { expect(waiting).toBeNull(); }); }); + + describe("sessionId tracking", () => { + it("stores sessionId via setSessionId", () => { + const id = queue.enqueue({ userId: "u1", repo: "r1", prompt: "test" }); + queue.setSessionId(id, "ses-abc-123"); + const task = queue.get(id); + expect(task?.sessionId).toBe("ses-abc-123"); + }); + + it("sessionId is null by default", () => { + const id = queue.enqueue({ userId: "u1", repo: "r1", prompt: "test" }); + const task = queue.get(id); + expect(task?.sessionId).toBeNull(); + }); + }); }); diff --git a/src/queue.ts b/src/queue.ts index 43e9961..d8f64f9 100644 --- a/src/queue.ts +++ b/src/queue.ts @@ -17,6 +17,7 @@ export interface Task { result: string | null; taskType: string | null; priority: number; + sessionId: string | null; createdAt: string; completedAt: string | null; } @@ -30,6 +31,7 @@ interface TaskRow { result: string | null; task_type: string | null; priority: number; + session_id: string | null; created_at: string; completed_at: string | null; } @@ -62,6 +64,10 @@ export class TaskQueue { if (!columns.some(c => c.name === "priority")) { this.db.run("ALTER TABLE tasks ADD COLUMN priority INTEGER NOT NULL DEFAULT 0"); } + // Migration: add session_id column if missing (backward compat) + if (!columns.some(c => c.name === "session_id")) { + this.db.run("ALTER TABLE tasks ADD COLUMN session_id TEXT"); + } } enqueue(input: TaskInput): string { @@ -126,6 +132,10 @@ export class TaskQueue { ); } + setSessionId(id: string, sessionId: string) { + this.db.run(`UPDATE tasks SET session_id = ? WHERE id = ?`, [sessionId, id]); + } + getWaitingForUser(userId: string): Task | null { const row = this.db .query(`SELECT * FROM tasks WHERE user_id = ? AND status = 'waiting_user' ORDER BY created_at DESC LIMIT 1`) @@ -271,6 +281,7 @@ export class TaskQueue { result: row.result, taskType: row.task_type || null, priority: row.priority ?? 0, + sessionId: row.session_id || null, createdAt: row.created_at, completedAt: row.completed_at, }; From 060b9aa7a8815d9deb666b22eed9c9ffcbaf13e6 Mon Sep 17 00:00:00 2001 From: Love Billingskog Nyberg Date: Mon, 2 Mar 2026 22:10:22 +0100 Subject: [PATCH 03/12] feat: add streaming types and buildStreamingArgs to runner Add StreamEvent, StreamingSession types to runner.ts. Add resumeSessionId to RunOptions and sessionId to RunResult. Add buildStreamingArgs method to ClaudeRunner (with --input-format stream-json, without --disallowed-tools AskUserQuestion). Update buildArgs and run() to support session resume and capture session_id from result messages. Co-Authored-By: Claude Opus 4.6 --- src/runner.ts | 16 +++++++++++++++ src/runners/claude.test.ts | 40 ++++++++++++++++++++++++++++++++++++++ src/runners/claude.ts | 19 +++++++++++++++++- 3 files changed, 74 insertions(+), 1 deletion(-) diff --git a/src/runner.ts b/src/runner.ts index 6db8e9e..4dd7280 100644 --- a/src/runner.ts +++ b/src/runner.ts @@ -3,12 +3,14 @@ export interface RunOptions { mcpConfigPath?: string; model?: string; signal?: AbortSignal; + resumeSessionId?: string; } export interface RunResult { success: boolean; output: string; durationMs: number; + sessionId?: string; } export type StatusEvent = @@ -17,6 +19,20 @@ export type StatusEvent = export type StatusCallback = (event: StatusEvent) => void; +export type StreamEvent = + | { kind: "text"; text: string } + | { kind: "tool"; tool: string; input: string } + | { kind: "ask_user"; question: string; options: { label: string; description?: string }[] } + | { kind: "result"; text: string; sessionId?: string } + | { kind: "error"; text: string }; + +export interface StreamingSession { + sendMessage(text: string): void; + kill(): void; + readonly sessionId: string | null; + readonly done: Promise; +} + export interface AgentRunner { name: string; run(prompt: string, workDir: string, opts: RunOptions, onStatus?: StatusCallback): Promise; diff --git a/src/runners/claude.test.ts b/src/runners/claude.test.ts index 45ab19d..e1216bf 100644 --- a/src/runners/claude.test.ts +++ b/src/runners/claude.test.ts @@ -75,3 +75,43 @@ describe("summarizeToolInput", () => { expect(summarizeToolInput("Unknown", { foo: "bar" })).toBe('{"foo":"bar"}'); }); }); + +describe("streaming args", () => { + const runner = new ClaudeRunner(); + + it("builds streaming args with input-format and without disallowed AskUserQuestion", () => { + const args = runner.buildStreamingArgs("fix the bug", { maxTurns: 25 }); + expect(args).toContain("--input-format"); + expect(args).toContain("stream-json"); + expect(args).not.toContain("AskUserQuestion"); + expect(args).not.toContain("--disallowed-tools"); + }); + + it("streaming args still include output-format stream-json", () => { + const args = runner.buildStreamingArgs("test", { maxTurns: 10 }); + expect(args).toContain("--output-format"); + expect(args).toContain("stream-json"); + }); + + it("includes resume flag when sessionId provided", () => { + const args = runner.buildStreamingArgs("test", { maxTurns: 10, resumeSessionId: "ses-123" }); + expect(args).toContain("--resume"); + expect(args).toContain("ses-123"); + }); + + it("omits resume when no sessionId", () => { + const args = runner.buildStreamingArgs("test", { maxTurns: 10 }); + expect(args).not.toContain("--resume"); + }); +}); + +describe("buildArgs resume support", () => { + const runner = new ClaudeRunner(); + + it("includes resume in regular buildArgs when provided", () => { + const args = runner.buildArgs("test", { maxTurns: 10, resumeSessionId: "ses-456" }); + expect(args).toContain("--resume"); + expect(args).toContain("ses-456"); + }); +}); + diff --git a/src/runners/claude.ts b/src/runners/claude.ts index deaa8de..3ee6e09 100644 --- a/src/runners/claude.ts +++ b/src/runners/claude.ts @@ -35,6 +35,21 @@ export class ClaudeRunner implements AgentRunner { buildArgs(prompt: string, opts: RunOptions): string[] { const args = ["-p", prompt, "--output-format", "stream-json", "--verbose", "--max-turns", String(opts.maxTurns), "--dangerously-skip-permissions", "--disallowed-tools", "AskUserQuestion"]; if (opts.mcpConfigPath) args.push("--mcp-config", opts.mcpConfigPath); + if (opts.resumeSessionId) args.push("--resume", opts.resumeSessionId); + return args; + } + + buildStreamingArgs(prompt: string, opts: RunOptions): string[] { + const args = [ + "-p", prompt, + "--input-format", "stream-json", + "--output-format", "stream-json", + "--verbose", + "--max-turns", String(opts.maxTurns), + "--dangerously-skip-permissions", + ]; + if (opts.mcpConfigPath) args.push("--mcp-config", opts.mcpConfigPath); + if (opts.resumeSessionId) args.push("--resume", opts.resumeSessionId); return args; } @@ -55,6 +70,7 @@ export class ClaudeRunner implements AgentRunner { } let resultText: string | null = null; + let resultSessionId: string | null = null; const textBlocks: string[] = []; const decoder = new TextDecoder(); const reader = proc.stdout.getReader(); @@ -69,6 +85,7 @@ export class ClaudeRunner implements AgentRunner { const msg = JSON.parse(line); if (msg.type === "result" && msg.result) { resultText = msg.result; + if (msg.session_id) resultSessionId = msg.session_id; } if (msg.type === "assistant" && msg.message?.content) { for (const block of msg.message.content) { @@ -108,6 +125,6 @@ export class ClaudeRunner implements AgentRunner { const finalOutput = resultText || textBlocks.join("\n\n") || "Task completed (no output)"; logger.info("claude task completed", { durationMs }); - return { success: true, output: finalOutput, durationMs }; + return { success: true, output: finalOutput, durationMs, sessionId: resultSessionId ?? undefined }; } } From f1c255c7ecedc88b31de064f8c75a23cc55cc988 Mon Sep 17 00:00:00 2001 From: Love Billingskog Nyberg Date: Mon, 2 Mar 2026 22:11:46 +0100 Subject: [PATCH 04/12] feat: implement runStreaming with bidirectional stdin/stdout Add runStreaming() method to ClaudeRunner that spawns Claude with stdin pipe for bidirectional communication. Parses stream-json events, detects AskUserQuestion tool_use blocks to emit ask_user StreamEvents, captures session_id from system/result messages, and returns a StreamingSession handle with sendMessage(), kill(), sessionId, and done promise. Co-Authored-By: Claude Opus 4.6 --- src/runners/claude.test.ts | 8 +++ src/runners/claude.ts | 120 ++++++++++++++++++++++++++++++++++++- 2 files changed, 127 insertions(+), 1 deletion(-) diff --git a/src/runners/claude.test.ts b/src/runners/claude.test.ts index e1216bf..4c2dd51 100644 --- a/src/runners/claude.test.ts +++ b/src/runners/claude.test.ts @@ -115,3 +115,11 @@ describe("buildArgs resume support", () => { }); }); +describe("runStreaming", () => { + const runner = new ClaudeRunner(); + + it("method exists and is a function", () => { + expect(typeof runner.runStreaming).toBe("function"); + }); +}); + diff --git a/src/runners/claude.ts b/src/runners/claude.ts index 3ee6e09..be60444 100644 --- a/src/runners/claude.ts +++ b/src/runners/claude.ts @@ -1,4 +1,4 @@ -import type { AgentRunner, RunOptions, RunResult, StatusCallback } from "../runner"; +import type { AgentRunner, RunOptions, RunResult, StatusCallback, StreamEvent, StreamingSession } from "../runner"; import { logger } from "../logger"; import { which } from "bun"; import { realpathSync } from "node:fs"; @@ -127,4 +127,122 @@ export class ClaudeRunner implements AgentRunner { logger.info("claude task completed", { durationMs }); return { success: true, output: finalOutput, durationMs, sessionId: resultSessionId ?? undefined }; } + + runStreaming( + prompt: string, + workDir: string, + opts: RunOptions, + onEvent?: (event: StreamEvent) => void, + ): StreamingSession { + const args = this.buildStreamingArgs(prompt, opts); + const startTime = Date.now(); + logger.info("starting streaming claude task", { workDir, maxTurns: opts.maxTurns }); + + const proc = Bun.spawn([this.claudePath, ...args], { + cwd: workDir, + stdin: "pipe", + stdout: "pipe", + stderr: "pipe", + env: { ...process.env, CI: "1" }, + }); + + if (opts.signal) { + opts.signal.addEventListener("abort", () => proc.kill(), { once: true }); + } + + let sessionId: string | null = null; + let resultText: string | null = null; + const textBlocks: string[] = []; + + const done = (async (): Promise => { + const decoder = new TextDecoder(); + const reader = proc.stdout.getReader(); + try { + while (true) { + const { done: isDone, value } = await reader.read(); + if (isDone) break; + const chunk = decoder.decode(value, { stream: true }); + const lines = chunk.split("\n").filter(Boolean); + for (const line of lines) { + try { + const msg = JSON.parse(line); + + if (msg.type === "system" && msg.session_id) { + sessionId = msg.session_id; + } + + if (msg.type === "result" && msg.result) { + resultText = msg.result; + if (msg.session_id) sessionId = msg.session_id; + onEvent?.({ kind: "result", text: msg.result, sessionId: sessionId ?? undefined }); + } + + if (msg.type === "assistant" && msg.message?.content) { + for (const block of msg.message.content) { + if (block.type === "text") { + textBlocks.push(block.text); + onEvent?.({ kind: "text", text: block.text }); + } + if (block.type === "tool_use") { + if (block.name === "AskUserQuestion") { + const questions = block.input?.questions; + if (questions?.[0]) { + onEvent?.({ + kind: "ask_user", + question: questions[0].question, + options: questions[0].options || [], + }); + } + } else { + onEvent?.({ + kind: "tool", + tool: block.name, + input: summarizeToolInput(block.name, block.input), + }); + } + } + } + } + } catch {} + } + } + } finally { + reader.releaseLock(); + } + + const exitCode = await withTimeout(proc); + const durationMs = Date.now() - startTime; + + if (exitCode === "timeout") { + return { success: false, output: `Claude task timed out after ${TIMEOUT_MS / 60000} minutes`, durationMs }; + } + if (exitCode !== 0) { + const stderr = await new Response(proc.stderr).text(); + return { success: false, output: stderr || "Claude task failed", durationMs }; + } + + return { + success: true, + output: resultText || textBlocks.join("\n\n") || "Task completed (no output)", + durationMs, + sessionId: sessionId ?? undefined, + }; + })(); + + const encoder = new TextEncoder(); + + return { + sendMessage(text: string) { + const msg = JSON.stringify({ type: "user_message", content: text }) + "\n"; + proc.stdin.write(encoder.encode(msg)); + }, + kill() { + proc.kill(); + }, + get sessionId() { + return sessionId; + }, + done, + }; + } } From 89c83df8a81bc52525e0968aed9398153af92c94 Mon Sep 17 00:00:00 2001 From: Love Billingskog Nyberg Date: Mon, 2 Mar 2026 22:14:36 +0100 Subject: [PATCH 05/12] feat: add SessionManager for tracking live streaming sessions Co-Authored-By: Claude Opus 4.6 --- src/session-manager.test.ts | 93 +++++++++++++++++++++++++++++++++++++ src/session-manager.ts | 59 +++++++++++++++++++++++ 2 files changed, 152 insertions(+) create mode 100644 src/session-manager.test.ts create mode 100644 src/session-manager.ts diff --git a/src/session-manager.test.ts b/src/session-manager.test.ts new file mode 100644 index 0000000..5990e3c --- /dev/null +++ b/src/session-manager.test.ts @@ -0,0 +1,93 @@ +import { describe, it, expect } from "bun:test"; +import { SessionManager } from "./session-manager"; +import type { StreamingSession, RunResult } from "./runner"; + +function mockSession( + overrides?: Partial, +): StreamingSession & { _messages: string[] } { + const messages: string[] = []; + return { + sendMessage: (text: string) => { + messages.push(text); + }, + kill: () => {}, + sessionId: "ses-test-123", + done: Promise.resolve({ success: true, output: "done", durationMs: 100 }), + _messages: messages, + ...overrides, + }; +} + +describe("SessionManager", () => { + it("registers and retrieves a session by taskId", () => { + const mgr = new SessionManager(); + const session = mockSession(); + mgr.register("task-1", "user-1", session); + expect(mgr.getByTask("task-1")).toBe(session); + }); + + it("retrieves waiting session by userId", () => { + const mgr = new SessionManager(); + const session = mockSession(); + mgr.register("task-1", "user-1", session); + mgr.setWaiting("task-1"); + const waiting = mgr.getWaitingForUser("user-1"); + expect(waiting?.taskId).toBe("task-1"); + expect(waiting?.session).toBe(session); + }); + + it("returns null when no waiting session for user", () => { + const mgr = new SessionManager(); + expect(mgr.getWaitingForUser("user-1")).toBeNull(); + }); + + it("returns null for registered but non-waiting session", () => { + const mgr = new SessionManager(); + mgr.register("task-1", "user-1", mockSession()); + expect(mgr.getWaitingForUser("user-1")).toBeNull(); + }); + + it("sendToTask sends message to the session", () => { + const mgr = new SessionManager(); + const session = mockSession(); + mgr.register("task-1", "user-1", session); + mgr.sendToTask("task-1", "hello"); + expect(session._messages).toEqual(["hello"]); + }); + + it("sendToTask returns false for unknown task", () => { + const mgr = new SessionManager(); + expect(mgr.sendToTask("nope", "hello")).toBe(false); + }); + + it("unregister removes the session", () => { + const mgr = new SessionManager(); + mgr.register("task-1", "user-1", mockSession()); + mgr.unregister("task-1"); + expect(mgr.getByTask("task-1")).toBeNull(); + }); + + it("clearWaiting removes waiting state but keeps session", () => { + const mgr = new SessionManager(); + const session = mockSession(); + mgr.register("task-1", "user-1", session); + mgr.setWaiting("task-1"); + mgr.clearWaiting("task-1"); + expect(mgr.getWaitingForUser("user-1")).toBeNull(); + expect(mgr.getByTask("task-1")).toBe(session); + }); + + it("killAll kills all sessions and clears map", () => { + const mgr = new SessionManager(); + let killed = false; + const session = mockSession({ + kill: () => { + killed = true; + }, + }); + mgr.register("task-1", "user-1", session); + mgr.killAll(); + expect(killed).toBe(true); + expect(mgr.getByTask("task-1")).toBeNull(); + }); +}); diff --git a/src/session-manager.ts b/src/session-manager.ts new file mode 100644 index 0000000..ad527b1 --- /dev/null +++ b/src/session-manager.ts @@ -0,0 +1,59 @@ +import type { StreamingSession } from "./runner"; + +interface SessionEntry { + taskId: string; + userId: string; + session: StreamingSession; + waiting: boolean; +} + +export class SessionManager { + private sessions = new Map(); + + register(taskId: string, userId: string, session: StreamingSession) { + this.sessions.set(taskId, { taskId, userId, session, waiting: false }); + } + + unregister(taskId: string) { + this.sessions.delete(taskId); + } + + getByTask(taskId: string): StreamingSession | null { + return this.sessions.get(taskId)?.session ?? null; + } + + setWaiting(taskId: string) { + const entry = this.sessions.get(taskId); + if (entry) entry.waiting = true; + } + + clearWaiting(taskId: string) { + const entry = this.sessions.get(taskId); + if (entry) entry.waiting = false; + } + + getWaitingForUser( + userId: string, + ): { taskId: string; session: StreamingSession } | null { + for (const entry of this.sessions.values()) { + if (entry.userId === userId && entry.waiting) { + return { taskId: entry.taskId, session: entry.session }; + } + } + return null; + } + + sendToTask(taskId: string, text: string): boolean { + const entry = this.sessions.get(taskId); + if (!entry) return false; + entry.session.sendMessage(text); + return true; + } + + killAll() { + for (const entry of this.sessions.values()) { + entry.session.kill(); + } + this.sessions.clear(); + } +} From 59e3d82d44990e7b24a8163ba3fc01fc24448def Mon Sep 17 00:00:00 2001 From: Love Billingskog Nyberg Date: Mon, 2 Mar 2026 22:19:13 +0100 Subject: [PATCH 06/12] feat: wire streaming sessions into worker for interactive tasks Co-Authored-By: Claude Opus 4.6 --- src/worker.ts | 74 +++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 57 insertions(+), 17 deletions(-) diff --git a/src/worker.ts b/src/worker.ts index 718bad8..26e42bf 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -8,8 +8,9 @@ import type { TaskQueue, Task } from "./queue"; import type { RepoManager } from "./repos"; import type { SessionStore } from "./sessions"; import type { IncomingMessage, ChatAdapter, EventAdapter, IncomingEvent } from "./adapters/types"; -import type { AgentRunner, RunOptions, StatusEvent } from "./runner"; +import type { AgentRunner, RunOptions, RunResult, StatusEvent, StreamEvent } from "./runner"; import type { TraceStore } from "./trace"; +import type { SessionManager } from "./session-manager"; import type { DebouncedFunction } from "./adapters/debounce"; export interface WorkerDeps { @@ -25,6 +26,7 @@ export interface WorkerDeps { getRunnerOptsForRepo: (repo: string, baseOpts: RunOptions) => RunOptions; getRepoInfo: (repoName: string) => { url: string; defaultBranch: string } | null; trace: TraceStore; + sessionManager: SessionManager; } function findAdapterForUser(userId: string, adapters: ChatAdapter[]): ChatAdapter | undefined { @@ -121,23 +123,61 @@ async function processTask(task: Task, deps: WorkerDeps) { signal: abortController.signal, }); - const result = await taskRunner.run( - task.prompt, - workDir, - runOpts, - (event: StatusEvent) => { - if (event.kind === "tool") { - const last = statusLog.at(-1); - const summary = `Using ${event.tool}...`; - if (last !== summary) statusLog.push(summary); - deps.trace.append(task.id, "tool", summary, event.input.slice(0, 2000)); - } else { - statusLog.push(event.text.slice(0, 200)); - deps.trace.append(task.id, "status", event.text.slice(0, 200)); + const useStreaming = !isDiscuss && task.taskType !== "cron" && typeof (taskRunner as any).runStreaming === "function"; + + let result: RunResult; + + if (useStreaming) { + const session = (taskRunner as any).runStreaming( + task.prompt, + workDir, + runOpts, + (event: StreamEvent) => { + if (event.kind === "tool") { + const last = statusLog.at(-1); + const summary = `Using ${event.tool}...`; + if (last !== summary) statusLog.push(summary); + deps.trace.append(task.id, "tool", summary, event.input.slice(0, 2000)); + } else if (event.kind === "text") { + statusLog.push(event.text.slice(0, 200)); + deps.trace.append(task.id, "status", event.text.slice(0, 200)); + } else if (event.kind === "ask_user") { + deps.queue.setWaiting(task.id, "waiting_user"); + deps.sessionManager.setWaiting(task.id); + deps.trace.append(task.id, "lifecycle", "Waiting for user input", event.question); + const questionText = event.options.length > 0 + ? `${event.question}\n${event.options.map((o, i) => `${i + 1}. ${o.label}${o.description ? ` — ${o.description}` : ""}`).join("\n")}` + : event.question; + replyWithFallback(questionText, originalMsg, task.userId, deps.adapters); + } else if (event.kind === "result" && event.sessionId) { + deps.queue.setSessionId(task.id, event.sessionId); + } + originalMsg?.updateStatus(statusLog.slice(-5).join("\n")); } - originalMsg?.updateStatus(statusLog.slice(-5).join("\n")); - } - ); + ); + + deps.sessionManager.register(task.id, task.userId, session); + result = await session.done; + deps.sessionManager.unregister(task.id); + } else { + result = await taskRunner.run( + task.prompt, + workDir, + runOpts, + (event: StatusEvent) => { + if (event.kind === "tool") { + const last = statusLog.at(-1); + const summary = `Using ${event.tool}...`; + if (last !== summary) statusLog.push(summary); + deps.trace.append(task.id, "tool", summary, event.input.slice(0, 2000)); + } else { + statusLog.push(event.text.slice(0, 200)); + deps.trace.append(task.id, "status", event.text.slice(0, 200)); + } + originalMsg?.updateStatus(statusLog.slice(-5).join("\n")); + } + ); + } if (mcpConfigPath) { try { From 3feb9758fbcccf94fd2b6010c539ba0bde9cce8b Mon Sep 17 00:00:00 2001 From: Love Billingskog Nyberg Date: Mon, 2 Mar 2026 22:25:44 +0100 Subject: [PATCH 07/12] feat: route user replies to waiting streaming sessions When a user has a streaming session waiting for input (waiting_user status), intercept their next message before any parsing and route it directly to the session's stdin. This enables the conversational flow where Claude asks a follow-up question and the user replies naturally. Co-Authored-By: Claude Opus 4.6 --- src/handlers.test.ts | 49 ++++++++++++++++++++++++++++++++++++++++++++ src/handlers.ts | 14 +++++++++++++ 2 files changed, 63 insertions(+) diff --git a/src/handlers.test.ts b/src/handlers.test.ts index 134a07a..5c56a16 100644 --- a/src/handlers.test.ts +++ b/src/handlers.test.ts @@ -492,3 +492,52 @@ describe("multi-user mode isolation", () => { expect(tasksB[0].prompt).toContain("grumpy"); }); }); + +describe("reply routing to waiting sessions", () => { + it("routes reply to waiting session instead of creating new task", async () => { + const sentMessages: string[] = []; + const clearedTaskIds: string[] = []; + const mockSessionManager = { + getWaitingForUser: (userId: string) => userId === "slack:U123" + ? { taskId: "task-waiting", session: { sendMessage: (t: string) => sentMessages.push(t) } } + : null, + clearWaiting: (taskId: string) => { clearedTaskIds.push(taskId); }, + }; + + const deps = makeDeps({ + sessionManager: mockSessionManager as any, + }); + + const handler = createMessageHandler(deps); + const msg = makeMessage("yes, fix them all"); + await handler(msg); + + // Should have sent to session, not created a new task + expect(sentMessages).toEqual(["yes, fix them all"]); + expect(clearedTaskIds).toEqual(["task-waiting"]); + }); + + it("falls through to normal handling when no waiting session", async () => { + const mockSessionManager = { + getWaitingForUser: () => null, + clearWaiting: () => {}, + }; + + const deps = makeDeps({ + config: makeConfig({ + repos: {}, + users: { "slack:U123": { name: "testuser", repos: [] } }, + }), + sessionManager: mockSessionManager as any, + }); + + const handler = createMessageHandler(deps); + const msg = makeMessage("discuss something"); + await handler(msg); + + // Should have created a discuss task (normal flow) + const tasks = deps.queue.listByUser("slack:U123", 1); + expect(tasks.length).toBe(1); + expect(tasks[0].taskType).toBe("discuss"); + }); +}); diff --git a/src/handlers.ts b/src/handlers.ts index a9d8b00..f2690f6 100644 --- a/src/handlers.ts +++ b/src/handlers.ts @@ -11,6 +11,7 @@ import type { RepoRegistry } from "./repo-registry"; import type { IncomingMessage, EventAdapter, IncomingEvent } from "./adapters/types"; import type { AgentRunner } from "./runner"; import type { TraceStore } from "./trace"; +import type { SessionManager } from "./session-manager"; export interface HandlerDeps { config: Config; @@ -19,6 +20,7 @@ export interface HandlerDeps { schedules: ScheduleStore; repoRegistry: RepoRegistry; trace: TraceStore; + sessionManager?: SessionManager; pendingReplies: Map; pendingEventReplies: Map; runningProcesses: Map; @@ -482,6 +484,18 @@ export function createMessageHandler(deps: HandlerDeps): (msg: IncomingMessage) return async (msg: IncomingMessage) => { deps.sessions.addMessage(msg.userId, "user", msg.text); + // Route reply to waiting streaming session if one exists + if (deps.sessionManager) { + const waiting = deps.sessionManager.getWaitingForUser(msg.userId); + if (waiting) { + waiting.session.sendMessage(msg.text); + deps.sessionManager.clearWaiting(waiting.taskId); + deps.queue.resume(waiting.taskId); + deps.trace.append(waiting.taskId, "lifecycle", "User reply received", msg.text.slice(0, 200)); + return; + } + } + const parsed = parseMessage(msg.text); const handlers: Record Promise> = { From 6fff0ec04696260e755f5449c4644eac90a6f70b Mon Sep 17 00:00:00 2001 From: Love Billingskog Nyberg Date: Mon, 2 Mar 2026 22:27:51 +0100 Subject: [PATCH 08/12] feat: wire SessionManager into main application Co-Authored-By: Claude Opus 4.6 --- src/index.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/index.ts b/src/index.ts index aad9525..1509560 100644 --- a/src/index.ts +++ b/src/index.ts @@ -22,6 +22,7 @@ import { startCronLoop } from "./cron"; import { ScheduleStore } from "./schedules"; import { createMessageHandler, createEventHandler } from "./handlers"; import { createWorker } from "./worker"; +import { SessionManager } from "./session-manager"; const config = loadConfig(); const db = new Database(process.env.DB_PATH || "./ove.db"); @@ -32,6 +33,7 @@ const sessions = new SessionStore(db); const trace = new TraceStore(db); const schedules = new ScheduleStore(db); const repoRegistry = new RepoRegistry(db); +const sessionManager = new SessionManager(); repoRegistry.migrateFromConfig( Object.fromEntries( @@ -166,6 +168,7 @@ async function main() { getRunner, getRunnerForRepo, getRepoInfo, + sessionManager, }; const handleMessage = createMessageHandler(handlerDeps); @@ -219,6 +222,7 @@ async function main() { getRunnerOptsForRepo, getRepoInfo, trace, + sessionManager, }); worker.start(); @@ -244,6 +248,7 @@ async function main() { for (const ea of eventAdapters) { await ea.stop(); } + sessionManager.killAll(); process.exit(0); } From 30556db03c4ed82ff8373ae1c021efc341927ea7 Mon Sep 17 00:00:00 2001 From: Love Billingskog Nyberg Date: Mon, 2 Mar 2026 22:34:06 +0100 Subject: [PATCH 09/12] feat: add autonomous CI retry loop for cron tasks After a cron task completes successfully, check if it created a PR and whether CI passed. If CI failed, resume the session and retry up to 3 times, waiting for CI between attempts. Co-Authored-By: Claude Opus 4.6 --- src/worker.ts | 104 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) diff --git a/src/worker.ts b/src/worker.ts index 26e42bf..e6b1884 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -63,6 +63,44 @@ async function replyWithFallback( } } +async function checkRecentPR( + repoSlug: string, + workDir: string +): Promise<{ prNumber: number; ciStatus: "passed" | "failed" | "pending"; ciDetails: string } | null> { + try { + const proc = Bun.spawn( + ["gh", "pr", "list", "--state", "open", "--author", "@me", "--limit", "1", "--json", "number,statusCheckRollup,headRefName"], + { cwd: workDir, stdout: "pipe", stderr: "pipe" } + ); + const text = await new Response(proc.stdout).text(); + const exitCode = await proc.exited; + if (exitCode !== 0) return null; + + const prs = JSON.parse(text); + if (prs.length === 0) return null; + + const pr = prs[0]; + const checks = pr.statusCheckRollup || []; + const failed = checks.filter((c: any) => c.conclusion === "FAILURE" || c.conclusion === "ERROR"); + const pending = checks.filter((c: any) => !c.conclusion || c.conclusion === "PENDING"); + + let ciStatus: "passed" | "failed" | "pending"; + let ciDetails = ""; + if (failed.length > 0) { + ciStatus = "failed"; + ciDetails = failed.map((c: any) => `${c.name}: ${c.conclusion}`).join(", "); + } else if (pending.length > 0) { + ciStatus = "pending"; + } else { + ciStatus = "passed"; + } + + return { prNumber: pr.number, ciStatus, ciDetails }; + } catch { + return null; + } +} + async function processTask(task: Task, deps: WorkerDeps) { const isCreateProject = task.taskType === "create-project"; const isDiscuss = task.taskType === "discuss"; @@ -224,6 +262,72 @@ async function processTask(task: Task, deps: WorkerDeps) { task.id, "lifecycle", `Task ${outcome} in ${elapsed}ms`, result.success ? undefined : result.output.slice(0, 2000), ); + + // Autonomous CI retry for cron tasks + if (task.taskType === "cron" && result.success && !skipRepoSetup) { + const MAX_CI_RETRIES = 3; + + for (let retry = 0; retry < MAX_CI_RETRIES; retry++) { + // Wait for CI to start + await Bun.sleep(30_000); + + const prStatus = await checkRecentPR(task.repo, workDir); + if (!prStatus) break; // No PR found, nothing to check + if (prStatus.ciStatus === "passed") break; // CI passed, done + + if (prStatus.ciStatus === "pending") { + // Wait longer for pending CI + await Bun.sleep(60_000); + const recheck = await checkRecentPR(task.repo, workDir); + if (!recheck || recheck.ciStatus !== "failed") break; + // Update prStatus for the retry prompt + Object.assign(prStatus, recheck); + } + + // CI failed — retry with resume + logger.info("cron task CI failed, retrying", { + taskId: task.id, + retry: retry + 1, + pr: prStatus.prNumber, + details: prStatus.ciDetails, + }); + deps.trace.append(task.id, "lifecycle", `CI retry ${retry + 1}/${MAX_CI_RETRIES}`, prStatus.ciDetails); + + const retryPrompt = `CI failed on PR #${prStatus.prNumber}. Failures: ${prStatus.ciDetails}. Fix the issues and push again.`; + const currentTask = deps.queue.get(task.id); + const retryOpts: RunOptions = { + ...runOpts, + resumeSessionId: currentTask?.sessionId ?? undefined, + }; + + const retryResult = await taskRunner.run(retryPrompt, workDir, retryOpts); + + if (!retryResult.success) { + logger.error("cron CI retry failed", { taskId: task.id, retry: retry + 1 }); + deps.trace.append(task.id, "lifecycle", `CI retry ${retry + 1} failed`, retryResult.output.slice(0, 500)); + break; + } + + // Store session ID from retry if available + if (retryResult.sessionId) { + deps.queue.setSessionId(task.id, retryResult.sessionId); + } + + deps.queue.complete(task.id, retryResult.output); + await replyWithFallback( + `[Scheduled: ${task.repo}] CI retry ${retry + 1}: ${retryResult.output.slice(0, 500)}`, + originalMsg, + task.userId, + deps.adapters, + ); + deps.trace.append(task.id, "lifecycle", `CI retry ${retry + 1} completed`); + + // Check if this retry fixed CI + await Bun.sleep(30_000); + const postRetry = await checkRecentPR(task.repo, workDir); + if (!postRetry || postRetry.ciStatus !== "failed") break; + } + } } finally { if (!skipRepoSetup) { await deps.repos.removeWorktree(task.repo, task.id).catch((err) => { From 73dd5e8b87cdda2528c7dad874270e1f57137e02 Mon Sep 17 00:00:00 2001 From: Love Billingskog Nyberg Date: Mon, 2 Mar 2026 22:38:35 +0100 Subject: [PATCH 10/12] test: add coverage for streaming sessions, reply routing, and queue states Co-Authored-By: Claude Opus 4.6 --- src/handlers.test.ts | 50 ++++++++++++++++++++++++++++++++++++++ src/queue.test.ts | 17 +++++++++++++ src/runners/claude.test.ts | 42 ++++++++++++++++++++++++++++++++ 3 files changed, 109 insertions(+) diff --git a/src/handlers.test.ts b/src/handlers.test.ts index 5c56a16..84d9eaf 100644 --- a/src/handlers.test.ts +++ b/src/handlers.test.ts @@ -517,6 +517,56 @@ describe("reply routing to waiting sessions", () => { expect(clearedTaskIds).toEqual(["task-waiting"]); }); + it("resumes the queue task when routing reply to waiting session", async () => { + const mockSessionManager = { + getWaitingForUser: (userId: string) => userId === "slack:U123" + ? { taskId: "task-waiting", session: { sendMessage: () => {} } } + : null, + clearWaiting: () => {}, + }; + + const deps = makeDeps({ + sessionManager: mockSessionManager as any, + }); + + // Create a task in waiting_user state + const id = deps.queue.enqueue({ userId: "slack:U123", repo: "my-app", prompt: "original" }); + deps.queue.dequeue(); // running + + // Override the mock to return the real task ID + (mockSessionManager as any).getWaitingForUser = (userId: string) => userId === "slack:U123" + ? { taskId: id, session: { sendMessage: () => {} } } + : null; + + deps.queue.setWaiting(id, "waiting_user"); + expect(deps.queue.get(id)?.status).toBe("waiting_user"); + + const handler = createMessageHandler(deps); + const msg = makeMessage("yes do it"); + await handler(msg); + + // Queue task should be resumed (back to running) + expect(deps.queue.get(id)?.status).toBe("running"); + }); + + it("adds user message to session history even when routing to waiting session", async () => { + const mockSessionManager = { + getWaitingForUser: () => ({ taskId: "t1", session: { sendMessage: () => {} } }), + clearWaiting: () => {}, + }; + + const deps = makeDeps({ + sessionManager: mockSessionManager as any, + }); + + const handler = createMessageHandler(deps); + const msg = makeMessage("my reply"); + await handler(msg); + + const history = deps.sessions.getHistory("slack:U123"); + expect(history.some(h => h.content === "my reply")).toBe(true); + }); + it("falls through to normal handling when no waiting session", async () => { const mockSessionManager = { getWaitingForUser: () => null, diff --git a/src/queue.test.ts b/src/queue.test.ts index 0c2f588..cd07b35 100644 --- a/src/queue.test.ts +++ b/src/queue.test.ts @@ -344,6 +344,23 @@ describe("TaskQueue", () => { }); }); + it("cancel works on waiting_user tasks", () => { + const id = queue.enqueue({ userId: "u1", repo: "r1", prompt: "test" }); + queue.dequeue(); + queue.setWaiting(id, "waiting_user"); + const cancelled = queue.cancel(id); + expect(cancelled).toBe(true); + expect(queue.get(id)?.status).toBe("failed"); + }); + + it("stats counts waiting_user tasks", () => { + const id = queue.enqueue({ userId: "u1", repo: "r1", prompt: "test" }); + queue.dequeue(); + queue.setWaiting(id, "waiting_user"); + const stats = queue.stats(); + expect(stats.waiting).toBe(1); + }); + describe("sessionId tracking", () => { it("stores sessionId via setSessionId", () => { const id = queue.enqueue({ userId: "u1", repo: "r1", prompt: "test" }); diff --git a/src/runners/claude.test.ts b/src/runners/claude.test.ts index 4c2dd51..3f9b263 100644 --- a/src/runners/claude.test.ts +++ b/src/runners/claude.test.ts @@ -123,3 +123,45 @@ describe("runStreaming", () => { }); }); +describe("buildStreamingArgs details", () => { + const runner = new ClaudeRunner(); + + it("does not include --disallowed-tools at all", () => { + const args = runner.buildStreamingArgs("test", { maxTurns: 10 }); + expect(args).not.toContain("--disallowed-tools"); + }); + + it("includes --input-format stream-json", () => { + const args = runner.buildStreamingArgs("test", { maxTurns: 10 }); + const idx = args.indexOf("--input-format"); + expect(idx).toBeGreaterThan(-1); + expect(args[idx + 1]).toBe("stream-json"); + }); + + it("includes --dangerously-skip-permissions", () => { + const args = runner.buildStreamingArgs("test", { maxTurns: 10 }); + expect(args).toContain("--dangerously-skip-permissions"); + }); + + it("includes MCP config when provided", () => { + const args = runner.buildStreamingArgs("test", { maxTurns: 10, mcpConfigPath: "/tmp/mcp.json" }); + expect(args).toContain("--mcp-config"); + expect(args).toContain("/tmp/mcp.json"); + }); +}); + +describe("buildArgs with resume", () => { + const runner = new ClaudeRunner(); + + it("regular buildArgs still disallows AskUserQuestion", () => { + const args = runner.buildArgs("test", { maxTurns: 10 }); + expect(args).toContain("--disallowed-tools"); + expect(args).toContain("AskUserQuestion"); + }); + + it("regular buildArgs does NOT include --input-format", () => { + const args = runner.buildArgs("test", { maxTurns: 10 }); + expect(args).not.toContain("--input-format"); + }); +}); + From d08f8dad840ff7c9842380d48c6186fa5cb5f1ff Mon Sep 17 00:00:00 2001 From: Love Billingskog Nyberg Date: Mon, 2 Mar 2026 22:54:07 +0100 Subject: [PATCH 11/12] fix: only block dequeue for waiting_user repos, allow parallel running Reconcile PR #20 (parallel tasks) with waiting_user blocking. Only block dequeuing from repos that have a waiting_user task (live session waiting for input), not from repos with running tasks. Co-Authored-By: Claude Opus 4.6 --- src/queue.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/queue.ts b/src/queue.ts index d8f64f9..375fe32 100644 --- a/src/queue.ts +++ b/src/queue.ts @@ -85,7 +85,7 @@ export class TaskQueue { .query( `SELECT * FROM tasks WHERE status = 'pending' - AND repo NOT IN (SELECT repo FROM tasks WHERE status IN ('running', 'waiting_user')) + AND repo NOT IN (SELECT repo FROM tasks WHERE status = 'waiting_user') ORDER BY priority DESC, created_at ASC, rowid ASC LIMIT 1` ) From 47fd06ba8ecfcf4b6df72888659f1b78b10506c9 Mon Sep 17 00:00:00 2001 From: Love Billingskog Nyberg Date: Tue, 3 Mar 2026 08:44:43 +0100 Subject: [PATCH 12/12] fix: address code review findings for long-lived sessions - Show waiting_user tasks in handleListTasks display - Add updateResult() to queue for CI retry updates (avoids overwriting completed_at on already-completed tasks) - Wrap sendMessage stdin write in try/catch for closed pipe safety - Simplify setWaiting() signature (remove redundant status param) - Add waiting_user to repoBreakdown in metrics() Co-Authored-By: Claude Opus 4.6 --- src/handlers.test.ts | 2 +- src/handlers.ts | 7 +++++++ src/queue.test.ts | 16 ++++++++-------- src/queue.ts | 20 ++++++++++++++------ src/runners/claude.ts | 8 ++++++-- src/worker.ts | 4 ++-- 6 files changed, 38 insertions(+), 19 deletions(-) diff --git a/src/handlers.test.ts b/src/handlers.test.ts index 84d9eaf..8d0471e 100644 --- a/src/handlers.test.ts +++ b/src/handlers.test.ts @@ -538,7 +538,7 @@ describe("reply routing to waiting sessions", () => { ? { taskId: id, session: { sendMessage: () => {} } } : null; - deps.queue.setWaiting(id, "waiting_user"); + deps.queue.setWaiting(id); expect(deps.queue.get(id)?.status).toBe("waiting_user"); const handler = createMessageHandler(deps); diff --git a/src/handlers.ts b/src/handlers.ts index f2690f6..c82d7db 100644 --- a/src/handlers.ts +++ b/src/handlers.ts @@ -237,6 +237,7 @@ async function handleListTasks(msg: IncomingMessage, deps: HandlerDeps) { } const running = tasks.filter((t) => t.status === "running"); const pending = tasks.filter((t) => t.status === "pending"); + const waiting = tasks.filter((t) => t.status === "waiting_user"); const lines: string[] = []; if (running.length > 0) { lines.push("Running:"); @@ -244,6 +245,12 @@ async function handleListTasks(msg: IncomingMessage, deps: HandlerDeps) { lines.push(` ${t.id.slice(0, 7)} — "${t.prompt.slice(0, 60)}" on ${t.repo} (${formatDuration(t.createdAt)})`); } } + if (waiting.length > 0) { + lines.push("Waiting for input:"); + for (const t of waiting) { + lines.push(` ${t.id.slice(0, 7)} — "${t.prompt.slice(0, 60)}" on ${t.repo} (awaiting reply)`); + } + } if (pending.length > 0) { lines.push("Pending:"); for (const t of pending) { diff --git a/src/queue.test.ts b/src/queue.test.ts index cd07b35..d2f112b 100644 --- a/src/queue.test.ts +++ b/src/queue.test.ts @@ -290,7 +290,7 @@ describe("TaskQueue", () => { it("setWaiting transitions running task to waiting_user", () => { const id = queue.enqueue({ userId: "u1", repo: "r1", prompt: "test" }); queue.dequeue(); // running - queue.setWaiting(id, "waiting_user"); + queue.setWaiting(id); const task = queue.get(id); expect(task?.status).toBe("waiting_user"); }); @@ -298,7 +298,7 @@ describe("TaskQueue", () => { it("resume transitions waiting_user back to running", () => { const id = queue.enqueue({ userId: "u1", repo: "r1", prompt: "test" }); queue.dequeue(); - queue.setWaiting(id, "waiting_user"); + queue.setWaiting(id); queue.resume(id); const task = queue.get(id); expect(task?.status).toBe("running"); @@ -307,7 +307,7 @@ describe("TaskQueue", () => { it("dequeue skips waiting_user tasks", () => { const id = queue.enqueue({ userId: "u1", repo: "r1", prompt: "test" }); queue.dequeue(); - queue.setWaiting(id, "waiting_user"); + queue.setWaiting(id); queue.enqueue({ userId: "u1", repo: "r1", prompt: "test2" }); const next = queue.dequeue(); expect(next).toBeNull(); @@ -316,7 +316,7 @@ describe("TaskQueue", () => { it("resetStale also resets waiting_user tasks", () => { const id = queue.enqueue({ userId: "u1", repo: "r1", prompt: "test" }); queue.dequeue(); - queue.setWaiting(id, "waiting_user"); + queue.setWaiting(id); const count = queue.resetStale(); expect(count).toBe(1); expect(queue.get(id)?.status).toBe("failed"); @@ -325,7 +325,7 @@ describe("TaskQueue", () => { it("listActive includes waiting_user tasks", () => { const id = queue.enqueue({ userId: "u1", repo: "r1", prompt: "test" }); queue.dequeue(); - queue.setWaiting(id, "waiting_user"); + queue.setWaiting(id); const active = queue.listActive(); expect(active.some(t => t.id === id)).toBe(true); }); @@ -333,7 +333,7 @@ describe("TaskQueue", () => { it("getWaitingForUser returns waiting_user task for a user", () => { const id = queue.enqueue({ userId: "u1", repo: "r1", prompt: "test" }); queue.dequeue(); - queue.setWaiting(id, "waiting_user"); + queue.setWaiting(id); const waiting = queue.getWaitingForUser("u1"); expect(waiting?.id).toBe(id); }); @@ -347,7 +347,7 @@ describe("TaskQueue", () => { it("cancel works on waiting_user tasks", () => { const id = queue.enqueue({ userId: "u1", repo: "r1", prompt: "test" }); queue.dequeue(); - queue.setWaiting(id, "waiting_user"); + queue.setWaiting(id); const cancelled = queue.cancel(id); expect(cancelled).toBe(true); expect(queue.get(id)?.status).toBe("failed"); @@ -356,7 +356,7 @@ describe("TaskQueue", () => { it("stats counts waiting_user tasks", () => { const id = queue.enqueue({ userId: "u1", repo: "r1", prompt: "test" }); queue.dequeue(); - queue.setWaiting(id, "waiting_user"); + queue.setWaiting(id); const stats = queue.stats(); expect(stats.waiting).toBe(1); }); diff --git a/src/queue.ts b/src/queue.ts index 375fe32..a9e77cc 100644 --- a/src/queue.ts +++ b/src/queue.ts @@ -113,15 +113,22 @@ export class TaskQueue { ); } + updateResult(id: string, result: string) { + this.db.run( + `UPDATE tasks SET result = ?, completed_at = ? WHERE id = ?`, + [result, new Date().toISOString(), id] + ); + } + get(id: string): Task | null { const row = this.db.query(`SELECT * FROM tasks WHERE id = ?`).get(id) as TaskRow; return row ? this.rowToTask(row) : null; } - setWaiting(id: string, status: "waiting_user") { + setWaiting(id: string) { this.db.run( - `UPDATE tasks SET status = ? WHERE id = ? AND status = 'running'`, - [status, id] + `UPDATE tasks SET status = 'waiting_user' WHERE id = ? AND status = 'running'`, + [id] ); } @@ -171,7 +178,7 @@ export class TaskQueue { avgDurationByRepo: { repo: string; avgMs: number; count: number }[]; throughput: { lastHour: number; last24h: number }; errorRate: number; - repoBreakdown: { repo: string; pending: number; running: number; completed: number; failed: number }[]; + repoBreakdown: { repo: string; pending: number; running: number; completed: number; failed: number; waiting: number }[]; } { const counts = this.stats(); @@ -212,12 +219,13 @@ export class TaskQueue { COUNT(*) FILTER (WHERE status = 'pending') as pending, COUNT(*) FILTER (WHERE status = 'running') as running, COUNT(*) FILTER (WHERE status = 'completed') as completed, - COUNT(*) FILTER (WHERE status = 'failed') as failed + COUNT(*) FILTER (WHERE status = 'failed') as failed, + COUNT(*) FILTER (WHERE status = 'waiting_user') as waiting FROM tasks GROUP BY repo ORDER BY (COUNT(*) FILTER (WHERE status = 'running') + COUNT(*) FILTER (WHERE status = 'pending')) DESC, repo ASC` ) - .all() as { repo: string; pending: number; running: number; completed: number; failed: number }[]; + .all() as { repo: string; pending: number; running: number; completed: number; failed: number; waiting: number }[]; return { counts, diff --git a/src/runners/claude.ts b/src/runners/claude.ts index be60444..c88a1e6 100644 --- a/src/runners/claude.ts +++ b/src/runners/claude.ts @@ -233,8 +233,12 @@ export class ClaudeRunner implements AgentRunner { return { sendMessage(text: string) { - const msg = JSON.stringify({ type: "user_message", content: text }) + "\n"; - proc.stdin.write(encoder.encode(msg)); + try { + const msg = JSON.stringify({ type: "user_message", content: text }) + "\n"; + proc.stdin.write(encoder.encode(msg)); + } catch (err) { + logger.warn("failed to write to streaming session stdin", { error: String(err) }); + } }, kill() { proc.kill(); diff --git a/src/worker.ts b/src/worker.ts index e6b1884..356883c 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -180,7 +180,7 @@ async function processTask(task: Task, deps: WorkerDeps) { statusLog.push(event.text.slice(0, 200)); deps.trace.append(task.id, "status", event.text.slice(0, 200)); } else if (event.kind === "ask_user") { - deps.queue.setWaiting(task.id, "waiting_user"); + deps.queue.setWaiting(task.id); deps.sessionManager.setWaiting(task.id); deps.trace.append(task.id, "lifecycle", "Waiting for user input", event.question); const questionText = event.options.length > 0 @@ -313,7 +313,7 @@ async function processTask(task: Task, deps: WorkerDeps) { deps.queue.setSessionId(task.id, retryResult.sessionId); } - deps.queue.complete(task.id, retryResult.output); + deps.queue.updateResult(task.id, retryResult.output); await replyWithFallback( `[Scheduled: ${task.repo}] CI retry ${retry + 1}: ${retryResult.output.slice(0, 500)}`, originalMsg,