Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions src/core/bounty-index-sweep.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/**
* BountyIndexSweep — reconciler strategy for NexusBountyStore dual-write consistency.
*
* Detection-based: for each bounty, compares the authoritative document status
* against actual index entries using listIndexStatuses(). Calls repairIndex()
* only when an inconsistency is found:
* 1. Missing current-status index entry (new index write failed)
* 2. Stale old-status index entry (old index delete failed)
*
* For stores without listIndexStatuses (e.g., SQLite), falls back to checking
* whether the bounty appears in its status-filtered query result.
*/

import type { BountyStore } from "./bounty-store.js";
import type { SweepResult, SweepStrategy } from "./sweep-reconciler.js";

export class BountyIndexSweep implements SweepStrategy {
readonly name = "BountyIndexSweep";
private readonly bountyStore: BountyStore;

constructor(bountyStore: BountyStore) {
this.bountyStore = bountyStore;
}

async sweep(): Promise<SweepResult> {
let found = 0;
let repaired = 0;
const errors: Error[] = [];

if (!this.bountyStore.repairIndex) {
return { strategy: this.name, found: 0, repaired: 0, errors: [] };
}

try {
const allBounties = await this.bountyStore.listBounties();

for (const bounty of allBounties) {
try {
let needsRepair = false;

if (this.bountyStore.listIndexStatuses) {
// Precise detection: check raw index entries directly
const indexStatuses = await this.bountyStore.listIndexStatuses(bounty.bountyId);
const hasCorrect = indexStatuses.includes(bounty.status);
const hasStale = indexStatuses.some((s) => s !== bounty.status);
needsRepair = !hasCorrect || hasStale;
} else {
// Fallback: check if bounty appears in its status-filtered query
const byStatus = await this.bountyStore.listBounties({ status: bounty.status });
needsRepair = !byStatus.some((b) => b.bountyId === bounty.bountyId);
}

if (needsRepair) {
found++;
await this.bountyStore.repairIndex!(bounty.bountyId);
repaired++;
}
} catch (err) {
errors.push(
err instanceof Error
? err
: new Error(`Index check failed for bounty ${bounty.bountyId}: ${String(err)}`),
);
}
}
} catch (err) {
errors.push(
err instanceof Error ? err : new Error(`BountyIndexSweep failed: ${String(err)}`),
);
}

return { strategy: this.name, found, repaired, errors };
}
}
24 changes: 23 additions & 1 deletion src/core/bounty-logic.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,31 @@ describe("validateBountyTransition", () => {
).not.toThrow();
});

