Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .changeset/wake-requests-attention-inbox.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
'@colony/core': minor
'@colony/hooks': minor
'@colony/mcp-server': minor
'@imdeadpool/colony': minor
---

Add wake-request primitive and attention inbox for idle/stalled cross-agent nudges.

- `task_wake` / `task_ack_wake` / `task_cancel_wake` MCP tools post lightweight nudges on a task thread — no claim transfer, no baton pass. Targets see the request on their next SessionStart or UserPromptSubmit turn with a copy-paste-ready ack call.
- `attention_inbox` MCP tool + `colony inbox` CLI command aggregate pending handoffs, pending wakes, stalled lanes from the hivemind snapshot, and recent other-session file claims into one compact view. Bodies are not expanded; fetch via `get_observations`.
- Hook injection extended: `buildTaskPreface` surfaces pending wake requests alongside pending handoffs; `buildTaskUpdatesPreface` inlines an ack call for wake requests that arrive between turns.

Deferred follow-ups (not in this change): safe session takeover, claim TTL renewal, session Stop checkpoint, and any terminal-control wake mechanism.
92 changes: 92 additions & 0 deletions apps/cli/src/commands/inbox.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import { join } from 'node:path';
import { loadSettings, resolveDataDir } from '@colony/config';
import { MemoryStore, buildAttentionInbox } from '@colony/core';
import type { Command } from 'commander';
import kleur from 'kleur';

