From cedf3a5f522019b3c5a52b0446eeed03517fb2da Mon Sep 17 00:00:00 2001 From: NagyVikt Date: Sat, 25 Apr 2026 00:14:24 +0200 Subject: [PATCH] Surface urgent task messages at session start Unread direct messages were invisible unless agents remembered to query task_messages manually. Route unread message summaries through the attention inbox and render them during SessionStart so blocking and reply-needed messages become turn-start context while FYI stays collapsed. Constraint: SessionStart must stay cheap, so the hook disables stalled-lane filesystem reads when it only needs task-scoped unread messages. Rejected: Render every FYI preview inline | would add context noise and defeat the urgency dial. Confidence: high Scope-risk: moderate Directive: Keep FYI collapsed unless product data shows agents miss low-urgency context. Tested: pnpm --filter @colony/core test -- attention-inbox.test.ts; pnpm --filter @colony/hooks test -- task-injection.test.ts; pnpm --filter @colony/core typecheck; pnpm --filter @colony/hooks typecheck; pnpm exec biome check touched files; git diff --check Not-tested: @imdeadpool/colony-cli typecheck is blocked by existing @colony/spec errors in apps/mcp-server/src/tools/spec.ts --- apps/cli/src/commands/inbox.ts | 24 ++++++- packages/core/src/attention-inbox.ts | 30 ++++++++- packages/core/src/index.ts | 1 + packages/core/test/attention-inbox.test.ts | 25 ++++++- packages/hooks/src/handlers/session-start.ts | 70 +++++++++++++++++++- packages/hooks/test/task-injection.test.ts | 53 +++++++++++++++ tsconfig.base.json | 1 + vitest.config.ts | 1 + 8 files changed, 199 insertions(+), 6 deletions(-) diff --git a/apps/cli/src/commands/inbox.ts b/apps/cli/src/commands/inbox.ts index af8d20c..a3a04ce 100644 --- a/apps/cli/src/commands/inbox.ts +++ b/apps/cli/src/commands/inbox.ts @@ -71,9 +71,31 @@ export function registerInboxCommand(program: Command): void { kleur.bold(`Inbox for ${agent}@${session.slice(0, 8)} — ${inbox.summary.next_action}`), ); lines.push( - ` handoffs: ${inbox.summary.pending_handoff_count} wakes: ${inbox.summary.pending_wake_count} stalled lanes: ${inbox.summary.stalled_lane_count} recent other claims: ${inbox.summary.recent_other_claim_count}`, + ` messages: ${inbox.summary.unread_message_count} handoffs: ${inbox.summary.pending_handoff_count} wakes: ${inbox.summary.pending_wake_count} stalled lanes: ${inbox.summary.stalled_lane_count} recent other claims: ${inbox.summary.recent_other_claim_count}`, ); + const blockingMessages = inbox.unread_messages.filter((m) => m.urgency === 'blocking'); + const needsReplyMessages = inbox.unread_messages.filter( + (m) => m.urgency === 'needs_reply', + ); + const fyiMessages = inbox.unread_messages.filter((m) => m.urgency === 'fyi'); + if (blockingMessages.length > 0 || needsReplyMessages.length > 0) { + lines.push(''); + lines.push(kleur.red('Unread messages:')); + for (const m of [...blockingMessages, ...needsReplyMessages]) { + lines.push(` #${m.id} task ${m.task_id} from ${m.from_agent} [${m.urgency}]`); + lines.push(` ${m.preview.replace(/\s+/g, ' ').trim()}`); + lines.push( + ` reply: task_message(task_id=${m.task_id}, session_id="${session}", agent="${agent}", to_agent="any", to_session_id="${m.from_session_id}", reply_to=${m.id}, urgency="fyi", content="...")`, + ); + } + } + if (fyiMessages.length > 0) { + lines.push( + ` FYI messages: ${fyiMessages.length} unread collapsed; use --json to expand`, + ); + } + if (inbox.pending_handoffs.length > 0) { lines.push(''); lines.push(kleur.cyan('Pending handoffs:')); diff --git a/packages/core/src/attention-inbox.ts b/packages/core/src/attention-inbox.ts index 4f0f944..906ffc2 100644 --- a/packages/core/src/attention-inbox.ts +++ b/packages/core/src/attention-inbox.ts @@ -6,6 +6,7 @@ import { readHivemind, } from './hivemind.js'; import type { MemoryStore } from './memory-store.js'; +import { type MessageSummary, listMessagesForAgent } from './messages.js'; import { type HandoffMetadata, type HandoffTarget, @@ -41,6 +42,8 @@ export interface InboxWake { ts: number; } +export type InboxMessage = MessageSummary; + export interface InboxLane { repo_root: string; branch: string; @@ -66,12 +69,14 @@ export interface AttentionInbox { summary: { pending_handoff_count: number; pending_wake_count: number; + unread_message_count: number; stalled_lane_count: number; recent_other_claim_count: number; next_action: string; }; pending_handoffs: InboxHandoff[]; pending_wakes: InboxWake[]; + unread_messages: InboxMessage[]; stalled_lanes: InboxLane[]; recent_other_claims: InboxRecentClaim[]; } @@ -92,6 +97,9 @@ export interface AttentionInboxOptions { /** Tasks to scan for pending handoffs/wakes. Defaults to all tasks the * session participates in. */ task_ids?: number[]; + unread_message_limit?: number; + /** Defaults true; hooks can disable filesystem hivemind reads for hot paths. */ + include_stalled_lanes?: boolean; } const DEFAULT_RECENT_CLAIM_WINDOW_MS = 15 * 60_000; @@ -117,6 +125,13 @@ export function buildAttentionInbox( const pending_handoffs: InboxHandoff[] = []; const pending_wakes: InboxWake[] = []; const recent_other_claims: InboxRecentClaim[] = []; + const unread_messages = listMessagesForAgent(store, { + session_id: opts.session_id, + agent: opts.agent, + task_ids: taskIds, + unread_only: true, + ...(opts.unread_message_limit !== undefined ? { limit: opts.unread_message_limit } : {}), + }); const recentWindow = opts.recent_claim_window_ms ?? DEFAULT_RECENT_CLAIM_WINDOW_MS; const recentLimit = opts.recent_claim_limit ?? DEFAULT_RECENT_CLAIM_LIMIT; @@ -136,16 +151,18 @@ export function buildAttentionInbox( } } - const stalled_lanes = collectStalledLanes(opts); + const stalled_lanes = opts.include_stalled_lanes === false ? [] : collectStalledLanes(opts); const summary = { pending_handoff_count: pending_handoffs.length, pending_wake_count: pending_wakes.length, + unread_message_count: unread_messages.length, stalled_lane_count: stalled_lanes.length, recent_other_claim_count: recent_other_claims.length, next_action: deriveNextAction({ pending_handoffs, pending_wakes, + unread_messages, stalled_lanes, recent_other_claims, }), @@ -158,6 +175,7 @@ export function buildAttentionInbox( summary, pending_handoffs, pending_wakes, + unread_messages, stalled_lanes, recent_other_claims, }; @@ -261,15 +279,25 @@ function toInboxLane(session: HivemindSession): InboxLane { function deriveNextAction(parts: { pending_handoffs: InboxHandoff[]; pending_wakes: InboxWake[]; + unread_messages: InboxMessage[]; stalled_lanes: InboxLane[]; recent_other_claims: InboxRecentClaim[]; }): string { + if (parts.unread_messages.some((m) => m.urgency === 'blocking')) { + return 'Answer blocking task messages first; another agent is explicitly blocked on you.'; + } if (parts.pending_handoffs.length > 0) { return 'Respond to pending handoffs first; each baton pass is blocking until accept or decline.'; } + if (parts.unread_messages.some((m) => m.urgency === 'needs_reply')) { + return 'Reply to task messages that need a response before starting unrelated work.'; + } if (parts.pending_wakes.length > 0) { return 'Acknowledge pending wake requests; another session is waiting on you.'; } + if (parts.unread_messages.length > 0) { + return 'Review unread FYI task messages when context allows.'; + } if (parts.stalled_lanes.length > 0) { return 'Review stalled lanes — takeover may be safer than waiting for the owner to return.'; } diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 79ececa..13205ed 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -44,6 +44,7 @@ export { type AttentionInboxOptions, type InboxHandoff, type InboxLane, + type InboxMessage, type InboxRecentClaim, type InboxWake, } from './attention-inbox.js'; diff --git a/packages/core/test/attention-inbox.test.ts b/packages/core/test/attention-inbox.test.ts index 507d191..af7912d 100644 --- a/packages/core/test/attention-inbox.test.ts +++ b/packages/core/test/attention-inbox.test.ts @@ -27,7 +27,7 @@ afterEach(() => { }); describe('buildAttentionInbox', () => { - it('aggregates pending handoffs, wakes, and recent other-session claims for a participating agent', () => { + it('aggregates unread messages, pending handoffs, wakes, and recent other-session claims for a participating agent', () => { seed('claude', 'codex'); const thread = TaskThread.open(store, { repo_root: '/r', @@ -52,6 +52,20 @@ describe('buildAttentionInbox', () => { reason: 'PR review needed', next_step: 'look at PR #42', }); + const messageId = thread.postMessage({ + from_session_id: 'claude', + from_agent: 'claude', + to_agent: 'codex', + content: 'schema direction is blocking the next slice', + urgency: 'blocking', + }); + thread.postMessage({ + from_session_id: 'claude', + from_agent: 'claude', + to_agent: 'codex', + content: 'FYI: docs landed', + urgency: 'fyi', + }); // Claude also claims an unrelated file recently — codex's inbox should // surface it as a "recent other-session claim" near their lane. @@ -65,6 +79,10 @@ describe('buildAttentionInbox', () => { expect(inbox.pending_handoffs.map((h) => h.id)).toEqual([handoffId]); expect(inbox.pending_wakes.map((w) => w.id)).toEqual([wakeId]); + expect(inbox.unread_messages.map((m) => m.id)).toContain(messageId); + expect(inbox.unread_messages.map((m) => m.urgency)).toEqual( + expect.arrayContaining(['fyi', 'blocking']), + ); expect(inbox.pending_wakes[0]?.reason).toBe('PR review needed'); expect(inbox.pending_wakes[0]?.next_step).toBe('look at PR #42'); @@ -73,7 +91,8 @@ describe('buildAttentionInbox', () => { expect(inbox.summary.pending_handoff_count).toBe(1); expect(inbox.summary.pending_wake_count).toBe(1); - expect(inbox.summary.next_action).toMatch(/handoff/i); + expect(inbox.summary.unread_message_count).toBe(2); + expect(inbox.summary.next_action).toMatch(/blocking task messages/i); }); it("omits the requesting session's own claims and own handoffs", () => { @@ -102,6 +121,7 @@ describe('buildAttentionInbox', () => { expect(inbox.pending_handoffs).toHaveLength(0); expect(inbox.recent_other_claims.find((c) => c.file_path === 'src/own.ts')).toBeUndefined(); + expect(inbox.unread_messages).toHaveLength(0); }); it('returns the quiet-inbox next_action hint when nothing is pending', () => { @@ -124,6 +144,7 @@ describe('buildAttentionInbox', () => { expect(inbox.pending_handoffs).toHaveLength(0); expect(inbox.pending_wakes).toHaveLength(0); + expect(inbox.unread_messages).toHaveLength(0); expect(inbox.summary.next_action).toMatch(/quiet/i); }); }); diff --git a/packages/hooks/src/handlers/session-start.ts b/packages/hooks/src/handlers/session-start.ts index eb5e47f..ee2bf27 100644 --- a/packages/hooks/src/handlers/session-start.ts +++ b/packages/hooks/src/handlers/session-start.ts @@ -1,4 +1,11 @@ -import { type MemoryStore, ProposalSystem, TaskThread, detectRepoBranch } from '@colony/core'; +import { + type InboxMessage, + type MemoryStore, + ProposalSystem, + TaskThread, + buildAttentionInbox, + detectRepoBranch, +} from '@colony/core'; import { spawnNodeScript } from '@colony/process'; import type { HookInput } from '../types.js'; @@ -103,10 +110,22 @@ export function buildTaskPreface( const pending = thread.pendingHandoffsFor(input.session_id, agent); const pendingWakes = thread.pendingWakesFor(input.session_id, agent); + const unreadMessages = buildAttentionInbox(store, { + session_id: input.session_id, + agent, + task_ids: [thread.task_id], + repo_root: detected.repo_root, + include_stalled_lanes: false, + }).unread_messages; const others = thread.participants().filter((p) => p.session_id !== input.session_id); const lines: string[] = []; - if (others.length > 0 || pending.length > 0 || pendingWakes.length > 0) { + if ( + others.length > 0 || + pending.length > 0 || + pendingWakes.length > 0 || + unreadMessages.length > 0 + ) { const who = others.length > 0 ? others.map((p) => `${p.agent}@${p.session_id.slice(0, 8)}`).join(', ') @@ -116,6 +135,7 @@ export function buildTaskPreface( `Joined with: ${who}. Post coordination via MCP tools task_post / task_claim_file / task_hand_off.`, ); } + appendMessagePreface(lines, unreadMessages, input.session_id, agent); for (const h of pending) { const minsLeft = Math.max(0, Math.round((h.meta.expires_at - Date.now()) / 60_000)); lines.push(''); @@ -175,6 +195,52 @@ export function buildTaskPreface( return lines.join('\n'); } +function appendMessagePreface( + lines: string[], + messages: InboxMessage[], + session_id: string, + agent: string, +): void { + const blocking = messages.filter((m) => m.urgency === 'blocking'); + const needsReply = messages.filter((m) => m.urgency === 'needs_reply'); + const fyi = messages.filter((m) => m.urgency === 'fyi'); + + for (const m of blocking) { + appendMessage(lines, m, 'BLOCKING MESSAGE', session_id, agent); + } + for (const m of needsReply) { + appendMessage(lines, m, 'MESSAGE NEEDS REPLY', session_id, agent); + } + if (fyi.length > 0) { + lines.push(''); + lines.push( + `FYI MESSAGES: ${fyi.length} unread collapsed; expand with: task_messages(session_id="${session_id}", agent="${agent}", task_ids=[${[...new Set(fyi.map((m) => m.task_id))].join(', ')}], unread_only=true)`, + ); + } +} + +function appendMessage( + lines: string[], + message: InboxMessage, + label: string, + session_id: string, + agent: string, +): void { + lines.push(''); + lines.push(`${label} #${message.id} from ${message.from_agent}:`); + lines.push(` preview: ${compactPreview(message.preview)}`); + lines.push( + ` reply with: task_message(task_id=${message.task_id}, session_id="${session_id}", agent="${agent}", to_agent="any", to_session_id="${message.from_session_id}", reply_to=${message.id}, urgency="fyi", content="...")`, + ); + lines.push( + ` mark read: task_message_mark_read(message_observation_id=${message.id}, session_id="${session_id}")`, + ); +} + +function compactPreview(preview: string): string { + return preview.replace(/\s+/g, ' ').trim(); +} + /** * Surface pending proposals and recently promoted ones for this branch. * Agents see this at SessionStart so they know what ideas the colony diff --git a/packages/hooks/test/task-injection.test.ts b/packages/hooks/test/task-injection.test.ts index ec76cc4..1bd62aa 100644 --- a/packages/hooks/test/task-injection.test.ts +++ b/packages/hooks/test/task-injection.test.ts @@ -116,4 +116,57 @@ describe('SessionStart task preface injection', () => { expect(preface).toContain('task_ack_wake'); expect(preface).toContain('session_id="B"'); }); + + it('surfaces unread messages by urgency and keeps FYI collapsed', () => { + store.startSession({ id: 'A', ide: 'claude-code', cwd: repo }); + const thread = TaskThread.open(store, { + repo_root: repo, + branch: 'feat/handoff', + session_id: 'A', + }); + thread.join('A', 'claude'); + thread.postMessage({ + from_session_id: 'A', + from_agent: 'claude', + to_agent: 'codex', + content: 'blocked until you choose the schema direction', + urgency: 'blocking', + }); + thread.postMessage({ + from_session_id: 'A', + from_agent: 'claude', + to_agent: 'codex', + content: 'please confirm the generated-column tradeoff', + urgency: 'needs_reply', + }); + thread.postMessage({ + from_session_id: 'A', + from_agent: 'claude', + to_agent: 'codex', + content: 'FYI: local docs mention the migration path', + urgency: 'fyi', + }); + + store.startSession({ id: 'B', ide: 'codex', cwd: repo }); + const preface = buildTaskPreface(store, { + session_id: 'B', + cwd: repo, + ide: 'codex', + }); + + const blocking = preface.indexOf('BLOCKING MESSAGE'); + const needsReply = preface.indexOf('MESSAGE NEEDS REPLY'); + const fyi = preface.indexOf('FYI MESSAGES: 1 unread collapsed'); + + expect(blocking).toBeGreaterThanOrEqual(0); + expect(needsReply).toBeGreaterThan(blocking); + expect(fyi).toBeGreaterThan(needsReply); + expect(preface).toContain('blocked until you choose schema direction'); + expect(preface).toContain('confirm generated-column tradeoff'); + expect(preface).not.toContain('FYI: local docs mention the migration path'); + expect(preface).toContain('task_message('); + expect(preface).toContain('to_session_id="A"'); + expect(preface).toContain('task_message_mark_read'); + expect(preface).toContain('task_messages(session_id="B", agent="codex"'); + }); }); diff --git a/tsconfig.base.json b/tsconfig.base.json index d7abc38..95c7f98 100644 --- a/tsconfig.base.json +++ b/tsconfig.base.json @@ -30,6 +30,7 @@ "@colony/hooks": ["./packages/hooks/src/index.ts"], "@colony/installers": ["./packages/installers/src/index.ts"], "@colony/mcp-server": ["./apps/mcp-server/src/server.ts"], + "@colony/process": ["./packages/process/src/index.ts"], "@colony/worker": ["./apps/worker/src/server.ts"] } }, diff --git a/vitest.config.ts b/vitest.config.ts index c8c12fd..c24ae2c 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -12,6 +12,7 @@ export const workspaceAliases = { '@colony/hooks': resolve(rootDir, 'packages/hooks/src/index.ts'), '@colony/installers': resolve(rootDir, 'packages/installers/src/index.ts'), '@colony/mcp-server': resolve(rootDir, 'apps/mcp-server/src/server.ts'), + '@colony/process': resolve(rootDir, 'packages/process/src/index.ts'), '@colony/storage': resolve(rootDir, 'packages/storage/src/index.ts'), '@colony/worker': resolve(rootDir, 'apps/worker/src/server.ts'), };