Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion apps/cli/src/commands/inbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,31 @@ export function registerInboxCommand(program: Command): void {
kleur.bold(`Inbox for ${agent}@${session.slice(0, 8)} — ${inbox.summary.next_action}`),
);
lines.push(
` handoffs: ${inbox.summary.pending_handoff_count} wakes: ${inbox.summary.pending_wake_count} stalled lanes: ${inbox.summary.stalled_lane_count} recent other claims: ${inbox.summary.recent_other_claim_count}`,
` messages: ${inbox.summary.unread_message_count} handoffs: ${inbox.summary.pending_handoff_count} wakes: ${inbox.summary.pending_wake_count} stalled lanes: ${inbox.summary.stalled_lane_count} recent other claims: ${inbox.summary.recent_other_claim_count}`,
);

const blockingMessages = inbox.unread_messages.filter((m) => m.urgency === 'blocking');
const needsReplyMessages = inbox.unread_messages.filter(
(m) => m.urgency === 'needs_reply',
);
const fyiMessages = inbox.unread_messages.filter((m) => m.urgency === 'fyi');
if (blockingMessages.length > 0 || needsReplyMessages.length > 0) {
lines.push('');
lines.push(kleur.red('Unread messages:'));
for (const m of [...blockingMessages, ...needsReplyMessages]) {
lines.push(` #${m.id} task ${m.task_id} from ${m.from_agent} [${m.urgency}]`);
lines.push(` ${m.preview.replace(/\s+/g, ' ').trim()}`);
lines.push(
` reply: task_message(task_id=${m.task_id}, session_id="${session}", agent="${agent}", to_agent="any", to_session_id="${m.from_session_id}", reply_to=${m.id}, urgency="fyi", content="...")`,
);
}
}
if (fyiMessages.length > 0) {
lines.push(
` FYI messages: ${fyiMessages.length} unread collapsed; use --json to expand`,
);
}

