From ebaef97f56ddd8a8440d1e05abac599c309bc67b Mon Sep 17 00:00:00 2001 From: NagyVikt Date: Fri, 24 Apr 2026 04:28:35 +0200 Subject: [PATCH] feat(core,hooks,mcp,cli): wake requests + attention inbox Add a lightweight cross-agent "please attend" primitive and an inbox view: - task_wake / task_ack_wake / task_cancel_wake MCP tools post a wake request on a task thread. No claim transfer, no baton pass. Targets see it in the SessionStart preface and in the turn-boundary activity block with a copy-paste ack call; sender sees the ack on their next turn. - attention_inbox MCP tool plus `colony inbox` CLI command aggregate pending handoffs, pending wakes, stalled hivemind lanes, and recent other-session file claims into one compact progressive-disclosure view. Mirrors the handoff lifecycle (pending -> acknowledged/cancelled/expired) and reuses the same metadata / task-scoping shape so the two primitives can coexist without a schema change. Follow-ups deferred: takeover of stalled/dead sessions, claim-TTL renewal, SessionEnd checkpoint, terminal-control wake. Co-Authored-By: Claude Opus 4.7 (1M context) --- .changeset/wake-requests-attention-inbox.md | 14 + apps/cli/src/commands/inbox.ts | 92 ++++++ apps/cli/src/index.ts | 2 + apps/mcp-server/src/server.ts | 138 +++++++++ apps/mcp-server/test/server.test.ts | 4 + apps/mcp-server/test/task-threads.test.ts | 69 +++++ packages/core/src/attention-inbox.ts | 280 ++++++++++++++++++ packages/core/src/index.ts | 14 + packages/core/src/task-thread.ts | 195 +++++++++++- packages/core/test/attention-inbox.test.ts | 129 ++++++++ packages/core/test/wake-request.test.ts | 156 ++++++++++ packages/hooks/src/handlers/session-start.ts | 20 +- .../hooks/src/handlers/user-prompt-submit.ts | 5 + packages/hooks/test/task-injection.test.ts | 30 ++ 14 files changed, 1146 insertions(+), 2 deletions(-) create mode 100644 .changeset/wake-requests-attention-inbox.md create mode 100644 apps/cli/src/commands/inbox.ts create mode 100644 packages/core/src/attention-inbox.ts create mode 100644 packages/core/test/attention-inbox.test.ts create mode 100644 packages/core/test/wake-request.test.ts diff --git a/.changeset/wake-requests-attention-inbox.md b/.changeset/wake-requests-attention-inbox.md new file mode 100644 index 0000000..d56b893 --- /dev/null +++ b/.changeset/wake-requests-attention-inbox.md @@ -0,0 +1,14 @@ +--- +'@colony/core': minor +'@colony/hooks': minor +'@colony/mcp-server': minor +'@imdeadpool/colony': minor +--- + +Add wake-request primitive and attention inbox for idle/stalled cross-agent nudges. + +- `task_wake` / `task_ack_wake` / `task_cancel_wake` MCP tools post lightweight nudges on a task thread — no claim transfer, no baton pass. Targets see the request on their next SessionStart or UserPromptSubmit turn with a copy-paste-ready ack call. +- `attention_inbox` MCP tool + `colony inbox` CLI command aggregate pending handoffs, pending wakes, stalled lanes from the hivemind snapshot, and recent other-session file claims into one compact view. Bodies are not expanded; fetch via `get_observations`. +- Hook injection extended: `buildTaskPreface` surfaces pending wake requests alongside pending handoffs; `buildTaskUpdatesPreface` inlines an ack call for wake requests that arrive between turns. + +Deferred follow-ups (not in this change): safe session takeover, claim TTL renewal, session Stop checkpoint, and any terminal-control wake mechanism. diff --git a/apps/cli/src/commands/inbox.ts b/apps/cli/src/commands/inbox.ts new file mode 100644 index 0000000..8b1dba9 --- /dev/null +++ b/apps/cli/src/commands/inbox.ts @@ -0,0 +1,92 @@ +import { join } from 'node:path'; +import { loadSettings, resolveDataDir } from '@colony/config'; +import { MemoryStore, buildAttentionInbox } from '@colony/core'; +import type { Command } from 'commander'; +import kleur from 'kleur'; + +export function registerInboxCommand(program: Command): void { + program + .command('inbox') + .description('Compact list of attention items for a session: pending handoffs, wakes, stalled lanes, recent claims') + .requiredOption('--session ', 'your session_id') + .requiredOption('--agent ', 'your agent name (e.g. claude, codex)') + .option('--repo-root ', 'repo root to scan for stalled lanes') + .option('--json', 'emit the full inbox as JSON') + .action(async (opts: { session: string; agent: string; repoRoot?: string; json?: boolean }) => { + const settings = loadSettings(); + const dbPath = join(resolveDataDir(settings.dataDir), 'data.db'); + const store = new MemoryStore({ dbPath, settings }); + try { + const inbox = buildAttentionInbox(store, { + session_id: opts.session, + agent: opts.agent, + ...(opts.repoRoot !== undefined ? { repo_root: opts.repoRoot } : {}), + }); + + if (opts.json) { + process.stdout.write(`${JSON.stringify(inbox, null, 2)}\n`); + return; + } + + const lines: string[] = []; + lines.push( + kleur.bold( + `Inbox for ${opts.agent}@${opts.session.slice(0, 8)} — ${inbox.summary.next_action}`, + ), + ); + lines.push( + ` handoffs: ${inbox.summary.pending_handoff_count} wakes: ${inbox.summary.pending_wake_count} stalled lanes: ${inbox.summary.stalled_lane_count} recent other claims: ${inbox.summary.recent_other_claim_count}`, + ); + + if (inbox.pending_handoffs.length > 0) { + lines.push(''); + lines.push(kleur.cyan('Pending handoffs:')); + for (const h of inbox.pending_handoffs) { + const mins = Math.max(0, Math.round((h.expires_at - inbox.generated_at) / 60_000)); + lines.push( + ` #${h.id} task ${h.task_id} from ${h.from_agent} (${mins}m left): ${h.summary}`, + ); + lines.push( + ` accept: task_accept_handoff(handoff_observation_id=${h.id}, session_id="${opts.session}")`, + ); + } + } + if (inbox.pending_wakes.length > 0) { + lines.push(''); + lines.push(kleur.yellow('Pending wakes:')); + for (const w of inbox.pending_wakes) { + const mins = Math.max(0, Math.round((w.expires_at - inbox.generated_at) / 60_000)); + lines.push( + ` #${w.id} task ${w.task_id} from ${w.from_agent} (${mins}m left): ${w.reason}`, + ); + if (w.next_step) lines.push(` next: ${w.next_step}`); + lines.push( + ` ack: task_ack_wake(wake_observation_id=${w.id}, session_id="${opts.session}")`, + ); + } + } + if (inbox.stalled_lanes.length > 0) { + lines.push(''); + lines.push(kleur.magenta('Stalled lanes:')); + for (const lane of inbox.stalled_lanes) { + lines.push( + ` ${lane.branch} [${lane.activity}] ${lane.owner}: ${lane.activity_summary}`, + ); + } + } + if (inbox.recent_other_claims.length > 0) { + lines.push(''); + lines.push(kleur.gray('Recent other-session claims:')); + for (const c of inbox.recent_other_claims) { + lines.push( + ` task ${c.task_id} ${c.file_path} by ${c.by_session_id.slice(0, 8)}`, + ); + } + } + + process.stdout.write(`${lines.join('\n')}\n`); + } finally { + store.close(); + } + }); +} diff --git a/apps/cli/src/index.ts b/apps/cli/src/index.ts index ba4731a..d8b9ad0 100644 --- a/apps/cli/src/index.ts +++ b/apps/cli/src/index.ts @@ -8,6 +8,7 @@ import { registerDebriefCommand } from './commands/debrief.js'; import { registerDoctorCommand } from './commands/doctor.js'; import { registerExportCommand } from './commands/export.js'; import { registerHookCommand } from './commands/hook.js'; +import { registerInboxCommand } from './commands/inbox.js'; import { registerInstallCommand } from './commands/install.js'; import { registerLifecycleCommands } from './commands/lifecycle.js'; import { registerMcpCommand } from './commands/mcp.js'; @@ -43,6 +44,7 @@ export function createProgram(): Command { registerNoteCommand(program); registerObserveCommand(program); registerDebriefCommand(program); + registerInboxCommand(program); return program; } diff --git a/apps/mcp-server/src/server.ts b/apps/mcp-server/src/server.ts index 7f81d6d..7df48d6 100644 --- a/apps/mcp-server/src/server.ts +++ b/apps/mcp-server/src/server.ts @@ -5,6 +5,7 @@ import { pathToFileURL } from 'node:url'; import { type Settings, loadSettings, resolveDataDir } from '@colony/config'; import { type AgentCapabilities, + type AttentionInboxOptions, DEFAULT_CAPABILITIES, type Embedder, type HivemindOptions, @@ -14,6 +15,7 @@ import { ProposalSystem, type SearchResult, TaskThread, + buildAttentionInbox, loadProfile, readHivemind, saveProfile, @@ -516,6 +518,142 @@ export function buildServer(store: MemoryStore, settings: Settings): McpServer { }, ); + server.tool( + 'task_wake', + 'Post a wake request on a task thread — a lightweight nudge surfaced to the target on their next turn. No claim transfer. Use when you need another session to attend to something but a full handoff is the wrong shape.', + { + task_id: z.number().int().positive(), + session_id: z.string().min(1).describe('your session_id (the sender)'), + agent: z.string().min(1).describe('your agent name, e.g. claude or codex'), + to_agent: z.enum(['claude', 'codex', 'any']), + to_session_id: z.string().optional(), + reason: z.string().min(1), + next_step: z.string().optional(), + expires_in_minutes: z.number().int().positive().max(1440).optional(), + }, + async (args) => { + const thread = new TaskThread(store, args.task_id); + const id = thread.requestWake({ + from_session_id: args.session_id, + from_agent: args.agent, + to_agent: args.to_agent, + ...(args.to_session_id !== undefined ? { to_session_id: args.to_session_id } : {}), + reason: args.reason, + ...(args.next_step !== undefined ? { next_step: args.next_step } : {}), + ...(args.expires_in_minutes !== undefined + ? { expires_in_ms: args.expires_in_minutes * 60_000 } + : {}), + }); + return { + content: [ + { type: 'text', text: JSON.stringify({ wake_observation_id: id, status: 'pending' }) }, + ], + }; + }, + ); + + server.tool( + 'task_ack_wake', + 'Acknowledge a pending wake request addressed to you. Records an ack on the task thread so the sender sees the response on their next turn.', + { + wake_observation_id: z.number().int().positive(), + session_id: z.string().min(1), + }, + async ({ wake_observation_id, session_id }) => { + const obs = store.storage.getObservation(wake_observation_id); + if (!obs?.task_id) { + return { + content: [ + { type: 'text', text: JSON.stringify({ error: 'observation is not on a task' }) }, + ], + isError: true, + }; + } + const thread = new TaskThread(store, obs.task_id); + try { + thread.acknowledgeWake(wake_observation_id, session_id); + return { content: [{ type: 'text', text: JSON.stringify({ status: 'acknowledged' }) }] }; + } catch (err) { + return { + content: [ + { + type: 'text', + text: JSON.stringify({ error: err instanceof Error ? err.message : String(err) }), + }, + ], + isError: true, + }; + } + }, + ); + + server.tool( + 'task_cancel_wake', + 'Cancel a pending wake request. Either the sender (withdrawing) or the target (declining) may cancel.', + { + wake_observation_id: z.number().int().positive(), + session_id: z.string().min(1), + reason: z.string().optional(), + }, + async ({ wake_observation_id, session_id, reason }) => { + const obs = store.storage.getObservation(wake_observation_id); + if (!obs?.task_id) { + return { + content: [ + { type: 'text', text: JSON.stringify({ error: 'observation is not on a task' }) }, + ], + isError: true, + }; + } + const thread = new TaskThread(store, obs.task_id); + try { + thread.cancelWake(wake_observation_id, session_id, reason); + return { content: [{ type: 'text', text: JSON.stringify({ status: 'cancelled' }) }] }; + } catch (err) { + return { + content: [ + { + type: 'text', + text: JSON.stringify({ error: err instanceof Error ? err.message : String(err) }), + }, + ], + isError: true, + }; + } + }, + ); + + server.tool( + 'attention_inbox', + 'Compact list of what needs your attention: pending handoffs, pending wakes, stalled lanes, recent other-session file claims. Fetch bodies via get_observations.', + { + session_id: z.string().min(1), + agent: z.string().min(1), + repo_root: z.string().min(1).optional(), + repo_roots: z.array(z.string().min(1)).max(20).optional(), + recent_claim_window_minutes: z.number().int().positive().max(1440).optional(), + recent_claim_limit: z.number().int().positive().max(100).optional(), + task_ids: z.array(z.number().int().positive()).max(100).optional(), + }, + async (args) => { + const options: AttentionInboxOptions = { + session_id: args.session_id, + agent: args.agent, + }; + if (args.repo_root !== undefined) options.repo_root = args.repo_root; + if (args.repo_roots !== undefined) options.repo_roots = args.repo_roots; + if (args.recent_claim_window_minutes !== undefined) { + options.recent_claim_window_ms = args.recent_claim_window_minutes * 60_000; + } + if (args.recent_claim_limit !== undefined) { + options.recent_claim_limit = args.recent_claim_limit; + } + if (args.task_ids !== undefined) options.task_ids = args.task_ids; + const inbox = buildAttentionInbox(store, options); + return { content: [{ type: 'text', text: JSON.stringify(inbox) }] }; + }, + ); + server.tool( 'task_foraging_report', 'List pending and recently promoted proposals on a (repo_root, branch). Pending proposals whose strength has evaporated below the noise floor are omitted.', diff --git a/apps/mcp-server/test/server.test.ts b/apps/mcp-server/test/server.test.ts index b00dad0..4e55a48 100644 --- a/apps/mcp-server/test/server.test.ts +++ b/apps/mcp-server/test/server.test.ts @@ -48,12 +48,15 @@ describe('MCP server', () => { expect(tools.map((t) => t.name).sort()).toEqual([ 'agent_get_profile', 'agent_upsert_profile', + 'attention_inbox', 'get_observations', 'hivemind', 'hivemind_context', 'list_sessions', 'search', 'task_accept_handoff', + 'task_ack_wake', + 'task_cancel_wake', 'task_claim_file', 'task_decline_handoff', 'task_foraging_report', @@ -64,6 +67,7 @@ describe('MCP server', () => { 'task_reinforce', 'task_timeline', 'task_updates_since', + 'task_wake', 'timeline', ]); }); diff --git a/apps/mcp-server/test/task-threads.test.ts b/apps/mcp-server/test/task-threads.test.ts index a434eef..8fd26a9 100644 --- a/apps/mcp-server/test/task-threads.test.ts +++ b/apps/mcp-server/test/task-threads.test.ts @@ -173,6 +173,75 @@ describe('task threads — handoff lifecycle', () => { expect(afterMeta.status).toBe('expired'); }); + it('task_wake + task_ack_wake + attention_inbox round trip', async () => { + const { task_id, sessionA, sessionB } = seedTwoSessionTask(); + + const { wake_observation_id } = await call<{ wake_observation_id: number }>('task_wake', { + task_id, + session_id: sessionA, + agent: 'claude', + to_agent: 'codex', + reason: 'please review the migration shape', + next_step: 'look at packages/storage/src/schema.ts', + }); + + const inbox = await call<{ + pending_wakes: Array<{ id: number; reason: string }>; + summary: { pending_wake_count: number; next_action: string }; + }>('attention_inbox', { + session_id: sessionB, + agent: 'codex', + task_ids: [task_id], + }); + expect(inbox.pending_wakes.map((w) => w.id)).toContain(wake_observation_id); + expect(inbox.summary.pending_wake_count).toBeGreaterThan(0); + + const acked = await call<{ status: string }>('task_ack_wake', { + wake_observation_id, + session_id: sessionB, + }); + expect(acked.status).toBe('acknowledged'); + + const row = store.storage.getObservation(wake_observation_id); + const meta = JSON.parse(row?.metadata ?? '{}'); + expect(meta.status).toBe('acknowledged'); + expect(meta.acknowledged_by_session_id).toBe(sessionB); + + const retry = await client.callTool({ + name: 'task_ack_wake', + arguments: { wake_observation_id, session_id: sessionB }, + }); + expect(retry.isError).toBe(true); + }); + + it('task_cancel_wake cancels a pending wake without side effects on claims', async () => { + const { task_id, sessionA, sessionB } = seedTwoSessionTask(); + + const { wake_observation_id } = await call<{ wake_observation_id: number }>('task_wake', { + task_id, + session_id: sessionA, + agent: 'claude', + to_agent: 'codex', + reason: 'nevermind', + }); + + await call('task_cancel_wake', { + wake_observation_id, + session_id: sessionA, + reason: 'resolved offline', + }); + + const row = store.storage.getObservation(wake_observation_id); + const meta = JSON.parse(row?.metadata ?? '{}'); + expect(meta.status).toBe('cancelled'); + + const res = await client.callTool({ + name: 'task_ack_wake', + arguments: { wake_observation_id, session_id: sessionB }, + }); + expect(res.isError).toBe(true); + }); + it("task_updates_since filters out the caller's own posts", async () => { const { task_id, sessionA, sessionB } = seedTwoSessionTask(); const cursor = Date.now() - 1; // strictly before either post diff --git a/packages/core/src/attention-inbox.ts b/packages/core/src/attention-inbox.ts new file mode 100644 index 0000000..4f0f944 --- /dev/null +++ b/packages/core/src/attention-inbox.ts @@ -0,0 +1,280 @@ +import type { TaskClaimRow } from '@colony/storage'; +import { + type HivemindActivity, + type HivemindOptions, + type HivemindSession, + readHivemind, +} from './hivemind.js'; +import type { MemoryStore } from './memory-store.js'; +import { + type HandoffMetadata, + type HandoffTarget, + TaskThread, + type WakeRequestMetadata, +} from './task-thread.js'; + +/** + * Pending handoff item reduced to the shape the inbox surfaces: id, sender, + * one-line reason, expiry hint, task + accept-call hint. Full body stays in + * the observation row — callers that want it fetch via get_observations. + */ +export interface InboxHandoff { + id: number; + task_id: number; + from_agent: string; + from_session_id: string; + to_agent: HandoffTarget; + to_session_id: string | null; + summary: string; + expires_at: number; + ts: number; +} + +export interface InboxWake { + id: number; + task_id: number; + from_agent: string; + from_session_id: string; + reason: string; + next_step: string; + expires_at: number; + ts: number; +} + +export interface InboxLane { + repo_root: string; + branch: string; + task: string; + owner: string; + activity: HivemindActivity; + activity_summary: string; + worktree_path: string; + updated_at: string; +} + +export interface InboxRecentClaim { + task_id: number; + file_path: string; + by_session_id: string; + claimed_at: number; +} + +export interface AttentionInbox { + generated_at: number; + session_id: string; + agent: string; + summary: { + pending_handoff_count: number; + pending_wake_count: number; + stalled_lane_count: number; + recent_other_claim_count: number; + next_action: string; + }; + pending_handoffs: InboxHandoff[]; + pending_wakes: InboxWake[]; + stalled_lanes: InboxLane[]; + recent_other_claims: InboxRecentClaim[]; +} + +export interface AttentionInboxOptions { + session_id: string; + agent: string; + repo_root?: string; + repo_roots?: string[]; + now?: number; + /** + * Window (ms) for "recent other-session claim" surfacing. Default 15m — + * longer than the 5m active-edit window the UserPromptSubmit conflict + * preface uses, because the inbox is a review surface, not a live warning. + */ + recent_claim_window_ms?: number; + recent_claim_limit?: number; + /** Tasks to scan for pending handoffs/wakes. Defaults to all tasks the + * session participates in. */ + task_ids?: number[]; +} + +const DEFAULT_RECENT_CLAIM_WINDOW_MS = 15 * 60_000; +const DEFAULT_RECENT_CLAIM_LIMIT = 20; + +/** + * Aggregate "things that need this session's attention" across tasks and + * hivemind lanes. Progressive disclosure: this is the compact shape. Full + * observation bodies are fetched via get_observations by id. + * + * Scope intentionally narrow: pending handoffs, pending wakes, stalled + * lanes, other-session recent claims. Items explicitly not here yet — + * "PRs open needing merge" (no GitHub integration) and "stale lock > TTL" + * (claim TTL renewal is a separate follow-up) — are deferred to later PRs. + */ +export function buildAttentionInbox( + store: MemoryStore, + opts: AttentionInboxOptions, +): AttentionInbox { + const now = opts.now ?? Date.now(); + const taskIds = resolveTaskIds(store, opts); + + const pending_handoffs: InboxHandoff[] = []; + const pending_wakes: InboxWake[] = []; + const recent_other_claims: InboxRecentClaim[] = []; + + const recentWindow = opts.recent_claim_window_ms ?? DEFAULT_RECENT_CLAIM_WINDOW_MS; + const recentLimit = opts.recent_claim_limit ?? DEFAULT_RECENT_CLAIM_LIMIT; + const recentSince = now - recentWindow; + + for (const task_id of taskIds) { + const thread = new TaskThread(store, task_id); + for (const h of thread.pendingHandoffsFor(opts.session_id, opts.agent)) { + pending_handoffs.push(compactHandoff(task_id, h.id, h.ts, h.meta)); + } + for (const w of thread.pendingWakesFor(opts.session_id, opts.agent)) { + pending_wakes.push(compactWake(task_id, w.id, w.ts, w.meta)); + } + for (const claim of store.storage.recentClaims(task_id, recentSince, recentLimit)) { + if (claim.session_id === opts.session_id) continue; + recent_other_claims.push(compactClaim(claim)); + } + } + + const stalled_lanes = collectStalledLanes(opts); + + const summary = { + pending_handoff_count: pending_handoffs.length, + pending_wake_count: pending_wakes.length, + stalled_lane_count: stalled_lanes.length, + recent_other_claim_count: recent_other_claims.length, + next_action: deriveNextAction({ + pending_handoffs, + pending_wakes, + stalled_lanes, + recent_other_claims, + }), + }; + + return { + generated_at: now, + session_id: opts.session_id, + agent: opts.agent, + summary, + pending_handoffs, + pending_wakes, + stalled_lanes, + recent_other_claims, + }; +} + +function resolveTaskIds(store: MemoryStore, opts: AttentionInboxOptions): number[] { + if (opts.task_ids && opts.task_ids.length > 0) { + return [...new Set(opts.task_ids)]; + } + // All tasks the session is currently participating in. Broader than + // findActiveTaskForSession (which returns one id); the inbox wants every + // lane the session could hear from. + const rows = store.storage.listTasks(200); + const participating: number[] = []; + for (const task of rows) { + const agent = store.storage.getParticipantAgent(task.id, opts.session_id); + if (agent !== undefined) participating.push(task.id); + } + return participating; +} + +function compactHandoff( + task_id: number, + id: number, + ts: number, + meta: HandoffMetadata, +): InboxHandoff { + return { + id, + task_id, + from_agent: meta.from_agent, + from_session_id: meta.from_session_id, + to_agent: meta.to_agent, + to_session_id: meta.to_session_id, + summary: meta.summary, + expires_at: meta.expires_at, + ts, + }; +} + +function compactWake( + task_id: number, + id: number, + ts: number, + meta: WakeRequestMetadata, +): InboxWake { + return { + id, + task_id, + from_agent: meta.from_agent, + from_session_id: meta.from_session_id, + reason: meta.reason, + next_step: meta.next_step, + expires_at: meta.expires_at, + ts, + }; +} + +function compactClaim(row: TaskClaimRow): InboxRecentClaim { + return { + task_id: row.task_id, + file_path: row.file_path, + by_session_id: row.session_id, + claimed_at: row.claimed_at, + }; +} + +function collectStalledLanes(opts: AttentionInboxOptions): InboxLane[] { + const options: HivemindOptions = { includeStale: true }; + if (opts.repo_root !== undefined) options.repoRoot = opts.repo_root; + if (opts.repo_roots !== undefined) options.repoRoots = opts.repo_roots; + if (opts.now !== undefined) options.now = opts.now; + + try { + const snapshot = readHivemind(options); + return snapshot.sessions.filter(isLaneStalled).map(toInboxLane); + } catch { + // Best effort — hivemind read touches the filesystem and must never + // turn an inbox query into a fatal error. + return []; + } +} + +function isLaneStalled(session: HivemindSession): boolean { + return session.activity === 'stalled' || session.activity === 'dead'; +} + +function toInboxLane(session: HivemindSession): InboxLane { + return { + repo_root: session.repo_root, + branch: session.branch, + task: session.task, + owner: `${session.agent}/${session.cli}`, + activity: session.activity, + activity_summary: session.activity_summary, + worktree_path: session.worktree_path, + updated_at: session.updated_at, + }; +} + +function deriveNextAction(parts: { + pending_handoffs: InboxHandoff[]; + pending_wakes: InboxWake[]; + stalled_lanes: InboxLane[]; + recent_other_claims: InboxRecentClaim[]; +}): string { + if (parts.pending_handoffs.length > 0) { + return 'Respond to pending handoffs first; each baton pass is blocking until accept or decline.'; + } + if (parts.pending_wakes.length > 0) { + return 'Acknowledge pending wake requests; another session is waiting on you.'; + } + if (parts.stalled_lanes.length > 0) { + return 'Review stalled lanes — takeover may be safer than waiting for the owner to return.'; + } + if (parts.recent_other_claims.length > 0) { + return 'Other sessions have recent file claims nearby; coordinate before editing the same files.'; + } + return 'Inbox is quiet; no immediate attention items.'; +} diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 7b8623f..08334aa 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -17,7 +17,21 @@ export { type HandoffStatus, type HandoffTarget, type HandOffArgs, + type RequestWakeArgs, + type WakeRequestMetadata, + type WakeRequestObservation, + type WakeStatus, + type WakeTarget, } from './task-thread.js'; +export { + buildAttentionInbox, + type AttentionInbox, + type AttentionInboxOptions, + type InboxHandoff, + type InboxLane, + type InboxRecentClaim, + type InboxWake, +} from './attention-inbox.js'; export { detectRepoBranch } from './git-detect.js'; export { PheromoneSystem, diff --git a/packages/core/src/task-thread.ts b/packages/core/src/task-thread.ts index 3a721dc..67f70d5 100644 --- a/packages/core/src/task-thread.ts +++ b/packages/core/src/task-thread.ts @@ -15,11 +15,55 @@ export type CoordinationKind = | 'decline' | 'decision' | 'blocker' - | 'note'; + | 'note' + | 'wake_request' + | 'wake_ack' + | 'wake_cancel'; export type HandoffStatus = 'pending' | 'accepted' | 'expired' | 'cancelled'; export type HandoffTarget = 'claude' | 'codex' | 'any'; +export type WakeStatus = 'pending' | 'acknowledged' | 'expired' | 'cancelled'; +export type WakeTarget = 'claude' | 'codex' | 'any'; + +/** + * Structured payload for a wake_request observation. Kept in metadata so the + * hook preface can render it without decompressing content. Mirrors the + * HandoffMetadata shape on purpose — the two primitives have symmetric + * lifecycle (pending → terminal) and symmetric eligibility rules. A wake is + * a lighter-weight nudge: no claim transfer, no baton pass, just "please + * attend to this on your next turn". + */ +export interface WakeRequestMetadata { + kind: 'wake_request'; + from_session_id: string; + from_agent: string; + to_agent: WakeTarget; + to_session_id: string | null; + reason: string; + next_step: string; + status: WakeStatus; + acknowledged_by_session_id: string | null; + acknowledged_at: number | null; + expires_at: number; +} + +export interface WakeRequestObservation { + id: number; + ts: number; + meta: WakeRequestMetadata; +} + +export interface RequestWakeArgs { + from_session_id: string; + from_agent: string; + to_agent: WakeTarget; + to_session_id?: string; + reason: string; + next_step?: string; + expires_in_ms?: number; +} + /** * The structured payload stored inside observation.metadata for handoff * messages. Keeping this shape narrow and typed is what lets the SessionStart @@ -71,6 +115,7 @@ export interface HandOffArgs { } const DEFAULT_HANDOFF_TTL_MS = 2 * 60 * 60 * 1000; +const DEFAULT_WAKE_TTL_MS = 24 * 60 * 60 * 1000; /** * TaskThread wraps MemoryStore + Storage with task-scoped coordination @@ -337,6 +382,134 @@ export class TaskThread { (meta.to_session_id === session_id || meta.to_agent === 'any' || meta.to_agent === agent), ); } + + /** + * Post a wake request. A wake is a lightweight nudge — no claim transfer, + * no baton pass — that surfaces to the target on their next SessionStart + * or UserPromptSubmit turn. Use when an idle/stalled session needs to + * attend to something but a full handoff would be the wrong shape + * (e.g. review is needed, not ownership transfer). + */ + requestWake(args: RequestWakeArgs): number { + const now = Date.now(); + const meta: WakeRequestMetadata = { + kind: 'wake_request', + from_session_id: args.from_session_id, + from_agent: args.from_agent, + to_agent: args.to_agent, + to_session_id: args.to_session_id ?? null, + reason: args.reason, + next_step: args.next_step ?? '', + status: 'pending', + acknowledged_by_session_id: null, + acknowledged_at: null, + expires_at: now + (args.expires_in_ms ?? DEFAULT_WAKE_TTL_MS), + }; + const id = this.store.addObservation({ + session_id: args.from_session_id, + kind: 'wake_request', + content: renderWakeContent(meta), + task_id: this.task_id, + metadata: meta as unknown as Record, + }); + this.store.storage.touchTask(this.task_id, now); + return id; + } + + /** + * Acknowledge a pending wake request. Flips status to `acknowledged` and + * records a wake_ack observation replying to the original. The sender + * sees the ack in their next task-updates preface. Unlike handoffs no + * claim state moves. + */ + acknowledgeWake(wake_observation_id: number, session_id: string): void { + const obs = this.store.storage.getObservation(wake_observation_id); + if (!obs || obs.kind !== 'wake_request') { + throw new Error(`observation ${wake_observation_id} is not a wake_request`); + } + if (obs.task_id !== this.task_id) { + throw new Error(`wake belongs to task ${obs.task_id}, not ${this.task_id}`); + } + const meta = parseWake(obs.metadata); + if (!meta) throw new Error('wake metadata missing'); + if (meta.status !== 'pending') throw new Error(`wake is ${meta.status}, cannot acknowledge`); + if (Date.now() > meta.expires_at) { + meta.status = 'expired'; + this.store.storage.updateObservationMetadata(wake_observation_id, JSON.stringify(meta)); + throw new Error('wake expired'); + } + if (meta.to_session_id && meta.to_session_id !== session_id) { + throw new Error('wake is addressed to a different session'); + } + const myAgent = this.store.storage.getParticipantAgent(this.task_id, session_id); + if (meta.to_agent !== 'any' && myAgent && meta.to_agent !== myAgent) { + throw new Error(`wake is for ${meta.to_agent}, not ${myAgent}`); + } + + this.store.storage.transaction(() => { + meta.status = 'acknowledged'; + meta.acknowledged_by_session_id = session_id; + meta.acknowledged_at = Date.now(); + this.store.storage.updateObservationMetadata(wake_observation_id, JSON.stringify(meta)); + this.store.addObservation({ + session_id, + kind: 'wake_ack', + content: `acknowledged wake #${wake_observation_id}`, + task_id: this.task_id, + reply_to: wake_observation_id, + metadata: { kind: 'wake_ack', wake_observation_id }, + }); + this.store.storage.touchTask(this.task_id); + }); + } + + /** + * Sender-side cancel. Only the original sender may cancel; target can + * decline via a separate path (currently surfaced as an explicit cancel + * from the target too — we accept both sides to keep the surface small). + */ + cancelWake(wake_observation_id: number, session_id: string, reason?: string): void { + const obs = this.store.storage.getObservation(wake_observation_id); + if (!obs || obs.kind !== 'wake_request') throw new Error('not a wake_request'); + if (obs.task_id !== this.task_id) throw new Error('wake belongs to a different task'); + const meta = parseWake(obs.metadata); + if (!meta) throw new Error('wake metadata missing'); + if (meta.status !== 'pending') throw new Error(`wake is ${meta.status}`); + this.store.storage.transaction(() => { + meta.status = 'cancelled'; + this.store.storage.updateObservationMetadata(wake_observation_id, JSON.stringify(meta)); + this.store.addObservation({ + session_id, + kind: 'wake_cancel', + content: reason + ? `cancelled wake #${wake_observation_id}: ${reason}` + : `cancelled wake #${wake_observation_id}`, + task_id: this.task_id, + reply_to: wake_observation_id, + metadata: { kind: 'wake_cancel', wake_observation_id }, + }); + this.store.storage.touchTask(this.task_id); + }); + } + + /** Pending, unexpired wake requests visible to `session_id` / `agent`. */ + pendingWakesFor(session_id: string, agent: string): WakeRequestObservation[] { + const now = Date.now(); + return this.store.storage + .taskObservationsByKind(this.task_id, 'wake_request') + .map((row) => { + const meta = parseWake(row.metadata); + return meta ? { id: row.id, ts: row.ts, meta } : null; + }) + .filter((x): x is WakeRequestObservation => x !== null) + .filter( + ({ meta }) => + meta.status === 'pending' && + now < meta.expires_at && + meta.from_session_id !== session_id && + (meta.to_session_id === session_id || meta.to_agent === 'any' || meta.to_agent === agent), + ); + } } function parseHandoff(metadata: string | null): HandoffMetadata | null { @@ -352,6 +525,26 @@ function parseHandoff(metadata: string | null): HandoffMetadata | null { } } +function parseWake(metadata: string | null): WakeRequestMetadata | null { + if (!metadata) return null; + try { + const parsed = JSON.parse(metadata) as unknown; + if (!parsed || typeof parsed !== 'object') return null; + const m = parsed as Partial; + if (m.kind !== 'wake_request' || typeof m.status !== 'string') return null; + return parsed as WakeRequestMetadata; + } catch { + return null; + } +} + +function renderWakeContent(m: WakeRequestMetadata): string { + const target = m.to_session_id ?? m.to_agent; + const lines = [`WAKE REQUEST from ${m.from_agent} -> ${target}`, `Reason: ${m.reason}`]; + if (m.next_step) lines.push(`Next step: ${m.next_step}`); + return lines.join('\n'); +} + function renderHandoffContent(m: HandoffMetadata): string { const lines = [ `HANDOFF from ${m.from_agent} -> ${m.to_session_id ?? m.to_agent}`, diff --git a/packages/core/test/attention-inbox.test.ts b/packages/core/test/attention-inbox.test.ts new file mode 100644 index 0000000..507d191 --- /dev/null +++ b/packages/core/test/attention-inbox.test.ts @@ -0,0 +1,129 @@ +import { mkdtempSync, rmSync } from 'node:fs'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { defaultSettings } from '@colony/config'; +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { buildAttentionInbox } from '../src/attention-inbox.js'; +import { MemoryStore } from '../src/memory-store.js'; +import { TaskThread } from '../src/task-thread.js'; + +let dir: string; +let store: MemoryStore; + +function seed(...ids: string[]): void { + for (const id of ids) { + store.startSession({ id, ide: 'claude-code', cwd: '/r' }); + } +} + +beforeEach(() => { + dir = mkdtempSync(join(tmpdir(), 'colony-attention-')); + store = new MemoryStore({ dbPath: join(dir, 'data.db'), settings: defaultSettings }); +}); + +afterEach(() => { + store.close(); + rmSync(dir, { recursive: true, force: true }); +}); + +describe('buildAttentionInbox', () => { + it('aggregates pending handoffs, wakes, and recent other-session claims for a participating agent', () => { + seed('claude', 'codex'); + const thread = TaskThread.open(store, { + repo_root: '/r', + branch: 'feat/inbox', + session_id: 'claude', + }); + thread.join('claude', 'claude'); + thread.join('codex', 'codex'); + + // Claude posts a handoff and a wake request addressed to codex. + const handoffId = thread.handOff({ + from_session_id: 'claude', + from_agent: 'claude', + to_agent: 'codex', + summary: 'please take the API module', + transferred_files: ['src/api.ts'], + }); + const wakeId = thread.requestWake({ + from_session_id: 'claude', + from_agent: 'claude', + to_agent: 'codex', + reason: 'PR review needed', + next_step: 'look at PR #42', + }); + + // Claude also claims an unrelated file recently — codex's inbox should + // surface it as a "recent other-session claim" near their lane. + thread.claimFile({ session_id: 'claude', file_path: 'src/viewer.tsx' }); + + const inbox = buildAttentionInbox(store, { + session_id: 'codex', + agent: 'codex', + task_ids: [thread.task_id], + }); + + expect(inbox.pending_handoffs.map((h) => h.id)).toEqual([handoffId]); + expect(inbox.pending_wakes.map((w) => w.id)).toEqual([wakeId]); + expect(inbox.pending_wakes[0]?.reason).toBe('PR review needed'); + expect(inbox.pending_wakes[0]?.next_step).toBe('look at PR #42'); + + expect(inbox.recent_other_claims.some((c) => c.file_path === 'src/viewer.tsx')).toBe(true); + expect(inbox.recent_other_claims.every((c) => c.by_session_id !== 'codex')).toBe(true); + + expect(inbox.summary.pending_handoff_count).toBe(1); + expect(inbox.summary.pending_wake_count).toBe(1); + expect(inbox.summary.next_action).toMatch(/handoff/i); + }); + + it("omits the requesting session's own claims and own handoffs", () => { + seed('claude', 'codex'); + const thread = TaskThread.open(store, { + repo_root: '/r', + branch: 'feat/inbox-self', + session_id: 'claude', + }); + thread.join('claude', 'claude'); + thread.join('codex', 'codex'); + + thread.claimFile({ session_id: 'codex', file_path: 'src/own.ts' }); + thread.handOff({ + from_session_id: 'codex', + from_agent: 'codex', + to_agent: 'claude', + summary: 'sent from codex, so codex must not see it', + }); + + const inbox = buildAttentionInbox(store, { + session_id: 'codex', + agent: 'codex', + task_ids: [thread.task_id], + }); + + expect(inbox.pending_handoffs).toHaveLength(0); + expect(inbox.recent_other_claims.find((c) => c.file_path === 'src/own.ts')).toBeUndefined(); + }); + + it('returns the quiet-inbox next_action hint when nothing is pending', () => { + seed('codex'); + const thread = TaskThread.open(store, { + repo_root: '/r', + branch: 'feat/inbox-quiet', + session_id: 'codex', + }); + thread.join('codex', 'codex'); + + const inbox = buildAttentionInbox(store, { + session_id: 'codex', + agent: 'codex', + task_ids: [thread.task_id], + // Disable hivemind scan side channel by pointing at a fresh repo root + // that has no .omx state. + repo_root: dir, + }); + + expect(inbox.pending_handoffs).toHaveLength(0); + expect(inbox.pending_wakes).toHaveLength(0); + expect(inbox.summary.next_action).toMatch(/quiet/i); + }); +}); diff --git a/packages/core/test/wake-request.test.ts b/packages/core/test/wake-request.test.ts new file mode 100644 index 0000000..00d7a7e --- /dev/null +++ b/packages/core/test/wake-request.test.ts @@ -0,0 +1,156 @@ +import { mkdtempSync, rmSync } from 'node:fs'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { defaultSettings } from '@colony/config'; +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { MemoryStore } from '../src/memory-store.js'; +import { TaskThread, type WakeRequestMetadata } from '../src/task-thread.js'; + +let dir: string; +let store: MemoryStore; + +function seed(...ids: string[]): void { + for (const id of ids) { + store.startSession({ id, ide: 'claude-code', cwd: '/r' }); + } +} + +function openThread(session_id: string): TaskThread { + return TaskThread.open(store, { + repo_root: '/r', + branch: 'feat/wake', + session_id, + }); +} + +beforeEach(() => { + dir = mkdtempSync(join(tmpdir(), 'colony-wake-request-')); + store = new MemoryStore({ dbPath: join(dir, 'data.db'), settings: defaultSettings }); +}); + +afterEach(() => { + store.close(); + rmSync(dir, { recursive: true, force: true }); +}); + +describe('TaskThread wake requests', () => { + it('requestWake records a pending wake with structured metadata', () => { + seed('claude', 'codex'); + const thread = openThread('claude'); + thread.join('claude', 'claude'); + thread.join('codex', 'codex'); + + const id = thread.requestWake({ + from_session_id: 'claude', + from_agent: 'claude', + to_agent: 'codex', + reason: 'PR review stuck, need your eyes', + next_step: 'open PR #42 and leave inline comments', + }); + + const row = store.storage.getObservation(id); + expect(row?.kind).toBe('wake_request'); + expect(row?.task_id).toBe(thread.task_id); + const meta = JSON.parse(row?.metadata ?? '{}') as WakeRequestMetadata; + expect(meta.kind).toBe('wake_request'); + expect(meta.status).toBe('pending'); + expect(meta.to_agent).toBe('codex'); + expect(meta.reason).toBe('PR review stuck, need your eyes'); + expect(meta.next_step).toBe('open PR #42 and leave inline comments'); + expect(meta.expires_at).toBeGreaterThan(Date.now()); + }); + + it('pendingWakesFor hides the sender and expired/cancelled wakes', () => { + seed('claude', 'codex'); + const thread = openThread('claude'); + thread.join('claude', 'claude'); + thread.join('codex', 'codex'); + + const fresh = thread.requestWake({ + from_session_id: 'claude', + from_agent: 'claude', + to_agent: 'any', + reason: 'fresh', + }); + const stale = thread.requestWake({ + from_session_id: 'claude', + from_agent: 'claude', + to_agent: 'any', + reason: 'stale', + }); + // Force expiry on the stale wake. + const row = store.storage.getObservation(stale); + const meta = JSON.parse(row?.metadata ?? '{}') as WakeRequestMetadata; + meta.expires_at = Date.now() - 1000; + store.storage.updateObservationMetadata(stale, JSON.stringify(meta)); + + expect(thread.pendingWakesFor('codex', 'codex').map((w) => w.id)).toEqual([fresh]); + expect(thread.pendingWakesFor('claude', 'claude')).toHaveLength(0); + }); + + it('acknowledgeWake flips status, records ack observation, sender sees it next turn', () => { + seed('claude', 'codex'); + const thread = openThread('claude'); + thread.join('claude', 'claude'); + thread.join('codex', 'codex'); + + const wakeId = thread.requestWake({ + from_session_id: 'claude', + from_agent: 'claude', + to_agent: 'codex', + reason: 'please check the migration', + }); + thread.acknowledgeWake(wakeId, 'codex'); + + const row = store.storage.getObservation(wakeId); + const meta = JSON.parse(row?.metadata ?? '{}') as WakeRequestMetadata; + expect(meta.status).toBe('acknowledged'); + expect(meta.acknowledged_by_session_id).toBe('codex'); + expect(meta.acknowledged_at).toBeGreaterThan(0); + + // Second ack is rejected — status is terminal. + expect(() => thread.acknowledgeWake(wakeId, 'codex')).toThrow(/acknowledged|pending/); + + // An ack observation exists on the thread so the sender's + // UserPromptSubmit preface can render "codex acked wake #…". + const timeline = store.storage.taskTimeline(thread.task_id, 50); + expect(timeline.some((o) => o.kind === 'wake_ack')).toBe(true); + }); + + it('acknowledgeWake rejects a mismatched agent when wake is addressed to specific agent', () => { + seed('claude', 'codex', 'intruder'); + const thread = openThread('claude'); + thread.join('claude', 'claude'); + thread.join('codex', 'codex'); + thread.join('intruder', 'intruder-agent'); + + const wakeId = thread.requestWake({ + from_session_id: 'claude', + from_agent: 'claude', + to_agent: 'codex', + reason: 'please check the migration', + }); + expect(() => thread.acknowledgeWake(wakeId, 'intruder')).toThrow(/codex/); + thread.acknowledgeWake(wakeId, 'codex'); + }); + + it('cancelWake flips status to cancelled and records a note', () => { + seed('claude', 'codex'); + const thread = openThread('claude'); + thread.join('claude', 'claude'); + thread.join('codex', 'codex'); + + const wakeId = thread.requestWake({ + from_session_id: 'claude', + from_agent: 'claude', + to_agent: 'codex', + reason: 'nevermind, resolved', + }); + thread.cancelWake(wakeId, 'claude', 'resolved offline'); + + const row = store.storage.getObservation(wakeId); + const meta = JSON.parse(row?.metadata ?? '{}') as WakeRequestMetadata; + expect(meta.status).toBe('cancelled'); + expect(() => thread.acknowledgeWake(wakeId, 'codex')).toThrow(/cancelled|pending/); + }); +}); diff --git a/packages/hooks/src/handlers/session-start.ts b/packages/hooks/src/handlers/session-start.ts index a8cee9b..86a6018 100644 --- a/packages/hooks/src/handlers/session-start.ts +++ b/packages/hooks/src/handlers/session-start.ts @@ -60,10 +60,11 @@ export function buildTaskPreface( thread.join(input.session_id, agent); const pending = thread.pendingHandoffsFor(input.session_id, agent); + const pendingWakes = thread.pendingWakesFor(input.session_id, agent); const others = thread.participants().filter((p) => p.session_id !== input.session_id); const lines: string[] = []; - if (others.length > 0 || pending.length > 0) { + if (others.length > 0 || pending.length > 0 || pendingWakes.length > 0) { const who = others.length > 0 ? others.map((p) => `${p.agent}@${p.session_id.slice(0, 8)}`).join(', ') @@ -112,6 +113,23 @@ export function buildTaskPreface( ` decline with: task_decline_handoff(handoff_observation_id=${h.id}, session_id="${input.session_id}", reason="...")`, ); } + for (const w of pendingWakes) { + const minsLeft = Math.max(0, Math.round((w.meta.expires_at - Date.now()) / 60_000)); + lines.push(''); + lines.push( + `PENDING WAKE #${w.id} from ${w.meta.from_agent} (expires in ${minsLeft}m):`, + ` reason: ${w.meta.reason}`, + ); + if (w.meta.next_step) { + lines.push(` next: ${w.meta.next_step}`); + } + // Mirror the handoff ergonomics — inline the session_id so the ack + // call validates on first try instead of round-tripping through an + // "invalid arguments" error. + lines.push( + ` ack with: task_ack_wake(wake_observation_id=${w.id}, session_id="${input.session_id}")`, + ); + } return lines.join('\n'); } diff --git a/packages/hooks/src/handlers/user-prompt-submit.ts b/packages/hooks/src/handlers/user-prompt-submit.ts index 3b302b2..ef1d8f1 100644 --- a/packages/hooks/src/handlers/user-prompt-submit.ts +++ b/packages/hooks/src/handlers/user-prompt-submit.ts @@ -150,6 +150,11 @@ function buildTaskUpdatesPreface(store: MemoryStore, session_id: string, sinceTs ` -> accept with: task_accept_handoff(handoff_observation_id=${o.id}, session_id="${session_id}")`, ); } + if (o.kind === 'wake_request') { + lines.push( + ` -> ack with: task_ack_wake(wake_observation_id=${o.id}, session_id="${session_id}")`, + ); + } } if (skipped > 0) { lines.push(` (${skipped} older message(s) omitted — call task_timeline to see them)`); diff --git a/packages/hooks/test/task-injection.test.ts b/packages/hooks/test/task-injection.test.ts index 4aeacb5..ec76cc4 100644 --- a/packages/hooks/test/task-injection.test.ts +++ b/packages/hooks/test/task-injection.test.ts @@ -86,4 +86,34 @@ describe('SessionStart task preface injection', () => { // ramp rather than silently letting the handoff expire. expect(preface).toContain('task_decline_handoff'); }); + + it('surfaces a pending wake request with a ready-to-copy ack call', () => { + store.startSession({ id: 'A', ide: 'claude-code', cwd: repo }); + const thread = TaskThread.open(store, { + repo_root: repo, + branch: 'feat/handoff', + session_id: 'A', + }); + thread.join('A', 'claude'); + thread.requestWake({ + from_session_id: 'A', + from_agent: 'claude', + to_agent: 'codex', + reason: 'stuck on migration shape, need a second pair of eyes', + next_step: 'look at packages/storage/src/schema.ts', + }); + + store.startSession({ id: 'B', ide: 'codex', cwd: repo }); + const preface = buildTaskPreface(store, { + session_id: 'B', + cwd: repo, + ide: 'codex', + }); + + expect(preface).toContain('PENDING WAKE'); + expect(preface).toContain('stuck on migration shape'); + expect(preface).toContain('packages/storage/src/schema.ts'); + expect(preface).toContain('task_ack_wake'); + expect(preface).toContain('session_id="B"'); + }); });