From 0c5fc3ae9888ec18bb60dcd5e7c5bc4532cee47f Mon Sep 17 00:00:00 2001 From: NagyVikt Date: Fri, 24 Apr 2026 11:27:44 +0200 Subject: [PATCH 1/2] refactor(mcp-server): split server.ts into tools/*.ts per tool group MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `apps/mcp-server/src/server.ts` was ~874 lines of sequential `server.tool(...)` calls — hard to review in a PR, worse to diff. Split the 23 tool registrations into eight focused modules under `tools/`, each exporting `register(server, ctx)`. `buildServer` is now a ~20-line registration list that preserves the original registration order so existing in-memory MCP-client tests and snapshot fixtures keep passing. New files: tools/context.ts — `ToolContext` (store, settings, resolveEmbedder) tools/shared.ts — hivemind-context helpers + types tools/heartbeat.ts — active-session heartbeat wrapper tools/search.ts — search, timeline, get_observations, list_sessions tools/hivemind.ts — hivemind, hivemind_context tools/task.ts — task_list, task_timeline, task_updates_since, task_post, task_claim_file tools/handoff.ts — task_hand_off, task_accept_handoff, task_decline_handoff tools/proposal.ts — task_propose, task_reinforce tools/profile.ts — agent_upsert_profile, agent_get_profile tools/wake.ts — task_wake, task_ack_wake, task_cancel_wake, attention_inbox, task_foraging_report No tool-behavior change. All 17 mcp-server tests (InMemory MCP client + task-thread suites) pass unchanged; full workspace suite 206/206 green. --- apps/mcp-server/src/server.ts | 831 +------------------------ apps/mcp-server/src/tools/context.ts | 14 + apps/mcp-server/src/tools/handoff.ts | 121 ++++ apps/mcp-server/src/tools/heartbeat.ts | 56 ++ apps/mcp-server/src/tools/hivemind.ts | 64 ++ apps/mcp-server/src/tools/profile.ts | 49 ++ apps/mcp-server/src/tools/proposal.ts | 77 +++ apps/mcp-server/src/tools/search.ts | 81 +++ apps/mcp-server/src/tools/shared.ts | 138 ++++ apps/mcp-server/src/tools/task.ts | 107 ++++ apps/mcp-server/src/tools/wake.ts | 163 +++++ 11 files changed, 894 insertions(+), 807 deletions(-) create mode 100644 apps/mcp-server/src/tools/context.ts create mode 100644 apps/mcp-server/src/tools/handoff.ts create mode 100644 apps/mcp-server/src/tools/heartbeat.ts create mode 100644 apps/mcp-server/src/tools/hivemind.ts create mode 100644 apps/mcp-server/src/tools/profile.ts create mode 100644 apps/mcp-server/src/tools/proposal.ts create mode 100644 apps/mcp-server/src/tools/search.ts create mode 100644 apps/mcp-server/src/tools/shared.ts create mode 100644 apps/mcp-server/src/tools/task.ts create mode 100644 apps/mcp-server/src/tools/wake.ts diff --git a/apps/mcp-server/src/server.ts b/apps/mcp-server/src/server.ts index d12cc7b..b978ac6 100644 --- a/apps/mcp-server/src/server.ts +++ b/apps/mcp-server/src/server.ts @@ -1,29 +1,20 @@ #!/usr/bin/env node import { join } from 'node:path'; import { type Settings, loadSettings, resolveDataDir } from '@colony/config'; -import { - type AgentCapabilities, - type AttentionInboxOptions, - DEFAULT_CAPABILITIES, - type Embedder, - type HivemindOptions, - type HivemindSession, - type HivemindSnapshot, - MemoryStore, - ProposalSystem, - type SearchResult, - TaskThread, - buildAttentionInbox, - loadProfile, - readHivemind, - saveProfile, -} from '@colony/core'; +import { type Embedder, MemoryStore } from '@colony/core'; import { createEmbedder } from '@colony/embedding'; -import { type HookInput, type HookName, upsertActiveSession } from '@colony/hooks'; import { isMainEntry } from '@colony/process'; import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'; import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js'; -import { z } from 'zod'; +import type { ToolContext } from './tools/context.js'; +import * as handoff from './tools/handoff.js'; +import { installActiveSessionHeartbeat } from './tools/heartbeat.js'; +import * as hivemind from './tools/hivemind.js'; +import * as profile from './tools/profile.js'; +import * as proposal from './tools/proposal.js'; +import * as search from './tools/search.js'; +import * as task from './tools/task.js'; +import * as wake from './tools/wake.js'; /** * MCP stdio server exposing progressive-disclosure tools: @@ -64,798 +55,24 @@ export function buildServer(store: MemoryStore, settings: Settings): McpServer { return embedder; }; - server.tool( - 'search', - 'Search memory. Returns compact hits — fetch full bodies via get_observations.', - { query: z.string().min(1), limit: z.number().int().positive().max(50).optional() }, - async ({ query, limit }) => { - const e = (await resolveEmbedder()) ?? undefined; - const hits = await store.search(query, limit, e); - return { - content: [{ type: 'text', text: JSON.stringify(hits) }], - }; - }, - ); - - server.tool( - 'timeline', - 'Chronological observation IDs for a session. Use to locate context around a point.', - { - session_id: z.string().min(1), - around_id: z.number().int().positive().optional(), - limit: z.number().int().positive().max(200).optional(), - }, - async ({ session_id, around_id, limit }) => { - const rows = store.timeline(session_id, around_id, limit); - const compact = rows.map((r) => ({ id: r.id, kind: r.kind, ts: r.ts })); - return { content: [{ type: 'text', text: JSON.stringify(compact) }] }; - }, - ); - - server.tool( - 'get_observations', - 'Fetch full observation bodies by ID. Returns expanded text by default.', - { - ids: z.array(z.number().int().positive()).min(1).max(50), - expand: z.boolean().optional(), - }, - async ({ ids, expand: expandOpt }) => { - const rows = store.getObservations(ids, { expand: expandOpt ?? true }); - const payload = rows.map((r) => ({ - id: r.id, - session_id: r.session_id, - kind: r.kind, - ts: r.ts, - content: r.content, - metadata: r.metadata, - })); - return { content: [{ type: 'text', text: JSON.stringify(payload) }] }; - }, - ); - - server.tool( - 'list_sessions', - 'List recent sessions in reverse chronological order. Use to navigate before calling timeline.', - { limit: z.number().int().positive().max(200).optional() }, - async ({ limit }) => { - const sessions = store.storage.listSessions(limit ?? 20); - return { - content: [ - { - type: 'text', - text: JSON.stringify( - sessions.map((s) => ({ - id: s.id, - ide: s.ide, - cwd: s.cwd, - started_at: s.started_at, - ended_at: s.ended_at, - })), - ), - }, - ], - }; - }, - ); - - server.tool( - 'hivemind', - 'Summarize active agent sessions and task ownership from proxy-runtime state files.', - { - repo_root: z.string().min(1).optional(), - repo_roots: z.array(z.string().min(1)).max(20).optional(), - include_stale: z.boolean().optional(), - limit: z.number().int().positive().max(100).optional(), - }, - async ({ repo_root, repo_roots, include_stale, limit }) => { - const options: Parameters[0] = {}; - if (repo_root !== undefined) options.repoRoot = repo_root; - if (repo_roots !== undefined) options.repoRoots = repo_roots; - if (include_stale !== undefined) options.includeStale = include_stale; - if (limit !== undefined) options.limit = limit; - const snapshot = readHivemind(options); - return { content: [{ type: 'text', text: JSON.stringify(snapshot) }] }; - }, - ); - - server.tool( - 'hivemind_context', - 'Return active task ownership plus compact relevant memory hits. Use before fetching full observations.', - { - repo_root: z.string().min(1).optional(), - repo_roots: z.array(z.string().min(1)).max(20).optional(), - include_stale: z.boolean().optional(), - limit: z.number().int().positive().max(100).optional(), - query: z.string().min(1).optional(), - memory_limit: z.number().int().positive().max(10).optional(), - }, - async ({ repo_root, repo_roots, include_stale, limit, query, memory_limit }) => { - const snapshot = readHivemind( - toHivemindOptions({ repo_root, repo_roots, include_stale, limit }), - ); - const memoryLimit = memory_limit ?? 3; - const contextQuery = buildContextQuery(query, snapshot.sessions); - let memoryHits: SearchResult[] = []; - - if (contextQuery) { - const e = (await resolveEmbedder()) ?? undefined; - memoryHits = await store.search(contextQuery, memoryLimit, e); - } - - return { - content: [ - { - type: 'text', - text: JSON.stringify(buildHivemindContext(snapshot, memoryHits, contextQuery)), - }, - ], - }; - }, - ); - - // --- task thread tools --- - // - // Agents already know their session_id from SessionStart; it's passed - // explicitly on every call so this server can stay session-agnostic and - // serve multiple agents (e.g. a shared viewer) without ambient state. - - server.tool( - 'task_list', - 'List recent task threads. Each task groups sessions collaborating on the same (repo_root, branch).', - { limit: z.number().int().positive().max(200).optional() }, - async ({ limit }) => { - const tasks = store.storage.listTasks(limit ?? 50); - return { content: [{ type: 'text', text: JSON.stringify(tasks) }] }; - }, - ); - - server.tool( - 'task_timeline', - 'Recent observations on a task thread (compact: id, kind, session_id, ts, reply_to).', - { - task_id: z.number().int().positive(), - limit: z.number().int().positive().max(200).optional(), - }, - async ({ task_id, limit }) => { - const rows = store.storage.taskTimeline(task_id, limit ?? 50); - const compact = rows.map((r) => ({ - id: r.id, - kind: r.kind, - session_id: r.session_id, - ts: r.ts, - reply_to: r.reply_to, - })); - return { content: [{ type: 'text', text: JSON.stringify(compact) }] }; - }, - ); - - server.tool( - 'task_updates_since', - "Task-thread observations after a cursor ts, excluding this session's own posts.", - { - task_id: z.number().int().positive(), - session_id: z.string().min(1), - since_ts: z.number().int().nonnegative(), - limit: z.number().int().positive().max(200).optional(), - }, - async ({ task_id, session_id, since_ts, limit }) => { - const rows = store.storage - .taskObservationsSince(task_id, since_ts, limit ?? 50) - .filter((o) => o.session_id !== session_id); - const compact = rows.map((r) => ({ - id: r.id, - kind: r.kind, - session_id: r.session_id, - ts: r.ts, - })); - return { content: [{ type: 'text', text: JSON.stringify(compact) }] }; - }, - ); - - server.tool( - 'task_post', - 'Post a coordination message on a task thread. Use specific tools for claim / hand_off / accept.', - { - task_id: z.number().int().positive(), - session_id: z.string().min(1), - kind: z.enum(['question', 'answer', 'decision', 'blocker', 'note']), - content: z.string().min(1), - reply_to: z.number().int().positive().optional(), - }, - async ({ task_id, session_id, kind, content, reply_to }) => { - const thread = new TaskThread(store, task_id); - const id = thread.post({ - session_id, - kind, - content, - ...(reply_to !== undefined ? { reply_to } : {}), - }); - return { content: [{ type: 'text', text: JSON.stringify({ id }) }] }; - }, - ); - - server.tool( - 'task_claim_file', - 'Claim a file on a task thread so overlapping edits from other sessions surface a warning next turn.', - { - task_id: z.number().int().positive(), - session_id: z.string().min(1), - file_path: z.string().min(1), - note: z.string().optional(), - }, - async ({ task_id, session_id, file_path, note }) => { - const thread = new TaskThread(store, task_id); - const id = thread.claimFile({ - session_id, - file_path, - ...(note !== undefined ? { note } : {}), - }); - return { content: [{ type: 'text', text: JSON.stringify({ observation_id: id }) }] }; - }, - ); - - server.tool( - 'task_hand_off', - 'Hand off work to another agent on this task. Atomically releases/transfers file claims.', - { - task_id: z.number().int().positive(), - session_id: z.string().min(1).describe('your session_id (the sender)'), - agent: z.string().min(1).describe('your agent name, e.g. claude or codex'), - to_agent: z.enum(['claude', 'codex', 'any']), - to_session_id: z.string().optional(), - summary: z.string().min(1), - next_steps: z.array(z.string()).optional(), - blockers: z.array(z.string()).optional(), - released_files: z.array(z.string()).optional(), - transferred_files: z.array(z.string()).optional(), - expires_in_minutes: z.number().int().positive().max(480).optional(), - }, - async (args) => { - const thread = new TaskThread(store, args.task_id); - const id = thread.handOff({ - from_session_id: args.session_id, - from_agent: args.agent, - to_agent: args.to_agent, - ...(args.to_session_id !== undefined ? { to_session_id: args.to_session_id } : {}), - summary: args.summary, - ...(args.next_steps !== undefined ? { next_steps: args.next_steps } : {}), - ...(args.blockers !== undefined ? { blockers: args.blockers } : {}), - ...(args.released_files !== undefined ? { released_files: args.released_files } : {}), - ...(args.transferred_files !== undefined - ? { transferred_files: args.transferred_files } - : {}), - ...(args.expires_in_minutes !== undefined - ? { expires_in_ms: args.expires_in_minutes * 60_000 } - : {}), - }); - return { - content: [ - { type: 'text', text: JSON.stringify({ handoff_observation_id: id, status: 'pending' }) }, - ], - }; - }, - ); - - server.tool( - 'task_accept_handoff', - 'Accept a pending handoff addressed to you. Installs transferred file claims under your session.', - { - handoff_observation_id: z.number().int().positive(), - session_id: z.string().min(1), - }, - async ({ handoff_observation_id, session_id }) => { - const obs = store.storage.getObservation(handoff_observation_id); - if (!obs?.task_id) { - return { - content: [ - { type: 'text', text: JSON.stringify({ error: 'observation is not on a task' }) }, - ], - isError: true, - }; - } - const thread = new TaskThread(store, obs.task_id); - try { - thread.acceptHandoff(handoff_observation_id, session_id); - return { content: [{ type: 'text', text: JSON.stringify({ status: 'accepted' }) }] }; - } catch (err) { - return { - content: [ - { - type: 'text', - text: JSON.stringify({ error: err instanceof Error ? err.message : String(err) }), - }, - ], - isError: true, - }; - } - }, - ); - - server.tool( - 'task_decline_handoff', - 'Decline a pending handoff. Records a reason and cancels the handoff so the sender can reissue.', - { - handoff_observation_id: z.number().int().positive(), - session_id: z.string().min(1), - reason: z.string().optional(), - }, - async ({ handoff_observation_id, session_id, reason }) => { - const obs = store.storage.getObservation(handoff_observation_id); - if (!obs?.task_id) { - return { - content: [ - { type: 'text', text: JSON.stringify({ error: 'observation is not on a task' }) }, - ], - isError: true, - }; - } - const thread = new TaskThread(store, obs.task_id); - try { - thread.declineHandoff(handoff_observation_id, session_id, reason); - return { content: [{ type: 'text', text: JSON.stringify({ status: 'cancelled' }) }] }; - } catch (err) { - return { - content: [ - { - type: 'text', - text: JSON.stringify({ error: err instanceof Error ? err.message : String(err) }), - }, - ], - isError: true, - }; - } - }, - ); - - server.tool( - 'task_propose', - 'Propose a potential improvement scoped to (repo_root, branch). Becomes a real task only after collective reinforcement crosses the promotion threshold.', - { - repo_root: z.string().min(1), - branch: z.string().min(1), - summary: z.string().min(1), - rationale: z.string().min(1), - touches_files: z.array(z.string()).default([]), - session_id: z.string().min(1), - }, - async ({ repo_root, branch, summary, rationale, touches_files, session_id }) => { - const proposals = new ProposalSystem(store); - const id = proposals.propose({ - repo_root, - branch, - summary, - rationale, - touches_files, - session_id, - }); - const strength = proposals.currentStrength(id); - return { - content: [ - { - type: 'text', - text: JSON.stringify({ - proposal_id: id, - strength, - promotion_threshold: ProposalSystem.PROMOTION_THRESHOLD, - }), - }, - ], - }; - }, - ); - - server.tool( - 'task_reinforce', - "Reinforce a pending proposal. kind='explicit' for direct support; 'rediscovered' when you arrived at the same idea independently.", - { - proposal_id: z.number().int().positive(), - session_id: z.string().min(1), - kind: z.enum(['explicit', 'rediscovered']).default('explicit'), - }, - async ({ proposal_id, session_id, kind }) => { - const proposals = new ProposalSystem(store); - const { strength, promoted } = proposals.reinforce({ - proposal_id, - session_id, - kind, - }); - const proposal = store.storage.getProposal(proposal_id); - return { - content: [ - { - type: 'text', - text: JSON.stringify({ - proposal_id, - strength, - promoted, - task_id: proposal?.task_id ?? null, - }), - }, - ], - }; - }, - ); - - server.tool( - 'agent_upsert_profile', - "Set or update an agent's capability profile (ui_work, api_work, test_work, infra_work, doc_work). Weights are 0..1; missing weights keep their current value (or the 0.5 default for first-time profiles). Used by the handoff router to suggest which agent is the best fit for a broadcast ('any') handoff.", - { - agent: z.string().min(1), - capabilities: z - .object({ - ui_work: z.number().min(0).max(1).optional(), - api_work: z.number().min(0).max(1).optional(), - test_work: z.number().min(0).max(1).optional(), - infra_work: z.number().min(0).max(1).optional(), - doc_work: z.number().min(0).max(1).optional(), - }) - .default({}), - }, - async ({ agent, capabilities }) => { - const definedCapabilities = Object.fromEntries( - Object.entries(capabilities).filter(([, value]) => value !== undefined), - ) as Partial; - const profile = saveProfile(store.storage, agent, definedCapabilities); - return { content: [{ type: 'text', text: JSON.stringify(profile) }] }; - }, - ); - - server.tool( - 'agent_get_profile', - 'Read an agent capability profile. Unknown agents return the default (0.5 across all dimensions).', - { agent: z.string().min(1) }, - async ({ agent }) => { - const profile = loadProfile(store.storage, agent); - return { - content: [ - { - type: 'text', - text: JSON.stringify({ ...profile, defaults: DEFAULT_CAPABILITIES }), - }, - ], - }; - }, - ); - - server.tool( - 'task_wake', - 'Post a wake request on a task thread — a lightweight nudge surfaced to the target on their next turn. No claim transfer. Use when you need another session to attend to something but a full handoff is the wrong shape.', - { - task_id: z.number().int().positive(), - session_id: z.string().min(1).describe('your session_id (the sender)'), - agent: z.string().min(1).describe('your agent name, e.g. claude or codex'), - to_agent: z.enum(['claude', 'codex', 'any']), - to_session_id: z.string().optional(), - reason: z.string().min(1), - next_step: z.string().optional(), - expires_in_minutes: z.number().int().positive().max(1440).optional(), - }, - async (args) => { - const thread = new TaskThread(store, args.task_id); - const id = thread.requestWake({ - from_session_id: args.session_id, - from_agent: args.agent, - to_agent: args.to_agent, - ...(args.to_session_id !== undefined ? { to_session_id: args.to_session_id } : {}), - reason: args.reason, - ...(args.next_step !== undefined ? { next_step: args.next_step } : {}), - ...(args.expires_in_minutes !== undefined - ? { expires_in_ms: args.expires_in_minutes * 60_000 } - : {}), - }); - return { - content: [ - { type: 'text', text: JSON.stringify({ wake_observation_id: id, status: 'pending' }) }, - ], - }; - }, - ); - - server.tool( - 'task_ack_wake', - 'Acknowledge a pending wake request addressed to you. Records an ack on the task thread so the sender sees the response on their next turn.', - { - wake_observation_id: z.number().int().positive(), - session_id: z.string().min(1), - }, - async ({ wake_observation_id, session_id }) => { - const obs = store.storage.getObservation(wake_observation_id); - if (!obs?.task_id) { - return { - content: [ - { type: 'text', text: JSON.stringify({ error: 'observation is not on a task' }) }, - ], - isError: true, - }; - } - const thread = new TaskThread(store, obs.task_id); - try { - thread.acknowledgeWake(wake_observation_id, session_id); - return { content: [{ type: 'text', text: JSON.stringify({ status: 'acknowledged' }) }] }; - } catch (err) { - return { - content: [ - { - type: 'text', - text: JSON.stringify({ error: err instanceof Error ? err.message : String(err) }), - }, - ], - isError: true, - }; - } - }, - ); - - server.tool( - 'task_cancel_wake', - 'Cancel a pending wake request. Either the sender (withdrawing) or the target (declining) may cancel.', - { - wake_observation_id: z.number().int().positive(), - session_id: z.string().min(1), - reason: z.string().optional(), - }, - async ({ wake_observation_id, session_id, reason }) => { - const obs = store.storage.getObservation(wake_observation_id); - if (!obs?.task_id) { - return { - content: [ - { type: 'text', text: JSON.stringify({ error: 'observation is not on a task' }) }, - ], - isError: true, - }; - } - const thread = new TaskThread(store, obs.task_id); - try { - thread.cancelWake(wake_observation_id, session_id, reason); - return { content: [{ type: 'text', text: JSON.stringify({ status: 'cancelled' }) }] }; - } catch (err) { - return { - content: [ - { - type: 'text', - text: JSON.stringify({ error: err instanceof Error ? err.message : String(err) }), - }, - ], - isError: true, - }; - } - }, - ); - - server.tool( - 'attention_inbox', - 'Compact list of what needs your attention: pending handoffs, pending wakes, stalled lanes, recent other-session file claims. Fetch bodies via get_observations.', - { - session_id: z.string().min(1), - agent: z.string().min(1), - repo_root: z.string().min(1).optional(), - repo_roots: z.array(z.string().min(1)).max(20).optional(), - recent_claim_window_minutes: z.number().int().positive().max(1440).optional(), - recent_claim_limit: z.number().int().positive().max(100).optional(), - task_ids: z.array(z.number().int().positive()).max(100).optional(), - }, - async (args) => { - const options: AttentionInboxOptions = { - session_id: args.session_id, - agent: args.agent, - }; - if (args.repo_root !== undefined) options.repo_root = args.repo_root; - if (args.repo_roots !== undefined) options.repo_roots = args.repo_roots; - if (args.recent_claim_window_minutes !== undefined) { - options.recent_claim_window_ms = args.recent_claim_window_minutes * 60_000; - } - if (args.recent_claim_limit !== undefined) { - options.recent_claim_limit = args.recent_claim_limit; - } - if (args.task_ids !== undefined) options.task_ids = args.task_ids; - const inbox = buildAttentionInbox(store, options); - return { content: [{ type: 'text', text: JSON.stringify(inbox) }] }; - }, - ); - - server.tool( - 'task_foraging_report', - 'List pending and recently promoted proposals on a (repo_root, branch). Pending proposals whose strength has evaporated below the noise floor are omitted.', - { - repo_root: z.string().min(1), - branch: z.string().min(1), - }, - async ({ repo_root, branch }) => { - const proposals = new ProposalSystem(store); - const report = proposals.foragingReport(repo_root, branch); - return { content: [{ type: 'text', text: JSON.stringify(report) }] }; - }, - ); + const ctx: ToolContext = { store, settings, resolveEmbedder }; + + // Registration order is load-bearing: `installActiveSessionHeartbeat` wraps + // every subsequent `server.tool(...)` so tool-invocation telemetry uses the + // original tool names. The order below mirrors the order the tools appeared + // in the pre-split monolithic server.ts so existing MCP inspector fixtures + // and snapshot tests stay stable. + search.register(server, ctx); + hivemind.register(server, ctx); + task.register(server, ctx); + handoff.register(server, ctx); + proposal.register(server, ctx); + profile.register(server, ctx); + wake.register(server, ctx); return server; } -interface HivemindToolOptions { - repo_root: string | undefined; - repo_roots: string[] | undefined; - include_stale: boolean | undefined; - limit: number | undefined; -} - -interface HivemindContextLane { - repo_root: string; - branch: string; - task: string; - owner: string; - activity: HivemindSession['activity']; - activity_summary: string; - needs_attention: boolean; - risk: string; - source: HivemindSession['source']; - worktree_path: string; - updated_at: string; - elapsed_seconds: number; - locked_file_count: number; - locked_file_preview: string[]; -} - -interface HivemindContext { - generated_at: string; - repo_roots: string[]; - summary: { - lane_count: number; - memory_hit_count: number; - needs_attention_count: number; - next_action: string; - }; - counts: HivemindSnapshot['counts']; - query: string; - lanes: HivemindContextLane[]; - memory_hits: SearchResult[]; -} - -function toHivemindOptions(input: HivemindToolOptions): HivemindOptions { - const options: HivemindOptions = {}; - if (input.repo_root !== undefined) options.repoRoot = input.repo_root; - if (input.repo_roots !== undefined) options.repoRoots = input.repo_roots; - if (input.include_stale !== undefined) options.includeStale = input.include_stale; - if (input.limit !== undefined) options.limit = input.limit; - return options; -} - -function buildContextQuery(query: string | undefined, sessions: HivemindSession[]): string { - if (query?.trim()) return query.trim(); - const taskText = sessions - .flatMap((session) => [ - session.task, - session.task_name, - session.routing_reason, - ...session.locked_file_preview, - ]) - .map((entry) => entry.trim()) - .filter(Boolean); - return [...new Set(taskText)].join(' ').slice(0, 800); -} - -function buildHivemindContext( - snapshot: HivemindSnapshot, - memoryHits: SearchResult[], - query: string, -): HivemindContext { - const lanes = snapshot.sessions.map(toContextLane); - const needsAttentionCount = lanes.filter((lane) => lane.needs_attention).length; - - return { - generated_at: snapshot.generated_at, - repo_roots: snapshot.repo_roots, - summary: { - lane_count: lanes.length, - memory_hit_count: memoryHits.length, - needs_attention_count: needsAttentionCount, - next_action: nextAction(lanes, memoryHits), - }, - counts: snapshot.counts, - query, - lanes, - memory_hits: memoryHits, - }; -} - -function toContextLane(session: HivemindSession): HivemindContextLane { - const risk = laneRisk(session); - return { - repo_root: session.repo_root, - branch: session.branch, - task: session.task, - owner: `${session.agent}/${session.cli}`, - activity: session.activity, - activity_summary: session.activity_summary, - needs_attention: risk !== 'none', - risk, - source: session.source, - worktree_path: session.worktree_path, - updated_at: session.updated_at, - elapsed_seconds: session.elapsed_seconds, - locked_file_count: session.locked_file_count, - locked_file_preview: session.locked_file_preview, - }; -} - -function laneRisk(session: HivemindSession): string { - if (session.activity === 'dead') return 'dead session'; - if (session.activity === 'stalled') return 'stale telemetry'; - if (session.activity === 'unknown') return 'unknown runtime state'; - return 'none'; -} - -function nextAction(lanes: HivemindContextLane[], memoryHits: SearchResult[]): string { - if (lanes.some((lane) => lane.needs_attention)) { - return 'Inspect lanes with needs_attention before taking over or editing nearby files.'; - } - if (lanes.length > 0 && memoryHits.length > 0) { - return 'Use lane ownership first, then fetch only the specific memory IDs needed.'; - } - if (lanes.length > 0) { - return 'Use lane ownership before editing; no matching memory hit was needed.'; - } - if (memoryHits.length > 0) { - return 'No live lanes found; fetch only the memory IDs needed.'; - } - return 'No live lanes or matching memory found.'; -} - -interface McpClientIdentity { - sessionId: string; - ide: string; -} - -function detectMcpClientIdentity(env: NodeJS.ProcessEnv = process.env): McpClientIdentity { - const codexId = env.CODEX_SESSION_ID?.trim(); - if (codexId) return { sessionId: codexId, ide: 'codex' }; - const claudeId = env.CLAUDECODE_SESSION_ID?.trim() ?? env.CLAUDE_SESSION_ID?.trim(); - if (claudeId) return { sessionId: claudeId, ide: 'claude-code' }; - const override = env.COLONY_CLIENT_SESSION_ID?.trim(); - if (override) return { sessionId: override, ide: env.COLONY_CLIENT_IDE?.trim() ?? 'unknown' }; - // Fallback: stable per parent-process so the lane coalesces across tool calls. - return { sessionId: `mcp-${process.ppid}`, ide: env.COLONY_CLIENT_IDE?.trim() ?? 'unknown' }; -} - -function installActiveSessionHeartbeat(server: McpServer): void { - const client = detectMcpClientIdentity(); - const cwd = process.cwd(); - - const touch = (hook: HookName, extras: Partial = {}): void => { - try { - upsertActiveSession( - { session_id: client.sessionId, ide: client.ide, cwd, ...extras }, - hook, - ); - } catch { - // Heartbeat is best-effort; never fail a tool call because the JSON - // sidecar cannot be written. - } - }; - - // Register the client the moment the server is built — before any tool - // call — so the lane is visible on the very first hivemind query. - touch('session-start', { source: 'mcp-connect' }); - - // Wrap every subsequent `server.tool(...)` registration so each invocation - // bumps lastHeartbeatAt and reports the invoked tool name as the current - // task preview. The SDK overloads this method; we only care that the last - // argument is the handler. - type ToolRegister = McpServer['tool']; - const originalTool = server.tool.bind(server) as ToolRegister; - (server as { tool: ToolRegister }).tool = ((...toolArgs: unknown[]) => { - const name = typeof toolArgs[0] === 'string' ? toolArgs[0] : 'unknown'; - const handlerIndex = toolArgs.length - 1; - const handler = toolArgs[handlerIndex]; - if (typeof handler === 'function') { - const original = handler as (...handlerArgs: unknown[]) => unknown; - toolArgs[handlerIndex] = async (...handlerArgs: unknown[]) => { - touch('post-tool-use', { tool_name: `colony.${name}` }); - return original(...handlerArgs); - }; - } - return (originalTool as (...a: unknown[]) => ReturnType)(...toolArgs); - }) as ToolRegister; -} - export async function main(): Promise { const settings = loadSettings(); const dbPath = join(resolveDataDir(settings.dataDir), 'data.db'); diff --git a/apps/mcp-server/src/tools/context.ts b/apps/mcp-server/src/tools/context.ts new file mode 100644 index 0000000..e7a1290 --- /dev/null +++ b/apps/mcp-server/src/tools/context.ts @@ -0,0 +1,14 @@ +import type { Settings } from '@colony/config'; +import type { Embedder, MemoryStore } from '@colony/core'; + +/** Shared closure captured by every register(server, ctx) call in tools/*.ts. */ +export interface ToolContext { + store: MemoryStore; + settings: Settings; + /** + * Lazy-singleton embedder. Returns null when the provider is `none` or the + * model failed to load. The first `search` call pays the model-load cost; + * every subsequent call reuses the cached instance. + */ + resolveEmbedder: () => Promise; +} diff --git a/apps/mcp-server/src/tools/handoff.ts b/apps/mcp-server/src/tools/handoff.ts new file mode 100644 index 0000000..e77e147 --- /dev/null +++ b/apps/mcp-server/src/tools/handoff.ts @@ -0,0 +1,121 @@ +import { TaskThread } from '@colony/core'; +import type { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'; +import { z } from 'zod'; +import type { ToolContext } from './context.js'; + +export function register(server: McpServer, ctx: ToolContext): void { + const { store } = ctx; + + server.tool( + 'task_hand_off', + 'Hand off work to another agent on this task. Atomically releases/transfers file claims.', + { + task_id: z.number().int().positive(), + session_id: z.string().min(1).describe('your session_id (the sender)'), + agent: z.string().min(1).describe('your agent name, e.g. claude or codex'), + to_agent: z.enum(['claude', 'codex', 'any']), + to_session_id: z.string().optional(), + summary: z.string().min(1), + next_steps: z.array(z.string()).optional(), + blockers: z.array(z.string()).optional(), + released_files: z.array(z.string()).optional(), + transferred_files: z.array(z.string()).optional(), + expires_in_minutes: z.number().int().positive().max(480).optional(), + }, + async (args) => { + const thread = new TaskThread(store, args.task_id); + const id = thread.handOff({ + from_session_id: args.session_id, + from_agent: args.agent, + to_agent: args.to_agent, + ...(args.to_session_id !== undefined ? { to_session_id: args.to_session_id } : {}), + summary: args.summary, + ...(args.next_steps !== undefined ? { next_steps: args.next_steps } : {}), + ...(args.blockers !== undefined ? { blockers: args.blockers } : {}), + ...(args.released_files !== undefined ? { released_files: args.released_files } : {}), + ...(args.transferred_files !== undefined + ? { transferred_files: args.transferred_files } + : {}), + ...(args.expires_in_minutes !== undefined + ? { expires_in_ms: args.expires_in_minutes * 60_000 } + : {}), + }); + return { + content: [ + { type: 'text', text: JSON.stringify({ handoff_observation_id: id, status: 'pending' }) }, + ], + }; + }, + ); + + server.tool( + 'task_accept_handoff', + 'Accept a pending handoff addressed to you. Installs transferred file claims under your session.', + { + handoff_observation_id: z.number().int().positive(), + session_id: z.string().min(1), + }, + async ({ handoff_observation_id, session_id }) => { + const obs = store.storage.getObservation(handoff_observation_id); + if (!obs?.task_id) { + return { + content: [ + { type: 'text', text: JSON.stringify({ error: 'observation is not on a task' }) }, + ], + isError: true, + }; + } + const thread = new TaskThread(store, obs.task_id); + try { + thread.acceptHandoff(handoff_observation_id, session_id); + return { content: [{ type: 'text', text: JSON.stringify({ status: 'accepted' }) }] }; + } catch (err) { + return { + content: [ + { + type: 'text', + text: JSON.stringify({ error: err instanceof Error ? err.message : String(err) }), + }, + ], + isError: true, + }; + } + }, + ); + + server.tool( + 'task_decline_handoff', + 'Decline a pending handoff. Records a reason and cancels the handoff so the sender can reissue.', + { + handoff_observation_id: z.number().int().positive(), + session_id: z.string().min(1), + reason: z.string().optional(), + }, + async ({ handoff_observation_id, session_id, reason }) => { + const obs = store.storage.getObservation(handoff_observation_id); + if (!obs?.task_id) { + return { + content: [ + { type: 'text', text: JSON.stringify({ error: 'observation is not on a task' }) }, + ], + isError: true, + }; + } + const thread = new TaskThread(store, obs.task_id); + try { + thread.declineHandoff(handoff_observation_id, session_id, reason); + return { content: [{ type: 'text', text: JSON.stringify({ status: 'cancelled' }) }] }; + } catch (err) { + return { + content: [ + { + type: 'text', + text: JSON.stringify({ error: err instanceof Error ? err.message : String(err) }), + }, + ], + isError: true, + }; + } + }, + ); +} diff --git a/apps/mcp-server/src/tools/heartbeat.ts b/apps/mcp-server/src/tools/heartbeat.ts new file mode 100644 index 0000000..ba36ddc --- /dev/null +++ b/apps/mcp-server/src/tools/heartbeat.ts @@ -0,0 +1,56 @@ +import { type HookInput, type HookName, upsertActiveSession } from '@colony/hooks'; +import type { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'; + +interface McpClientIdentity { + sessionId: string; + ide: string; +} + +function detectMcpClientIdentity(env: NodeJS.ProcessEnv = process.env): McpClientIdentity { + const codexId = env.CODEX_SESSION_ID?.trim(); + if (codexId) return { sessionId: codexId, ide: 'codex' }; + const claudeId = env.CLAUDECODE_SESSION_ID?.trim() ?? env.CLAUDE_SESSION_ID?.trim(); + if (claudeId) return { sessionId: claudeId, ide: 'claude-code' }; + const override = env.COLONY_CLIENT_SESSION_ID?.trim(); + if (override) return { sessionId: override, ide: env.COLONY_CLIENT_IDE?.trim() ?? 'unknown' }; + // Fallback: stable per parent-process so the lane coalesces across tool calls. + return { sessionId: `mcp-${process.ppid}`, ide: env.COLONY_CLIENT_IDE?.trim() ?? 'unknown' }; +} + +export function installActiveSessionHeartbeat(server: McpServer): void { + const client = detectMcpClientIdentity(); + const cwd = process.cwd(); + + const touch = (hook: HookName, extras: Partial = {}): void => { + try { + upsertActiveSession({ session_id: client.sessionId, ide: client.ide, cwd, ...extras }, hook); + } catch { + // Heartbeat is best-effort; never fail a tool call because the JSON + // sidecar cannot be written. + } + }; + + // Register the client the moment the server is built — before any tool + // call — so the lane is visible on the very first hivemind query. + touch('session-start', { source: 'mcp-connect' }); + + // Wrap every subsequent `server.tool(...)` registration so each invocation + // bumps lastHeartbeatAt and reports the invoked tool name as the current + // task preview. The SDK overloads this method; we only care that the last + // argument is the handler. + type ToolRegister = McpServer['tool']; + const originalTool = server.tool.bind(server) as ToolRegister; + (server as { tool: ToolRegister }).tool = ((...toolArgs: unknown[]) => { + const name = typeof toolArgs[0] === 'string' ? toolArgs[0] : 'unknown'; + const handlerIndex = toolArgs.length - 1; + const handler = toolArgs[handlerIndex]; + if (typeof handler === 'function') { + const original = handler as (...handlerArgs: unknown[]) => unknown; + toolArgs[handlerIndex] = async (...handlerArgs: unknown[]) => { + touch('post-tool-use', { tool_name: `colony.${name}` }); + return original(...handlerArgs); + }; + } + return (originalTool as (...a: unknown[]) => ReturnType)(...toolArgs); + }) as ToolRegister; +} diff --git a/apps/mcp-server/src/tools/hivemind.ts b/apps/mcp-server/src/tools/hivemind.ts new file mode 100644 index 0000000..f926544 --- /dev/null +++ b/apps/mcp-server/src/tools/hivemind.ts @@ -0,0 +1,64 @@ +import { readHivemind, type SearchResult } from '@colony/core'; +import type { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'; +import { z } from 'zod'; +import type { ToolContext } from './context.js'; +import { buildContextQuery, buildHivemindContext, toHivemindOptions } from './shared.js'; + +export function register(server: McpServer, ctx: ToolContext): void { + const { store, resolveEmbedder } = ctx; + + server.tool( + 'hivemind', + 'Summarize active agent sessions and task ownership from proxy-runtime state files.', + { + repo_root: z.string().min(1).optional(), + repo_roots: z.array(z.string().min(1)).max(20).optional(), + include_stale: z.boolean().optional(), + limit: z.number().int().positive().max(100).optional(), + }, + async ({ repo_root, repo_roots, include_stale, limit }) => { + const options: Parameters[0] = {}; + if (repo_root !== undefined) options.repoRoot = repo_root; + if (repo_roots !== undefined) options.repoRoots = repo_roots; + if (include_stale !== undefined) options.includeStale = include_stale; + if (limit !== undefined) options.limit = limit; + const snapshot = readHivemind(options); + return { content: [{ type: 'text', text: JSON.stringify(snapshot) }] }; + }, + ); + + server.tool( + 'hivemind_context', + 'Return active task ownership plus compact relevant memory hits. Use before fetching full observations.', + { + repo_root: z.string().min(1).optional(), + repo_roots: z.array(z.string().min(1)).max(20).optional(), + include_stale: z.boolean().optional(), + limit: z.number().int().positive().max(100).optional(), + query: z.string().min(1).optional(), + memory_limit: z.number().int().positive().max(10).optional(), + }, + async ({ repo_root, repo_roots, include_stale, limit, query, memory_limit }) => { + const snapshot = readHivemind( + toHivemindOptions({ repo_root, repo_roots, include_stale, limit }), + ); + const memoryLimit = memory_limit ?? 3; + const contextQuery = buildContextQuery(query, snapshot.sessions); + let memoryHits: SearchResult[] = []; + + if (contextQuery) { + const e = (await resolveEmbedder()) ?? undefined; + memoryHits = await store.search(contextQuery, memoryLimit, e); + } + + return { + content: [ + { + type: 'text', + text: JSON.stringify(buildHivemindContext(snapshot, memoryHits, contextQuery)), + }, + ], + }; + }, + ); +} diff --git a/apps/mcp-server/src/tools/profile.ts b/apps/mcp-server/src/tools/profile.ts new file mode 100644 index 0000000..d72d3bc --- /dev/null +++ b/apps/mcp-server/src/tools/profile.ts @@ -0,0 +1,49 @@ +import { type AgentCapabilities, DEFAULT_CAPABILITIES, loadProfile, saveProfile } from '@colony/core'; +import type { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'; +import { z } from 'zod'; +import type { ToolContext } from './context.js'; + +export function register(server: McpServer, ctx: ToolContext): void { + const { store } = ctx; + + server.tool( + 'agent_upsert_profile', + "Set or update an agent's capability profile (ui_work, api_work, test_work, infra_work, doc_work). Weights are 0..1; missing weights keep their current value (or the 0.5 default for first-time profiles). Used by the handoff router to suggest which agent is the best fit for a broadcast ('any') handoff.", + { + agent: z.string().min(1), + capabilities: z + .object({ + ui_work: z.number().min(0).max(1).optional(), + api_work: z.number().min(0).max(1).optional(), + test_work: z.number().min(0).max(1).optional(), + infra_work: z.number().min(0).max(1).optional(), + doc_work: z.number().min(0).max(1).optional(), + }) + .default({}), + }, + async ({ agent, capabilities }) => { + const definedCapabilities = Object.fromEntries( + Object.entries(capabilities).filter(([, value]) => value !== undefined), + ) as Partial; + const profile = saveProfile(store.storage, agent, definedCapabilities); + return { content: [{ type: 'text', text: JSON.stringify(profile) }] }; + }, + ); + + server.tool( + 'agent_get_profile', + 'Read an agent capability profile. Unknown agents return the default (0.5 across all dimensions).', + { agent: z.string().min(1) }, + async ({ agent }) => { + const profile = loadProfile(store.storage, agent); + return { + content: [ + { + type: 'text', + text: JSON.stringify({ ...profile, defaults: DEFAULT_CAPABILITIES }), + }, + ], + }; + }, + ); +} diff --git a/apps/mcp-server/src/tools/proposal.ts b/apps/mcp-server/src/tools/proposal.ts new file mode 100644 index 0000000..47beeac --- /dev/null +++ b/apps/mcp-server/src/tools/proposal.ts @@ -0,0 +1,77 @@ +import { ProposalSystem } from '@colony/core'; +import type { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'; +import { z } from 'zod'; +import type { ToolContext } from './context.js'; + +export function register(server: McpServer, ctx: ToolContext): void { + const { store } = ctx; + + server.tool( + 'task_propose', + 'Propose a potential improvement scoped to (repo_root, branch). Becomes a real task only after collective reinforcement crosses the promotion threshold.', + { + repo_root: z.string().min(1), + branch: z.string().min(1), + summary: z.string().min(1), + rationale: z.string().min(1), + touches_files: z.array(z.string()).default([]), + session_id: z.string().min(1), + }, + async ({ repo_root, branch, summary, rationale, touches_files, session_id }) => { + const proposals = new ProposalSystem(store); + const id = proposals.propose({ + repo_root, + branch, + summary, + rationale, + touches_files, + session_id, + }); + const strength = proposals.currentStrength(id); + return { + content: [ + { + type: 'text', + text: JSON.stringify({ + proposal_id: id, + strength, + promotion_threshold: ProposalSystem.PROMOTION_THRESHOLD, + }), + }, + ], + }; + }, + ); + + server.tool( + 'task_reinforce', + "Reinforce a pending proposal. kind='explicit' for direct support; 'rediscovered' when you arrived at the same idea independently.", + { + proposal_id: z.number().int().positive(), + session_id: z.string().min(1), + kind: z.enum(['explicit', 'rediscovered']).default('explicit'), + }, + async ({ proposal_id, session_id, kind }) => { + const proposals = new ProposalSystem(store); + const { strength, promoted } = proposals.reinforce({ + proposal_id, + session_id, + kind, + }); + const proposal = store.storage.getProposal(proposal_id); + return { + content: [ + { + type: 'text', + text: JSON.stringify({ + proposal_id, + strength, + promoted, + task_id: proposal?.task_id ?? null, + }), + }, + ], + }; + }, + ); +} diff --git a/apps/mcp-server/src/tools/search.ts b/apps/mcp-server/src/tools/search.ts new file mode 100644 index 0000000..3c6dcef --- /dev/null +++ b/apps/mcp-server/src/tools/search.ts @@ -0,0 +1,81 @@ +import type { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'; +import { z } from 'zod'; +import type { ToolContext } from './context.js'; + +export function register(server: McpServer, ctx: ToolContext): void { + const { store, resolveEmbedder } = ctx; + + server.tool( + 'search', + 'Search memory. Returns compact hits — fetch full bodies via get_observations.', + { query: z.string().min(1), limit: z.number().int().positive().max(50).optional() }, + async ({ query, limit }) => { + const e = (await resolveEmbedder()) ?? undefined; + const hits = await store.search(query, limit, e); + return { + content: [{ type: 'text', text: JSON.stringify(hits) }], + }; + }, + ); + + server.tool( + 'timeline', + 'Chronological observation IDs for a session. Use to locate context around a point.', + { + session_id: z.string().min(1), + around_id: z.number().int().positive().optional(), + limit: z.number().int().positive().max(200).optional(), + }, + async ({ session_id, around_id, limit }) => { + const rows = store.timeline(session_id, around_id, limit); + const compact = rows.map((r) => ({ id: r.id, kind: r.kind, ts: r.ts })); + return { content: [{ type: 'text', text: JSON.stringify(compact) }] }; + }, + ); + + server.tool( + 'get_observations', + 'Fetch full observation bodies by ID. Returns expanded text by default.', + { + ids: z.array(z.number().int().positive()).min(1).max(50), + expand: z.boolean().optional(), + }, + async ({ ids, expand: expandOpt }) => { + const rows = store.getObservations(ids, { expand: expandOpt ?? true }); + const payload = rows.map((r) => ({ + id: r.id, + session_id: r.session_id, + kind: r.kind, + ts: r.ts, + content: r.content, + metadata: r.metadata, + })); + return { content: [{ type: 'text', text: JSON.stringify(payload) }] }; + }, + ); + + server.tool( + 'list_sessions', + 'List recent sessions in reverse chronological order. Use to navigate before calling timeline.', + { limit: z.number().int().positive().max(200).optional() }, + async ({ limit }) => { + const sessions = store.storage.listSessions(limit ?? 20); + return { + content: [ + { + type: 'text', + text: JSON.stringify( + sessions.map((s) => ({ + id: s.id, + ide: s.ide, + cwd: s.cwd, + started_at: s.started_at, + ended_at: s.ended_at, + })), + ), + }, + ], + }; + }, + ); +} diff --git a/apps/mcp-server/src/tools/shared.ts b/apps/mcp-server/src/tools/shared.ts new file mode 100644 index 0000000..4aee474 --- /dev/null +++ b/apps/mcp-server/src/tools/shared.ts @@ -0,0 +1,138 @@ +import type { + HivemindOptions, + HivemindSession, + HivemindSnapshot, + SearchResult, +} from '@colony/core'; + +export interface HivemindToolOptions { + repo_root: string | undefined; + repo_roots: string[] | undefined; + include_stale: boolean | undefined; + limit: number | undefined; +} + +export interface HivemindContextLane { + repo_root: string; + branch: string; + task: string; + owner: string; + activity: HivemindSession['activity']; + activity_summary: string; + needs_attention: boolean; + risk: string; + source: HivemindSession['source']; + worktree_path: string; + updated_at: string; + elapsed_seconds: number; + locked_file_count: number; + locked_file_preview: string[]; +} + +export interface HivemindContext { + generated_at: string; + repo_roots: string[]; + summary: { + lane_count: number; + memory_hit_count: number; + needs_attention_count: number; + next_action: string; + }; + counts: HivemindSnapshot['counts']; + query: string; + lanes: HivemindContextLane[]; + memory_hits: SearchResult[]; +} + +export function toHivemindOptions(input: HivemindToolOptions): HivemindOptions { + const options: HivemindOptions = {}; + if (input.repo_root !== undefined) options.repoRoot = input.repo_root; + if (input.repo_roots !== undefined) options.repoRoots = input.repo_roots; + if (input.include_stale !== undefined) options.includeStale = input.include_stale; + if (input.limit !== undefined) options.limit = input.limit; + return options; +} + +export function buildContextQuery( + query: string | undefined, + sessions: HivemindSession[], +): string { + if (query?.trim()) return query.trim(); + const taskText = sessions + .flatMap((session) => [ + session.task, + session.task_name, + session.routing_reason, + ...session.locked_file_preview, + ]) + .map((entry) => entry.trim()) + .filter(Boolean); + return [...new Set(taskText)].join(' ').slice(0, 800); +} + +export function buildHivemindContext( + snapshot: HivemindSnapshot, + memoryHits: SearchResult[], + query: string, +): HivemindContext { + const lanes = snapshot.sessions.map(toContextLane); + const needsAttentionCount = lanes.filter((lane) => lane.needs_attention).length; + + return { + generated_at: snapshot.generated_at, + repo_roots: snapshot.repo_roots, + summary: { + lane_count: lanes.length, + memory_hit_count: memoryHits.length, + needs_attention_count: needsAttentionCount, + next_action: nextAction(lanes, memoryHits), + }, + counts: snapshot.counts, + query, + lanes, + memory_hits: memoryHits, + }; +} + +function toContextLane(session: HivemindSession): HivemindContextLane { + const risk = laneRisk(session); + return { + repo_root: session.repo_root, + branch: session.branch, + task: session.task, + owner: `${session.agent}/${session.cli}`, + activity: session.activity, + activity_summary: session.activity_summary, + needs_attention: risk !== 'none', + risk, + source: session.source, + worktree_path: session.worktree_path, + updated_at: session.updated_at, + elapsed_seconds: session.elapsed_seconds, + locked_file_count: session.locked_file_count, + locked_file_preview: session.locked_file_preview, + }; +} + +function laneRisk(session: HivemindSession): string { + if (session.activity === 'dead') return 'dead session'; + if (session.activity === 'stalled') return 'stale telemetry'; + if (session.activity === 'unknown') return 'unknown runtime state'; + return 'none'; +} + +function nextAction(lanes: HivemindContextLane[], memoryHits: SearchResult[]): string { + if (lanes.some((lane) => lane.needs_attention)) { + return 'Inspect lanes with needs_attention before taking over or editing nearby files.'; + } + if (lanes.length > 0 && memoryHits.length > 0) { + return 'Use lane ownership first, then fetch only the specific memory IDs needed.'; + } + if (lanes.length > 0) { + return 'Use lane ownership before editing; no matching memory hit was needed.'; + } + if (memoryHits.length > 0) { + return 'No live lanes found; fetch only the memory IDs needed.'; + } + return 'No live lanes or matching memory found.'; +} diff --git a/apps/mcp-server/src/tools/task.ts b/apps/mcp-server/src/tools/task.ts new file mode 100644 index 0000000..8742daa --- /dev/null +++ b/apps/mcp-server/src/tools/task.ts @@ -0,0 +1,107 @@ +import { TaskThread } from '@colony/core'; +import type { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'; +import { z } from 'zod'; +import type { ToolContext } from './context.js'; + +export function register(server: McpServer, ctx: ToolContext): void { + const { store } = ctx; + + // Task-thread tools. Agents already know their session_id from SessionStart; + // it's passed explicitly on every call so this server stays session-agnostic + // and can serve multiple agents without ambient state. + + server.tool( + 'task_list', + 'List recent task threads. Each task groups sessions collaborating on the same (repo_root, branch).', + { limit: z.number().int().positive().max(200).optional() }, + async ({ limit }) => { + const tasks = store.storage.listTasks(limit ?? 50); + return { content: [{ type: 'text', text: JSON.stringify(tasks) }] }; + }, + ); + + server.tool( + 'task_timeline', + 'Recent observations on a task thread (compact: id, kind, session_id, ts, reply_to).', + { + task_id: z.number().int().positive(), + limit: z.number().int().positive().max(200).optional(), + }, + async ({ task_id, limit }) => { + const rows = store.storage.taskTimeline(task_id, limit ?? 50); + const compact = rows.map((r) => ({ + id: r.id, + kind: r.kind, + session_id: r.session_id, + ts: r.ts, + reply_to: r.reply_to, + })); + return { content: [{ type: 'text', text: JSON.stringify(compact) }] }; + }, + ); + + server.tool( + 'task_updates_since', + "Task-thread observations after a cursor ts, excluding this session's own posts.", + { + task_id: z.number().int().positive(), + session_id: z.string().min(1), + since_ts: z.number().int().nonnegative(), + limit: z.number().int().positive().max(200).optional(), + }, + async ({ task_id, session_id, since_ts, limit }) => { + const rows = store.storage + .taskObservationsSince(task_id, since_ts, limit ?? 50) + .filter((o) => o.session_id !== session_id); + const compact = rows.map((r) => ({ + id: r.id, + kind: r.kind, + session_id: r.session_id, + ts: r.ts, + })); + return { content: [{ type: 'text', text: JSON.stringify(compact) }] }; + }, + ); + + server.tool( + 'task_post', + 'Post a coordination message on a task thread. Use specific tools for claim / hand_off / accept.', + { + task_id: z.number().int().positive(), + session_id: z.string().min(1), + kind: z.enum(['question', 'answer', 'decision', 'blocker', 'note']), + content: z.string().min(1), + reply_to: z.number().int().positive().optional(), + }, + async ({ task_id, session_id, kind, content, reply_to }) => { + const thread = new TaskThread(store, task_id); + const id = thread.post({ + session_id, + kind, + content, + ...(reply_to !== undefined ? { reply_to } : {}), + }); + return { content: [{ type: 'text', text: JSON.stringify({ id }) }] }; + }, + ); + + server.tool( + 'task_claim_file', + 'Claim a file on a task thread so overlapping edits from other sessions surface a warning next turn.', + { + task_id: z.number().int().positive(), + session_id: z.string().min(1), + file_path: z.string().min(1), + note: z.string().optional(), + }, + async ({ task_id, session_id, file_path, note }) => { + const thread = new TaskThread(store, task_id); + const id = thread.claimFile({ + session_id, + file_path, + ...(note !== undefined ? { note } : {}), + }); + return { content: [{ type: 'text', text: JSON.stringify({ observation_id: id }) }] }; + }, + ); +} diff --git a/apps/mcp-server/src/tools/wake.ts b/apps/mcp-server/src/tools/wake.ts new file mode 100644 index 0000000..7ff69c7 --- /dev/null +++ b/apps/mcp-server/src/tools/wake.ts @@ -0,0 +1,163 @@ +import { + type AttentionInboxOptions, + ProposalSystem, + TaskThread, + buildAttentionInbox, +} from '@colony/core'; +import type { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'; +import { z } from 'zod'; +import type { ToolContext } from './context.js'; + +export function register(server: McpServer, ctx: ToolContext): void { + const { store } = ctx; + + server.tool( + 'task_wake', + 'Post a wake request on a task thread — a lightweight nudge surfaced to the target on their next turn. No claim transfer. Use when you need another session to attend to something but a full handoff is the wrong shape.', + { + task_id: z.number().int().positive(), + session_id: z.string().min(1).describe('your session_id (the sender)'), + agent: z.string().min(1).describe('your agent name, e.g. claude or codex'), + to_agent: z.enum(['claude', 'codex', 'any']), + to_session_id: z.string().optional(), + reason: z.string().min(1), + next_step: z.string().optional(), + expires_in_minutes: z.number().int().positive().max(1440).optional(), + }, + async (args) => { + const thread = new TaskThread(store, args.task_id); + const id = thread.requestWake({ + from_session_id: args.session_id, + from_agent: args.agent, + to_agent: args.to_agent, + ...(args.to_session_id !== undefined ? { to_session_id: args.to_session_id } : {}), + reason: args.reason, + ...(args.next_step !== undefined ? { next_step: args.next_step } : {}), + ...(args.expires_in_minutes !== undefined + ? { expires_in_ms: args.expires_in_minutes * 60_000 } + : {}), + }); + return { + content: [ + { type: 'text', text: JSON.stringify({ wake_observation_id: id, status: 'pending' }) }, + ], + }; + }, + ); + + server.tool( + 'task_ack_wake', + 'Acknowledge a pending wake request addressed to you. Records an ack on the task thread so the sender sees the response on their next turn.', + { + wake_observation_id: z.number().int().positive(), + session_id: z.string().min(1), + }, + async ({ wake_observation_id, session_id }) => { + const obs = store.storage.getObservation(wake_observation_id); + if (!obs?.task_id) { + return { + content: [ + { type: 'text', text: JSON.stringify({ error: 'observation is not on a task' }) }, + ], + isError: true, + }; + } + const thread = new TaskThread(store, obs.task_id); + try { + thread.acknowledgeWake(wake_observation_id, session_id); + return { content: [{ type: 'text', text: JSON.stringify({ status: 'acknowledged' }) }] }; + } catch (err) { + return { + content: [ + { + type: 'text', + text: JSON.stringify({ error: err instanceof Error ? err.message : String(err) }), + }, + ], + isError: true, + }; + } + }, + ); + + server.tool( + 'task_cancel_wake', + 'Cancel a pending wake request. Either the sender (withdrawing) or the target (declining) may cancel.', + { + wake_observation_id: z.number().int().positive(), + session_id: z.string().min(1), + reason: z.string().optional(), + }, + async ({ wake_observation_id, session_id, reason }) => { + const obs = store.storage.getObservation(wake_observation_id); + if (!obs?.task_id) { + return { + content: [ + { type: 'text', text: JSON.stringify({ error: 'observation is not on a task' }) }, + ], + isError: true, + }; + } + const thread = new TaskThread(store, obs.task_id); + try { + thread.cancelWake(wake_observation_id, session_id, reason); + return { content: [{ type: 'text', text: JSON.stringify({ status: 'cancelled' }) }] }; + } catch (err) { + return { + content: [ + { + type: 'text', + text: JSON.stringify({ error: err instanceof Error ? err.message : String(err) }), + }, + ], + isError: true, + }; + } + }, + ); + + server.tool( + 'attention_inbox', + 'Compact list of what needs your attention: pending handoffs, pending wakes, stalled lanes, recent other-session file claims. Fetch bodies via get_observations.', + { + session_id: z.string().min(1), + agent: z.string().min(1), + repo_root: z.string().min(1).optional(), + repo_roots: z.array(z.string().min(1)).max(20).optional(), + recent_claim_window_minutes: z.number().int().positive().max(1440).optional(), + recent_claim_limit: z.number().int().positive().max(100).optional(), + task_ids: z.array(z.number().int().positive()).max(100).optional(), + }, + async (args) => { + const options: AttentionInboxOptions = { + session_id: args.session_id, + agent: args.agent, + }; + if (args.repo_root !== undefined) options.repo_root = args.repo_root; + if (args.repo_roots !== undefined) options.repo_roots = args.repo_roots; + if (args.recent_claim_window_minutes !== undefined) { + options.recent_claim_window_ms = args.recent_claim_window_minutes * 60_000; + } + if (args.recent_claim_limit !== undefined) { + options.recent_claim_limit = args.recent_claim_limit; + } + if (args.task_ids !== undefined) options.task_ids = args.task_ids; + const inbox = buildAttentionInbox(store, options); + return { content: [{ type: 'text', text: JSON.stringify(inbox) }] }; + }, + ); + + server.tool( + 'task_foraging_report', + 'List pending and recently promoted proposals on a (repo_root, branch). Pending proposals whose strength has evaporated below the noise floor are omitted.', + { + repo_root: z.string().min(1), + branch: z.string().min(1), + }, + async ({ repo_root, branch }) => { + const proposals = new ProposalSystem(store); + const report = proposals.foragingReport(repo_root, branch); + return { content: [{ type: 'text', text: JSON.stringify(report) }] }; + }, + ); +} From ad674d5ac5be14da27349d3c97060266c9fbcdeb Mon Sep 17 00:00:00 2001 From: NagyVikt Date: Fri, 24 Apr 2026 11:28:14 +0200 Subject: [PATCH 2/2] chore(changeset): mcp-server tools split Document the internal-only refactor that moved the 23 MCP tool registrations into per-group modules. No public surface change. --- .changeset/mcp-split-tools.md | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 .changeset/mcp-split-tools.md diff --git a/.changeset/mcp-split-tools.md b/.changeset/mcp-split-tools.md new file mode 100644 index 0000000..6a26996 --- /dev/null +++ b/.changeset/mcp-split-tools.md @@ -0,0 +1,11 @@ +--- +'@colony/mcp-server': patch +--- + +Split `apps/mcp-server/src/server.ts` into eight per-tool-group modules +under `src/tools/` (search, hivemind, task, handoff, proposal, profile, +wake, plus shared/context/heartbeat helpers). `buildServer()` is now a +small registration list that calls `register(server, ctx)` on each +group in the same order the tools appeared in the pre-split file. +Behavior is unchanged — all 17 mcp-server tests (InMemory MCP client +hitting every tool + task-thread suites) pass without modification.