if (inbox.pending_handoffs.length > 0) {
lines.push('');
lines.push(kleur.cyan('Pending handoffs:'));
Expand Down
30 changes: 29 additions & 1 deletion packages/core/src/attention-inbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
readHivemind,
} from './hivemind.js';
import type { MemoryStore } from './memory-store.js';
import { type MessageSummary, listMessagesForAgent } from './messages.js';
import {
type HandoffMetadata,
type HandoffTarget,
Expand Down Expand Up @@ -41,6 +42,8 @@ export interface InboxWake {
ts: number;
}

export type InboxMessage = MessageSummary;

export interface InboxLane {
repo_root: string;
branch: string;
Expand All @@ -66,12 +69,14 @@ export interface AttentionInbox {
summary: {
pending_handoff_count: number;
pending_wake_count: number;
unread_message_count: number;
stalled_lane_count: number;
recent_other_claim_count: number;
next_action: string;
};
pending_handoffs: InboxHandoff[];
pending_wakes: InboxWake[];
unread_messages: InboxMessage[];
stalled_lanes: InboxLane[];
recent_other_claims: InboxRecentClaim[];
}
Expand All @@ -92,6 +97,9 @@ export interface AttentionInboxOptions {
/** Tasks to scan for pending handoffs/wakes. Defaults to all tasks the
* session participates in. */
task_ids?: number[];
unread_message_limit?: number;
/** Defaults true; hooks can disable filesystem hivemind reads for hot paths. */
include_stalled_lanes?: boolean;
}

const DEFAULT_RECENT_CLAIM_WINDOW_MS = 15 * 60_000;
Expand All @@ -117,6 +125,13 @@ export function buildAttentionInbox(
const pending_handoffs: InboxHandoff[] = [];
const pending_wakes: InboxWake[] = [];
const recent_other_claims: InboxRecentClaim[] = [];
const unread_messages = listMessagesForAgent(store, {
session_id: opts.session_id,
agent: opts.agent,
task_ids: taskIds,
unread_only: true,
...(opts.unread_message_limit !== undefined ? { limit: opts.unread_message_limit } : {}),
});

const recentWindow = opts.recent_claim_window_ms ?? DEFAULT_RECENT_CLAIM_WINDOW_MS;
const recentLimit = opts.recent_claim_limit ?? DEFAULT_RECENT_CLAIM_LIMIT;
Expand All @@ -136,16 +151,18 @@ export function buildAttentionInbox(
}
}

const stalled_lanes = collectStalledLanes(opts);
const stalled_lanes = opts.include_stalled_lanes === false ? [] : collectStalledLanes(opts);

const summary = {
pending_handoff_count: pending_handoffs.length,
pending_wake_count: pending_wakes.length,
unread_message_count: unread_messages.length,
stalled_lane_count: stalled_lanes.length,
recent_other_claim_count: recent_other_claims.length,
next_action: deriveNextAction({
pending_handoffs,
pending_wakes,
unread_messages,
stalled_lanes,
recent_other_claims,
}),
Expand All @@ -158,6 +175,7 @@ export function buildAttentionInbox(
summary,
pending_handoffs,
pending_wakes,
unread_messages,
stalled_lanes,
recent_other_claims,
};
Expand Down Expand Up @@ -261,15 +279,25 @@ function toInboxLane(session: HivemindSession): InboxLane {
function deriveNextAction(parts: {
pending_handoffs: InboxHandoff[];
pending_wakes: InboxWake[];
unread_messages: InboxMessage[];
stalled_lanes: InboxLane[];
recent_other_claims: InboxRecentClaim[];
}): string {
if (parts.unread_messages.some((m) => m.urgency === 'blocking')) {
return 'Answer blocking task messages first; another agent is explicitly blocked on you.';
}
if (parts.pending_handoffs.length > 0) {
return 'Respond to pending handoffs first; each baton pass is blocking until accept or decline.';
}
if (parts.unread_messages.some((m) => m.urgency === 'needs_reply')) {
return 'Reply to task messages that need a response before starting unrelated work.';
}
if (parts.pending_wakes.length > 0) {
return 'Acknowledge pending wake requests; another session is waiting on you.';
}
if (parts.unread_messages.length > 0) {
return 'Review unread FYI task messages when context allows.';
}
if (parts.stalled_lanes.length > 0) {
return 'Review stalled lanes — takeover may be safer than waiting for the owner to return.';
}
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export {
type AttentionInboxOptions,
type InboxHandoff,
type InboxLane,
type InboxMessage,
type InboxRecentClaim,
type InboxWake,
} from './attention-inbox.js';
Expand Down
25 changes: 23 additions & 2 deletions packages/core/test/attention-inbox.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ afterEach(() => {
});

describe('buildAttentionInbox', () => {
it('aggregates pending handoffs, wakes, and recent other-session claims for a participating agent', () => {
it('aggregates unread messages, pending handoffs, wakes, and recent other-session claims for a participating agent', () => {
seed('claude', 'codex');
const thread = TaskThread.open(store, {
repo_root: '/r',
Expand All @@ -52,6 +52,20 @@ describe('buildAttentionInbox', () => {
reason: 'PR review needed',
next_step: 'look at PR #42',
});
const messageId = thread.postMessage({
from_session_id: 'claude',
from_agent: 'claude',
to_agent: 'codex',
content: 'schema direction is blocking the next slice',
urgency: 'blocking',
});
thread.postMessage({
from_session_id: 'claude',
from_agent: 'claude',
to_agent: 'codex',
content: 'FYI: docs landed',
urgency: 'fyi',
});

// Claude also claims an unrelated file recently — codex's inbox should
// surface it as a "recent other-session claim" near their lane.
Expand All @@ -65,6 +79,10 @@ describe('buildAttentionInbox', () => {

expect(inbox.pending_handoffs.map((h) => h.id)).toEqual([handoffId]);
expect(inbox.pending_wakes.map((w) => w.id)).toEqual([wakeId]);
expect(inbox.unread_messages.map((m) => m.id)).toContain(messageId);
expect(inbox.unread_messages.map((m) => m.urgency)).toEqual(
expect.arrayContaining(['fyi', 'blocking']),
);
expect(inbox.pending_wakes[0]?.reason).toBe('PR review needed');
expect(inbox.pending_wakes[0]?.next_step).toBe('look at PR #42');

Expand All @@ -73,7 +91,8 @@ describe('buildAttentionInbox', () => {

expect(inbox.summary.pending_handoff_count).toBe(1);
expect(inbox.summary.pending_wake_count).toBe(1);
expect(inbox.summary.next_action).toMatch(/handoff/i);
expect(inbox.summary.unread_message_count).toBe(2);
expect(inbox.summary.next_action).toMatch(/blocking task messages/i);
});

it("omits the requesting session's own claims and own handoffs", () => {
Expand Down Expand Up @@ -102,6 +121,7 @@ describe('buildAttentionInbox', () => {

expect(inbox.pending_handoffs).toHaveLength(0);
expect(inbox.recent_other_claims.find((c) => c.file_path === 'src/own.ts')).toBeUndefined();
expect(inbox.unread_messages).toHaveLength(0);
});

it('returns the quiet-inbox next_action hint when nothing is pending', () => {
Expand All @@ -124,6 +144,7 @@ describe('buildAttentionInbox', () => {

expect(inbox.pending_handoffs).toHaveLength(0);
expect(inbox.pending_wakes).toHaveLength(0);
expect(inbox.unread_messages).toHaveLength(0);
expect(inbox.summary.next_action).toMatch(/quiet/i);
});
});
70 changes: 68 additions & 2 deletions packages/hooks/src/handlers/session-start.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
import { type MemoryStore, ProposalSystem, TaskThread, detectRepoBranch } from '@colony/core';
import {
type InboxMessage,
type MemoryStore,
ProposalSystem,
TaskThread,
buildAttentionInbox,
detectRepoBranch,
} from '@colony/core';
import { spawnNodeScript } from '@colony/process';
import type { HookInput } from '../types.js';

Expand Down Expand Up @@ -103,10 +110,22 @@ export function buildTaskPreface(

const pending = thread.pendingHandoffsFor(input.session_id, agent);
const pendingWakes = thread.pendingWakesFor(input.session_id, agent);
const unreadMessages = buildAttentionInbox(store, {
session_id: input.session_id,
agent,
task_ids: [thread.task_id],
repo_root: detected.repo_root,
include_stalled_lanes: false,
}).unread_messages;
const others = thread.participants().filter((p) => p.session_id !== input.session_id);

const lines: string[] = [];
if (others.length > 0 || pending.length > 0 || pendingWakes.length > 0) {
if (
others.length > 0 ||
pending.length > 0 ||
pendingWakes.length > 0 ||
unreadMessages.length > 0
) {
const who =
others.length > 0
? others.map((p) => `${p.agent}@${p.session_id.slice(0, 8)}`).join(', ')
Expand All @@ -116,6 +135,7 @@ export function buildTaskPreface(
`Joined with: ${who}. Post coordination via MCP tools task_post / task_claim_file / task_hand_off.`,
);
}
appendMessagePreface(lines, unreadMessages, input.session_id, agent);
for (const h of pending) {
const minsLeft = Math.max(0, Math.round((h.meta.expires_at - Date.now()) / 60_000));
lines.push('');
Expand Down Expand Up @@ -175,6 +195,52 @@ export function buildTaskPreface(
return lines.join('\n');
}

function appendMessagePreface(
lines: string[],
messages: InboxMessage[],
session_id: string,
agent: string,
): void {
const blocking = messages.filter((m) => m.urgency === 'blocking');
const needsReply = messages.filter((m) => m.urgency === 'needs_reply');
const fyi = messages.filter((m) => m.urgency === 'fyi');

for (const m of blocking) {
appendMessage(lines, m, 'BLOCKING MESSAGE', session_id, agent);
}
for (const m of needsReply) {
appendMessage(lines, m, 'MESSAGE NEEDS REPLY', session_id, agent);
}
if (fyi.length > 0) {
lines.push('');
lines.push(
`FYI MESSAGES: ${fyi.length} unread collapsed; expand with: task_messages(session_id="${session_id}", agent="${agent}", task_ids=[${[...new Set(fyi.map((m) => m.task_id))].join(', ')}], unread_only=true)`,
);
}
}

function appendMessage(
lines: string[],
message: InboxMessage,
label: string,
session_id: string,
agent: string,
): void {
lines.push('');
lines.push(`${label} #${message.id} from ${message.from_agent}:`);
lines.push(` preview: ${compactPreview(message.preview)}`);
lines.push(
` reply with: task_message(task_id=${message.task_id}, session_id="${session_id}", agent="${agent}", to_agent="any", to_session_id="${message.from_session_id}", reply_to=${message.id}, urgency="fyi", content="...")`,
);
lines.push(
` mark read: task_message_mark_read(message_observation_id=${message.id}, session_id="${session_id}")`,
);
}

function compactPreview(preview: string): string {
return preview.replace(/\s+/g, ' ').trim();
}

/**
* Surface pending proposals and recently promoted ones for this branch.
* Agents see this at SessionStart so they know what ideas the colony
Expand Down
53 changes: 53 additions & 0 deletions packages/hooks/test/task-injection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,57 @@ describe('SessionStart task preface injection', () => {
expect(preface).toContain('task_ack_wake');
expect(preface).toContain('session_id="B"');
});

it('surfaces unread messages by urgency and keeps FYI collapsed', () => {
store.startSession({ id: 'A', ide: 'claude-code', cwd: repo });
const thread = TaskThread.open(store, {
repo_root: repo,
branch: 'feat/handoff',
session_id: 'A',
});
thread.join('A', 'claude');
thread.postMessage({
from_session_id: 'A',
from_agent: 'claude',
to_agent: 'codex',
content: 'blocked until you choose the schema direction',
urgency: 'blocking',
});
thread.postMessage({
from_session_id: 'A',
from_agent: 'claude',
to_agent: 'codex',
content: 'please confirm the generated-column tradeoff',
urgency: 'needs_reply',
});
thread.postMessage({
from_session_id: 'A',
from_agent: 'claude',
to_agent: 'codex',
content: 'FYI: local docs mention the migration path',
urgency: 'fyi',
});

store.startSession({ id: 'B', ide: 'codex', cwd: repo });
const preface = buildTaskPreface(store, {
session_id: 'B',
cwd: repo,
ide: 'codex',
});

const blocking = preface.indexOf('BLOCKING MESSAGE');
const needsReply = preface.indexOf('MESSAGE NEEDS REPLY');
const fyi = preface.indexOf('FYI MESSAGES: 1 unread collapsed');

expect(blocking).toBeGreaterThanOrEqual(0);
expect(needsReply).toBeGreaterThan(blocking);
expect(fyi).toBeGreaterThan(needsReply);
expect(preface).toContain('blocked until you choose schema direction');
expect(preface).toContain('confirm generated-column tradeoff');
expect(preface).not.toContain('FYI: local docs mention the migration path');
expect(preface).toContain('task_message(');
expect(preface).toContain('to_session_id="A"');
expect(preface).toContain('task_message_mark_read');
expect(preface).toContain('task_messages(session_id="B", agent="codex"');
});
});
1 change: 1 addition & 0 deletions tsconfig.base.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"@colony/hooks": ["./packages/hooks/src/index.ts"],
"@colony/installers": ["./packages/installers/src/index.ts"],
"@colony/mcp-server": ["./apps/mcp-server/src/server.ts"],
"@colony/process": ["./packages/process/src/index.ts"],
"@colony/worker": ["./apps/worker/src/server.ts"]
}
},
Expand Down
1 change: 1 addition & 0 deletions vitest.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export const workspaceAliases = {
'@colony/hooks': resolve(rootDir, 'packages/hooks/src/index.ts'),
'@colony/installers': resolve(rootDir, 'packages/installers/src/index.ts'),
'@colony/mcp-server': resolve(rootDir, 'apps/mcp-server/src/server.ts'),
'@colony/process': resolve(rootDir, 'packages/process/src/index.ts'),
'@colony/storage': resolve(rootDir, 'packages/storage/src/index.ts'),
'@colony/worker': resolve(rootDir, 'apps/worker/src/server.ts'),
};
Expand Down
Loading