diff --git a/AGENTS.md b/AGENTS.md index a7585f0837..5aab6899ca 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -49,11 +49,34 @@ Bun is the test runner throughout. Co-locate test files next to source as `*.tes ## Authentication & Authorization Uses Better Auth for authentication (OAuth 2.1 + SSO + API keys). Authorization uses custom AccessControl layer with organization/project-level RBAC. Auth configuration is in `apps/mesh/auth-config.json` (example: `auth-config.example.json`). +## Documentation Requirements + +**IMPORTANT**: Documentation must be updated alongside code changes. Before committing: + +1. **Check for relevant docs** in `apps/docs/client/src/content/` (EN + PT-BR) +2. **Update affected docs** when changing: + - API behavior or endpoints + - Configuration options + - Authentication/authorization flows + - Connection types or proxy behavior + - New features or removed functionality +3. **Update README.md** for significant features +4. **Review docs diff** before committing to ensure accuracy + +Documentation locations: +- `apps/docs/client/src/content/en/` - English docs +- `apps/docs/client/src/content/pt-br/` - Portuguese docs +- `README.md` - Project overview and quick start +- `packages/*/README.md` - Package-specific docs + +Run `bun run docs:dev` to preview documentation changes locally. + ## Commit & Pull Request Guidelines Follow Conventional Commit-style history: `type(scope): message`, optionally wrapping the type in brackets for chores (e.g., `[chore]: update deps`). Reference issues with `(#1234)` when applicable. PRs should include: - Succinct summary of changes - Testing notes and affected areas - Screenshots for UI changes +- **Documentation updates** for any behavior changes - Confirm formatting (`bun run fmt`) and linting (`bun run lint`) pass - Run tests (`bun test`) before requesting review - Flag follow-up work with TODOs linked to issues diff --git a/README.md b/README.md index fb3d61663b..d99f24cb71 100644 --- a/README.md +++ b/README.md @@ -96,6 +96,23 @@ Gateways are configurable and extensible. You can add new strategies and also cu --- +## STDIO Connections (Local MCPs) + +Run npx packages or custom scripts as MCP servers. Mesh passes credentials via environment variables: + +```bash +# Mesh spawns your MCP with these env vars: +MESH_TOKEN= # Infinite-expiry JWT for mesh API calls +MESH_URL= # Mesh instance URL +MESH_STATE= # Binding values as JSON +``` + +Your MCP just reads `process.env.MESH_TOKEN` — no special configuration tools needed. This mirrors how HTTP connections receive `x-mesh-token` headers. + +→ See [Building STDIO MCPs](https://docs.deco.page/en/mcp-mesh/mcp-servers) for examples in [decocms/mcps](https://github.com/decocms/mcps). + +--- + ## Define Tools Tools are first-class citizens. Type-safe, audited, observable, and callable via MCP. diff --git a/apps/docs/client/src/content/en/mcp-mesh/mcp-servers.mdx b/apps/docs/client/src/content/en/mcp-mesh/mcp-servers.mdx index 8c7bc93ac0..19ddb53bf3 100644 --- a/apps/docs/client/src/content/en/mcp-mesh/mcp-servers.mdx +++ b/apps/docs/client/src/content/en/mcp-mesh/mcp-servers.mdx @@ -8,7 +8,69 @@ import Callout from "../../../components/ui/Callout.astro"; ## What is a connection? -An **MCP Server** in the Mesh is a configured upstream MCP endpoint (typically HTTP). The Mesh stores its configuration and (optionally) credentials, and can then proxy MCP requests to it. +An **MCP Server** in the Mesh is a configured upstream MCP endpoint. The Mesh stores its configuration and (optionally) credentials, and can then proxy MCP requests to it. + +## Connection Types + +The Mesh supports two types of connections: + +### HTTP Connections + +HTTP connections are the most common type. They connect to remote MCP servers via HTTP/SSE endpoints. + +- **Use cases**: Cloud-hosted MCP servers, SaaS integrations, production deployments +- **Token handling**: Short-lived tokens (5 minutes) issued per request + +### STDIO Connections (Local Commands) + +STDIO connections spawn a local process (like `npx` or custom scripts) and communicate via stdin/stdout. + +- **Use cases**: Local tools, npx packages, development, private MCPs +- **Token handling**: Infinite-expiry tokens persisted locally by the MCP server + + + STDIO connections are perfect for running npm packages as MCP servers. Example: `npx @decocms/local-fs` runs a file system MCP locally. + + +## STDIO Credentials via Environment Variables + +When mesh spawns an STDIO MCP process, it passes credentials as environment variables: + +| Variable | Description | +|----------|-------------| +| `MESH_TOKEN` | JWT token for authenticating with Mesh API (infinite expiry) | +| `MESH_URL` | Base URL of the Mesh instance | +| `MESH_STATE` | JSON-encoded state with binding values | + +This is analogous to how HTTP connections receive `x-mesh-token` headers. + + + **No special tools needed!** Just read `process.env.MESH_TOKEN` on startup. No need to implement `ON_MCP_CONFIGURATION` or any configuration protocol. + + +### Example (Node.js/Bun) + +```typescript +// Read mesh credentials from env +const meshToken = process.env.MESH_TOKEN; +const meshUrl = process.env.MESH_URL; +const state = process.env.MESH_STATE ? JSON.parse(process.env.MESH_STATE) : {}; + +// Use token for mesh API calls +const response = await fetch(`${meshUrl}/mcp/${connectionId}`, { + headers: { Authorization: `Bearer ${meshToken}` }, + // ... +}); +``` + +## Building STDIO-Compatible MCPs + +See examples in the [decocms/mcps](https://github.com/decocms/mcps) repository: + +- `template-minimal/` - Minimal MCP without view +- `template-with-view/` - MCP with web interface +- `local-fs/` - File system MCP (runs via npx) +- `perplexity/`, `openrouter/` - Production MCPs ## In the UI diff --git a/apps/docs/client/src/content/pt-br/mcp-mesh/mcp-servers.mdx b/apps/docs/client/src/content/pt-br/mcp-mesh/mcp-servers.mdx index 91811aab93..ae278e5a1c 100644 --- a/apps/docs/client/src/content/pt-br/mcp-mesh/mcp-servers.mdx +++ b/apps/docs/client/src/content/pt-br/mcp-mesh/mcp-servers.mdx @@ -1,24 +1,86 @@ --- title: MCP Servers -description: Conexões com MCP servers upstream (HTTP) que o Mesh faz proxy e observa +description: Conexões com MCP servers upstream que o Mesh faz proxy e observa icon: Server --- +import Callout from "../../../components/ui/Callout.astro"; + ## O que é um MCP Server (no Mesh)? -No Mesh, um **MCP Server** é uma **connection** para um MCP upstream (normalmente um endpoint MCP via HTTP). +No Mesh, um **MCP Server** é uma **connection** para um MCP upstream. Cada connection guarda: -- **URL do MCP** +- **URL do MCP** (para HTTP) ou **comando** (para STDIO) - **credenciais/config** (quando necessário, armazenadas no vault) - metadados para operação (nome, status, etc.) +## Tipos de Conexão + +### Conexões HTTP + +Conexões HTTP conectam a servidores MCP remotos via endpoints HTTP/SSE. + +- **Casos de uso**: MCPs em nuvem, integrações SaaS, produção +- **Tokens**: JWT com expiração curta (5 minutos) + +### Conexões STDIO (Comandos Locais) + +Conexões STDIO iniciam um processo local (como `npx` ou scripts) e comunicam via stdin/stdout. + +- **Casos de uso**: Ferramentas locais, pacotes npx, desenvolvimento +- **Tokens**: JWT sem expiração, persistido localmente + + + Conexões STDIO são perfeitas para rodar pacotes npm como MCPs. Exemplo: `npx @decocms/local-fs` roda um MCP de sistema de arquivos localmente. + + +## Credenciais STDIO via Variáveis de Ambiente + +Quando o mesh inicia um processo MCP STDIO, ele passa credenciais como variáveis de ambiente: + +| Variável | Descrição | +|----------|-----------| +| `MESH_TOKEN` | Token JWT para autenticação com a API do Mesh (sem expiração) | +| `MESH_URL` | URL base da instância Mesh | +| `MESH_STATE` | Estado com valores de bindings em JSON | + +Isso é análogo a como conexões HTTP recebem headers `x-mesh-token`. + + + **Nenhuma ferramenta especial necessária!** Apenas leia `process.env.MESH_TOKEN` na inicialização. Não precisa implementar `ON_MCP_CONFIGURATION` ou qualquer protocolo de configuração. + + +### Exemplo (Node.js/Bun) + +```typescript +// Ler credenciais mesh das env vars +const meshToken = process.env.MESH_TOKEN; +const meshUrl = process.env.MESH_URL; +const state = process.env.MESH_STATE ? JSON.parse(process.env.MESH_STATE) : {}; + +// Usar token para chamadas à API mesh +const response = await fetch(`${meshUrl}/mcp/${connectionId}`, { + headers: { Authorization: `Bearer ${meshToken}` }, + // ... +}); +``` + ## Quando usar vs Gateway - Use **connection** para isolar e depurar um upstream específico. - Use **gateway** para expor um surface agregado/curado para os clients. +## Construindo MCPs STDIO + +Veja exemplos no repositório [decocms/mcps](https://github.com/decocms/mcps): + +- `template-minimal/` - MCP mínimo sem view +- `template-with-view/` - MCP com interface web +- `local-fs/` - MCP de sistema de arquivos (roda via npx) +- `perplexity/`, `openrouter/` - MCPs em produção + ## Boas práticas - Mantenha URLs e credenciais por ambiente (dev/staging/prod). diff --git a/apps/mesh/src/api/app.ts b/apps/mesh/src/api/app.ts index 2aecd51da3..a06a22ba21 100644 --- a/apps/mesh/src/api/app.ts +++ b/apps/mesh/src/api/app.ts @@ -11,7 +11,6 @@ import { PrometheusSerializer } from "@opentelemetry/exporter-prometheus"; import { Hono } from "hono"; import { cors } from "hono/cors"; -import { logger } from "hono/logger"; import { auth } from "../auth"; import { ContextFactory, @@ -45,10 +44,71 @@ import { import { MiddlewareHandler } from "hono/types"; import { getToolsByCategory, MANAGEMENT_TOOLS } from "../tools/registry"; import { Env } from "./env"; +import { dangerouslyCreateSuperUserMCPProxy } from "./routes/proxy"; +import { resetStdioConnectionPool } from "../stdio/stable-transport"; + const getHandleOAuthProtectedResourceMetadata = () => oAuthProtectedResourceMetadata(auth); const getHandleOAuthDiscoveryMetadata = () => oAuthDiscoveryMetadata(auth); +/** + * Auto-start STDIO connections by title + * Used with AUTO_START_CONNECTIONS env var + * + * Uses dangerouslyCreateSuperUserMCPProxy to create a system-level proxy + * that spawns the STDIO process with proper credentials. + */ +async function autoStartConnectionsByTitle( + database: MeshDatabase, + titles: string[], +) { + const db = database.db; + + // Query all STDIO connections matching the titles + const connections = await db + .selectFrom("connections") + .selectAll() + .where("connection_type", "=", "STDIO") + .where("title", "in", titles) + .execute(); + + if (connections.length === 0) { + console.log(`[AutoStart] No matching STDIO connections found`); + return; + } + + console.log( + `[AutoStart] Found ${connections.length} connections to start: ${connections.map((c) => c.title).join(", ")}`, + ); + + for (const conn of connections) { + try { + console.log(`[AutoStart] Starting: ${conn.title} (${conn.id})`); + + // Create system context and use the superuser proxy + const ctx = await ContextFactory.create(); + + const proxy = await dangerouslyCreateSuperUserMCPProxy(conn.id, { + ...ctx, + auth: { ...ctx.auth, user: { id: "auto-start" } }, + }); + + // listTools() uses cached DB data, doesn't spawn STDIO + // listPrompts() forces actual client connection, triggering spawn + // Ignore "Method not found" - some MCPs don't implement prompts + try { + await proxy.client.listPrompts(); + } catch (e) { + // Ignore - the spawn happened, that's what matters + } + + console.log(`[AutoStart] ✓ ${conn.title} started`); + } catch (error) { + console.error(`[AutoStart] ✗ ${conn.title} failed:`, error); + } + } +} + /** * Resource server metadata type */ @@ -76,6 +136,13 @@ export interface CreateAppOptions { export function createApp(options: CreateAppOptions = {}) { const database = options.database ?? getDb(); + // Kill and respawn STDIO connections on restart/HMR + // Old processes have stale credentials, need fresh spawn with new tokens + // IMPORTANT: Track this promise so autoStart waits for it to complete + const poolResetPromise = resetStdioConnectionPool().catch((err) => { + console.error("[StableStdio] Error resetting pool:", err); + }); + // Stop any existing event bus worker (cleanup during HMR) if (currentEventBus && currentEventBus.isRunning()) { console.log("[EventBus] Stopping previous worker (HMR cleanup)"); @@ -127,8 +194,124 @@ export function createApp(options: CreateAppOptions = {}) { }), ); - // Request logging - app.use("*", logger()); + // ANSI color codes for elegant logging + const colors = { + reset: "\x1b[0m", + dim: "\x1b[2m", + bold: "\x1b[1m", + // Request methods + GET: "\x1b[36m", // cyan + POST: "\x1b[33m", // yellow + PUT: "\x1b[35m", // magenta + DELETE: "\x1b[31m", // red + // Status codes + ok: "\x1b[32m", // green + redirect: "\x1b[36m", // cyan + clientError: "\x1b[33m", // yellow + serverError: "\x1b[31m", // red + // Special + mcp: "\x1b[35m", // magenta for MCP + tool: "\x1b[96m", // bright cyan for tool names + duration: "\x1b[90m", // gray + }; + + const getStatusColor = (status: number) => { + if (status >= 500) return colors.serverError; + if (status >= 400) return colors.clientError; + if (status >= 300) return colors.redirect; + return colors.ok; + }; + + const getMethodColor = (method: string) => { + return colors[method as keyof typeof colors] || colors.reset; + }; + + // Request logging - enhanced for MCP calls with colors + app.use("*", async (c, next) => { + const start = Date.now(); + const method = c.req.method; + const path = c.req.path; + + // Skip noisy paths + if (path === "/api/auth/get-session" || path.includes("favicon")) { + await next(); + return; + } + + // For MCP calls, extract tool/method info + let mcpInfo = ""; + let isMcpCall = false; + if (path.startsWith("/mcp") && method === "POST") { + isMcpCall = true; + try { + const cloned = c.req.raw.clone(); + const body = (await cloned.json()) as { + method?: string; + params?: { + name?: string; + arguments?: Record; + }; + }; + if (body.method === "tools/call" && body.params?.name) { + const toolName = body.params.name; + const args = body.params.arguments || {}; + + // For event bus calls, show the event type prominently + if (toolName === "EVENT_PUBLISH" && args.type) { + mcpInfo = `${colors.tool}EVENT_PUBLISH${colors.reset} ${colors.bold}→ ${args.type}${colors.reset}`; + } else if (toolName === "EVENT_SUBSCRIBE" && args.eventType) { + mcpInfo = `${colors.tool}EVENT_SUBSCRIBE${colors.reset} ${colors.bold}← ${args.eventType}${colors.reset}`; + } else if (toolName === "EVENT_UNSUBSCRIBE" && args.eventType) { + mcpInfo = `${colors.tool}EVENT_UNSUBSCRIBE${colors.reset} ${colors.dim}✕ ${args.eventType}${colors.reset}`; + } else { + // Default: show tool name with arg keys + const argKeys = Object.keys(args); + const argsStr = + argKeys.length > 0 + ? argKeys.slice(0, 3).join(",") + + (argKeys.length > 3 ? "…" : "") + : ""; + mcpInfo = `${colors.tool}${toolName}${colors.dim}(${argsStr})${colors.reset}`; + } + } else if (body.method) { + mcpInfo = `${colors.dim}${body.method}${colors.reset}`; + } + } catch { + // Ignore parse errors + } + } + + // Format path - shorten connection IDs + let displayPath = path; + if (path.startsWith("/mcp/conn_")) { + const connId = path.split("/")[2] ?? ""; + displayPath = `/mcp/${colors.mcp}${connId.slice(0, 12)}…${colors.reset}`; + } else if (path === "/mcp") { + displayPath = `${colors.mcp}/mcp${colors.reset}`; + } else if (path === "/mcp/registry") { + displayPath = `${colors.mcp}/mcp/registry${colors.reset}`; + } + + // Log incoming request + const methodColor = getMethodColor(method); + const arrow = isMcpCall ? "◀" : "←"; + console.log( + `${colors.dim}${arrow}${colors.reset} ${methodColor}${method}${colors.reset} ${displayPath}${mcpInfo ? ` ${mcpInfo}` : ""}`, + ); + + await next(); + + const duration = Date.now() - start; + const status = c.res.status; + const statusColor = getStatusColor(status); + const durationStr = + duration < 1000 ? `${duration}ms` : `${(duration / 1000).toFixed(1)}s`; + const outArrow = isMcpCall ? "▶" : "→"; + + console.log( + `${colors.dim}${outArrow}${colors.reset} ${methodColor}${method}${colors.reset} ${displayPath}${mcpInfo ? ` ${mcpInfo}` : ""} ${statusColor}${status}${colors.reset} ${colors.duration}${durationStr}${colors.reset}`, + ); + }); // Log response body for 5xx errors app.use("*", async (c, next) => { @@ -400,6 +583,35 @@ export function createApp(options: CreateAppOptions = {}) { console.log("[EventBus] Worker started"); }); + // Auto-start connections specified in AUTO_START_CONNECTIONS env var + // Format: comma-separated connection titles, e.g. "Bridge,Pilot" + const autoStartConnections = process.env.AUTO_START_CONNECTIONS; + if (autoStartConnections) { + const connectionTitles = autoStartConnections + .split(",") + .map((s) => s.trim()) + .filter(Boolean); + if (connectionTitles.length > 0) { + console.log( + `[AutoStart] Will start connections: ${connectionTitles.join(", ")}`, + ); + // Wait for pool reset to complete before starting new connections + // This prevents race conditions where old processes haven't been killed yet + // Also add a small delay to let the app fully initialize + (async () => { + try { + // Wait for pool reset to finish (kills old STDIO processes) + await poolResetPromise; + // Small delay to ensure ports are released + await new Promise((resolve) => setTimeout(resolve, 500)); + await autoStartConnectionsByTitle(database, connectionTitles); + } catch (error) { + console.error("[AutoStart] Failed:", error); + } + })(); + } + } + // Inject MeshContext into requests // Skip auth routes, static files, health check, and metrics - they don't need MeshContext app.use("*", async (c, next) => { diff --git a/apps/mesh/src/api/routes/proxy.ts b/apps/mesh/src/api/routes/proxy.ts index d7f9ae8147..69324c5e66 100644 --- a/apps/mesh/src/api/routes/proxy.ts +++ b/apps/mesh/src/api/routes/proxy.ts @@ -221,8 +221,9 @@ async function createMCPProxyDoNotUseDirectly( connection.configuration_scopes, ); - // Issue short-lived JWT with configuration permissions - // JWT can be decoded directly by downstream to access payload + // Issue JWT with configuration permissions + // HTTP connections get 5-min tokens, STDIO connections get infinite tokens + // STDIO servers persist tokens locally to .env for restart survival const userId = ctx.auth.user?.id ?? ctx.auth.apiKey?.userId; if (!userId) { console.error("User ID required to issue configuration token"); @@ -230,17 +231,24 @@ async function createMCPProxyDoNotUseDirectly( } try { - configurationToken = await issueMeshToken({ - sub: userId, - user: { id: userId }, - metadata: { - state: connection.configuration_state ?? undefined, - meshUrl: process.env.MESH_URL ?? ctx.baseUrl, - connectionId, - organizationId: ctx.organization?.id, + // STDIO connections get infinite tokens - they persist them locally to .env + // This avoids the need to re-send ON_MCP_CONFIGURATION on every request + const isStdioConnection = connection.connection_type === "STDIO"; + + configurationToken = await issueMeshToken( + { + sub: userId, + user: { id: userId }, + metadata: { + state: connection.configuration_state ?? undefined, + meshUrl: process.env.MESH_URL ?? ctx.baseUrl, + connectionId, + organizationId: ctx.organization?.id, + }, + permissions, }, - permissions, - }); + { noExpiration: isStdioConnection }, + ); } catch (error) { console.error("Failed to issue configuration token:", error); // Continue without configuration token - downstream will fail if it requires it @@ -279,6 +287,34 @@ async function createMCPProxyDoNotUseDirectly( ? (connection.connection_headers as HttpConnectionParameters | null) : null; + // Build env vars for STDIO connections (token + state passed via env) + const buildStdioEnv = async (): Promise> => { + await ensureConfigurationToken(); + const meshUrl = process.env.MESH_URL ?? ctx.baseUrl; + + const env: Record = {}; + + // Pass mesh credentials via env vars - STDIO servers just read these + if (configurationToken) { + env.MESH_TOKEN = configurationToken; + } + if (meshUrl) { + env.MESH_URL = meshUrl; + } + + // Pass the connection ID so STDIO servers can identify themselves + // (needed for event bus subscriptions via gateway) + env.MESH_CONNECTION_ID = connectionId; + + // Pass state as JSON for bindings + const state = connection.configuration_state; + if (state && Object.keys(state).length > 0) { + env.MESH_STATE = JSON.stringify(state); + } + + return env; + }; + // Create client factory for downstream MCP based on connection_type const createClient = async () => { switch (connection.connection_type) { @@ -297,16 +333,22 @@ async function createMCPProxyDoNotUseDirectly( throw new Error("STDIO connection missing parameters"); } + // Build env with mesh credentials - STDIO servers read MESH_TOKEN/MESH_URL/MESH_STATE + const meshEnv = await buildStdioEnv(); + const env = { ...stdioParams.envVars, ...meshEnv }; + // Get or create stable connection - respawns automatically if closed // We want stable local MCP connection - don't spawn new process per request - return getStableStdioClient({ + const client = await getStableStdioClient({ id: connectionId, name: connection.title, command: stdioParams.command, args: stdioParams.args, - env: stdioParams.envVars, + env, cwd: stdioParams.cwd, }); + + return client; } case "HTTP": @@ -703,6 +745,21 @@ async function createMCPProxyDoNotUseDirectly( getPrompt, }, callStreamableTool, + /** + * Get the configuration token for this proxy. + * This is the JWT that downstream MCPs can use to call back to Mesh. + * Useful for STDIO connections that can't receive headers per-request. + */ + getConfigurationToken: async (): Promise => { + await ensureConfigurationToken(); + return configurationToken; + }, + /** + * Get the Mesh URL that downstream MCPs should call back to. + */ + getMeshUrl: (): string => { + return process.env.MESH_URL ?? ctx.baseUrl; + }, }; } diff --git a/apps/mesh/src/auth/jwt.test.ts b/apps/mesh/src/auth/jwt.test.ts index 20170f6064..c199bd29ec 100644 --- a/apps/mesh/src/auth/jwt.test.ts +++ b/apps/mesh/src/auth/jwt.test.ts @@ -78,7 +78,7 @@ describe("JWT Utility Functions", () => { }, }; - const token = await issueMeshToken(payload, "1h"); + const token = await issueMeshToken(payload, { expiresIn: "1h" }); const decoded = decodeMeshToken(token); // Expiration should be ~1 hour from now @@ -343,7 +343,7 @@ describe("Token Expiration", () => { }; // Issue token with very short expiration - const token = await issueMeshToken(payload, "1s"); + const token = await issueMeshToken(payload, { expiresIn: "1s" }); // Token should be valid immediately const validResult = await verifyMeshToken(token); @@ -369,7 +369,7 @@ describe("Token Expiration", () => { }, }; - const token = await issueMeshToken(payload, "1s"); + const token = await issueMeshToken(payload, { expiresIn: "1s" }); // Wait for expiration await new Promise((resolve) => setTimeout(resolve, 1500)); diff --git a/apps/mesh/src/auth/jwt.ts b/apps/mesh/src/auth/jwt.ts index a1c60ae808..a4eab43e5e 100644 --- a/apps/mesh/src/auth/jwt.ts +++ b/apps/mesh/src/auth/jwt.ts @@ -6,39 +6,51 @@ * - Verified by downstream services using the shared secret * * The secret is loaded from MESH_JWT_SECRET environment variable. - * If not set, a random secret is generated (not persistent across restarts). + * If not set, a random secret is generated (persisted via globalThis for HMR). */ import { decodeJwt, type JWTPayload, jwtVerify, SignJWT } from "jose"; import { randomBytes } from "crypto"; import { authConfig } from "./index"; -// JWT signing secret - loaded from env or generated -let jwtSecret: Uint8Array | null = null; +// Use globalThis to persist JWT secret across HMR (hot module reload) +// This prevents tokens from becoming invalid during development +const JWT_SECRET_KEY = "__mesh_jwt_secret__"; + +declare global { + var __mesh_jwt_secret__: Uint8Array | undefined; +} /** * Get or generate the JWT signing secret + * Uses globalThis to survive HMR reloads in development */ function getSecret(): Uint8Array { - if (jwtSecret) { - return jwtSecret; + // Check globalThis first (survives HMR) + if (globalThis[JWT_SECRET_KEY]) { + return globalThis[JWT_SECRET_KEY]; } const envSecret = process.env.MESH_JWT_SECRET ?? authConfig.jwt?.secret ?? process.env.BETTER_AUTH_SECRET; + + let secret: Uint8Array; if (envSecret) { - jwtSecret = new TextEncoder().encode(envSecret); + secret = new TextEncoder().encode(envSecret); } else { - // Generate a random secret - note: not persistent across restarts + // Generate a random secret - note: not persistent across full restarts + // but will survive HMR reloads via globalThis console.warn( - "MESH_JWT_SECRET not set - generating random secret (not persistent)", + "MESH_JWT_SECRET not set - generating random secret (survives HMR, not full restart)", ); - jwtSecret = new Uint8Array(randomBytes(32)); + secret = new Uint8Array(randomBytes(32)); } - return jwtSecret; + // Store in globalThis to survive HMR + globalThis[JWT_SECRET_KEY] = secret; + return secret; } /** @@ -66,24 +78,37 @@ export interface MeshTokenPayload { export type MeshJwtPayload = JWTPayload & MeshTokenPayload; +export interface IssueMeshTokenOptions { + /** Expiration time (default: "5m"). Ignored if noExpiration is true. */ + expiresIn?: string; + /** If true, issues a token with no expiration. Use for STDIO connections. */ + noExpiration?: boolean; +} + /** * Issue a signed JWT with mesh token payload * * @param payload - The token payload - * @param expiresIn - Expiration time (default: 5 minutes) + * @param options - Token options (expiresIn, noExpiration) * @returns Signed JWT string */ export async function issueMeshToken( payload: MeshTokenPayload, - expiresIn: string = "5m", + options: IssueMeshTokenOptions = {}, ): Promise { + const { expiresIn = "5m", noExpiration = false } = options; const secret = getSecret(); - return await new SignJWT(payload as unknown as JWTPayload) + const jwt = new SignJWT(payload as unknown as JWTPayload) .setProtectedHeader({ alg: "HS256", typ: "JWT" }) - .setIssuedAt() - .setExpirationTime(expiresIn) - .sign(secret); + .setIssuedAt(); + + // STDIO connections get infinite tokens - they persist them locally + if (!noExpiration) { + jwt.setExpirationTime(expiresIn); + } + + return await jwt.sign(secret); } /** diff --git a/apps/mesh/src/core/context-factory.ts b/apps/mesh/src/core/context-factory.ts index 70dd052f97..88b01bca54 100644 --- a/apps/mesh/src/core/context-factory.ts +++ b/apps/mesh/src/core/context-factory.ts @@ -593,14 +593,29 @@ async function authenticateRequest( // Context Factory // ============================================================================ -let createContextFn: (req?: Request) => Promise; +// Use globalThis to persist context factory across HMR (hot module reload) +// This prevents race conditions where ContextFactory.create() is called +// before ContextFactory.set() during module reinitialization +const CONTEXT_FACTORY_KEY = "__mesh_context_factory__"; + +declare global { + var __mesh_context_factory__: + | ((req?: Request) => Promise) + | undefined; +} export const ContextFactory = { set: (fn: (req?: Request) => Promise) => { - createContextFn = fn; + globalThis[CONTEXT_FACTORY_KEY] = fn; }, create: async (req?: Request) => { - return await createContextFn(req); + const createFn = globalThis[CONTEXT_FACTORY_KEY]; + if (!createFn) { + throw new Error( + "ContextFactory not initialized. This usually happens during HMR - the app is still initializing.", + ); + } + return await createFn(req); }, }; diff --git a/apps/mesh/src/event-bus/event-bus.ts b/apps/mesh/src/event-bus/event-bus.ts index ff1aeede2a..15d0138702 100644 --- a/apps/mesh/src/event-bus/event-bus.ts +++ b/apps/mesh/src/event-bus/event-bus.ts @@ -105,6 +105,17 @@ export class EventBus implements IEventBus { // Find matching subscriptions and create delivery records const subscriptions = await this.storage.getMatchingSubscriptions(event); + + console.log( + `[EventBus] Event ${event.type} from ${event.source}: ${subscriptions.length} matching subscriptions`, + ); + if (subscriptions.length > 0) { + console.log( + `[EventBus] Subscribers:`, + subscriptions.map((s) => s.connectionId).join(", "), + ); + } + if (subscriptions.length > 0) { // Determine when to deliver: // - deliverAt: use specified time @@ -121,6 +132,9 @@ export class EventBus implements IEventBus { // Only notify strategy for immediate delivery (no scheduled time and no cron) // Scheduled events will be picked up by the polling worker at the right time if (this.notifyStrategy && !deliverAt) { + console.log( + `[EventBus] Triggering immediate processing for ${event.type}`, + ); await this.notifyStrategy.notify(eventId).catch((error) => { console.warn("[EventBus] Notify failed (non-critical):", error); }); diff --git a/apps/mesh/src/event-bus/worker.ts b/apps/mesh/src/event-bus/worker.ts index b808c2e8ef..61f2dd676c 100644 --- a/apps/mesh/src/event-bus/worker.ts +++ b/apps/mesh/src/event-bus/worker.ts @@ -109,6 +109,7 @@ export class EventBusWorker { private notifySubscriber: NotifySubscriberFn; private running = false; private processing = false; + private pendingProcessRequest = false; private config: Required; constructor( @@ -160,10 +161,15 @@ export class EventBusWorker { * Called by the NotifyStrategy when events are available */ async processNow(): Promise { - if (!this.running) return; + if (!this.running) { + return; + } - // Prevent concurrent processing - if (this.processing) return; + // If already processing, mark that we need another run after current finishes + if (this.processing) { + this.pendingProcessRequest = true; + return; + } this.processing = true; try { @@ -172,6 +178,13 @@ export class EventBusWorker { console.error("[EventBus] Error processing events:", error); } finally { this.processing = false; + + // If processNow was called while we were processing, run again + if (this.pendingProcessRequest) { + this.pendingProcessRequest = false; + // Use setImmediate to avoid stack overflow on rapid fire events + setImmediate(() => this.processNow()); + } } } @@ -186,6 +199,8 @@ export class EventBusWorker { ); if (pendingDeliveries.length === 0) return; + console.log(`[EventBus] Processing ${pendingDeliveries.length} deliveries`); + // Group by subscription (connection) const grouped = groupByConnection(pendingDeliveries); @@ -194,12 +209,21 @@ export class EventBusWorker { for (const [subscriptionId, batch] of grouped) { try { + console.log( + `[EventBus] Delivering ${batch.events.length} events to ${batch.connectionId}`, + ); + // Call ON_EVENTS on the subscriber connection const result = await this.notifySubscriber( batch.connectionId, batch.events, ); + console.log( + `[EventBus] Delivery result for ${batch.connectionId}:`, + JSON.stringify(result).slice(0, 200), + ); + // Check if per-event results were provided if (result.results && Object.keys(result.results).length > 0) { // Per-event mode: process each event individually diff --git a/apps/mesh/src/stdio/stable-transport.ts b/apps/mesh/src/stdio/stable-transport.ts index 66cddbeb49..82d17db740 100644 --- a/apps/mesh/src/stdio/stable-transport.ts +++ b/apps/mesh/src/stdio/stable-transport.ts @@ -40,6 +40,8 @@ interface StableConnection { config: StableStdioConfig; status: "connecting" | "connected" | "reconnecting" | "failed"; connectPromise: Promise | null; + /** Process ID for killing the process tree on cleanup */ + pid?: number; } /** @@ -94,9 +96,18 @@ export async function getStableStdioClient( ): Promise { const existing = connectionPool.get(config.id); - // If we have an existing connection that's connected, return the stable wrapper + // If we have an existing connection that's connected, verify it's still alive if (existing?.status === "connected" && existing.stableClient) { - return existing.stableClient; + try { + // Quick ping to verify connection is alive (listTools has low overhead) + await existing.stableClient.listTools(); + return existing.stableClient; + } catch { + // Connection is dead, mark for respawn + console.log(`[StableStdio] Stale connection detected: ${config.id}`); + existing.status = "failed"; + existing.connectPromise = null; + } } // If we're already connecting/reconnecting, wait for that @@ -195,7 +206,17 @@ export async function getStableStdioClient( } connection.status = "connected"; - console.log(`[StableStdio] Connected: ${config.id}`); + + // Capture PID for process tree cleanup during shutdown + // The MCP SDK stores the spawned process in _process (private but accessible) + const transportProcess = ( + transport as unknown as { _process?: { pid?: number } } + )._process; + connection.pid = transportProcess?.pid; + + console.log( + `[StableStdio] Connected: ${config.id} (PID: ${connection.pid ?? "unknown"})`, + ); // Return the stable wrapper (close() is disabled) return connection.stableClient; @@ -218,6 +239,53 @@ export async function getStableStdioClient( return connection.connectPromise; } +/** + * Kill a process tree (parent and all children) + * This is needed because `bun --watch` spawns child processes + * that don't get killed when the parent receives SIGTERM + */ +async function killProcessTree(pid: number): Promise { + try { + // First, find all child processes + const { spawn } = await import("child_process"); + + // Use pgrep to find children (works on macOS and Linux) + const pgrep = spawn("pgrep", ["-P", String(pid)]); + const childPids: number[] = []; + + pgrep.stdout?.on("data", (data: Buffer) => { + const pids = data + .toString() + .trim() + .split("\n") + .filter(Boolean) + .map(Number); + childPids.push(...pids); + }); + + await new Promise((resolve) => pgrep.on("close", resolve)); + + // Recursively kill children first + for (const childPid of childPids) { + await killProcessTree(childPid); + } + + // Kill the process itself with SIGKILL + try { + process.kill(pid, "SIGKILL"); + } catch { + // Process might already be dead + } + } catch { + // Fallback: just try to kill the PID directly + try { + process.kill(pid, "SIGKILL"); + } catch { + // Process might already be dead + } + } +} + /** * Force close a stable stdio connection * Used for explicit shutdown (e.g., server shutdown) @@ -233,9 +301,25 @@ async function forceCloseStdioConnection(id: string): Promise { if (connection.client) { connection.client.onclose = undefined; } - await connection.client?.close(); - } catch { - // Ignore close errors + + // Use the PID we captured when the connection was created + const pid = connection.pid; + + // First, try graceful close + try { + await connection.client?.close(); + } catch { + // Ignore close errors + } + + // Then, kill the entire process tree to ensure children are dead + // This is important for `bun --watch` which spawns child processes + if (pid) { + console.log(`[StableStdio] Killing process tree for PID ${pid}`); + await killProcessTree(pid); + } + } catch (error) { + console.error(`[StableStdio] Error closing connection ${id}:`, error); } connectionPool.delete(id); @@ -254,6 +338,79 @@ async function forceCloseAllStdioConnections(): Promise { await Promise.allSettled(closePromises); connectionPool.clear(); + + // Small delay to ensure OS releases ports after processes are killed + await new Promise((resolve) => setTimeout(resolve, 100)); +} + +/** + * Kill orphaned STDIO processes that might be left from previous Mesh instances. + * This handles cases where the connection pool is empty but old processes are still running. + */ +async function killOrphanedStdioProcesses(): Promise { + const { exec } = await import("child_process"); + const { promisify } = await import("util"); + const execAsync = promisify(exec); + + // Patterns for processes spawned by Mesh that might be orphaned + // These are common command patterns for STDIO MCPs + const patterns = [ + "mesh-bridge.*server", // Mesh bridge server + "pilot.*server/main", // Pilot server + ]; + + for (const pattern of patterns) { + try { + // Use pkill with -f to match full command line + // -9 for SIGKILL to ensure termination + await execAsync(`pkill -9 -f "${pattern}" 2>/dev/null || true`); + } catch { + // Ignore errors - process might not exist + } + } + + // Also kill anything listening on port 9999 (Bridge WebSocket) + try { + const { stdout } = await execAsync(`lsof -t -i:9999 2>/dev/null || true`); + const pids = stdout.trim().split("\n").filter(Boolean); + for (const pid of pids) { + try { + process.kill(Number(pid), "SIGKILL"); + console.log( + `[StableStdio] Killed orphaned process on port 9999: PID ${pid}`, + ); + } catch { + // Process might already be dead + } + } + } catch { + // Ignore errors + } +} + +/** + * Force close all connections and clear the pool + * Used on app startup/HMR to ensure fresh processes with new credentials + */ +export async function resetStdioConnectionPool(): Promise { + console.log( + `[StableStdio] Reset requested. Pool size: ${connectionPool.size}, keys: [${Array.from(connectionPool.keys()).join(", ")}]`, + ); + + // First, close connections we know about in the pool + if (connectionPool.size > 0) { + console.log( + `[StableStdio] Resetting ${connectionPool.size} connections (killing processes)`, + ); + await forceCloseAllStdioConnections(); + console.log( + `[StableStdio] Reset complete. Pool size: ${connectionPool.size}`, + ); + } + + // Then, kill any orphaned processes that might be left from previous runs + // (handles case where pool was empty but old processes are still running) + await killOrphanedStdioProcesses(); } // Register shutdown handlers - clean up connections before exit diff --git a/apps/mesh/src/tools/connection/fetch-tools.ts b/apps/mesh/src/tools/connection/fetch-tools.ts index e92f95e84f..3bfe9c8e0e 100644 --- a/apps/mesh/src/tools/connection/fetch-tools.ts +++ b/apps/mesh/src/tools/connection/fetch-tools.ts @@ -164,8 +164,18 @@ async function fetchToolsFromStdioMCP( setTimeout(() => reject(new Error("Tool fetch timeout")), 10_000); }); + console.log( + `[STDIO tool fetch] Connecting to ${connection.id}: ${stdioParams.command} ${stdioParams.args?.join(" ")}`, + ); + await Promise.race([client.connect(transport), timeoutPromise]); + console.log(`[STDIO tool fetch] Connected, listing tools...`); + const result = await Promise.race([client.listTools(), timeoutPromise]); + console.log( + `[STDIO tool fetch] Got ${result.tools?.length ?? 0} tools:`, + result.tools?.map((t) => t.name), + ); if (!result.tools || result.tools.length === 0) { return null; @@ -178,10 +188,7 @@ async function fetchToolsFromStdioMCP( outputSchema: tool.outputSchema ?? undefined, })); } catch (error) { - console.error( - `Failed to fetch tools from STDIO connection ${connection.id}:`, - error, - ); + console.error(`[STDIO tool fetch] Failed for ${connection.id}:`, error); return null; } finally { try { diff --git a/apps/mesh/src/tools/connection/update.ts b/apps/mesh/src/tools/connection/update.ts index d8d275f783..2334341eb9 100644 --- a/apps/mesh/src/tools/connection/update.ts +++ b/apps/mesh/src/tools/connection/update.ts @@ -185,11 +185,19 @@ export const COLLECTION_CONNECTIONS_UPDATE = defineTool({ ) { try { const proxy = await ctx.createMCPProxy(id); + // Get configuration token and mesh URL for STDIO connections + // HTTP connections receive these via headers, but STDIO needs them in arguments + const meshToken = await proxy.getConfigurationToken(); + const meshUrl = proxy.getMeshUrl(); + await proxy.client.callTool({ name: "ON_MCP_CONFIGURATION", arguments: { state: finalState, scopes: finalScopes, + // Include mesh context for STDIO connections that can't receive headers + meshToken, + meshUrl, }, }); } catch (error) { diff --git a/apps/mesh/src/tools/eventbus/subscribe.ts b/apps/mesh/src/tools/eventbus/subscribe.ts index 4cc54c27a0..8dd6cec2a0 100644 --- a/apps/mesh/src/tools/eventbus/subscribe.ts +++ b/apps/mesh/src/tools/eventbus/subscribe.ts @@ -22,11 +22,12 @@ export const EVENT_SUBSCRIBE = defineTool({ const organization = requireOrganization(ctx); await ctx.access.check(); - // Get the subscriber connection ID from the caller's token - const connectionId = ctx.connectionId; + // Get the subscriber connection ID + // Use explicit subscriberId if provided (for calls via gateway), otherwise use caller's connection + const connectionId = input.subscriberId || ctx.connectionId; if (!connectionId) { throw new Error( - "Connection ID required to subscribe. Use a connection-scoped token.", + "Connection ID required to subscribe. Use a connection-scoped token or provide subscriberId.", ); } // Create the subscription diff --git a/apps/mesh/src/web/components/details/connection/settings-tab/index.tsx b/apps/mesh/src/web/components/details/connection/settings-tab/index.tsx index 27344eb015..af7cd9223e 100644 --- a/apps/mesh/src/web/components/details/connection/settings-tab/index.tsx +++ b/apps/mesh/src/web/components/details/connection/settings-tab/index.tsx @@ -18,10 +18,10 @@ import { useToolCall } from "@/web/hooks/use-tool-call"; import { authenticateMcp } from "@/web/lib/mcp-oauth"; import { KEYS } from "@/web/lib/query-keys"; import { Button } from "@deco/ui/components/button.tsx"; -import { Key01, File06, Loading01 } from "@untitledui/icons"; +import { Key01, File06, Loading01, RefreshCw01 } from "@untitledui/icons"; import { zodResolver } from "@hookform/resolvers/zod"; import { useQueryClient } from "@tanstack/react-query"; -import { Suspense } from "react"; +import { Suspense, useState } from "react"; import { useForm } from "react-hook-form"; import { toast } from "sonner"; import { ViewActions } from "../../layout"; @@ -429,6 +429,7 @@ function SettingsTabContentImpl(props: SettingsTabContentImplProps) { const connectionActions = useConnectionActions(); const queryClient = useQueryClient(); + const [isRefreshing, setIsRefreshing] = useState(false); // Check if connection has README const repository = connection?.metadata?.repository as @@ -484,9 +485,37 @@ function SettingsTabContentImpl(props: SettingsTabContentImplProps) { toast.success("Authentication successful"); }; + const handleRefreshTools = async () => { + setIsRefreshing(true); + try { + // Trigger an update with no changes to force tool refresh + // Note: connectionActions.update handles success/error toasts via onSuccess/onError + await connectionActions.update.mutateAsync({ + id: connection.id, + data: {}, + }); + } finally { + setIsRefreshing(false); + } + }; + return ( <> + {hasAnyChanges && (