Merged
40 changes: 40 additions & 0 deletions .changeset/messaging-overhaul.md
@@ -0,0 +1,40 @@
---
"@colony/core": minor
"@colony/mcp-server": minor
---

Eight-part overhaul of the `task_message` system so directed-agent messaging
behaves more like a real coordination channel than a one-shot inbox.

`@colony/core`:

- `MessageMetadata` gains `expires_at`, `retracted_at`, `retract_reason`, and
`claimed_by_session_id` / `claimed_by_agent` / `claimed_at`. `MessageStatus`
picks up `expired` and `retracted` terminal states. `parseMessage` backfills
the new fields to `null` so legacy rows still pass the strict-null
visibility predicates without a migration.
- `TaskThread.postMessage` accepts `expires_in_ms`, auto-claims a still-unclaimed
`to_agent='any'` broadcast on reply, and treats reply chains as authoritative
only 1-deep: a reply flips just its immediate parent to `replied`.
- New `TaskThread.retractMessage` (sender-only, refuses replied messages) and
`TaskThread.claimBroadcastMessage` (idempotent for the existing claimer,
rejects directed messages with `NOT_BROADCAST`).
- `TaskThread.markMessageRead` writes a sibling `message_read` observation
so the original sender's inbox can render read receipts; past-TTL reads
flip the on-disk status to `expired` and throw `MESSAGE_EXPIRED`.
- `pendingMessagesFor` and `listMessagesForAgent` filter out retracted
messages, expired messages, and broadcasts already claimed by another agent.
Inbox summaries surface `expires_at`, `is_claimable_broadcast`, and the claim
state.
- `buildAttentionInbox` adds `summary.blocked` (gates non-message lanes when
any unread is `blocking`), `coalesced_messages` (groups by task / sender /
urgency), and `read_receipts` (drops once the recipient replies). New
`read_receipt_window_ms` / `read_receipt_limit` options.
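
The backfill and filtering rules above can be sketched roughly as follows. This is a minimal illustration, not the `@colony/core` implementation: the field names come from this changeset, while `backfillMessage`, `isVisibleInInbox`, the `'unread'` status value, and the exact ordering of the checks are assumptions made for the example.

```typescript
// Hypothetical sketch. Field names follow the changeset; helper names and the
// 'unread' status are assumptions, not the real @colony/core API.
type MessageStatus = 'unread' | 'read' | 'replied' | 'expired' | 'retracted';

interface MessageMetadata {
  to_agent: string; // 'any' marks a broadcast
  status: MessageStatus;
  expires_at: number | null; // epoch milliseconds, or null for no TTL
  retracted_at: number | null;
  claimed_by_session_id: string | null;
}

type LegacyRow = Pick<MessageMetadata, 'to_agent' | 'status'> &
  Partial<MessageMetadata>;

// Legacy rows lack the new fields; fill them with null so strict-null
// comparisons below behave the same for old and new rows.
function backfillMessage(raw: LegacyRow): MessageMetadata {
  return {
    to_agent: raw.to_agent,
    status: raw.status,
    expires_at: raw.expires_at ?? null,
    retracted_at: raw.retracted_at ?? null,
    claimed_by_session_id: raw.claimed_by_session_id ?? null,
  };
}

// True when the message should still appear in the given session's inbox.
function isVisibleInInbox(
  meta: MessageMetadata,
  sessionId: string,
  now: number,
): boolean {
  if (meta.retracted_at !== null || meta.status === 'retracted') return false;
  if (meta.status === 'expired') return false;
  if (meta.expires_at !== null && meta.expires_at <= now) return false;
  const claimedByOther =
    meta.to_agent === 'any' &&
    meta.claimed_by_session_id !== null &&
    meta.claimed_by_session_id !== sessionId;
  return !claimedByOther;
}
```

Because the new fields are always `null` rather than `undefined` after backfilling, a legacy row passes every check without a storage migration, which is the property the first bullet relies on.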

`@colony/mcp-server`:

- `task_message` accepts `expires_in_minutes` (max 7 days).
- New `task_message_retract` and `task_message_claim` tools.
- `task_messages` shape now includes `expires_at`, `is_claimable_broadcast`,
`claimed_by_session_id`, and `claimed_by_agent`.
- Tool descriptions document the 1-deep reply contract, retract semantics,
TTL behavior, and broadcast-claim flow.
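
As a rough illustration of the `expires_in_minutes` bound and its conversion to the core layer's `expires_in_ms`, the sketch below re-implements the check in plain TypeScript. The real tool validates with a zod schema; `ttlMinutesToMs` and `buildPostArgs` are hypothetical helpers introduced only for this example.

```typescript
// Hypothetical helpers; the MCP tool itself uses a zod schema for validation.
const EXPIRES_IN_MINUTES_MAX = 60 * 24 * 7; // 10,080 minutes = 7 days
const MS_PER_MINUTE = 60_000;

// Validate a TTL given in minutes and convert it to milliseconds.
function ttlMinutesToMs(minutes: number): number {
  if (
    !Number.isInteger(minutes) ||
    minutes <= 0 ||
    minutes > EXPIRES_IN_MINUTES_MAX
  ) {
    throw new RangeError(
      `expires_in_minutes must be an integer in 1..${EXPIRES_IN_MINUTES_MAX}`,
    );
  }
  return minutes * MS_PER_MINUTE;
}

// Only include expires_in_ms when a TTL was supplied, so messages without a
// TTL keep their previous shape.
function buildPostArgs(
  content: string,
  expiresInMinutes?: number,
): { content: string; expires_in_ms?: number } {
  return {
    content,
    ...(expiresInMinutes !== undefined
      ? { expires_in_ms: ttlMinutesToMs(expiresInMinutes) }
      : {}),
  };
}
```

Under these assumptions the maximum TTL of 10,080 minutes corresponds to 604,800,000 ms, and a call with no TTL produces an argument object with no `expires_in_ms` key at all.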
92 changes: 89 additions & 3 deletions apps/mcp-server/src/tools/message.ts
@@ -4,12 +4,20 @@ import { z } from 'zod';
import type { ToolContext } from './context.js';
import { mcpError, mcpErrorResponse } from './shared.js';

const EXPIRES_IN_MINUTES_MAX = 60 * 24 * 7;

export function register(server: McpServer, ctx: ToolContext): void {
const { store } = ctx;

server.tool(
'task_message',
"Send a direct message to another agent on a task thread. Use for coordination chat that doesn't transfer file claims — for 'hand off the work + files', use task_hand_off instead. Urgency controls preface prominence: fyi (collapsed), needs_reply (summary + expected action), blocking (top-of-preface). Pass reply_to to chain onto an earlier message; the parent's status flips to 'replied' atomically.",
[
"Send a direct message to another agent on a task thread. Use for coordination chat that doesn't transfer file claims — for 'hand off the work + files', use task_hand_off instead.",
'Urgency controls preface prominence: fyi (coalesced into a counter), needs_reply (rendered as a summary + expected action), blocking (top-of-preface, never coalesced).',
'Pass reply_to to chain onto an earlier message; the immediate parent\'s status flips to "replied". Reply chains are 1-deep authoritative: replies-to-replies are allowed, but only the immediate parent flips, never a transitively referenced ancestor.',
'expires_in_minutes is an optional TTL. Past-TTL messages drop out of unread inbox queries and any later mark_read fails with MESSAGE_EXPIRED; their bodies stay in storage for audit and FTS.',
'Replying to a still-unclaimed broadcast (to_agent=any) auto-claims it for you, hiding the broadcast from other recipients.',
].join(' '),
{
task_id: z.number().int().positive(),
session_id: z.string().min(1).describe('your session_id (the sender)'),
@@ -22,6 +30,15 @@ export function register(server: McpServer, ctx: ToolContext): void {
content: z.string().min(1),
reply_to: z.number().int().positive().optional(),
urgency: z.enum(['fyi', 'needs_reply', 'blocking']).optional(),
expires_in_minutes: z
.number()
.int()
.positive()
.max(EXPIRES_IN_MINUTES_MAX)
.optional()
.describe(
'Optional message TTL in minutes (max 7 days). Past-TTL unread messages disappear from the inbox; bodies remain searchable.',
),
},
async (args) => {
const thread = new TaskThread(store, args.task_id);
@@ -33,6 +50,9 @@ export function register(server: McpServer, ctx: ToolContext): void {
content: args.content,
...(args.reply_to !== undefined ? { reply_to: args.reply_to } : {}),
...(args.urgency !== undefined ? { urgency: args.urgency } : {}),
...(args.expires_in_minutes !== undefined
? { expires_in_ms: args.expires_in_minutes * 60_000 }
: {}),
});
return {
content: [
@@ -47,7 +67,7 @@ export function register(server: McpServer, ctx: ToolContext): void {

server.tool(
'task_messages',
'List messages addressed to you across tasks you participate in (or scoped to task_ids). Compact shape: id, task_id, ts, from_session_id/agent, urgency, status, reply_to, preview. Fetch full bodies via get_observations. Does NOT mark as read — call task_message_mark_read for that.',
'List messages addressed to you across tasks you participate in (or scoped to task_ids). Compact shape includes urgency, status, expires_at, claim state for broadcasts, and a content preview. Fetch full bodies via get_observations. Does NOT mark as read — call task_message_mark_read for that. Retracted messages and broadcasts already claimed by other agents are filtered out.',
{
session_id: z.string().min(1),
agent: z.string().min(1),
@@ -71,7 +91,7 @@ export function register(server: McpServer, ctx: ToolContext): void {

server.tool(
'task_message_mark_read',
'Mark a message as read. Idempotent: re-marking a read or replied message is a no-op. Returns the resulting status.',
'Mark a message as read. Idempotent: re-marking a read or replied message is a no-op. Writes a sibling message_read observation so the original sender can see read receipts in their attention inbox. Past-TTL messages flip to expired and return MESSAGE_EXPIRED. Retracted messages return ALREADY_RETRACTED.',
{
message_observation_id: z.number().int().positive(),
session_id: z.string().min(1),
@@ -93,4 +113,70 @@ export function register(server: McpServer, ctx: ToolContext): void {
}
},
);

server.tool(
'task_message_retract',
'Retract a message you sent. Sets status=retracted; recipients no longer see it in their inbox, but the body stays in storage (still searchable, still in the timeline) for audit. Cannot retract a message that has already been replied to — at that point the recipient has invested response work.',
{
message_observation_id: z.number().int().positive(),
session_id: z.string().min(1).describe('your session_id (must match the original sender)'),
reason: z.string().min(1).optional(),
},
async ({ message_observation_id, session_id, reason }) => {
const obs = store.storage.getObservation(message_observation_id);
if (!obs?.task_id) {
return mcpErrorResponse(
TASK_THREAD_ERROR_CODES.OBSERVATION_NOT_ON_TASK,
'observation is not on a task',
);
}
const thread = new TaskThread(store, obs.task_id);
try {
thread.retractMessage(message_observation_id, session_id, reason);
return {
content: [{ type: 'text', text: JSON.stringify({ status: 'retracted' }) }],
};
} catch (err) {
return mcpError(err);
}
},
);

server.tool(
'task_message_claim',
"Claim a to_agent='any' broadcast message. Once claimed, the broadcast drops out of every other recipient's inbox; only the claimer keeps seeing it. Replying to an unclaimed broadcast auto-claims it, so this tool is for the 'silently take ownership before responding' case. Errors: NOT_BROADCAST (the message is directed, not a broadcast), ALREADY_CLAIMED (another agent claimed it first; re-claiming by the existing claimer is an idempotent no-op).",
{
message_observation_id: z.number().int().positive(),
session_id: z.string().min(1),
agent: z.string().min(1),
},
async ({ message_observation_id, session_id, agent }) => {
const obs = store.storage.getObservation(message_observation_id);
if (!obs?.task_id) {
return mcpErrorResponse(
TASK_THREAD_ERROR_CODES.OBSERVATION_NOT_ON_TASK,
'observation is not on a task',
);
}
const thread = new TaskThread(store, obs.task_id);
try {
const meta = thread.claimBroadcastMessage(message_observation_id, session_id, agent);
return {
content: [
{
type: 'text',
text: JSON.stringify({
status: 'claimed',
claimed_by_session_id: meta.claimed_by_session_id,
claimed_by_agent: meta.claimed_by_agent,
claimed_at: meta.claimed_at,
}),
},
],
};
} catch (err) {
return mcpError(err);
}
},
);
}
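
The claim rules that `task_message_claim` documents can be modeled in memory roughly as below. This is a sketch under stated assumptions: the error codes `NOT_BROADCAST` and `ALREADY_CLAIMED` and the `claimed_*` field names come from the diff, but `ClaimError`, `BroadcastState`, and `claimBroadcast` are hypothetical and do not reflect how `TaskThread.claimBroadcastMessage` is actually written.

```typescript
// Hypothetical in-memory model of the claim rules. Only the error codes and
// claimed_* field names are taken from the diff.
type ClaimErrorCode = 'NOT_BROADCAST' | 'ALREADY_CLAIMED';

class ClaimError extends Error {
  code: ClaimErrorCode;
  constructor(code: ClaimErrorCode) {
    super(code);
    this.code = code;
  }
}

interface BroadcastState {
  to_agent: string; // 'any' marks a broadcast
  claimed_by_session_id: string | null;
  claimed_by_agent: string | null;
  claimed_at: number | null;
}

function claimBroadcast(
  msg: BroadcastState,
  sessionId: string,
  agent: string,
  now: number,
): BroadcastState {
  // Directed messages already have an owner and cannot be claimed.
  if (msg.to_agent !== 'any') throw new ClaimError('NOT_BROADCAST');
  // Re-claiming by the existing claimer is a no-op (idempotent).
  if (msg.claimed_by_session_id === sessionId) return msg;
  // Anyone else arriving after the first claim is rejected.
  if (msg.claimed_by_session_id !== null) throw new ClaimError('ALREADY_CLAIMED');
  return {
    ...msg,
    claimed_by_session_id: sessionId,
    claimed_by_agent: agent,
    claimed_at: now,
  };
}
```

Checking the idempotent case before the `ALREADY_CLAIMED` case is what lets the existing claimer retry safely while a second session still gets an error.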
161 changes: 161 additions & 0 deletions apps/mcp-server/test/messages.test.ts
@@ -402,4 +402,165 @@ describe('task threads — direct messages', () => {
});
expect(target.status).toBe('read');
});

it('expires_in_minutes hides past-TTL messages from unread_only and blocks mark_read', async () => {
const { task_id, sessionA, sessionB } = seedTwoSessionTask();
const { message_observation_id } = await call<{ message_observation_id: number }>(
'task_message',
{
task_id,
session_id: sessionA,
agent: 'claude',
to_agent: 'codex',
content: 'short-lived',
urgency: 'fyi',
expires_in_minutes: 1,
},
);

// Push expires_at into the past to simulate elapsed TTL.
const row = store.storage.getObservation(message_observation_id);
const meta = JSON.parse(row?.metadata ?? '{}') as { expires_at: number };
meta.expires_at = Date.now() - 1000;
store.storage.updateObservationMetadata(message_observation_id, JSON.stringify(meta));

const inbox = await call<Array<{ id: number; status: string }>>('task_messages', {
session_id: sessionB,
agent: 'codex',
unread_only: true,
});
expect(inbox.find((m) => m.id === message_observation_id)).toBeUndefined();

const err = await callError('task_message_mark_read', {
message_observation_id,
session_id: sessionB,
});
expect(err.code).toBe(TASK_THREAD_ERROR_CODES.MESSAGE_EXPIRED);
});

it('task_message_retract hides body from recipients but FTS still indexes it', async () => {
const { task_id, sessionA, sessionB } = seedTwoSessionTask();
const { message_observation_id } = await call<{ message_observation_id: number }>(
'task_message',
{
task_id,
session_id: sessionA,
agent: 'claude',
to_agent: 'codex',
content: 'unique-needle-token-for-fts',
},
);

const before = await call<Array<{ id: number }>>('task_messages', {
session_id: sessionB,
agent: 'codex',
});
expect(before.find((m) => m.id === message_observation_id)).toBeDefined();

const { status } = await call<{ status: string }>('task_message_retract', {
message_observation_id,
session_id: sessionA,
reason: 'duplicate',
});
expect(status).toBe('retracted');

const after = await call<Array<{ id: number }>>('task_messages', {
session_id: sessionB,
agent: 'codex',
});
expect(after.find((m) => m.id === message_observation_id)).toBeUndefined();

// Body still findable via FTS.
const hits = store.storage.searchFts('"unique-needle-token-for-fts"');
expect(hits.find((h) => h.id === message_observation_id)).toBeDefined();
});

it('task_message_retract refuses non-senders with NOT_SENDER', async () => {
const { task_id, sessionA, sessionB } = seedTwoSessionTask();
const { message_observation_id } = await call<{ message_observation_id: number }>(
'task_message',
{
task_id,
session_id: sessionA,
agent: 'claude',
to_agent: 'codex',
content: 'mine to retract',
},
);
const err = await callError('task_message_retract', {
message_observation_id,
session_id: sessionB,
});
expect(err.code).toBe(TASK_THREAD_ERROR_CODES.NOT_SENDER);
});

it('task_message_claim hides broadcast from non-claimers and rejects directed messages', async () => {
const { task_id, sessionA, sessionB, sessionC } = seedThreeSessionTask();
const { message_observation_id } = await call<{ message_observation_id: number }>(
'task_message',
{
task_id,
session_id: sessionA,
agent: 'claude',
to_agent: 'any',
content: 'broadcast for claim',
},
);

// Pre-claim: B and C both see it.
expect(
(
await call<Array<{ id: number; is_claimable_broadcast: boolean }>>('task_messages', {
session_id: sessionB,
agent: 'codex',
task_ids: [task_id],
})
).find((m) => m.id === message_observation_id)?.is_claimable_broadcast,
).toBe(true);

const claim = await call<{ status: string; claimed_by_session_id: string }>(
'task_message_claim',
{
message_observation_id,
session_id: sessionB,
agent: 'codex',
},
);
expect(claim.status).toBe('claimed');
expect(claim.claimed_by_session_id).toBe(sessionB);

// C no longer sees the broadcast in their inbox.
const cInbox = await call<Array<{ id: number }>>('task_messages', {
session_id: sessionC,
agent: 'claude',
task_ids: [task_id],
});
expect(cInbox.find((m) => m.id === message_observation_id)).toBeUndefined();

// Second claimer rejected with ALREADY_CLAIMED.
const dupe = await callError('task_message_claim', {
message_observation_id,
session_id: sessionC,
agent: 'claude',
});
expect(dupe.code).toBe(TASK_THREAD_ERROR_CODES.ALREADY_CLAIMED);

// Directed message can't be claimed.
const { message_observation_id: directedId } = await call<{ message_observation_id: number }>(
'task_message',
{
task_id,
session_id: sessionA,
agent: 'claude',
to_agent: 'codex',
content: 'directed',
},
);
const notBroadcast = await callError('task_message_claim', {
message_observation_id: directedId,
session_id: sessionB,
agent: 'codex',
});
expect(notBroadcast.code).toBe(TASK_THREAD_ERROR_CODES.NOT_BROADCAST);
});
});
2 changes: 2 additions & 0 deletions apps/mcp-server/test/server.test.ts
@@ -73,7 +73,9 @@ describe('MCP server', () => {
'task_hand_off',
'task_list',
'task_message',
'task_message_claim',
'task_message_mark_read',
'task_message_retract',
'task_messages',
'task_plan_claim_subtask',
'task_plan_complete_subtask',