From 55d1d11fd86d3541583170a56f7791f12fa68ad9 Mon Sep 17 00:00:00 2001 From: NagyVikt Date: Sat, 25 Apr 2026 13:44:08 +0200 Subject: [PATCH] feat(core,mcp): TTL, retract, claim, receipts, coalesce for task messages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Eight-part overhaul that turns the directed-agent `task_message` system into a real coordination channel instead of a one-shot inbox. `@colony/core`: - `MessageMetadata` gains `expires_at`, `retracted_at`, `retract_reason`, `claimed_by_session_id` / `claimed_by_agent` / `claimed_at`. New `expired` and `retracted` terminal statuses. `parseMessage` backfills the new fields to `null` so legacy rows still pass strict-null predicates without a migration. - `TaskThread.postMessage` accepts `expires_in_ms`, auto-claims a still- unclaimed `to_agent='any'` broadcast on reply, and keeps reply-chain depth authoritative at 1-deep — only the immediate parent flips to `replied`. - New `TaskThread.retractMessage` (sender-only, refuses already-replied) and `TaskThread.claimBroadcastMessage` (idempotent for existing claimer, rejects directed messages with `NOT_BROADCAST`). - `markMessageRead` writes a sibling `message_read` observation so the original sender's preface can render read receipts; past-TTL reads flip status to `expired` and throw `MESSAGE_EXPIRED`. - Inbox queries filter retracted, expired, and other-agents'-claimed broadcasts. `MessageSummary` surfaces `expires_at`, `is_claimable_broadcast`, and the claim state. - `buildAttentionInbox` adds `summary.blocked`, `coalesced_messages`, and `read_receipts` (drops once recipient replies). New `read_receipt_window_ms` / `read_receipt_limit` options. `@colony/mcp-server`: - `task_message` accepts `expires_in_minutes` (max 7 days). - New `task_message_retract` and `task_message_claim` tools. - `task_messages` shape now includes `expires_at`, `is_claimable_broadcast`, `claimed_by_session_id`, `claimed_by_agent`. 
- Tool descriptions document the 1-deep reply contract, retract semantics, TTL behavior, and broadcast-claim flow. Tests: 71 in `@colony/core` + 33 in `@colony/mcp-server` (8 new), covering TTL expiry/lazy-flip, retract guards, broadcast claiming, auto-claim on reply, read-receipt drop on reply, blocked summary, and legacy-row backward compatibility. Co-Authored-By: Claude Opus 4.7 (1M context) --- .changeset/messaging-overhaul.md | 40 +++ apps/mcp-server/src/tools/message.ts | 92 +++++- apps/mcp-server/test/messages.test.ts | 161 +++++++++++ apps/mcp-server/test/server.test.ts | 2 + docs/mcp.md | 49 +++- packages/core/src/attention-inbox.ts | 170 +++++++++++ packages/core/src/index.ts | 5 + packages/core/src/messages.ts | 66 +++-- packages/core/src/task-thread.ts | 318 +++++++++++++++++++-- packages/core/test/attention-inbox.test.ts | 141 +++++++++ packages/core/test/task-thread.test.ts | 278 ++++++++++++++++++ 11 files changed, 1280 insertions(+), 42 deletions(-) create mode 100644 .changeset/messaging-overhaul.md diff --git a/.changeset/messaging-overhaul.md b/.changeset/messaging-overhaul.md new file mode 100644 index 0000000..49b4d1a --- /dev/null +++ b/.changeset/messaging-overhaul.md @@ -0,0 +1,40 @@ +--- +"@colony/core": minor +"@colony/mcp-server": minor +--- + +Eight-part overhaul of the `task_message` system so directed-agent messaging +behaves more like a real coordination channel than a one-shot inbox. + +`@colony/core`: + +- `MessageMetadata` gains `expires_at`, `retracted_at`, `retract_reason`, and + `claimed_by_session_id` / `claimed_by_agent` / `claimed_at`. `MessageStatus` + picks up `expired` and `retracted` terminal states. `parseMessage` backfills + the new fields to `null` so legacy rows still pass the strict-null + visibility predicates without a migration. 
+- `TaskThread.postMessage` accepts `expires_in_ms`, auto-claims a still- + unclaimed `to_agent='any'` broadcast on reply, and keeps reply-chain depth + authoritative at 1-deep — only the immediate parent flips to `replied`. +- New `TaskThread.retractMessage` (sender-only, refuses replied messages) and + `TaskThread.claimBroadcastMessage` (idempotent for the existing claimer, + rejects directed messages with `NOT_BROADCAST`). +- `TaskThread.markMessageRead` writes a sibling `message_read` observation + so the original sender's inbox can render read receipts; past-TTL reads + flip the on-disk status to `expired` and throw `MESSAGE_EXPIRED`. +- `pendingMessagesFor` and `listMessagesForAgent` filter retracted, expired, + and other-agents'-claimed broadcasts. Inbox summaries surface `expires_at`, + `is_claimable_broadcast`, and the claim state. +- `buildAttentionInbox` adds `summary.blocked` (advisory flag, true when any + unread is `blocking`), `coalesced_messages` (groups by task / sender / + urgency), and `read_receipts` (drops once the recipient replies). New + `read_receipt_window_ms` / `read_receipt_limit` options. + +`@colony/mcp-server`: + +- `task_message` accepts `expires_in_minutes` (max 7 days). +- New `task_message_retract` and `task_message_claim` tools. +- `task_messages` shape now includes `expires_at`, `is_claimable_broadcast`, + `claimed_by_session_id`, and `claimed_by_agent`. +- Tool descriptions document the 1-deep reply contract, retract semantics, + TTL behavior, and broadcast-claim flow. 
diff --git a/apps/mcp-server/src/tools/message.ts b/apps/mcp-server/src/tools/message.ts index 0423147..074c8fc 100644 --- a/apps/mcp-server/src/tools/message.ts +++ b/apps/mcp-server/src/tools/message.ts @@ -4,12 +4,20 @@ import { z } from 'zod'; import type { ToolContext } from './context.js'; import { mcpError, mcpErrorResponse } from './shared.js'; +const EXPIRES_IN_MINUTES_MAX = 60 * 24 * 7; + export function register(server: McpServer, ctx: ToolContext): void { const { store } = ctx; server.tool( 'task_message', - "Send a direct message to another agent on a task thread. Use for coordination chat that doesn't transfer file claims — for 'hand off the work + files', use task_hand_off instead. Urgency controls preface prominence: fyi (collapsed), needs_reply (summary + expected action), blocking (top-of-preface). Pass reply_to to chain onto an earlier message; the parent's status flips to 'replied' atomically.", + [ + "Send a direct message to another agent on a task thread. Use for coordination chat that doesn't transfer file claims — for 'hand off the work + files', use task_hand_off instead.", + 'Urgency controls preface prominence: fyi (coalesced into a counter), needs_reply (rendered as a summary + expected action), blocking (top-of-preface, never coalesced).', + 'Pass reply_to to chain onto an earlier message; the parent\'s immediate status flips to "replied". Reply chains are 1-deep authoritative: replies-to-replies are allowed but only the immediate parent flips, never a transitively-referenced ancestor.', + 'expires_in_minutes is an optional TTL. 
Past-TTL messages drop out of unread inbox queries and any later mark_read fails with MESSAGE_EXPIRED; their bodies stay in storage for audit and FTS.', + 'Replying to a still-unclaimed broadcast (to_agent=any) auto-claims it for you, hiding the broadcast from other recipients.', + ].join(' '), { task_id: z.number().int().positive(), session_id: z.string().min(1).describe('your session_id (the sender)'), @@ -22,6 +30,15 @@ export function register(server: McpServer, ctx: ToolContext): void { content: z.string().min(1), reply_to: z.number().int().positive().optional(), urgency: z.enum(['fyi', 'needs_reply', 'blocking']).optional(), + expires_in_minutes: z + .number() + .int() + .positive() + .max(EXPIRES_IN_MINUTES_MAX) + .optional() + .describe( + 'Optional message TTL in minutes (max 7 days). Past-TTL unread messages disappear from the inbox; bodies remain searchable.', + ), }, async (args) => { const thread = new TaskThread(store, args.task_id); @@ -33,6 +50,9 @@ export function register(server: McpServer, ctx: ToolContext): void { content: args.content, ...(args.reply_to !== undefined ? { reply_to: args.reply_to } : {}), ...(args.urgency !== undefined ? { urgency: args.urgency } : {}), + ...(args.expires_in_minutes !== undefined + ? { expires_in_ms: args.expires_in_minutes * 60_000 } + : {}), }); return { content: [ @@ -47,7 +67,7 @@ export function register(server: McpServer, ctx: ToolContext): void { server.tool( 'task_messages', - 'List messages addressed to you across tasks you participate in (or scoped to task_ids). Compact shape: id, task_id, ts, from_session_id/agent, urgency, status, reply_to, preview. Fetch full bodies via get_observations. Does NOT mark as read — call task_message_mark_read for that.', + 'List messages addressed to you across tasks you participate in (or scoped to task_ids). Compact shape includes urgency, status, expires_at, claim state for broadcasts, and a content preview. Fetch full bodies via get_observations. 
Does NOT mark as read — call task_message_mark_read for that. Retracted messages and broadcasts already claimed by other agents are filtered out.', { session_id: z.string().min(1), agent: z.string().min(1), @@ -71,7 +91,7 @@ export function register(server: McpServer, ctx: ToolContext): void { server.tool( 'task_message_mark_read', - 'Mark a message as read. Idempotent: re-marking a read or replied message is a no-op. Returns the resulting status.', + 'Mark a message as read. Idempotent: re-marking a read or replied message is a no-op. Writes a sibling message_read observation so the original sender can see read receipts in their attention inbox. Past-TTL messages flip to expired and return MESSAGE_EXPIRED. Retracted messages return ALREADY_RETRACTED.', { message_observation_id: z.number().int().positive(), session_id: z.string().min(1), @@ -93,4 +113,70 @@ export function register(server: McpServer, ctx: ToolContext): void { } }, ); + + server.tool( + 'task_message_retract', + 'Retract a message you sent. Sets status=retracted; recipients no longer see it in their inbox, but the body stays in storage (still searchable, still in the timeline) for audit. 
Cannot retract a message that has already been replied to — at that point the recipient has invested response work.', + { + message_observation_id: z.number().int().positive(), + session_id: z.string().min(1).describe('your session_id (must match the original sender)'), + reason: z.string().min(1).optional(), + }, + async ({ message_observation_id, session_id, reason }) => { + const obs = store.storage.getObservation(message_observation_id); + if (!obs?.task_id) { + return mcpErrorResponse( + TASK_THREAD_ERROR_CODES.OBSERVATION_NOT_ON_TASK, + 'observation is not on a task', + ); + } + const thread = new TaskThread(store, obs.task_id); + try { + thread.retractMessage(message_observation_id, session_id, reason); + return { + content: [{ type: 'text', text: JSON.stringify({ status: 'retracted' }) }], + }; + } catch (err) { + return mcpError(err); + } + }, + ); + + server.tool( + 'task_message_claim', + "Claim a to_agent='any' broadcast message. Once claimed, the broadcast drops out of every other recipient's inbox; only the claimer keeps seeing it. Replying to an unclaimed broadcast auto-claims, so this tool is for the 'silently take ownership before responding' case. 
Errors: NOT_BROADCAST (directed message), ALREADY_CLAIMED (someone else got there first — idempotent for the existing claimer).", + { + message_observation_id: z.number().int().positive(), + session_id: z.string().min(1), + agent: z.string().min(1), + }, + async ({ message_observation_id, session_id, agent }) => { + const obs = store.storage.getObservation(message_observation_id); + if (!obs?.task_id) { + return mcpErrorResponse( + TASK_THREAD_ERROR_CODES.OBSERVATION_NOT_ON_TASK, + 'observation is not on a task', + ); + } + const thread = new TaskThread(store, obs.task_id); + try { + const meta = thread.claimBroadcastMessage(message_observation_id, session_id, agent); + return { + content: [ + { + type: 'text', + text: JSON.stringify({ + status: 'claimed', + claimed_by_session_id: meta.claimed_by_session_id, + claimed_by_agent: meta.claimed_by_agent, + claimed_at: meta.claimed_at, + }), + }, + ], + }; + } catch (err) { + return mcpError(err); + } + }, + ); } diff --git a/apps/mcp-server/test/messages.test.ts b/apps/mcp-server/test/messages.test.ts index 04c86da..868ad33 100644 --- a/apps/mcp-server/test/messages.test.ts +++ b/apps/mcp-server/test/messages.test.ts @@ -402,4 +402,165 @@ describe('task threads — direct messages', () => { }); expect(target.status).toBe('read'); }); + + it('expires_in_minutes hides past-TTL messages from unread_only and blocks mark_read', async () => { + const { task_id, sessionA, sessionB } = seedTwoSessionTask(); + const { message_observation_id } = await call<{ message_observation_id: number }>( + 'task_message', + { + task_id, + session_id: sessionA, + agent: 'claude', + to_agent: 'codex', + content: 'short-lived', + urgency: 'fyi', + expires_in_minutes: 1, + }, + ); + + // Push expires_at into the past to simulate elapsed TTL. + const row = store.storage.getObservation(message_observation_id); + const meta = JSON.parse(row?.metadata ?? 
'{}') as { expires_at: number }; + meta.expires_at = Date.now() - 1000; + store.storage.updateObservationMetadata(message_observation_id, JSON.stringify(meta)); + + const inbox = await call>('task_messages', { + session_id: sessionB, + agent: 'codex', + unread_only: true, + }); + expect(inbox.find((m) => m.id === message_observation_id)).toBeUndefined(); + + const err = await callError('task_message_mark_read', { + message_observation_id, + session_id: sessionB, + }); + expect(err.code).toBe(TASK_THREAD_ERROR_CODES.MESSAGE_EXPIRED); + }); + + it('task_message_retract hides body from recipients but FTS still indexes it', async () => { + const { task_id, sessionA, sessionB } = seedTwoSessionTask(); + const { message_observation_id } = await call<{ message_observation_id: number }>( + 'task_message', + { + task_id, + session_id: sessionA, + agent: 'claude', + to_agent: 'codex', + content: 'unique-needle-token-for-fts', + }, + ); + + const before = await call>('task_messages', { + session_id: sessionB, + agent: 'codex', + }); + expect(before.find((m) => m.id === message_observation_id)).toBeDefined(); + + const { status } = await call<{ status: string }>('task_message_retract', { + message_observation_id, + session_id: sessionA, + reason: 'duplicate', + }); + expect(status).toBe('retracted'); + + const after = await call>('task_messages', { + session_id: sessionB, + agent: 'codex', + }); + expect(after.find((m) => m.id === message_observation_id)).toBeUndefined(); + + // Body still findable via FTS. 
+ const hits = store.storage.searchFts('"unique-needle-token-for-fts"'); + expect(hits.find((h) => h.id === message_observation_id)).toBeDefined(); + }); + + it('task_message_retract refuses non-senders with NOT_SENDER', async () => { + const { task_id, sessionA, sessionB } = seedTwoSessionTask(); + const { message_observation_id } = await call<{ message_observation_id: number }>( + 'task_message', + { + task_id, + session_id: sessionA, + agent: 'claude', + to_agent: 'codex', + content: 'mine to retract', + }, + ); + const err = await callError('task_message_retract', { + message_observation_id, + session_id: sessionB, + }); + expect(err.code).toBe(TASK_THREAD_ERROR_CODES.NOT_SENDER); + }); + + it('task_message_claim hides broadcast from non-claimers and rejects directed messages', async () => { + const { task_id, sessionA, sessionB, sessionC } = seedThreeSessionTask(); + const { message_observation_id } = await call<{ message_observation_id: number }>( + 'task_message', + { + task_id, + session_id: sessionA, + agent: 'claude', + to_agent: 'any', + content: 'broadcast for claim', + }, + ); + + // Pre-claim: B and C both see it. + expect( + ( + await call>('task_messages', { + session_id: sessionB, + agent: 'codex', + task_ids: [task_id], + }) + ).find((m) => m.id === message_observation_id)?.is_claimable_broadcast, + ).toBe(true); + + const claim = await call<{ status: string; claimed_by_session_id: string }>( + 'task_message_claim', + { + message_observation_id, + session_id: sessionB, + agent: 'codex', + }, + ); + expect(claim.status).toBe('claimed'); + expect(claim.claimed_by_session_id).toBe(sessionB); + + // C no longer sees the broadcast in their inbox. + const cInbox = await call>('task_messages', { + session_id: sessionC, + agent: 'claude', + task_ids: [task_id], + }); + expect(cInbox.find((m) => m.id === message_observation_id)).toBeUndefined(); + + // Second claimer rejected with ALREADY_CLAIMED. 
+ const dupe = await callError('task_message_claim', { + message_observation_id, + session_id: sessionC, + agent: 'claude', + }); + expect(dupe.code).toBe(TASK_THREAD_ERROR_CODES.ALREADY_CLAIMED); + + // Directed message can't be claimed. + const { message_observation_id: directedId } = await call<{ message_observation_id: number }>( + 'task_message', + { + task_id, + session_id: sessionA, + agent: 'claude', + to_agent: 'codex', + content: 'directed', + }, + ); + const notBroadcast = await callError('task_message_claim', { + message_observation_id: directedId, + session_id: sessionB, + agent: 'codex', + }); + expect(notBroadcast.code).toBe(TASK_THREAD_ERROR_CODES.NOT_BROADCAST); + }); }); diff --git a/apps/mcp-server/test/server.test.ts b/apps/mcp-server/test/server.test.ts index bb130cb..b00302a 100644 --- a/apps/mcp-server/test/server.test.ts +++ b/apps/mcp-server/test/server.test.ts @@ -73,7 +73,9 @@ describe('MCP server', () => { 'task_hand_off', 'task_list', 'task_message', + 'task_message_claim', 'task_message_mark_read', + 'task_message_retract', 'task_messages', 'task_plan_claim_subtask', 'task_plan_complete_subtask', diff --git a/docs/mcp.md b/docs/mcp.md index cac7840..c2ece35 100644 --- a/docs/mcp.md +++ b/docs/mcp.md @@ -399,7 +399,7 @@ Returns `{ status: 'cancelled' }`. Errors include `{ code, error }`. ## `task_message` -Send a direct message to another agent on a task thread. Use for coordination chat that **doesn't** transfer file claims — for "hand off the work + files", use `task_hand_off`. A message is a `task_post` with kind `message`, explicit addressing, and a read/reply lifecycle. +Send a direct message to another agent on a task thread. Use for coordination chat that **doesn't** transfer file claims — for "hand off the work + files", use `task_hand_off`. A message is a `task_post` with kind `message`, explicit addressing, and a read/reply/expire/retract/claim lifecycle. 
```json { @@ -412,16 +412,17 @@ Send a direct message to another agent on a task thread. Use for coordination ch "to_session_id": "sess_xyz", "content": "can you re-run the typecheck on your branch?", "urgency": "needs_reply", - "reply_to": 401 + "reply_to": 401, + "expires_in_minutes": 60 } } ``` -`to_agent` ∈ `claude | codex | any` — `any` broadcasts to every participant but the sender. `to_session_id` narrows delivery to a specific live session. `urgency` ∈ `fyi | needs_reply | blocking` and controls preface prominence. `reply_to` chains a reply; the parent message's status flips to `replied` atomically on the send. Returns `{ message_observation_id, status: 'unread' }`. +`to_agent` ∈ `claude | codex | any` — `any` broadcasts to every participant but the sender. `to_session_id` narrows delivery to a specific live session. `urgency` ∈ `fyi | needs_reply | blocking` and controls preface prominence: `fyi` coalesces into a counter, `needs_reply` renders as a summary, `blocking` lands at the top of the preface and never coalesces. `reply_to` chains a reply; the parent message's status flips to `replied` atomically on the send. **Reply chains are 1-deep authoritative**: replies-to-replies are allowed, but only the immediate parent's status flips, never a transitively-referenced ancestor. `expires_in_minutes` (max 7 days) gives the message a TTL — past-TTL unread messages drop out of inbox queries and any later `task_message_mark_read` returns `MESSAGE_EXPIRED`; bodies remain in storage for audit and stay searchable via FTS. Replying to a still-unclaimed broadcast auto-claims it for the replier (see `task_message_claim`). Returns `{ message_observation_id, status: 'unread' }`. ## `task_messages` -List messages addressed to you across tasks you participate in (or scoped with `task_ids`). Compact shape — fetch full bodies via `get_observations`. 
Does **not** mark as read; call `task_message_mark_read` explicitly so an agent can peek at its inbox during planning without burning the "you have new mail" signal. +List messages addressed to you across tasks you participate in (or scoped with `task_ids`). Compact shape — fetch full bodies via `get_observations`. Does **not** mark as read; call `task_message_mark_read` explicitly so an agent can peek at its inbox during planning without burning the "you have new mail" signal. Retracted messages and broadcasts already claimed by other agents are filtered out of every recipient's view. ```json { @@ -436,11 +437,11 @@ List messages addressed to you across tasks you participate in (or scoped with ` } ``` -Returns `[ { id, task_id, ts, from_session_id, from_agent, to_agent, to_session_id, urgency, status, reply_to, preview } ]`, newest-first. +Returns `[ { id, task_id, ts, from_session_id, from_agent, to_agent, to_session_id, urgency, status, reply_to, preview, expires_at, is_claimable_broadcast, claimed_by_session_id, claimed_by_agent } ]`, newest-first. `status` reflects the effective state: an `unread` row past its TTL surfaces as `expired` even if the on-disk status hasn't been rewritten yet. ## `task_message_mark_read` -Mark a message as read. Idempotent — re-marking a read or replied message is a no-op. Returns the resulting `status`. +Mark a message as read. Idempotent — re-marking a read or replied message is a no-op. Writes a sibling `message_read` observation so the original sender sees a read receipt in their `attention_inbox`. Returns the resulting `status`. ```json { @@ -449,7 +450,41 @@ Mark a message as read. Idempotent — re-marking a read or replied message is a } ``` -Errors include `{ code, error }` with stable codes like `NOT_MESSAGE`, `TASK_MISMATCH`, or `OBSERVATION_NOT_ON_TASK`. 
+Errors include `{ code, error }` with stable codes: `NOT_MESSAGE`, `TASK_MISMATCH`, `OBSERVATION_NOT_ON_TASK`, `NOT_PARTICIPANT`, `NOT_TARGET_SESSION`, `NOT_TARGET_AGENT`, `MESSAGE_EXPIRED` (TTL elapsed before read; status flips to `expired` on the same call), or `ALREADY_RETRACTED` (sender retracted the message). + +## `task_message_retract` + +Retract a message you sent. Sets the status to `retracted` and the body stops surfacing in any recipient's inbox; the body stays in storage (still searchable via FTS, still in the timeline) for audit. Cannot retract a message that has already been replied to — at that point the recipient has invested response work and silently rewriting the sender's intent would be deceptive. + +```json +{ + "name": "task_message_retract", + "input": { + "message_observation_id": 512, + "session_id": "sess_abc", + "reason": "duplicate of #498" + } +} +``` + +Errors: `NOT_MESSAGE`, `TASK_MISMATCH`, `NOT_SENDER` (only the original sender may retract), `ALREADY_REPLIED`, `ALREADY_RETRACTED`. + +## `task_message_claim` + +Claim a `to_agent='any'` broadcast message. Once claimed, the broadcast drops out of every other recipient's inbox; only the claimer keeps seeing it. Use when you want to silently take ownership of a broadcast before responding — replying via `task_message` already auto-claims, so this tool is for the "I'll handle it but not yet ready to reply" case. + +```json +{ + "name": "task_message_claim", + "input": { + "message_observation_id": 730, + "session_id": "sess_xyz", + "agent": "codex" + } +} +``` + +Returns `{ status: 'claimed', claimed_by_session_id, claimed_by_agent, claimed_at }`. Errors: `NOT_MESSAGE`, `NOT_BROADCAST` (directed messages can't be claimed), `NOT_PARTICIPANT`, `ALREADY_CLAIMED` (someone else got there first — idempotent for the existing claimer), `MESSAGE_EXPIRED`, `ALREADY_RETRACTED`. 
## `recall_session` diff --git a/packages/core/src/attention-inbox.ts b/packages/core/src/attention-inbox.ts index 906ffc2..6d6b32c 100644 --- a/packages/core/src/attention-inbox.ts +++ b/packages/core/src/attention-inbox.ts @@ -62,6 +62,45 @@ export interface InboxRecentClaim { claimed_at: number; } +/** + * Coalesced view: a group of inbox messages that share `(task_id, + * from_session_id, urgency)`. Lets the preface render "B sent 4 fyi + * messages on task #12, latest: …" instead of four near-identical lines. + * + * `blocking` messages are grouped by the same key as every other urgency; + * the preface still renders each one as its own row so no critical signal + * gets folded into a counter. Single-message groups still ship as a group + * (size 1) so consumers iterate one structure. + */ +export interface CoalescedMessageGroup { + task_id: number; + from_session_id: string; + from_agent: string; + urgency: MessageSummary['urgency']; + count: number; + message_ids: number[]; + latest_id: number; + latest_ts: number; + latest_preview: string; +} + +/** + * Read-receipt surfaced to the original sender. Built from sibling + * `message_read` observations whose metadata names this session as the + * `original_sender_session_id`. The "still-awaiting-reply" predicate is + * computed against the read message's *current* status: if the recipient + * has since replied, the receipt is dropped — the reply is the stronger + * signal and the inbox shouldn't double-surface. + */ +export interface ReadReceipt { + task_id: number; + read_message_id: number; + read_at: number; + read_by_session_id: string; + read_by_agent: string; + urgency: MessageSummary['urgency']; +} + export interface AttentionInbox { generated_at: number; session_id: string; @@ -72,11 +111,24 @@ export interface AttentionInbox { unread_message_count: number; stalled_lane_count: number; recent_other_claim_count: number; + /** + * True iff at least one unread message is `urgency='blocking'`. 
The + * preface renderer should use this to gate non-message sections — + * advisory only at this layer (we still populate the other fields) + * because a hard hide here would also hide the inbox surface that + * lets consumers debug why they were blocked. + */ + blocked: boolean; next_action: string; }; pending_handoffs: InboxHandoff[]; pending_wakes: InboxWake[]; unread_messages: InboxMessage[]; + /** Same set of unread messages, grouped by (task, sender, urgency). */ + coalesced_messages: CoalescedMessageGroup[]; + /** `message_read` siblings for messages this session originally sent + * that have not been replied to. Sized by `read_receipt_window_ms`. */ + read_receipts: ReadReceipt[]; stalled_lanes: InboxLane[]; recent_other_claims: InboxRecentClaim[]; } @@ -100,10 +152,17 @@ export interface AttentionInboxOptions { unread_message_limit?: number; /** Defaults true; hooks can disable filesystem hivemind reads for hot paths. */ include_stalled_lanes?: boolean; + /** Window (ms) for read-receipt surfacing. Receipts older than this drop + * out so a long-running session doesn't accumulate stale "B read your + * message 3 days ago" hints. Default 6h. */ + read_receipt_window_ms?: number; + read_receipt_limit?: number; } const DEFAULT_RECENT_CLAIM_WINDOW_MS = 15 * 60_000; const DEFAULT_RECENT_CLAIM_LIMIT = 20; +const DEFAULT_READ_RECEIPT_WINDOW_MS = 6 * 60 * 60_000; +const DEFAULT_READ_RECEIPT_LIMIT = 20; /** * Aggregate "things that need this session's attention" across tasks and @@ -153,18 +212,24 @@ export function buildAttentionInbox( const stalled_lanes = opts.include_stalled_lanes === false ? 
[] : collectStalledLanes(opts); + const read_receipts = collectReadReceipts(store, opts, taskIds, now); + const coalesced_messages = coalesceMessages(unread_messages); + const blocked = unread_messages.some((m) => m.urgency === 'blocking'); + const summary = { pending_handoff_count: pending_handoffs.length, pending_wake_count: pending_wakes.length, unread_message_count: unread_messages.length, stalled_lane_count: stalled_lanes.length, recent_other_claim_count: recent_other_claims.length, + blocked, next_action: deriveNextAction({ pending_handoffs, pending_wakes, unread_messages, stalled_lanes, recent_other_claims, + read_receipts, }), }; @@ -176,11 +241,112 @@ export function buildAttentionInbox( pending_handoffs, pending_wakes, unread_messages, + coalesced_messages, + read_receipts, stalled_lanes, recent_other_claims, }; } +/** + * Group unread messages by `(task_id, from_session_id, urgency)`. Blocking + * urgency is intentionally still grouped — blocking messages from the same + * sender on the same task collapse just like fyi ones — but a blocking + * group of size N still renders as N separate rows in the preface because + * blocking signals don't tolerate folding into a counter. 
+ */ +function coalesceMessages(messages: InboxMessage[]): CoalescedMessageGroup[] { + const groups = new Map(); + for (const m of messages) { + const key = `${m.task_id}${m.from_session_id}${m.urgency}`; + const bucket = groups.get(key); + if (bucket) bucket.push(m); + else groups.set(key, [m]); + } + const out: CoalescedMessageGroup[] = []; + for (const bucket of groups.values()) { + bucket.sort((a, b) => a.ts - b.ts); + const latest = bucket[bucket.length - 1]; + if (!latest) continue; + out.push({ + task_id: latest.task_id, + from_session_id: latest.from_session_id, + from_agent: latest.from_agent, + urgency: latest.urgency, + count: bucket.length, + message_ids: bucket.map((m) => m.id), + latest_id: latest.id, + latest_ts: latest.ts, + latest_preview: latest.preview, + }); + } + // Newest group first, matching unread_messages ordering. + return out.sort((a, b) => b.latest_ts - a.latest_ts); +} + +/** + * Walk task observation rows of kind 'message_read' and surface the ones + * whose metadata names the calling session as the original sender. Drops + * a receipt when the underlying message has since been replied to (the + * reply is the stronger signal) or when the receipt is older than + * `read_receipt_window_ms`. + */ +function collectReadReceipts( + store: MemoryStore, + opts: AttentionInboxOptions, + taskIds: number[], + now: number, +): ReadReceipt[] { + const window = opts.read_receipt_window_ms ?? DEFAULT_READ_RECEIPT_WINDOW_MS; + const cap = opts.read_receipt_limit ?? 
DEFAULT_READ_RECEIPT_LIMIT; + const since = now - window; + const out: ReadReceipt[] = []; + for (const task_id of taskIds) { + const rows = store.storage.taskObservationsByKind(task_id, 'message_read', cap * 2); + for (const r of rows) { + if (r.ts < since) continue; + if (!r.metadata) continue; + let meta: { + kind?: string; + original_sender_session_id?: string; + read_message_id?: number; + read_by_session_id?: string; + read_by_agent?: string; + urgency?: MessageSummary['urgency']; + ts?: number; + }; + try { + meta = JSON.parse(r.metadata); + } catch { + continue; + } + if (meta.kind !== 'message_read') continue; + if (meta.original_sender_session_id !== opts.session_id) continue; + if (typeof meta.read_message_id !== 'number') continue; + // Drop when the original message has since been replied to. The + // reply already reaches the sender as a fresh inbox entry, so a + // surviving receipt would be redundant noise. + const messageRow = store.storage.getObservation(meta.read_message_id); + if (!messageRow || messageRow.kind !== 'message') continue; + try { + const messageMeta = JSON.parse(messageRow.metadata ?? '{}') as { status?: string }; + if (messageMeta.status === 'replied') continue; + } catch { + continue; + } + out.push({ + task_id, + read_message_id: meta.read_message_id, + read_at: typeof meta.ts === 'number' ? meta.ts : r.ts, + read_by_session_id: meta.read_by_session_id ?? r.session_id, + read_by_agent: meta.read_by_agent ?? '', + urgency: meta.urgency ?? 
'fyi', + }); + } + } + return out.sort((a, b) => b.read_at - a.read_at).slice(0, cap); +} + function resolveTaskIds(store: MemoryStore, opts: AttentionInboxOptions): number[] { if (opts.task_ids && opts.task_ids.length > 0) { return [...new Set(opts.task_ids)]; @@ -282,6 +448,7 @@ function deriveNextAction(parts: { unread_messages: InboxMessage[]; stalled_lanes: InboxLane[]; recent_other_claims: InboxRecentClaim[]; + read_receipts: ReadReceipt[]; }): string { if (parts.unread_messages.some((m) => m.urgency === 'blocking')) { return 'Answer blocking task messages first; another agent is explicitly blocked on you.'; @@ -298,6 +465,9 @@ function deriveNextAction(parts: { if (parts.unread_messages.length > 0) { return 'Review unread FYI task messages when context allows.'; } + if (parts.read_receipts.some((r) => r.urgency !== 'fyi')) { + return 'Recipients have read your needs_reply messages without responding — consider following up.'; + } if (parts.stalled_lanes.length > 0) { return 'Review stalled lanes — takeover may be safer than waiting for the owner to return.'; } diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 54aa906..4828960 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -14,6 +14,9 @@ export { TaskThread, TaskThreadError, TASK_THREAD_ERROR_CODES, + isBroadcastMessage, + isMessageAddressedTo, + isVisibleToBroadcastClaimant, type CoordinationKind, type HandoffMetadata, type HandoffObservation, @@ -42,11 +45,13 @@ export { buildAttentionInbox, type AttentionInbox, type AttentionInboxOptions, + type CoalescedMessageGroup, type InboxHandoff, type InboxLane, type InboxMessage, type InboxRecentClaim, type InboxWake, + type ReadReceipt, } from './attention-inbox.js'; export { detectRepoBranch } from './git-detect.js'; export { diff --git a/packages/core/src/messages.ts b/packages/core/src/messages.ts index 0f8c9b5..f5708b0 100644 --- a/packages/core/src/messages.ts +++ b/packages/core/src/messages.ts @@ -1,11 
+1,11 @@ import type { MemoryStore } from './memory-store.js'; -import type { - MessageMetadata, - MessageStatus, - MessageTarget, - MessageUrgency, +import type { MessageStatus, MessageTarget, MessageUrgency } from './task-thread.js'; +import { + isBroadcastMessage, + isMessageAddressedTo, + isVisibleToBroadcastClaimant, + parseMessage, } from './task-thread.js'; -import { isMessageAddressedTo } from './task-thread.js'; /** * Compact view of a directed message, safe to return from MCP list-shape @@ -26,6 +26,14 @@ export interface MessageSummary { status: MessageStatus; reply_to: number | null; preview: string; + /** Absolute ms-epoch when this message stops surfacing in inbox queries. + * null when the message has no TTL. */ + expires_at: number | null; + /** True when this is a `to_agent='any'` broadcast that has not yet been + * claimed; the inbox should hint that "first agent to engage owns it". */ + is_claimable_broadcast: boolean; + claimed_by_session_id: string | null; + claimed_by_agent: string | null; } export interface ListMessagesOptions { @@ -59,6 +67,18 @@ const DEFAULT_PREVIEW_LENGTH = 120; * This function does *not* mark messages as read; `markMessageRead` is a * separate explicit call so an agent can peek at its inbox during planning * without destroying the "you have new mail" signal for a later turn. + * + * Filtering rules layered on top of addressing: + * - Retracted messages are always hidden from recipients. The body is + * still in storage (and still searchable via FTS) for the sender's + * audit trail; recipients see only that the message is gone. + * - Past-TTL `unread` messages are hidden from `unread_only` queries — + * they no longer count as awaiting response. They keep showing in + * non-`unread_only` queries with the effective `expired` status so audit / debug + * callers can see "B never read this before TTL". + * - `to_agent='any'` broadcasts that have been claimed by another + * session drop out of every non-claimer inbox. 
The claimer keeps + * seeing the message normally. */ export function listMessagesForAgent( store: MemoryStore, @@ -67,6 +87,7 @@ export function listMessagesForAgent( const since = opts.since_ts ?? 0; const limit = Math.min(opts.limit ?? DEFAULT_LIMIT, MAX_LIMIT); const previewLen = opts.previewLength ?? DEFAULT_PREVIEW_LENGTH; + const now = Date.now(); const taskIds = opts.task_ids?.length ? [...new Set(opts.task_ids)].filter( @@ -80,19 +101,25 @@ export function listMessagesForAgent( for (const r of rows) { if (r.ts < since) continue; if (r.session_id === opts.session_id) continue; - if (!r.metadata) continue; - let meta: MessageMetadata; - try { - meta = JSON.parse(r.metadata) as MessageMetadata; - } catch { - continue; - } - if (meta.kind !== 'message') continue; + // parseMessage backfills fields added by the messaging-overhaul + // change (expires_at, claimed_by_*, retracted_at, retract_reason) + // so legacy rows compare correctly against the strict-null predicates + // below. + const meta = parseMessage(r.metadata); + if (!meta) continue; const addressedToMe = isMessageAddressedTo(meta, opts.session_id, opts.agent); if (!addressedToMe) continue; - if (opts.unread_only && meta.status !== 'unread') continue; + if (meta.status === 'retracted') continue; + if (!isVisibleToBroadcastClaimant(meta, opts.session_id)) continue; + + const isExpired = meta.expires_at !== null && now > meta.expires_at; + if (opts.unread_only) { + if (meta.status !== 'unread') continue; + if (isExpired) continue; + } + const broadcast = isBroadcastMessage(meta); out.push({ id: r.id, task_id, @@ -102,9 +129,16 @@ export function listMessagesForAgent( to_agent: meta.to_agent, to_session_id: meta.to_session_id, urgency: meta.urgency, - status: meta.status, + // Surface the effective status: an unread row past TTL renders as + // 'expired' even though the on-disk status is still 'unread'. This + // keeps inbox consumers from having to recompute the predicate. 
+ status: isExpired && meta.status === 'unread' ? 'expired' : meta.status, reply_to: r.reply_to ?? null, preview: r.content.slice(0, previewLen), + expires_at: meta.expires_at, + is_claimable_broadcast: broadcast && meta.claimed_by_session_id === null, + claimed_by_session_id: meta.claimed_by_session_id, + claimed_by_agent: meta.claimed_by_agent, }); } } diff --git a/packages/core/src/task-thread.ts b/packages/core/src/task-thread.ts index 2c1f8f2..d66f57b 100644 --- a/packages/core/src/task-thread.ts +++ b/packages/core/src/task-thread.ts @@ -19,7 +19,9 @@ export type CoordinationKind = | 'wake_request' | 'wake_ack' | 'wake_cancel' - | 'message'; + | 'message' + | 'message_read' + | 'message_retract'; export type HandoffStatus = 'pending' | 'accepted' | 'expired' | 'cancelled'; export type HandoffTarget = 'claude' | 'codex' | 'any'; @@ -27,7 +29,7 @@ export type HandoffTarget = 'claude' | 'codex' | 'any'; export type WakeStatus = 'pending' | 'acknowledged' | 'expired' | 'cancelled'; export type WakeTarget = 'claude' | 'codex' | 'any'; -export type MessageStatus = 'unread' | 'read' | 'replied'; +export type MessageStatus = 'unread' | 'read' | 'replied' | 'expired' | 'retracted'; export type MessageTarget = 'claude' | 'codex' | 'any'; export type MessageUrgency = 'fyi' | 'needs_reply' | 'blocking'; @@ -41,8 +43,14 @@ export const TASK_THREAD_ERROR_CODES = { ALREADY_ACCEPTED: 'ALREADY_ACCEPTED', ALREADY_ACKNOWLEDGED: 'ALREADY_ACKNOWLEDGED', ALREADY_CANCELLED: 'ALREADY_CANCELLED', + ALREADY_REPLIED: 'ALREADY_REPLIED', + ALREADY_RETRACTED: 'ALREADY_RETRACTED', + ALREADY_CLAIMED: 'ALREADY_CLAIMED', HANDOFF_EXPIRED: 'HANDOFF_EXPIRED', WAKE_EXPIRED: 'WAKE_EXPIRED', + MESSAGE_EXPIRED: 'MESSAGE_EXPIRED', + NOT_BROADCAST: 'NOT_BROADCAST', + NOT_SENDER: 'NOT_SENDER', NOT_TARGET_SESSION: 'NOT_TARGET_SESSION', NOT_PARTICIPANT: 'NOT_PARTICIPANT', NOT_TARGET_AGENT: 'NOT_TARGET_AGENT', @@ -177,12 +185,33 @@ export interface HandOffArgs { * the structured fields below live in 
`observations.metadata` as JSON. * * `status` transitions: - * - `unread` → set at send time - * - `read` → set by `markMessageRead` on the recipient's fetch (advisory) - * - `replied` → set on *write* when someone posts with `reply_to=`; - * authoritative — overrides `read`. Flipping on write (not - * read) avoids a race where the sender could see their own - * reply round-tripped as still-unread. + * - `unread` → set at send time + * - `read` → set by `markMessageRead` on the recipient's fetch (advisory) + * - `replied` → set on *write* when someone posts with `reply_to=`; + * authoritative — overrides `read`. Flipping on write (not + * read) avoids a race where the sender could see their own + * reply round-tripped as still-unread. + * - `expired` → set by `markMessageRead` when an `unread` message's + * `expires_at` is in the past (it then throws `MESSAGE_EXPIRED`). Lazy: list + * queries simply hide expired rows by computing the predicate + * client-side, mirroring the `pendingHandoffsFor` pattern. + * - `retracted` → set by `retractMessage` when the original sender retracts. + * The body stays in storage (still searchable, still in + * timeline) but the inbox view shows a terse retraction + * stub instead of the original preview. + * + * Reply-chain depth: `reply_to` is **1-deep authoritative**. We flip *only* + * the immediate parent's status, never a transitively-referenced ancestor. + * Replies-to-replies are allowed but only the immediate parent's status + * changes, and there is no thread-root tracking. If you want a long thread, + * model it as `task_post` notes; messages are for short directed exchanges. + * + * Broadcast claim: `to_agent='any' && to_session_id===null` messages are + * visible to every non-sender participant by default. 
Once any agent calls + * `claimBroadcastMessage` (or replies to it), `claimed_by_session_id` / + * `claimed_by_agent` / `claimed_at` are set and the message drops out of + * other agents' inboxes — only the claimer keeps seeing it. Replying to a + * still-unclaimed broadcast auto-claims for the replier. */ export interface MessageMetadata { kind: 'message'; @@ -196,6 +225,17 @@ export interface MessageMetadata { read_at: number | null; replied_by_observation_id: number | null; replied_at: number | null; + /** Absolute ms-epoch when this message stops surfacing in inbox queries. + * null = no TTL; the message is visible until explicitly read/replied/retracted. */ + expires_at: number | null; + retracted_at: number | null; + retract_reason: string | null; + /** Set when an agent claims (or auto-claims via reply) a `to_agent=any` + * broadcast. Hides the message from other agents' inboxes; the claimer + * keeps seeing it. Always null on directed messages. */ + claimed_by_session_id: string | null; + claimed_by_agent: string | null; + claimed_at: number | null; } export interface MessageObservation { @@ -212,6 +252,10 @@ export interface PostMessageArgs { content: string; reply_to?: number; urgency?: MessageUrgency; + /** Optional TTL in ms. If omitted, the message has no expiry. Mirrors the + * handoff/wake `expires_in_ms` shape so MCP tool layers can present a + * uniform "expires_in_minutes" affordance. */ + expires_in_ms?: number; } export function isMessageAddressedTo( @@ -224,6 +268,13 @@ export function isMessageAddressedTo( return meta.to_agent === agent; } +/** True when this message was sent as a broadcast (`to_agent='any'`, + * no specific `to_session_id`). Broadcasts can be claimed; directed + * messages cannot. 
*/ +export function isBroadcastMessage(meta: MessageMetadata): boolean { + return meta.to_agent === 'any' && meta.to_session_id === null; +} + const DEFAULT_HANDOFF_TTL_MS = 2 * 60 * 60 * 1000; const DEFAULT_WAKE_TTL_MS = 24 * 60 * 60 * 1000; @@ -692,8 +743,13 @@ export class TaskThread { * lifecycle. If `reply_to` points at another message, we flip the parent's * status to `replied` in the same transaction — authoritative on the * sender side so the sender sees resolution on their next read. + * + * If the parent is a still-unclaimed broadcast (`to_agent='any'`), the + * reply also auto-claims it for this session — silent ownership take so + * other participants stop seeing the broadcast in their inboxes. */ postMessage(args: PostMessageArgs): number { + const now = Date.now(); const meta: MessageMetadata = { kind: 'message', from_session_id: args.from_session_id, @@ -706,6 +762,12 @@ export class TaskThread { read_at: null, replied_by_observation_id: null, replied_at: null, + expires_at: args.expires_in_ms !== undefined ? now + args.expires_in_ms : null, + retracted_at: null, + retract_reason: null, + claimed_by_session_id: null, + claimed_by_agent: null, + claimed_at: null, }; return this.store.storage.transaction(() => { const id = this.store.addObservation({ @@ -727,10 +789,24 @@ export class TaskThread { // authoritative over their own lifecycle. const parentMeta = parent && parent.task_id === this.task_id ? parseMessage(parent.metadata) : null; - if (parentMeta && parentMeta.status !== 'replied') { + // Reply-chain depth is 1-deep: only the immediate parent flips. If + // the parent is itself a reply, we do NOT walk up to flip the root. + if (parentMeta && isReplyableStatus(parentMeta.status)) { parentMeta.status = 'replied'; parentMeta.replied_by_observation_id = id; - parentMeta.replied_at = Date.now(); + parentMeta.replied_at = now; + // Auto-claim a still-unclaimed broadcast on reply. 
Replying *is* + // engagement, so we don't make agents call task_message_claim + // separately; explicit claim is for the silent-ownership case. + if ( + isBroadcastMessage(parentMeta) && + parentMeta.claimed_by_session_id === null && + parentMeta.from_session_id !== args.from_session_id + ) { + parentMeta.claimed_by_session_id = args.from_session_id; + parentMeta.claimed_by_agent = args.from_agent; + parentMeta.claimed_at = now; + } this.store.storage.updateObservationMetadata(args.reply_to, JSON.stringify(parentMeta)); } } @@ -744,6 +820,14 @@ export class TaskThread { * already-read (or replied) message is a no-op so concurrent fetches from * the same recipient don't clobber the first reader's `read_at`. Returns * the resulting status for callers that want to short-circuit. + * + * Side effect: when this transitions `unread → read`, we also write a + * sibling `message_read` observation. The original sender's preface + * scans those siblings (filtering `original_sender_session_id === me`) + * to surface read receipts without polling. + * + * Past-TTL messages flip to `expired` here and throw `MESSAGE_EXPIRED`. + * Retracted messages throw `ALREADY_RETRACTED`. Both stay terminal. 
*/ markMessageRead(message_observation_id: number, session_id: string): MessageStatus { const obs = this.store.storage.getObservation(message_observation_id); @@ -788,19 +872,193 @@ export class TaskThread { `message is for ${meta.to_agent}, not ${myAgent}`, ); } + if (meta.status === 'retracted') { + throw taskError( + TASK_THREAD_ERROR_CODES.ALREADY_RETRACTED, + 'message has been retracted by the sender', + ); + } + if (meta.expires_at !== null && Date.now() > meta.expires_at && meta.status === 'unread') { + meta.status = 'expired'; + this.store.storage.updateObservationMetadata(message_observation_id, JSON.stringify(meta)); + throw taskError(TASK_THREAD_ERROR_CODES.MESSAGE_EXPIRED, 'message expired before read'); + } if (meta.status === 'unread') { + const now = Date.now(); meta.status = 'read'; meta.read_by_session_id = session_id; - meta.read_at = Date.now(); - this.store.storage.updateObservationMetadata(message_observation_id, JSON.stringify(meta)); + meta.read_at = now; + this.store.storage.transaction(() => { + this.store.storage.updateObservationMetadata(message_observation_id, JSON.stringify(meta)); + // Sibling read-receipt observation: lets the original sender's + // preface render "B read your message at T (no reply yet)" without + // a polling channel. Compressed like every other observation; the + // structured fields live in metadata so the renderer can scan + // without decompressing content. + this.store.addObservation({ + session_id, + kind: 'message_read', + content: `read message #${message_observation_id} from ${meta.from_agent}`, + task_id: this.task_id, + reply_to: message_observation_id, + metadata: { + kind: 'message_read', + read_message_id: message_observation_id, + read_by_session_id: session_id, + read_by_agent: myAgent, + original_sender_session_id: meta.from_session_id, + urgency: meta.urgency, + ts: now, + }, + }); + this.store.storage.touchTask(this.task_id, now); + }); } return meta.status; } + /** + * Sender-side retraction. 
Flips status to `retracted` and stamps a + * reason; the body stays in storage (still searchable) but inbox views + * render a stub instead of the original preview. Cannot retract a + * message that has already been replied to — at that point the + * recipient has invested response work and silently rewriting the + * sender's intent would be deceptive. + */ + retractMessage(message_observation_id: number, session_id: string, reason?: string): void { + const obs = this.store.storage.getObservation(message_observation_id); + if (!obs || obs.kind !== 'message') { + throw taskError( + TASK_THREAD_ERROR_CODES.NOT_MESSAGE, + `observation ${message_observation_id} is not a message`, + ); + } + if (obs.task_id !== this.task_id) { + throw taskError( + TASK_THREAD_ERROR_CODES.TASK_MISMATCH, + `message belongs to task ${obs.task_id}, not ${this.task_id}`, + ); + } + const meta = parseMessage(obs.metadata); + if (!meta) { + throw taskError(TASK_THREAD_ERROR_CODES.METADATA_MISSING, 'message metadata missing'); + } + if (meta.from_session_id !== session_id) { + throw taskError( + TASK_THREAD_ERROR_CODES.NOT_SENDER, + 'only the original sender can retract this message', + ); + } + if (meta.status === 'retracted') { + throw taskError(TASK_THREAD_ERROR_CODES.ALREADY_RETRACTED, 'message already retracted'); + } + if (meta.status === 'replied') { + throw taskError( + TASK_THREAD_ERROR_CODES.ALREADY_REPLIED, + 'message has been replied to and cannot be retracted', + ); + } + const now = Date.now(); + meta.status = 'retracted'; + meta.retracted_at = now; + meta.retract_reason = reason ?? null; + this.store.storage.transaction(() => { + this.store.storage.updateObservationMetadata(message_observation_id, JSON.stringify(meta)); + this.store.addObservation({ + session_id, + kind: 'message_retract', + content: reason + ? 
`retracted message #${message_observation_id}: ${reason}` + : `retracted message #${message_observation_id}`, + task_id: this.task_id, + reply_to: message_observation_id, + metadata: { + kind: 'message_retract', + retracted_message_id: message_observation_id, + retracted_by_session_id: session_id, + ts: now, + }, + }); + this.store.storage.touchTask(this.task_id, now); + }); + } + + /** + * Claim a `to_agent='any'` broadcast. Once claimed, the message drops + * out of every other recipient's inbox; only the claimer keeps seeing + * it. Directed messages cannot be claimed (NOT_BROADCAST). Replying to + * a still-unclaimed broadcast auto-claims via `postMessage`, so this + * call is for the "silently take ownership without yet replying" case. + */ + claimBroadcastMessage( + message_observation_id: number, + session_id: string, + agent: string, + ): MessageMetadata { + const obs = this.store.storage.getObservation(message_observation_id); + if (!obs || obs.kind !== 'message') { + throw taskError( + TASK_THREAD_ERROR_CODES.NOT_MESSAGE, + `observation ${message_observation_id} is not a message`, + ); + } + if (obs.task_id !== this.task_id) { + throw taskError( + TASK_THREAD_ERROR_CODES.TASK_MISMATCH, + `message belongs to task ${obs.task_id}, not ${this.task_id}`, + ); + } + const meta = parseMessage(obs.metadata); + if (!meta) { + throw taskError(TASK_THREAD_ERROR_CODES.METADATA_MISSING, 'message metadata missing'); + } + if (!isBroadcastMessage(meta)) { + throw taskError( + TASK_THREAD_ERROR_CODES.NOT_BROADCAST, + 'only to_agent=any broadcasts can be claimed', + ); + } + if (meta.from_session_id === session_id) { + throw taskError( + TASK_THREAD_ERROR_CODES.NOT_TARGET_SESSION, + 'sender cannot claim their own broadcast', + ); + } + const myAgent = this.store.storage.getParticipantAgent(this.task_id, session_id); + if (!myAgent) { + throw taskError( + TASK_THREAD_ERROR_CODES.NOT_PARTICIPANT, + 'session is not a participant on this task', + ); + } + if (meta.status === 
'retracted') { + throw taskError(TASK_THREAD_ERROR_CODES.ALREADY_RETRACTED, 'message has been retracted'); + } + if (meta.expires_at !== null && Date.now() > meta.expires_at) { + throw taskError(TASK_THREAD_ERROR_CODES.MESSAGE_EXPIRED, 'broadcast expired'); + } + if (meta.claimed_by_session_id !== null) { + if (meta.claimed_by_session_id === session_id) return meta; // idempotent + throw taskError( + TASK_THREAD_ERROR_CODES.ALREADY_CLAIMED, + `broadcast already claimed by ${meta.claimed_by_agent ?? meta.claimed_by_session_id}`, + ); + } + meta.claimed_by_session_id = session_id; + meta.claimed_by_agent = agent; + meta.claimed_at = Date.now(); + this.store.storage.updateObservationMetadata(message_observation_id, JSON.stringify(meta)); + this.store.storage.touchTask(this.task_id); + return meta; + } + /** Unread messages addressed to `session_id` / `agent`. Broadcast * messages (to_agent='any') are visible to every participant but the - * sender. */ + * sender, except once a broadcast has been claimed only the claimer + * keeps seeing it. Past-TTL messages and retracted messages are also + * hidden — list reads are pure filters and do not mutate storage. */ pendingMessagesFor(session_id: string, agent: string): MessageObservation[] { + const now = Date.now(); return this.store.storage .taskObservationsByKind(this.task_id, 'message') .map((row) => { @@ -811,12 +1069,26 @@ export class TaskThread { .filter( ({ meta }) => meta.status === 'unread' && + (meta.expires_at === null || now < meta.expires_at) && meta.from_session_id !== session_id && - isMessageAddressedTo(meta, session_id, agent), + isMessageAddressedTo(meta, session_id, agent) && + isVisibleToBroadcastClaimant(meta, session_id), ); } } +function isReplyableStatus(status: MessageStatus): boolean { + return status === 'unread' || status === 'read'; +} + +/** A claimed broadcast is invisible to non-claimer recipients. Directed + * messages and unclaimed broadcasts pass through. 
*/ +export function isVisibleToBroadcastClaimant(meta: MessageMetadata, session_id: string): boolean { + if (!isBroadcastMessage(meta)) return true; + if (meta.claimed_by_session_id === null) return true; + return meta.claimed_by_session_id === session_id; +} + function parseHandoff(metadata: string | null): HandoffMetadata | null { if (!metadata) return null; try { @@ -843,14 +1115,28 @@ function parseWake(metadata: string | null): WakeRequestMetadata | null { } } -function parseMessage(metadata: string | null): MessageMetadata | null { +export function parseMessage(metadata: string | null): MessageMetadata | null { if (!metadata) return null; try { const parsed = JSON.parse(metadata) as unknown; if (!parsed || typeof parsed !== 'object') return null; const m = parsed as Partial<MessageMetadata>; if (m.kind !== 'message' || typeof m.status !== 'string') return null; - return parsed as MessageMetadata; + // Backfill the fields added in the messaging-overhaul change. Legacy + // rows persisted before this PR shipped have these keys absent (not + // explicitly null), and the visibility predicates below use strict + // `=== null` comparisons. Defaulting at parse time keeps every + // downstream check honest without a database migration. Use `??` so + // newer rows that explicitly set the field to a non-null value are + // preserved unchanged. + const meta = parsed as MessageMetadata; + meta.expires_at = meta.expires_at ?? null; + meta.retracted_at = meta.retracted_at ?? null; + meta.retract_reason = meta.retract_reason ?? null; + meta.claimed_by_session_id = meta.claimed_by_session_id ?? null; + meta.claimed_by_agent = meta.claimed_by_agent ?? null; + meta.claimed_at = meta.claimed_at ?? 
null; + return meta; } catch { return null; } diff --git a/packages/core/test/attention-inbox.test.ts b/packages/core/test/attention-inbox.test.ts index af7912d..da40e35 100644 --- a/packages/core/test/attention-inbox.test.ts +++ b/packages/core/test/attention-inbox.test.ts @@ -124,6 +124,147 @@ describe('buildAttentionInbox', () => { expect(inbox.unread_messages).toHaveLength(0); }); + it('blocking unread messages set summary.blocked and the message-first next_action', () => { + seed('claude', 'codex'); + const thread = TaskThread.open(store, { + repo_root: '/r', + branch: 'feat/inbox-blocked', + session_id: 'claude', + }); + thread.join('claude', 'claude'); + thread.join('codex', 'codex'); + thread.postMessage({ + from_session_id: 'claude', + from_agent: 'claude', + to_agent: 'codex', + content: 'schema choice blocking next slice', + urgency: 'blocking', + }); + + const inbox = buildAttentionInbox(store, { + session_id: 'codex', + agent: 'codex', + task_ids: [thread.task_id], + }); + expect(inbox.summary.blocked).toBe(true); + expect(inbox.summary.next_action).toMatch(/blocking task messages/i); + + // No blocking → blocked=false even with other unread messages. 
+ const thread2 = TaskThread.open(store, { + repo_root: '/r', + branch: 'feat/inbox-not-blocked', + session_id: 'claude', + }); + thread2.join('claude', 'claude'); + thread2.join('codex', 'codex'); + thread2.postMessage({ + from_session_id: 'claude', + from_agent: 'claude', + to_agent: 'codex', + content: 'fyi only', + urgency: 'fyi', + }); + const inbox2 = buildAttentionInbox(store, { + session_id: 'codex', + agent: 'codex', + task_ids: [thread2.task_id], + }); + expect(inbox2.summary.blocked).toBe(false); + }); + + it('coalesces non-blocking messages by (task, sender, urgency); blocking groups stay size 1', () => { + seed('claude', 'codex'); + const thread = TaskThread.open(store, { + repo_root: '/r', + branch: 'feat/coalesce', + session_id: 'claude', + }); + thread.join('claude', 'claude'); + thread.join('codex', 'codex'); + thread.postMessage({ + from_session_id: 'claude', + from_agent: 'claude', + to_agent: 'codex', + content: 'fyi 1', + urgency: 'fyi', + }); + thread.postMessage({ + from_session_id: 'claude', + from_agent: 'claude', + to_agent: 'codex', + content: 'fyi 2', + urgency: 'fyi', + }); + thread.postMessage({ + from_session_id: 'claude', + from_agent: 'claude', + to_agent: 'codex', + content: 'urgent', + urgency: 'blocking', + }); + + const inbox = buildAttentionInbox(store, { + session_id: 'codex', + agent: 'codex', + task_ids: [thread.task_id], + }); + expect(inbox.unread_messages).toHaveLength(3); + + // Two fyi from same sender on same task should coalesce into one group. + const fyiGroup = inbox.coalesced_messages.find((g) => g.urgency === 'fyi'); + expect(fyiGroup?.count).toBe(2); + expect(fyiGroup?.message_ids).toHaveLength(2); + + // Blocking always stays as its own group of size 1. 
+ const blockingGroups = inbox.coalesced_messages.filter((g) => g.urgency === 'blocking'); + expect(blockingGroups).toHaveLength(1); + expect(blockingGroups[0]?.count).toBe(1); + }); + + it('surfaces read receipts for needs_reply messages that have been read but not replied', () => { + seed('claude', 'codex'); + const thread = TaskThread.open(store, { + repo_root: '/r', + branch: 'feat/inbox-receipts', + session_id: 'claude', + }); + thread.join('claude', 'claude'); + thread.join('codex', 'codex'); + const id = thread.postMessage({ + from_session_id: 'claude', + from_agent: 'claude', + to_agent: 'codex', + content: 'please review', + urgency: 'needs_reply', + }); + thread.markMessageRead(id, 'codex'); + + const senderInbox = buildAttentionInbox(store, { + session_id: 'claude', + agent: 'claude', + task_ids: [thread.task_id], + }); + expect(senderInbox.read_receipts).toHaveLength(1); + expect(senderInbox.read_receipts[0]?.read_message_id).toBe(id); + expect(senderInbox.read_receipts[0]?.urgency).toBe('needs_reply'); + expect(senderInbox.summary.next_action).toMatch(/recipients have read/i); + + // A reply removes the receipt — the reply is a stronger signal. 
+ thread.postMessage({ + from_session_id: 'codex', + from_agent: 'codex', + to_agent: 'claude', + content: 'looking', + reply_to: id, + }); + const after = buildAttentionInbox(store, { + session_id: 'claude', + agent: 'claude', + task_ids: [thread.task_id], + }); + expect(after.read_receipts).toHaveLength(0); + }); + it('returns the quiet-inbox next_action hint when nothing is pending', () => { seed('codex'); const thread = TaskThread.open(store, { diff --git a/packages/core/test/task-thread.test.ts b/packages/core/test/task-thread.test.ts index cd6550e..a96b59f 100644 --- a/packages/core/test/task-thread.test.ts +++ b/packages/core/test/task-thread.test.ts @@ -193,6 +193,284 @@ describe('TaskThread', () => { expect(thread.pendingHandoffsFor('claude', 'claude')).toHaveLength(0); }); + it('postMessage(expires_in_ms) hides the message from inbox after TTL passes', () => { + seed('claude', 'codex'); + const thread = TaskThread.open(store, { + repo_root: '/r', + branch: 'feat/ttl', + session_id: 'claude', + }); + thread.join('claude', 'claude'); + thread.join('codex', 'codex'); + const id = thread.postMessage({ + from_session_id: 'claude', + from_agent: 'claude', + to_agent: 'codex', + content: 'short-lived note', + urgency: 'fyi', + expires_in_ms: 1, + }); + + // Pre-expire the message by rewriting expires_at into the past. + const row = store.storage.getObservation(id); + const meta = JSON.parse(row?.metadata ?? '{}') as { expires_at: number }; + meta.expires_at = Date.now() - 1000; + store.storage.updateObservationMetadata(id, JSON.stringify(meta)); + + expect(thread.pendingMessagesFor('codex', 'codex')).toHaveLength(0); + + // markMessageRead on a past-TTL message must throw MESSAGE_EXPIRED and + // flip the on-disk status to 'expired'. 
+ try { + thread.markMessageRead(id, 'codex'); + throw new Error('expected MESSAGE_EXPIRED'); + } catch (err) { + expect(err).toBeInstanceOf(TaskThreadError); + expect((err as TaskThreadError).code).toBe(TASK_THREAD_ERROR_CODES.MESSAGE_EXPIRED); + } + const post = JSON.parse(store.storage.getObservation(id)?.metadata ?? '{}') as { + status: string; + }; + expect(post.status).toBe('expired'); + }); + + it('retractMessage hides the message from recipients but keeps the body searchable', () => { + seed('claude', 'codex'); + const thread = TaskThread.open(store, { + repo_root: '/r', + branch: 'feat/retract', + session_id: 'claude', + }); + thread.join('claude', 'claude'); + thread.join('codex', 'codex'); + const id = thread.postMessage({ + from_session_id: 'claude', + from_agent: 'claude', + to_agent: 'codex', + content: 'oops, ignore me', + urgency: 'needs_reply', + }); + expect(thread.pendingMessagesFor('codex', 'codex').map((m) => m.id)).toEqual([id]); + + thread.retractMessage(id, 'claude', 'duplicate'); + + // Inbox no longer surfaces it. + expect(thread.pendingMessagesFor('codex', 'codex')).toHaveLength(0); + + // Body remains, status='retracted', reason captured. + const row = store.storage.getObservation(id); + const meta = JSON.parse(row?.metadata ?? 
'{}') as { + status: string; + retract_reason: string; + retracted_at: number; + }; + expect(meta.status).toBe('retracted'); + expect(meta.retract_reason).toBe('duplicate'); + expect(typeof meta.retracted_at).toBe('number'); + }); + + it('retractMessage refuses non-senders and replied messages', () => { + seed('claude', 'codex'); + const thread = TaskThread.open(store, { + repo_root: '/r', + branch: 'feat/retract-guard', + session_id: 'claude', + }); + thread.join('claude', 'claude'); + thread.join('codex', 'codex'); + const id = thread.postMessage({ + from_session_id: 'claude', + from_agent: 'claude', + to_agent: 'codex', + content: 'hello', + }); + + expect(() => thread.retractMessage(id, 'codex')).toThrow(/sender/i); + + // Reply turns the message replied; later retraction must fail. + thread.postMessage({ + from_session_id: 'codex', + from_agent: 'codex', + to_agent: 'claude', + content: 'ack', + reply_to: id, + }); + try { + thread.retractMessage(id, 'claude'); + throw new Error('expected ALREADY_REPLIED'); + } catch (err) { + expect(err).toBeInstanceOf(TaskThreadError); + expect((err as TaskThreadError).code).toBe(TASK_THREAD_ERROR_CODES.ALREADY_REPLIED); + } + }); + + it('markMessageRead writes a sibling message_read observation visible to the original sender', () => { + seed('claude', 'codex'); + const thread = TaskThread.open(store, { + repo_root: '/r', + branch: 'feat/receipt', + session_id: 'claude', + }); + thread.join('claude', 'claude'); + thread.join('codex', 'codex'); + const id = thread.postMessage({ + from_session_id: 'claude', + from_agent: 'claude', + to_agent: 'codex', + content: 'review when free', + urgency: 'needs_reply', + }); + thread.markMessageRead(id, 'codex'); + + const reads = store.storage.taskObservationsByKind(thread.task_id, 'message_read'); + expect(reads).toHaveLength(1); + const meta = JSON.parse(reads[0]?.metadata ?? 
'{}') as { + kind: string; + original_sender_session_id: string; + read_message_id: number; + }; + expect(meta.kind).toBe('message_read'); + expect(meta.original_sender_session_id).toBe('claude'); + expect(meta.read_message_id).toBe(id); + }); + + it('claimBroadcastMessage hides the broadcast from non-claimer inboxes', () => { + seed('claude', 'codex', 'C'); + const thread = TaskThread.open(store, { + repo_root: '/r', + branch: 'feat/claim', + session_id: 'claude', + }); + thread.join('claude', 'claude'); + thread.join('codex', 'codex'); + thread.join('C', 'claude'); + const id = thread.postMessage({ + from_session_id: 'claude', + from_agent: 'claude', + to_agent: 'any', + content: 'anyone want this?', + }); + expect(thread.pendingMessagesFor('codex', 'codex').map((m) => m.id)).toEqual([id]); + expect(thread.pendingMessagesFor('C', 'claude').map((m) => m.id)).toEqual([id]); + + thread.claimBroadcastMessage(id, 'codex', 'codex'); + + expect(thread.pendingMessagesFor('codex', 'codex').map((m) => m.id)).toEqual([id]); + expect(thread.pendingMessagesFor('C', 'claude')).toHaveLength(0); + + // Second claim from a different session is rejected. + try { + thread.claimBroadcastMessage(id, 'C', 'claude'); + throw new Error('expected ALREADY_CLAIMED'); + } catch (err) { + expect(err).toBeInstanceOf(TaskThreadError); + expect((err as TaskThreadError).code).toBe(TASK_THREAD_ERROR_CODES.ALREADY_CLAIMED); + } + + // Idempotent for the existing claimer. 
+ expect(() => thread.claimBroadcastMessage(id, 'codex', 'codex')).not.toThrow(); + }); + + it('claimBroadcastMessage rejects directed messages with NOT_BROADCAST', () => { + seed('claude', 'codex'); + const thread = TaskThread.open(store, { + repo_root: '/r', + branch: 'feat/claim-direct', + session_id: 'claude', + }); + thread.join('claude', 'claude'); + thread.join('codex', 'codex'); + const id = thread.postMessage({ + from_session_id: 'claude', + from_agent: 'claude', + to_agent: 'codex', + content: 'directed', + }); + try { + thread.claimBroadcastMessage(id, 'codex', 'codex'); + throw new Error('expected NOT_BROADCAST'); + } catch (err) { + expect(err).toBeInstanceOf(TaskThreadError); + expect((err as TaskThreadError).code).toBe(TASK_THREAD_ERROR_CODES.NOT_BROADCAST); + } + }); + + it('replying to an unclaimed broadcast auto-claims it for the replier', () => { + seed('claude', 'codex', 'C'); + const thread = TaskThread.open(store, { + repo_root: '/r', + branch: 'feat/auto-claim', + session_id: 'claude', + }); + thread.join('claude', 'claude'); + thread.join('codex', 'codex'); + thread.join('C', 'claude'); + const id = thread.postMessage({ + from_session_id: 'claude', + from_agent: 'claude', + to_agent: 'any', + content: 'who can pair?', + }); + thread.postMessage({ + from_session_id: 'codex', + from_agent: 'codex', + to_agent: 'claude', + content: "I'll take it", + reply_to: id, + }); + const meta = JSON.parse(store.storage.getObservation(id)?.metadata ?? '{}') as { + status: string; + claimed_by_session_id: string | null; + claimed_by_agent: string | null; + }; + expect(meta.status).toBe('replied'); + expect(meta.claimed_by_session_id).toBe('codex'); + expect(meta.claimed_by_agent).toBe('codex'); + // Other agents no longer see the broadcast — the reply itself is a + // separate message and may still be in C's inbox, but the broadcast id + // must be gone (status flipped to replied, claim recorded for codex). 
+ expect(thread.pendingMessagesFor('C', 'claude').find((m) => m.id === id)).toBeUndefined(); + }); + + it('legacy message rows (no expires_at/claimed_by_* fields) still surface in pendingMessagesFor', () => { + seed('claude', 'codex'); + const thread = TaskThread.open(store, { + repo_root: '/r', + branch: 'feat/legacy', + session_id: 'claude', + }); + thread.join('claude', 'claude'); + thread.join('codex', 'codex'); + // Mint a row directly with the *pre-overhaul* metadata shape — keys + // expires_at/claimed_by_*/retracted_at/retract_reason absent. The + // parseMessage helper must default these to null so the visibility + // predicates don't read them as ALREADY_CLAIMED or hidden-broadcast. + const id = store.addObservation({ + session_id: 'claude', + kind: 'message', + content: 'pre-overhaul broadcast', + task_id: thread.task_id, + metadata: { + kind: 'message', + from_session_id: 'claude', + from_agent: 'claude', + to_agent: 'any', + to_session_id: null, + urgency: 'fyi', + status: 'unread', + read_by_session_id: null, + read_at: null, + replied_by_observation_id: null, + replied_at: null, + }, + }); + const codexView = thread.pendingMessagesFor('codex', 'codex'); + expect(codexView.map((m) => m.id)).toEqual([id]); + // Legacy broadcast message also passes the claim filter (claimed_by_* + // defaults to null, so isVisibleToBroadcastClaimant short-circuits). + expect(() => thread.claimBroadcastMessage(id, 'codex', 'codex')).not.toThrow(); + }); + it('declineHandoff cancels the handoff and records a note', () => { seed('claude', 'codex'); const thread = TaskThread.open(store, {