export function registerInboxCommand(program: Command): void {
program
.command('inbox')
.description('Compact list of attention items for a session: pending handoffs, wakes, stalled lanes, recent claims')
.requiredOption('--session <id>', 'your session_id')
.requiredOption('--agent <name>', 'your agent name (e.g. claude, codex)')
.option('--repo-root <path>', 'repo root to scan for stalled lanes')
.option('--json', 'emit the full inbox as JSON')
.action(async (opts: { session: string; agent: string; repoRoot?: string; json?: boolean }) => {
const settings = loadSettings();
const dbPath = join(resolveDataDir(settings.dataDir), 'data.db');
const store = new MemoryStore({ dbPath, settings });
try {
const inbox = buildAttentionInbox(store, {
session_id: opts.session,
agent: opts.agent,
...(opts.repoRoot !== undefined ? { repo_root: opts.repoRoot } : {}),
});

if (opts.json) {
process.stdout.write(`${JSON.stringify(inbox, null, 2)}\n`);
return;
}

const lines: string[] = [];
lines.push(
kleur.bold(
`Inbox for ${opts.agent}@${opts.session.slice(0, 8)} — ${inbox.summary.next_action}`,
),
);
lines.push(
` handoffs: ${inbox.summary.pending_handoff_count} wakes: ${inbox.summary.pending_wake_count} stalled lanes: ${inbox.summary.stalled_lane_count} recent other claims: ${inbox.summary.recent_other_claim_count}`,
);

if (inbox.pending_handoffs.length > 0) {
lines.push('');
lines.push(kleur.cyan('Pending handoffs:'));
for (const h of inbox.pending_handoffs) {
const mins = Math.max(0, Math.round((h.expires_at - inbox.generated_at) / 60_000));
lines.push(
` #${h.id} task ${h.task_id} from ${h.from_agent} (${mins}m left): ${h.summary}`,
);
lines.push(
` accept: task_accept_handoff(handoff_observation_id=${h.id}, session_id="${opts.session}")`,
);
}
}
if (inbox.pending_wakes.length > 0) {
lines.push('');
lines.push(kleur.yellow('Pending wakes:'));
for (const w of inbox.pending_wakes) {
const mins = Math.max(0, Math.round((w.expires_at - inbox.generated_at) / 60_000));
lines.push(
` #${w.id} task ${w.task_id} from ${w.from_agent} (${mins}m left): ${w.reason}`,
);
if (w.next_step) lines.push(` next: ${w.next_step}`);
lines.push(
` ack: task_ack_wake(wake_observation_id=${w.id}, session_id="${opts.session}")`,
);
}
}
if (inbox.stalled_lanes.length > 0) {
lines.push('');
lines.push(kleur.magenta('Stalled lanes:'));
for (const lane of inbox.stalled_lanes) {
lines.push(
` ${lane.branch} [${lane.activity}] ${lane.owner}: ${lane.activity_summary}`,
);
}
}
if (inbox.recent_other_claims.length > 0) {
lines.push('');
lines.push(kleur.gray('Recent other-session claims:'));
for (const c of inbox.recent_other_claims) {
lines.push(
` task ${c.task_id} ${c.file_path} by ${c.by_session_id.slice(0, 8)}`,
);
}
}

process.stdout.write(`${lines.join('\n')}\n`);
} finally {
store.close();
}
});
}
2 changes: 2 additions & 0 deletions apps/cli/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { registerDebriefCommand } from './commands/debrief.js';
import { registerDoctorCommand } from './commands/doctor.js';
import { registerExportCommand } from './commands/export.js';
import { registerHookCommand } from './commands/hook.js';
import { registerInboxCommand } from './commands/inbox.js';
import { registerInstallCommand } from './commands/install.js';
import { registerLifecycleCommands } from './commands/lifecycle.js';
import { registerMcpCommand } from './commands/mcp.js';
Expand Down Expand Up @@ -43,6 +44,7 @@ export function createProgram(): Command {
registerNoteCommand(program);
registerObserveCommand(program);
registerDebriefCommand(program);
registerInboxCommand(program);

return program;
}
Expand Down
138 changes: 138 additions & 0 deletions apps/mcp-server/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { pathToFileURL } from 'node:url';
import { type Settings, loadSettings, resolveDataDir } from '@colony/config';
import {
type AgentCapabilities,
type AttentionInboxOptions,
DEFAULT_CAPABILITIES,
type Embedder,
type HivemindOptions,
Expand All @@ -14,6 +15,7 @@ import {
ProposalSystem,
type SearchResult,
TaskThread,
buildAttentionInbox,
loadProfile,
readHivemind,
saveProfile,
Expand Down Expand Up @@ -516,6 +518,142 @@ export function buildServer(store: MemoryStore, settings: Settings): McpServer {
},
);

server.tool(
'task_wake',
'Post a wake request on a task thread — a lightweight nudge surfaced to the target on their next turn. No claim transfer. Use when you need another session to attend to something but a full handoff is the wrong shape.',
{
task_id: z.number().int().positive(),
session_id: z.string().min(1).describe('your session_id (the sender)'),
agent: z.string().min(1).describe('your agent name, e.g. claude or codex'),
to_agent: z.enum(['claude', 'codex', 'any']),
to_session_id: z.string().optional(),
reason: z.string().min(1),
next_step: z.string().optional(),
expires_in_minutes: z.number().int().positive().max(1440).optional(),
},
async (args) => {
const thread = new TaskThread(store, args.task_id);
const id = thread.requestWake({
from_session_id: args.session_id,
from_agent: args.agent,
to_agent: args.to_agent,
...(args.to_session_id !== undefined ? { to_session_id: args.to_session_id } : {}),
reason: args.reason,
...(args.next_step !== undefined ? { next_step: args.next_step } : {}),
...(args.expires_in_minutes !== undefined
? { expires_in_ms: args.expires_in_minutes * 60_000 }
: {}),
});
return {
content: [
{ type: 'text', text: JSON.stringify({ wake_observation_id: id, status: 'pending' }) },
],
};
},
);

server.tool(
'task_ack_wake',
'Acknowledge a pending wake request addressed to you. Records an ack on the task thread so the sender sees the response on their next turn.',
{
wake_observation_id: z.number().int().positive(),
session_id: z.string().min(1),
},
async ({ wake_observation_id, session_id }) => {
const obs = store.storage.getObservation(wake_observation_id);
if (!obs?.task_id) {
return {
content: [
{ type: 'text', text: JSON.stringify({ error: 'observation is not on a task' }) },
],
isError: true,
};
}
const thread = new TaskThread(store, obs.task_id);
try {
thread.acknowledgeWake(wake_observation_id, session_id);
return { content: [{ type: 'text', text: JSON.stringify({ status: 'acknowledged' }) }] };
} catch (err) {
return {
content: [
{
type: 'text',
text: JSON.stringify({ error: err instanceof Error ? err.message : String(err) }),
},
],
isError: true,
};
}
},
);

server.tool(
'task_cancel_wake',
'Cancel a pending wake request. Either the sender (withdrawing) or the target (declining) may cancel.',
{
wake_observation_id: z.number().int().positive(),
session_id: z.string().min(1),
reason: z.string().optional(),
},
async ({ wake_observation_id, session_id, reason }) => {
const obs = store.storage.getObservation(wake_observation_id);
if (!obs?.task_id) {
return {
content: [
{ type: 'text', text: JSON.stringify({ error: 'observation is not on a task' }) },
],
isError: true,
};
}
const thread = new TaskThread(store, obs.task_id);
try {
thread.cancelWake(wake_observation_id, session_id, reason);
return { content: [{ type: 'text', text: JSON.stringify({ status: 'cancelled' }) }] };
} catch (err) {
return {
content: [
{
type: 'text',
text: JSON.stringify({ error: err instanceof Error ? err.message : String(err) }),
},
],
isError: true,
};
}
},
);

server.tool(
'attention_inbox',
'Compact list of what needs your attention: pending handoffs, pending wakes, stalled lanes, recent other-session file claims. Fetch bodies via get_observations.',
{
session_id: z.string().min(1),
agent: z.string().min(1),
repo_root: z.string().min(1).optional(),
repo_roots: z.array(z.string().min(1)).max(20).optional(),
recent_claim_window_minutes: z.number().int().positive().max(1440).optional(),
recent_claim_limit: z.number().int().positive().max(100).optional(),
task_ids: z.array(z.number().int().positive()).max(100).optional(),
},
async (args) => {
const options: AttentionInboxOptions = {
session_id: args.session_id,
agent: args.agent,
};
if (args.repo_root !== undefined) options.repo_root = args.repo_root;
if (args.repo_roots !== undefined) options.repo_roots = args.repo_roots;
if (args.recent_claim_window_minutes !== undefined) {
options.recent_claim_window_ms = args.recent_claim_window_minutes * 60_000;
}
if (args.recent_claim_limit !== undefined) {
options.recent_claim_limit = args.recent_claim_limit;
}
if (args.task_ids !== undefined) options.task_ids = args.task_ids;
const inbox = buildAttentionInbox(store, options);
return { content: [{ type: 'text', text: JSON.stringify(inbox) }] };
},
);

server.tool(
'task_foraging_report',
'List pending and recently promoted proposals on a (repo_root, branch). Pending proposals whose strength has evaporated below the noise floor are omitted.',
Expand Down
4 changes: 4 additions & 0 deletions apps/mcp-server/test/server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,15 @@ describe('MCP server', () => {
expect(tools.map((t) => t.name).sort()).toEqual([
'agent_get_profile',
'agent_upsert_profile',
'attention_inbox',
'get_observations',
'hivemind',
'hivemind_context',
'list_sessions',
'search',
'task_accept_handoff',
'task_ack_wake',
'task_cancel_wake',
'task_claim_file',
'task_decline_handoff',
'task_foraging_report',
Expand All @@ -64,6 +67,7 @@ describe('MCP server', () => {
'task_reinforce',
'task_timeline',
'task_updates_since',
'task_wake',
'timeline',
]);
});
Expand Down
69 changes: 69 additions & 0 deletions apps/mcp-server/test/task-threads.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,75 @@ describe('task threads — handoff lifecycle', () => {
expect(afterMeta.status).toBe('expired');
});

it('task_wake + task_ack_wake + attention_inbox round trip', async () => {
const { task_id, sessionA, sessionB } = seedTwoSessionTask();

const { wake_observation_id } = await call<{ wake_observation_id: number }>('task_wake', {
task_id,
session_id: sessionA,
agent: 'claude',
to_agent: 'codex',
reason: 'please review the migration shape',
next_step: 'look at packages/storage/src/schema.ts',
});

const inbox = await call<{
pending_wakes: Array<{ id: number; reason: string }>;
summary: { pending_wake_count: number; next_action: string };
}>('attention_inbox', {
session_id: sessionB,
agent: 'codex',
task_ids: [task_id],
});
expect(inbox.pending_wakes.map((w) => w.id)).toContain(wake_observation_id);
expect(inbox.summary.pending_wake_count).toBeGreaterThan(0);

const acked = await call<{ status: string }>('task_ack_wake', {
wake_observation_id,
session_id: sessionB,
});
expect(acked.status).toBe('acknowledged');

const row = store.storage.getObservation(wake_observation_id);
const meta = JSON.parse(row?.metadata ?? '{}');
expect(meta.status).toBe('acknowledged');
expect(meta.acknowledged_by_session_id).toBe(sessionB);

const retry = await client.callTool({
name: 'task_ack_wake',
arguments: { wake_observation_id, session_id: sessionB },
});
expect(retry.isError).toBe(true);
});

it('task_cancel_wake cancels a pending wake without side effects on claims', async () => {
const { task_id, sessionA, sessionB } = seedTwoSessionTask();

const { wake_observation_id } = await call<{ wake_observation_id: number }>('task_wake', {
task_id,
session_id: sessionA,
agent: 'claude',
to_agent: 'codex',
reason: 'nevermind',
});

await call('task_cancel_wake', {
wake_observation_id,
session_id: sessionA,
reason: 'resolved offline',
});

const row = store.storage.getObservation(wake_observation_id);
const meta = JSON.parse(row?.metadata ?? '{}');
expect(meta.status).toBe('cancelled');

const res = await client.callTool({
name: 'task_ack_wake',
arguments: { wake_observation_id, session_id: sessionB },
});
expect(res.isError).toBe(true);
});

it("task_updates_since filters out the caller's own posts", async () => {
const { task_id, sessionA, sessionB } = seedTwoSessionTask();
const cursor = Date.now() - 1; // strictly before either post
Expand Down
Loading
Loading