test("allows claimed → completed", () => {
test("allows claimed → pending_settlement", () => {
expect(() =>
validateBountyTransition(
"b-1",
BountyStatus.Claimed,
BountyStatus.PendingSettlement,
"beginSettlement",
),
).not.toThrow();
});

test("rejects claimed → completed (must go through pending_settlement)", () => {
expect(() =>
validateBountyTransition("b-1", BountyStatus.Claimed, BountyStatus.Completed, "complete"),
).toThrow();
});

test("allows pending_settlement → completed", () => {
expect(() =>
validateBountyTransition(
"b-1",
BountyStatus.PendingSettlement,
BountyStatus.Completed,
"complete",
),
).not.toThrow();
});

Expand Down
3 changes: 2 additions & 1 deletion src/core/bounty-logic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import type { Contribution, Score } from "./models.js";
const VALID_TRANSITIONS: Readonly<Record<BountyStatus, readonly BountyStatus[]>> = {
draft: ["open", "cancelled"],
open: ["claimed", "expired", "cancelled"],
claimed: ["completed", "open", "expired", "cancelled"],
claimed: ["claimed", "pending_settlement", "open", "expired", "cancelled"],
pending_settlement: ["completed", "claimed", "expired", "cancelled"],
completed: ["settled", "expired", "cancelled"],
settled: [],
expired: [],
Expand Down
24 changes: 19 additions & 5 deletions src/core/bounty-store.conformance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -217,16 +217,27 @@ export function runBountyStoreTests(factory: BountyStoreFactory): void {
await expect(store.claimBounty("claim-2", makeAgent(), "claim-id-2")).rejects.toThrow();
});

test("completeBounty transitions claimed to completed", async () => {
const bounty = makeBounty({ bountyId: "complete-1", status: BountyStatus.Claimed });
test("beginSettlement transitions claimed to pending_settlement", async () => {
const bounty = makeBounty({ bountyId: "begin-settle-1", status: BountyStatus.Claimed });
await store.createBounty(bounty);
const pending = await store.beginSettlement("begin-settle-1", "cid-fulfilled");
expect(pending.status).toBe(BountyStatus.PendingSettlement);
expect(pending.fulfilledByCid).toBe("cid-fulfilled");
});

test("completeBounty transitions pending_settlement to completed", async () => {
const bounty = makeBounty({
bountyId: "complete-1",
status: BountyStatus.PendingSettlement,
});
await store.createBounty(bounty);
const completed = await store.completeBounty("complete-1", "cid-fulfilled");
expect(completed.status).toBe(BountyStatus.Completed);
expect(completed.fulfilledByCid).toBe("cid-fulfilled");
});

test("completeBounty throws for non-claimed bounty", async () => {
const bounty = makeBounty({ bountyId: "complete-2", status: BountyStatus.Open });
test("completeBounty throws for claimed bounty (must use beginSettlement first)", async () => {
const bounty = makeBounty({ bountyId: "complete-2", status: BountyStatus.Claimed });
await store.createBounty(bounty);
await expect(store.completeBounty("complete-2", "cid")).rejects.toThrow();
});
Expand Down Expand Up @@ -274,7 +285,7 @@ export function runBountyStoreTests(factory: BountyStoreFactory): void {
// Full lifecycle
// ------------------------------------------------------------------

test("full lifecycle: draft → open → claimed → completed → settled", async () => {
test("full lifecycle: draft → open → claimed → pending_settlement → completed → settled", async () => {
const bounty = makeBounty({ bountyId: "lifecycle-1", status: BountyStatus.Draft });
await store.createBounty(bounty);

Expand All @@ -288,6 +299,9 @@ export function runBountyStoreTests(factory: BountyStoreFactory): void {
);
expect(claimed.status).toBe(BountyStatus.Claimed);

const pending = await store.beginSettlement("lifecycle-1", "cid-result");
expect(pending.status).toBe(BountyStatus.PendingSettlement);

const completed = await store.completeBounty("lifecycle-1", "cid-result");
expect(completed.status).toBe(BountyStatus.Completed);

Expand Down
41 changes: 38 additions & 3 deletions src/core/bounty-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,27 @@ export interface BountyStore {
): Promise<Bounty>;

/**
* Mark a bounty as completed (work done, pending settlement).
* Transition a bounty to 'pending_settlement' — the saga pivot point.
*
* @param bountyId - The bounty to complete.
* After this transition, the bounty is committed to settlement. The caller
* must capture credits and then advance to completed → settled. If the
* caller crashes, the reconciler will resume from pending_settlement.
*
* @param bountyId - The bounty entering settlement.
* @param fulfilledByCid - CID of the contribution that fulfilled the bounty.
* @returns The updated bounty snapshot.
* @throws BountyStateError if bounty is not in 'claimed' status.
*/
beginSettlement(bountyId: string, fulfilledByCid: string): Promise<Bounty>;

/**
* Mark a bounty as completed (work done, credits captured).
*
* @param bountyId - The bounty to complete.
* @param fulfilledByCid - CID of the contribution that fulfilled the bounty.
* @returns The updated bounty snapshot.
* @throws BountyStateError if bounty is not in 'pending_settlement' status.
*/
completeBounty(bountyId: string, fulfilledByCid: string): Promise<Bounty>;

/**
Expand Down Expand Up @@ -138,7 +152,7 @@ export interface BountyStore {
cancelBounty(bountyId: string): Promise<Bounty>;

// -----------------------------------------------------------------------
// Expiry sweep
// Expiry sweep + index repair
// -----------------------------------------------------------------------

/**
Expand All @@ -149,6 +163,27 @@ export interface BountyStore {
*/
findExpiredBounties(): Promise<readonly Bounty[]>;

/**
* Repair the status index for a bounty so it matches the authoritative
* document state. Idempotent — calling on an already-correct entry is a no-op.
*
* Used by BountyIndexSweep to fix dual-write gaps where the document was
* written but the status index was not. For stores without a separate
* index (e.g., SQLite), this is a no-op.
*
* Optional: stores that don't implement this will not be swept.
*/
repairIndex?(bountyId: string): Promise<void>;

/**
* List which status index entries exist for a bounty.
* Used by BountyIndexSweep to detect stale markers without relying on
* the filtered listBounties which hides them.
*
* Optional: only implemented by stores with a separate status index.
*/
listIndexStatuses?(bountyId: string): Promise<readonly string[]>;

// -----------------------------------------------------------------------
// Reward records
// -----------------------------------------------------------------------
Expand Down
4 changes: 3 additions & 1 deletion src/core/bounty.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ export const BountyStatus = {
Open: "open",
/** Claimed by an agent (work in progress). */
Claimed: "claimed",
/** Capture initiated, awaiting state advancement. Saga pivot point. */
PendingSettlement: "pending_settlement",
/** Work completed, pending settlement. */
Completed: "completed",
/** Credits distributed to fulfiller. */
Expand Down Expand Up @@ -76,7 +78,7 @@ export interface BountyCriteria {
* A bounty — a reward offer for completing specific work.
*
* Bounties are mutable coordination objects with a lifecycle
* (draft → open → claimed → completed → settled).
* (draft → open → claimed → pending_settlement → completed → settled).
* Like Claims, bounty objects returned by the store are readonly
* snapshots; state transitions produce new snapshots.
*/
Expand Down
59 changes: 59 additions & 0 deletions src/core/handoff-sweep.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* HandoffSweep — reconciler strategy for detecting orphaned contributions.
*
* Finds contributions that have zero handoff records where the contribution
* kind normally generates handoffs (work, review, reproduction, adoption).
* Reports them for operator attention.
*
* Detection-only: repair requires re-running the routing logic from the
* topology/contract, which this sweep doesn't have access to. Full
* auto-repair is a follow-up (tracked in Issue #240).
*
* Absorbs the handoff reconciler scope from Issue #227 → #240.
*/

import type { HandoffStore } from "./handoff.js";
import type { ContributionStore } from "./store.js";
import type { SweepResult, SweepStrategy } from "./sweep-reconciler.js";

/** Contribution kinds that normally generate handoff records. */
const HANDOFF_ELIGIBLE_KINDS = new Set(["work", "review", "reproduction", "adoption"]);

export class HandoffSweep implements SweepStrategy {
readonly name = "HandoffSweep";
private readonly contributionStore: ContributionStore;
private readonly handoffStore: HandoffStore;

constructor(contributionStore: ContributionStore, handoffStore: HandoffStore) {
this.contributionStore = contributionStore;
this.handoffStore = handoffStore;
}

async sweep(): Promise<SweepResult> {
let found = 0;
const errors: Error[] = [];

try {
// Scan recent contributions that should have handoffs
const contributions = await this.contributionStore.list({ limit: 500 });
const eligible = contributions.filter((c) => HANDOFF_ELIGIBLE_KINDS.has(c.kind));

// Check each eligible contribution for any handoff records
const handoffs = await this.handoffStore.list();
const cidsWithHandoffs = new Set(handoffs.map((h) => h.sourceCid));

for (const contribution of eligible) {
if (!cidsWithHandoffs.has(contribution.cid)) {
found++;
}
}
} catch (err) {
errors.push(
err instanceof Error ? err : new Error(`HandoffSweep scan failed: ${String(err)}`),
);
}

// Detection-only: repaired is always 0. Operator acts on found > 0.
return { strategy: this.name, found, repaired: 0, errors };
}
}
20 changes: 14 additions & 6 deletions src/core/in-memory-credits.ts
Original file line number Diff line number Diff line change
Expand Up @@ -292,15 +292,23 @@ export class InMemoryCreditsService implements CreditsService {
private getReserved(agentId: string): number {
const now = Date.now();
let reserved = 0;
for (const res of this.reservations.values()) {
if (res.agentId === agentId && !res.captured && !res.voided) {
// Skip expired reservations — they no longer hold funds
if (new Date(res.expiresAt).getTime() <= now) {
continue;
}
const evictable: string[] = [];
for (const [id, res] of this.reservations.entries()) {
const expired = new Date(res.expiresAt).getTime() <= now;
// Evict only expired reservations that were never captured or voided.
// Captured/voided entries must stay for idempotency checks in
// capture() and void().
if (expired && !res.captured && !res.voided) {
evictable.push(id);
continue;
}
if (res.agentId === agentId && !res.captured && !res.voided && !expired) {
reserved += res.amount;
}
}
for (const id of evictable) {
this.reservations.delete(id);
}
return reserved;
}
}
Expand Down
Loading
Loading