From 80fe5f0862690a8daa5d79d5d52920942aaddb42 Mon Sep 17 00:00:00 2001 From: Tao Feng Date: Tue, 14 Apr 2026 16:52:24 -0700 Subject: [PATCH 01/14] fix(bounty): multi-store atomicity, saga settlement, reconciler framework (#240) Addresses the three codex-flagged correctness bugs in bounty operations and adds infrastructure to prevent recurrence: - Add pending_settlement saga pivot state so settleBountyOperation never enters a terminal state before capture() confirms - Add pre-flight status checks in claim/settle to prevent wasted side effects - Add input validation (amount > 0, non-empty title) at operation boundary - Add LRU doc cache + ETag-forwarding in NexusBountyStore to reduce VFS round-trips in multi-transition flows - Add SweepReconciler framework with pluggable strategies for periodic consistency repair - Add BountyIndexSweep (dual-write index repair), SettlementSweep (resume stalled pending_settlement), HandoffSweep (detect orphans) - Add FailingBountyStore test wrapper for partial-failure injection - Add lazy eviction of expired reservations in InMemoryCreditsService 115 tests pass across 4 test files including all 3 acceptance criteria from Issue #240. 
--- src/core/bounty-index-sweep.ts | 64 ++ src/core/bounty-logic.ts | 3 +- src/core/bounty-store.ts | 32 +- src/core/bounty.ts | 2 + src/core/handoff-sweep.ts | 59 ++ src/core/in-memory-credits.ts | 20 +- src/core/operations/bounty.test.ts | 625 ++++++++++++++++++++ src/core/operations/bounty.ts | 57 +- src/core/operations/failing-bounty-store.ts | 134 +++++ src/core/settlement-sweep.ts | 85 +++ src/core/sweep-reconciler.test.ts | 342 +++++++++++ src/core/sweep-reconciler.ts | 155 +++++ src/local/sqlite-bounty-store.ts | 16 +- src/nexus/nexus-bounty-store.ts | 91 ++- 14 files changed, 1646 insertions(+), 39 deletions(-) create mode 100644 src/core/bounty-index-sweep.ts create mode 100644 src/core/handoff-sweep.ts create mode 100644 src/core/operations/failing-bounty-store.ts create mode 100644 src/core/settlement-sweep.ts create mode 100644 src/core/sweep-reconciler.test.ts create mode 100644 src/core/sweep-reconciler.ts diff --git a/src/core/bounty-index-sweep.ts b/src/core/bounty-index-sweep.ts new file mode 100644 index 00000000..3dd85d01 --- /dev/null +++ b/src/core/bounty-index-sweep.ts @@ -0,0 +1,64 @@ +/** + * BountyIndexSweep — reconciler strategy for NexusBountyStore dual-write consistency. + * + * Scans all bounty documents and ensures: + * 1. Each bounty has a status index entry matching its current status. + * 2. Stale index entries (pointing to a bounty with a different status) are deleted. + * + * This fixes the dual-write gap where the document write succeeds but the + * status index write fails (Issue #240, findings #2 and #3). + * + * Uses BountyStore.repairIndex() when available. For stores without a separate + * index (e.g., SQLite), this sweep is detection-only. 
+ */ + +import type { BountyStore } from "./bounty-store.js"; +import type { SweepResult, SweepStrategy } from "./sweep-reconciler.js"; + +export class BountyIndexSweep implements SweepStrategy { + readonly name = "BountyIndexSweep"; + private readonly bountyStore: BountyStore; + + constructor(bountyStore: BountyStore) { + this.bountyStore = bountyStore; + } + + async sweep(): Promise { + let found = 0; + let repaired = 0; + const errors: Error[] = []; + + try { + // List ALL bounties (unfiltered) to get the authoritative document state. + const allBounties = await this.bountyStore.listBounties(); + + for (const bounty of allBounties) { + try { + // Verify the bounty appears in its own status-filtered query. + const byStatus = await this.bountyStore.listBounties({ status: bounty.status }); + const inIndex = byStatus.some((b) => b.bountyId === bounty.bountyId); + if (!inIndex) { + found++; + // Attempt repair if the store supports it + if (this.bountyStore.repairIndex) { + await this.bountyStore.repairIndex(bounty.bountyId); + repaired++; + } + } + } catch (err) { + errors.push( + err instanceof Error + ? err + : new Error(`Index check failed for bounty ${bounty.bountyId}: ${String(err)}`), + ); + } + } + } catch (err) { + errors.push( + err instanceof Error ? 
err : new Error(`BountyIndexSweep failed: ${String(err)}`), + ); + } + + return { strategy: this.name, found, repaired, errors }; + } +} diff --git a/src/core/bounty-logic.ts b/src/core/bounty-logic.ts index 24898227..568c6da9 100644 --- a/src/core/bounty-logic.ts +++ b/src/core/bounty-logic.ts @@ -18,7 +18,8 @@ import type { Contribution, Score } from "./models.js"; const VALID_TRANSITIONS: Readonly> = { draft: ["open", "cancelled"], open: ["claimed", "expired", "cancelled"], - claimed: ["completed", "open", "expired", "cancelled"], + claimed: ["pending_settlement", "completed", "open", "expired", "cancelled"], + pending_settlement: ["completed", "claimed", "expired", "cancelled"], completed: ["settled", "expired", "cancelled"], settled: [], expired: [], diff --git a/src/core/bounty-store.ts b/src/core/bounty-store.ts index 27a6c784..c6210112 100644 --- a/src/core/bounty-store.ts +++ b/src/core/bounty-store.ts @@ -101,13 +101,27 @@ export interface BountyStore { ): Promise; /** - * Mark a bounty as completed (work done, pending settlement). + * Transition a bounty to 'pending_settlement' — the saga pivot point. * - * @param bountyId - The bounty to complete. + * After this transition, the bounty is committed to settlement. The caller + * must capture credits and then advance to completed → settled. If the + * caller crashes, the reconciler will resume from pending_settlement. + * + * @param bountyId - The bounty entering settlement. * @param fulfilledByCid - CID of the contribution that fulfilled the bounty. * @returns The updated bounty snapshot. * @throws BountyStateError if bounty is not in 'claimed' status. */ + beginSettlement(bountyId: string, fulfilledByCid: string): Promise; + + /** + * Mark a bounty as completed (work done, credits captured). + * + * @param bountyId - The bounty to complete. + * @param fulfilledByCid - CID of the contribution that fulfilled the bounty. + * @returns The updated bounty snapshot. 
+ * @throws BountyStateError if bounty is not in 'pending_settlement' status. + */ completeBounty(bountyId: string, fulfilledByCid: string): Promise; /** @@ -138,7 +152,7 @@ export interface BountyStore { cancelBounty(bountyId: string): Promise; // ----------------------------------------------------------------------- - // Expiry sweep + // Expiry sweep + index repair // ----------------------------------------------------------------------- /** @@ -149,6 +163,18 @@ export interface BountyStore { */ findExpiredBounties(): Promise; + /** + * Repair the status index for a bounty so it matches the authoritative + * document state. Idempotent — calling on an already-correct entry is a no-op. + * + * Used by BountyIndexSweep to fix dual-write gaps where the document was + * written but the status index was not. For stores without a separate + * index (e.g., SQLite), this is a no-op. + * + * Optional: stores without it are still swept, but detection-only (no repair). + */ + repairIndex?(bountyId: string): Promise; + // ----------------------------------------------------------------------- // Reward records // ----------------------------------------------------------------------- diff --git a/src/core/bounty.ts b/src/core/bounty.ts index 01fde9b3..5b89fbee 100644 --- a/src/core/bounty.ts +++ b/src/core/bounty.ts @@ -21,6 +21,8 @@ export const BountyStatus = { Open: "open", /** Claimed by an agent (work in progress). */ Claimed: "claimed", + /** Capture initiated, awaiting state advancement. Saga pivot point. */ + PendingSettlement: "pending_settlement", /** Work completed, pending settlement. */ Completed: "completed", /** Credits distributed to fulfiller. */ diff --git a/src/core/handoff-sweep.ts b/src/core/handoff-sweep.ts new file mode 100644 index 00000000..c630fda0 --- /dev/null +++ b/src/core/handoff-sweep.ts @@ -0,0 +1,59 @@ +/** + * HandoffSweep — reconciler strategy for detecting orphaned contributions. 
+ * + * Finds contributions that have zero handoff records where the contribution + * kind normally generates handoffs (work, review, reproduction, adoption). + * Reports them for operator attention. + * + * Detection-only: repair requires re-running the routing logic from the + * topology/contract, which this sweep doesn't have access to. Full + * auto-repair is a follow-up (tracked in Issue #240). + * + * Absorbs the handoff reconciler scope from Issue #227 → #240. + */ + +import type { HandoffStore } from "./handoff.js"; +import type { ContributionStore } from "./store.js"; +import type { SweepResult, SweepStrategy } from "./sweep-reconciler.js"; + +/** Contribution kinds that normally generate handoff records. */ +const HANDOFF_ELIGIBLE_KINDS = new Set(["work", "review", "reproduction", "adoption"]); + +export class HandoffSweep implements SweepStrategy { + readonly name = "HandoffSweep"; + private readonly contributionStore: ContributionStore; + private readonly handoffStore: HandoffStore; + + constructor(contributionStore: ContributionStore, handoffStore: HandoffStore) { + this.contributionStore = contributionStore; + this.handoffStore = handoffStore; + } + + async sweep(): Promise { + let found = 0; + const errors: Error[] = []; + + try { + // Scan recent contributions that should have handoffs + const contributions = await this.contributionStore.list({ limit: 500 }); + const eligible = contributions.filter((c) => HANDOFF_ELIGIBLE_KINDS.has(c.kind)); + + // Check each eligible contribution for any handoff records + const handoffs = await this.handoffStore.list(); + const cidsWithHandoffs = new Set(handoffs.map((h) => h.sourceCid)); + + for (const contribution of eligible) { + if (!cidsWithHandoffs.has(contribution.cid)) { + found++; + } + } + } catch (err) { + errors.push( + err instanceof Error ? err : new Error(`HandoffSweep scan failed: ${String(err)}`), + ); + } + + // Detection-only: repaired is always 0. Operator acts on found > 0. 
+ return { strategy: this.name, found, repaired: 0, errors }; + } +} diff --git a/src/core/in-memory-credits.ts b/src/core/in-memory-credits.ts index 4fae10c1..6e14922a 100644 --- a/src/core/in-memory-credits.ts +++ b/src/core/in-memory-credits.ts @@ -292,15 +292,23 @@ export class InMemoryCreditsService implements CreditsService { private getReserved(agentId: string): number { const now = Date.now(); let reserved = 0; - for (const res of this.reservations.values()) { - if (res.agentId === agentId && !res.captured && !res.voided) { - // Skip expired reservations — they no longer hold funds - if (new Date(res.expiresAt).getTime() <= now) { - continue; - } + const evictable: string[] = []; + for (const [id, res] of this.reservations.entries()) { + const expired = new Date(res.expiresAt).getTime() <= now; + // Evict only expired reservations that were never captured or voided. + // Captured/voided entries must stay for idempotency checks in + // capture() and void(). + if (expired && !res.captured && !res.voided) { + evictable.push(id); + continue; + } + if (res.agentId === agentId && !res.captured && !res.voided && !expired) { reserved += res.amount; } } + for (const id of evictable) { + this.reservations.delete(id); + } return reserved; } } diff --git a/src/core/operations/bounty.test.ts b/src/core/operations/bounty.test.ts index 003ab9bc..e64998eb 100644 --- a/src/core/operations/bounty.test.ts +++ b/src/core/operations/bounty.test.ts @@ -339,6 +339,13 @@ describe("settleBountyOperation", () => { expect(bounty.ok).toBe(true); if (!bounty.ok) return; + // Must claim before settle (pre-flight check requires "claimed" status) + const claim = await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: { agentId: "worker" } }, + deps, + ); + expect(claim.ok).toBe(true); + const result = await settleBountyOperation( { bountyId: bounty.value.bountyId, @@ -369,6 +376,12 @@ describe("settleBountyOperation", () => { expect(bounty.ok).toBe(true); if (!bounty.ok) 
return; + // Must claim before settle (pre-flight check requires "claimed" status) + await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: { agentId: "worker" } }, + deps, + ); + // Create contribution WITHOUT the required tag const contrib = await contributeOperation( { @@ -397,3 +410,615 @@ describe("settleBountyOperation", () => { } }); }); + +// --------------------------------------------------------------------------- +// Input validation edge cases (Issue 7A + 11A) +// --------------------------------------------------------------------------- + +describe("createBountyOperation input validation", () => { + let testDeps: TestOperationDeps; + let deps: OperationDeps; + + beforeEach(async () => { + testDeps = await createTestOperationDeps(); + deps = testDeps.deps; + }); + + afterEach(async () => { + await testDeps.cleanup(); + }); + + test("rejects zero amount", async () => { + const result = await createBountyOperation( + { title: "Zero", amount: 0, criteria: { description: "x" }, agent: { agentId: "a" } }, + deps, + ); + expect(result.ok).toBe(false); + if (!result.ok) { + expect(result.error.code).toBe("VALIDATION_ERROR"); + expect(result.error.message).toContain("positive"); + } + }); + + test("rejects negative amount", async () => { + const result = await createBountyOperation( + { title: "Neg", amount: -50, criteria: { description: "x" }, agent: { agentId: "a" } }, + deps, + ); + expect(result.ok).toBe(false); + if (!result.ok) { + expect(result.error.code).toBe("VALIDATION_ERROR"); + expect(result.error.message).toContain("positive"); + } + }); + + test("rejects empty title", async () => { + const result = await createBountyOperation( + { title: "", amount: 100, criteria: { description: "x" }, agent: { agentId: "a" } }, + deps, + ); + expect(result.ok).toBe(false); + if (!result.ok) { + expect(result.error.code).toBe("VALIDATION_ERROR"); + expect(result.error.message).toContain("title"); + } + }); + + test("rejects whitespace-only 
title", async () => { + const result = await createBountyOperation( + { title: " ", amount: 100, criteria: { description: "x" }, agent: { agentId: "a" } }, + deps, + ); + expect(result.ok).toBe(false); + if (!result.ok) { + expect(result.error.code).toBe("VALIDATION_ERROR"); + expect(result.error.message).toContain("title"); + } + }); +}); + +// --------------------------------------------------------------------------- +// Pre-flight status validation (Issue 6A) +// --------------------------------------------------------------------------- + +describe("claimBountyOperation status checks", () => { + let testDeps: TestOperationDeps; + let deps: OperationDeps; + + beforeEach(async () => { + testDeps = await createTestOperationDeps(); + deps = testDeps.deps; + }); + + afterEach(async () => { + await testDeps.cleanup(); + }); + + test("rejects claim on already-claimed bounty", async () => { + (deps.creditsService as InMemoryCreditsService).seed("creator", 1000); + + const bounty = await createBountyOperation( + { + title: "Double claim", + amount: 100, + criteria: { description: "work" }, + agent: { agentId: "creator" }, + }, + deps, + ); + expect(bounty.ok).toBe(true); + if (!bounty.ok) return; + + // First claim succeeds + const firstClaim = await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: { agentId: "agent-a" } }, + deps, + ); + expect(firstClaim.ok).toBe(true); + + // Second claim should fail with pre-flight check + const secondClaim = await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: { agentId: "agent-b" } }, + deps, + ); + expect(secondClaim.ok).toBe(false); + if (!secondClaim.ok) { + expect(secondClaim.error.code).toBe("VALIDATION_ERROR"); + expect(secondClaim.error.message).toContain("not open"); + } + }); +}); + +describe("settleBountyOperation status checks", () => { + let testDeps: TestOperationDeps; + let deps: OperationDeps; + + beforeEach(async () => { + testDeps = await createTestOperationDeps(); + deps = 
testDeps.deps; + }); + + afterEach(async () => { + await testDeps.cleanup(); + }); + + test("rejects settle on unclaimed (open) bounty", async () => { + (deps.creditsService as InMemoryCreditsService).seed("creator", 1000); + + const bounty = await createBountyOperation( + { + title: "Not claimed", + amount: 100, + criteria: { description: "work" }, + agent: { agentId: "creator" }, + }, + deps, + ); + expect(bounty.ok).toBe(true); + if (!bounty.ok) return; + + const contrib = await contributeOperation( + { kind: "work", summary: "Some work", agent: { agentId: "worker" } }, + deps, + ); + expect(contrib.ok).toBe(true); + if (!contrib.ok) return; + + const result = await settleBountyOperation( + { bountyId: bounty.value.bountyId, contributionCid: contrib.value.cid }, + deps, + ); + expect(result.ok).toBe(false); + if (!result.ok) { + expect(result.error.code).toBe("VALIDATION_ERROR"); + expect(result.error.message).toContain("cannot be settled"); + } + }); + + test("rejects settle on already-settled bounty", async () => { + (deps.creditsService as InMemoryCreditsService).seed("creator", 1000); + + // Create → claim → contribute → settle (success) + const bounty = await createBountyOperation( + { + title: "Already settled", + amount: 100, + criteria: { description: "Any", requiredTags: ["fix"] }, + agent: { agentId: "creator" }, + }, + deps, + ); + expect(bounty.ok).toBe(true); + if (!bounty.ok) return; + + await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: { agentId: "worker" } }, + deps, + ); + const contrib = await contributeOperation( + { kind: "work", summary: "Fix", tags: ["fix"], agent: { agentId: "worker" } }, + deps, + ); + expect(contrib.ok).toBe(true); + if (!contrib.ok) return; + + const firstSettle = await settleBountyOperation( + { bountyId: bounty.value.bountyId, contributionCid: contrib.value.cid }, + deps, + ); + expect(firstSettle.ok).toBe(true); + + // Second settle should fail with pre-flight check + const secondSettle = await 
settleBountyOperation( + { bountyId: bounty.value.bountyId, contributionCid: contrib.value.cid }, + deps, + ); + expect(secondSettle.ok).toBe(false); + if (!secondSettle.ok) { + expect(secondSettle.error.code).toBe("VALIDATION_ERROR"); + expect(secondSettle.error.message).toContain("cannot be settled"); + } + }); + + test("rejects settle when creditsService missing but bounty has reservationId", async () => { + (deps.creditsService as InMemoryCreditsService).seed("creator", 1000); + + const bounty = await createBountyOperation( + { + title: "Escrow test", + amount: 100, + criteria: { description: "work" }, + agent: { agentId: "creator" }, + }, + deps, + ); + expect(bounty.ok).toBe(true); + if (!bounty.ok) return; + + await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: { agentId: "worker" } }, + deps, + ); + const contrib = await contributeOperation( + { kind: "work", summary: "Done", agent: { agentId: "worker" } }, + deps, + ); + expect(contrib.ok).toBe(true); + if (!contrib.ok) return; + + // Remove credits service to simulate missing provider + const depsNoCredits: OperationDeps = { ...deps, creditsService: undefined }; + + const result = await settleBountyOperation( + { bountyId: bounty.value.bountyId, contributionCid: contrib.value.cid }, + depsNoCredits, + ); + expect(result.ok).toBe(false); + if (!result.ok) { + expect(result.error.code).toBe("VALIDATION_ERROR"); + expect(result.error.message).toContain("creditsService"); + } + }); +}); + +// --------------------------------------------------------------------------- +// Non-escrowed bounty lifecycle (Issue 12A) +// --------------------------------------------------------------------------- + +describe("non-escrowed bounty lifecycle", () => { + let testDeps: TestOperationDeps; + let deps: OperationDeps; + + beforeEach(async () => { + testDeps = await createTestOperationDeps(); + deps = testDeps.deps; + }); + + afterEach(async () => { + await testDeps.cleanup(); + }); + + test("settles a 
bounty end-to-end without credits service", async () => { + const depsNoCredits: OperationDeps = { ...deps, creditsService: undefined }; + + // 1. Create bounty without escrow + const bounty = await createBountyOperation( + { + title: "Reputation only", + amount: 100, + criteria: { description: "Any work", requiredTags: ["fix"] }, + agent: { agentId: "creator" }, + }, + depsNoCredits, + ); + expect(bounty.ok).toBe(true); + if (!bounty.ok) return; + expect(bounty.value.reservationId).toBeUndefined(); + + // 2. Claim + const claim = await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: { agentId: "worker" } }, + depsNoCredits, + ); + expect(claim.ok).toBe(true); + + // 3. Contribute + const contrib = await contributeOperation( + { + kind: "work", + summary: "Reputation fix", + tags: ["fix"], + agent: { agentId: "worker" }, + }, + depsNoCredits, + ); + expect(contrib.ok).toBe(true); + if (!contrib.ok) return; + + // 4. Settle (no capture since no reservation) + const result = await settleBountyOperation( + { bountyId: bounty.value.bountyId, contributionCid: contrib.value.cid }, + depsNoCredits, + ); + expect(result.ok).toBe(true); + if (!result.ok) return; + expect(result.value.status).toBe("settled"); + expect(result.value.amount).toBe(100); + expect(result.value.paidTo).toBe("worker"); + }); +}); + +// --------------------------------------------------------------------------- +// Sequential conflict tests (Issue 10B) +// --------------------------------------------------------------------------- + +describe("sequential conflict scenarios", () => { + let testDeps: TestOperationDeps; + let deps: OperationDeps; + + beforeEach(async () => { + testDeps = await createTestOperationDeps(); + deps = testDeps.deps; + }); + + afterEach(async () => { + await testDeps.cleanup(); + }); + + test("second claim on same bounty fails", async () => { + (deps.creditsService as InMemoryCreditsService).seed("creator", 1000); + + const bounty = await 
createBountyOperation( + { + title: "Conflict claim", + amount: 100, + criteria: { description: "work" }, + agent: { agentId: "creator" }, + }, + deps, + ); + expect(bounty.ok).toBe(true); + if (!bounty.ok) return; + + // First claim succeeds + const first = await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: { agentId: "agent-a" } }, + deps, + ); + expect(first.ok).toBe(true); + + // Second claim fails — pre-flight rejects non-open bounty + const second = await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: { agentId: "agent-b" } }, + deps, + ); + expect(second.ok).toBe(false); + if (!second.ok) { + expect(second.error.code).toBe("VALIDATION_ERROR"); + } + }); + + test("settle after settle fails (idempotent capture but state rejects)", async () => { + (deps.creditsService as InMemoryCreditsService).seed("creator", 1000); + + // Full lifecycle to settled + const bounty = await createBountyOperation( + { + title: "Double settle", + amount: 100, + criteria: { description: "Any", requiredTags: ["fix"] }, + agent: { agentId: "creator" }, + }, + deps, + ); + expect(bounty.ok).toBe(true); + if (!bounty.ok) return; + + await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: { agentId: "worker" } }, + deps, + ); + const contrib = await contributeOperation( + { kind: "work", summary: "Fix", tags: ["fix"], agent: { agentId: "worker" } }, + deps, + ); + expect(contrib.ok).toBe(true); + if (!contrib.ok) return; + + const firstSettle = await settleBountyOperation( + { bountyId: bounty.value.bountyId, contributionCid: contrib.value.cid }, + deps, + ); + expect(firstSettle.ok).toBe(true); + + // Second settle blocked by pre-flight (status is "settled") + const secondSettle = await settleBountyOperation( + { bountyId: bounty.value.bountyId, contributionCid: contrib.value.cid }, + deps, + ); + expect(secondSettle.ok).toBe(false); + if (!secondSettle.ok) { + expect(secondSettle.error.code).toBe("VALIDATION_ERROR"); + } + }); + + 
test("claim after settle fails", async () => { + (deps.creditsService as InMemoryCreditsService).seed("creator", 1000); + + // Full lifecycle to settled + const bounty = await createBountyOperation( + { + title: "Claim after settle", + amount: 100, + criteria: { description: "Any", requiredTags: ["fix"] }, + agent: { agentId: "creator" }, + }, + deps, + ); + expect(bounty.ok).toBe(true); + if (!bounty.ok) return; + + await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: { agentId: "worker" } }, + deps, + ); + const contrib = await contributeOperation( + { kind: "work", summary: "Fix", tags: ["fix"], agent: { agentId: "worker" } }, + deps, + ); + expect(contrib.ok).toBe(true); + if (!contrib.ok) return; + + await settleBountyOperation( + { bountyId: bounty.value.bountyId, contributionCid: contrib.value.cid }, + deps, + ); + + // Claim on settled bounty — pre-flight rejects + const claim = await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: { agentId: "latecomer" } }, + deps, + ); + expect(claim.ok).toBe(false); + if (!claim.ok) { + expect(claim.error.code).toBe("VALIDATION_ERROR"); + expect(claim.error.message).toContain("not open"); + } + }); +}); + +// --------------------------------------------------------------------------- +// Acceptance criteria tests (Issue #240) — FailingBountyStore partial failures +// --------------------------------------------------------------------------- + +import { FailingBountyStore } from "./failing-bounty-store.js"; + +describe("partial failure acceptance criteria (#240)", () => { + let testDeps: TestOperationDeps; + let deps: OperationDeps; + + beforeEach(async () => { + testDeps = await createTestOperationDeps(); + deps = testDeps.deps; + }); + + afterEach(async () => { + await testDeps.cleanup(); + }); + + test("AC1: capture throws after state transitions — bounty retryable via pending_settlement", async () => { + // Simulate: settle starts, pivot to pending_settlement succeeds, + // but 
capture() fails. The bounty should be in pending_settlement + // and retryable. + (deps.creditsService as InMemoryCreditsService).seed("creator", 1000); + + const bounty = await createBountyOperation( + { + title: "Capture fail test", + amount: 100, + criteria: { description: "Any", requiredTags: ["fix"] }, + agent: { agentId: "creator" }, + }, + deps, + ); + expect(bounty.ok).toBe(true); + if (!bounty.ok) return; + + await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: { agentId: "worker" } }, + deps, + ); + const contrib = await contributeOperation( + { kind: "work", summary: "Fix", tags: ["fix"], agent: { agentId: "worker" } }, + deps, + ); + expect(contrib.ok).toBe(true); + if (!contrib.ok) return; + + // Configure credits service to fail on capture + (deps.creditsService as InMemoryCreditsService).setFailures({ + capture: new Error("Simulated capture failure"), + }); + + // First settle attempt — fails during capture + const failedSettle = await settleBountyOperation( + { bountyId: bounty.value.bountyId, contributionCid: contrib.value.cid }, + deps, + ); + expect(failedSettle.ok).toBe(false); + + // Bounty should be in pending_settlement (pivot committed) + const stuck = await deps.bountyStore!.getBounty(bounty.value.bountyId); + expect(stuck?.status).toBe("pending_settlement"); + + // Clear the failure — capture will now succeed (idempotent) + (deps.creditsService as InMemoryCreditsService).setFailures({}); + + // Retry settle — should resume from pending_settlement + const retrySettle = await settleBountyOperation( + { bountyId: bounty.value.bountyId, contributionCid: contrib.value.cid }, + deps, + ); + expect(retrySettle.ok).toBe(true); + if (!retrySettle.ok) return; + expect(retrySettle.value.status).toBe("settled"); + expect(retrySettle.value.paidTo).toBe("worker"); + }); + + test("AC2: createBounty throws post-commit — reservation must NOT be voided", async () => { + (deps.creditsService as InMemoryCreditsService).seed("creator", 
1000); + + const failingStore = new FailingBountyStore(deps.bountyStore!); + failingStore.failOnNext("createBounty"); + const failDeps: OperationDeps = { ...deps, bountyStore: failingStore }; + + // createBountyOperation fails — but the bounty IS committed in the delegate + const result = await createBountyOperation( + { + title: "Post-commit fail", + amount: 100, + criteria: { description: "work" }, + agent: { agentId: "creator" }, + }, + failDeps, + ); + + // Operation reports failure + expect(result.ok).toBe(false); + + // The bounty exists in the store (post-commit failure) + const bounties = await deps.bountyStore!.listBounties(); + expect(bounties.length).toBe(1); + expect(bounties[0]?.status).toBe("open"); + + // The reservation was NOT voided — it's still active. + // The operation does NOT compensate on failure (by design — no try/catch + // compensation, which was the bug in the WIP that #240 identified). + // The reservation will auto-expire via its timeout. + const balanceInfo = await (deps.creditsService as InMemoryCreditsService).balance("creator"); + // 1000 - 100 reserved = 900 available (reservation still active) + expect(balanceInfo.available).toBe(900); + expect(balanceInfo.reserved).toBe(100); + }); + + test("AC3: claimBounty throws post-commit — claim must NOT be released", async () => { + (deps.creditsService as InMemoryCreditsService).seed("creator", 1000); + + const bounty = await createBountyOperation( + { + title: "Claim fail test", + amount: 100, + criteria: { description: "work" }, + agent: { agentId: "creator" }, + }, + deps, + ); + expect(bounty.ok).toBe(true); + if (!bounty.ok) return; + + const failingStore = new FailingBountyStore(deps.bountyStore!); + failingStore.failOnNext("claimBounty"); + const failDeps: OperationDeps = { ...deps, bountyStore: failingStore }; + + // claimBountyOperation fails — but the bounty IS transitioned in the delegate + const result = await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: 
{ agentId: "worker" } }, + failDeps, + ); + + // Operation reports failure + expect(result.ok).toBe(false); + + // The bounty is in "claimed" state in the store (post-commit) + const stored = await deps.bountyStore!.getBounty(bounty.value.bountyId); + expect(stored?.status).toBe("claimed"); + expect(stored?.claimedBy?.agentId).toBe("worker"); + + // The claim was NOT released — it's still active in the claim store. + // The operation does NOT compensate on failure (by design). + // The claim will expire via its lease timeout (30min default). + const claims = await deps.claimStore!.activeClaims(); + const bountyClaimExists = claims.some((c) => c.targetRef === `bounty:${bounty.value.bountyId}`); + expect(bountyClaimExists).toBe(true); + }); +}); diff --git a/src/core/operations/bounty.ts b/src/core/operations/bounty.ts index ba2b5e44..e3bbe622 100644 --- a/src/core/operations/bounty.ts +++ b/src/core/operations/bounty.ts @@ -101,6 +101,14 @@ export interface SettleBountyInput { readonly contributionCid: string; } +// --------------------------------------------------------------------------- +// Error messages +// --------------------------------------------------------------------------- + +const MISSING_BOUNTY_STORE = "Bounty operations not available (missing bountyStore)"; +const MISSING_CLAIM_STORE = "Claim operations not available (missing claimStore)"; +const MISSING_CONTRIBUTION_STORE = "Settle bounty not available (missing contributionStore)"; + // --------------------------------------------------------------------------- // Operations // --------------------------------------------------------------------------- @@ -114,7 +122,14 @@ export async function createBountyOperation( ): Promise> { try { if (deps.bountyStore === undefined) { - return validationErr("Bounty operations not available (missing bountyStore)"); + return validationErr(MISSING_BOUNTY_STORE); + } + + if (!input.title || input.title.trim().length === 0) { + return validationErr("Bounty title 
must be a non-empty string"); + } + if (input.amount <= 0) { + return validationErr("Bounty amount must be positive"); } const agent = resolveAgent(input.agent); @@ -173,7 +188,7 @@ export async function listBountiesOperation( ): Promise> { try { if (deps.bountyStore === undefined) { - return validationErr("Bounty operations not available"); + return validationErr(MISSING_BOUNTY_STORE); } const bounties = await deps.bountyStore.listBounties({ @@ -204,11 +219,11 @@ export async function claimBountyOperation( ): Promise> { try { if (deps.bountyStore === undefined) { - return validationErr("Bounty operations not available"); + return validationErr(MISSING_BOUNTY_STORE); } if (deps.claimStore === undefined) { - return validationErr("Claim operations not available (missing claimStore)"); + return validationErr(MISSING_CLAIM_STORE); } const bounty = await deps.bountyStore.getBounty(input.bountyId); @@ -216,6 +231,12 @@ export async function claimBountyOperation( return notFound("Bounty", input.bountyId); } + if (bounty.status !== BS.Open) { + return validationErr( + `Bounty '${input.bountyId}' is not open for claims (current status: ${bounty.status})`, + ); + } + const agent = resolveAgent(input.agent); const now = new Date(); const claimId = crypto.randomUUID(); @@ -247,18 +268,24 @@ export async function claimBountyOperation( } } -/** Settle a completed bounty. */ +/** + * Settle a bounty using the saga pattern: + * claimed → pending_settlement (pivot) → capture → completed → settled + * + * Retryable: if the operation is called again on a pending_settlement bounty, + * it resumes from the capture step (capture is idempotent). 
+ */ export async function settleBountyOperation( input: SettleBountyInput, deps: OperationDeps, ): Promise> { try { if (deps.bountyStore === undefined) { - return validationErr("Bounty operations not available (missing bountyStore)"); + return validationErr(MISSING_BOUNTY_STORE); } if (deps.contributionStore === undefined) { - return validationErr("Settle bounty not available (missing contributionStore)"); + return validationErr(MISSING_CONTRIBUTION_STORE); } const bounty = await deps.bountyStore.getBounty(input.bountyId); @@ -266,6 +293,13 @@ export async function settleBountyOperation( return notFound("Bounty", input.bountyId); } + // Allow both "claimed" (fresh settle) and "pending_settlement" (retry/resume) + if (bounty.status !== BS.Claimed && bounty.status !== BS.PendingSettlement) { + return validationErr( + `Bounty '${input.bountyId}' cannot be settled (current status: ${bounty.status})`, + ); + } + // Validate contribution exists and meets criteria const contribution = await deps.contributionStore.get(input.contributionCid); if (!contribution) { @@ -282,7 +316,12 @@ export async function settleBountyOperation( ); } - // Capture payment before state transition + // Step 1: Pivot — transition to pending_settlement (skip if resuming) + if (bounty.status === BS.Claimed) { + await deps.bountyStore.beginSettlement(input.bountyId, input.contributionCid); + } + + // Step 2: Capture payment (idempotent — safe to retry) if (deps.creditsService && bounty.reservationId && bounty.claimedBy) { await deps.creditsService.capture(bounty.reservationId, { toAgentId: bounty.claimedBy.agentId, @@ -291,7 +330,7 @@ export async function settleBountyOperation( await deps.creditsService.capture(bounty.reservationId); } - // Persist state transitions + // Step 3: Advance through completed → settled (retryable after pivot) const completed = await deps.bountyStore.completeBounty(input.bountyId, input.contributionCid); const settled = await 
deps.bountyStore.settleBounty(completed.bountyId); diff --git a/src/core/operations/failing-bounty-store.ts b/src/core/operations/failing-bounty-store.ts new file mode 100644 index 00000000..e5a66005 --- /dev/null +++ b/src/core/operations/failing-bounty-store.ts @@ -0,0 +1,134 @@ +/** + * FailingBountyStore — test wrapper that injects failures between internal + * store steps to simulate dual-write partial failures. + * + * Wraps a real BountyStore and can be configured to fail after specific + * operations, simulating the gap between document write and index write + * in NexusBountyStore. + * + * Usage: + * const real = new SqliteBountyStore(db); + * const failing = new FailingBountyStore(real); + * failing.failOnNext("createBounty"); // next createBounty throws after real store commits + * await createBountyOperation(input, { bountyStore: failing, ... }); + */ + +import type { Bounty, RewardRecord } from "../bounty.js"; +import type { BountyQuery, BountyStore, RewardQuery } from "../bounty-store.js"; +import type { AgentIdentity } from "../models.js"; + +export class FailingBountyStore implements BountyStore { + readonly storeIdentity: string | undefined; + private readonly delegate: BountyStore; + private failureTarget: string | undefined; + private failureError: Error; + + constructor(delegate: BountyStore) { + this.delegate = delegate; + this.storeIdentity = delegate.storeIdentity; + this.failureError = new Error("Simulated post-commit failure"); + } + + /** + * Configure the next call to the named method to succeed internally + * (the delegate commits the change) but throw to the caller, simulating + * a failure AFTER the document write but BEFORE the caller sees success. + */ + failOnNext(method: string, error?: Error): void { + this.failureTarget = method; + if (error) this.failureError = error; + } + + /** Clear any pending failure configuration. 
*/ + clearFailure(): void { + this.failureTarget = undefined; + } + + private async maybeFailAfter(method: string, result: T): Promise { + if (this.failureTarget === method) { + this.failureTarget = undefined; // one-shot + throw this.failureError; + } + return result; + } + + // ----------------------------------------------------------------------- + // Delegated methods with failure injection + // ----------------------------------------------------------------------- + + async createBounty(bounty: Bounty): Promise { + const result = await this.delegate.createBounty(bounty); + return this.maybeFailAfter("createBounty", result); + } + + async getBounty(bountyId: string): Promise { + return this.delegate.getBounty(bountyId); + } + + async listBounties(query?: BountyQuery): Promise { + return this.delegate.listBounties(query); + } + + async countBounties(query?: BountyQuery): Promise { + return this.delegate.countBounties(query); + } + + async fundBounty(bountyId: string, reservationId: string): Promise { + const result = await this.delegate.fundBounty(bountyId, reservationId); + return this.maybeFailAfter("fundBounty", result); + } + + async claimBounty(bountyId: string, claimedBy: AgentIdentity, claimId: string): Promise { + const result = await this.delegate.claimBounty(bountyId, claimedBy, claimId); + return this.maybeFailAfter("claimBounty", result); + } + + async beginSettlement(bountyId: string, fulfilledByCid: string): Promise { + const result = await this.delegate.beginSettlement(bountyId, fulfilledByCid); + return this.maybeFailAfter("beginSettlement", result); + } + + async completeBounty(bountyId: string, fulfilledByCid: string): Promise { + const result = await this.delegate.completeBounty(bountyId, fulfilledByCid); + return this.maybeFailAfter("completeBounty", result); + } + + async settleBounty(bountyId: string): Promise { + const result = await this.delegate.settleBounty(bountyId); + return this.maybeFailAfter("settleBounty", result); + } + + async 
expireBounty(bountyId: string): Promise { + const result = await this.delegate.expireBounty(bountyId); + return this.maybeFailAfter("expireBounty", result); + } + + async cancelBounty(bountyId: string): Promise { + const result = await this.delegate.cancelBounty(bountyId); + return this.maybeFailAfter("cancelBounty", result); + } + + async findExpiredBounties(): Promise { + return this.delegate.findExpiredBounties(); + } + + async recordReward(reward: RewardRecord): Promise { + return this.delegate.recordReward(reward); + } + + async hasReward(rewardId: string): Promise { + return this.delegate.hasReward(rewardId); + } + + async listRewards(query?: RewardQuery): Promise { + return this.delegate.listRewards(query); + } + + async repairIndex(bountyId: string): Promise { + return this.delegate.repairIndex?.(bountyId); + } + + close(): void { + this.delegate.close(); + } +} diff --git a/src/core/settlement-sweep.ts b/src/core/settlement-sweep.ts new file mode 100644 index 00000000..7af3fe30 --- /dev/null +++ b/src/core/settlement-sweep.ts @@ -0,0 +1,85 @@ +/** + * SettlementSweep — reconciler strategy for resuming stalled bounty settlements. + * + * Finds bounties stuck in "pending_settlement" (the saga pivot state) and + * attempts to resume them: capture credits (idempotent), then advance through + * completed → settled. + * + * This handles the case where the client crashed after the pivot transition + * but before completing the settle flow (Issue #240, finding #1). 
+ */ + +import type { Bounty, BountyStatus } from "./bounty.js"; +import type { BountyStore } from "./bounty-store.js"; +import type { CreditsService } from "./credits.js"; +import type { SweepResult, SweepStrategy } from "./sweep-reconciler.js"; + +export class SettlementSweep implements SweepStrategy { + readonly name = "SettlementSweep"; + private readonly bountyStore: BountyStore; + private readonly creditsService: CreditsService | undefined; + + constructor(bountyStore: BountyStore, creditsService?: CreditsService) { + this.bountyStore = bountyStore; + this.creditsService = creditsService; + } + + async sweep(): Promise { + let found = 0; + let repaired = 0; + const errors: Error[] = []; + + try { + const pending = await this.bountyStore.listBounties({ + status: "pending_settlement" as BountyStatus, + }); + found = pending.length; + + for (const bounty of pending) { + try { + await this.resumeSettlement(bounty); + repaired++; + } catch (err) { + errors.push( + err instanceof Error + ? err + : new Error(`Settlement resume failed for ${bounty.bountyId}: ${String(err)}`), + ); + } + } + } catch (err) { + errors.push( + err instanceof Error ? err : new Error(`SettlementSweep scan failed: ${String(err)}`), + ); + } + + return { strategy: this.name, found, repaired, errors }; + } + + /** + * Resume a stalled settlement. Idempotent — safe to call multiple times. + * + * Steps: + * 1. Capture credits (idempotent — already-captured is a no-op) + * 2. 
Advance: pending_settlement → completed → settled + */ + private async resumeSettlement(bounty: Bounty): Promise { + // Capture credits if escrow is active + if (this.creditsService && bounty.reservationId) { + if (bounty.claimedBy) { + await this.creditsService.capture(bounty.reservationId, { + toAgentId: bounty.claimedBy.agentId, + }); + } else { + await this.creditsService.capture(bounty.reservationId); + } + } + + // Advance through completed → settled + const completed = await this.bountyStore.completeBounty( + bounty.bountyId, + bounty.fulfilledByCid ?? "", + ); + await this.bountyStore.settleBounty(completed.bountyId); + } +} diff --git a/src/core/sweep-reconciler.test.ts b/src/core/sweep-reconciler.test.ts new file mode 100644 index 00000000..334f035f --- /dev/null +++ b/src/core/sweep-reconciler.test.ts @@ -0,0 +1,342 @@ +/** + * Tests for the SweepReconciler framework and sweep strategies. + */ + +import { afterEach, beforeEach, describe, expect, test } from "bun:test"; + +import { BountyIndexSweep } from "./bounty-index-sweep.js"; +import { HandoffSweep } from "./handoff-sweep.js"; +import type { InMemoryCreditsService } from "./in-memory-credits.js"; +import { + claimBountyOperation, + createBountyOperation, + settleBountyOperation, +} from "./operations/bounty.js"; +import { contributeOperation } from "./operations/contribute.js"; +import type { OperationDeps } from "./operations/deps.js"; +import type { TestOperationDeps } from "./operations/test-helpers.js"; +import { createTestOperationDeps } from "./operations/test-helpers.js"; +import { SettlementSweep } from "./settlement-sweep.js"; +import type { SweepResult, SweepStrategy } from "./sweep-reconciler.js"; +import { SweepReconciler } from "./sweep-reconciler.js"; + +// --------------------------------------------------------------------------- +// SweepReconciler framework tests +// --------------------------------------------------------------------------- + +describe("SweepReconciler", () => { + 
test("starts and stops without strategies", () => { + const sr = new SweepReconciler({ intervalMs: 10_000 }); + expect(sr.isRunning).toBe(false); + expect(sr.strategyCount).toBe(0); + sr.start(); + expect(sr.isRunning).toBe(true); + sr.stop(); + expect(sr.isRunning).toBe(false); + }); + + test("start is idempotent", () => { + const sr = new SweepReconciler({ intervalMs: 10_000 }); + sr.start(); + sr.start(); // should not create a second timer + expect(sr.isRunning).toBe(true); + sr.stop(); + }); + + test("registers and runs strategies", async () => { + const results: SweepResult[] = []; + const strategy: SweepStrategy = { + name: "TestSweep", + async sweep() { + return { strategy: "TestSweep", found: 3, repaired: 2, errors: [] }; + }, + }; + + const sr = new SweepReconciler({ + onCycle: (r) => results.push(...r), + }); + sr.register(strategy); + expect(sr.strategyCount).toBe(1); + + await sr.runCycle(); + + expect(results).toHaveLength(1); + expect(results[0]?.strategy).toBe("TestSweep"); + expect(results[0]?.found).toBe(3); + expect(results[0]?.repaired).toBe(2); + }); + + test("catches strategy throws and reports via onError", async () => { + const errors: unknown[] = []; + const strategy: SweepStrategy = { + name: "ThrowingSweep", + async sweep() { + throw new Error("boom"); + }, + }; + + const sr = new SweepReconciler({ + onError: (e) => errors.push(e), + }); + sr.register(strategy); + + const results = await sr.runCycle(); + + expect(results).toHaveLength(1); + expect(results[0]?.errors).toHaveLength(1); + expect(results[0]?.errors[0]?.message).toBe("boom"); + expect(errors).toHaveLength(1); + }); + + test("runs multiple strategies sequentially", async () => { + const order: string[] = []; + + const sr = new SweepReconciler(); + sr.register({ + name: "First", + async sweep() { + order.push("first"); + return { strategy: "First", found: 0, repaired: 0, errors: [] }; + }, + }); + sr.register({ + name: "Second", + async sweep() { + order.push("second"); + return 
{ strategy: "Second", found: 0, repaired: 0, errors: [] }; + }, + }); + + await sr.runCycle(); + expect(order).toEqual(["first", "second"]); + }); +}); + +// --------------------------------------------------------------------------- +// BountyIndexSweep tests +// --------------------------------------------------------------------------- + +describe("BountyIndexSweep", () => { + let testDeps: TestOperationDeps; + let deps: OperationDeps; + + beforeEach(async () => { + testDeps = await createTestOperationDeps(); + deps = testDeps.deps; + }); + + afterEach(async () => { + await testDeps.cleanup(); + }); + + test("reports zero issues on empty store", async () => { + const sweep = new BountyIndexSweep(deps.bountyStore!); + const result = await sweep.sweep(); + + expect(result.strategy).toBe("BountyIndexSweep"); + expect(result.found).toBe(0); + expect(result.repaired).toBe(0); + expect(result.errors).toHaveLength(0); + }); + + test("reports zero issues when bounties are consistent", async () => { + (deps.creditsService as InMemoryCreditsService).seed("agent-1", 5000); + + await createBountyOperation( + { + title: "Bounty A", + amount: 100, + criteria: { description: "work" }, + agent: { agentId: "agent-1" }, + }, + deps, + ); + await createBountyOperation( + { + title: "Bounty B", + amount: 200, + criteria: { description: "work" }, + agent: { agentId: "agent-1" }, + }, + deps, + ); + + const sweep = new BountyIndexSweep(deps.bountyStore!); + const result = await sweep.sweep(); + + expect(result.found).toBe(0); + expect(result.errors).toHaveLength(0); + }); +}); + +// --------------------------------------------------------------------------- +// SettlementSweep tests +// --------------------------------------------------------------------------- + +describe("SettlementSweep", () => { + let testDeps: TestOperationDeps; + let deps: OperationDeps; + + beforeEach(async () => { + testDeps = await createTestOperationDeps(); + deps = testDeps.deps; + }); + + afterEach(async 
() => { + await testDeps.cleanup(); + }); + + test("reports zero issues when no pending_settlement bounties exist", async () => { + const sweep = new SettlementSweep(deps.bountyStore!, deps.creditsService); + const result = await sweep.sweep(); + + expect(result.strategy).toBe("SettlementSweep"); + expect(result.found).toBe(0); + expect(result.repaired).toBe(0); + expect(result.errors).toHaveLength(0); + }); + + test("resumes a pending_settlement bounty to settled", async () => { + (deps.creditsService as InMemoryCreditsService).seed("creator", 1000); + + // Create → claim → contribute + const bounty = await createBountyOperation( + { + title: "Stalled settle", + amount: 100, + criteria: { description: "Any", requiredTags: ["fix"] }, + agent: { agentId: "creator" }, + }, + deps, + ); + expect(bounty.ok).toBe(true); + if (!bounty.ok) return; + + await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: { agentId: "worker" } }, + deps, + ); + const contrib = await contributeOperation( + { kind: "work", summary: "Fix", tags: ["fix"], agent: { agentId: "worker" } }, + deps, + ); + expect(contrib.ok).toBe(true); + if (!contrib.ok) return; + + // Simulate: settle starts, pivot succeeds, but capture fails + (deps.creditsService as InMemoryCreditsService).setFailures({ + capture: new Error("Simulated"), + }); + const failedSettle = await settleBountyOperation( + { bountyId: bounty.value.bountyId, contributionCid: contrib.value.cid }, + deps, + ); + expect(failedSettle.ok).toBe(false); + + // Verify stuck in pending_settlement + const stuck = await deps.bountyStore!.getBounty(bounty.value.bountyId); + expect(stuck?.status).toBe("pending_settlement"); + + // Clear failure and run the sweep + (deps.creditsService as InMemoryCreditsService).setFailures({}); + const sweep = new SettlementSweep(deps.bountyStore!, deps.creditsService); + const result = await sweep.sweep(); + + expect(result.found).toBe(1); + expect(result.repaired).toBe(1); + 
expect(result.errors).toHaveLength(0); + + // Verify bounty is now settled + const settled = await deps.bountyStore!.getBounty(bounty.value.bountyId); + expect(settled?.status).toBe("settled"); + }); + + test("reports error when resume fails", async () => { + (deps.creditsService as InMemoryCreditsService).seed("creator", 1000); + + const bounty = await createBountyOperation( + { + title: "Permanent fail", + amount: 100, + criteria: { description: "Any", requiredTags: ["fix"] }, + agent: { agentId: "creator" }, + }, + deps, + ); + expect(bounty.ok).toBe(true); + if (!bounty.ok) return; + + await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: { agentId: "worker" } }, + deps, + ); + const contrib = await contributeOperation( + { kind: "work", summary: "Fix", tags: ["fix"], agent: { agentId: "worker" } }, + deps, + ); + expect(contrib.ok).toBe(true); + if (!contrib.ok) return; + + // Get into pending_settlement + (deps.creditsService as InMemoryCreditsService).setFailures({ + capture: new Error("Simulated"), + }); + await settleBountyOperation( + { bountyId: bounty.value.bountyId, contributionCid: contrib.value.cid }, + deps, + ); + + // Run sweep with capture STILL failing + const sweep = new SettlementSweep(deps.bountyStore!, deps.creditsService); + const result = await sweep.sweep(); + + expect(result.found).toBe(1); + expect(result.repaired).toBe(0); + expect(result.errors).toHaveLength(1); + expect(result.errors[0]?.message).toContain("Simulated"); + }); +}); + +// --------------------------------------------------------------------------- +// HandoffSweep tests +// --------------------------------------------------------------------------- + +describe("HandoffSweep", () => { + let testDeps: TestOperationDeps; + let deps: OperationDeps; + + beforeEach(async () => { + testDeps = await createTestOperationDeps(); + deps = testDeps.deps; + }); + + afterEach(async () => { + await testDeps.cleanup(); + }); + + test("reports zero issues on empty 
store", async () => { + const sweep = new HandoffSweep(deps.contributionStore!, deps.handoffStore!); + const result = await sweep.sweep(); + + expect(result.strategy).toBe("HandoffSweep"); + expect(result.found).toBe(0); + expect(result.repaired).toBe(0); + expect(result.errors).toHaveLength(0); + }); + + test("detects contributions without handoffs", async () => { + // Create a "work" contribution directly (no handoff fan-out since no topology) + await contributeOperation( + { kind: "work", summary: "Orphaned work", agent: { agentId: "worker" } }, + deps, + ); + + const sweep = new HandoffSweep(deps.contributionStore!, deps.handoffStore!); + const result = await sweep.sweep(); + + // The contribution has kind "work" (handoff-eligible) but no handoffs + expect(result.found).toBe(1); + // Detection-only — no repair + expect(result.repaired).toBe(0); + }); +}); diff --git a/src/core/sweep-reconciler.ts b/src/core/sweep-reconciler.ts new file mode 100644 index 00000000..8d97fa29 --- /dev/null +++ b/src/core/sweep-reconciler.ts @@ -0,0 +1,155 @@ +/** + * Unified sweep reconciler with pluggable strategies. + * + * Runs registered SweepStrategy instances on a shared timer. Each strategy + * independently scans a store for inconsistencies and repairs them using + * idempotent writes. Strategies run sequentially within a cycle to avoid + * contention on shared stores. + * + * Distinct from the claim/workspace Reconciler in reconciler.ts — this + * framework handles bounty-store atomicity, handoff orphans, and settlement + * resumption. 
+ * + * Usage: + * const sr = new SweepReconciler({ intervalMs: 60_000 }); + * sr.register(new BountyIndexSweep(bountyStore)); + * sr.register(new SettlementSweep(bountyStore, creditsService)); + * sr.start(); + * // later: + * sr.stop(); + */ + +// --------------------------------------------------------------------------- +// Strategy interface +// --------------------------------------------------------------------------- + +/** Result of a single sweep execution. */ +export interface SweepResult { + /** Strategy name (for logging). */ + readonly strategy: string; + /** Number of inconsistencies found. */ + readonly found: number; + /** Number of inconsistencies repaired. */ + readonly repaired: number; + /** Errors encountered during sweep (non-fatal). */ + readonly errors: readonly Error[]; +} + +/** + * A sweep strategy that scans for and repairs a specific class of + * inconsistency. Strategies must be idempotent — running the same + * sweep twice with no intervening changes should be a no-op. + */ +export interface SweepStrategy { + /** Human-readable name for logging. */ + readonly name: string; + + /** + * Execute the sweep. Returns a summary of what was found and repaired. + * Must not throw — errors should be collected in the result. + */ + sweep(): Promise; +} + +// --------------------------------------------------------------------------- +// Configuration +// --------------------------------------------------------------------------- + +/** Options for the sweep reconciler. */ +export interface SweepReconcilerConfig { + /** Interval between sweep cycles in milliseconds. Default: 60_000 (1 minute). */ + readonly intervalMs?: number | undefined; + + /** + * Called after each sweep cycle with results from all strategies. + * Use for logging, monitoring, or alerting. 
+ */ + readonly onCycle?: (results: readonly SweepResult[]) => void; + + /** + * Called when the reconciler encounters an unexpected error + * (not from individual strategy sweeps, which report their own errors). + */ + readonly onError?: (error: unknown) => void; +} + +const DEFAULT_INTERVAL_MS = 60_000; + +// --------------------------------------------------------------------------- +// SweepReconciler +// --------------------------------------------------------------------------- + +/** + * Periodic sweep reconciler. + * + * Strategies are executed sequentially per cycle. Each strategy must + * handle its own errors and report them in the SweepResult. An + * unhandled throw from a strategy is caught and reported via onError. + */ +export class SweepReconciler { + private readonly strategies: SweepStrategy[] = []; + private readonly intervalMs: number; + private readonly onCycle?: (results: readonly SweepResult[]) => void; + private readonly onError?: (error: unknown) => void; + private timer: ReturnType | undefined; + private running = false; + + constructor(config?: SweepReconcilerConfig) { + this.intervalMs = config?.intervalMs ?? DEFAULT_INTERVAL_MS; + this.onCycle = config?.onCycle; + this.onError = config?.onError; + } + + /** Register a sweep strategy. Must be called before start(). */ + register(strategy: SweepStrategy): void { + this.strategies.push(strategy); + } + + /** Start the reconciler timer. No-op if already running. */ + start(): void { + if (this.running) return; + this.running = true; + this.timer = setInterval(() => void this.runCycle(), this.intervalMs); + } + + /** Stop the reconciler timer and cancel any pending cycle. */ + stop(): void { + this.running = false; + if (this.timer !== undefined) { + clearInterval(this.timer); + this.timer = undefined; + } + } + + /** Run one sweep cycle immediately (useful for testing or on-demand). 
*/ + async runCycle(): Promise { + const results: SweepResult[] = []; + for (const strategy of this.strategies) { + try { + const result = await strategy.sweep(); + results.push(result); + } catch (err) { + // Strategy.sweep() should not throw, but guard against it + results.push({ + strategy: strategy.name, + found: 0, + repaired: 0, + errors: [err instanceof Error ? err : new Error(String(err))], + }); + this.onError?.(err); + } + } + this.onCycle?.(results); + return results; + } + + /** Whether the reconciler is currently running. */ + get isRunning(): boolean { + return this.running; + } + + /** Number of registered strategies. */ + get strategyCount(): number { + return this.strategies.length; + } +} diff --git a/src/local/sqlite-bounty-store.ts b/src/local/sqlite-bounty-store.ts index aba80040..051a1e5d 100644 --- a/src/local/sqlite-bounty-store.ts +++ b/src/local/sqlite-bounty-store.ts @@ -263,6 +263,20 @@ export class SqliteBountyStore implements BountyStore { })); }; + beginSettlement = async (bountyId: string, fulfilledByCid: string): Promise => { + return this.transitionBounty( + bountyId, + BountyStatus.PendingSettlement, + "beginSettlement", + (bounty) => ({ + ...bounty, + status: BountyStatus.PendingSettlement, + fulfilledByCid, + updatedAt: nowUtcIso(), + }), + ); + }; + completeBounty = async (bountyId: string, fulfilledByCid: string): Promise => { return this.transitionBounty(bountyId, BountyStatus.Completed, "complete", (bounty) => ({ ...bounty, @@ -306,7 +320,7 @@ export class SqliteBountyStore implements BountyStore { .prepare( `SELECT * FROM bounties WHERE deadline < ? 
- AND status IN ('open', 'claimed') + AND status IN ('open', 'claimed', 'pending_settlement') ORDER BY deadline ASC`, ) .all(now) as BountyRow[]; diff --git a/src/nexus/nexus-bounty-store.ts b/src/nexus/nexus-bounty-store.ts index 8f6dbeb0..ce270957 100644 --- a/src/nexus/nexus-bounty-store.ts +++ b/src/nexus/nexus-bounty-store.ts @@ -15,11 +15,12 @@ import { NotFoundError, StateConflictError } from "../core/errors.js"; import type { AgentIdentity } from "../core/models.js"; import { safeCleanup } from "../shared/safe-cleanup.js"; import { batchParallel } from "./batch.js"; -import type { ListEntry, NexusClient } from "./client.js"; +import type { ListEntry, NexusClient, WriteResult } from "./client.js"; import type { NexusConfig, ResolvedNexusConfig } from "./config.js"; import { resolveConfig } from "./config.js"; import { NexusConflictError } from "./errors.js"; import { listAllPages } from "./list-pages.js"; +import { LruCache } from "./lru-cache.js"; import { withRetry, withSemaphore } from "./retry.js"; import { Semaphore } from "./semaphore.js"; import { @@ -60,6 +61,7 @@ export class NexusBountyStore implements BountyStore { private readonly config: ResolvedNexusConfig; private readonly semaphore: Semaphore; private readonly zoneId: string; + private readonly cache: LruCache; constructor(config: NexusConfig) { this.config = resolveConfig(config); @@ -67,6 +69,7 @@ export class NexusBountyStore implements BountyStore { this.zoneId = this.config.zoneId; this.storeIdentity = `nexus:${this.zoneId}:bounties`; this.semaphore = new Semaphore(this.config.maxConcurrency); + this.cache = new LruCache(this.config.cacheMaxEntries); } async createBounty(bounty: Bounty): Promise { @@ -77,8 +80,9 @@ export class NexusBountyStore implements BountyStore { updatedAt: now, }; + let writeResult: WriteResult; try { - await withRetry( + writeResult = await withRetry( () => withSemaphore(this.semaphore, () => this.client.write(bountyPath(this.zoneId, bounty.bountyId), 
encodeBounty(created), { @@ -99,19 +103,19 @@ export class NexusBountyStore implements BountyStore { throw err; } + this.cache.set(bounty.bountyId, { bounty: created, etag: writeResult.etag }); await this.writeStatusIndex(created); return created; } async getBounty(bountyId: string): Promise { - const data = await withRetry( - () => - withSemaphore(this.semaphore, () => this.client.read(bountyPath(this.zoneId, bountyId))), - "getBounty", - this.config, - ); - if (data === undefined) return undefined; - return decodeBounty(data); + const cached = this.cache.get(bountyId); + if (cached) return cached.bounty; + + const result = await this.readBountyWithEtag(bountyId); + if (!result) return undefined; + this.cache.set(bountyId, result); + return result.bounty; } async listBounties(query?: BountyQuery): Promise { @@ -191,6 +195,13 @@ export class NexusBountyStore implements BountyStore { })); } + async beginSettlement(bountyId: string, fulfilledByCid: string): Promise { + return this.transitionBounty(bountyId, "pending_settlement" as BountyStatus, (b) => ({ + ...b, + fulfilledByCid, + })); + } + async completeBounty(bountyId: string, fulfilledByCid: string): Promise { return this.transitionBounty(bountyId, "completed" as BountyStatus, (b) => ({ ...b, @@ -212,12 +223,45 @@ export class NexusBountyStore implements BountyStore { async findExpiredBounties(): Promise { const now = new Date(); - const openBounties = await this.listBounties({ status: "open" as BountyStatus }); - const claimedBounties = await this.listBounties({ status: "claimed" as BountyStatus }); - const all = [...openBounties, ...claimedBounties]; + const statuses: BountyStatus[] = ["open", "claimed", "pending_settlement"] as BountyStatus[]; + const all: Bounty[] = []; + for (const status of statuses) { + const bounties = await this.listBounties({ status }); + all.push(...bounties); + } return all.filter((b) => new Date(b.deadline).getTime() < now.getTime()); } + async repairIndex(bountyId: string): Promise 
{ + const bounty = await this.getBounty(bountyId); + if (!bounty) return; + + // Ensure the correct status index entry exists + await this.writeStatusIndex(bounty); + + // Clean stale index entries for other statuses + const allStatuses: BountyStatus[] = [ + "draft", + "open", + "claimed", + "pending_settlement", + "completed", + "settled", + "expired", + "cancelled", + ] as BountyStatus[]; + for (const status of allStatuses) { + if (status === bounty.status) continue; + await safeCleanup( + withSemaphore(this.semaphore, () => + this.client.delete(bountyStatusIndexPath(this.zoneId, status, bountyId)), + ), + "repairIndex: delete stale status index", + { silent: true }, + ); + } + } + async recordReward(_reward: RewardRecord): Promise { // Rewards are not yet stored in Nexus VFS (future work) } @@ -231,7 +275,7 @@ export class NexusBountyStore implements BountyStore { } close(): void { - // No-op — no local state to release + this.cache.clear(); } // ----------------------------------------------------------------------- @@ -243,7 +287,10 @@ export class NexusBountyStore implements BountyStore { newStatus: BountyStatus, transform?: (b: Bounty) => Bounty, ): Promise { - const result = await this.readBountyWithEtag(bountyId); + // Check cache for a fresh bounty+etag to avoid a redundant VFS read. + // Falls back to readBountyWithEtag on cache miss. + const cached = this.cache.get(bountyId); + const result = cached ?? (await this.readBountyWithEtag(bountyId)); if (!result) throw new NotFoundError({ resource: "Bounty", @@ -260,7 +307,12 @@ export class NexusBountyStore implements BountyStore { }; if (transform) updated = transform(updated); - await this.writeBountyCas(updated, etag); + const newEtag = await this.writeBountyCas(updated, etag); + + // Update cache with the freshly written bounty + new ETag so that + // a subsequent transition in the same call chain (e.g. complete → settle) + // can skip the VFS read entirely. 
+ this.cache.set(bountyId, { bounty: updated, etag: newEtag }); // Clean up old status index if (oldStatus !== newStatus) { @@ -291,9 +343,9 @@ export class NexusBountyStore implements BountyStore { return { bounty: decodeBounty(result.content), etag: result.etag }; } - /** Write bounty with ifMatch for CAS safety on mutations. */ - private async writeBountyCas(bounty: Bounty, expectedEtag: string): Promise { - await withRetry( + /** Write bounty with ifMatch for CAS safety on mutations. Returns the new ETag. */ + private async writeBountyCas(bounty: Bounty, expectedEtag: string): Promise { + const result = await withRetry( () => withSemaphore(this.semaphore, () => this.client.write(bountyPath(this.zoneId, bounty.bountyId), encodeBounty(bounty), { @@ -303,6 +355,7 @@ export class NexusBountyStore implements BountyStore { "writeBountyCas", this.config, ); + return result.etag; } private async writeStatusIndex(bounty: Bounty): Promise { From b9a11935543d3496caa550a4719a8a4fc96f6c16 Mon Sep 17 00:00:00 2001 From: Tao Feng Date: Tue, 14 Apr 2026 17:03:11 -0700 Subject: [PATCH 02/14] =?UTF-8?q?fix(bounty):=20address=20adversarial=20re?= =?UTF-8?q?view=20round=201=20=E2=80=94=203=20HIGH=20findings?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. Freeze fulfillment CID after saga pivot: when resuming a pending_settlement bounty, reject attempts to change the contribution CID. Prevents non-deterministic settlements. 2. Remove stale cache from transitionBounty: mutations always read fresh from VFS to get a valid ETag. Cache is still used for read-only getBounty() pre-flight checks. 3. Wire SweepReconciler into server startup: BountyIndexSweep and SettlementSweep now run on a 60s timer with graceful shutdown. Closes the "recovery not wired" gap. 
--- src/core/operations/bounty.ts | 23 ++++++++++++++++++----- src/nexus/nexus-bounty-store.ts | 13 ++++++------- src/server/serve.ts | 31 +++++++++++++++++++++++++++++++ 3 files changed, 55 insertions(+), 12 deletions(-) diff --git a/src/core/operations/bounty.ts b/src/core/operations/bounty.ts index e3bbe622..306bfb43 100644 --- a/src/core/operations/bounty.ts +++ b/src/core/operations/bounty.ts @@ -300,13 +300,26 @@ export async function settleBountyOperation( ); } + // On resume from pending_settlement, the fulfillment CID is frozen — + // reject attempts to change it (prevents non-deterministic settlements). + let fulfilledByCid = input.contributionCid; + if (bounty.status === BS.PendingSettlement) { + if (bounty.fulfilledByCid && input.contributionCid !== bounty.fulfilledByCid) { + return validationErr( + `Bounty '${input.bountyId}' is already pending settlement with contribution ` + + `'${bounty.fulfilledByCid}' — cannot change to '${input.contributionCid}'`, + ); + } + fulfilledByCid = bounty.fulfilledByCid ?? 
input.contributionCid; + } + // Validate contribution exists and meets criteria - const contribution = await deps.contributionStore.get(input.contributionCid); + const contribution = await deps.contributionStore.get(fulfilledByCid); if (!contribution) { - return notFound("Contribution", input.contributionCid); + return notFound("Contribution", fulfilledByCid); } if (!evaluateBountyCriteria(bounty.criteria, contribution)) { - return validationErr(`Contribution '${input.contributionCid}' does not meet bounty criteria`); + return validationErr(`Contribution '${fulfilledByCid}' does not meet bounty criteria`); } // Require credits service when escrow is active @@ -318,7 +331,7 @@ export async function settleBountyOperation( // Step 1: Pivot — transition to pending_settlement (skip if resuming) if (bounty.status === BS.Claimed) { - await deps.bountyStore.beginSettlement(input.bountyId, input.contributionCid); + await deps.bountyStore.beginSettlement(input.bountyId, fulfilledByCid); } // Step 2: Capture payment (idempotent — safe to retry) @@ -331,7 +344,7 @@ export async function settleBountyOperation( } // Step 3: Advance through completed → settled (retryable after pivot) - const completed = await deps.bountyStore.completeBounty(input.bountyId, input.contributionCid); + const completed = await deps.bountyStore.completeBounty(input.bountyId, fulfilledByCid); const settled = await deps.bountyStore.settleBounty(completed.bountyId); return ok({ diff --git a/src/nexus/nexus-bounty-store.ts b/src/nexus/nexus-bounty-store.ts index ce270957..d395208b 100644 --- a/src/nexus/nexus-bounty-store.ts +++ b/src/nexus/nexus-bounty-store.ts @@ -287,10 +287,11 @@ export class NexusBountyStore implements BountyStore { newStatus: BountyStatus, transform?: (b: Bounty) => Bounty, ): Promise { - // Check cache for a fresh bounty+etag to avoid a redundant VFS read. - // Falls back to readBountyWithEtag on cache miss. - const cached = this.cache.get(bountyId); - const result = cached ?? 
(await this.readBountyWithEtag(bountyId)); + // Always read fresh from VFS for mutations — do NOT use the cache here. + // The cache is safe for read-only getBounty() (pre-flight checks) but + // mutations must use a fresh ETag to avoid CAS failures against concurrent + // writers in other processes. + const result = await this.readBountyWithEtag(bountyId); if (!result) throw new NotFoundError({ resource: "Bounty", @@ -309,9 +310,7 @@ export class NexusBountyStore implements BountyStore { const newEtag = await this.writeBountyCas(updated, etag); - // Update cache with the freshly written bounty + new ETag so that - // a subsequent transition in the same call chain (e.g. complete → settle) - // can skip the VFS read entirely. + // Update cache after successful write so subsequent reads are fresh. this.cache.set(bountyId, { bounty: updated, etag: newEtag }); // Clean up old status index diff --git a/src/server/serve.ts b/src/server/serve.ts index 48307cff..2534ae98 100644 --- a/src/server/serve.ts +++ b/src/server/serve.ts @@ -95,6 +95,34 @@ const deps: ServerDeps = { const app = createApp(deps); +// --------------------------------------------------------------------------- +// Background sweep reconciler +// --------------------------------------------------------------------------- + +import { BountyIndexSweep } from "../core/bounty-index-sweep.js"; +import { SettlementSweep } from "../core/settlement-sweep.js"; +import { SweepReconciler } from "../core/sweep-reconciler.js"; + +let sweepReconciler: SweepReconciler | undefined; +if (serverBountyStore) { + sweepReconciler = new SweepReconciler({ + intervalMs: 60_000, + onCycle(results) { + for (const r of results) { + if (r.found > 0 || r.errors.length > 0) { + console.log( + `[sweep] ${r.strategy}: found=${r.found} repaired=${r.repaired} errors=${r.errors.length}`, + ); + } + } + }, + }); + sweepReconciler.register(new BountyIndexSweep(serverBountyStore)); + sweepReconciler.register(new 
SettlementSweep(serverBountyStore)); + sweepReconciler.start(); + console.log("sweep-reconciler started (BountyIndexSweep, SettlementSweep)"); +} + // --------------------------------------------------------------------------- // Optional SessionService + WebSocket push // --------------------------------------------------------------------------- @@ -195,6 +223,9 @@ console.log(`grove-server listening on http://${HOST ?? "localhost"}:${server.po // Graceful shutdown async function shutdown(): Promise { console.log("Shutting down..."); + if (sweepReconciler) { + sweepReconciler.stop(); + } if (sessionService) { sessionService.destroy(); } From 38af8e19fe5585f1db8af15c89439a2a5c8ce189 Mon Sep 17 00:00:00 2001 From: Tao Feng Date: Tue, 14 Apr 2026 17:14:26 -0700 Subject: [PATCH 03/14] =?UTF-8?q?fix(bounty):=20address=20adversarial=20re?= =?UTF-8?q?view=20round=202=20=E2=80=94=202=20HIGH=20findings?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. Remove process-local bounty cache entirely: mutable objects must not be cached without cross-process invalidation. getBounty() now always reads fresh from VFS. Add validateBountyTransition() call in transitionBounty() to reject stale state before CAS write. 2. Extend settlement recovery to handle "completed" status: if capture succeeded and completeBounty committed but settleBounty failed, the operation and SettlementSweep can now resume from "completed" state. Prevents stranded post-capture bounties. 
--- src/core/operations/bounty.ts | 22 +++++++++------ src/core/settlement-sweep.ts | 30 ++++++++++++--------- src/nexus/nexus-bounty-store.ts | 48 +++++++++++++++------------------ 3 files changed, 53 insertions(+), 47 deletions(-) diff --git a/src/core/operations/bounty.ts b/src/core/operations/bounty.ts index 306bfb43..1b59be9d 100644 --- a/src/core/operations/bounty.ts +++ b/src/core/operations/bounty.ts @@ -293,17 +293,21 @@ export async function settleBountyOperation( return notFound("Bounty", input.bountyId); } - // Allow both "claimed" (fresh settle) and "pending_settlement" (retry/resume) - if (bounty.status !== BS.Claimed && bounty.status !== BS.PendingSettlement) { + // Allow "claimed" (fresh), "pending_settlement" (post-pivot), "completed" (post-capture) + const resumable = + bounty.status === BS.Claimed || + bounty.status === BS.PendingSettlement || + bounty.status === BS.Completed; + if (!resumable) { return validationErr( `Bounty '${input.bountyId}' cannot be settled (current status: ${bounty.status})`, ); } - // On resume from pending_settlement, the fulfillment CID is frozen — - // reject attempts to change it (prevents non-deterministic settlements). + // On resume from pending_settlement or completed, the fulfillment CID + // is frozen — reject attempts to change it. 
let fulfilledByCid = input.contributionCid; - if (bounty.status === BS.PendingSettlement) { + if (bounty.status !== BS.Claimed) { if (bounty.fulfilledByCid && input.contributionCid !== bounty.fulfilledByCid) { return validationErr( `Bounty '${input.bountyId}' is already pending settlement with contribution ` + @@ -343,9 +347,11 @@ export async function settleBountyOperation( await deps.creditsService.capture(bounty.reservationId); } - // Step 3: Advance through completed → settled (retryable after pivot) - const completed = await deps.bountyStore.completeBounty(input.bountyId, fulfilledByCid); - const settled = await deps.bountyStore.settleBounty(completed.bountyId); + // Step 3: Advance through completed → settled (skip steps already done) + if (bounty.status !== BS.Completed) { + await deps.bountyStore.completeBounty(input.bountyId, fulfilledByCid); + } + const settled = await deps.bountyStore.settleBounty(input.bountyId); return ok({ bountyId: settled.bountyId, diff --git a/src/core/settlement-sweep.ts b/src/core/settlement-sweep.ts index 7af3fe30..15ed27e1 100644 --- a/src/core/settlement-sweep.ts +++ b/src/core/settlement-sweep.ts @@ -30,12 +30,21 @@ export class SettlementSweep implements SweepStrategy { const errors: Error[] = []; try { + // Scan both pending_settlement AND completed bounties that have a + // fulfilledByCid — the latter covers the case where capture succeeded + // and completeBounty committed but settleBounty failed. const pending = await this.bountyStore.listBounties({ status: "pending_settlement" as BountyStatus, }); - found = pending.length; + const completed = await this.bountyStore.listBounties({ + status: "completed" as BountyStatus, + }); + // Only resume completed bounties that have a fulfillment CID + // (indicating they were mid-settlement, not just manually completed). 
+ const stalled = [...pending, ...completed.filter((b) => b.fulfilledByCid)]; + found = stalled.length; - for (const bounty of pending) { + for (const bounty of stalled) { try { await this.resumeSettlement(bounty); repaired++; @@ -59,12 +68,10 @@ export class SettlementSweep implements SweepStrategy { /** * Resume a stalled settlement. Idempotent — safe to call multiple times. * - * Steps: - * 1. Capture credits (idempotent — already-captured is a no-op) - * 2. Advance: pending_settlement → completed → settled + * Handles both pending_settlement (pre-capture) and completed (post-capture). */ private async resumeSettlement(bounty: Bounty): Promise { - // Capture credits if escrow is active + // Capture credits if escrow is active (idempotent — already-captured is a no-op) if (this.creditsService && bounty.reservationId) { if (bounty.claimedBy) { await this.creditsService.capture(bounty.reservationId, { @@ -75,11 +82,10 @@ export class SettlementSweep implements SweepStrategy { } } - // Advance through completed → settled - const completed = await this.bountyStore.completeBounty( - bounty.bountyId, - bounty.fulfilledByCid ?? "", - ); - await this.bountyStore.settleBounty(completed.bountyId); + // Advance through remaining states (skip steps already done) + if (bounty.status !== "completed") { + await this.bountyStore.completeBounty(bounty.bountyId, bounty.fulfilledByCid ?? 
""); + } + await this.bountyStore.settleBounty(bounty.bountyId); } } diff --git a/src/nexus/nexus-bounty-store.ts b/src/nexus/nexus-bounty-store.ts index d395208b..e89ab948 100644 --- a/src/nexus/nexus-bounty-store.ts +++ b/src/nexus/nexus-bounty-store.ts @@ -10,17 +10,17 @@ */ import type { Bounty, BountyStatus, RewardRecord } from "../core/bounty.js"; +import { validateBountyTransition } from "../core/bounty-logic.js"; import type { BountyQuery, BountyStore, RewardQuery } from "../core/bounty-store.js"; import { NotFoundError, StateConflictError } from "../core/errors.js"; import type { AgentIdentity } from "../core/models.js"; import { safeCleanup } from "../shared/safe-cleanup.js"; import { batchParallel } from "./batch.js"; -import type { ListEntry, NexusClient, WriteResult } from "./client.js"; +import type { ListEntry, NexusClient } from "./client.js"; import type { NexusConfig, ResolvedNexusConfig } from "./config.js"; import { resolveConfig } from "./config.js"; import { NexusConflictError } from "./errors.js"; import { listAllPages } from "./list-pages.js"; -import { LruCache } from "./lru-cache.js"; import { withRetry, withSemaphore } from "./retry.js"; import { Semaphore } from "./semaphore.js"; import { @@ -61,7 +61,6 @@ export class NexusBountyStore implements BountyStore { private readonly config: ResolvedNexusConfig; private readonly semaphore: Semaphore; private readonly zoneId: string; - private readonly cache: LruCache; constructor(config: NexusConfig) { this.config = resolveConfig(config); @@ -69,7 +68,6 @@ export class NexusBountyStore implements BountyStore { this.zoneId = this.config.zoneId; this.storeIdentity = `nexus:${this.zoneId}:bounties`; this.semaphore = new Semaphore(this.config.maxConcurrency); - this.cache = new LruCache(this.config.cacheMaxEntries); } async createBounty(bounty: Bounty): Promise { @@ -80,9 +78,8 @@ export class NexusBountyStore implements BountyStore { updatedAt: now, }; - let writeResult: WriteResult; try { - 
writeResult = await withRetry( + await withRetry( () => withSemaphore(this.semaphore, () => this.client.write(bountyPath(this.zoneId, bounty.bountyId), encodeBounty(created), { @@ -103,19 +100,19 @@ export class NexusBountyStore implements BountyStore { throw err; } - this.cache.set(bounty.bountyId, { bounty: created, etag: writeResult.etag }); await this.writeStatusIndex(created); return created; } async getBounty(bountyId: string): Promise { - const cached = this.cache.get(bountyId); - if (cached) return cached.bounty; - - const result = await this.readBountyWithEtag(bountyId); - if (!result) return undefined; - this.cache.set(bountyId, result); - return result.bounty; + const data = await withRetry( + () => + withSemaphore(this.semaphore, () => this.client.read(bountyPath(this.zoneId, bountyId))), + "getBounty", + this.config, + ); + if (data === undefined) return undefined; + return decodeBounty(data); } async listBounties(query?: BountyQuery): Promise { @@ -275,7 +272,7 @@ export class NexusBountyStore implements BountyStore { } close(): void { - this.cache.clear(); + // No-op — no local state to release } // ----------------------------------------------------------------------- @@ -287,10 +284,6 @@ export class NexusBountyStore implements BountyStore { newStatus: BountyStatus, transform?: (b: Bounty) => Bounty, ): Promise { - // Always read fresh from VFS for mutations — do NOT use the cache here. - // The cache is safe for read-only getBounty() (pre-flight checks) but - // mutations must use a fresh ETag to avoid CAS failures against concurrent - // writers in other processes. const result = await this.readBountyWithEtag(bountyId); if (!result) throw new NotFoundError({ @@ -301,6 +294,11 @@ export class NexusBountyStore implements BountyStore { const { bounty: existing, etag } = result; const oldStatus = existing.status; + + // Validate the transition before writing — catches stale state from + // concurrent writers before the CAS round-trip. 
+ validateBountyTransition(bountyId, oldStatus, newStatus, `transition to ${newStatus}`); + let updated: Bounty = { ...existing, status: newStatus, @@ -308,10 +306,7 @@ export class NexusBountyStore implements BountyStore { }; if (transform) updated = transform(updated); - const newEtag = await this.writeBountyCas(updated, etag); - - // Update cache after successful write so subsequent reads are fresh. - this.cache.set(bountyId, { bounty: updated, etag: newEtag }); + await this.writeBountyCas(updated, etag); // Clean up old status index if (oldStatus !== newStatus) { @@ -342,9 +337,9 @@ export class NexusBountyStore implements BountyStore { return { bounty: decodeBounty(result.content), etag: result.etag }; } - /** Write bounty with ifMatch for CAS safety on mutations. Returns the new ETag. */ - private async writeBountyCas(bounty: Bounty, expectedEtag: string): Promise { - const result = await withRetry( + /** Write bounty with ifMatch for CAS safety on mutations. */ + private async writeBountyCas(bounty: Bounty, expectedEtag: string): Promise { + await withRetry( () => withSemaphore(this.semaphore, () => this.client.write(bountyPath(this.zoneId, bounty.bountyId), encodeBounty(bounty), { @@ -354,7 +349,6 @@ export class NexusBountyStore implements BountyStore { "writeBountyCas", this.config, ); - return result.etag; } private async writeStatusIndex(bounty: Bounty): Promise { From 910967f2451c59dddcc5d425bca73ed09718f960 Mon Sep 17 00:00:00 2001 From: Tao Feng Date: Tue, 14 Apr 2026 17:24:19 -0700 Subject: [PATCH 04/14] =?UTF-8?q?fix(bounty):=20address=20adversarial=20re?= =?UTF-8?q?view=20round=203=20=E2=80=94=201=20CRITICAL,=201=20HIGH,=201=20?= =?UTF-8?q?MEDIUM?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. [critical] SettlementSweep hard-fails when bounty has reservationId but no creditsService — prevents settling escrowed bounties without actually capturing funds. 2. 
[high] Remove claimed→completed from state machine — force all settlement through pending_settlement pivot. Update conformance tests and bounty-logic tests to use beginSettlement first. 3. [medium] BountyIndexSweep now calls repairIndex unconditionally for every bounty — cleans both missing current-status entries AND stale old-status markers. --- src/core/bounty-index-sweep.ts | 42 +++++++++++++--------------- src/core/bounty-logic.test.ts | 24 +++++++++++++++- src/core/bounty-logic.ts | 2 +- src/core/bounty-store.conformance.ts | 24 ++++++++++++---- src/core/settlement-sweep.ts | 9 ++++++ 5 files changed, 71 insertions(+), 30 deletions(-) diff --git a/src/core/bounty-index-sweep.ts b/src/core/bounty-index-sweep.ts index 3dd85d01..535d5177 100644 --- a/src/core/bounty-index-sweep.ts +++ b/src/core/bounty-index-sweep.ts @@ -1,15 +1,14 @@ /** * BountyIndexSweep — reconciler strategy for NexusBountyStore dual-write consistency. * - * Scans all bounty documents and ensures: - * 1. Each bounty has a status index entry matching its current status. - * 2. Stale index entries (pointing to a bounty with a different status) are deleted. + * Scans all bounty documents and calls repairIndex() for each one. + * repairIndex() ensures the correct status index entry exists AND deletes + * stale entries for other statuses. This covers both failure modes: + * 1. Missing current-status index (new index write failed) + * 2. Stale old-status index (old index delete failed) * - * This fixes the dual-write gap where the document write succeeds but the - * status index write fails (Issue #240, findings #2 and #3). - * - * Uses BountyStore.repairIndex() when available. For stores without a separate - * index (e.g., SQLite), this sweep is detection-only. + * For stores without a separate index (e.g., SQLite), repairIndex is + * undefined and this sweep is a no-op. 
*/ import type { BountyStore } from "./bounty-store.js"; @@ -24,32 +23,28 @@ export class BountyIndexSweep implements SweepStrategy { } async sweep(): Promise { - let found = 0; let repaired = 0; const errors: Error[] = []; + // Skip entirely if the store has no index to repair + if (!this.bountyStore.repairIndex) { + return { strategy: this.name, found: 0, repaired: 0, errors: [] }; + } + try { - // List ALL bounties (unfiltered) to get the authoritative document state. const allBounties = await this.bountyStore.listBounties(); for (const bounty of allBounties) { try { - // Verify the bounty appears in its own status-filtered query. - const byStatus = await this.bountyStore.listBounties({ status: bounty.status }); - const inIndex = byStatus.some((b) => b.bountyId === bounty.bountyId); - if (!inIndex) { - found++; - // Attempt repair if the store supports it - if (this.bountyStore.repairIndex) { - await this.bountyStore.repairIndex(bounty.bountyId); - repaired++; - } - } + // repairIndex ensures the correct status index entry exists and + // deletes stale entries for all other statuses. Idempotent. + await this.bountyStore.repairIndex(bounty.bountyId); + repaired++; } catch (err) { errors.push( err instanceof Error ? 
err - : new Error(`Index check failed for bounty ${bounty.bountyId}: ${String(err)}`), + : new Error(`Index repair failed for bounty ${bounty.bountyId}: ${String(err)}`), ); } } @@ -59,6 +54,7 @@ export class BountyIndexSweep implements SweepStrategy { ); } - return { strategy: this.name, found, repaired, errors }; + // "found" = total bounties scanned (all repaired unconditionally) + return { strategy: this.name, found: repaired, repaired, errors }; } } diff --git a/src/core/bounty-logic.test.ts b/src/core/bounty-logic.test.ts index 110ebaf5..4d2a87e7 100644 --- a/src/core/bounty-logic.test.ts +++ b/src/core/bounty-logic.test.ts @@ -34,9 +34,31 @@ describe("validateBountyTransition", () => { ).not.toThrow(); }); - test("allows claimed → completed", () => { + test("allows claimed → pending_settlement", () => { + expect(() => + validateBountyTransition( + "b-1", + BountyStatus.Claimed, + BountyStatus.PendingSettlement, + "beginSettlement", + ), + ).not.toThrow(); + }); + + test("rejects claimed → completed (must go through pending_settlement)", () => { expect(() => validateBountyTransition("b-1", BountyStatus.Claimed, BountyStatus.Completed, "complete"), + ).toThrow(); + }); + + test("allows pending_settlement → completed", () => { + expect(() => + validateBountyTransition( + "b-1", + BountyStatus.PendingSettlement, + BountyStatus.Completed, + "complete", + ), ).not.toThrow(); }); diff --git a/src/core/bounty-logic.ts b/src/core/bounty-logic.ts index 568c6da9..f6e189d0 100644 --- a/src/core/bounty-logic.ts +++ b/src/core/bounty-logic.ts @@ -18,7 +18,7 @@ import type { Contribution, Score } from "./models.js"; const VALID_TRANSITIONS: Readonly> = { draft: ["open", "cancelled"], open: ["claimed", "expired", "cancelled"], - claimed: ["pending_settlement", "completed", "open", "expired", "cancelled"], + claimed: ["pending_settlement", "open", "expired", "cancelled"], pending_settlement: ["completed", "claimed", "expired", "cancelled"], completed: ["settled", "expired", 
"cancelled"], settled: [], diff --git a/src/core/bounty-store.conformance.ts b/src/core/bounty-store.conformance.ts index 43edca10..f0242896 100644 --- a/src/core/bounty-store.conformance.ts +++ b/src/core/bounty-store.conformance.ts @@ -217,16 +217,27 @@ export function runBountyStoreTests(factory: BountyStoreFactory): void { await expect(store.claimBounty("claim-2", makeAgent(), "claim-id-2")).rejects.toThrow(); }); - test("completeBounty transitions claimed to completed", async () => { - const bounty = makeBounty({ bountyId: "complete-1", status: BountyStatus.Claimed }); + test("beginSettlement transitions claimed to pending_settlement", async () => { + const bounty = makeBounty({ bountyId: "begin-settle-1", status: BountyStatus.Claimed }); + await store.createBounty(bounty); + const pending = await store.beginSettlement("begin-settle-1", "cid-fulfilled"); + expect(pending.status).toBe(BountyStatus.PendingSettlement); + expect(pending.fulfilledByCid).toBe("cid-fulfilled"); + }); + + test("completeBounty transitions pending_settlement to completed", async () => { + const bounty = makeBounty({ + bountyId: "complete-1", + status: BountyStatus.PendingSettlement, + }); await store.createBounty(bounty); const completed = await store.completeBounty("complete-1", "cid-fulfilled"); expect(completed.status).toBe(BountyStatus.Completed); expect(completed.fulfilledByCid).toBe("cid-fulfilled"); }); - test("completeBounty throws for non-claimed bounty", async () => { - const bounty = makeBounty({ bountyId: "complete-2", status: BountyStatus.Open }); + test("completeBounty throws for claimed bounty (must use beginSettlement first)", async () => { + const bounty = makeBounty({ bountyId: "complete-2", status: BountyStatus.Claimed }); await store.createBounty(bounty); await expect(store.completeBounty("complete-2", "cid")).rejects.toThrow(); }); @@ -274,7 +285,7 @@ export function runBountyStoreTests(factory: BountyStoreFactory): void { // Full lifecycle // 
------------------------------------------------------------------ - test("full lifecycle: draft → open → claimed → completed → settled", async () => { + test("full lifecycle: draft → open → claimed → pending_settlement → completed → settled", async () => { const bounty = makeBounty({ bountyId: "lifecycle-1", status: BountyStatus.Draft }); await store.createBounty(bounty); @@ -288,6 +299,9 @@ export function runBountyStoreTests(factory: BountyStoreFactory): void { ); expect(claimed.status).toBe(BountyStatus.Claimed); + const pending = await store.beginSettlement("lifecycle-1", "cid-result"); + expect(pending.status).toBe(BountyStatus.PendingSettlement); + const completed = await store.completeBounty("lifecycle-1", "cid-result"); expect(completed.status).toBe(BountyStatus.Completed); diff --git a/src/core/settlement-sweep.ts b/src/core/settlement-sweep.ts index 15ed27e1..c3f72942 100644 --- a/src/core/settlement-sweep.ts +++ b/src/core/settlement-sweep.ts @@ -71,6 +71,15 @@ export class SettlementSweep implements SweepStrategy { * Handles both pending_settlement (pre-capture) and completed (post-capture). */ private async resumeSettlement(bounty: Bounty): Promise { + // Hard-fail if the bounty has an escrow but we can't capture it — + // settling without capture would mark the bounty paid with no funds moved. 
+ if (bounty.reservationId && !this.creditsService) { + throw new Error( + `Cannot resume settlement for bounty ${bounty.bountyId}: ` + + "reservationId is set but no creditsService is available", + ); + } + // Capture credits if escrow is active (idempotent — already-captured is a no-op) if (this.creditsService && bounty.reservationId) { if (bounty.claimedBy) { From 6a630a7acd1bb8c45c06828d51b25cb33cd64456 Mon Sep 17 00:00:00 2001 From: Tao Feng Date: Tue, 14 Apr 2026 17:32:37 -0700 Subject: [PATCH 05/14] =?UTF-8?q?fix(bounty):=20address=20adversarial=20re?= =?UTF-8?q?view=20round=204=20=E2=80=94=202=20HIGH=20findings?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. Remove SettlementSweep from server startup: local runtime has no CreditsService, so the sweep would hard-fail on escrowed bounties. Only BountyIndexSweep is registered. Settlement sweep will be enabled when a production CreditsService is wired in. 2. Release orphaned claims on bounty transition failure: if claimBounty() fails, re-read the bounty and release the claim only if the bounty is still open (confirming the transition didn't commit). Post-commit failures keep the claim for consistency. 
--- src/core/operations/bounty.ts | 19 ++++++++++++++++++- src/server/serve.ts | 8 +++++--- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/src/core/operations/bounty.ts b/src/core/operations/bounty.ts index 1b59be9d..9a1cab97 100644 --- a/src/core/operations/bounty.ts +++ b/src/core/operations/bounty.ts @@ -254,7 +254,24 @@ export async function claimBountyOperation( leaseExpiresAt: new Date(now.getTime() + leaseDurationMs).toISOString(), }); - const claimed = await deps.bountyStore.claimBounty(input.bountyId, agent, claim.claimId); + let claimed: Bounty; + try { + claimed = await deps.bountyStore.claimBounty(input.bountyId, agent, claim.claimId); + } catch (bountyErr) { + // If the bounty transition failed (pre-commit or CAS conflict), the + // claim lease is orphaned. Re-read the bounty: if it's still open, + // the transition didn't commit and we can safely release the claim. + // If it's already claimed (post-commit failure), keep the claim. + try { + const current = await deps.bountyStore.getBounty(input.bountyId); + if (current && current.status === BS.Open) { + await deps.claimStore.release(claim.claimId); + } + } catch { + // Best-effort release — claim will expire via lease timeout + } + throw bountyErr; + } return ok({ bountyId: claimed.bountyId, diff --git a/src/server/serve.ts b/src/server/serve.ts index 2534ae98..5a435ca4 100644 --- a/src/server/serve.ts +++ b/src/server/serve.ts @@ -100,7 +100,6 @@ const app = createApp(deps); // --------------------------------------------------------------------------- import { BountyIndexSweep } from "../core/bounty-index-sweep.js"; -import { SettlementSweep } from "../core/settlement-sweep.js"; import { SweepReconciler } from "../core/sweep-reconciler.js"; let sweepReconciler: SweepReconciler | undefined; @@ -118,9 +117,12 @@ if (serverBountyStore) { }, }); sweepReconciler.register(new BountyIndexSweep(serverBountyStore)); - sweepReconciler.register(new SettlementSweep(serverBountyStore)); + // 
SettlementSweep requires a CreditsService to safely resume escrowed + // bounties. The local runtime does not provide one yet, so we only + // register BountyIndexSweep here. When a production CreditsService is + // wired in, add: sweepReconciler.register(new SettlementSweep(store, credits)); sweepReconciler.start(); - console.log("sweep-reconciler started (BountyIndexSweep, SettlementSweep)"); + console.log("sweep-reconciler started (BountyIndexSweep)"); } // --------------------------------------------------------------------------- From 660bca527cf1ee025d6015b86beb6775c4ccc7ff Mon Sep 17 00:00:00 2001 From: Tao Feng Date: Tue, 14 Apr 2026 17:42:46 -0700 Subject: [PATCH 06/14] =?UTF-8?q?fix(bounty):=20address=20adversarial=20re?= =?UTF-8?q?view=20round=205=20=E2=80=94=202=20HIGH=20findings?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. Add pending_settlement to Zod schemas in core/schemas.ts and mcp/tools/bounties.ts — prevents parsers from rejecting bounties in the new pivot state. 2. Re-enable SettlementSweep in server: it safely recovers non-escrowed bounties (no reservationId). Escrowed bounties log an error and wait for CreditsService. Update doc comment in bounty.ts lifecycle. --- src/core/bounty.ts | 2 +- src/core/schemas.ts | 1 + src/mcp/tools/bounties.ts | 11 ++++++++++- src/server/serve.ts | 12 +++++++----- 4 files changed, 19 insertions(+), 7 deletions(-) diff --git a/src/core/bounty.ts b/src/core/bounty.ts index 5b89fbee..8e07895d 100644 --- a/src/core/bounty.ts +++ b/src/core/bounty.ts @@ -78,7 +78,7 @@ export interface BountyCriteria { * A bounty — a reward offer for completing specific work. * * Bounties are mutable coordination objects with a lifecycle - * (draft → open → claimed → completed → settled). + * (draft → open → claimed → pending_settlement → completed → settled). * Like Claims, bounty objects returned by the store are readonly * snapshots; state transitions produce new snapshots. 
*/ diff --git a/src/core/schemas.ts b/src/core/schemas.ts index e06c3821..c23b3e56 100644 --- a/src/core/schemas.ts +++ b/src/core/schemas.ts @@ -99,6 +99,7 @@ const BountySchema: z.ZodType = z.object({ "draft", "open", "claimed", + "pending_settlement", "completed", "settled", "expired", diff --git a/src/mcp/tools/bounties.ts b/src/mcp/tools/bounties.ts index f77f9611..eef27206 100644 --- a/src/mcp/tools/bounties.ts +++ b/src/mcp/tools/bounties.ts @@ -68,7 +68,16 @@ const createBountySchema = z.object({ const listBountiesSchema = z.object({ status: z - .enum(["draft", "open", "claimed", "completed", "settled", "expired", "cancelled"]) + .enum([ + "draft", + "open", + "claimed", + "pending_settlement", + "completed", + "settled", + "expired", + "cancelled", + ]) .optional() .describe("Filter by bounty status"), creatorAgentId: z.string().optional().describe("Filter by creator agent ID"), diff --git a/src/server/serve.ts b/src/server/serve.ts index 5a435ca4..9bbd8e93 100644 --- a/src/server/serve.ts +++ b/src/server/serve.ts @@ -100,6 +100,7 @@ const app = createApp(deps); // --------------------------------------------------------------------------- import { BountyIndexSweep } from "../core/bounty-index-sweep.js"; +import { SettlementSweep } from "../core/settlement-sweep.js"; import { SweepReconciler } from "../core/sweep-reconciler.js"; let sweepReconciler: SweepReconciler | undefined; @@ -117,12 +118,13 @@ if (serverBountyStore) { }, }); sweepReconciler.register(new BountyIndexSweep(serverBountyStore)); - // SettlementSweep requires a CreditsService to safely resume escrowed - // bounties. The local runtime does not provide one yet, so we only - // register BountyIndexSweep here. When a production CreditsService is - // wired in, add: sweepReconciler.register(new SettlementSweep(store, credits)); + // SettlementSweep runs without creditsService — it can recover non-escrowed + // bounties. 
Escrowed bounties (those with reservationId) will log an error + // and wait for a CreditsService to be available. When a production + // CreditsService is wired in, pass it: new SettlementSweep(store, credits). + sweepReconciler.register(new SettlementSweep(serverBountyStore)); sweepReconciler.start(); - console.log("sweep-reconciler started (BountyIndexSweep)"); + console.log("sweep-reconciler started (BountyIndexSweep, SettlementSweep)"); } // --------------------------------------------------------------------------- From c96529866bef4bc088f0d82325484ba11d3e607d Mon Sep 17 00:00:00 2001 From: Tao Feng Date: Tue, 14 Apr 2026 17:52:33 -0700 Subject: [PATCH 07/14] =?UTF-8?q?fix(bounty):=20address=20adversarial=20re?= =?UTF-8?q?view=20round=206=20=E2=80=94=20settlement=20sweep=20fix?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit SettlementSweep: completed bounties have already captured — skip the creditsService requirement and just advance to settled. Only pending_settlement bounties need the capture step. Remaining findings (out of scope for #240): - Claim renewal/heartbeat path: pre-existing design gap, not introduced by this branch. Tracked separately. - Nexus MCP sweep wiring: requires architectural changes to MCP server startup. Tracked as follow-up integration work. --- src/core/settlement-sweep.ts | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/src/core/settlement-sweep.ts b/src/core/settlement-sweep.ts index c3f72942..2603cea0 100644 --- a/src/core/settlement-sweep.ts +++ b/src/core/settlement-sweep.ts @@ -71,8 +71,14 @@ export class SettlementSweep implements SweepStrategy { * Handles both pending_settlement (pre-capture) and completed (post-capture). */ private async resumeSettlement(bounty: Bounty): Promise { - // Hard-fail if the bounty has an escrow but we can't capture it — - // settling without capture would mark the bounty paid with no funds moved. 
+ // "completed" bounties have already captured — only need to advance to settled. + // "pending_settlement" bounties still need capture before advancing. + if (bounty.status === "completed") { + await this.bountyStore.settleBounty(bounty.bountyId); + return; + } + + // For pending_settlement: hard-fail if escrowed but no creditsService if (bounty.reservationId && !this.creditsService) { throw new Error( `Cannot resume settlement for bounty ${bounty.bountyId}: ` + @@ -91,10 +97,8 @@ export class SettlementSweep implements SweepStrategy { } } - // Advance through remaining states (skip steps already done) - if (bounty.status !== "completed") { - await this.bountyStore.completeBounty(bounty.bountyId, bounty.fulfilledByCid ?? ""); - } + // Advance through completed → settled + await this.bountyStore.completeBounty(bounty.bountyId, bounty.fulfilledByCid ?? ""); await this.bountyStore.settleBounty(bounty.bountyId); } } From 7c01f5b6016f289bbce846a6d1f530d397275628 Mon Sep 17 00:00:00 2001 From: Tao Feng Date: Tue, 14 Apr 2026 17:57:12 -0700 Subject: [PATCH 08/14] feat(bounty): claim renewal + MCP sweep reconciler wiring 1. Same-agent claim renewal: claimBountyOperation now allows the current claim holder to extend their lease without reopening the bounty. Different agents are still rejected. Prevents long-running bounties from getting stranded when the claim lease expires. 2. Wire SweepReconciler into both MCP entry points: - serve.ts (stdio): starts BountyIndexSweep + SettlementSweep after store setup, stops on shutdown - serve-http.ts (HTTP): starts at process level using zone-scoped Nexus bounty store (not session-scoped), stops on shutdown The reconciler now runs in all three runtimes that can create bounties: HTTP server, stdio MCP, and HTTP MCP. 
--- src/core/operations/bounty.test.ts | 70 ++++++++++++++++++++++++++++++ src/core/operations/bounty.ts | 35 +++++++++++++-- src/mcp/serve-http.ts | 39 +++++++++++++++++ src/mcp/serve.ts | 27 ++++++++++++ 4 files changed, 168 insertions(+), 3 deletions(-) diff --git a/src/core/operations/bounty.test.ts b/src/core/operations/bounty.test.ts index e64998eb..16c1df96 100644 --- a/src/core/operations/bounty.test.ts +++ b/src/core/operations/bounty.test.ts @@ -527,6 +527,76 @@ describe("claimBountyOperation status checks", () => { expect(secondClaim.error.message).toContain("not open"); } }); + + test("same agent can renew claim on already-claimed bounty", async () => { + (deps.creditsService as InMemoryCreditsService).seed("creator", 1000); + + const bounty = await createBountyOperation( + { + title: "Renewable claim", + amount: 100, + criteria: { description: "work" }, + agent: { agentId: "creator" }, + }, + deps, + ); + expect(bounty.ok).toBe(true); + if (!bounty.ok) return; + + // First claim + const firstClaim = await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: { agentId: "worker" } }, + deps, + ); + expect(firstClaim.ok).toBe(true); + if (!firstClaim.ok) return; + + // Same agent renews — should succeed + const renewal = await claimBountyOperation( + { + bountyId: bounty.value.bountyId, + agent: { agentId: "worker" }, + leaseDurationMs: 3_600_000, + }, + deps, + ); + expect(renewal.ok).toBe(true); + if (!renewal.ok) return; + expect(renewal.value.status).toBe("claimed"); + expect(renewal.value.claimedBy).toBe("worker"); + expect(renewal.value.claimId).toBe(firstClaim.value.claimId); + }); + + test("different agent cannot renew another agent's claim", async () => { + (deps.creditsService as InMemoryCreditsService).seed("creator", 1000); + + const bounty = await createBountyOperation( + { + title: "No steal", + amount: 100, + criteria: { description: "work" }, + agent: { agentId: "creator" }, + }, + deps, + ); + expect(bounty.ok).toBe(true); + 
if (!bounty.ok) return; + + await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: { agentId: "agent-a" } }, + deps, + ); + + // Different agent tries to claim — rejected + const steal = await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: { agentId: "agent-b" } }, + deps, + ); + expect(steal.ok).toBe(false); + if (!steal.ok) { + expect(steal.error.code).toBe("VALIDATION_ERROR"); + } + }); }); describe("settleBountyOperation status checks", () => { diff --git a/src/core/operations/bounty.ts b/src/core/operations/bounty.ts index 9a1cab97..c47ae0fd 100644 --- a/src/core/operations/bounty.ts +++ b/src/core/operations/bounty.ts @@ -231,16 +231,45 @@ export async function claimBountyOperation( return notFound("Bounty", input.bountyId); } + const agent = resolveAgent(input.agent); + const now = new Date(); + const leaseDurationMs = input.leaseDurationMs ?? 1_800_000; + + // Renewal path: same agent can extend the lease on an already-claimed bounty. + // This prevents long-running bounties from getting stranded when the claim + // lease expires while the worker is still active. 
+ if ( + bounty.status === BS.Claimed && + bounty.claimedBy?.agentId === agent.agentId && + bounty.claimId + ) { + const renewed = await deps.claimStore.claimOrRenew({ + claimId: bounty.claimId, + targetRef: `bounty:${input.bountyId}`, + agent, + status: "active", + intentSummary: `Renewing claim on bounty: ${bounty.title}`, + createdAt: now.toISOString(), + heartbeatAt: now.toISOString(), + leaseExpiresAt: new Date(now.getTime() + leaseDurationMs).toISOString(), + }); + return ok({ + bountyId: bounty.bountyId, + title: bounty.title, + status: bounty.status, + claimId: renewed.claimId, + claimedBy: bounty.claimedBy.agentId, + }); + } + + // New claim: bounty must be open if (bounty.status !== BS.Open) { return validationErr( `Bounty '${input.bountyId}' is not open for claims (current status: ${bounty.status})`, ); } - const agent = resolveAgent(input.agent); - const now = new Date(); const claimId = crypto.randomUUID(); - const leaseDurationMs = input.leaseDurationMs ?? 1_800_000; // Create claim via existing claim system const claim = await deps.claimStore.claimOrRenew({ diff --git a/src/mcp/serve-http.ts b/src/mcp/serve-http.ts index 042dbecf..3e0eb1ab 100644 --- a/src/mcp/serve-http.ts +++ b/src/mcp/serve-http.ts @@ -125,6 +125,44 @@ try { process.exit(1); } +// --- Background sweep reconciler ------------------------------------------ +// Runs at process level using the zone-level bounty store (Nexus when +// available, local SQLite otherwise). Not session-scoped — index repair +// and settlement recovery are zone-wide concerns. + +import { BountyIndexSweep } from "../core/bounty-index-sweep.js"; +import { SettlementSweep } from "../core/settlement-sweep.js"; +import { SweepReconciler } from "../core/sweep-reconciler.js"; + +let httpSweepReconciler: SweepReconciler | undefined; +{ + // Build a zone-level bounty store for the reconciler. In Nexus mode this + // is a NexusBountyStore scoped to the zone (not session). 
In local mode + // it's the local SQLite bounty store from the runtime. + let reconcilerBountyStore: import("../core/bounty-store.js").BountyStore = runtime.bountyStore; + if (nexusClient) { + const { NexusBountyStore } = await import("../nexus/nexus-bounty-store.js"); + reconcilerBountyStore = new NexusBountyStore({ client: nexusClient, zoneId }); + } + + httpSweepReconciler = new SweepReconciler({ + intervalMs: 60_000, + onCycle(results) { + for (const r of results) { + if (r.found > 0 || r.errors.length > 0) { + process.stderr.write( + `[sweep] ${r.strategy}: found=${r.found} repaired=${r.repaired} errors=${r.errors.length}\n`, + ); + } + } + }, + }); + httpSweepReconciler.register(new BountyIndexSweep(reconcilerBountyStore)); + httpSweepReconciler.register(new SettlementSweep(reconcilerBountyStore)); + httpSweepReconciler.start(); + process.stderr.write("grove-mcp-http: sweep-reconciler started\n"); +} + // --- Dynamic session-scoped deps -------------------------------------------- // // The HTTP MCP server is spawned before any interactive session exists, so we @@ -668,6 +706,7 @@ httpServer.listen(port, () => { // Graceful shutdown const shutdown = async (): Promise => { + httpSweepReconciler?.stop(); clearInterval(reapTimer); // Close all active sessions for (const [, session] of sessions) { diff --git a/src/mcp/serve.ts b/src/mcp/serve.ts index a128811f..bad9158c 100644 --- a/src/mcp/serve.ts +++ b/src/mcp/serve.ts @@ -349,6 +349,32 @@ try { process.exit(1); } +// --- Background sweep reconciler ------------------------------------------ + +import { BountyIndexSweep } from "../core/bounty-index-sweep.js"; +import { SettlementSweep } from "../core/settlement-sweep.js"; +import { SweepReconciler } from "../core/sweep-reconciler.js"; + +let mcpSweepReconciler: SweepReconciler | undefined; +if (deps.bountyStore) { + mcpSweepReconciler = new SweepReconciler({ + intervalMs: 60_000, + onCycle(results) { + for (const r of results) { + if (r.found > 0 || 
r.errors.length > 0) { + process.stderr.write( + `[sweep] ${r.strategy}: found=${r.found} repaired=${r.repaired} errors=${r.errors.length}\n`, + ); + } + } + }, + }); + mcpSweepReconciler.register(new BountyIndexSweep(deps.bountyStore)); + mcpSweepReconciler.register(new SettlementSweep(deps.bountyStore)); + mcpSweepReconciler.start(); + process.stderr.write("grove-mcp: sweep-reconciler started\n"); +} + // --- Server setup --------------------------------------------------------- const server = await createMcpServer(deps, preset); @@ -358,6 +384,7 @@ await server.connect(transport); // Graceful shutdown const shutdown = async (): Promise => { + mcpSweepReconciler?.stop(); await server.close(); close(); process.exit(0); From 872147895649a704c6ebe656cdc4a10e8d7f02e1 Mon Sep 17 00:00:00 2001 From: Tao Feng Date: Tue, 14 Apr 2026 18:45:52 -0700 Subject: [PATCH 09/14] =?UTF-8?q?fix(bounty):=20review-loop=20round=201=20?= =?UTF-8?q?=E2=80=94=202=20HIGH,=201=20MEDIUM?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. [high] Claim renewal with expired lease: detect if existing claim is expired and create a fresh claim ID instead of reusing the stale one. Rebinds the bounty to the new claim atomically. 2. [high] Remove SettlementSweep from MCP runtimes: no CreditsService available, escrowed bounties would fail every cycle. Only BountyIndexSweep registered. Settlement recovery deferred to #253. 3. [medium] BountyIndexSweep now detection-based: queries status-filtered lists to find actual drift, only calls repairIndex when missing. No more unconditional rewrite of every healthy bounty each cycle. 
--- src/core/bounty-index-sweep.ts | 53 +++++++++++++++++++++------------- src/core/operations/bounty.ts | 15 +++++++++- src/mcp/serve-http.ts | 7 +++-- src/mcp/serve.ts | 7 +++-- 4 files changed, 55 insertions(+), 27 deletions(-) diff --git a/src/core/bounty-index-sweep.ts b/src/core/bounty-index-sweep.ts index 535d5177..465b6f4d 100644 --- a/src/core/bounty-index-sweep.ts +++ b/src/core/bounty-index-sweep.ts @@ -1,14 +1,13 @@ /** * BountyIndexSweep — reconciler strategy for NexusBountyStore dual-write consistency. * - * Scans all bounty documents and calls repairIndex() for each one. - * repairIndex() ensures the correct status index entry exists AND deletes - * stale entries for other statuses. This covers both failure modes: - * 1. Missing current-status index (new index write failed) - * 2. Stale old-status index (old index delete failed) + * Detection-based: for each bounty, checks whether it appears in its + * status-filtered query result. Only calls repairIndex() when an actual + * inconsistency is found (missing current-status entry or stale old-status + * marker). Does NOT rewrite healthy indexes, avoiding unnecessary VFS + * pressure under Nexus rate limits. * - * For stores without a separate index (e.g., SQLite), repairIndex is - * undefined and this sweep is a no-op. + * For stores without a separate index (e.g., SQLite), this sweep is a no-op. */ import type { BountyStore } from "./bounty-store.js"; @@ -23,6 +22,7 @@ export class BountyIndexSweep implements SweepStrategy { } async sweep(): Promise { + let found = 0; let repaired = 0; const errors: Error[] = []; @@ -32,20 +32,34 @@ export class BountyIndexSweep implements SweepStrategy { } try { + // Unfiltered list gives us the authoritative document state. const allBounties = await this.bountyStore.listBounties(); + // Build a set of bountyIds per status from status-filtered queries + // to detect both missing entries and stale entries. 
+ const statusSets = new Map>(); for (const bounty of allBounties) { - try { - // repairIndex ensures the correct status index entry exists and - // deletes stale entries for all other statuses. Idempotent. - await this.bountyStore.repairIndex(bounty.bountyId); - repaired++; - } catch (err) { - errors.push( - err instanceof Error - ? err - : new Error(`Index repair failed for bounty ${bounty.bountyId}: ${String(err)}`), - ); + if (!statusSets.has(bounty.status)) { + const byStatus = await this.bountyStore.listBounties({ status: bounty.status }); + statusSets.set(bounty.status, new Set(byStatus.map((b) => b.bountyId))); + } + } + + for (const bounty of allBounties) { + const inCorrectIndex = statusSets.get(bounty.status)?.has(bounty.bountyId) ?? false; + if (!inCorrectIndex) { + // Missing from the correct status index — needs repair + found++; + try { + await this.bountyStore.repairIndex!(bounty.bountyId); + repaired++; + } catch (err) { + errors.push( + err instanceof Error + ? err + : new Error(`Index repair failed for bounty ${bounty.bountyId}: ${String(err)}`), + ); + } } } } catch (err) { @@ -54,7 +68,6 @@ export class BountyIndexSweep implements SweepStrategy { ); } - // "found" = total bounties scanned (all repaired unconditionally) - return { strategy: this.name, found: repaired, repaired, errors }; + return { strategy: this.name, found, repaired, errors }; } } diff --git a/src/core/operations/bounty.ts b/src/core/operations/bounty.ts index c47ae0fd..93dbbb4f 100644 --- a/src/core/operations/bounty.ts +++ b/src/core/operations/bounty.ts @@ -243,8 +243,15 @@ export async function claimBountyOperation( bounty.claimedBy?.agentId === agent.agentId && bounty.claimId ) { + // Check if the existing claim is still active. If expired, we need a + // fresh claim ID — reusing the old one would collide with the persisted + // expired claim record instead of renewing it. 
+ const existingClaim = await deps.claimStore.getClaim(bounty.claimId); + const claimIsActive = existingClaim?.status === "active"; + + const renewalClaimId = claimIsActive ? bounty.claimId : crypto.randomUUID(); const renewed = await deps.claimStore.claimOrRenew({ - claimId: bounty.claimId, + claimId: renewalClaimId, targetRef: `bounty:${input.bountyId}`, agent, status: "active", @@ -253,6 +260,12 @@ export async function claimBountyOperation( heartbeatAt: now.toISOString(), leaseExpiresAt: new Date(now.getTime() + leaseDurationMs).toISOString(), }); + + // If we rotated the claim ID, update the bounty record to point at the new claim + if (renewalClaimId !== bounty.claimId) { + await deps.bountyStore.claimBounty(input.bountyId, agent, renewed.claimId); + } + return ok({ bountyId: bounty.bountyId, title: bounty.title, diff --git a/src/mcp/serve-http.ts b/src/mcp/serve-http.ts index 3e0eb1ab..4d3efa71 100644 --- a/src/mcp/serve-http.ts +++ b/src/mcp/serve-http.ts @@ -131,7 +131,6 @@ try { // and settlement recovery are zone-wide concerns. import { BountyIndexSweep } from "../core/bounty-index-sweep.js"; -import { SettlementSweep } from "../core/settlement-sweep.js"; import { SweepReconciler } from "../core/sweep-reconciler.js"; let httpSweepReconciler: SweepReconciler | undefined; @@ -158,9 +157,11 @@ let httpSweepReconciler: SweepReconciler | undefined; }, }); httpSweepReconciler.register(new BountyIndexSweep(reconcilerBountyStore)); - httpSweepReconciler.register(new SettlementSweep(reconcilerBountyStore)); + // SettlementSweep omitted: no CreditsService in the MCP HTTP runtime. + // Escrowed bounty recovery requires #253. Non-escrowed bounties are + // handled by the HTTP server's SettlementSweep (serve.ts). 
httpSweepReconciler.start(); - process.stderr.write("grove-mcp-http: sweep-reconciler started\n"); + process.stderr.write("grove-mcp-http: sweep-reconciler started (BountyIndexSweep)\n"); } // --- Dynamic session-scoped deps -------------------------------------------- diff --git a/src/mcp/serve.ts b/src/mcp/serve.ts index bad9158c..a0c500ae 100644 --- a/src/mcp/serve.ts +++ b/src/mcp/serve.ts @@ -352,7 +352,6 @@ try { // --- Background sweep reconciler ------------------------------------------ import { BountyIndexSweep } from "../core/bounty-index-sweep.js"; -import { SettlementSweep } from "../core/settlement-sweep.js"; import { SweepReconciler } from "../core/sweep-reconciler.js"; let mcpSweepReconciler: SweepReconciler | undefined; @@ -370,9 +369,11 @@ if (deps.bountyStore) { }, }); mcpSweepReconciler.register(new BountyIndexSweep(deps.bountyStore)); - mcpSweepReconciler.register(new SettlementSweep(deps.bountyStore)); + // SettlementSweep omitted: no CreditsService available in the MCP runtime. + // Escrowed bounty recovery requires #253 (production CreditsService). + // Non-escrowed bounties are handled by the HTTP server's SettlementSweep. mcpSweepReconciler.start(); - process.stderr.write("grove-mcp: sweep-reconciler started\n"); + process.stderr.write("grove-mcp: sweep-reconciler started (BountyIndexSweep)\n"); } // --- Server setup --------------------------------------------------------- From 5666bddab09c698bddaf638144929a0a6a33ac49 Mon Sep 17 00:00:00 2001 From: Tao Feng Date: Tue, 14 Apr 2026 18:56:58 -0700 Subject: [PATCH 10/14] =?UTF-8?q?fix(bounty):=20review-loop=20round=202=20?= =?UTF-8?q?=E2=80=94=202=20HIGH,=201=20MEDIUM?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. [high] Claim rebind after lease expiry: allow claimed→claimed self-transition so expired claim IDs get rotated to fresh ones. The bounty record is atomically rebound to the new claim. 2. 
[high] Re-enable SettlementSweep in MCP runtimes: completed bounties (already captured) can settle without CreditsService. Only pending_settlement+reservationId cases log errors. 3. [medium] repairIndex version-aware: re-reads with ETag before deleting stale markers. Skips cleanup if a concurrent transition changed the bounty between read and delete. --- src/core/bounty-logic.ts | 2 +- src/mcp/serve-http.ts | 7 +++---- src/mcp/serve.ts | 10 ++++++---- src/nexus/nexus-bounty-store.ts | 18 +++++++++++++++--- 4 files changed, 25 insertions(+), 12 deletions(-) diff --git a/src/core/bounty-logic.ts b/src/core/bounty-logic.ts index f6e189d0..78e61d72 100644 --- a/src/core/bounty-logic.ts +++ b/src/core/bounty-logic.ts @@ -18,7 +18,7 @@ import type { Contribution, Score } from "./models.js"; const VALID_TRANSITIONS: Readonly> = { draft: ["open", "cancelled"], open: ["claimed", "expired", "cancelled"], - claimed: ["pending_settlement", "open", "expired", "cancelled"], + claimed: ["claimed", "pending_settlement", "open", "expired", "cancelled"], pending_settlement: ["completed", "claimed", "expired", "cancelled"], completed: ["settled", "expired", "cancelled"], settled: [], diff --git a/src/mcp/serve-http.ts b/src/mcp/serve-http.ts index 4d3efa71..3e0eb1ab 100644 --- a/src/mcp/serve-http.ts +++ b/src/mcp/serve-http.ts @@ -131,6 +131,7 @@ try { // and settlement recovery are zone-wide concerns. import { BountyIndexSweep } from "../core/bounty-index-sweep.js"; +import { SettlementSweep } from "../core/settlement-sweep.js"; import { SweepReconciler } from "../core/sweep-reconciler.js"; let httpSweepReconciler: SweepReconciler | undefined; @@ -157,11 +158,9 @@ let httpSweepReconciler: SweepReconciler | undefined; }, }); httpSweepReconciler.register(new BountyIndexSweep(reconcilerBountyStore)); - // SettlementSweep omitted: no CreditsService in the MCP HTTP runtime. - // Escrowed bounty recovery requires #253. 
Non-escrowed bounties are - // handled by the HTTP server's SettlementSweep (serve.ts). + httpSweepReconciler.register(new SettlementSweep(reconcilerBountyStore)); httpSweepReconciler.start(); - process.stderr.write("grove-mcp-http: sweep-reconciler started (BountyIndexSweep)\n"); + process.stderr.write("grove-mcp-http: sweep-reconciler started\n"); } // --- Dynamic session-scoped deps -------------------------------------------- diff --git a/src/mcp/serve.ts b/src/mcp/serve.ts index a0c500ae..3b1fb8e1 100644 --- a/src/mcp/serve.ts +++ b/src/mcp/serve.ts @@ -352,6 +352,7 @@ try { // --- Background sweep reconciler ------------------------------------------ import { BountyIndexSweep } from "../core/bounty-index-sweep.js"; +import { SettlementSweep } from "../core/settlement-sweep.js"; import { SweepReconciler } from "../core/sweep-reconciler.js"; let mcpSweepReconciler: SweepReconciler | undefined; @@ -369,11 +370,12 @@ if (deps.bountyStore) { }, }); mcpSweepReconciler.register(new BountyIndexSweep(deps.bountyStore)); - // SettlementSweep omitted: no CreditsService available in the MCP runtime. - // Escrowed bounty recovery requires #253 (production CreditsService). - // Non-escrowed bounties are handled by the HTTP server's SettlementSweep. + // SettlementSweep without CreditsService: recovers non-escrowed bounties + // and completed escrowed bounties (capture already happened). Only + // pending_settlement+reservationId cases log errors and wait for #253. 
+ mcpSweepReconciler.register(new SettlementSweep(deps.bountyStore)); mcpSweepReconciler.start(); - process.stderr.write("grove-mcp: sweep-reconciler started (BountyIndexSweep)\n"); + process.stderr.write("grove-mcp: sweep-reconciler started\n"); } // --- Server setup --------------------------------------------------------- diff --git a/src/nexus/nexus-bounty-store.ts b/src/nexus/nexus-bounty-store.ts index e89ab948..617d8a35 100644 --- a/src/nexus/nexus-bounty-store.ts +++ b/src/nexus/nexus-bounty-store.ts @@ -230,13 +230,25 @@ export class NexusBountyStore implements BountyStore { } async repairIndex(bountyId: string): Promise { - const bounty = await this.getBounty(bountyId); - if (!bounty) return; + // Read with ETag so we can detect if a concurrent transition changed + // the bounty between our read and the cleanup deletes. + const result = await this.readBountyWithEtag(bountyId); + if (!result) return; + const { bounty, etag } = result; // Ensure the correct status index entry exists await this.writeStatusIndex(bounty); - // Clean stale index entries for other statuses + // Before deleting stale markers, re-verify the bounty hasn't changed. + // A concurrent transitionBounty could have moved the status between our + // initial read and now — deleting the new status's marker would be wrong. + const recheck = await this.readBountyWithEtag(bountyId); + if (!recheck || recheck.etag !== etag) { + // Bounty was modified concurrently — skip cleanup, next sweep will retry + return; + } + + // Safe to clean: the document hasn't changed since our read const allStatuses: BountyStatus[] = [ "draft", "open", From a905b021b79b22abf8ab458e6c89d19ac722fc85 Mon Sep 17 00:00:00 2001 From: Tao Feng Date: Tue, 14 Apr 2026 19:05:25 -0700 Subject: [PATCH 11/14] =?UTF-8?q?fix(bounty):=20review-loop=20round=203=20?= =?UTF-8?q?=E2=80=94=202=20HIGH,=201=20MEDIUM?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 
[high] Claim renewal checks lease validity (not just status): only reuse claimId if both status=active AND leaseExpiresAt > now. 2. [high] Compensation on rotated-claim rebind failure: release the orphaned new claim if bountyStore.claimBounty throws. 3. [medium] Remove sweep reconciler from stdio MCP: per-agent processes must not run zone-wide sweeps (N×load, CAS conflicts). Sweeps run only in HTTP server + HTTP MCP (singleton processes). --- src/core/operations/bounty.ts | 26 +++++++++++++++++++------- src/mcp/serve.ts | 34 ++++------------------------------ 2 files changed, 23 insertions(+), 37 deletions(-) diff --git a/src/core/operations/bounty.ts b/src/core/operations/bounty.ts index 93dbbb4f..e52ad885 100644 --- a/src/core/operations/bounty.ts +++ b/src/core/operations/bounty.ts @@ -243,13 +243,15 @@ export async function claimBountyOperation( bounty.claimedBy?.agentId === agent.agentId && bounty.claimId ) { - // Check if the existing claim is still active. If expired, we need a - // fresh claim ID — reusing the old one would collide with the persisted - // expired claim record instead of renewing it. + // Check if the existing claim is still active AND the lease hasn't expired. + // Both conditions must hold — a claim with status "active" but expired + // leaseExpiresAt will collide in the claim store instead of renewing. const existingClaim = await deps.claimStore.getClaim(bounty.claimId); - const claimIsActive = existingClaim?.status === "active"; + const leaseValid = + existingClaim?.status === "active" && + new Date(existingClaim.leaseExpiresAt).getTime() > now.getTime(); - const renewalClaimId = claimIsActive ? bounty.claimId : crypto.randomUUID(); + const renewalClaimId = leaseValid ? 
bounty.claimId : crypto.randomUUID(); const renewed = await deps.claimStore.claimOrRenew({ claimId: renewalClaimId, targetRef: `bounty:${input.bountyId}`, @@ -261,9 +263,19 @@ export async function claimBountyOperation( leaseExpiresAt: new Date(now.getTime() + leaseDurationMs).toISOString(), }); - // If we rotated the claim ID, update the bounty record to point at the new claim + // If we rotated the claim ID, update the bounty record to point at the + // new claim. On failure, release the orphaned new claim. if (renewalClaimId !== bounty.claimId) { - await deps.bountyStore.claimBounty(input.bountyId, agent, renewed.claimId); + try { + await deps.bountyStore.claimBounty(input.bountyId, agent, renewed.claimId); + } catch (rebindErr) { + try { + await deps.claimStore.release(renewed.claimId); + } catch { + // Best-effort — claim will expire via lease timeout + } + throw rebindErr; + } } return ok({ diff --git a/src/mcp/serve.ts b/src/mcp/serve.ts index 3b1fb8e1..d9b63bd3 100644 --- a/src/mcp/serve.ts +++ b/src/mcp/serve.ts @@ -349,36 +349,11 @@ try { process.exit(1); } -// --- Background sweep reconciler ------------------------------------------ - -import { BountyIndexSweep } from "../core/bounty-index-sweep.js"; -import { SettlementSweep } from "../core/settlement-sweep.js"; -import { SweepReconciler } from "../core/sweep-reconciler.js"; - -let mcpSweepReconciler: SweepReconciler | undefined; -if (deps.bountyStore) { - mcpSweepReconciler = new SweepReconciler({ - intervalMs: 60_000, - onCycle(results) { - for (const r of results) { - if (r.found > 0 || r.errors.length > 0) { - process.stderr.write( - `[sweep] ${r.strategy}: found=${r.found} repaired=${r.repaired} errors=${r.errors.length}\n`, - ); - } - } - }, - }); - mcpSweepReconciler.register(new BountyIndexSweep(deps.bountyStore)); - // SettlementSweep without CreditsService: recovers non-escrowed bounties - // and completed escrowed bounties (capture already happened). 
Only - // pending_settlement+reservationId cases log errors and wait for #253. - mcpSweepReconciler.register(new SettlementSweep(deps.bountyStore)); - mcpSweepReconciler.start(); - process.stderr.write("grove-mcp: sweep-reconciler started\n"); -} - // --- Server setup --------------------------------------------------------- +// NOTE: No sweep reconciler here. The stdio MCP server (grove-mcp) is spawned +// per-agent — running zone-wide sweeps from every agent process would cause +// N×load and CAS conflicts. Sweeps run in the long-lived singleton processes +// only: src/server/serve.ts (HTTP server) and src/mcp/serve-http.ts (HTTP MCP). const server = await createMcpServer(deps, preset); const transport = new StdioServerTransport(); @@ -387,7 +362,6 @@ await server.connect(transport); // Graceful shutdown const shutdown = async (): Promise => { - mcpSweepReconciler?.stop(); await server.close(); close(); process.exit(0); From dcfaae7926ccd671d482d734d44a61fd81af6be1 Mon Sep 17 00:00:00 2001 From: Tao Feng Date: Tue, 14 Apr 2026 19:14:07 -0700 Subject: [PATCH 12/14] =?UTF-8?q?fix(bounty):=20review-loop=20round=204=20?= =?UTF-8?q?=E2=80=94=201=20HIGH,=202=20MEDIUM?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. [high] Rebind compensation re-reads bounty: only releases the new claim if the bounty didn't commit the rebind (post-commit safety). 2. [medium] Serialized sweep cycles: in-flight guard prevents overlapping async cycles from contending with each other. 3. [medium] BountyIndexSweep stale-marker detection: known limitation — listBounties(status) filters stale entries before the sweep sees them. Full fix requires a raw index listing API (store-layer change). repairIndex handles cleanup when triggered by other paths. 
--- src/core/operations/bounty.ts | 9 +++++-- src/core/sweep-reconciler.ts | 44 +++++++++++++++++++++-------------- 2 files changed, 34 insertions(+), 19 deletions(-) diff --git a/src/core/operations/bounty.ts b/src/core/operations/bounty.ts index e52ad885..a3734802 100644 --- a/src/core/operations/bounty.ts +++ b/src/core/operations/bounty.ts @@ -264,13 +264,18 @@ export async function claimBountyOperation( }); // If we rotated the claim ID, update the bounty record to point at the - // new claim. On failure, release the orphaned new claim. + // new claim. On failure, only release if the bounty didn't commit the rebind. if (renewalClaimId !== bounty.claimId) { try { await deps.bountyStore.claimBounty(input.bountyId, agent, renewed.claimId); } catch (rebindErr) { + // Re-read: if the bounty now points at the new claim, the write + // committed (post-commit error) — do NOT release the live claim. try { - await deps.claimStore.release(renewed.claimId); + const current = await deps.bountyStore.getBounty(input.bountyId); + if (!current || current.claimId !== renewed.claimId) { + await deps.claimStore.release(renewed.claimId); + } } catch { // Best-effort — claim will expire via lease timeout } diff --git a/src/core/sweep-reconciler.ts b/src/core/sweep-reconciler.ts index 8d97fa29..a5c3ff03 100644 --- a/src/core/sweep-reconciler.ts +++ b/src/core/sweep-reconciler.ts @@ -93,6 +93,7 @@ export class SweepReconciler { private readonly onError?: (error: unknown) => void; private timer: ReturnType | undefined; private running = false; + private cycleInFlight = false; constructor(config?: SweepReconcilerConfig) { this.intervalMs = config?.intervalMs ?? DEFAULT_INTERVAL_MS; @@ -121,26 +122,35 @@ export class SweepReconciler { } } - /** Run one sweep cycle immediately (useful for testing or on-demand). */ + /** + * Run one sweep cycle immediately (useful for testing or on-demand). + * Serialized: if a cycle is already in flight, returns empty results. 
+ */ async runCycle(): Promise { - const results: SweepResult[] = []; - for (const strategy of this.strategies) { - try { - const result = await strategy.sweep(); - results.push(result); - } catch (err) { - // Strategy.sweep() should not throw, but guard against it - results.push({ - strategy: strategy.name, - found: 0, - repaired: 0, - errors: [err instanceof Error ? err : new Error(String(err))], - }); - this.onError?.(err); + if (this.cycleInFlight) return []; + this.cycleInFlight = true; + try { + const results: SweepResult[] = []; + for (const strategy of this.strategies) { + try { + const result = await strategy.sweep(); + results.push(result); + } catch (err) { + // Strategy.sweep() should not throw, but guard against it + results.push({ + strategy: strategy.name, + found: 0, + repaired: 0, + errors: [err instanceof Error ? err : new Error(String(err))], + }); + this.onError?.(err); + } } + this.onCycle?.(results); + return results; + } finally { + this.cycleInFlight = false; } - this.onCycle?.(results); - return results; } /** Whether the reconciler is currently running. */ From 9e886b363cdf63dc0d70a217c57ca696875907c2 Mon Sep 17 00:00:00 2001 From: Tao Feng Date: Tue, 14 Apr 2026 19:35:57 -0700 Subject: [PATCH 13/14] fix(bounty): close remaining review-loop gaps with VFS primitives MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit All three "persistent" findings from the review loop are now fixed using existing Nexus VFS operations — no Nexus changes needed. 1. repairIndex race: check exists() before each stale marker delete, re-read the authoritative document right before deleting to confirm the bounty hasn't transitioned TO that status concurrently. 2. BountyIndexSweep stale-marker detection: new listIndexStatuses() method on NexusBountyStore checks which status index entries actually exist using client.exists(). Sweep now detects both missing current entries AND stale old-status entries in a single pass. 3. 
Added listIndexStatuses to BountyStore interface (optional) and FailingBountyStore wrapper. --- src/core/bounty-index-sweep.ts | 57 ++++++++++---------- src/core/bounty-store.ts | 9 ++++ src/core/operations/failing-bounty-store.ts | 4 ++ src/nexus/nexus-bounty-store.ts | 59 +++++++++++++++------ 4 files changed, 85 insertions(+), 44 deletions(-) diff --git a/src/core/bounty-index-sweep.ts b/src/core/bounty-index-sweep.ts index 465b6f4d..1057cd55 100644 --- a/src/core/bounty-index-sweep.ts +++ b/src/core/bounty-index-sweep.ts @@ -1,13 +1,14 @@ /** * BountyIndexSweep — reconciler strategy for NexusBountyStore dual-write consistency. * - * Detection-based: for each bounty, checks whether it appears in its - * status-filtered query result. Only calls repairIndex() when an actual - * inconsistency is found (missing current-status entry or stale old-status - * marker). Does NOT rewrite healthy indexes, avoiding unnecessary VFS - * pressure under Nexus rate limits. + * Detection-based: for each bounty, compares the authoritative document status + * against actual index entries using listIndexStatuses(). Calls repairIndex() + * only when an inconsistency is found: + * 1. Missing current-status index entry (new index write failed) + * 2. Stale old-status index entry (old index delete failed) * - * For stores without a separate index (e.g., SQLite), this sweep is a no-op. + * For stores without listIndexStatuses (e.g., SQLite), falls back to checking + * whether the bounty appears in its status-filtered query result. */ import type { BountyStore } from "./bounty-store.js"; @@ -26,40 +27,40 @@ export class BountyIndexSweep implements SweepStrategy { let repaired = 0; const errors: Error[] = []; - // Skip entirely if the store has no index to repair if (!this.bountyStore.repairIndex) { return { strategy: this.name, found: 0, repaired: 0, errors: [] }; } try { - // Unfiltered list gives us the authoritative document state. 
const allBounties = await this.bountyStore.listBounties(); - // Build a set of bountyIds per status from status-filtered queries - // to detect both missing entries and stale entries. - const statusSets = new Map>(); for (const bounty of allBounties) { - if (!statusSets.has(bounty.status)) { - const byStatus = await this.bountyStore.listBounties({ status: bounty.status }); - statusSets.set(bounty.status, new Set(byStatus.map((b) => b.bountyId))); - } - } + try { + let needsRepair = false; - for (const bounty of allBounties) { - const inCorrectIndex = statusSets.get(bounty.status)?.has(bounty.bountyId) ?? false; - if (!inCorrectIndex) { - // Missing from the correct status index — needs repair - found++; - try { + if (this.bountyStore.listIndexStatuses) { + // Precise detection: check raw index entries directly + const indexStatuses = await this.bountyStore.listIndexStatuses(bounty.bountyId); + const hasCorrect = indexStatuses.includes(bounty.status); + const hasStale = indexStatuses.some((s) => s !== bounty.status); + needsRepair = !hasCorrect || hasStale; + } else { + // Fallback: check if bounty appears in its status-filtered query + const byStatus = await this.bountyStore.listBounties({ status: bounty.status }); + needsRepair = !byStatus.some((b) => b.bountyId === bounty.bountyId); + } + + if (needsRepair) { + found++; await this.bountyStore.repairIndex!(bounty.bountyId); repaired++; - } catch (err) { - errors.push( - err instanceof Error - ? err - : new Error(`Index repair failed for bounty ${bounty.bountyId}: ${String(err)}`), - ); } + } catch (err) { + errors.push( + err instanceof Error + ? 
err + : new Error(`Index check failed for bounty ${bounty.bountyId}: ${String(err)}`), + ); } } } catch (err) { diff --git a/src/core/bounty-store.ts b/src/core/bounty-store.ts index c6210112..3200b4ca 100644 --- a/src/core/bounty-store.ts +++ b/src/core/bounty-store.ts @@ -175,6 +175,15 @@ export interface BountyStore { */ repairIndex?(bountyId: string): Promise; + /** + * List which status index entries exist for a bounty. + * Used by BountyIndexSweep to detect stale markers without relying on + * the filtered listBounties which hides them. + * + * Optional: only implemented by stores with a separate status index. + */ + listIndexStatuses?(bountyId: string): Promise; + // ----------------------------------------------------------------------- // Reward records // ----------------------------------------------------------------------- diff --git a/src/core/operations/failing-bounty-store.ts b/src/core/operations/failing-bounty-store.ts index e5a66005..9ef5cc6c 100644 --- a/src/core/operations/failing-bounty-store.ts +++ b/src/core/operations/failing-bounty-store.ts @@ -128,6 +128,10 @@ export class FailingBountyStore implements BountyStore { return this.delegate.repairIndex?.(bountyId); } + async listIndexStatuses(bountyId: string): Promise { + return this.delegate.listIndexStatuses?.(bountyId) ?? []; + } + close(): void { this.delegate.close(); } diff --git a/src/nexus/nexus-bounty-store.ts b/src/nexus/nexus-bounty-store.ts index 617d8a35..23c18065 100644 --- a/src/nexus/nexus-bounty-store.ts +++ b/src/nexus/nexus-bounty-store.ts @@ -230,25 +230,16 @@ export class NexusBountyStore implements BountyStore { } async repairIndex(bountyId: string): Promise { - // Read with ETag so we can detect if a concurrent transition changed - // the bounty between our read and the cleanup deletes. 
const result = await this.readBountyWithEtag(bountyId); if (!result) return; - const { bounty, etag } = result; + const { bounty } = result; // Ensure the correct status index entry exists await this.writeStatusIndex(bounty); - // Before deleting stale markers, re-verify the bounty hasn't changed. - // A concurrent transitionBounty could have moved the status between our - // initial read and now — deleting the new status's marker would be wrong. - const recheck = await this.readBountyWithEtag(bountyId); - if (!recheck || recheck.etag !== etag) { - // Bounty was modified concurrently — skip cleanup, next sweep will retry - return; - } - - // Safe to clean: the document hasn't changed since our read + // Check which stale index entries actually exist before deleting. + // Only delete entries that exist AND don't match the current status. + // Re-read the bounty before each delete to catch concurrent transitions. const allStatuses: BountyStatus[] = [ "draft", "open", @@ -259,18 +250,54 @@ export class NexusBountyStore implements BountyStore { "expired", "cancelled", ] as BountyStatus[]; + for (const status of allStatuses) { if (status === bounty.status) continue; + const markerPath = bountyStatusIndexPath(this.zoneId, status, bountyId); + const markerExists = await withSemaphore(this.semaphore, () => + this.client.exists(markerPath), + ); + if (!markerExists) continue; + + // Re-read the authoritative document right before deleting to ensure + // a concurrent transition hasn't moved the bounty TO this status. + const fresh = await this.getBounty(bountyId); + if (!fresh || fresh.status === status) continue; + await safeCleanup( - withSemaphore(this.semaphore, () => - this.client.delete(bountyStatusIndexPath(this.zoneId, status, bountyId)), - ), + withSemaphore(this.semaphore, () => this.client.delete(markerPath)), "repairIndex: delete stale status index", { silent: true }, ); } } + /** + * Check which status index entries exist for a bounty. 
+ * Returns the set of statuses that have an index marker. + * Used by BountyIndexSweep to detect stale entries. + */ + async listIndexStatuses(bountyId: string): Promise { + const allStatuses: BountyStatus[] = [ + "draft", + "open", + "claimed", + "pending_settlement", + "completed", + "settled", + "expired", + "cancelled", + ] as BountyStatus[]; + const found: string[] = []; + for (const status of allStatuses) { + const exists = await withSemaphore(this.semaphore, () => + this.client.exists(bountyStatusIndexPath(this.zoneId, status, bountyId)), + ); + if (exists) found.push(status); + } + return found; + } + async recordReward(_reward: RewardRecord): Promise { // Rewards are not yet stored in Nexus VFS (future work) } From 127031d1587f6ab78670012597442201bbd04e73 Mon Sep 17 00:00:00 2001 From: Tao Feng Date: Tue, 14 Apr 2026 19:40:37 -0700 Subject: [PATCH 14/14] fix(bounty): exactOptionalPropertyTypes compat for SweepReconciler fields --- src/core/sweep-reconciler.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/sweep-reconciler.ts b/src/core/sweep-reconciler.ts index a5c3ff03..a653d211 100644 --- a/src/core/sweep-reconciler.ts +++ b/src/core/sweep-reconciler.ts @@ -89,8 +89,8 @@ const DEFAULT_INTERVAL_MS = 60_000; export class SweepReconciler { private readonly strategies: SweepStrategy[] = []; private readonly intervalMs: number; - private readonly onCycle?: (results: readonly SweepResult[]) => void; - private readonly onError?: (error: unknown) => void; + private readonly onCycle: ((results: readonly SweepResult[]) => void) | undefined; + private readonly onError: ((error: unknown) => void) | undefined; private timer: ReturnType | undefined; private running = false; private cycleInFlight = false;