Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 86 additions & 5 deletions apps/mcp-server/test/messages.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,10 @@ describe('task threads — direct messages', () => {
expect(senderInbox.find((m) => m.id === message_observation_id)).toBeUndefined();
});

it('to_session_id routes to the target session and stays invisible to mismatched-agent participants', async () => {
it('to_session_id routes only to the target session, not every matching-agent participant', async () => {
const { task_id, sessionA, sessionB, sessionC } = seedThreeSessionTask();
store.startSession({ id: 'D', ide: 'claude-code', cwd: '/repo' });
new TaskThread(store, task_id).join('D', 'claude');

const { message_observation_id } = await call<{ message_observation_id: number }>(
'task_message',
Expand All @@ -214,15 +216,55 @@ describe('task threads — direct messages', () => {
});
expect(cInbox.find((m) => m.id === message_observation_id)).toBeDefined();

// B is a codex session; filter by to_agent='claude' should exclude B
// *and* a second claude session would also be excluded if to_session_id
// were honoured strictly. Here we only seed one extra claude session (C),
// so we assert the negative case via B.
const bInbox = await call<Array<{ id: number }>>('task_messages', {
session_id: sessionB,
agent: 'codex',
});
expect(bInbox.find((m) => m.id === message_observation_id)).toBeUndefined();

const dInbox = await call<Array<{ id: number }>>('task_messages', {
session_id: 'D',
agent: 'claude',
});
expect(dInbox.find((m) => m.id === message_observation_id)).toBeUndefined();
expect(new TaskThread(store, task_id).pendingMessagesFor('D', 'claude')).toHaveLength(0);
});

it('task_ids cannot expose messages from tasks the caller has not joined', async () => {
const { sessionA, sessionB } = seedTwoSessionTask();
store.startSession({ id: 'C', ide: 'codex', cwd: '/repo' });
const privateThread = TaskThread.open(store, {
repo_root: '/repo',
branch: 'feat/private',
session_id: sessionA,
});
privateThread.join(sessionA, 'claude');
privateThread.join('C', 'codex');

const { message_observation_id } = await call<{ message_observation_id: number }>(
'task_message',
{
task_id: privateThread.task_id,
session_id: sessionA,
agent: 'claude',
to_agent: 'codex',
content: 'private codex-only task details',
},
);

const outsiderInbox = await call<Array<{ id: number }>>('task_messages', {
session_id: sessionB,
agent: 'codex',
task_ids: [privateThread.task_id],
});
expect(outsiderInbox.find((m) => m.id === message_observation_id)).toBeUndefined();

const participantInbox = await call<Array<{ id: number }>>('task_messages', {
session_id: 'C',
agent: 'codex',
task_ids: [privateThread.task_id],
});
expect(participantInbox.find((m) => m.id === message_observation_id)).toBeDefined();
});

it('since_ts cursor filters out older messages', async () => {
Expand Down Expand Up @@ -321,4 +363,43 @@ describe('task threads — direct messages', () => {
});
expect(err.code).toBe(TASK_THREAD_ERROR_CODES.NOT_MESSAGE);
});

it('mark_read rejects non-participants and non-recipients without clearing unread status', async () => {
const { task_id, sessionA, sessionB, sessionC } = seedThreeSessionTask();

const { message_observation_id } = await call<{ message_observation_id: number }>(
'task_message',
{
task_id,
session_id: sessionA,
agent: 'claude',
to_agent: 'claude',
to_session_id: sessionC,
content: 'targeted to session C only',
},
);

const wrongParticipant = await callError('task_message_mark_read', {
message_observation_id,
session_id: sessionB,
});
expect(wrongParticipant.code).toBe(TASK_THREAD_ERROR_CODES.NOT_TARGET_SESSION);

store.startSession({ id: 'outsider', ide: 'codex', cwd: '/repo' });
const outsider = await callError('task_message_mark_read', {
message_observation_id,
session_id: 'outsider',
});
expect(outsider.code).toBe(TASK_THREAD_ERROR_CODES.NOT_PARTICIPANT);

const meta = JSON.parse(store.storage.getObservation(message_observation_id)?.metadata ?? '{}');
expect(meta.status).toBe('unread');
expect(meta.read_by_session_id).toBeNull();

const target = await call<{ status: string }>('task_message_mark_read', {
message_observation_id,
session_id: sessionC,
});
expect(target.status).toBe('read');
});
});
14 changes: 7 additions & 7 deletions packages/core/src/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type {
MessageTarget,
MessageUrgency,
} from './task-thread.js';
import { isMessageAddressedTo } from './task-thread.js';

/**
* Compact view of a directed message, safe to return from MCP list-shape
Expand Down Expand Up @@ -51,8 +52,8 @@ const DEFAULT_PREVIEW_LENGTH = 120;
* Return messages addressed to this (session, agent), newest-first. A
* message is addressed to the caller when:
* - metadata.to_session_id === session_id, OR
* - metadata.to_agent === agent, OR
* - metadata.to_agent === 'any' (broadcast)
* - metadata.to_session_id is absent and metadata.to_agent === agent, OR
* - metadata.to_session_id is absent and metadata.to_agent === 'any'
*
* The caller's own sends are filtered out — your outbox is not your inbox.
* This function does *not* mark messages as read; `markMessageRead` is a
Expand All @@ -68,7 +69,9 @@ export function listMessagesForAgent(
const previewLen = opts.previewLength ?? DEFAULT_PREVIEW_LENGTH;

const taskIds = opts.task_ids?.length
? [...new Set(opts.task_ids)]
? [...new Set(opts.task_ids)].filter(
(task_id) => store.storage.getParticipantAgent(task_id, opts.session_id) !== undefined,
)
: participatingTaskIds(store, opts.session_id);

const out: MessageSummary[] = [];
Expand All @@ -86,10 +89,7 @@ export function listMessagesForAgent(
}
if (meta.kind !== 'message') continue;

const addressedToMe =
meta.to_session_id === opts.session_id ||
meta.to_agent === opts.agent ||
meta.to_agent === 'any';
const addressedToMe = isMessageAddressedTo(meta, opts.session_id, opts.agent);
if (!addressedToMe) continue;
if (opts.unread_only && meta.status !== 'unread') continue;

Expand Down
37 changes: 36 additions & 1 deletion packages/core/src/task-thread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,16 @@ export interface PostMessageArgs {
urgency?: MessageUrgency;
}

export function isMessageAddressedTo(
meta: MessageMetadata,
session_id: string,
agent: string,
): boolean {
if (meta.to_session_id !== null) return meta.to_session_id === session_id;
if (meta.to_agent === 'any') return true;
return meta.to_agent === agent;
}

const DEFAULT_HANDOFF_TTL_MS = 2 * 60 * 60 * 1000;
const DEFAULT_WAKE_TTL_MS = 24 * 60 * 60 * 1000;

Expand Down Expand Up @@ -753,6 +763,31 @@ export class TaskThread {
if (!meta) {
throw taskError(TASK_THREAD_ERROR_CODES.METADATA_MISSING, 'message metadata missing');
}
const myAgent = this.store.storage.getParticipantAgent(this.task_id, session_id);
if (!myAgent) {
throw taskError(
TASK_THREAD_ERROR_CODES.NOT_PARTICIPANT,
'session is not a participant on this task',
);
}
if (meta.from_session_id === session_id) {
throw taskError(
TASK_THREAD_ERROR_CODES.NOT_TARGET_SESSION,
'message was sent by this session',
);
}
if (meta.to_session_id !== null && meta.to_session_id !== session_id) {
throw taskError(
TASK_THREAD_ERROR_CODES.NOT_TARGET_SESSION,
'message is addressed to a different session',
);
}
if (meta.to_session_id === null && meta.to_agent !== 'any' && meta.to_agent !== myAgent) {
throw taskError(
TASK_THREAD_ERROR_CODES.NOT_TARGET_AGENT,
`message is for ${meta.to_agent}, not ${myAgent}`,
);
}
if (meta.status === 'unread') {
meta.status = 'read';
meta.read_by_session_id = session_id;
Expand All @@ -777,7 +812,7 @@ export class TaskThread {
({ meta }) =>
meta.status === 'unread' &&
meta.from_session_id !== session_id &&
(meta.to_session_id === session_id || meta.to_agent === 'any' || meta.to_agent === agent),
isMessageAddressedTo(meta, session_id, agent),
);
}
}
Expand Down
2 changes: 2 additions & 0 deletions vitest.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ export const workspaceAliases = {
'@colony/config': resolve(rootDir, 'packages/config/src/index.ts'),
'@colony/core': resolve(rootDir, 'packages/core/src/index.ts'),
'@colony/embedding': resolve(rootDir, 'packages/embedding/src/index.ts'),
'@colony/foraging': resolve(rootDir, 'packages/foraging/src/index.ts'),
'@colony/hooks': resolve(rootDir, 'packages/hooks/src/index.ts'),
'@colony/installers': resolve(rootDir, 'packages/installers/src/index.ts'),
'@colony/mcp-server': resolve(rootDir, 'apps/mcp-server/src/server.ts'),
'@colony/process': resolve(rootDir, 'packages/process/src/index.ts'),
'@colony/spec': resolve(rootDir, 'packages/spec/src/index.ts'),
'@colony/storage': resolve(rootDir, 'packages/storage/src/index.ts'),
'@colony/worker': resolve(rootDir, 'apps/worker/src/server.ts'),
};
Expand Down
Loading