From 356d1f60849df2ea05bef0802df799828ef97d1a Mon Sep 17 00:00:00 2001 From: Robert Date: Thu, 5 Mar 2026 10:04:15 -0500 Subject: [PATCH] feat(pool): squash merge + persist review to SQLite Squash merge: release(name, true, taskId) now squash-merges agent commits into a single result commit on main. Full agent history preserved at refs/tasks/{taskId}. Without taskId, falls back to legacy ff/merge for backward compat. Uses temp branch + update-ref pattern since worktrees can't checkout main directly. Review persistence: task.review (ReviewResult) round-trips through SQLite via new review TEXT column with JSON serialization. Co-Authored-By: Claude Opus 4.6 --- src/__tests__/store.test.ts | 44 +++++++++++++ src/__tests__/worktree-pool.test.ts | 95 ++++++++++++++++++++++++++++ src/scheduler.ts | 2 +- src/store.ts | 28 ++++++--- src/types.ts | 9 +++ src/worktree-pool.ts | 98 ++++++++++++++++++++++------- 6 files changed, 244 insertions(+), 32 deletions(-) diff --git a/src/__tests__/store.test.ts b/src/__tests__/store.test.ts index 0bef9a3..fc94c7c 100644 --- a/src/__tests__/store.test.ts +++ b/src/__tests__/store.test.ts @@ -579,6 +579,50 @@ describe("Store", () => { }); }); + // ── review persistence ───────────────────────────────────────────────────── + + describe("review persistence", () => { + it("save() and get() round-trip review field", () => { + const { store, cleanup } = makeTempStore(); + try { + const review = { approve: true, score: 85, issues: [], suggestions: ["clean"] }; + const task = makeTask({ id: "rev-1", review }); + store.save(task); + const got = store.get("rev-1"); + assert.ok(got !== null); + assert.deepStrictEqual(got.review, review); + } finally { + cleanup(); + } + }); + + it("task without review returns undefined after load", () => { + const { store, cleanup } = makeTempStore(); + try { + const task = makeTask({ id: "rev-2" }); + store.save(task); + const got = store.get("rev-2"); + assert.ok(got !== null); + assert.strictEqual(got.review, undefined); + } finally { + cleanup(); + } + }); + + it("update() can set review field", () => { + const { store, cleanup } = makeTempStore(); + try { + store.save(makeTask({ id: "rev-3" })); + const review = { approve: false, score: 40, issues: ["bug"], suggestions: [] }; + const updated = store.update("rev-3", { review }); + assert.ok(updated !== null); + assert.deepStrictEqual(updated.review, review); + } finally { + cleanup(); + } + }); + }); + // ── close ─────────────────────────────────────────────────────────────────── describe("close", () => { diff --git a/src/__tests__/worktree-pool.test.ts b/src/__tests__/worktree-pool.test.ts index b5982ff..9b13ca3 100644 --- a/src/__tests__/worktree-pool.test.ts +++ b/src/__tests__/worktree-pool.test.ts @@ -348,6 +348,101 @@ describe("WorktreePool lookup", () => { }); }); +// --------------------------------------------------------------------------- +// Squash merge +// --------------------------------------------------------------------------- + +describe("WorktreePool squash merge", () => { + it("release(name, true, taskId) squash-merges to single commit", async () => { + const { repoPath, cleanup } = await makeTempRepo(); + try { + const pool = new WorktreePool(repoPath, 1); + await pool.init(); + + const worker = await pool.acquire(); + assert.ok(worker !== null, "should acquire a worker"); + + const git = (...args: string[]) => execFileAsync("git", args, { cwd: worker.path }); + + // Count commits on main before + const { stdout: mainLogBefore } = await execFileAsync("git", ["log", "--oneline", "main"], { cwd: repoPath }); + const mainCommitsBefore = mainLogBefore.trim().split("\n").length; + + // Make 3 commits in the worktree (simulating agent work) + fs.writeFileSync(path.join(worker.path, "file1.txt"), "hello\n"); + await git("add", "file1.txt"); + await git("commit", "-m", "agent commit 1"); + + fs.writeFileSync(path.join(worker.path, "file2.txt"), "world\n"); + await git("add", "file2.txt"); + await git("commit", "-m", "agent commit 2"); + + fs.writeFileSync(path.join(worker.path, "file3.txt"), "!\n"); + await git("add", "file3.txt"); + await git("commit", "-m", "agent commit 3"); + + // Release with squash merge + const result = await pool.release(worker.name, true, "test123"); + assert.strictEqual(result.merged, true, "merge should succeed"); + + // Assert: main has exactly 1 new commit (squashed) + const { stdout: mainLogAfter } = await execFileAsync("git", ["log", "--oneline", "main"], { cwd: repoPath }); + const mainCommitsAfter = mainLogAfter.trim().split("\n").length; + assert.strictEqual(mainCommitsAfter, mainCommitsBefore + 1, "main should have exactly 1 new commit"); + + // Assert: the squash commit message contains the taskId + const { stdout: lastCommit } = await execFileAsync("git", ["log", "-1", "--format=%s", "main"], { cwd: repoPath }); + assert.ok(lastCommit.includes("task(test123)"), `commit msg should contain task(test123), got: ${lastCommit.trim()}`); + + // Assert: refs/tasks/test123 exists + const { stdout: refOut } = await execFileAsync("git", ["show-ref", "refs/tasks/test123"], { cwd: repoPath }); + assert.ok(refOut.trim().length > 0, "refs/tasks/test123 should exist"); + + // Assert: the ref preserves all 3 original commits + const { stdout: refLog } = await execFileAsync("git", ["log", "--oneline", "refs/tasks/test123"], { cwd: repoPath }); + const refCommits = refLog.trim().split("\n"); + // 3 agent commits + 1 initial commit = 4 total + assert.strictEqual(refCommits.length, 4, "ref should show all 3 agent commits + initial"); + } finally { + cleanup(); + } + }); + + it("release(name, true) without taskId falls back to legacy merge", async () => { + const { repoPath, cleanup } = await makeTempRepo(); + try { + const pool = new WorktreePool(repoPath, 1); + await pool.init(); + + const worker = await pool.acquire(); + assert.ok(worker !== null); + + const git = (...args: string[]) => execFileAsync("git", args, { cwd: worker.path }); + + // Make 2 commits + fs.writeFileSync(path.join(worker.path, "a.txt"), "a\n"); + await git("add", "a.txt"); + await git("commit", "-m", "commit a"); + + fs.writeFileSync(path.join(worker.path, "b.txt"), "b\n"); + await git("add", "b.txt"); + await git("commit", "-m", "commit b"); + + // Release WITHOUT taskId — should use legacy ff/merge + const result = await pool.release(worker.name, true); + assert.strictEqual(result.merged, true, "merge should succeed"); + + // Main should have all individual commits (not squashed) + const { stdout: mainLog } = await execFileAsync("git", ["log", "--oneline", "main"], { cwd: repoPath }); + const commits = mainLog.trim().split("\n"); + // initial + 2 agent commits = 3 (ff merge preserves all) + assert.ok(commits.length >= 3, `expected >= 3 commits on main, got ${commits.length}`); + } finally { + cleanup(); + } + }); +}); + // --------------------------------------------------------------------------- // Worker stats // --------------------------------------------------------------------------- diff --git a/src/scheduler.ts b/src/scheduler.ts index 81f1327..62d854f 100644 --- a/src/scheduler.ts +++ b/src/scheduler.ts @@ -472,7 +472,7 @@ export class Scheduler { await this.runner.run(task, workerPath, this.onEvent); const shouldMerge = task.status === "success"; - const mergeResult = await this.pool.release(workerName, shouldMerge); + const mergeResult = await this.pool.release(workerName, shouldMerge, task.id); if (shouldMerge && !mergeResult.merged) { const fileList = mergeResult.conflictFiles?.length diff --git a/src/store.ts b/src/store.ts index d8b27b5..55117f6 100644 --- a/src/store.ts +++ b/src/store.ts @@ -82,6 +82,12 @@ export class Store { } catch { // Column already exists — safe to ignore } + // Add review column to persist cross-agent review results + try { + this.db.exec("ALTER TABLE tasks ADD COLUMN review TEXT"); + } catch { + // Column already exists — safe to ignore + } // Indexes for common query patterns this.db.exec( "CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status)" @@ -115,6 +121,7 @@ export class Store { JSON.stringify(task.tags ?? []), task.dependsOn ?? null, task.webhookUrl ?? null, task.summary ?? null, task.agent ?? "claude", + JSON.stringify(task.review ?? null), ]; } @@ -126,8 +133,8 @@ export class Store { (id, prompt, status, worktree, output, error, events, created_at, started_at, completed_at, timeout, max_budget, cost_usd, token_input, token_output, duration_ms, retry_count, max_retries, priority, tags, - depends_on, webhook_url, summary, agent) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + depends_on, webhook_url, summary, agent, review) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) `).run(...params); if (insertResult.changes === 0) { // Row already exists — update it (params[0] is id, rest are fields; append id at end for WHERE) @@ -136,7 +143,7 @@ export class Store { prompt=?, status=?, worktree=?, output=?, error=?, events=?, created_at=?, started_at=?, completed_at=?, timeout=?, max_budget=?, cost_usd=?, token_input=?, token_output=?, duration_ms=?, retry_count=?, max_retries=?, - priority=?, tags=?, depends_on=?, webhook_url=?, summary=?, agent=? + priority=?, tags=?, depends_on=?, webhook_url=?, summary=?, agent=?, review=? WHERE id=? `).run(...params.slice(1), task.id); } @@ -153,15 +160,15 @@ export class Store { (id, prompt, status, worktree, output, error, events, created_at, started_at, completed_at, timeout, max_budget, cost_usd, token_input, token_output, duration_ms, retry_count, max_retries, priority, tags, - depends_on, webhook_url, summary, agent) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + depends_on, webhook_url, summary, agent, review) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) `); const updateStmt = this.db.prepare(` UPDATE tasks SET prompt=?, status=?, worktree=?, output=?, error=?, events=?, created_at=?, started_at=?, completed_at=?, timeout=?, max_budget=?, cost_usd=?, token_input=?, token_output=?, duration_ms=?, retry_count=?, max_retries=?, - priority=?, tags=?, depends_on=?, webhook_url=?, summary=?, agent=? + priority=?, tags=?, depends_on=?, webhook_url=?, summary=?, agent=?, review=? WHERE id=? `); const runAll = this.db.transaction((batch: Task[]) => { @@ -191,15 +198,15 @@ export class Store { (id, prompt, status, worktree, output, error, events, created_at, started_at, completed_at, timeout, max_budget, cost_usd, token_input, token_output, duration_ms, retry_count, max_retries, priority, tags, - depends_on, webhook_url, summary, agent) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + depends_on, webhook_url, summary, agent, review) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) `); const updateStmt = this.db.prepare(` UPDATE tasks SET prompt=?, status=?, worktree=?, output=?, error=?, events=?, created_at=?, started_at=?, completed_at=?, timeout=?, max_budget=?, cost_usd=?, token_input=?, token_output=?, duration_ms=?, retry_count=?, max_retries=?, - priority=?, tags=?, depends_on=?, webhook_url=?, summary=?, agent=? + priority=?, tags=?, depends_on=?, webhook_url=?, summary=?, agent=?, review=? WHERE id=? `); this.transaction(() => { @@ -243,6 +250,7 @@ export class Store { webhookUrl: { col: "webhook_url" }, summary: { col: "summary" }, agent: { col: "agent" }, + review: { col: "review", serialize: (v) => JSON.stringify(v) }, }; const setClauses: string[] = []; @@ -400,6 +408,8 @@ export class Store { webhookUrl: row.webhook_url ?? undefined, summary: row.summary ?? undefined, agent: row.agent ?? "claude", + // ?? undefined converts null (from JSON.parse("null")) back to undefined + review: this.safeJsonParse(row.review, undefined) ?? undefined, }; } diff --git a/src/types.ts b/src/types.ts index c12de74..e9e867f 100644 --- a/src/types.ts +++ b/src/types.ts @@ -3,6 +3,14 @@ export type TaskPriority = "urgent" | "high" | "normal" | "low"; /** Built-in agent types with known output parsing. Any string is accepted for generic CLI agents. */ export type AgentType = "claude" | "claude-sdk" | "codex"; +export interface ReviewResult { + approve: boolean; + score: number; + issues: string[]; + suggestions: string[]; + reviewAgent?: string; +} + export interface Task { id: string; prompt: string; @@ -28,6 +36,7 @@ export interface Task { webhookUrl?: string; summary?: string; agent?: string; + review?: ReviewResult; } export interface TaskEvent { diff --git a/src/worktree-pool.ts b/src/worktree-pool.ts index 15b55ee..8f07c93 100644 --- a/src/worktree-pool.ts +++ b/src/worktree-pool.ts @@ -98,7 +98,7 @@ export class WorktreePool { } } - async release(name: string, merge: boolean): Promise<{ merged: boolean; conflictFiles?: string[] }> { + async release(name: string, merge: boolean, taskId?: string): Promise<{ merged: boolean; conflictFiles?: string[] }> { await this.waitLock(); this.lock = true; try { @@ -108,7 +108,7 @@ export class WorktreePool { let result: { merged: boolean; conflictFiles?: string[] } = { merged: true }; if (merge) { try { - result = await this.mergeToMain(w); + result = await this.mergeToMain(w, taskId); } catch (err) { log("error", "[pool] release: mergeToMain failed", { worker: w.name, err: String(err) }); throw new Error(`Failed to merge worktree '${w.name}' to main: ${String(err)}`); @@ -183,44 +183,98 @@ export class WorktreePool { } } - private async mergeToMain(w: WorkerInfo): Promise<{ merged: boolean; conflictFiles?: string[] }> { + private async mergeToMain(w: WorkerInfo, taskId?: string): Promise<{ merged: boolean; conflictFiles?: string[] }> { // Check if branch has new commits vs main const { stdout: diff } = await this.git("log", `main..${w.branch}`, "--oneline"); if (!diff.trim()) return { merged: true }; - log("info", "[pool] merging branch", { branch: w.branch, target: "main" }); + log("info", "[pool] merging branch", { branch: w.branch, target: "main", taskId }); - // Safe merge: update the main ref without touching HEAD or working tree. - // First try fast-forward via `git fetch . :main`. - // If that fails (non-ff), do a real merge in the worktree instead. + // When taskId is provided, use squash merge to keep main clean. + // Otherwise fall back to the legacy ff/merge behavior for backward compat. + if (taskId) { + return this.squashMergeToMain(w, taskId, diff.trim()); + } + + // Legacy path: fast-forward or merge commit try { await this.git("fetch", ".", `${w.branch}:main`); } catch { - // Non-fast-forward — merge in the worktree where it's safe + // Non-fast-forward — merge in worktree via temp branch (can't checkout main directly) + const tmpBranch = `_merge_legacy_${Date.now()}`; try { - await this.gitIn(w.path, "checkout", "main"); + const mainSha = (await this.git("rev-parse", "main")).stdout.trim(); + await this.gitIn(w.path, "checkout", "-b", tmpBranch, mainSha); await this.gitIn(w.path, "merge", w.branch, "--no-edit"); - // Push the merged main back to the main repo ref - await this.git("fetch", w.path, "main:main"); + const mergedSha = (await this.gitIn(w.path, "rev-parse", "HEAD")).stdout.trim(); + await this.git("update-ref", "refs/heads/main", mergedSha); } catch { - let conflictFiles: string[] = []; - try { - const { stdout } = await this.gitIn(w.path, "diff", "--name-only", "--diff-filter=U"); - conflictFiles = stdout.trim().split("\n").filter(Boolean); - } catch {} - - log("warn", "[pool] merge conflict, aborting", { branch: w.branch }); - await this.gitIn(w.path, "merge", "--abort").catch(() => {}); - await this.gitIn(w.path, "checkout", w.branch).catch(() => {}); - return { merged: false, conflictFiles }; + await this.cleanupTmpBranch(w, tmpBranch); + return this.handleMergeConflict(w); } + await this.cleanupTmpBranch(w, tmpBranch); + } + + await this.resetWorktree(w); + return { merged: true }; + } + + private async squashMergeToMain( + w: WorkerInfo, + taskId: string, + logOutput: string, + ): Promise<{ merged: boolean; conflictFiles?: string[] }> { + // Use the most recent commit message as summary (git log outputs newest first) + const firstLine = logOutput.split("\n")[0] ?? ""; + const rawSummary = firstLine.replace(/^[0-9a-f]+\s+/, ""); + const summary = rawSummary.length <= 72 + ? rawSummary + : rawSummary.slice(0, 72).replace(/\s+\S*$/, ""); + + // Cache branch tip before merge — avoids redundant rev-parse after + const branchTip = (await this.gitIn(w.path, "rev-parse", w.branch)).stdout.trim(); + + // Can't `checkout main` in a worktree (git refuses if main is checked out + // by the main repo). Instead, create a temporary branch from main, squash + // there, then update the main ref via update-ref. + const tmpBranch = `_squash_${taskId}_${Date.now()}`; + try { + const mainSha = (await this.git("rev-parse", "main")).stdout.trim(); + await this.gitIn(w.path, "checkout", "-b", tmpBranch, mainSha); + await this.gitIn(w.path, "merge", "--squash", w.branch); + await this.gitIn(w.path, "commit", "-m", `task(${taskId}): ${summary}`); + const squashedSha = (await this.gitIn(w.path, "rev-parse", "HEAD")).stdout.trim(); + await this.git("update-ref", "refs/heads/main", squashedSha); + // Preserve the full agent commit history only after successful merge + await this.git("update-ref", `refs/tasks/${taskId}`, branchTip); + } catch { + await this.cleanupTmpBranch(w, tmpBranch); + return this.handleMergeConflict(w); } - // Reset worktree to latest main + await this.cleanupTmpBranch(w, tmpBranch); await this.resetWorktree(w); return { merged: true }; } + private async cleanupTmpBranch(w: WorkerInfo, tmpBranch: string): Promise { + await this.gitIn(w.path, "checkout", w.branch).catch(() => {}); + await this.git("branch", "-D", tmpBranch).catch(() => {}); + } + + private async handleMergeConflict(w: WorkerInfo): Promise<{ merged: boolean; conflictFiles?: string[] }> { + let conflictFiles: string[] = []; + try { + const { stdout } = await this.gitIn(w.path, "diff", "--name-only", "--diff-filter=U"); + conflictFiles = stdout.trim().split("\n").filter(Boolean); + } catch {} + + log("warn", "[pool] merge conflict, aborting", { branch: w.branch }); + await this.gitIn(w.path, "merge", "--abort").catch(() => {}); + await this.gitIn(w.path, "checkout", w.branch).catch(() => {}); + return { merged: false, conflictFiles }; + } + get available(): number { let n = 0; for (const w of this.workers.values()) if (!w.busy) n++;