diff --git a/src/core/bounty-index-sweep.ts b/src/core/bounty-index-sweep.ts new file mode 100644 index 00000000..1057cd55 --- /dev/null +++ b/src/core/bounty-index-sweep.ts @@ -0,0 +1,74 @@ +/** + * BountyIndexSweep — reconciler strategy for NexusBountyStore dual-write consistency. + * + * Detection-based: for each bounty, compares the authoritative document status + * against actual index entries using listIndexStatuses(). Calls repairIndex() + * only when an inconsistency is found: + * 1. Missing current-status index entry (new index write failed) + * 2. Stale old-status index entry (old index delete failed) + * + * For stores without listIndexStatuses (e.g., SQLite), falls back to checking + * whether the bounty appears in its status-filtered query result. + */ + +import type { BountyStore } from "./bounty-store.js"; +import type { SweepResult, SweepStrategy } from "./sweep-reconciler.js"; + +export class BountyIndexSweep implements SweepStrategy { + readonly name = "BountyIndexSweep"; + private readonly bountyStore: BountyStore; + + constructor(bountyStore: BountyStore) { + this.bountyStore = bountyStore; + } + + async sweep(): Promise { + let found = 0; + let repaired = 0; + const errors: Error[] = []; + + if (!this.bountyStore.repairIndex) { + return { strategy: this.name, found: 0, repaired: 0, errors: [] }; + } + + try { + const allBounties = await this.bountyStore.listBounties(); + + for (const bounty of allBounties) { + try { + let needsRepair = false; + + if (this.bountyStore.listIndexStatuses) { + // Precise detection: check raw index entries directly + const indexStatuses = await this.bountyStore.listIndexStatuses(bounty.bountyId); + const hasCorrect = indexStatuses.includes(bounty.status); + const hasStale = indexStatuses.some((s) => s !== bounty.status); + needsRepair = !hasCorrect || hasStale; + } else { + // Fallback: check if bounty appears in its status-filtered query + const byStatus = await this.bountyStore.listBounties({ status: bounty.status }); + needsRepair = !byStatus.some((b) => b.bountyId === bounty.bountyId); + } + + if (needsRepair) { + found++; + await this.bountyStore.repairIndex!(bounty.bountyId); + repaired++; + } + } catch (err) { + errors.push( + err instanceof Error + ? err + : new Error(`Index check failed for bounty ${bounty.bountyId}: ${String(err)}`), + ); + } + } + } catch (err) { + errors.push( + err instanceof Error ? err : new Error(`BountyIndexSweep failed: ${String(err)}`), + ); + } + + return { strategy: this.name, found, repaired, errors }; + } +} diff --git a/src/core/bounty-logic.test.ts b/src/core/bounty-logic.test.ts index 110ebaf5..4d2a87e7 100644 --- a/src/core/bounty-logic.test.ts +++ b/src/core/bounty-logic.test.ts @@ -34,9 +34,31 @@ describe("validateBountyTransition", () => { ).not.toThrow(); }); - test("allows claimed → completed", () => { + test("allows claimed → pending_settlement", () => { + expect(() => + validateBountyTransition( + "b-1", + BountyStatus.Claimed, + BountyStatus.PendingSettlement, + "beginSettlement", + ), + ).not.toThrow(); + }); + + test("rejects claimed → completed (must go through pending_settlement)", () => { expect(() => validateBountyTransition("b-1", BountyStatus.Claimed, BountyStatus.Completed, "complete"), + ).toThrow(); + }); + + test("allows pending_settlement → completed", () => { + expect(() => + validateBountyTransition( + "b-1", + BountyStatus.PendingSettlement, + BountyStatus.Completed, + "complete", + ), ).not.toThrow(); }); diff --git a/src/core/bounty-logic.ts b/src/core/bounty-logic.ts index 24898227..78e61d72 100644 --- a/src/core/bounty-logic.ts +++ b/src/core/bounty-logic.ts @@ -18,7 +18,8 @@ import type { Contribution, Score } from "./models.js"; const VALID_TRANSITIONS: Readonly> = { draft: ["open", "cancelled"], open: ["claimed", "expired", "cancelled"], - claimed: ["completed", "open", "expired", "cancelled"], + claimed: ["claimed", "pending_settlement", "open", "expired", "cancelled"], + pending_settlement: ["completed", "claimed", "expired", "cancelled"], completed: ["settled", "expired", "cancelled"], settled: [], expired: [], diff --git a/src/core/bounty-store.conformance.ts b/src/core/bounty-store.conformance.ts index 43edca10..f0242896 100644 --- a/src/core/bounty-store.conformance.ts +++ b/src/core/bounty-store.conformance.ts @@ -217,16 +217,27 @@ export function runBountyStoreTests(factory: BountyStoreFactory): void { await expect(store.claimBounty("claim-2", makeAgent(), "claim-id-2")).rejects.toThrow(); }); - test("completeBounty transitions claimed to completed", async () => { - const bounty = makeBounty({ bountyId: "complete-1", status: BountyStatus.Claimed }); + test("beginSettlement transitions claimed to pending_settlement", async () => { + const bounty = makeBounty({ bountyId: "begin-settle-1", status: BountyStatus.Claimed }); + await store.createBounty(bounty); + const pending = await store.beginSettlement("begin-settle-1", "cid-fulfilled"); + expect(pending.status).toBe(BountyStatus.PendingSettlement); + expect(pending.fulfilledByCid).toBe("cid-fulfilled"); + }); + + test("completeBounty transitions pending_settlement to completed", async () => { + const bounty = makeBounty({ + bountyId: "complete-1", + status: BountyStatus.PendingSettlement, + }); await store.createBounty(bounty); const completed = await store.completeBounty("complete-1", "cid-fulfilled"); expect(completed.status).toBe(BountyStatus.Completed); expect(completed.fulfilledByCid).toBe("cid-fulfilled"); }); - test("completeBounty throws for non-claimed bounty", async () => { - const bounty = makeBounty({ bountyId: "complete-2", status: BountyStatus.Open }); + test("completeBounty throws for claimed bounty (must use beginSettlement first)", async () => { + const bounty = makeBounty({ bountyId: "complete-2", status: BountyStatus.Claimed }); await store.createBounty(bounty); await expect(store.completeBounty("complete-2", "cid")).rejects.toThrow(); }); @@ -274,7 +285,7 @@ export function runBountyStoreTests(factory: BountyStoreFactory): void { // Full lifecycle // ------------------------------------------------------------------ - test("full lifecycle: draft → open → claimed → completed → settled", async () => { + test("full lifecycle: draft → open → claimed → pending_settlement → completed → settled", async () => { const bounty = makeBounty({ bountyId: "lifecycle-1", status: BountyStatus.Draft }); await store.createBounty(bounty); @@ -288,6 +299,9 @@ export function runBountyStoreTests(factory: BountyStoreFactory): void { ); expect(claimed.status).toBe(BountyStatus.Claimed); + const pending = await store.beginSettlement("lifecycle-1", "cid-result"); + expect(pending.status).toBe(BountyStatus.PendingSettlement); + const completed = await store.completeBounty("lifecycle-1", "cid-result"); expect(completed.status).toBe(BountyStatus.Completed); diff --git a/src/core/bounty-store.ts b/src/core/bounty-store.ts index 27a6c784..3200b4ca 100644 --- a/src/core/bounty-store.ts +++ b/src/core/bounty-store.ts @@ -101,13 +101,27 @@ export interface BountyStore { ): Promise; /** - * Mark a bounty as completed (work done, pending settlement). + * Transition a bounty to 'pending_settlement' — the saga pivot point. * - * @param bountyId - The bounty to complete. + * After this transition, the bounty is committed to settlement. The caller + * must capture credits and then advance to completed → settled. If the + * caller crashes, the reconciler will resume from pending_settlement. + * + * @param bountyId - The bounty entering settlement. * @param fulfilledByCid - CID of the contribution that fulfilled the bounty. * @returns The updated bounty snapshot. * @throws BountyStateError if bounty is not in 'claimed' status. */ + beginSettlement(bountyId: string, fulfilledByCid: string): Promise; + + /** + * Mark a bounty as completed (work done, credits captured). + * + * @param bountyId - The bounty to complete. + * @param fulfilledByCid - CID of the contribution that fulfilled the bounty. + * @returns The updated bounty snapshot. + * @throws BountyStateError if bounty is not in 'pending_settlement' status. + */ completeBounty(bountyId: string, fulfilledByCid: string): Promise; /** @@ -138,7 +152,7 @@ export interface BountyStore { cancelBounty(bountyId: string): Promise; // ----------------------------------------------------------------------- - // Expiry sweep + // Expiry sweep + index repair // ----------------------------------------------------------------------- /** @@ -149,6 +163,27 @@ export interface BountyStore { */ findExpiredBounties(): Promise; + /** + * Repair the status index for a bounty so it matches the authoritative + * document state. Idempotent — calling on an already-correct entry is a no-op. + * + * Used by BountyIndexSweep to fix dual-write gaps where the document was + * written but the status index was not. For stores without a separate + * index (e.g., SQLite), this is a no-op. + * + * Optional: stores that don't implement this will not be swept. + */ + repairIndex?(bountyId: string): Promise; + + /** + * List which status index entries exist for a bounty. + * Used by BountyIndexSweep to detect stale markers without relying on + * the filtered listBounties which hides them. + * + * Optional: only implemented by stores with a separate status index. + */ + listIndexStatuses?(bountyId: string): Promise; + // ----------------------------------------------------------------------- // Reward records // ----------------------------------------------------------------------- diff --git a/src/core/bounty.ts b/src/core/bounty.ts index 01fde9b3..8e07895d 100644 --- a/src/core/bounty.ts +++ b/src/core/bounty.ts @@ -21,6 +21,8 @@ export const BountyStatus = { Open: "open", /** Claimed by an agent (work in progress). */ Claimed: "claimed", + /** Capture initiated, awaiting state advancement. Saga pivot point. */ + PendingSettlement: "pending_settlement", /** Work completed, pending settlement. */ Completed: "completed", /** Credits distributed to fulfiller. */ @@ -76,7 +78,7 @@ export interface BountyCriteria { * A bounty — a reward offer for completing specific work. * * Bounties are mutable coordination objects with a lifecycle - * (draft → open → claimed → completed → settled). + * (draft → open → claimed → pending_settlement → completed → settled). * Like Claims, bounty objects returned by the store are readonly * snapshots; state transitions produce new snapshots. */ diff --git a/src/core/handoff-sweep.ts b/src/core/handoff-sweep.ts new file mode 100644 index 00000000..c630fda0 --- /dev/null +++ b/src/core/handoff-sweep.ts @@ -0,0 +1,59 @@ +/** + * HandoffSweep — reconciler strategy for detecting orphaned contributions. + * + * Finds contributions that have zero handoff records where the contribution + * kind normally generates handoffs (work, review, reproduction, adoption). + * Reports them for operator attention. + * + * Detection-only: repair requires re-running the routing logic from the + * topology/contract, which this sweep doesn't have access to. Full + * auto-repair is a follow-up (tracked in Issue #240). + * + * Absorbs the handoff reconciler scope from Issue #227 → #240. + */ + +import type { HandoffStore } from "./handoff.js"; +import type { ContributionStore } from "./store.js"; +import type { SweepResult, SweepStrategy } from "./sweep-reconciler.js"; + +/** Contribution kinds that normally generate handoff records. */ +const HANDOFF_ELIGIBLE_KINDS = new Set(["work", "review", "reproduction", "adoption"]); + +export class HandoffSweep implements SweepStrategy { + readonly name = "HandoffSweep"; + private readonly contributionStore: ContributionStore; + private readonly handoffStore: HandoffStore; + + constructor(contributionStore: ContributionStore, handoffStore: HandoffStore) { + this.contributionStore = contributionStore; + this.handoffStore = handoffStore; + } + + async sweep(): Promise { + let found = 0; + const errors: Error[] = []; + + try { + // Scan recent contributions that should have handoffs + const contributions = await this.contributionStore.list({ limit: 500 }); + const eligible = contributions.filter((c) => HANDOFF_ELIGIBLE_KINDS.has(c.kind)); + + // Check each eligible contribution for any handoff records + const handoffs = await this.handoffStore.list(); + const cidsWithHandoffs = new Set(handoffs.map((h) => h.sourceCid)); + + for (const contribution of eligible) { + if (!cidsWithHandoffs.has(contribution.cid)) { + found++; + } + } + } catch (err) { + errors.push( + err instanceof Error ? err : new Error(`HandoffSweep scan failed: ${String(err)}`), + ); + } + + // Detection-only: repaired is always 0. Operator acts on found > 0. + return { strategy: this.name, found, repaired: 0, errors }; + } +} diff --git a/src/core/in-memory-credits.ts b/src/core/in-memory-credits.ts index 4fae10c1..6e14922a 100644 --- a/src/core/in-memory-credits.ts +++ b/src/core/in-memory-credits.ts @@ -292,15 +292,23 @@ export class InMemoryCreditsService implements CreditsService { private getReserved(agentId: string): number { const now = Date.now(); let reserved = 0; - for (const res of this.reservations.values()) { - if (res.agentId === agentId && !res.captured && !res.voided) { - // Skip expired reservations — they no longer hold funds - if (new Date(res.expiresAt).getTime() <= now) { - continue; - } + const evictable: string[] = []; + for (const [id, res] of this.reservations.entries()) { + const expired = new Date(res.expiresAt).getTime() <= now; + // Evict only expired reservations that were never captured or voided. + // Captured/voided entries must stay for idempotency checks in + // capture() and void(). + if (expired && !res.captured && !res.voided) { + evictable.push(id); + continue; + } + if (res.agentId === agentId && !res.captured && !res.voided && !expired) { reserved += res.amount; } } + for (const id of evictable) { + this.reservations.delete(id); + } return reserved; } } diff --git a/src/core/operations/bounty.test.ts b/src/core/operations/bounty.test.ts index 003ab9bc..16c1df96 100644 --- a/src/core/operations/bounty.test.ts +++ b/src/core/operations/bounty.test.ts @@ -339,6 +339,13 @@ describe("settleBountyOperation", () => { expect(bounty.ok).toBe(true); if (!bounty.ok) return; + // Must claim before settle (pre-flight check requires "claimed" status) + const claim = await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: { agentId: "worker" } }, + deps, + ); + expect(claim.ok).toBe(true); + const result = await settleBountyOperation( { bountyId: bounty.value.bountyId, @@ -369,6 +376,12 @@ describe("settleBountyOperation", () => { expect(bounty.ok).toBe(true); if (!bounty.ok) return; + // Must claim before settle (pre-flight check requires "claimed" status) + await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: { agentId: "worker" } }, + deps, + ); + // Create contribution WITHOUT the required tag const contrib = await contributeOperation( { @@ -397,3 +410,685 @@ describe("settleBountyOperation", () => { } }); }); + +// --------------------------------------------------------------------------- +// Input validation edge cases (Issue 7A + 11A) +// --------------------------------------------------------------------------- + +describe("createBountyOperation input validation", () => { + let testDeps: TestOperationDeps; + let deps: OperationDeps; + + beforeEach(async () => { + testDeps = await createTestOperationDeps(); + deps = testDeps.deps; + }); + + afterEach(async () => { + await testDeps.cleanup(); + }); + + test("rejects zero amount", async () => { + const result = await createBountyOperation( + { title: "Zero", amount: 0, criteria: { description: "x" }, agent: { agentId: "a" } }, + deps, + ); + expect(result.ok).toBe(false); + if (!result.ok) { + expect(result.error.code).toBe("VALIDATION_ERROR"); + expect(result.error.message).toContain("positive"); + } + }); + + test("rejects negative amount", async () => { + const result = await createBountyOperation( + { title: "Neg", amount: -50, criteria: { description: "x" }, agent: { agentId: "a" } }, + deps, + ); + expect(result.ok).toBe(false); + if (!result.ok) { + expect(result.error.code).toBe("VALIDATION_ERROR"); + expect(result.error.message).toContain("positive"); + } + }); + + test("rejects empty title", async () => { + const result = await createBountyOperation( + { title: "", amount: 100, criteria: { description: "x" }, agent: { agentId: "a" } }, + deps, + ); + expect(result.ok).toBe(false); + if (!result.ok) { + expect(result.error.code).toBe("VALIDATION_ERROR"); + expect(result.error.message).toContain("title"); + } + }); + + test("rejects whitespace-only title", async () => { + const result = await createBountyOperation( + { title: " ", amount: 100, criteria: { description: "x" }, agent: { agentId: "a" } }, + deps, + ); + expect(result.ok).toBe(false); + if (!result.ok) { + expect(result.error.code).toBe("VALIDATION_ERROR"); + expect(result.error.message).toContain("title"); + } + }); +}); + +// --------------------------------------------------------------------------- +// Pre-flight status validation (Issue 6A) +// --------------------------------------------------------------------------- + +describe("claimBountyOperation status checks", () => { + let testDeps: TestOperationDeps; + let deps: OperationDeps; + + beforeEach(async () => { + testDeps = await createTestOperationDeps(); + deps = testDeps.deps; + }); + + afterEach(async () => { + await testDeps.cleanup(); + }); + + test("rejects claim on already-claimed bounty", async () => { + (deps.creditsService as InMemoryCreditsService).seed("creator", 1000); + + const bounty = await createBountyOperation( + { + title: "Double claim", + amount: 100, + criteria: { description: "work" }, + agent: { agentId: "creator" }, + }, + deps, + ); + expect(bounty.ok).toBe(true); + if (!bounty.ok) return; + + // First claim succeeds + const firstClaim = await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: { agentId: "agent-a" } }, + deps, + ); + expect(firstClaim.ok).toBe(true); + + // Second claim should fail with pre-flight check + const secondClaim = await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: { agentId: "agent-b" } }, + deps, + ); + expect(secondClaim.ok).toBe(false); + if (!secondClaim.ok) { + expect(secondClaim.error.code).toBe("VALIDATION_ERROR"); + expect(secondClaim.error.message).toContain("not open"); + } + }); + + test("same agent can renew claim on already-claimed bounty", async () => { + (deps.creditsService as InMemoryCreditsService).seed("creator", 1000); + + const bounty = await createBountyOperation( + { + title: "Renewable claim", + amount: 100, + criteria: { description: "work" }, + agent: { agentId: "creator" }, + }, + deps, + ); + expect(bounty.ok).toBe(true); + if (!bounty.ok) return; + + // First claim + const firstClaim = await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: { agentId: "worker" } }, + deps, + ); + expect(firstClaim.ok).toBe(true); + if (!firstClaim.ok) return; + + // Same agent renews — should succeed + const renewal = await claimBountyOperation( + { + bountyId: bounty.value.bountyId, + agent: { agentId: "worker" }, + leaseDurationMs: 3_600_000, + }, + deps, + ); + expect(renewal.ok).toBe(true); + if (!renewal.ok) return; + expect(renewal.value.status).toBe("claimed"); + expect(renewal.value.claimedBy).toBe("worker"); + expect(renewal.value.claimId).toBe(firstClaim.value.claimId); + }); + + test("different agent cannot renew another agent's claim", async () => { + (deps.creditsService as InMemoryCreditsService).seed("creator", 1000); + + const bounty = await createBountyOperation( + { + title: "No steal", + amount: 100, + criteria: { description: "work" }, + agent: { agentId: "creator" }, + }, + deps, + ); + expect(bounty.ok).toBe(true); + if (!bounty.ok) return; + + await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: { agentId: "agent-a" } }, + deps, + ); + + // Different agent tries to claim — rejected + const steal = await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: { agentId: "agent-b" } }, + deps, + ); + expect(steal.ok).toBe(false); + if (!steal.ok) { + expect(steal.error.code).toBe("VALIDATION_ERROR"); + } + }); +}); + +describe("settleBountyOperation status checks", () => { + let testDeps: TestOperationDeps; + let deps: OperationDeps; + + beforeEach(async () => { + testDeps = await createTestOperationDeps(); + deps = testDeps.deps; + }); + + afterEach(async () => { + await testDeps.cleanup(); + }); + + test("rejects settle on unclaimed (open) bounty", async () => { + (deps.creditsService as InMemoryCreditsService).seed("creator", 1000); + + const bounty = await createBountyOperation( + { + title: "Not claimed", + amount: 100, + criteria: { description: "work" }, + agent: { agentId: "creator" }, + }, + deps, + ); + expect(bounty.ok).toBe(true); + if (!bounty.ok) return; + + const contrib = await contributeOperation( + { kind: "work", summary: "Some work", agent: { agentId: "worker" } }, + deps, + ); + expect(contrib.ok).toBe(true); + if (!contrib.ok) return; + + const result = await settleBountyOperation( + { bountyId: bounty.value.bountyId, contributionCid: contrib.value.cid }, + deps, + ); + expect(result.ok).toBe(false); + if (!result.ok) { + expect(result.error.code).toBe("VALIDATION_ERROR"); + expect(result.error.message).toContain("cannot be settled"); + } + }); + + test("rejects settle on already-settled bounty", async () => { + (deps.creditsService as InMemoryCreditsService).seed("creator", 1000); + + // Create → claim → contribute → settle (success) + const bounty = await createBountyOperation( + { + title: "Already settled", + amount: 100, + criteria: { description: "Any", requiredTags: ["fix"] }, + agent: { agentId: "creator" }, + }, + deps, + ); + expect(bounty.ok).toBe(true); + if (!bounty.ok) return; + + await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: { agentId: "worker" } }, + deps, + ); + const contrib = await contributeOperation( + { kind: "work", summary: "Fix", tags: ["fix"], agent: { agentId: "worker" } }, + deps, + ); + expect(contrib.ok).toBe(true); + if (!contrib.ok) return; + + const firstSettle = await settleBountyOperation( + { bountyId: bounty.value.bountyId, contributionCid: contrib.value.cid }, + deps, + ); + expect(firstSettle.ok).toBe(true); + + // Second settle should fail with pre-flight check + const secondSettle = await settleBountyOperation( + { bountyId: bounty.value.bountyId, contributionCid: contrib.value.cid }, + deps, + ); + expect(secondSettle.ok).toBe(false); + if (!secondSettle.ok) { + expect(secondSettle.error.code).toBe("VALIDATION_ERROR"); + expect(secondSettle.error.message).toContain("cannot be settled"); + } + }); + + test("rejects settle when creditsService missing but bounty has reservationId", async () => { + (deps.creditsService as InMemoryCreditsService).seed("creator", 1000); + + const bounty = await createBountyOperation( + { + title: "Escrow test", + amount: 100, + criteria: { description: "work" }, + agent: { agentId: "creator" }, + }, + deps, + ); + expect(bounty.ok).toBe(true); + if (!bounty.ok) return; + + await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: { agentId: "worker" } }, + deps, + ); + const contrib = await contributeOperation( + { kind: "work", summary: "Done", agent: { agentId: "worker" } }, + deps, + ); + expect(contrib.ok).toBe(true); + if (!contrib.ok) return; + + // Remove credits service to simulate missing provider + const depsNoCredits: OperationDeps = { ...deps, creditsService: undefined }; + + const result = await settleBountyOperation( + { bountyId: bounty.value.bountyId, contributionCid: contrib.value.cid }, + depsNoCredits, + ); + expect(result.ok).toBe(false); + if (!result.ok) { + expect(result.error.code).toBe("VALIDATION_ERROR"); + expect(result.error.message).toContain("creditsService"); + } + }); +}); + +// --------------------------------------------------------------------------- +// Non-escrowed bounty lifecycle (Issue 12A) +// --------------------------------------------------------------------------- + +describe("non-escrowed bounty lifecycle", () => { + let testDeps: TestOperationDeps; + let deps: OperationDeps; + + beforeEach(async () => { + testDeps = await createTestOperationDeps(); + deps = testDeps.deps; + }); + + afterEach(async () => { + await testDeps.cleanup(); + }); + + test("settles a bounty end-to-end without credits service", async () => { + const depsNoCredits: OperationDeps = { ...deps, creditsService: undefined }; + + // 1. Create bounty without escrow + const bounty = await createBountyOperation( + { + title: "Reputation only", + amount: 100, + criteria: { description: "Any work", requiredTags: ["fix"] }, + agent: { agentId: "creator" }, + }, + depsNoCredits, + ); + expect(bounty.ok).toBe(true); + if (!bounty.ok) return; + expect(bounty.value.reservationId).toBeUndefined(); + + // 2. Claim + const claim = await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: { agentId: "worker" } }, + depsNoCredits, + ); + expect(claim.ok).toBe(true); + + // 3. Contribute + const contrib = await contributeOperation( + { + kind: "work", + summary: "Reputation fix", + tags: ["fix"], + agent: { agentId: "worker" }, + }, + depsNoCredits, + ); + expect(contrib.ok).toBe(true); + if (!contrib.ok) return; + + // 4. Settle (no capture since no reservation) + const result = await settleBountyOperation( + { bountyId: bounty.value.bountyId, contributionCid: contrib.value.cid }, + depsNoCredits, + ); + expect(result.ok).toBe(true); + if (!result.ok) return; + expect(result.value.status).toBe("settled"); + expect(result.value.amount).toBe(100); + expect(result.value.paidTo).toBe("worker"); + }); +}); + +// --------------------------------------------------------------------------- +// Sequential conflict tests (Issue 10B) +// --------------------------------------------------------------------------- + +describe("sequential conflict scenarios", () => { + let testDeps: TestOperationDeps; + let deps: OperationDeps; + + beforeEach(async () => { + testDeps = await createTestOperationDeps(); + deps = testDeps.deps; + }); + + afterEach(async () => { + await testDeps.cleanup(); + }); + + test("second claim on same bounty fails", async () => { + (deps.creditsService as InMemoryCreditsService).seed("creator", 1000); + + const bounty = await createBountyOperation( + { + title: "Conflict claim", + amount: 100, + criteria: { description: "work" }, + agent: { agentId: "creator" }, + }, + deps, + ); + expect(bounty.ok).toBe(true); + if (!bounty.ok) return; + + // First claim succeeds + const first = await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: { agentId: "agent-a" } }, + deps, + ); + expect(first.ok).toBe(true); + + // Second claim fails — pre-flight rejects non-open bounty + const second = await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: { agentId: "agent-b" } }, + deps, + ); + expect(second.ok).toBe(false); + if (!second.ok) { + expect(second.error.code).toBe("VALIDATION_ERROR"); + } + }); + + test("settle after settle fails (idempotent capture but state rejects)", async () => { + (deps.creditsService as InMemoryCreditsService).seed("creator", 1000); + + // Full lifecycle to settled + const bounty = await createBountyOperation( + { + title: "Double settle", + amount: 100, + criteria: { description: "Any", requiredTags: ["fix"] }, + agent: { agentId: "creator" }, + }, + deps, + ); + expect(bounty.ok).toBe(true); + if (!bounty.ok) return; + + await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: { agentId: "worker" } }, + deps, + ); + const contrib = await contributeOperation( + { kind: "work", summary: "Fix", tags: ["fix"], agent: { agentId: "worker" } }, + deps, + ); + expect(contrib.ok).toBe(true); + if (!contrib.ok) return; + + const firstSettle = await settleBountyOperation( + { bountyId: bounty.value.bountyId, contributionCid: contrib.value.cid }, + deps, + ); + expect(firstSettle.ok).toBe(true); + + // Second settle blocked by pre-flight (status is "settled") + const secondSettle = await settleBountyOperation( + { bountyId: bounty.value.bountyId, contributionCid: contrib.value.cid }, + deps, + ); + expect(secondSettle.ok).toBe(false); + if (!secondSettle.ok) { + expect(secondSettle.error.code).toBe("VALIDATION_ERROR"); + } + }); + + test("claim after settle fails", async () => { + (deps.creditsService as InMemoryCreditsService).seed("creator", 1000); + + // Full lifecycle to settled + const bounty = await createBountyOperation( + { + title: "Claim after settle", + amount: 100, + criteria: { description: "Any", requiredTags: ["fix"] }, + agent: { agentId: "creator" }, + }, + deps, + ); + expect(bounty.ok).toBe(true); + if (!bounty.ok) return; + + await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: { agentId: "worker" } }, + deps, + ); + const contrib = await contributeOperation( + { kind: "work", summary: "Fix", tags: ["fix"], agent: { agentId: "worker" } }, + deps, + ); + expect(contrib.ok).toBe(true); + if (!contrib.ok) return; + + await settleBountyOperation( + { bountyId: bounty.value.bountyId, contributionCid: contrib.value.cid }, + deps, + ); + + // Claim on settled bounty — pre-flight rejects + const claim = await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: { agentId: "latecomer" } }, + deps, + ); + expect(claim.ok).toBe(false); + if (!claim.ok) { + expect(claim.error.code).toBe("VALIDATION_ERROR"); + expect(claim.error.message).toContain("not open"); + } + }); +}); + +// --------------------------------------------------------------------------- +// Acceptance criteria tests (Issue #240) — FailingBountyStore partial failures +// --------------------------------------------------------------------------- + +import { FailingBountyStore } from "./failing-bounty-store.js"; + +describe("partial failure acceptance criteria (#240)", () => { + let testDeps: TestOperationDeps; + let deps: OperationDeps; + + beforeEach(async () => { + testDeps = await createTestOperationDeps(); + deps = testDeps.deps; + }); + + afterEach(async () => { + await testDeps.cleanup(); + }); + + test("AC1: capture throws after state transitions — bounty retryable via pending_settlement", async () => { + // Simulate: settle starts, pivot to pending_settlement succeeds, + // but capture() fails. The bounty should be in pending_settlement + // and retryable. + (deps.creditsService as InMemoryCreditsService).seed("creator", 1000); + + const bounty = await createBountyOperation( + { + title: "Capture fail test", + amount: 100, + criteria: { description: "Any", requiredTags: ["fix"] }, + agent: { agentId: "creator" }, + }, + deps, + ); + expect(bounty.ok).toBe(true); + if (!bounty.ok) return; + + await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: { agentId: "worker" } }, + deps, + ); + const contrib = await contributeOperation( + { kind: "work", summary: "Fix", tags: ["fix"], agent: { agentId: "worker" } }, + deps, + ); + expect(contrib.ok).toBe(true); + if (!contrib.ok) return; + + // Configure credits service to fail on capture + (deps.creditsService as InMemoryCreditsService).setFailures({ + capture: new Error("Simulated capture failure"), + }); + + // First settle attempt — fails during capture + const failedSettle = await settleBountyOperation( + { bountyId: bounty.value.bountyId, contributionCid: contrib.value.cid }, + deps, + ); + expect(failedSettle.ok).toBe(false); + + // Bounty should be in pending_settlement (pivot committed) + const stuck = await deps.bountyStore!.getBounty(bounty.value.bountyId); + expect(stuck?.status).toBe("pending_settlement"); + + // Clear the failure — capture will now succeed (idempotent) + (deps.creditsService as InMemoryCreditsService).setFailures({}); + + // Retry settle — should resume from pending_settlement + const retrySettle = await settleBountyOperation( + { bountyId: bounty.value.bountyId, contributionCid: contrib.value.cid }, + deps, + ); + expect(retrySettle.ok).toBe(true); + if (!retrySettle.ok) return; + expect(retrySettle.value.status).toBe("settled"); + expect(retrySettle.value.paidTo).toBe("worker"); + }); + + test("AC2: createBounty throws post-commit — reservation must NOT be voided", async () => { + (deps.creditsService as InMemoryCreditsService).seed("creator", 1000); + + const failingStore = new FailingBountyStore(deps.bountyStore!); + failingStore.failOnNext("createBounty"); + const failDeps: OperationDeps = { ...deps, bountyStore: failingStore }; + + // createBountyOperation fails — but the bounty IS committed in the delegate + const result = await createBountyOperation( + { + title: "Post-commit fail", + amount: 100, + criteria: { description: "work" }, + agent: { agentId: "creator" }, + }, + failDeps, + ); + + // Operation reports failure + expect(result.ok).toBe(false); + + // The bounty exists in the store (post-commit failure) + const bounties = await deps.bountyStore!.listBounties(); + expect(bounties.length).toBe(1); + expect(bounties[0]?.status).toBe("open"); + + // The reservation was NOT voided — it's still active. + // The operation does NOT compensate on failure (by design — no try/catch + // compensation, which was the bug in the WIP that #240 identified). + // The reservation will auto-expire via its timeout. + const balanceInfo = await (deps.creditsService as InMemoryCreditsService).balance("creator"); + // 1000 - 100 reserved = 900 available (reservation still active) + expect(balanceInfo.available).toBe(900); + expect(balanceInfo.reserved).toBe(100); + }); + + test("AC3: claimBounty throws post-commit — claim must NOT be released", async () => { + (deps.creditsService as InMemoryCreditsService).seed("creator", 1000); + + const bounty = await createBountyOperation( + { + title: "Claim fail test", + amount: 100, + criteria: { description: "work" }, + agent: { agentId: "creator" }, + }, + deps, + ); + expect(bounty.ok).toBe(true); + if (!bounty.ok) return; + + const failingStore = new FailingBountyStore(deps.bountyStore!); + failingStore.failOnNext("claimBounty"); + const failDeps: OperationDeps = { ...deps, bountyStore: failingStore }; + + // claimBountyOperation fails — but the bounty IS transitioned in the delegate + const result = await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: { agentId: "worker" } }, + failDeps, + ); + + // Operation reports failure + expect(result.ok).toBe(false); + + // The bounty is in "claimed" state in the store (post-commit) + const stored = await deps.bountyStore!.getBounty(bounty.value.bountyId); + expect(stored?.status).toBe("claimed"); + expect(stored?.claimedBy?.agentId).toBe("worker"); + + // The claim was NOT released — it's still active in the claim store. + // The operation does NOT compensate on failure (by design). + // The claim will expire via its lease timeout (30min default). + const claims = await deps.claimStore!.activeClaims(); + const bountyClaimExists = claims.some((c) => c.targetRef === `bounty:${bounty.value.bountyId}`); + expect(bountyClaimExists).toBe(true); + }); +}); diff --git a/src/core/operations/bounty.ts b/src/core/operations/bounty.ts index ba2b5e44..a3734802 100644 --- a/src/core/operations/bounty.ts +++ b/src/core/operations/bounty.ts @@ -101,6 +101,14 @@ export interface SettleBountyInput { readonly contributionCid: string; } +// --------------------------------------------------------------------------- +// Error messages +// --------------------------------------------------------------------------- + +const MISSING_BOUNTY_STORE = "Bounty operations not available (missing bountyStore)"; +const MISSING_CLAIM_STORE = "Claim operations not available (missing claimStore)"; +const MISSING_CONTRIBUTION_STORE = "Settle bounty not available (missing contributionStore)"; + // --------------------------------------------------------------------------- // Operations // --------------------------------------------------------------------------- @@ -114,7 +122,14 @@ export async function createBountyOperation( ): Promise> { try { if (deps.bountyStore === undefined) { - return validationErr("Bounty operations not available (missing bountyStore)"); + return validationErr(MISSING_BOUNTY_STORE); + } + + if (!input.title || input.title.trim().length === 0) { + return validationErr("Bounty title must be a non-empty string"); + } + if (input.amount <= 0) { + return validationErr("Bounty amount must be positive"); } const agent = resolveAgent(input.agent); @@ -173,7 +188,7 @@ export async function listBountiesOperation( ): Promise> { try { if (deps.bountyStore === undefined) { - return validationErr("Bounty operations not available"); + return validationErr(MISSING_BOUNTY_STORE); } const bounties = await deps.bountyStore.listBounties({ @@ -204,11 +219,11 @@ export async function claimBountyOperation( ): Promise> { try { if (deps.bountyStore === undefined) { - return validationErr("Bounty operations not available"); + return validationErr(MISSING_BOUNTY_STORE); } if (deps.claimStore === undefined) { - return validationErr("Claim operations not available (missing claimStore)"); + return validationErr(MISSING_CLAIM_STORE); } const bounty = await deps.bountyStore.getBounty(input.bountyId); @@ -218,9 +233,74 @@ export async function claimBountyOperation( const agent = resolveAgent(input.agent); const now = new Date(); - const claimId = crypto.randomUUID(); const leaseDurationMs = input.leaseDurationMs ?? 1_800_000; + // Renewal path: same agent can extend the lease on an already-claimed bounty. + // This prevents long-running bounties from getting stranded when the claim + // lease expires while the worker is still active. + if ( + bounty.status === BS.Claimed && + bounty.claimedBy?.agentId === agent.agentId && + bounty.claimId + ) { + // Check if the existing claim is still active AND the lease hasn't expired. + // Both conditions must hold — a claim with status "active" but expired + // leaseExpiresAt will collide in the claim store instead of renewing. + const existingClaim = await deps.claimStore.getClaim(bounty.claimId); + const leaseValid = + existingClaim?.status === "active" && + new Date(existingClaim.leaseExpiresAt).getTime() > now.getTime(); + + const renewalClaimId = leaseValid ? bounty.claimId : crypto.randomUUID(); + const renewed = await deps.claimStore.claimOrRenew({ + claimId: renewalClaimId, + targetRef: `bounty:${input.bountyId}`, + agent, + status: "active", + intentSummary: `Renewing claim on bounty: ${bounty.title}`, + createdAt: now.toISOString(), + heartbeatAt: now.toISOString(), + leaseExpiresAt: new Date(now.getTime() + leaseDurationMs).toISOString(), + }); + + // If we rotated the claim ID, update the bounty record to point at the + // new claim. On failure, only release if the bounty didn't commit the rebind. + if (renewalClaimId !== bounty.claimId) { + try { + await deps.bountyStore.claimBounty(input.bountyId, agent, renewed.claimId); + } catch (rebindErr) { + // Re-read: if the bounty now points at the new claim, the write + // committed (post-commit error) — do NOT release the live claim. + try { + const current = await deps.bountyStore.getBounty(input.bountyId); + if (!current || current.claimId !== renewed.claimId) { + await deps.claimStore.release(renewed.claimId); + } + } catch { + // Best-effort — claim will expire via lease timeout + } + throw rebindErr; + } + } + + return ok({ + bountyId: bounty.bountyId, + title: bounty.title, + status: bounty.status, + claimId: renewed.claimId, + claimedBy: bounty.claimedBy.agentId, + }); + } + + // New claim: bounty must be open + if (bounty.status !== BS.Open) { + return validationErr( + `Bounty '${input.bountyId}' is not open for claims (current status: ${bounty.status})`, + ); + } + + const claimId = crypto.randomUUID(); + // Create claim via existing claim system const claim = await deps.claimStore.claimOrRenew({ claimId, @@ -233,7 +313,24 @@ export async function claimBountyOperation( leaseExpiresAt: new Date(now.getTime() + leaseDurationMs).toISOString(), }); - const claimed = await deps.bountyStore.claimBounty(input.bountyId, agent, claim.claimId); + let claimed: Bounty; + try { + claimed = await deps.bountyStore.claimBounty(input.bountyId, agent, claim.claimId); + } catch (bountyErr) { + // If the bounty transition failed (pre-commit or CAS conflict), the + // claim lease is orphaned. Re-read the bounty: if it's still open, + // the transition didn't commit and we can safely release the claim. + // If it's already claimed (post-commit failure), keep the claim. + try { + const current = await deps.bountyStore.getBounty(input.bountyId); + if (current && current.status === BS.Open) { + await deps.claimStore.release(claim.claimId); + } + } catch { + // Best-effort release — claim will expire via lease timeout + } + throw bountyErr; + } return ok({ bountyId: claimed.bountyId, @@ -247,18 +344,24 @@ export async function claimBountyOperation( } } -/** Settle a completed bounty. */ +/** + * Settle a bounty using the saga pattern: + * claimed → pending_settlement (pivot) → capture → completed → settled + * + * Retryable: if the operation is called again on a pending_settlement bounty, + * it resumes from the capture step (capture is idempotent). + */ export async function settleBountyOperation( input: SettleBountyInput, deps: OperationDeps, ): Promise> { try { if (deps.bountyStore === undefined) { - return validationErr("Bounty operations not available (missing bountyStore)"); + return validationErr(MISSING_BOUNTY_STORE); } if (deps.contributionStore === undefined) { - return validationErr("Settle bounty not available (missing contributionStore)"); + return validationErr(MISSING_CONTRIBUTION_STORE); } const bounty = await deps.bountyStore.getBounty(input.bountyId); @@ -266,13 +369,37 @@ export async function settleBountyOperation( return notFound("Bounty", input.bountyId); } + // Allow "claimed" (fresh), "pending_settlement" (post-pivot), "completed" (post-capture) + const resumable = + bounty.status === BS.Claimed || + bounty.status === BS.PendingSettlement || + bounty.status === BS.Completed; + if (!resumable) { + return validationErr( + `Bounty '${input.bountyId}' cannot be settled (current status: ${bounty.status})`, + ); + } + + // On resume from pending_settlement or completed, the fulfillment CID + // is frozen — reject attempts to change it. + let fulfilledByCid = input.contributionCid; + if (bounty.status !== BS.Claimed) { + if (bounty.fulfilledByCid && input.contributionCid !== bounty.fulfilledByCid) { + return validationErr( + `Bounty '${input.bountyId}' is already pending settlement with contribution ` + + `'${bounty.fulfilledByCid}' — cannot change to '${input.contributionCid}'`, + ); + } + fulfilledByCid = bounty.fulfilledByCid ?? input.contributionCid; + } + // Validate contribution exists and meets criteria - const contribution = await deps.contributionStore.get(input.contributionCid); + const contribution = await deps.contributionStore.get(fulfilledByCid); if (!contribution) { - return notFound("Contribution", input.contributionCid); + return notFound("Contribution", fulfilledByCid); } if (!evaluateBountyCriteria(bounty.criteria, contribution)) { - return validationErr(`Contribution '${input.contributionCid}' does not meet bounty criteria`); + return validationErr(`Contribution '${fulfilledByCid}' does not meet bounty criteria`); } // Require credits service when escrow is active @@ -282,7 +409,12 @@ export async function settleBountyOperation( ); } - // Capture payment before state transition + // Step 1: Pivot — transition to pending_settlement (skip if resuming) + if (bounty.status === BS.Claimed) { + await deps.bountyStore.beginSettlement(input.bountyId, fulfilledByCid); + } + + // Step 2: Capture payment (idempotent — safe to retry) if (deps.creditsService && bounty.reservationId && bounty.claimedBy) { await deps.creditsService.capture(bounty.reservationId, { toAgentId: bounty.claimedBy.agentId, @@ -291,9 +423,11 @@ export async function settleBountyOperation( await deps.creditsService.capture(bounty.reservationId); } - // Persist state transitions - const completed = await deps.bountyStore.completeBounty(input.bountyId, input.contributionCid); - const settled = await deps.bountyStore.settleBounty(completed.bountyId); + // Step 3: Advance through completed → settled (skip steps already done) + if (bounty.status !== BS.Completed) { + await deps.bountyStore.completeBounty(input.bountyId, fulfilledByCid); + } + const settled = await deps.bountyStore.settleBounty(input.bountyId); return ok({ bountyId: settled.bountyId, diff --git a/src/core/operations/failing-bounty-store.ts b/src/core/operations/failing-bounty-store.ts new file mode 100644 index 00000000..9ef5cc6c --- /dev/null +++ b/src/core/operations/failing-bounty-store.ts @@ -0,0 +1,138 @@ +/** + * FailingBountyStore — test wrapper that injects failures between internal + * store steps to simulate dual-write partial failures. + * + * Wraps a real BountyStore and can be configured to fail after specific + * operations, simulating the gap between document write and index write + * in NexusBountyStore. + * + * Usage: + * const real = new SqliteBountyStore(db); + * const failing = new FailingBountyStore(real); + * failing.failOnNext("createBounty"); // next createBounty throws after real store commits + * await createBountyOperation(input, { bountyStore: failing, ... }); + */ + +import type { Bounty, RewardRecord } from "../bounty.js"; +import type { BountyQuery, BountyStore, RewardQuery } from "../bounty-store.js"; +import type { AgentIdentity } from "../models.js"; + +export class FailingBountyStore implements BountyStore { + readonly storeIdentity: string | undefined; + private readonly delegate: BountyStore; + private failureTarget: string | undefined; + private failureError: Error; + + constructor(delegate: BountyStore) { + this.delegate = delegate; + this.storeIdentity = delegate.storeIdentity; + this.failureError = new Error("Simulated post-commit failure"); + } + + /** + * Configure the next call to the named method to succeed internally + * (the delegate commits the change) but throw to the caller, simulating + * a failure AFTER the document write but BEFORE the caller sees success. + */ + failOnNext(method: string, error?: Error): void { + this.failureTarget = method; + if (error) this.failureError = error; + } + + /** Clear any pending failure configuration. */ + clearFailure(): void { + this.failureTarget = undefined; + } + + private async maybeFailAfter(method: string, result: T): Promise { + if (this.failureTarget === method) { + this.failureTarget = undefined; // one-shot + throw this.failureError; + } + return result; + } + + // ----------------------------------------------------------------------- + // Delegated methods with failure injection + // ----------------------------------------------------------------------- + + async createBounty(bounty: Bounty): Promise { + const result = await this.delegate.createBounty(bounty); + return this.maybeFailAfter("createBounty", result); + } + + async getBounty(bountyId: string): Promise { + return this.delegate.getBounty(bountyId); + } + + async listBounties(query?: BountyQuery): Promise { + return this.delegate.listBounties(query); + } + + async countBounties(query?: BountyQuery): Promise { + return this.delegate.countBounties(query); + } + + async fundBounty(bountyId: string, reservationId: string): Promise { + const result = await this.delegate.fundBounty(bountyId, reservationId); + return this.maybeFailAfter("fundBounty", result); + } + + async claimBounty(bountyId: string, claimedBy: AgentIdentity, claimId: string): Promise { + const result = await this.delegate.claimBounty(bountyId, claimedBy, claimId); + return this.maybeFailAfter("claimBounty", result); + } + + async beginSettlement(bountyId: string, fulfilledByCid: string): Promise { + const result = await this.delegate.beginSettlement(bountyId, fulfilledByCid); + return this.maybeFailAfter("beginSettlement", result); + } + + async completeBounty(bountyId: string, fulfilledByCid: string): Promise { + const result = await this.delegate.completeBounty(bountyId, fulfilledByCid); + return this.maybeFailAfter("completeBounty", result); + } + + async settleBounty(bountyId: string): Promise { + const result = await this.delegate.settleBounty(bountyId); + return this.maybeFailAfter("settleBounty", result); + } + + async expireBounty(bountyId: string): Promise { + const result = await this.delegate.expireBounty(bountyId); + return this.maybeFailAfter("expireBounty", result); + } + + async cancelBounty(bountyId: string): Promise { + const result = await this.delegate.cancelBounty(bountyId); + return this.maybeFailAfter("cancelBounty", result); + } + + async findExpiredBounties(): Promise { + return this.delegate.findExpiredBounties(); + } + + async recordReward(reward: RewardRecord): Promise { + return this.delegate.recordReward(reward); + } + + async hasReward(rewardId: string): Promise { + return this.delegate.hasReward(rewardId); + } + + async listRewards(query?: RewardQuery): Promise { + return this.delegate.listRewards(query); + } + + async repairIndex(bountyId: string): Promise { + return this.delegate.repairIndex?.(bountyId); + } + + async listIndexStatuses(bountyId: string): Promise { + return this.delegate.listIndexStatuses?.(bountyId) ?? []; + } + + close(): void { + this.delegate.close(); + } +} diff --git a/src/core/schemas.ts b/src/core/schemas.ts index e06c3821..c23b3e56 100644 --- a/src/core/schemas.ts +++ b/src/core/schemas.ts @@ -99,6 +99,7 @@ const BountySchema: z.ZodType = z.object({ "draft", "open", "claimed", + "pending_settlement", "completed", "settled", "expired", diff --git a/src/core/settlement-sweep.ts b/src/core/settlement-sweep.ts new file mode 100644 index 00000000..2603cea0 --- /dev/null +++ b/src/core/settlement-sweep.ts @@ -0,0 +1,104 @@ +/** + * SettlementSweep — reconciler strategy for resuming stalled bounty settlements. + * + * Finds bounties stuck in "pending_settlement" (the saga pivot state) and + * attempts to resume them: capture credits (idempotent), then advance through + * completed → settled. + * + * This handles the case where the client crashed after the pivot transition + * but before completing the settle flow (Issue #240, finding #1). + */ + +import type { Bounty, BountyStatus } from "./bounty.js"; +import type { BountyStore } from "./bounty-store.js"; +import type { CreditsService } from "./credits.js"; +import type { SweepResult, SweepStrategy } from "./sweep-reconciler.js"; + +export class SettlementSweep implements SweepStrategy { + readonly name = "SettlementSweep"; + private readonly bountyStore: BountyStore; + private readonly creditsService: CreditsService | undefined; + + constructor(bountyStore: BountyStore, creditsService?: CreditsService) { + this.bountyStore = bountyStore; + this.creditsService = creditsService; + } + + async sweep(): Promise { + let found = 0; + let repaired = 0; + const errors: Error[] = []; + + try { + // Scan both pending_settlement AND completed bounties that have a + // fulfilledByCid — the latter covers the case where capture succeeded + // and completeBounty committed but settleBounty failed. + const pending = await this.bountyStore.listBounties({ + status: "pending_settlement" as BountyStatus, + }); + const completed = await this.bountyStore.listBounties({ + status: "completed" as BountyStatus, + }); + // Only resume completed bounties that have a fulfillment CID + // (indicating they were mid-settlement, not just manually completed). + const stalled = [...pending, ...completed.filter((b) => b.fulfilledByCid)]; + found = stalled.length; + + for (const bounty of stalled) { + try { + await this.resumeSettlement(bounty); + repaired++; + } catch (err) { + errors.push( + err instanceof Error + ? err + : new Error(`Settlement resume failed for ${bounty.bountyId}: ${String(err)}`), + ); + } + } + } catch (err) { + errors.push( + err instanceof Error ? err : new Error(`SettlementSweep scan failed: ${String(err)}`), + ); + } + + return { strategy: this.name, found, repaired, errors }; + } + + /** + * Resume a stalled settlement. Idempotent — safe to call multiple times. + * + * Handles both pending_settlement (pre-capture) and completed (post-capture). + */ + private async resumeSettlement(bounty: Bounty): Promise { + // "completed" bounties have already captured — only need to advance to settled. + // "pending_settlement" bounties still need capture before advancing. + if (bounty.status === "completed") { + await this.bountyStore.settleBounty(bounty.bountyId); + return; + } + + // For pending_settlement: hard-fail if escrowed but no creditsService + if (bounty.reservationId && !this.creditsService) { + throw new Error( + `Cannot resume settlement for bounty ${bounty.bountyId}: ` + + "reservationId is set but no creditsService is available", + ); + } + + // Capture credits if escrow is active (idempotent — already-captured is a no-op) + if (this.creditsService && bounty.reservationId) { + if (bounty.claimedBy) { + await this.creditsService.capture(bounty.reservationId, { + toAgentId: bounty.claimedBy.agentId, + }); + } else { + await this.creditsService.capture(bounty.reservationId); + } + } + + // Advance through completed → settled + await this.bountyStore.completeBounty(bounty.bountyId, bounty.fulfilledByCid ?? ""); + await this.bountyStore.settleBounty(bounty.bountyId); + } +} diff --git a/src/core/sweep-reconciler.test.ts b/src/core/sweep-reconciler.test.ts new file mode 100644 index 00000000..334f035f --- /dev/null +++ b/src/core/sweep-reconciler.test.ts @@ -0,0 +1,342 @@ +/** + * Tests for the SweepReconciler framework and sweep strategies. + */ + +import { afterEach, beforeEach, describe, expect, test } from "bun:test"; + +import { BountyIndexSweep } from "./bounty-index-sweep.js"; +import { HandoffSweep } from "./handoff-sweep.js"; +import type { InMemoryCreditsService } from "./in-memory-credits.js"; +import { + claimBountyOperation, + createBountyOperation, + settleBountyOperation, +} from "./operations/bounty.js"; +import { contributeOperation } from "./operations/contribute.js"; +import type { OperationDeps } from "./operations/deps.js"; +import type { TestOperationDeps } from "./operations/test-helpers.js"; +import { createTestOperationDeps } from "./operations/test-helpers.js"; +import { SettlementSweep } from "./settlement-sweep.js"; +import type { SweepResult, SweepStrategy } from "./sweep-reconciler.js"; +import { SweepReconciler } from "./sweep-reconciler.js"; + +// --------------------------------------------------------------------------- +// SweepReconciler framework tests +// --------------------------------------------------------------------------- + +describe("SweepReconciler", () => { + test("starts and stops without strategies", () => { + const sr = new SweepReconciler({ intervalMs: 10_000 }); + expect(sr.isRunning).toBe(false); + expect(sr.strategyCount).toBe(0); + sr.start(); + expect(sr.isRunning).toBe(true); + sr.stop(); + expect(sr.isRunning).toBe(false); + }); + + test("start is idempotent", () => { + const sr = new SweepReconciler({ intervalMs: 10_000 }); + sr.start(); + sr.start(); // should not create a second timer + expect(sr.isRunning).toBe(true); + sr.stop(); + }); + + test("registers and runs strategies", async () => { + const results: SweepResult[] = []; + const strategy: SweepStrategy = { + name: "TestSweep", + async sweep() { + return { strategy: "TestSweep", found: 3, repaired: 2, errors: [] }; + }, + }; + + const sr = new SweepReconciler({ + onCycle: (r) => results.push(...r), + }); + sr.register(strategy); + expect(sr.strategyCount).toBe(1); + + await sr.runCycle(); + + expect(results).toHaveLength(1); + expect(results[0]?.strategy).toBe("TestSweep"); + expect(results[0]?.found).toBe(3); + expect(results[0]?.repaired).toBe(2); + }); + + test("catches strategy throws and reports via onError", async () => { + const errors: unknown[] = []; + const strategy: SweepStrategy = { + name: "ThrowingSweep", + async sweep() { + throw new Error("boom"); + }, + }; + + const sr = new SweepReconciler({ + onError: (e) => errors.push(e), + }); + sr.register(strategy); + + const results = await sr.runCycle(); + + expect(results).toHaveLength(1); + expect(results[0]?.errors).toHaveLength(1); + expect(results[0]?.errors[0]?.message).toBe("boom"); + expect(errors).toHaveLength(1); + }); + + test("runs multiple strategies sequentially", async () => { + const order: string[] = []; + + const sr = new SweepReconciler(); + sr.register({ + name: "First", + async sweep() { + order.push("first"); + return { strategy: "First", found: 0, repaired: 0, errors: [] }; + }, + }); + sr.register({ + name: "Second", + async sweep() { + order.push("second"); + return { strategy: "Second", found: 0, repaired: 0, errors: [] }; + }, + }); + + await sr.runCycle(); + expect(order).toEqual(["first", "second"]); + }); +}); + +// --------------------------------------------------------------------------- +// BountyIndexSweep tests +// --------------------------------------------------------------------------- + +describe("BountyIndexSweep", () => { + let testDeps: TestOperationDeps; + let deps: OperationDeps; + + beforeEach(async () => { + testDeps = await createTestOperationDeps(); + deps = testDeps.deps; + }); + + afterEach(async () => { + await testDeps.cleanup(); + }); + + test("reports zero issues on empty store", async () => { + const sweep = new BountyIndexSweep(deps.bountyStore!); + const result = await sweep.sweep(); + + expect(result.strategy).toBe("BountyIndexSweep"); + expect(result.found).toBe(0); + expect(result.repaired).toBe(0); + expect(result.errors).toHaveLength(0); + }); + + test("reports zero issues when bounties are consistent", async () => { + (deps.creditsService as InMemoryCreditsService).seed("agent-1", 5000); + + await createBountyOperation( + { + title: "Bounty A", + amount: 100, + criteria: { description: "work" }, + agent: { agentId: "agent-1" }, + }, + deps, + ); + await createBountyOperation( + { + title: "Bounty B", + amount: 200, + criteria: { description: "work" }, + agent: { agentId: "agent-1" }, + }, + deps, + ); + + const sweep = new BountyIndexSweep(deps.bountyStore!); + const result = await sweep.sweep(); + + expect(result.found).toBe(0); + expect(result.errors).toHaveLength(0); + }); +}); + +// --------------------------------------------------------------------------- +// SettlementSweep tests +// --------------------------------------------------------------------------- + +describe("SettlementSweep", () => { + let testDeps: TestOperationDeps; + let deps: OperationDeps; + + beforeEach(async () => { + testDeps = await createTestOperationDeps(); + deps = testDeps.deps; + }); + + afterEach(async () => { + await testDeps.cleanup(); + }); + + test("reports zero issues when no pending_settlement bounties exist", async () => { + const sweep = new SettlementSweep(deps.bountyStore!, deps.creditsService); + const result = await sweep.sweep(); + + expect(result.strategy).toBe("SettlementSweep"); + expect(result.found).toBe(0); + expect(result.repaired).toBe(0); + expect(result.errors).toHaveLength(0); + }); + + test("resumes a pending_settlement bounty to settled", async () => { + (deps.creditsService as InMemoryCreditsService).seed("creator", 1000); + + // Create → claim → contribute + const bounty = await createBountyOperation( + { + title: "Stalled settle", + amount: 100, + criteria: { description: "Any", requiredTags: ["fix"] }, + agent: { agentId: "creator" }, + }, + deps, + ); + expect(bounty.ok).toBe(true); + if (!bounty.ok) return; + + await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: { agentId: "worker" } }, + deps, + ); + const contrib = await contributeOperation( + { kind: "work", summary: "Fix", tags: ["fix"], agent: { agentId: "worker" } }, + deps, + ); + expect(contrib.ok).toBe(true); + if (!contrib.ok) return; + + // Simulate: settle starts, pivot succeeds, but capture fails + (deps.creditsService as InMemoryCreditsService).setFailures({ + capture: new Error("Simulated"), + }); + const failedSettle = await settleBountyOperation( + { bountyId: bounty.value.bountyId, contributionCid: contrib.value.cid }, + deps, + ); + expect(failedSettle.ok).toBe(false); + + // Verify stuck in pending_settlement + const stuck = await deps.bountyStore!.getBounty(bounty.value.bountyId); + expect(stuck?.status).toBe("pending_settlement"); + + // Clear failure and run the sweep + (deps.creditsService as InMemoryCreditsService).setFailures({}); + const sweep = new SettlementSweep(deps.bountyStore!, deps.creditsService); + const result = await sweep.sweep(); + + expect(result.found).toBe(1); + expect(result.repaired).toBe(1); + expect(result.errors).toHaveLength(0); + + // Verify bounty is now settled + const settled = await deps.bountyStore!.getBounty(bounty.value.bountyId); + expect(settled?.status).toBe("settled"); + }); + + test("reports error when resume fails", async () => { + (deps.creditsService as InMemoryCreditsService).seed("creator", 1000); + + const bounty = await createBountyOperation( + { + title: "Permanent fail", + amount: 100, + criteria: { description: "Any", requiredTags: ["fix"] }, + agent: { agentId: "creator" }, + }, + deps, + ); + expect(bounty.ok).toBe(true); + if (!bounty.ok) return; + + await claimBountyOperation( + { bountyId: bounty.value.bountyId, agent: { agentId: "worker" } }, + deps, + ); + const contrib = await contributeOperation( + { kind: "work", summary: "Fix", tags: ["fix"], agent: { agentId: "worker" } }, + deps, + ); + expect(contrib.ok).toBe(true); + if (!contrib.ok) return; + + // Get into pending_settlement + (deps.creditsService as InMemoryCreditsService).setFailures({ + capture: new Error("Simulated"), + }); + await settleBountyOperation( + { bountyId: bounty.value.bountyId, contributionCid: contrib.value.cid }, + deps, + ); + + // Run sweep with capture STILL failing + const sweep = new SettlementSweep(deps.bountyStore!, deps.creditsService); + const result = await sweep.sweep(); + + expect(result.found).toBe(1); + expect(result.repaired).toBe(0); + expect(result.errors).toHaveLength(1); + expect(result.errors[0]?.message).toContain("Simulated"); + }); +}); + +// --------------------------------------------------------------------------- +// HandoffSweep tests +// --------------------------------------------------------------------------- + +describe("HandoffSweep", () => { + let testDeps: TestOperationDeps; + let deps: OperationDeps; + + beforeEach(async () => { + testDeps = await createTestOperationDeps(); + deps = testDeps.deps; + }); + + afterEach(async () => { + await testDeps.cleanup(); + }); + + test("reports zero issues on empty store", async () => { + const sweep = new HandoffSweep(deps.contributionStore!, deps.handoffStore!); + const result = await sweep.sweep(); + + expect(result.strategy).toBe("HandoffSweep"); + expect(result.found).toBe(0); + expect(result.repaired).toBe(0); + expect(result.errors).toHaveLength(0); + }); + + test("detects contributions without handoffs", async () => { + // Create a "work" contribution directly (no handoff fan-out since no topology) + await contributeOperation( + { kind: "work", summary: "Orphaned work", agent: { agentId: "worker" } }, + deps, + ); + + const sweep = new HandoffSweep(deps.contributionStore!, deps.handoffStore!); + const result = await sweep.sweep(); + + // The contribution has kind "work" (handoff-eligible) but no handoffs + expect(result.found).toBe(1); + // Detection-only — no repair + expect(result.repaired).toBe(0); + }); +}); diff --git a/src/core/sweep-reconciler.ts b/src/core/sweep-reconciler.ts new file mode 100644 index 00000000..a653d211 --- /dev/null +++ b/src/core/sweep-reconciler.ts @@ -0,0 +1,165 @@ +/** + * Unified sweep reconciler with pluggable strategies. + * + * Runs registered SweepStrategy instances on a shared timer. Each strategy + * independently scans a store for inconsistencies and repairs them using + * idempotent writes. Strategies run sequentially within a cycle to avoid + * contention on shared stores. + * + * Distinct from the claim/workspace Reconciler in reconciler.ts — this + * framework handles bounty-store atomicity, handoff orphans, and settlement + * resumption. + * + * Usage: + * const sr = new SweepReconciler({ intervalMs: 60_000 }); + * sr.register(new BountyIndexSweep(bountyStore)); + * sr.register(new SettlementSweep(bountyStore, creditsService)); + * sr.start(); + * // later: + * sr.stop(); + */ + +// --------------------------------------------------------------------------- +// Strategy interface +// --------------------------------------------------------------------------- + +/** Result of a single sweep execution. */ +export interface SweepResult { + /** Strategy name (for logging). */ + readonly strategy: string; + /** Number of inconsistencies found. */ + readonly found: number; + /** Number of inconsistencies repaired. */ + readonly repaired: number; + /** Errors encountered during sweep (non-fatal). */ + readonly errors: readonly Error[]; +} + +/** + * A sweep strategy that scans for and repairs a specific class of + * inconsistency. Strategies must be idempotent — running the same + * sweep twice with no intervening changes should be a no-op. + */ +export interface SweepStrategy { + /** Human-readable name for logging. */ + readonly name: string; + + /** + * Execute the sweep. Returns a summary of what was found and repaired. + * Must not throw — errors should be collected in the result. + */ + sweep(): Promise; +} + +// --------------------------------------------------------------------------- +// Configuration +// --------------------------------------------------------------------------- + +/** Options for the sweep reconciler. */ +export interface SweepReconcilerConfig { + /** Interval between sweep cycles in milliseconds. Default: 60_000 (1 minute). */ + readonly intervalMs?: number | undefined; + + /** + * Called after each sweep cycle with results from all strategies. + * Use for logging, monitoring, or alerting. + */ + readonly onCycle?: (results: readonly SweepResult[]) => void; + + /** + * Called when the reconciler encounters an unexpected error + * (not from individual strategy sweeps, which report their own errors). + */ + readonly onError?: (error: unknown) => void; +} + +const DEFAULT_INTERVAL_MS = 60_000; + +// --------------------------------------------------------------------------- +// SweepReconciler +// --------------------------------------------------------------------------- + +/** + * Periodic sweep reconciler. + * + * Strategies are executed sequentially per cycle. Each strategy must + * handle its own errors and report them in the SweepResult. An + * unhandled throw from a strategy is caught and reported via onError. + */ +export class SweepReconciler { + private readonly strategies: SweepStrategy[] = []; + private readonly intervalMs: number; + private readonly onCycle: ((results: readonly SweepResult[]) => void) | undefined; + private readonly onError: ((error: unknown) => void) | undefined; + private timer: ReturnType | undefined; + private running = false; + private cycleInFlight = false; + + constructor(config?: SweepReconcilerConfig) { + this.intervalMs = config?.intervalMs ?? DEFAULT_INTERVAL_MS; + this.onCycle = config?.onCycle; + this.onError = config?.onError; + } + + /** Register a sweep strategy. Must be called before start(). */ + register(strategy: SweepStrategy): void { + this.strategies.push(strategy); + } + + /** Start the reconciler timer. No-op if already running. */ + start(): void { + if (this.running) return; + this.running = true; + this.timer = setInterval(() => void this.runCycle(), this.intervalMs); + } + + /** Stop the reconciler timer and cancel any pending cycle. */ + stop(): void { + this.running = false; + if (this.timer !== undefined) { + clearInterval(this.timer); + this.timer = undefined; + } + } + + /** + * Run one sweep cycle immediately (useful for testing or on-demand). + * Serialized: if a cycle is already in flight, returns empty results. + */ + async runCycle(): Promise { + if (this.cycleInFlight) return []; + this.cycleInFlight = true; + try { + const results: SweepResult[] = []; + for (const strategy of this.strategies) { + try { + const result = await strategy.sweep(); + results.push(result); + } catch (err) { + // Strategy.sweep() should not throw, but guard against it + results.push({ + strategy: strategy.name, + found: 0, + repaired: 0, + errors: [err instanceof Error ? err : new Error(String(err))], + }); + this.onError?.(err); + } + } + this.onCycle?.(results); + return results; + } finally { + this.cycleInFlight = false; + } + } + + /** Whether the reconciler is currently running. */ + get isRunning(): boolean { + return this.running; + } + + /** Number of registered strategies. */ + get strategyCount(): number { + return this.strategies.length; + } +} diff --git a/src/local/sqlite-bounty-store.ts b/src/local/sqlite-bounty-store.ts index aba80040..051a1e5d 100644 --- a/src/local/sqlite-bounty-store.ts +++ b/src/local/sqlite-bounty-store.ts @@ -263,6 +263,20 @@ export class SqliteBountyStore implements BountyStore { })); }; + beginSettlement = async (bountyId: string, fulfilledByCid: string): Promise => { + return this.transitionBounty( + bountyId, + BountyStatus.PendingSettlement, + "beginSettlement", + (bounty) => ({ + ...bounty, + status: BountyStatus.PendingSettlement, + fulfilledByCid, + updatedAt: nowUtcIso(), + }), + ); + }; + completeBounty = async (bountyId: string, fulfilledByCid: string): Promise => { return this.transitionBounty(bountyId, BountyStatus.Completed, "complete", (bounty) => ({ ...bounty, @@ -306,7 +320,7 @@ export class SqliteBountyStore implements BountyStore { .prepare( `SELECT * FROM bounties WHERE deadline < ? - AND status IN ('open', 'claimed') + AND status IN ('open', 'claimed', 'pending_settlement') ORDER BY deadline ASC`, ) .all(now) as BountyRow[]; diff --git a/src/mcp/serve-http.ts b/src/mcp/serve-http.ts index 042dbecf..3e0eb1ab 100644 --- a/src/mcp/serve-http.ts +++ b/src/mcp/serve-http.ts @@ -125,6 +125,44 @@ try { process.exit(1); } +// --- Background sweep reconciler ------------------------------------------ +// Runs at process level using the zone-level bounty store (Nexus when +// available, local SQLite otherwise). Not session-scoped — index repair +// and settlement recovery are zone-wide concerns. + +import { BountyIndexSweep } from "../core/bounty-index-sweep.js"; +import { SettlementSweep } from "../core/settlement-sweep.js"; +import { SweepReconciler } from "../core/sweep-reconciler.js"; + +let httpSweepReconciler: SweepReconciler | undefined; +{ + // Build a zone-level bounty store for the reconciler. In Nexus mode this + // is a NexusBountyStore scoped to the zone (not session). In local mode + // it's the local SQLite bounty store from the runtime. + let reconcilerBountyStore: import("../core/bounty-store.js").BountyStore = runtime.bountyStore; + if (nexusClient) { + const { NexusBountyStore } = await import("../nexus/nexus-bounty-store.js"); + reconcilerBountyStore = new NexusBountyStore({ client: nexusClient, zoneId }); + } + + httpSweepReconciler = new SweepReconciler({ + intervalMs: 60_000, + onCycle(results) { + for (const r of results) { + if (r.found > 0 || r.errors.length > 0) { + process.stderr.write( + `[sweep] ${r.strategy}: found=${r.found} repaired=${r.repaired} errors=${r.errors.length}\n`, + ); + } + } + }, + }); + httpSweepReconciler.register(new BountyIndexSweep(reconcilerBountyStore)); + httpSweepReconciler.register(new SettlementSweep(reconcilerBountyStore)); + httpSweepReconciler.start(); + process.stderr.write("grove-mcp-http: sweep-reconciler started\n"); +} + // --- Dynamic session-scoped deps -------------------------------------------- // // The HTTP MCP server is spawned before any interactive session exists, so we @@ -668,6 +706,7 @@ httpServer.listen(port, () => { // Graceful shutdown const shutdown = async (): Promise => { + httpSweepReconciler?.stop(); clearInterval(reapTimer); // Close all active sessions for (const [, session] of sessions) { diff --git a/src/mcp/serve.ts b/src/mcp/serve.ts index a128811f..d9b63bd3 100644 --- a/src/mcp/serve.ts +++ b/src/mcp/serve.ts @@ -350,6 +350,10 @@ try { } // --- Server setup --------------------------------------------------------- +// NOTE: No sweep reconciler here. The stdio MCP server (grove-mcp) is spawned +// per-agent — running zone-wide sweeps from every agent process would cause +// N×load and CAS conflicts. Sweeps run in the long-lived singleton processes +// only: src/server/serve.ts (HTTP server) and src/mcp/serve-http.ts (HTTP MCP). const server = await createMcpServer(deps, preset); const transport = new StdioServerTransport(); diff --git a/src/mcp/tools/bounties.ts b/src/mcp/tools/bounties.ts index f77f9611..eef27206 100644 --- a/src/mcp/tools/bounties.ts +++ b/src/mcp/tools/bounties.ts @@ -68,7 +68,16 @@ const createBountySchema = z.object({ const listBountiesSchema = z.object({ status: z - .enum(["draft", "open", "claimed", "completed", "settled", "expired", "cancelled"]) + .enum([ + "draft", + "open", + "claimed", + "pending_settlement", + "completed", + "settled", + "expired", + "cancelled", + ]) .optional() .describe("Filter by bounty status"), creatorAgentId: z.string().optional().describe("Filter by creator agent ID"), diff --git a/src/nexus/nexus-bounty-store.ts b/src/nexus/nexus-bounty-store.ts index 8f6dbeb0..23c18065 100644 --- a/src/nexus/nexus-bounty-store.ts +++ b/src/nexus/nexus-bounty-store.ts @@ -10,6 +10,7 @@ */ import type { Bounty, BountyStatus, RewardRecord } from "../core/bounty.js"; +import { validateBountyTransition } from "../core/bounty-logic.js"; import type { BountyQuery, BountyStore, RewardQuery } from "../core/bounty-store.js"; import { NotFoundError, StateConflictError } from "../core/errors.js"; import type { AgentIdentity } from "../core/models.js"; @@ -191,6 +192,13 @@ export class NexusBountyStore implements BountyStore { })); } + async beginSettlement(bountyId: string, fulfilledByCid: string): Promise { + return this.transitionBounty(bountyId, "pending_settlement" as BountyStatus, (b) => ({ + ...b, + fulfilledByCid, + })); + } + async completeBounty(bountyId: string, fulfilledByCid: string): Promise { return this.transitionBounty(bountyId, "completed" as BountyStatus, (b) => ({ ...b, @@ -212,12 +220,84 @@ export class NexusBountyStore implements BountyStore { async findExpiredBounties(): Promise { const now = new Date(); - const openBounties = await this.listBounties({ status: "open" as BountyStatus }); - const claimedBounties = await this.listBounties({ status: "claimed" as BountyStatus }); - const all = [...openBounties, ...claimedBounties]; + const statuses: BountyStatus[] = ["open", "claimed", "pending_settlement"] as BountyStatus[]; + const all: Bounty[] = []; + for (const status of statuses) { + const bounties = await this.listBounties({ status }); + all.push(...bounties); + } return all.filter((b) => new Date(b.deadline).getTime() < now.getTime()); } + async repairIndex(bountyId: string): Promise { + const result = await this.readBountyWithEtag(bountyId); + if (!result) return; + const { bounty } = result; + + // Ensure the correct status index entry exists + await this.writeStatusIndex(bounty); + + // Check which stale index entries actually exist before deleting. + // Only delete entries that exist AND don't match the current status. + // Re-read the bounty before each delete to catch concurrent transitions. + const allStatuses: BountyStatus[] = [ + "draft", + "open", + "claimed", + "pending_settlement", + "completed", + "settled", + "expired", + "cancelled", + ] as BountyStatus[]; + + for (const status of allStatuses) { + if (status === bounty.status) continue; + const markerPath = bountyStatusIndexPath(this.zoneId, status, bountyId); + const markerExists = await withSemaphore(this.semaphore, () => + this.client.exists(markerPath), + ); + if (!markerExists) continue; + + // Re-read the authoritative document right before deleting to ensure + // a concurrent transition hasn't moved the bounty TO this status. + const fresh = await this.getBounty(bountyId); + if (!fresh || fresh.status === status) continue; + + await safeCleanup( + withSemaphore(this.semaphore, () => this.client.delete(markerPath)), + "repairIndex: delete stale status index", + { silent: true }, + ); + } + } + + /** + * Check which status index entries exist for a bounty. + * Returns the set of statuses that have an index marker. + * Used by BountyIndexSweep to detect stale entries. + */ + async listIndexStatuses(bountyId: string): Promise { + const allStatuses: BountyStatus[] = [ + "draft", + "open", + "claimed", + "pending_settlement", + "completed", + "settled", + "expired", + "cancelled", + ] as BountyStatus[]; + const found: string[] = []; + for (const status of allStatuses) { + const exists = await withSemaphore(this.semaphore, () => + this.client.exists(bountyStatusIndexPath(this.zoneId, status, bountyId)), + ); + if (exists) found.push(status); + } + return found; + } + async recordReward(_reward: RewardRecord): Promise { // Rewards are not yet stored in Nexus VFS (future work) } @@ -253,6 +333,11 @@ export class NexusBountyStore implements BountyStore { const { bounty: existing, etag } = result; const oldStatus = existing.status; + + // Validate the transition before writing — catches stale state from + // concurrent writers before the CAS round-trip. + validateBountyTransition(bountyId, oldStatus, newStatus, `transition to ${newStatus}`); + let updated: Bounty = { ...existing, status: newStatus, diff --git a/src/server/serve.ts b/src/server/serve.ts index 48307cff..9bbd8e93 100644 --- a/src/server/serve.ts +++ b/src/server/serve.ts @@ -95,6 +95,38 @@ const deps: ServerDeps = { const app = createApp(deps); +// --------------------------------------------------------------------------- +// Background sweep reconciler +// --------------------------------------------------------------------------- + +import { BountyIndexSweep } from "../core/bounty-index-sweep.js"; +import { SettlementSweep } from "../core/settlement-sweep.js"; +import { SweepReconciler } from "../core/sweep-reconciler.js"; + +let sweepReconciler: SweepReconciler | undefined; +if (serverBountyStore) { + sweepReconciler = new SweepReconciler({ + intervalMs: 60_000, + onCycle(results) { + for (const r of results) { + if (r.found > 0 || r.errors.length > 0) { + console.log( + `[sweep] ${r.strategy}: found=${r.found} repaired=${r.repaired} errors=${r.errors.length}`, + ); + } + } + }, + }); + sweepReconciler.register(new BountyIndexSweep(serverBountyStore)); + // SettlementSweep runs without creditsService — it can recover non-escrowed + // bounties. Escrowed bounties (those with reservationId) will log an error + // and wait for a CreditsService to be available. When a production + // CreditsService is wired in, pass it: new SettlementSweep(store, credits). + sweepReconciler.register(new SettlementSweep(serverBountyStore)); + sweepReconciler.start(); + console.log("sweep-reconciler started (BountyIndexSweep, SettlementSweep)"); +} + // --------------------------------------------------------------------------- // Optional SessionService + WebSocket push // --------------------------------------------------------------------------- @@ -195,6 +227,9 @@ console.log(`grove-server listening on http://${HOST ?? "localhost"}:${server.po // Graceful shutdown async function shutdown(): Promise { console.log("Shutting down..."); + if (sweepReconciler) { + sweepReconciler.stop(); + } if (sessionService) { sessionService.destroy(); }