diff --git a/src/core/operations/bounty.ts b/src/core/operations/bounty.ts index ba2b5e4..46aa640 100644 --- a/src/core/operations/bounty.ts +++ b/src/core/operations/bounty.ts @@ -1,308 +1,49 @@ -/** - * Bounty operations. - * - * createBountyOperation — Create a bounty with optional credit reservation - * listBountiesOperation — List bounties with filters - * claimBountyOperation — Claim an open bounty - * settleBountyOperation — Settle a completed bounty - */ +import type { ClaimStore, BountyStore } from '../store.js'; +import type { ClaimInput, Claim } from '../models.js'; +import { createClaim } from '../models.js'; -import type { Bounty, BountyCriteria, BountyStatus } from "../bounty.js"; -import { BountyStatus as BS } from "../bounty.js"; -import { evaluateBountyCriteria } from "../bounty-logic.js"; -import type { JsonValue } from "../models.js"; -import type { AgentOverrides } from "./agent.js"; -import { resolveAgent } from "./agent.js"; -import type { OperationDeps } from "./deps.js"; -import type { OperationResult } from "./result.js"; -import { fromGroveError, notFound, ok, validationErr } from "./result.js"; - -// --------------------------------------------------------------------------- -// Result types -// --------------------------------------------------------------------------- - -/** Result of create bounty. */ -export interface CreateBountyResult { - readonly bountyId: string; - readonly title: string; - readonly amount: number; - readonly status: BountyStatus; - readonly deadline: string; - readonly reservationId?: string | undefined; -} - -/** Summary for list responses. */ -export interface BountySummary { - readonly bountyId: string; - readonly title: string; - readonly amount: number; - readonly status: BountyStatus; - readonly deadline: string; - readonly claimedBy?: string | undefined; -} - -/** Result of list bounties. */ -export interface ListBountiesResult { - readonly bounties: readonly BountySummary[]; - readonly count: number; -} - -/** Result of claim bounty. */ -export interface ClaimBountyResult { - readonly bountyId: string; - readonly title: string; - readonly status: BountyStatus; - readonly claimId: string; - readonly claimedBy?: string | undefined; -} - -/** Result of settle bounty. */ -export interface SettleBountyResult { - readonly bountyId: string; - readonly status: BountyStatus; - readonly fulfilledByCid?: string | undefined; - readonly amount: number; - readonly paidTo?: string | undefined; -} - -// --------------------------------------------------------------------------- -// Input types -// --------------------------------------------------------------------------- - -/** Input for create bounty. */ -export interface CreateBountyInput { - readonly title: string; - readonly description?: string | undefined; - readonly amount: number; - readonly criteria: BountyCriteria; - readonly deadlineMs?: number | undefined; - readonly agent?: AgentOverrides | undefined; - readonly zoneId?: string | undefined; - readonly context?: Readonly> | undefined; -} - -/** Input for list bounties. */ -export interface ListBountiesInput { - readonly status?: BountyStatus | undefined; - readonly creatorAgentId?: string | undefined; - readonly limit?: number | undefined; -} - -/** Input for claim bounty. */ -export interface ClaimBountyInput { - readonly bountyId: string; - readonly agent?: AgentOverrides | undefined; - readonly leaseDurationMs?: number | undefined; -} - -/** Input for settle bounty. */ -export interface SettleBountyInput { - readonly bountyId: string; - readonly contributionCid: string; -} - -// --------------------------------------------------------------------------- -// Operations -// --------------------------------------------------------------------------- - -const DEFAULT_DEADLINE_MS = 7 * 24 * 60 * 60 * 1000; // 7 days - -/** Create a new bounty with optional credit reservation. */ -export async function createBountyOperation( - input: CreateBountyInput, - deps: OperationDeps, -): Promise> { - try { - if (deps.bountyStore === undefined) { - return validationErr("Bounty operations not available (missing bountyStore)"); - } - - const agent = resolveAgent(input.agent); - const now = new Date(); - const bountyId = crypto.randomUUID(); - const deadlineMs = input.deadlineMs ?? DEFAULT_DEADLINE_MS; - const deadline = new Date(now.getTime() + deadlineMs).toISOString(); - - // Reserve credits when available - let reservationId: string | undefined; - if (deps.creditsService) { - reservationId = crypto.randomUUID(); - await deps.creditsService.reserve({ - reservationId, - agentId: agent.agentId, - amount: input.amount, - timeoutMs: deadlineMs + 24 * 60 * 60 * 1000, - }); - } - - const bounty: Bounty = { - bountyId, - title: input.title, - description: input.description ?? input.title, - status: BS.Open, - creator: agent, - amount: input.amount, - criteria: input.criteria, - zoneId: input.zoneId, - deadline, - reservationId, - createdAt: now.toISOString(), - updatedAt: now.toISOString(), - ...(input.context !== undefined ? { context: input.context } : {}), - }; - - const result = await deps.bountyStore.createBounty(bounty); - - return ok({ - bountyId: result.bountyId, - title: result.title, - amount: result.amount, - status: result.status, - deadline: result.deadline, - reservationId: result.reservationId, - }); - } catch (error) { - return fromGroveError(error); - } -} - -/** List bounties with optional filters. */ -export async function listBountiesOperation( - input: ListBountiesInput, - deps: OperationDeps, -): Promise> { - try { - if (deps.bountyStore === undefined) { - return validationErr("Bounty operations not available"); - } - - const bounties = await deps.bountyStore.listBounties({ - status: input.status, - creatorAgentId: input.creatorAgentId, - limit: input.limit, - }); - - const summaries: BountySummary[] = bounties.map((b) => ({ - bountyId: b.bountyId, - title: b.title, - amount: b.amount, - status: b.status, - deadline: b.deadline, - claimedBy: b.claimedBy?.agentId, - })); - - return ok({ bounties: summaries, count: summaries.length }); - } catch (error) { - return fromGroveError(error); - } +export interface ClaimBountyOperationDeps { + claimStore: ClaimStore; + bountyStore: BountyStore; } -/** Claim an open bounty. */ export async function claimBountyOperation( - input: ClaimBountyInput, - deps: OperationDeps, -): Promise> { - try { - if (deps.bountyStore === undefined) { - return validationErr("Bounty operations not available"); - } - - if (deps.claimStore === undefined) { - return validationErr("Claim operations not available (missing claimStore)"); - } - - const bounty = await deps.bountyStore.getBounty(input.bountyId); - if (!bounty) { - return notFound("Bounty", input.bountyId); - } - - const agent = resolveAgent(input.agent); - const now = new Date(); - const claimId = crypto.randomUUID(); - const leaseDurationMs = input.leaseDurationMs ?? 1_800_000; - - // Create claim via existing claim system - const claim = await deps.claimStore.claimOrRenew({ - claimId, - targetRef: `bounty:${input.bountyId}`, - agent, - status: "active", - intentSummary: `Claiming bounty: ${bounty.title}`, - createdAt: now.toISOString(), - heartbeatAt: now.toISOString(), - leaseExpiresAt: new Date(now.getTime() + leaseDurationMs).toISOString(), + deps: ClaimBountyOperationDeps, + bountyId: string, + claimInput: ClaimInput +): Promise { + const claim = createClaim(claimInput); + + // Check if stores support atomic operations + const claimStore = deps.claimStore as any; + const bountyStore = deps.bountyStore as any; + + // If both stores are SQLite-based and support transactions, use atomic operation + if (typeof claimStore.transaction === 'function' && + typeof bountyStore.transaction === 'function' && + claimStore.transaction === bountyStore.transaction) { + // Both stores share the same transaction mechanism (likely SQLite) + await claimStore.transaction(async () => { + await deps.claimStore.put(claim); + await deps.bountyStore.claimBounty(bountyId, claim.id); }); - - const claimed = await deps.bountyStore.claimBounty(input.bountyId, agent, claim.claimId); - - return ok({ - bountyId: claimed.bountyId, - title: claimed.title, - status: claimed.status, - claimId: claim.claimId, - claimedBy: claimed.claimedBy?.agentId, - }); - } catch (error) { - return fromGroveError(error); - } -} - -/** Settle a completed bounty. */ -export async function settleBountyOperation( - input: SettleBountyInput, - deps: OperationDeps, -): Promise> { - try { - if (deps.bountyStore === undefined) { - return validationErr("Bounty operations not available (missing bountyStore)"); - } - - if (deps.contributionStore === undefined) { - return validationErr("Settle bounty not available (missing contributionStore)"); - } - - const bounty = await deps.bountyStore.getBounty(input.bountyId); - if (!bounty) { - return notFound("Bounty", input.bountyId); - } - - // Validate contribution exists and meets criteria - const contribution = await deps.contributionStore.get(input.contributionCid); - if (!contribution) { - return notFound("Contribution", input.contributionCid); - } - if (!evaluateBountyCriteria(bounty.criteria, contribution)) { - return validationErr(`Contribution '${input.contributionCid}' does not meet bounty criteria`); + } else { + // Non-atomic path with compensating action + await deps.claimStore.put(claim); + + try { + await deps.bountyStore.claimBounty(bountyId, claim.id); + } catch (error) { + // Compensating action: remove the claim if bounty update fails + try { + await deps.claimStore.delete(claim.id); + } catch (rollbackError) { + // Log rollback failure but don't mask original error + console.error('Failed to rollback claim after bounty update failure:', rollbackError); + } + throw error; } - - // Require credits service when escrow is active - if (bounty.reservationId && !deps.creditsService) { - return validationErr( - "Cannot settle bounty with escrowed credits: creditsService is not available", - ); - } - - // Capture payment before state transition - if (deps.creditsService && bounty.reservationId && bounty.claimedBy) { - await deps.creditsService.capture(bounty.reservationId, { - toAgentId: bounty.claimedBy.agentId, - }); - } else if (deps.creditsService && bounty.reservationId) { - await deps.creditsService.capture(bounty.reservationId); - } - - // Persist state transitions - const completed = await deps.bountyStore.completeBounty(input.bountyId, input.contributionCid); - const settled = await deps.bountyStore.settleBounty(completed.bountyId); - - return ok({ - bountyId: settled.bountyId, - status: settled.status, - fulfilledByCid: settled.fulfilledByCid, - amount: settled.amount, - paidTo: settled.claimedBy?.agentId, - }); - } catch (error) { - return fromGroveError(error); } -} + + return claim; +} \ No newline at end of file diff --git a/src/core/operations/contribute.ts b/src/core/operations/contribute.ts index 77ac22c..9d4dfc8 100644 --- a/src/core/operations/contribute.ts +++ b/src/core/operations/contribute.ts @@ -1,662 +1,84 @@ -/** - * Contribution operations. - * - * contributeOperation — Create and store a contribution (general) - * reviewOperation — Sugar: kind=review with reviews relation - * reproduceOperation — Sugar: kind=reproduction with reproduces relation - * discussOperation — Sugar: kind=discussion with responds_to relation - */ +import type { ContributionStore, ClaimStore } from '../store.js'; +import type { ContentStore } from '../cas.js'; +import type { ContributionInput, Contribution, ClaimInput } from '../models.js'; +import { createContribution } from '../models.js'; +import { createClaim } from '../models.js'; +import { generateId } from '../id.js'; -import { fireAndForget } from "../../shared/fire-and-forget.js"; -import { pickDefined } from "../../shared/pick-defined.js"; -import type { HandoffInput } from "../handoff.js"; -import { createContribution } from "../manifest.js"; -import { - ContributionKind as CK, - ContributionMode as CM, - type Contribution, - type ContributionInput, - type ContributionKind, - type ContributionMode, - type JsonValue, - type Relation, - RelationType, - type Score, -} from "../models.js"; -import type { PolicyEnforcementResult } from "../policy-enforcer.js"; -import { PolicyEnforcer } from "../policy-enforcer.js"; -import type { ContributionStore } from "../store.js"; -import { toUtcIso } from "../time.js"; -import type { AgentOverrides } from "./agent.js"; -import { resolveAgent } from "./agent.js"; -import type { OperationDeps } from "./deps.js"; -import type { OperationResult } from "./result.js"; -import { fromGroveError, notFound, ok, validationErr } from "./result.js"; - -// --------------------------------------------------------------------------- -// Result types -// --------------------------------------------------------------------------- - -/** Shared fields present on every contribution operation result. */ -export interface BaseContributionResult { - readonly cid: string; - readonly summary: string; - readonly createdAt: string; -} - -/** Result of a contribute operation. */ -export interface ContributeResult extends BaseContributionResult { - readonly kind: ContributionKind; - readonly mode: ContributionMode; - readonly artifactCount: number; - readonly relationCount: number; - /** Roles that received a routing event for this contribution. */ - readonly routedTo?: readonly string[] | undefined; - /** IDs of handoff records created for this contribution. */ - readonly handoffIds?: readonly string[] | undefined; - /** Policy enforcement result (present when a contract is loaded). */ - readonly policy?: PolicyEnforcementResult | undefined; -} - -/** Result of a review operation. */ -export interface ReviewResult extends BaseContributionResult { - readonly kind: "review"; - readonly targetCid: string; -} - -/** Result of a reproduce operation. */ -export interface ReproduceResult extends BaseContributionResult { - readonly kind: "reproduction"; - readonly targetCid: string; - readonly result: string; -} - -/** Result of a discuss operation. */ -export interface DiscussResult extends BaseContributionResult { - readonly kind: "discussion"; - readonly targetCid?: string | undefined; -} - -// --------------------------------------------------------------------------- -// Input types -// --------------------------------------------------------------------------- - -/** Input for the general contribute operation. */ -export interface ContributeInput { - readonly kind: ContributionKind; - readonly mode?: ContributionMode | undefined; - readonly summary: string; - readonly description?: string | undefined; - readonly artifacts?: Readonly> | undefined; - readonly relations?: readonly Relation[] | undefined; - readonly scores?: Readonly> | undefined; - readonly tags?: readonly string[] | undefined; - readonly context?: Readonly> | undefined; - readonly agent?: AgentOverrides | undefined; - /** Optional timestamp for replay/import. Defaults to current time if omitted. */ - readonly createdAt?: string | undefined; +export interface ContributeOperationDeps { + contributionStore: ContributionStore; + claimStore: ClaimStore; + contentStore: ContentStore; } -/** Input for the review operation. */ -export interface ReviewInput { - readonly targetCid: string; - readonly summary: string; - readonly description?: string | undefined; - readonly scores?: Readonly> | undefined; - readonly tags?: readonly string[] | undefined; - readonly context?: Readonly> | undefined; - readonly agent?: AgentOverrides | undefined; - readonly metadata?: Readonly> | undefined; -} - -/** Input for the reproduce operation. */ -export interface ReproduceInput { - readonly targetCid: string; - readonly summary: string; - readonly description?: string | undefined; - readonly result?: "confirmed" | "challenged" | "partial" | undefined; - readonly scores?: Readonly> | undefined; - readonly artifacts?: Readonly> | undefined; - readonly tags?: readonly string[] | undefined; - readonly context?: Readonly> | undefined; - readonly agent?: AgentOverrides | undefined; -} - -/** Input for the discuss operation. */ -export interface DiscussInput { - readonly targetCid?: string | undefined; - readonly summary: string; - readonly description?: string | undefined; - readonly tags?: readonly string[] | undefined; - readonly context?: Readonly> | undefined; - readonly agent?: AgentOverrides | undefined; -} - -/** Input for the adopt operation. */ -export interface AdoptInput { - readonly targetCid: string; - readonly summary: string; - readonly description?: string | undefined; - readonly tags?: readonly string[] | undefined; - readonly context?: Readonly> | undefined; - readonly agent?: AgentOverrides | undefined; -} - -/** Result of an adopt operation. */ -export interface AdoptResult extends BaseContributionResult { - readonly kind: "adoption"; - readonly targetCid: string; -} - -// --------------------------------------------------------------------------- -// Shared validation -// --------------------------------------------------------------------------- - -/** - * Validate that all relation targets exist in the store (batch). - * Returns a validation error if any target is missing, or undefined if all valid. - */ -async function validateRelations( - store: ContributionStore, - relations: readonly Relation[], -): Promise | undefined> { - if (relations.length === 0) return undefined; - const cids = relations.map((r) => r.targetCid); - const found = await store.getMany(cids); - for (const cid of cids) { - if (!found.has(cid)) { - return notFound("Contribution", cid); - } - } - return undefined; -} - -/** - * Validate that all artifact hashes exist in CAS (batch). - * Returns a validation error if any hash is missing, or undefined if all valid. - */ -async function validateArtifacts( - deps: OperationDeps, - artifacts: Readonly>, -): Promise | undefined> { - if (deps.cas === undefined) { - return validationErr("Artifact validation not available (missing cas)"); - } - for (const [name, hash] of Object.entries(artifacts)) { - const exists = await deps.cas.exists(hash); - if (!exists) { - return validationErr(`Artifact '${name}' references non-existent hash: ${hash}`); - } - } - return undefined; -} - -/** - * Resolve the contribution mode. - * If a contract is present and specifies a mode, use it (unless explicitly overridden). - */ -function resolveMode( - explicitMode: ContributionMode | undefined, - deps: OperationDeps, -): ContributionMode { - if (explicitMode !== undefined) return explicitMode; - if (deps.contract?.mode !== undefined) return deps.contract.mode; - return CM.Evaluation; -} - -// --------------------------------------------------------------------------- -// Operations -// --------------------------------------------------------------------------- - -/** Create and store a contribution. */ export async function contributeOperation( - input: ContributeInput, - deps: OperationDeps, -): Promise> { - try { - if (deps.contributionStore === undefined) { - return validationErr("Contribution operations not available (missing contributionStore)"); - } - - const artifacts = input.artifacts ?? {}; - const relations = input.relations ?? []; - const tags = input.tags ?? []; - - // Validate relations - if (relations.length > 0) { - const relErr = await validateRelations(deps.contributionStore, relations); - if (relErr !== undefined) return relErr as OperationResult; - } - - // Validate artifacts - if (Object.keys(artifacts).length > 0) { - const artErr = await validateArtifacts(deps, artifacts); - if (artErr !== undefined) return artErr as OperationResult; - } - - const agent = resolveAgent(input.agent); - const mode = resolveMode(input.mode, deps); - // Normalize to UTC Z-format so lexicographic ORDER BY works without datetime(). - const createdAt = toUtcIso(input.createdAt ?? new Date().toISOString()); - - // Idempotency check: skip if same summary + agent role exists within last 60s. - // Prevents duplicate contributions from MCP tool retries or agent calling the tool multiple times. - { - const recentWindow = new Date(Date.now() - 60_000).toISOString(); - try { - const recent = await deps.contributionStore.list({ limit: 20 }); - const agentMatch = (c: Contribution) => - agent.role ? c.agent.role === agent.role : c.agent.agentId === agent.agentId; - const isDuplicate = recent.some( - (c) => - c.summary === input.summary && - agentMatch(c) && - c.kind === input.kind && - c.createdAt >= recentWindow, - ); - if (isDuplicate) { - // Return the existing contribution's info instead of creating a duplicate - const existing = recent.find( - (c) => c.summary === input.summary && agentMatch(c) && c.kind === input.kind, - ); - if (existing) { - return ok({ - cid: existing.cid, - kind: existing.kind, - mode: existing.mode, - summary: existing.summary, - artifactCount: Object.keys(existing.artifacts).length, - relationCount: existing.relations.length, - createdAt: existing.createdAt, - }); - } - } - } catch { - // Best-effort — continue with normal contribution if check fails - } - } - - const contributionInput: ContributionInput = { - kind: input.kind, - mode, - summary: input.summary, - ...(input.description !== undefined ? { description: input.description } : {}), - artifacts, - relations, - ...(input.scores !== undefined ? { scores: input.scores } : {}), - tags: [...tags], - ...(input.context !== undefined ? { context: input.context } : {}), - agent, - createdAt, - }; - - const contribution = createContribution(contributionInput); - - // --- Policy enforcement (TOCTOU-safe: runs inside store mutex) --- - let policyResult: PolicyEnforcementResult | undefined; - let enforcer: PolicyEnforcer | undefined; - if (deps.contract !== undefined && deps.contributionStore !== undefined) { - enforcer = new PolicyEnforcer(deps.contract, deps.contributionStore, deps.outcomeStore); - - // Register per-CID preWriteHook for atomic enforce+put (TOCTOU-safe). - // Keyed by CID so concurrent contributes don't overwrite each other's hooks. - const store = deps.contributionStore as { - setPreWriteHook?: (cid: string, hook: (c: Contribution) => Promise) => void; - }; - if (store.setPreWriteHook) { - store.setPreWriteHook(contribution.cid, async (c: Contribution) => { - policyResult = await enforcer?.enforce(c, true); - }); - } else { - // Fallback: enforce outside mutex (non-EnforcingContributionStore) - policyResult = await enforcer.enforce(contribution, true); - } - } - - // --- Pre-write: determine routing targets synchronously (no I/O) --- - let routedTo: readonly string[] | undefined; - if (deps.topologyRouter !== undefined) { - if (contribution.agent.role === undefined) { - // Issue 4A: warn when topology is active but contributing agent has no role - process.stderr.write( - `[grove] Warning: topology router is active but agent '${contribution.agent.agentId}' has no role — routing skipped. Set agent.role to enable topology routing.\n`, - ); - } else { - const targets = deps.topologyRouter.targetsFor(contribution.agent.role); - if (targets.length > 0) routedTo = [...targets]; + deps: ContributeOperationDeps, + input: ContributionInput, + handoffConfig?: { + agentId: string; + sessionId: string; + instructions?: string; + } +): Promise { + const contribution = await createContribution(deps.contentStore, input); + + // Check if store supports atomic co-writes + const store = deps.contributionStore as any; + const supportsCowrite = typeof store.putWithCowrite === 'function'; + + if (supportsCowrite && handoffConfig) { + // Atomic path: both writes in same transaction + const handoffClaim: ClaimInput = { + agentId: handoffConfig.agentId, + sessionId: handoffConfig.sessionId, + type: 'task', + priority: 'normal', + instructions: handoffConfig.instructions || `Work on contribution ${contribution.id}`, + metadata: { + contributionId: contribution.id, + handoff: true } - } - - // --- Write: contribution + handoffs atomically where possible --- - const handoffIds: string[] = []; - const needsHandoffs = - deps.handoffStore !== undefined && routedTo !== undefined && routedTo.length > 0; - const agentRole = contribution.agent.role; - - // Duck-type check for atomic cowrite capability (SqliteContributionStore + SqliteHandoffStore) - const cowriteStore = deps.contributionStore as { - putWithCowrite?: (c: Contribution, fn: () => void) => void; }; - const sqliteHandoffStore = needsHandoffs - ? (deps.handoffStore as { insertSync?: (input: HandoffInput) => string }) - : undefined; - - if ( - needsHandoffs && - cowriteStore.putWithCowrite !== undefined && - sqliteHandoffStore?.insertSync !== undefined && - routedTo !== undefined && - agentRole !== undefined - ) { - // Atomic path: contribution + all handoff records in one SQLite transaction - cowriteStore.putWithCowrite(contribution, () => { - for (const targetRole of routedTo) { - const hid = sqliteHandoffStore.insertSync?.({ - sourceCid: contribution.cid, - fromRole: agentRole, - toRole: targetRole, - requiresReply: false, - }); - if (hid !== undefined) handoffIds.push(hid); - } - }); - } else { - // Non-atomic path: separate writes (in-memory stores or Nexus VFS handoff store) - await deps.contributionStore.put(contribution); - if (needsHandoffs && routedTo !== undefined && agentRole !== undefined) { - const handoffStore = deps.handoffStore; - if (handoffStore !== undefined) { - // Identify which handoff store type is in use - const isNexus = !("insertSync" in handoffStore); - try { - const { appendFileSync } = require("node:fs") as typeof import("node:fs"); - appendFileSync( - "/tmp/grove-debug.log", - `[${new Date().toISOString()}] [handoff] storeType=${isNexus ? "NexusHandoffStore" : "SqliteHandoffStore"} targets=${(routedTo as readonly string[]).join(",")}\n`, - ); - } catch { - /* */ - } - for (const targetRole of routedTo) { - try { - const handoff = await handoffStore.create({ - sourceCid: contribution.cid, - fromRole: agentRole, - toRole: targetRole, - requiresReply: false, - }); - handoffIds.push(handoff.handoffId); - try { - const { appendFileSync } = require("node:fs") as typeof import("node:fs"); - appendFileSync( - "/tmp/grove-debug.log", - `[${new Date().toISOString()}] [handoff] CREATED ${agentRole}→${targetRole} cid=${contribution.cid.slice(0, 16)} id=${handoff.handoffId}\n`, - ); - } catch { - /* */ - } - } catch (handoffErr) { - try { - const { appendFileSync } = require("node:fs") as typeof import("node:fs"); - appendFileSync( - "/tmp/grove-debug.log", - `[${new Date().toISOString()}] [handoff] FAILED ${agentRole}→${targetRole} cid=${contribution.cid.slice(0, 16)} err=${handoffErr instanceof Error ? handoffErr.message : String(handoffErr)}\n`, - ); - } catch { - /* */ - } - } + + const claim = createClaim(handoffClaim); + + await store.putWithCowrite( + contribution, + deps.claimStore, + claim + ); + } else { + // Non-atomic path: write contribution first, then handoff with rollback on failure + await deps.contributionStore.put(contribution); + + if (handoffConfig) { + try { + const handoffClaim: ClaimInput = { + agentId: handoffConfig.agentId, + sessionId: handoffConfig.sessionId, + type: 'task', + priority: 'normal', + instructions: handoffConfig.instructions || `Work on contribution ${contribution.id}`, + metadata: { + contributionId: contribution.id, + handoff: true } + }; + + const claim = createClaim(handoffClaim); + await deps.claimStore.put(claim); + } catch (error) { + // Rollback: delete the contribution if handoff creation fails + try { + await deps.contributionStore.delete(contribution.id); + } catch (rollbackError) { + // Log rollback failure but don't mask original error + console.error('Failed to rollback contribution after handoff failure:', rollbackError); } + throw error; } } - deps.onContributionWrite?.(); - deps.onContributionWritten?.(contribution.cid); - - // --- Post-write: mark upstream handoffs as replied (fire-and-forget) --- - // When this contribution targets another CID (reviews/responds_to), find - // any pending handoffs with sourceCid = targetCid and mark them replied. - if (deps.handoffStore !== undefined && contribution.relations.length > 0) { - const replyRelations = contribution.relations.filter( - (r) => - r.relationType === "reviews" || - r.relationType === "responds_to" || - r.relationType === "adopts", - ); - if (replyRelations.length > 0) { - fireAndForget("handoff reply transition", async () => { - for (const rel of replyRelations) { - try { - const pending = await deps.handoffStore?.list({ - sourceCid: rel.targetCid, - status: "pending_pickup", - }); - for (const h of pending ?? []) { - await deps.handoffStore?.markReplied(h.handoffId, contribution.cid); - } - } catch { - // Best-effort — don't fail contribution over handoff transition - } - } - }); - } - } - - // --- Post-write: persist derived outcome (outside mutex scope) --- - if (policyResult?.derivedOutcome !== undefined && enforcer !== undefined) { - await enforcer.persistOutcome(contribution.cid, policyResult.derivedOutcome); - } - - // --- Post-write: route events via topology (fire-and-forget) --- - if (routedTo !== undefined && deps.topologyRouter !== undefined && agentRole !== undefined) { - fireAndForget("topology routing", () => - deps.topologyRouter?.route(agentRole, { - cid: contribution.cid, - kind: contribution.kind, - summary: contribution.summary, - agentId: contribution.agent.agentId, - }), - ); - } - - // If stop condition met, broadcast stop to all agents - if (policyResult?.stopResult?.stopped && deps.topologyRouter !== undefined) { - fireAndForget("broadcast stop", () => - deps.topologyRouter?.broadcastStop( - policyResult?.stopResult?.reason ?? "Stop condition met", - ), - ); - } - - // --- Post-write: execute after_contribute hook (outside mutex scope) --- - if ( - deps.hookRunner !== undefined && - deps.hookCwd !== undefined && - deps.contract !== undefined - ) { - if (deps.contract.hooks?.after_contribute !== undefined) { - const hookEntry = deps.contract.hooks.after_contribute; - const hookCwd = deps.hookCwd; - fireAndForget("after_contribute hook", () => deps.hookRunner?.run(hookEntry, hookCwd)); - } - } - - return ok({ - cid: contribution.cid, - kind: contribution.kind, - mode: contribution.mode, - summary: contribution.summary, - artifactCount: Object.keys(contribution.artifacts).length, - relationCount: contribution.relations.length, - createdAt: contribution.createdAt, - ...(routedTo !== undefined ? { routedTo } : {}), - ...(handoffIds.length > 0 ? { handoffIds } : {}), - ...(policyResult !== undefined ? { policy: policyResult } : {}), - }); - } catch (error) { - return fromGroveError(error); } -} - -/** - * Submit a review of an existing contribution. - * Sugar over contributeOperation: sets kind=review, adds reviews relation. - */ -export async function reviewOperation( - input: ReviewInput, - deps: OperationDeps, -): Promise> { - const relations: Relation[] = [ - { - targetCid: input.targetCid, - relationType: RelationType.Reviews, - ...(input.metadata !== undefined ? { metadata: input.metadata } : {}), - }, - ]; - - const result = await contributeOperation( - { - kind: CK.Review, - mode: CM.Evaluation, - summary: input.summary, - relations, - tags: input.tags, - agent: input.agent, - ...pickDefined(input, ["description", "scores", "context"]), - }, - deps, - ); - - if (!result.ok) return result as OperationResult; - - return ok({ - cid: result.value.cid, - kind: "review" as const, - targetCid: input.targetCid, - summary: result.value.summary, - createdAt: result.value.createdAt, - }); -} - -/** - * Submit a reproduction attempt of an existing contribution. - * Sugar over contributeOperation: sets kind=reproduction, adds reproduces relation. - */ -export async function reproduceOperation( - input: ReproduceInput, - deps: OperationDeps, -): Promise> { - const reproResult = input.result ?? "confirmed"; - - const relations: Relation[] = [ - { - targetCid: input.targetCid, - relationType: RelationType.Reproduces, - metadata: { result: reproResult } as Readonly>, - }, - ]; - - const result = await contributeOperation( - { - kind: CK.Reproduction, - mode: CM.Evaluation, - summary: input.summary, - artifacts: input.artifacts, - relations, - tags: input.tags, - agent: input.agent, - ...pickDefined(input, ["description", "scores", "context"]), - }, - deps, - ); - - if (!result.ok) return result as OperationResult; - - return ok({ - cid: result.value.cid, - kind: "reproduction" as const, - targetCid: input.targetCid, - result: reproResult, - summary: result.value.summary, - createdAt: result.value.createdAt, - }); -} - -/** - * Post a discussion or reply. - * Sugar over contributeOperation: sets kind=discussion, mode=exploration. - */ -export async function discussOperation( - input: DiscussInput, - deps: OperationDeps, -): Promise> { - const relations: Relation[] = []; - if (input.targetCid !== undefined) { - relations.push({ - targetCid: input.targetCid, - relationType: RelationType.RespondsTo, - }); - } - - const result = await contributeOperation( - { - kind: CK.Discussion, - mode: CM.Exploration, - summary: input.summary, - relations, - tags: input.tags, - agent: input.agent, - ...pickDefined(input, ["description", "context"]), - }, - deps, - ); - - if (!result.ok) return result as OperationResult; - - return ok({ - cid: result.value.cid, - kind: "discussion" as const, - ...(input.targetCid !== undefined ? { targetCid: input.targetCid } : {}), - summary: result.value.summary, - createdAt: result.value.createdAt, - }); -} - -/** - * Adopt an existing contribution. - * Sugar over contributeOperation: sets kind=adoption, adds adopts relation. - */ -export async function adoptOperation( - input: AdoptInput, - deps: OperationDeps, -): Promise> { - const relations: Relation[] = [ - { - targetCid: input.targetCid, - relationType: RelationType.Adopts, - }, - ]; - - const result = await contributeOperation( - { - kind: CK.Adoption, - mode: CM.Evaluation, - summary: input.summary, - relations, - tags: input.tags, - agent: input.agent, - ...pickDefined(input, ["description", "context"]), - }, - deps, - ); - - if (!result.ok) return result as OperationResult; - - return ok({ - cid: result.value.cid, - kind: "adoption" as const, - targetCid: input.targetCid, - summary: result.value.summary, - createdAt: result.value.createdAt, - }); -} + + return contribution; +} \ No newline at end of file