diff --git a/package-lock.json b/package-lock.json index d551aa61d..30398ae7d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -27,6 +27,7 @@ "devDependencies": { "@cfworker/json-schema": "^4.1.1", "@eslint/js": "^9.39.1", + "@hono/node-server": "^1.19.6", "@types/content-type": "^1.1.8", "@types/cors": "^2.8.17", "@types/cross-spawn": "^6.0.6", @@ -40,6 +41,7 @@ "eslint": "^9.8.0", "eslint-config-prettier": "^10.1.8", "eslint-plugin-n": "^17.23.1", + "hono": "^4.10.7", "prettier": "3.6.2", "supertest": "^7.0.0", "tsx": "^4.16.5", @@ -640,6 +642,19 @@ "node": "^18.18.0 || ^20.9.0 || >=21.1.0" } }, + "node_modules/@hono/node-server": { + "version": "1.19.6", + "resolved": "https://registry.npmjs.org/@hono/node-server/-/node-server-1.19.6.tgz", + "integrity": "sha512-Shz/KjlIeAhfiuE93NDKVdZ7HdBVLQAfdbaXEaoAVO3ic9ibRSLGIQGkcBbFyuLr+7/1D5ZCINM8B+6IvXeMtw==", + "dev": true, + "license": "MIT", + "engines": { + "node": ">=18.14.1" + }, + "peerDependencies": { + "hono": "^4" + } + }, "node_modules/@humanfs/core": { "version": "0.19.0", "resolved": "https://registry.npmjs.org/@humanfs/core/-/core-0.19.0.tgz", @@ -1319,6 +1334,7 @@ "resolved": "https://registry.npmjs.org/@typescript-eslint/parser/-/parser-8.11.0.tgz", "integrity": "sha512-lmt73NeHdy1Q/2ul295Qy3uninSqi6wQI18XwSpm8w0ZbQXUpjCAWP1Vlv/obudoBiIjJVjlztjQ+d/Md98Yxg==", "dev": true, + "peer": true, "dependencies": { "@typescript-eslint/scope-manager": "8.11.0", "@typescript-eslint/types": "8.11.0", @@ -1718,6 +1734,7 @@ "resolved": "https://registry.npmjs.org/acorn/-/acorn-8.14.0.tgz", "integrity": "sha512-cl669nCJTZBsL97OF4kUQm5g5hC2uihk0NxY3WENAC0TYdILVkAyHymAntgxGkl7K+t0cXIrH5siy5S4XkFycA==", "dev": true, + "peer": true, "bin": { "acorn": "bin/acorn" }, @@ -2286,6 +2303,7 @@ "resolved": "https://registry.npmjs.org/eslint/-/eslint-9.13.0.tgz", "integrity": "sha512-EYZK6SX6zjFHST/HRytOdA/zE72Cq/bfw45LSyuwrdvcclb/gqV8RRQxywOBEWO2+WDpva6UZa4CcDeJKzUCFA==", "dev": true, + "peer": true, "dependencies": { "@eslint-community/eslint-utils": "^4.2.0", "@eslint-community/regexpp": "^4.11.0", @@ -3115,6 +3133,17 @@ "node": ">= 0.4" } }, + "node_modules/hono": { + "version": "4.10.7", + "resolved": "https://registry.npmjs.org/hono/-/hono-4.10.7.tgz", + "integrity": "sha512-icXIITfw/07Q88nLSkB9aiUrd8rYzSweK681Kjo/TSggaGbOX4RRyxxm71v+3PC8C/j+4rlxGeoTRxQDkaJkUw==", + "dev": true, + "license": "MIT", + "peer": true, + "engines": { + "node": ">=16.9.0" + } + }, "node_modules/http-errors": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/http-errors/-/http-errors-2.0.0.tgz", @@ -4272,6 +4301,7 @@ "integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==", "dev": true, "license": "MIT", + "peer": true, "engines": { "node": ">=12" }, @@ -4363,6 +4393,7 @@ "integrity": "sha512-4H8vUNGNjQ4V2EOoGw005+c+dGuPSnhpPBPHBtsZdGZBk/iJb4kguGlPWaZTZ3q5nMtFOEsY0nRDlh9PJyd6SQ==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "esbuild": "~0.25.0", "get-tsconfig": "^4.7.5" @@ -4408,6 +4439,7 @@ "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.6.3.tgz", "integrity": "sha512-hjcS1mhfuyi4WW8IWtjP7brDrG2cuDZukyrYrSauoXGNgx0S7zceP07adYkJycEr56BOUTNPzbInooiN3fn1qw==", "dev": true, + "peer": true, "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" @@ -4609,6 +4641,7 @@ "integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==", "dev": true, "license": "MIT", + "peer": true, "engines": { "node": ">=12" }, @@ -4622,6 +4655,7 @@ "integrity": "sha512-BxAKBWmIbrDgrokdGZH1IgkIk/5mMHDreLDmCJ0qpyJaAteP8NvMhkwr/ZCQNqNH97bw/dANTE9PDzqwJghfMQ==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "esbuild": "^0.25.0", "fdir": "^6.5.0", @@ -4774,6 +4808,7 @@ "resolved": "https://registry.npmjs.org/zod/-/zod-3.25.76.tgz", "integrity": "sha512-gzUt/qt81nXsFGKIFcC3YnfEAx5NkunCfnDlvuBSSFS02bcXu4Lmea0AFIUwbLWxWPx3d9p8S5QoaujKcNQxcQ==", "license": "MIT", + "peer": true, "funding": { "url": "https://github.com/sponsors/colinhacks" } diff --git a/package.json b/package.json index 521985df1..c6456dfeb 100644 --- a/package.json +++ b/package.json @@ -115,6 +115,8 @@ }, "devDependencies": { "@cfworker/json-schema": "^4.1.1", + "@hono/node-server": "^1.19.6", + "hono": "^4.10.7", "@eslint/js": "^9.39.1", "@types/content-type": "^1.1.8", "@types/cors": "^2.8.17", diff --git a/src/examples/server/expressFetchStreamableHttp.ts b/src/examples/server/expressFetchStreamableHttp.ts new file mode 100644 index 000000000..ed4dd461b --- /dev/null +++ b/src/examples/server/expressFetchStreamableHttp.ts @@ -0,0 +1,333 @@ +/** + * Example MCP Server using Express with FetchStreamableHTTPServerTransport + * + * This example demonstrates how to use the experimental FetchStreamableHTTPServerTransport + * with Express by converting between Node.js HTTP and Web Standard Request/Response. + * + * The FetchStreamableHTTPServerTransport uses Web Standard APIs, so we need adapter + * functions to convert Express's req/res to Web Standard Request/Response. + * + * To run this example: + * npx tsx src/examples/server/expressFetchStreamableHttp.ts + * + * Then test with curl: + * # Initialize + * curl -X POST http://localhost:3000/mcp \ + * -H "Content-Type: application/json" \ + * -H "Accept: application/json, text/event-stream" \ + * -d '{"jsonrpc":"2.0","method":"initialize","params":{"protocolVersion":"2025-03-26","clientInfo":{"name":"test","version":"1.0"},"capabilities":{}},"id":1}' + */ + +import express from 'express'; +import cors from 'cors'; +import { IncomingMessage, ServerResponse } from 'node:http'; +import { McpServer } from '../../server/mcp.js'; +import { FetchStreamableHTTPServerTransport } from '../../experimental/fetch-streamable-http/index.js'; +import { CallToolResult, GetPromptResult, ReadResourceResult } from '../../types.js'; +import { z } from 'zod'; + +// Create the Express app +const app = express(); + +// Store active transports by session ID for session management +const transports = new Map(); + +/** + * Converts a Node.js IncomingMessage to a Web Standard Request + */ +async function nodeRequestToWebRequest(req: IncomingMessage, baseUrl: string): Promise { + const url = new URL(req.url ?? '/', baseUrl); + const headers = new Headers(); + + for (const [key, value] of Object.entries(req.headers)) { + if (value) { + if (Array.isArray(value)) { + value.forEach(v => headers.append(key, v)); + } else { + headers.set(key, value); + } + } + } + + // For requests with body (POST), we need to read the body + let body: string | null = null; + if (req.method === 'POST') { + body = await new Promise((resolve, reject) => { + let data = ''; + req.on('data', chunk => { + data += chunk; + }); + req.on('end', () => resolve(data)); + req.on('error', reject); + }); + } + + return new Request(url.toString(), { + method: req.method, + headers, + body: body, + // @ts-expect-error duplex is required for streams but not in types + duplex: 'half' + }); +} + +/** + * Converts a Web Standard Response to a Node.js ServerResponse + */ +async function webResponseToNodeResponse(webResponse: Response, res: ServerResponse): Promise { + // Set status code + res.statusCode = webResponse.status; + + // Copy headers + webResponse.headers.forEach((value, key) => { + res.setHeader(key, value); + }); + + // Handle streaming response (SSE) + if (webResponse.body) { + const reader = webResponse.body.getReader(); + const decoder = new TextDecoder(); + + // For SSE, we need to flush headers immediately + if (webResponse.headers.get('content-type') === 'text/event-stream') { + res.flushHeaders(); + } + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + const chunk = decoder.decode(value, { stream: true }); + res.write(chunk); + + // Flush for SSE to ensure real-time delivery + if (typeof (res as NodeJS.WritableStream & { flush?: () => void }).flush === 'function') { + (res as NodeJS.WritableStream & { flush?: () => void }).flush!(); + } + } + } catch { + // Client disconnected or stream error + } finally { + res.end(); + } + } else { + res.end(); + } +} + +/** + * Creates and configures an MCP server with example tools, resources, and prompts + */ +function createMcpServer(): McpServer { + const server = new McpServer( + { + name: 'express-fetch-streamable-http-server', + version: '1.0.0' + }, + { capabilities: { logging: {} } } + ); + + // Register a simple tool + server.registerTool( + 'greet', + { + description: 'Greets someone by name', + inputSchema: { + name: z.string().describe('The name to greet') + } + }, + async ({ name }): Promise => { + return { + content: [ + { + type: 'text', + text: `Hello, ${name}! Welcome to the Express MCP server.` + } + ] + }; + } + ); + + // Register a calculator tool + server.registerTool( + 'calculate', + { + description: 'Performs a simple calculation', + inputSchema: { + operation: z.enum(['add', 'subtract', 'multiply', 'divide']).describe('The operation to perform'), + a: z.number().describe('First operand'), + b: z.number().describe('Second operand') + } + }, + async ({ operation, a, b }): Promise => { + let result: number; + switch (operation) { + case 'add': + result = a + b; + break; + case 'subtract': + result = a - b; + break; + case 'multiply': + result = a * b; + break; + case 'divide': + if (b === 0) { + return { + content: [{ type: 'text', text: 'Error: Division by zero' }], + isError: true + }; + } + result = a / b; + break; + } + return { + content: [ + { + type: 'text', + text: `${a} ${operation} ${b} = ${result}` + } + ] + }; + } + ); + + // Register a prompt + server.registerPrompt( + 'code-review', + { + description: 'A prompt template for code review', + argsSchema: { + language: z.string().describe('Programming language'), + code: z.string().describe('Code to review') + } + }, + async ({ language, code }): Promise => { + return { + messages: [ + { + role: 'user', + content: { + type: 'text', + text: `Please review the following ${language} code:\n\n\`\`\`${language}\n${code}\n\`\`\`` + } + } + ] + }; + } + ); + + // Register a resource + server.registerResource( + 'server-info', + 'mcp://server/info', + { + description: 'Information about this MCP server', + mimeType: 'application/json' + }, + async (): Promise => { + return { + contents: [ + { + uri: 'mcp://server/info', + mimeType: 'application/json', + text: JSON.stringify( + { + name: 'express-fetch-streamable-http-server', + version: '1.0.0', + runtime: 'Node.js', + framework: 'Express', + transport: 'FetchStreamableHTTPServerTransport', + timestamp: new Date().toISOString() + }, + null, + 2 + ) + } + ] + }; + } + ); + + return server; +} + +// Configure CORS middleware +app.use( + cors({ + origin: '*', + methods: ['GET', 'POST', 'DELETE', 'OPTIONS'], + allowedHeaders: ['Content-Type', 'Accept', 'mcp-session-id', 'last-event-id', 'mcp-protocol-version'], + exposedHeaders: ['mcp-session-id'] + }) +); + +// MCP endpoint - handles all methods +app.all('/mcp', async (req, res) => { + const baseUrl = `http://${req.headers.host}`; + + // Check for existing session + const sessionId = req.headers['mcp-session-id'] as string | undefined; + + if (sessionId && transports.has(sessionId)) { + // Reuse existing transport for this session + const transport = transports.get(sessionId)!; + const webRequest = await nodeRequestToWebRequest(req, baseUrl); + const webResponse = await transport.handleRequest(webRequest); + await webResponseToNodeResponse(webResponse, res); + return; + } + + // For new sessions or initialization, create new transport and server + const server = createMcpServer(); + const transport = new FetchStreamableHTTPServerTransport({ + sessionIdGenerator: () => crypto.randomUUID(), + onsessioninitialized: sessionId => { + // Store the transport for session reuse + transports.set(sessionId, transport); + console.log(`Session initialized: ${sessionId}`); + }, + onsessionclosed: sessionId => { + // Clean up when session closes + transports.delete(sessionId); + console.log(`Session closed: ${sessionId}`); + } + }); + + await server.connect(transport); + + const webRequest = await nodeRequestToWebRequest(req, baseUrl); + const webResponse = await transport.handleRequest(webRequest); + await webResponseToNodeResponse(webResponse, res); +}); + +// Health check endpoint +app.get('/health', (_req, res) => { + res.json({ + status: 'healthy', + activeSessions: transports.size, + timestamp: new Date().toISOString() + }); +}); + +// Start the server +const PORT = 3000; +app.listen(PORT, () => { + console.log(`MCP server running at http://localhost:${PORT}/mcp`); +}); + +// Handle graceful shutdown +process.on('SIGINT', async () => { + console.log('\nShutting down server...'); + + // Close all active transports + for (const [sessionId, transport] of transports) { + console.log(`Closing session: ${sessionId}`); + await transport.close(); + } + transports.clear(); + + console.log('Server stopped.'); + process.exit(0); +}); diff --git a/src/examples/server/honoFetchStreamableHttp.ts b/src/examples/server/honoFetchStreamableHttp.ts new file mode 100644 index 000000000..00a8b6e16 --- /dev/null +++ b/src/examples/server/honoFetchStreamableHttp.ts @@ -0,0 +1,311 @@ +/** + * Example MCP Server using Hono.js with FetchStreamableHTTPServerTransport + * + * This example demonstrates how to use the experimental FetchStreamableHTTPServerTransport + * with Hono.js to create an MCP server that uses Web Standard APIs. + * + * The FetchStreamableHTTPServerTransport uses Web Standard Request/Response objects, + * making it compatible with various runtimes like Cloudflare Workers, Deno, Bun, etc. + * This example runs on Node.js using @hono/node-server. + * + * To run this example: + * npx tsx src/examples/server/honoFetchStreamableHttp.ts + * + * Then test with curl: + * # Initialize + * curl -X POST http://localhost:3000/mcp \ + * -H "Content-Type: application/json" \ + * -H "Accept: application/json, text/event-stream" \ + * -d '{"jsonrpc":"2.0","method":"initialize","params":{"protocolVersion":"2025-03-26","clientInfo":{"name":"test","version":"1.0"},"capabilities":{}},"id":1}' + * + * # List tools (use session ID from init response) + * curl -X POST http://localhost:3000/mcp \ + * -H "Content-Type: application/json" \ + * -H "Accept: application/json, text/event-stream" \ + * -H "mcp-session-id: " \ + * -d '{"jsonrpc":"2.0","method":"tools/list","params":{},"id":2}' + */ + +import { Hono } from 'hono'; +import { cors } from 'hono/cors'; +import { serve } from '@hono/node-server'; +import { McpServer } from '../../server/mcp.js'; +import { FetchStreamableHTTPServerTransport } from '../../experimental/fetch-streamable-http/index.js'; +import { CallToolResult, GetPromptResult, ReadResourceResult } from '../../types.js'; +import { z } from 'zod'; + +// Create the Hono app +const app = new Hono(); + +// Store active transports by session ID for session management +const transports = new Map(); + +/** + * Creates and configures an MCP server with example tools, resources, and prompts + */ +function createMcpServer(): McpServer { + const server = new McpServer( + { + name: 'hono-fetch-streamable-http-server', + version: '1.0.0' + }, + { capabilities: { logging: {} } } + ); + + // Register a simple tool using the new registerTool API + server.registerTool( + 'greet', + { + description: 'Greets someone by name', + inputSchema: { + name: z.string().describe('The name to greet') + } + }, + async ({ name }): Promise => { + return { + content: [ + { + type: 'text', + text: `Hello, ${name}! Welcome to the Hono MCP server.` + } + ] + }; + } + ); + + // Register a tool that demonstrates async operations + server.registerTool( + 'calculate', + { + description: 'Performs a simple calculation', + inputSchema: { + operation: z.enum(['add', 'subtract', 'multiply', 'divide']).describe('The operation to perform'), + a: z.number().describe('First operand'), + b: z.number().describe('Second operand') + } + }, + async ({ operation, a, b }): Promise => { + let result: number; + switch (operation) { + case 'add': + result = a + b; + break; + case 'subtract': + result = a - b; + break; + case 'multiply': + result = a * b; + break; + case 'divide': + if (b === 0) { + return { + content: [{ type: 'text', text: 'Error: Division by zero' }], + isError: true + }; + } + result = a / b; + break; + } + return { + content: [ + { + type: 'text', + text: `${a} ${operation} ${b} = ${result}` + } + ] + }; + } + ); + + // Register a tool that sends notifications (demonstrates SSE streaming) + server.registerTool( + 'send-notifications', + { + description: 'Sends a series of notifications to demonstrate SSE streaming', + inputSchema: { + count: z.number().min(1).max(10).default(3).describe('Number of notifications to send'), + interval: z.number().min(100).max(2000).default(500).describe('Interval between notifications in ms') + } + }, + async ({ count, interval }, extra): Promise => { + const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); + + for (let i = 1; i <= count; i++) { + await server.sendLoggingMessage( + { + level: 'info', + data: `Notification ${i} of ${count} at ${new Date().toISOString()}` + }, + extra.sessionId + ); + if (i < count) { + await sleep(interval); + } + } + + return { + content: [ + { + type: 'text', + text: `Sent ${count} notifications with ${interval}ms interval` + } + ] + }; + } + ); + + // Register a simple prompt using the new registerPrompt API + server.registerPrompt( + 'code-review', + { + description: 'A prompt template for code review', + argsSchema: { + language: z.string().describe('Programming language'), + code: z.string().describe('Code to review') + } + }, + async ({ language, code }): Promise => { + return { + messages: [ + { + role: 'user', + content: { + type: 'text', + text: `Please review the following ${language} code and provide feedback on: +1. Code quality and best practices +2. Potential bugs or issues +3. Performance considerations +4. Suggestions for improvement + +Code: +\`\`\`${language} +${code} +\`\`\`` + } + } + ] + }; + } + ); + + // Register a simple resource using the new registerResource API + server.registerResource( + 'server-info', + 'mcp://server/info', + { + description: 'Information about this MCP server', + mimeType: 'application/json' + }, + async (): Promise => { + return { + contents: [ + { + uri: 'mcp://server/info', + mimeType: 'application/json', + text: JSON.stringify( + { + name: 'hono-fetch-streamable-http-server', + version: '1.0.0', + runtime: 'Node.js', + framework: 'Hono', + transport: 'FetchStreamableHTTPServerTransport', + timestamp: new Date().toISOString() + }, + null, + 2 + ) + } + ] + }; + } + ); + + return server; +} + +// Configure CORS middleware for all routes +app.use( + '*', + cors({ + origin: '*', + allowMethods: ['GET', 'POST', 'DELETE', 'OPTIONS'], + allowHeaders: ['Content-Type', 'Accept', 'mcp-session-id', 'last-event-id', 'mcp-protocol-version'], + exposeHeaders: ['mcp-session-id'] + }) +); + +// Example auth middleware (uncomment to enable authentication): +// app.use('/mcp', async (c, next) => { +// const token = c.req.header('Authorization')?.replace('Bearer ', ''); +// if (token) { +// // Validate token and set auth info in context +// c.set('auth', { token, clientId: 'example-client' }); +// } +// await next(); +// }); + +app.all('/mcp', async c => { + // Check for existing session + const sessionId = c.req.header('mcp-session-id'); + + if (sessionId && transports.has(sessionId)) { + // Reuse existing transport for this session + const transport = transports.get(sessionId)!; + // Pass auth from context if using auth middleware: { auth: c.get('auth') } + return transport.handleRequest(c.req.raw); + } + + // For new sessions or initialization, create new transport and server + const server = createMcpServer(); + const transport = new FetchStreamableHTTPServerTransport({ + sessionIdGenerator: () => crypto.randomUUID(), + onsessioninitialized: sessionId => { + // Store the transport for session reuse + transports.set(sessionId, transport); + console.log(`Session initialized: ${sessionId}`); + }, + onsessionclosed: sessionId => { + // Clean up when session closes + transports.delete(sessionId); + console.log(`Session closed: ${sessionId}`); + } + }); + + await server.connect(transport); + + // Pass auth from context if using auth middleware: { auth: c.get('auth') } + return transport.handleRequest(c.req.raw); +}); + +// Health check endpoint +app.get('/health', c => { + return c.json({ + status: 'healthy', + activeSessions: transports.size, + timestamp: new Date().toISOString() + }); +}); + +// Start the server +const PORT = 3000; +console.log(`MCP server running at http://localhost:${PORT}/mcp`); + +serve({ + fetch: app.fetch, + port: PORT +}); + +// Handle graceful shutdown +process.on('SIGINT', async () => { + console.log('\nShutting down server...'); + + // Close all active transports + for (const [sessionId, transport] of transports) { + console.log(`Closing session: ${sessionId}`); + await transport.close(); + } + transports.clear(); + + console.log('Server stopped.'); + process.exit(0); +}); diff --git a/src/experimental/fetch-streamable-http/fetchStreamableHttpServerTransport.test.ts b/src/experimental/fetch-streamable-http/fetchStreamableHttpServerTransport.test.ts new file mode 100644 index 000000000..fc949412e --- /dev/null +++ b/src/experimental/fetch-streamable-http/fetchStreamableHttpServerTransport.test.ts @@ -0,0 +1,1554 @@ +/** + * Tests for FetchStreamableHTTPServerTransport + * + * These tests use native Web Standard Request/Response objects directly, + * without spinning up HTTP servers. This makes tests faster and simpler. + */ + +import { + FetchStreamableHTTPServerTransport, + EventStore, + EventId, + StreamId, + SessionStore, + SessionState +} from './fetchStreamableHttpServerTransport.js'; +import { McpServer } from '../../server/mcp.js'; +import { CallToolResult, JSONRPCMessage } from '../../types.js'; +import { zodTestMatrix, type ZodMatrixEntry } from '../../__fixtures__/zodTestMatrix.js'; + +/** + * Test transport configuration + */ +interface TestTransportConfig { + sessionIdGenerator: (() => string) | undefined; + enableJsonResponse?: boolean; + eventStore?: EventStore; + sessionStore?: SessionStore; + onsessioninitialized?: (sessionId: string) => void | Promise; + onsessionclosed?: (sessionId: string) => void | Promise; + retryInterval?: number; + allowedHosts?: string[]; + allowedOrigins?: string[]; + enableDnsRebindingProtection?: boolean; +} + +/** + * Common test messages + */ +const TEST_MESSAGES = { + initialize: { + jsonrpc: '2.0', + method: 'initialize', + params: { + clientInfo: { name: 'test-client', version: '1.0' }, + protocolVersion: '2025-03-26', + capabilities: {} + }, + id: 'init-1' + } as JSONRPCMessage, + + toolsList: { + jsonrpc: '2.0', + method: 'tools/list', + params: {}, + id: 'tools-1' + } as JSONRPCMessage +}; + +/** + * Creates a POST request for the transport + */ +function createPostRequest(message: JSONRPCMessage | JSONRPCMessage[], sessionId?: string, extraHeaders?: Record): Request { + const headers: Record = { + 'Content-Type': 'application/json', + Accept: 'application/json, text/event-stream', + Host: 'localhost:3000' + }; + + if (sessionId) { + headers['mcp-session-id'] = sessionId; + headers['mcp-protocol-version'] = '2025-03-26'; + } + + // Apply extraHeaders LAST to allow overriding defaults + if (extraHeaders) { + Object.assign(headers, extraHeaders); + } + + return new Request('http://localhost:3000/mcp', { + method: 'POST', + headers, + body: JSON.stringify(message) + }); +} + +/** + * Creates a GET request for SSE stream + */ +function createGetRequest(sessionId: string, extraHeaders?: Record): Request { + const headers: Record = { + Accept: 'text/event-stream', + Host: 'localhost:3000', + 'mcp-session-id': sessionId, + ...extraHeaders + }; + + return new Request('http://localhost:3000/mcp', { + method: 'GET', + headers + }); +} + +/** + * Creates a DELETE request + */ +function createDeleteRequest(sessionId: string, extraHeaders?: Record): Request { + const headers: Record = { + Host: 'localhost:3000', + 'mcp-session-id': sessionId, + ...extraHeaders + }; + + return new Request('http://localhost:3000/mcp', { + method: 'DELETE', + headers + }); +} + +/** + * Helper to read first SSE event from response + */ +async function readSSEEvent(response: Response): Promise { + const reader = response.body?.getReader(); + if (!reader) throw new Error('No response body'); + const { value } = await reader.read(); + return new TextDecoder().decode(value); +} + +/** + * Helper to read all SSE events from response until done + */ +async function readAllSSEEvents(response: Response): Promise { + const reader = response.body?.getReader(); + if (!reader) return []; + + const events: string[] = []; + const decoder = new TextDecoder(); + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + events.push(decoder.decode(value, { stream: true })); + } + + return events; +} + +function expectErrorResponse(data: unknown, expectedCode: number, expectedMessagePattern: RegExp): void { + expect(data).toMatchObject({ + jsonrpc: '2.0', + error: expect.objectContaining({ + code: expectedCode, + message: expect.stringMatching(expectedMessagePattern) + }) + }); +} + +describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { + const { z } = entry; + + /** + * Helper to create transport with connected MCP server + */ + async function createTestTransport(config: TestTransportConfig = { sessionIdGenerator: () => crypto.randomUUID() }): Promise<{ + transport: FetchStreamableHTTPServerTransport; + mcpServer: McpServer; + }> { + const mcpServer = new McpServer({ name: 'test-server', version: '1.0.0' }, { capabilities: { logging: {} } }); + + mcpServer.tool( + 'greet', + 'A simple greeting tool', + { name: z.string().describe('Name to greet') }, + async ({ name }): Promise => { + return { content: [{ type: 'text', text: `Hello, ${name}!` }] }; + } + ); + + const transport = new FetchStreamableHTTPServerTransport({ + sessionIdGenerator: config.sessionIdGenerator, + enableJsonResponse: config.enableJsonResponse ?? false, + eventStore: config.eventStore, + sessionStore: config.sessionStore, + onsessioninitialized: config.onsessioninitialized, + onsessionclosed: config.onsessionclosed, + retryInterval: config.retryInterval, + allowedHosts: config.allowedHosts, + allowedOrigins: config.allowedOrigins, + enableDnsRebindingProtection: config.enableDnsRebindingProtection + }); + + await mcpServer.connect(transport); + + return { transport, mcpServer }; + } + + describe('FetchStreamableHTTPServerTransport', () => { + let mcpServer: McpServer; + let transport: FetchStreamableHTTPServerTransport; + let sessionId: string; + + beforeEach(async () => { + const result = await createTestTransport(); + transport = result.transport; + mcpServer = result.mcpServer; + }); + + afterEach(async () => { + await transport.close(); + }); + + async function initializeSession(): Promise { + const request = createPostRequest(TEST_MESSAGES.initialize); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(200); + const newSessionId = response.headers.get('mcp-session-id'); + expect(newSessionId).toBeDefined(); + return newSessionId as string; + } + + it('should initialize server and generate session ID', async () => { + sessionId = await initializeSession(); + expect(sessionId).toBeDefined(); + }); + + it('should reject second initialization request', async () => { + sessionId = await initializeSession(); + + // Try to initialize again + const request = createPostRequest(TEST_MESSAGES.initialize, sessionId); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(400); + const body = await response.json(); + expect(body.error.message).toContain('already initialized'); + }); + + it('should reject batch initialize request', async () => { + const request = createPostRequest([TEST_MESSAGES.initialize, TEST_MESSAGES.toolsList]); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(400); + const body = await response.json(); + expect(body.error.message).toContain('Only one initialization request'); + }); + + it('should handle post requests via sse response correctly', async () => { + sessionId = await initializeSession(); + + const request = createPostRequest(TEST_MESSAGES.toolsList, sessionId); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(200); + expect(response.headers.get('content-type')).toBe('text/event-stream'); + + const text = await readSSEEvent(response); + expect(text).toContain('data:'); + expect(text).toContain('"tools"'); + }); + + it('should call a tool and return the result', async () => { + sessionId = await initializeSession(); + + const toolCallMessage: JSONRPCMessage = { + jsonrpc: '2.0', + method: 'tools/call', + params: { + name: 'greet', + arguments: { name: 'World' } + }, + id: 'call-1' + }; + + const request = createPostRequest(toolCallMessage, sessionId); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(200); + + const text = await readSSEEvent(response); + const eventLines = text.split('\n'); + const dataLine = eventLines.find(line => line.startsWith('data:')); + expect(dataLine).toBeDefined(); + + const eventData = JSON.parse(dataLine!.substring(5)); + expect(eventData).toMatchObject({ + jsonrpc: '2.0', + result: { + content: [ + { + type: 'text', + text: 'Hello, World!' + } + ] + }, + id: 'call-1' + }); + }); + + it('should pass request info to tool callback', async () => { + // Create a new transport with a tool that captures request info + let capturedHeaders: Record | undefined; + + const customMcpServer = new McpServer({ name: 'test-server', version: '1.0.0' }, { capabilities: { logging: {} } }); + customMcpServer.tool( + 'capture-headers', + 'Captures request headers', + {}, + async (_args, { requestInfo }): Promise => { + capturedHeaders = requestInfo?.headers; + return { content: [{ type: 'text', text: 'captured' }] }; + } + ); + + const customTransport = new FetchStreamableHTTPServerTransport({ + sessionIdGenerator: () => crypto.randomUUID() + }); + await customMcpServer.connect(customTransport); + + // Initialize + const initRequest = createPostRequest(TEST_MESSAGES.initialize); + const initResponse = await customTransport.handleRequest(initRequest); + const customSessionId = initResponse.headers.get('mcp-session-id')!; + + // Call the tool with custom headers + const toolCallMessage: JSONRPCMessage = { + jsonrpc: '2.0', + method: 'tools/call', + params: { name: 'capture-headers', arguments: {} }, + id: 'call-1' + }; + + const request = createPostRequest(toolCallMessage, customSessionId, { 'x-custom-header': 'test-value' }); + const response = await customTransport.handleRequest(request); + + // Wait for the tool to execute by reading the SSE response + await readSSEEvent(response); + + expect(capturedHeaders).toBeDefined(); + expect(capturedHeaders!['x-custom-header']).toBe('test-value'); + + await customTransport.close(); + }); + + it('should reject requests without a valid session ID', async () => { + sessionId = await initializeSession(); + + // Make request without session ID + const request = createPostRequest(TEST_MESSAGES.toolsList); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(400); + const body = await response.json(); + expect(body.error.message).toContain('Mcp-Session-Id header is required'); + }); + + it('should reject invalid session ID', async () => { + sessionId = await initializeSession(); + + const request = createPostRequest(TEST_MESSAGES.toolsList, 'invalid-session-id'); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(404); + const body = await response.json(); + expect(body.error.message).toBe('Session not found'); + }); + + it('should establish standalone SSE stream and receive server-initiated messages', async () => { + sessionId = await initializeSession(); + + const request = createGetRequest(sessionId); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(200); + expect(response.headers.get('content-type')).toBe('text/event-stream'); + }); + + it('should not close GET SSE stream after sending multiple server notifications', async () => { + sessionId = await initializeSession(); + + const request = createGetRequest(sessionId); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(200); + expect(response.headers.get('content-type')).toBe('text/event-stream'); + + // Send multiple notifications + await mcpServer.sendLoggingMessage({ level: 'info', data: 'test1' }); + await mcpServer.sendLoggingMessage({ level: 'info', data: 'test2' }); + + // Stream should still be open (readable) + expect(response.body).not.toBeNull(); + }); + + it('should reject second SSE stream for the same session', async () => { + sessionId = await initializeSession(); + + // First SSE stream + const request1 = createGetRequest(sessionId); + const response1 = await transport.handleRequest(request1); + expect(response1.status).toBe(200); + + // Second SSE stream should fail + const request2 = createGetRequest(sessionId); + const response2 = await transport.handleRequest(request2); + expect(response2.status).toBe(409); + }); + + it('should reject GET requests without Accept: text/event-stream header', async () => { + sessionId = await initializeSession(); + + const request = new Request('http://localhost:3000/mcp', { + method: 'GET', + headers: { + Host: 'localhost:3000', + 'mcp-session-id': sessionId, + Accept: 'application/json' + } + }); + + const response = await transport.handleRequest(request); + expect(response.status).toBe(406); + }); + + it('should reject POST requests without proper Accept header', async () => { + const request = new Request('http://localhost:3000/mcp', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Host: 'localhost:3000', + Accept: 'application/json' // Missing text/event-stream + }, + body: JSON.stringify(TEST_MESSAGES.initialize) + }); + + const response = await transport.handleRequest(request); + expect(response.status).toBe(406); + }); + + it('should reject unsupported Content-Type', async () => { + const request = new Request('http://localhost:3000/mcp', { + method: 'POST', + headers: { + 'Content-Type': 'text/plain', + Host: 'localhost:3000', + Accept: 'application/json, text/event-stream' + }, + body: 'not json' + }); + + const response = await transport.handleRequest(request); + expect(response.status).toBe(415); + }); + + it('should handle JSON-RPC batch notification messages with 202 response', async () => { + sessionId = await initializeSession(); + + const notifications: JSONRPCMessage[] = [ + { jsonrpc: '2.0', method: 'notifications/cancelled', params: { requestId: '1', reason: 'test' } }, + { jsonrpc: '2.0', method: 'notifications/cancelled', params: { requestId: '2', reason: 'test' } } + ]; + + const request = createPostRequest(notifications, sessionId); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(202); + }); + + it('should handle batch request messages with SSE stream for responses', async () => { + sessionId = await initializeSession(); + + const batch: JSONRPCMessage[] = [ + { jsonrpc: '2.0', method: 'tools/list', params: {}, id: 'batch-1' }, + { jsonrpc: '2.0', method: 'tools/list', params: {}, id: 'batch-2' } + ]; + + const request = createPostRequest(batch, sessionId); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(200); + expect(response.headers.get('content-type')).toBe('text/event-stream'); + }); + + it('should properly handle invalid JSON data', async () => { + const request = new Request('http://localhost:3000/mcp', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Host: 'localhost:3000', + Accept: 'application/json, text/event-stream' + }, + body: 'not valid json' + }); + + const response = await transport.handleRequest(request); + expect(response.status).toBe(400); + + const body = await response.json(); + expect(body.error.message).toContain('Parse error'); + }); + + it('should return 400 error for invalid JSON-RPC messages', async () => { + const request = new Request('http://localhost:3000/mcp', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Host: 'localhost:3000', + Accept: 'application/json, text/event-stream' + }, + body: JSON.stringify({ invalid: 'message' }) + }); + + const response = await transport.handleRequest(request); + expect(response.status).toBe(400); + }); + + it('should reject requests to uninitialized server', async () => { + // Create fresh transport without initializing + const { transport: freshTransport } = await createTestTransport(); + + const request = createPostRequest(TEST_MESSAGES.toolsList, 'some-session-id'); + const response = await freshTransport.handleRequest(request); + + // Server returns 400 when not initialized (in-memory mode without sessionStore) + expect(response.status).toBe(400); + const body = await response.json(); + expect(body.error.message).toContain('Server not initialized'); + await freshTransport.close(); + }); + + it('should send response messages to the connection that sent the request', async () => { + sessionId = await initializeSession(); + + const request = createPostRequest(TEST_MESSAGES.toolsList, sessionId); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(200); + + const text = await readSSEEvent(response); + expect(text).toContain('"tools"'); + }); + + it('should keep stream open after sending server notifications', async () => { + sessionId = await initializeSession(); + + const request = createGetRequest(sessionId); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(200); + + // Send a notification + await mcpServer.sendLoggingMessage({ level: 'info', data: 'test notification' }); + + // Stream should still be readable + expect(response.body?.locked).toBe(false); + }); + + it('should properly handle DELETE requests and close session', async () => { + sessionId = await initializeSession(); + + const request = createDeleteRequest(sessionId); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(200); + }); + + it('should reject DELETE requests with invalid session ID', async () => { + sessionId = await initializeSession(); + + const request = createDeleteRequest('invalid-session-id'); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(404); + }); + + describe('protocol version header validation', () => { + it('should accept requests with matching protocol version', async () => { + sessionId = await initializeSession(); + + const request = createPostRequest(TEST_MESSAGES.toolsList, sessionId, { + 'mcp-protocol-version': '2025-03-26' + }); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(200); + }); + + it('should accept requests without protocol version header', async () => { + sessionId = await initializeSession(); + + // Create request without mcp-protocol-version header + const request = new Request('http://localhost:3000/mcp', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Accept: 'application/json, text/event-stream', + Host: 'localhost:3000', + 'mcp-session-id': sessionId + }, + body: JSON.stringify(TEST_MESSAGES.toolsList) + }); + + const response = await transport.handleRequest(request); + expect(response.status).toBe(200); + }); + + it('should reject requests with unsupported protocol version', async () => { + sessionId = await initializeSession(); + + const request = createPostRequest(TEST_MESSAGES.toolsList, sessionId, { + 'mcp-protocol-version': '9999-99-99' + }); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(400); + const body = await response.json(); + expectErrorResponse(body, -32000, /Unsupported protocol version/); + }); + + it('should accept when protocol version differs from negotiated version', async () => { + sessionId = await initializeSession(); + + // Use a different but supported version + const request = createPostRequest(TEST_MESSAGES.toolsList, sessionId, { + 'mcp-protocol-version': '2024-11-05' + }); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(200); + }); + + it('should handle protocol version validation for GET requests', async () => { + sessionId = await initializeSession(); + + const request = new Request('http://localhost:3000/mcp', { + method: 'GET', + headers: { + Accept: 'text/event-stream', + Host: 'localhost:3000', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '9999-99-99' + } + }); + + const response = await transport.handleRequest(request); + expect(response.status).toBe(400); + }); + + it('should handle protocol version validation for DELETE requests', async () => { + sessionId = await initializeSession(); + + const request = new Request('http://localhost:3000/mcp', { + method: 'DELETE', + headers: { + Host: 'localhost:3000', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '9999-99-99' + } + }); + + const response = await transport.handleRequest(request); + expect(response.status).toBe(400); + }); + }); + }); + + // Test JSON Response Mode + describe('FetchStreamableHTTPServerTransport with JSON Response Mode', () => { + let transport: FetchStreamableHTTPServerTransport; + let sessionId: string; + + beforeEach(async () => { + const result = await createTestTransport({ + sessionIdGenerator: () => crypto.randomUUID(), + enableJsonResponse: true + }); + transport = result.transport; + + // Initialize and get session ID + const request = createPostRequest(TEST_MESSAGES.initialize); + const response = await transport.handleRequest(request); + sessionId = response.headers.get('mcp-session-id')!; + }); + + afterEach(async () => { + await transport.close(); + }); + + it('should return JSON response for a single request', async () => { + const request = createPostRequest(TEST_MESSAGES.toolsList, sessionId); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(200); + expect(response.headers.get('content-type')).toBe('application/json'); + + const body = await response.json(); + expect(body).toMatchObject({ + jsonrpc: '2.0', + result: { tools: expect.any(Array) }, + id: 'tools-1' + }); + }); + + it('should return JSON response for batch requests', async () => { + const batch: JSONRPCMessage[] = [ + { jsonrpc: '2.0', method: 'tools/list', params: {}, id: 'batch-1' }, + { jsonrpc: '2.0', method: 'tools/list', params: {}, id: 'batch-2' } + ]; + + const request = createPostRequest(batch, sessionId); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(200); + expect(response.headers.get('content-type')).toBe('application/json'); + + const body = await response.json(); + expect(Array.isArray(body)).toBe(true); + expect(body).toHaveLength(2); + }); + }); + + // Test stateless mode + describe('FetchStreamableHTTPServerTransport in stateless mode', () => { + let transport: FetchStreamableHTTPServerTransport; + + beforeEach(async () => { + const result = await createTestTransport({ sessionIdGenerator: undefined }); + transport = result.transport; + }); + + afterEach(async () => { + await transport.close(); + }); + + it('should operate without session ID validation', async () => { + // Initialize without session ID generator + const initRequest = createPostRequest(TEST_MESSAGES.initialize); + const initResponse = await transport.handleRequest(initRequest); + + expect(initResponse.status).toBe(200); + expect(initResponse.headers.get('mcp-session-id')).toBeNull(); + + // Subsequent requests should work without session ID + const request = createPostRequest(TEST_MESSAGES.toolsList); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(200); + }); + + it('should handle POST requests with various session IDs in stateless mode', async () => { + // Initialize + const initRequest = createPostRequest(TEST_MESSAGES.initialize); + await transport.handleRequest(initRequest); + + // Request with random session ID should work + const request = createPostRequest(TEST_MESSAGES.toolsList, 'any-random-id'); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(200); + }); + + it('should reject second SSE stream even in stateless mode', async () => { + // Initialize + const initRequest = createPostRequest(TEST_MESSAGES.initialize); + await transport.handleRequest(initRequest); + + // First SSE stream + const request1 = createGetRequest('any-session'); + const response1 = await transport.handleRequest(request1); + expect(response1.status).toBe(200); + + // Second SSE stream should still fail (transport limitation) + const request2 = createGetRequest('any-session'); + const response2 = await transport.handleRequest(request2); + expect(response2.status).toBe(409); + }); + }); + + // Test resumability with EventStore + describe('FetchStreamableHTTPServerTransport with resumability', () => { + let transport: FetchStreamableHTTPServerTransport; + let mcpServer: McpServer; + let eventStore: EventStore; + let sessionId: string; + + beforeEach(async () => { + // Create a simple in-memory event store + const storedEvents = new Map>(); + + eventStore = { + storeEvent: async (streamId: StreamId, message: JSONRPCMessage): Promise => { + const events = storedEvents.get(streamId) || []; + const eventId = `event-${events.length + 1}` as EventId; + events.push({ id: eventId, message }); + storedEvents.set(streamId, events); + return eventId; + }, + replayEventsAfter: async (lastEventId: EventId, { send }): Promise => { + // Find the stream that has this eventId + for (const [streamId, events] of storedEvents) { + let replay = false; + for (const event of events) { + if (replay) { + await send(event.id, event.message); + } + if (event.id === lastEventId) { + replay = true; + } + } + if (replay) { + return streamId; + } + } + return 'unknown-stream' as StreamId; + } + }; + + const result = await createTestTransport({ + sessionIdGenerator: () => crypto.randomUUID(), + eventStore + }); + transport = result.transport; + mcpServer = result.mcpServer; + + // Initialize + const initRequest = createPostRequest(TEST_MESSAGES.initialize); + const initResponse = await transport.handleRequest(initRequest); + sessionId = initResponse.headers.get('mcp-session-id')!; + }); + + afterEach(async () => { + await transport.close(); + }); + + it('should store and include event IDs in server SSE messages', async () => { + const request = createPostRequest(TEST_MESSAGES.toolsList, sessionId); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(200); + + const text = await readSSEEvent(response); + // Should have an id: line for the event + expect(text).toMatch(/id:/); + }); + + it('should store and replay MCP server tool notifications', async () => { + // Open SSE stream + const sseRequest = createGetRequest(sessionId); + const sseResponse = await transport.handleRequest(sseRequest); + expect(sseResponse.status).toBe(200); + + // Send a notification + await mcpServer.sendLoggingMessage({ level: 'info', data: 'test notification' }); + + // The event should be stored - we can verify by checking the stream has data + const reader = sseResponse.body?.getReader(); + if (reader) { + const { value } = await reader.read(); + const text = new TextDecoder().decode(value); + expect(text).toContain('id:'); + } + }); + }); + + // Test POST SSE priming events + describe('FetchStreamableHTTPServerTransport POST SSE priming events', () => { + it('should send priming event with retry field on POST SSE stream', async () => { + // Priming events require an eventStore to be configured + const eventStore: EventStore = { + storeEvent: async () => 'event-1' as EventId, + replayEventsAfter: async () => 'stream-1' as StreamId + }; + + const { transport } = await createTestTransport({ + sessionIdGenerator: () => crypto.randomUUID(), + retryInterval: 5000, + eventStore + }); + + const initRequest = createPostRequest(TEST_MESSAGES.initialize); + const initResponse = await transport.handleRequest(initRequest); + const sessionId = initResponse.headers.get('mcp-session-id')!; + + const request = createPostRequest(TEST_MESSAGES.toolsList, sessionId); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(200); + + const text = await readSSEEvent(response); + expect(text).toContain('retry: 5000'); + + await transport.close(); + }); + + it('should send priming event without retry field when retryInterval is not configured', async () => { + const { transport } = await createTestTransport({ + sessionIdGenerator: () => crypto.randomUUID() + }); + + const initRequest = createPostRequest(TEST_MESSAGES.initialize); + const initResponse = await transport.handleRequest(initRequest); + const sessionId = initResponse.headers.get('mcp-session-id')!; + + const request = createPostRequest(TEST_MESSAGES.toolsList, sessionId); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(200); + + const text = await readSSEEvent(response); + expect(text).not.toContain('retry:'); + + await transport.close(); + }); + + it('should close POST SSE stream when extra.closeSSEStream is called', async () => { + // Create a simple event store to enable closeSSEStream + const eventStore: EventStore = { + storeEvent: async () => 'event-1' as EventId, + replayEventsAfter: async () => 'stream-1' as StreamId + }; + + const mcpServer = new McpServer({ name: 'test-server', version: '1.0.0' }, { capabilities: { logging: {} } }); + + mcpServer.tool('close-stream', 'Closes the SSE stream', {}, async (_args, extra): Promise => { + // Call closeSSEStream after a short delay + setTimeout(() => { + extra.closeSSEStream?.(); + }, 50); + return { content: [{ type: 'text', text: 'closing' }] }; + }); + + const transport = new FetchStreamableHTTPServerTransport({ + sessionIdGenerator: () => crypto.randomUUID(), + eventStore + }); + await mcpServer.connect(transport); + + // Initialize + const initRequest = createPostRequest(TEST_MESSAGES.initialize); + const initResponse = await transport.handleRequest(initRequest); + const sessionId = initResponse.headers.get('mcp-session-id')!; + + // Call the tool + const toolCallMessage: JSONRPCMessage = { + jsonrpc: '2.0', + method: 'tools/call', + params: { name: 'close-stream', arguments: {} }, + id: 'call-1' + }; + + const request = createPostRequest(toolCallMessage, sessionId); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(200); + + // Wait for close to be called and stream to end + await new Promise(resolve => setTimeout(resolve, 100)); + + await transport.close(); + }); + + it('should provide closeSSEStream callback in extra when eventStore is configured', async () => { + const eventStore: EventStore = { + storeEvent: async () => 'event-1' as EventId, + replayEventsAfter: async () => 'stream-1' as StreamId + }; + + const mcpServer = new McpServer({ name: 'test-server', version: '1.0.0' }, { capabilities: { logging: {} } }); + + let hasCloseSSEStream = false; + + mcpServer.tool('check-callback', 'Checks for closeSSEStream callback', {}, async (_args, extra): Promise => { + hasCloseSSEStream = typeof extra.closeSSEStream === 'function'; + return { content: [{ type: 'text', text: 'checked' }] }; + }); + + const transport = new FetchStreamableHTTPServerTransport({ + sessionIdGenerator: () => crypto.randomUUID(), + eventStore + }); + await mcpServer.connect(transport); + + const initRequest = createPostRequest(TEST_MESSAGES.initialize); + const initResponse = await transport.handleRequest(initRequest); + const sessionId = initResponse.headers.get('mcp-session-id')!; + + const toolCallMessage: JSONRPCMessage = { + jsonrpc: '2.0', + method: 'tools/call', + params: { name: 'check-callback', arguments: {} }, + id: 'call-1' + }; + + const request = createPostRequest(toolCallMessage, sessionId); + const response = await transport.handleRequest(request); + + // Wait for the tool to execute by reading all SSE events + // (with eventStore, a priming event is sent first, then the tool response) + await readAllSSEEvents(response); + + expect(hasCloseSSEStream).toBe(true); + + await transport.close(); + }); + + it('should NOT provide closeSSEStream callback when eventStore is NOT configured', async () => { + const mcpServer = new McpServer({ name: 'test-server', version: '1.0.0' }, { capabilities: { logging: {} } }); + + let hasCloseSSEStream = false; + + mcpServer.tool('check-callback', 'Checks for closeSSEStream callback', {}, async (_args, extra): Promise => { + hasCloseSSEStream = typeof extra.closeSSEStream === 'function'; + return { content: [{ type: 'text', text: 'checked' }] }; + }); + + const transport = new FetchStreamableHTTPServerTransport({ + sessionIdGenerator: () => crypto.randomUUID() + // No eventStore + }); + await mcpServer.connect(transport); + + const initRequest = createPostRequest(TEST_MESSAGES.initialize); + const initResponse = await transport.handleRequest(initRequest); + const sessionId = initResponse.headers.get('mcp-session-id')!; + + const toolCallMessage: JSONRPCMessage = { + jsonrpc: '2.0', + method: 'tools/call', + params: { name: 'check-callback', arguments: {} }, + id: 'call-1' + }; + + const request = createPostRequest(toolCallMessage, sessionId); + await transport.handleRequest(request); + + expect(hasCloseSSEStream).toBe(false); + + await transport.close(); + }); + }); + + // Test onsessionclosed callback + describe('FetchStreamableHTTPServerTransport onsessionclosed callback', () => { + it('should call onsessionclosed callback when session is closed via DELETE', async () => { + let closedSessionId: string | undefined; + + const { transport } = await createTestTransport({ + sessionIdGenerator: () => crypto.randomUUID(), + onsessionclosed: id => { + closedSessionId = id; + } + }); + + const initRequest = createPostRequest(TEST_MESSAGES.initialize); + const initResponse = await transport.handleRequest(initRequest); + const sessionId = initResponse.headers.get('mcp-session-id')!; + + const deleteRequest = createDeleteRequest(sessionId); + await transport.handleRequest(deleteRequest); + + expect(closedSessionId).toBe(sessionId); + + await transport.close(); + }); + + it('should not call onsessionclosed callback when not provided', async () => { + const { transport } = await createTestTransport({ + sessionIdGenerator: () => crypto.randomUUID() + }); + + const initRequest = createPostRequest(TEST_MESSAGES.initialize); + const initResponse = await transport.handleRequest(initRequest); + const sessionId = initResponse.headers.get('mcp-session-id')!; + + const deleteRequest = createDeleteRequest(sessionId); + const response = await transport.handleRequest(deleteRequest); + + expect(response.status).toBe(200); + + await transport.close(); + }); + + it('should not call onsessionclosed callback for invalid session DELETE', async () => { + let callbackCalled = false; + + const { transport } = await createTestTransport({ + sessionIdGenerator: () => crypto.randomUUID(), + onsessionclosed: () => { + callbackCalled = true; + } + }); + + const initRequest = createPostRequest(TEST_MESSAGES.initialize); + await transport.handleRequest(initRequest); + + const deleteRequest = createDeleteRequest('invalid-session'); + await transport.handleRequest(deleteRequest); + + expect(callbackCalled).toBe(false); + + await transport.close(); + }); + }); + + // Test async callbacks + describe('FetchStreamableHTTPServerTransport async callbacks', () => { + it('should support async onsessioninitialized callback', async () => { + let initializedSessionId: string | undefined; + + const { transport } = await createTestTransport({ + sessionIdGenerator: () => crypto.randomUUID(), + onsessioninitialized: async id => { + await new Promise(resolve => setTimeout(resolve, 50)); + initializedSessionId = id; + } + }); + + const initRequest = createPostRequest(TEST_MESSAGES.initialize); + const initResponse = await transport.handleRequest(initRequest); + const sessionId = initResponse.headers.get('mcp-session-id')!; + + expect(initializedSessionId).toBe(sessionId); + + await transport.close(); + }); + + it('should support sync onsessioninitialized callback (backwards compatibility)', async () => { + let initializedSessionId: string | undefined; + + const { transport } = await createTestTransport({ + sessionIdGenerator: () => crypto.randomUUID(), + onsessioninitialized: id => { + initializedSessionId = id; + } + }); + + const initRequest = createPostRequest(TEST_MESSAGES.initialize); + const initResponse = await transport.handleRequest(initRequest); + const sessionId = initResponse.headers.get('mcp-session-id')!; + + expect(initializedSessionId).toBe(sessionId); + + await transport.close(); + }); + + it('should support async onsessionclosed callback', async () => { + let closedSessionId: string | undefined; + + const { transport } = await createTestTransport({ + sessionIdGenerator: () => crypto.randomUUID(), + onsessionclosed: async id => { + await new Promise(resolve => setTimeout(resolve, 50)); + closedSessionId = id; + } + }); + + const initRequest = createPostRequest(TEST_MESSAGES.initialize); + const initResponse = await transport.handleRequest(initRequest); + const sessionId = initResponse.headers.get('mcp-session-id')!; + + const deleteRequest = createDeleteRequest(sessionId); + await transport.handleRequest(deleteRequest); + + expect(closedSessionId).toBe(sessionId); + + await transport.close(); + }); + }); + + // Test DNS rebinding protection + describe('FetchStreamableHTTPServerTransport DNS rebinding protection', () => { + describe('Host header validation', () => { + it('should accept requests with allowed host headers', async () => { + const { transport } = await createTestTransport({ + sessionIdGenerator: undefined, + allowedHosts: ['localhost:3000'], + enableDnsRebindingProtection: true + }); + + const request = new Request('http://localhost:3000/mcp', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Accept: 'application/json, text/event-stream', + Host: 'localhost:3000' + }, + body: JSON.stringify(TEST_MESSAGES.initialize) + }); + + const response = await transport.handleRequest(request); + expect(response.status).toBe(200); + + await transport.close(); + }); + + it('should reject requests with disallowed host headers', async () => { + const { transport } = await createTestTransport({ + sessionIdGenerator: undefined, + allowedHosts: ['localhost:3000'], + enableDnsRebindingProtection: true + }); + + const request = new Request('http://evil.com/mcp', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Accept: 'application/json, text/event-stream', + Host: 'evil.com' + }, + body: JSON.stringify(TEST_MESSAGES.initialize) + }); + + const response = await transport.handleRequest(request); + expect(response.status).toBe(403); + + const body = await response.json(); + expect(body.error.message).toContain('Invalid Host header'); + + await transport.close(); + }); + + it('should reject GET requests with disallowed host headers', async () => { + const { transport } = await createTestTransport({ + sessionIdGenerator: undefined, + allowedHosts: ['localhost:3000'], + enableDnsRebindingProtection: true + }); + + // First initialize + const initRequest = new Request('http://localhost:3000/mcp', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Accept: 'application/json, text/event-stream', + Host: 'localhost:3000' + }, + body: JSON.stringify(TEST_MESSAGES.initialize) + }); + await transport.handleRequest(initRequest); + + // Then try GET with evil host + const request = new Request('http://evil.com/mcp', { + method: 'GET', + headers: { + Accept: 'text/event-stream', + Host: 'evil.com' + } + }); + + const response = await transport.handleRequest(request); + expect(response.status).toBe(403); + + await transport.close(); + }); + }); + + describe('Origin header validation', () => { + it('should accept requests with allowed origin headers', async () => { + const { transport } = await createTestTransport({ + sessionIdGenerator: undefined, + allowedOrigins: ['http://localhost:3000'], + enableDnsRebindingProtection: true + }); + + const request = new Request('http://localhost:3000/mcp', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Accept: 'application/json, text/event-stream', + Host: 'localhost:3000', + Origin: 'http://localhost:3000' + }, + body: JSON.stringify(TEST_MESSAGES.initialize) + }); + + const response = await transport.handleRequest(request); + expect(response.status).toBe(200); + + await transport.close(); + }); + + it('should reject requests with disallowed origin headers', async () => { + const { transport } = await createTestTransport({ + sessionIdGenerator: undefined, + allowedOrigins: ['http://localhost:3000'], + enableDnsRebindingProtection: true + }); + + const request = new Request('http://localhost:3000/mcp', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Accept: 'application/json, text/event-stream', + Host: 'localhost:3000', + Origin: 'http://evil.com' + }, + body: JSON.stringify(TEST_MESSAGES.initialize) + }); + + const response = await transport.handleRequest(request); + expect(response.status).toBe(403); + + const body = await response.json(); + expect(body.error.message).toContain('Invalid Origin header'); + + await transport.close(); + }); + }); + + describe('enableDnsRebindingProtection option', () => { + it('should skip all validations when enableDnsRebindingProtection is false', async () => { + const { transport } = await createTestTransport({ + sessionIdGenerator: undefined, + allowedHosts: ['localhost:3000'], + allowedOrigins: ['http://localhost:3000'], + enableDnsRebindingProtection: false + }); + + const request = new Request('http://evil.com/mcp', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Accept: 'application/json, text/event-stream', + Host: 'evil.com', + Origin: 'http://evil.com' + }, + body: JSON.stringify(TEST_MESSAGES.initialize) + }); + + const response = await transport.handleRequest(request); + expect(response.status).toBe(200); + + await transport.close(); + }); + }); + + describe('Combined validations', () => { + it('should validate both host and origin when both are configured', async () => { + const { transport } = await createTestTransport({ + sessionIdGenerator: undefined, + allowedHosts: ['localhost:3000'], + allowedOrigins: ['http://localhost:3000'], + enableDnsRebindingProtection: true + }); + + // Test with invalid origin + const request1 = new Request('http://localhost:3000/mcp', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Accept: 'application/json, text/event-stream', + Host: 'localhost:3000', + Origin: 'http://evil.com' + }, + body: JSON.stringify(TEST_MESSAGES.initialize) + }); + + const response1 = await transport.handleRequest(request1); + expect(response1.status).toBe(403); + const body1 = await response1.json(); + expect(body1.error.message).toBe('Invalid Origin header: http://evil.com'); + + // Test with valid origin + const request2 = new Request('http://localhost:3000/mcp', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Accept: 'application/json, text/event-stream', + Host: 'localhost:3000', + Origin: 'http://localhost:3000' + }, + body: JSON.stringify(TEST_MESSAGES.initialize) + }); + + const response2 = await transport.handleRequest(request2); + expect(response2.status).toBe(200); + + await transport.close(); + }); + }); + }); + + /** + * Tests for SessionStore functionality (distributed/serverless mode) + */ + describe('FetchStreamableHTTPServerTransport with SessionStore', () => { + /** + * Creates an in-memory session store for testing + */ + function createInMemorySessionStore(): SessionStore & { sessions: Map } { + const sessions = new Map(); + return { + sessions, + get: async (sessionId: string) => sessions.get(sessionId), + save: async (sessionId: string, state: SessionState) => { + sessions.set(sessionId, state); + }, + delete: async (sessionId: string) => { + sessions.delete(sessionId); + } + }; + } + + it('should save session state to store on initialization', async () => { + const sessionStore = createInMemorySessionStore(); + const { transport } = await createTestTransport({ + sessionIdGenerator: () => 'test-session-123', + sessionStore + }); + + const request = createPostRequest(TEST_MESSAGES.initialize); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(200); + + // Verify session was saved to store + const savedSession = await sessionStore.get('test-session-123'); + expect(savedSession).toBeDefined(); + expect(savedSession?.initialized).toBe(true); + expect(savedSession?.protocolVersion).toBeDefined(); + expect(savedSession?.createdAt).toBeGreaterThan(0); + + await transport.close(); + }); + + it('should validate session from store for subsequent requests', async () => { + const sessionStore = createInMemorySessionStore(); + const { transport } = await createTestTransport({ + sessionIdGenerator: () => 'test-session-456', + sessionStore + }); + + // Initialize the session + const initRequest = createPostRequest(TEST_MESSAGES.initialize); + const initResponse = await transport.handleRequest(initRequest); + expect(initResponse.status).toBe(200); + const sessionId = initResponse.headers.get('mcp-session-id'); + + // Make a subsequent request with valid session ID + const listRequest = createPostRequest(TEST_MESSAGES.toolsList, sessionId!); + const listResponse = await transport.handleRequest(listRequest); + expect(listResponse.status).toBe(200); + + await transport.close(); + }); + + it('should reject requests with invalid session ID when using session store', async () => { + const sessionStore = createInMemorySessionStore(); + const { transport } = await createTestTransport({ + sessionIdGenerator: () => 'test-session-789', + sessionStore + }); + + // Initialize the session first + const initRequest = createPostRequest(TEST_MESSAGES.initialize); + const initResponse = await transport.handleRequest(initRequest); + expect(initResponse.status).toBe(200); + + // Try to make a request with invalid session ID + const request = createPostRequest(TEST_MESSAGES.toolsList, 'invalid-session-id'); + const response = await transport.handleRequest(request); + + expect(response.status).toBe(404); + const body = await response.json(); + expect(body.error.message).toBe('Session not found'); + + await transport.close(); + }); + + it('should delete session from store on DELETE request', async () => { + const sessionStore = createInMemorySessionStore(); + const { transport } = await createTestTransport({ + sessionIdGenerator: () => 'test-session-delete', + sessionStore + }); + + // Initialize the session + const initRequest = createPostRequest(TEST_MESSAGES.initialize); + const initResponse = await transport.handleRequest(initRequest); + expect(initResponse.status).toBe(200); + const sessionId = initResponse.headers.get('mcp-session-id'); + + // Verify session exists in store + expect(await sessionStore.get(sessionId!)).toBeDefined(); + + // Delete the session + const deleteRequest = createDeleteRequest(sessionId!); + const deleteResponse = await transport.handleRequest(deleteRequest); + expect(deleteResponse.status).toBe(200); + + // Verify session was deleted from store + expect(await sessionStore.get(sessionId!)).toBeUndefined(); + + await transport.close(); + }); + + it('should allow new transport instances to validate existing sessions (serverless mode)', async () => { + // This test simulates serverless behavior where each request + // is handled by a fresh transport instance + const sessionStore = createInMemorySessionStore(); + + // First, initialize using one transport instance + const { transport: transport1 } = await createTestTransport({ + sessionIdGenerator: () => 'serverless-session-123', + sessionStore + }); + + const initRequest = createPostRequest(TEST_MESSAGES.initialize); + const initResponse = await transport1.handleRequest(initRequest); + expect(initResponse.status).toBe(200); + const sessionId = initResponse.headers.get('mcp-session-id'); + + // Close the first transport + await transport1.close(); + + // Create a NEW transport instance with same sessionStore (simulates new serverless invocation) + const { transport: transport2 } = await createTestTransport({ + sessionIdGenerator: () => crypto.randomUUID(), // Different generator, doesn't matter + sessionStore // Same session store + }); + + // The new transport should be able to validate the existing session from the store + const listRequest = createPostRequest(TEST_MESSAGES.toolsList, sessionId!); + const listResponse = await transport2.handleRequest(listRequest); + + expect(listResponse.status).toBe(200); + + await transport2.close(); + }); + + it('should work with GET SSE stream when session is hydrated from store', async () => { + const sessionStore = createInMemorySessionStore(); + const { transport } = await createTestTransport({ + sessionIdGenerator: () => 'sse-session-123', + sessionStore + }); + + // Initialize session + const initRequest = createPostRequest(TEST_MESSAGES.initialize); + const initResponse = await transport.handleRequest(initRequest); + expect(initResponse.status).toBe(200); + const sessionId = initResponse.headers.get('mcp-session-id'); + + // Open SSE stream with session ID + const sseRequest = createGetRequest(sessionId!); + const sseResponse = await transport.handleRequest(sseRequest); + + expect(sseResponse.status).toBe(200); + expect(sseResponse.headers.get('content-type')).toBe('text/event-stream'); + + await transport.close(); + }); + }); +}); diff --git a/src/experimental/fetch-streamable-http/fetchStreamableHttpServerTransport.ts b/src/experimental/fetch-streamable-http/fetchStreamableHttpServerTransport.ts new file mode 100644 index 000000000..da9cd7078 --- /dev/null +++ b/src/experimental/fetch-streamable-http/fetchStreamableHttpServerTransport.ts @@ -0,0 +1,1037 @@ +/** + * Web Standards Streamable HTTP Server Transport + * + * This is an experimental transport that implements the MCP Streamable HTTP specification + * using Web Standard APIs (Request, Response, TransformStream) instead of Node.js HTTP. + * + * @see https://github.com/modelcontextprotocol/typescript-sdk/issues/260 + * @experimental + */ + +import { Transport } from '../../shared/transport.js'; +import { + MessageExtraInfo, + RequestInfo, + isInitializeRequest, + isJSONRPCError, + isJSONRPCRequest, + isJSONRPCResponse, + JSONRPCMessage, + JSONRPCMessageSchema, + RequestId, + SUPPORTED_PROTOCOL_VERSIONS, + DEFAULT_NEGOTIATED_PROTOCOL_VERSION +} from '../../types.js'; + +export type StreamId = string; +export type EventId = string; + +/** + * Interface for resumability support via event storage + */ +export interface EventStore { + /** + * Stores an event for later retrieval + * @param streamId ID of the stream the event belongs to + * @param message The JSON-RPC message to store + * @returns The generated event ID for the stored event + */ + storeEvent(streamId: StreamId, message: JSONRPCMessage): Promise; + + /** + * Get the stream ID associated with a given event ID. + * @param eventId The event ID to look up + * @returns The stream ID, or undefined if not found + * + * Optional: If not provided, the SDK will use the streamId returned by + * replayEventsAfter for stream mapping. + */ + getStreamIdForEventId?(eventId: EventId): Promise; + + replayEventsAfter( + lastEventId: EventId, + { + send + }: { + send: (eventId: EventId, message: JSONRPCMessage) => Promise; + } + ): Promise; +} + +/** + * Session state that can be persisted externally for serverless deployments. + */ +export interface SessionState { + /** Whether the session has completed initialization */ + initialized: boolean; + /** The negotiated protocol version */ + protocolVersion: string; + /** Timestamp when the session was created */ + createdAt: number; +} + +/** + * Interface for session storage in distributed/serverless deployments. + * + * In serverless environments (Lambda, Vercel, Cloudflare Workers), each request + * may be handled by a different instance with no shared memory. The SessionStore + * allows session state to be persisted externally (e.g., Redis, DynamoDB, KV). + * + * @example + * ```typescript + * // Cloudflare KV implementation + * class KVSessionStore implements SessionStore { + * constructor(private kv: KVNamespace) {} + * + * async get(sessionId: string) { + * return this.kv.get(`session:${sessionId}`, 'json'); + * } + * async save(sessionId: string, state: SessionState) { + * await this.kv.put(`session:${sessionId}`, JSON.stringify(state), { expirationTtl: 3600 }); + * } + * async delete(sessionId: string) { + * await this.kv.delete(`session:${sessionId}`); + * } + * } + * ``` + */ +export interface SessionStore { + /** + * Retrieve session state by ID. + * @param sessionId The session ID to look up + * @returns The session state, or undefined if not found + */ + get(sessionId: string): Promise; + + /** + * Save session state. + * Called when a session is initialized or updated. + * @param sessionId The session ID + * @param state The session state to persist + */ + save(sessionId: string, state: SessionState): Promise; + + /** + * Delete session state. + * Called when a session is explicitly closed via DELETE request. + * @param sessionId The session ID to delete + */ + delete(sessionId: string): Promise; +} + +/** + * Internal stream mapping for managing SSE connections + */ +interface StreamMapping { + /** Stream controller for pushing SSE data - only used with ReadableStream approach */ + controller?: ReadableStreamDefaultController; + /** Text encoder for SSE formatting */ + encoder?: TextEncoder; + /** Promise resolver for JSON response mode */ + resolveJson?: (response: Response) => void; + /** Cleanup function to close stream and remove mapping */ + cleanup: () => void; +} + +/** + * Configuration options for FetchStreamableHTTPServerTransport + */ +export interface FetchStreamableHTTPServerTransportOptions { + /** + * Function that generates a session ID for the transport. + * The session ID SHOULD be globally unique and cryptographically secure (e.g., a securely generated UUID, a JWT, or a cryptographic hash) + * + * Return undefined to disable session management. + */ + sessionIdGenerator: (() => string) | undefined; + + /** + * A callback for session initialization events + * This is called when the server initializes a new session. + * Useful in cases when you need to register multiple mcp sessions + * and need to keep track of them. + * @param sessionId The generated session ID + */ + onsessioninitialized?: (sessionId: string) => void | Promise; + + /** + * A callback for session close events + * This is called when the server closes a session due to a DELETE request. + * Useful in cases when you need to clean up resources associated with the session. + * Note that this is different from the transport closing, if you are handling + * HTTP requests from multiple nodes you might want to close each + * WSStreamableHTTPServerTransport after a request is completed while still keeping the + * session open/running. + * @param sessionId The session ID that was closed + */ + onsessionclosed?: (sessionId: string) => void | Promise; + + /** + * If true, the server will return JSON responses instead of starting an SSE stream. + * This can be useful for simple request/response scenarios without streaming. + * Default is false (SSE streams are preferred). + */ + enableJsonResponse?: boolean; + + /** + * Event store for resumability support + * If provided, resumability will be enabled, allowing clients to reconnect and resume messages + */ + eventStore?: EventStore; + + /** + * List of allowed host header values for DNS rebinding protection. + * If not specified, host validation is disabled. + */ + allowedHosts?: string[]; + + /** + * List of allowed origin header values for DNS rebinding protection. + * If not specified, origin validation is disabled. + */ + allowedOrigins?: string[]; + + /** + * Enable DNS rebinding protection (requires allowedHosts and/or allowedOrigins to be configured). + * Default is false for backwards compatibility. + */ + enableDnsRebindingProtection?: boolean; + + /** + * Retry interval in milliseconds to suggest to clients in SSE retry field. + * When set, the server will send a retry field in SSE priming events to control + * client reconnection timing for polling behavior. + */ + retryInterval?: number; + + /** + * Session store for distributed/serverless deployments. + * + * When provided, session state will be persisted externally, allowing the transport + * to work across multiple serverless function invocations or instances. + * + * If not provided, session state is kept in-memory (single-instance mode). + * + * @example + * ```typescript + * // Redis session store + * const transport = new FetchStreamableHTTPServerTransport({ + * sessionIdGenerator: () => crypto.randomUUID(), + * sessionStore: { + * get: async (id) => redis.get(`session:${id}`), + * save: async (id, state) => redis.set(`session:${id}`, state, 'EX', 3600), + * delete: async (id) => redis.del(`session:${id}`) + * } + * }); + * ``` + */ + sessionStore?: SessionStore; +} + +/** + * Server transport for Web Standards Streamable HTTP: this implements the MCP Streamable HTTP transport specification + * using Web Standard APIs (Request, Response, TransformStream). + * + * Usage example: + * + * ```typescript + * // Stateful mode - server sets the session ID + * const statefulTransport = new FetchStreamableHTTPServerTransport({ + * sessionIdGenerator: () => crypto.randomUUID(), + * }); + * + * // Stateless mode - explicitly set session ID to undefined + * const statelessTransport = new FetchStreamableHTTPServerTransport({ + * sessionIdGenerator: undefined, + * }); + * + * // Hono.js usage + * app.all('/mcp', async (c) => { + * return transport.handleRequest(c.req.raw); + * }); + * ``` + * + * In stateful mode: + * - Session ID is generated and included in response headers + * - Session ID is always included in initialization responses + * - Requests with invalid session IDs are rejected with 404 Not Found + * - Non-initialization requests without a session ID are rejected with 400 Bad Request + * - State is maintained in-memory (connections, message history) + * + * In stateless mode: + * - No Session ID is included in any responses + * - No session validation is performed + * + * @experimental + */ +export class FetchStreamableHTTPServerTransport implements Transport { + // when sessionId is not set (undefined), it means the transport is in stateless mode + private sessionIdGenerator: (() => string) | undefined; + private _started: boolean = false; + private _streamMapping: Map = new Map(); + private _requestToStreamMapping: Map = new Map(); + private _requestResponseMap: Map = new Map(); + private _initialized: boolean = false; + private _enableJsonResponse: boolean = false; + private _standaloneSseStreamId: string = '_GET_stream'; + private _eventStore?: EventStore; + private _onsessioninitialized?: (sessionId: string) => void | Promise; + private _onsessionclosed?: (sessionId: string) => void | Promise; + private _allowedHosts?: string[]; + private _allowedOrigins?: string[]; + private _enableDnsRebindingProtection: boolean; + private _retryInterval?: number; + private _sessionStore?: SessionStore; + + sessionId?: string; + onclose?: () => void; + onerror?: (error: Error) => void; + onmessage?: (message: JSONRPCMessage, extra?: MessageExtraInfo) => void; + + constructor(options: FetchStreamableHTTPServerTransportOptions) { + this.sessionIdGenerator = options.sessionIdGenerator; + this._enableJsonResponse = options.enableJsonResponse ?? false; + this._eventStore = options.eventStore; + this._onsessioninitialized = options.onsessioninitialized; + this._onsessionclosed = options.onsessionclosed; + this._allowedHosts = options.allowedHosts; + this._allowedOrigins = options.allowedOrigins; + this._enableDnsRebindingProtection = options.enableDnsRebindingProtection ?? false; + this._retryInterval = options.retryInterval; + this._sessionStore = options.sessionStore; + } + + /** + * Starts the transport. This is required by the Transport interface but is a no-op + * for the Streamable HTTP transport as connections are managed per-request. + */ + async start(): Promise { + if (this._started) { + throw new Error('Transport already started'); + } + this._started = true; + } + + /** + * Helper to create a JSON error response + */ + private createJsonErrorResponse(status: number, code: number, message: string, headers?: Record): Response { + return new Response( + JSON.stringify({ + jsonrpc: '2.0', + error: { code, message }, + id: null + }), + { + status, + headers: { + 'Content-Type': 'application/json', + ...headers + } + } + ); + } + + /** + * Validates request headers for DNS rebinding protection. + * @returns Error response if validation fails, undefined if validation passes. + */ + private validateRequestHeaders(req: Request): Response | undefined { + // Skip validation if protection is not enabled + if (!this._enableDnsRebindingProtection) { + return undefined; + } + + // Validate Host header if allowedHosts is configured + if (this._allowedHosts && this._allowedHosts.length > 0) { + const hostHeader = req.headers.get('host'); + if (!hostHeader || !this._allowedHosts.includes(hostHeader)) { + const error = `Invalid Host header: ${hostHeader}`; + this.onerror?.(new Error(error)); + return this.createJsonErrorResponse(403, -32000, error); + } + } + + // Validate Origin header if allowedOrigins is configured + if (this._allowedOrigins && this._allowedOrigins.length > 0) { + const originHeader = req.headers.get('origin'); + if (!originHeader || !this._allowedOrigins.includes(originHeader)) { + const error = `Invalid Origin header: ${originHeader}`; + this.onerror?.(new Error(error)); + return this.createJsonErrorResponse(403, -32000, error); + } + } + + return undefined; + } + + /** + * Handles an incoming HTTP request, whether GET, POST, or DELETE + * Returns a Response object (Web Standard) + */ + async handleRequest(req: Request): Promise { + // Validate request headers for DNS rebinding protection + const validationError = this.validateRequestHeaders(req); + if (validationError) { + return validationError; + } + + switch (req.method) { + case 'POST': + return this.handlePostRequest(req); + case 'GET': + return this.handleGetRequest(req); + case 'DELETE': + return this.handleDeleteRequest(req); + default: + return this.handleUnsupportedRequest(); + } + } + + /** + * Writes a priming event to establish resumption capability. + * Only sends if eventStore is configured (opt-in for resumability). + */ + private async writePrimingEvent( + controller: ReadableStreamDefaultController, + encoder: TextEncoder, + streamId: string + ): Promise { + if (!this._eventStore) { + return; + } + + const primingEventId = await this._eventStore.storeEvent(streamId, {} as JSONRPCMessage); + + let primingEvent = `id: ${primingEventId}\ndata: \n\n`; + if (this._retryInterval !== undefined) { + primingEvent = `id: ${primingEventId}\nretry: ${this._retryInterval}\ndata: \n\n`; + } + controller.enqueue(encoder.encode(primingEvent)); + } + + /** + * Handles GET requests for SSE stream + */ + private async handleGetRequest(req: Request): Promise { + // The client MUST include an Accept header, listing text/event-stream as a supported content type. + const acceptHeader = req.headers.get('accept'); + if (!acceptHeader?.includes('text/event-stream')) { + return this.createJsonErrorResponse(406, -32000, 'Not Acceptable: Client must accept text/event-stream'); + } + + // If an Mcp-Session-Id is returned by the server during initialization, + // clients using the Streamable HTTP transport MUST include it + // in the Mcp-Session-Id header on all of their subsequent HTTP requests. + const sessionError = await this.validateSession(req); + if (sessionError) { + return sessionError; + } + const protocolError = this.validateProtocolVersion(req); + if (protocolError) { + return protocolError; + } + + // Handle resumability: check for Last-Event-ID header + if (this._eventStore) { + const lastEventId = req.headers.get('last-event-id'); + if (lastEventId) { + return this.replayEvents(lastEventId); + } + } + + // Check if there's already an active standalone SSE stream for this session + if (this._streamMapping.get(this._standaloneSseStreamId) !== undefined) { + // Only one GET SSE stream is allowed per session + return this.createJsonErrorResponse(409, -32000, 'Conflict: Only one SSE stream is allowed per session'); + } + + const encoder = new TextEncoder(); + let streamController: ReadableStreamDefaultController; + + // Create a ReadableStream with a controller we can use to push SSE events + const readable = new ReadableStream({ + start: controller => { + streamController = controller; + }, + cancel: () => { + // Stream was cancelled by client + this._streamMapping.delete(this._standaloneSseStreamId); + } + }); + + const headers: Record = { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache, no-transform', + Connection: 'keep-alive' + }; + + // After initialization, always include the session ID if we have one + if (this.sessionId !== undefined) { + headers['mcp-session-id'] = this.sessionId; + } + + // Store the stream mapping with the controller for pushing data + this._streamMapping.set(this._standaloneSseStreamId, { + controller: streamController!, + encoder, + cleanup: () => { + this._streamMapping.delete(this._standaloneSseStreamId); + try { + streamController!.close(); + } catch { + // Controller might already be closed + } + } + }); + + return new Response(readable, { headers }); + } + + /** + * Replays events that would have been sent after the specified event ID + * Only used when resumability is enabled + */ + private async replayEvents(lastEventId: string): Promise { + if (!this._eventStore) { + return this.createJsonErrorResponse(400, -32000, 'Event store not configured'); + } + + try { + // If getStreamIdForEventId is available, use it for conflict checking + let streamId: string | undefined; + if (this._eventStore.getStreamIdForEventId) { + streamId = await this._eventStore.getStreamIdForEventId(lastEventId); + + if (!streamId) { + return this.createJsonErrorResponse(400, -32000, 'Invalid event ID format'); + } + + // Check conflict with the SAME streamId we'll use for mapping + if (this._streamMapping.get(streamId) !== undefined) { + return this.createJsonErrorResponse(409, -32000, 'Conflict: Stream already has an active connection'); + } + } + + const headers: Record = { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache, no-transform', + Connection: 'keep-alive' + }; + + if (this.sessionId !== undefined) { + headers['mcp-session-id'] = this.sessionId; + } + + // Create a ReadableStream with controller for SSE + const encoder = new TextEncoder(); + let streamController: ReadableStreamDefaultController; + + const readable = new ReadableStream({ + start: controller => { + streamController = controller; + }, + cancel: () => { + // Stream was cancelled by client + // Cleanup will be handled by the mapping + } + }); + + // Replay events - returns the streamId for backwards compatibility + const replayedStreamId = await this._eventStore.replayEventsAfter(lastEventId, { + send: async (eventId: string, message: JSONRPCMessage) => { + const success = this.writeSSEEvent(streamController!, encoder, message, eventId); + if (!success) { + this.onerror?.(new Error('Failed replay events')); + } + } + }); + + this._streamMapping.set(replayedStreamId, { + controller: streamController!, + encoder, + cleanup: () => { + this._streamMapping.delete(replayedStreamId); + try { + streamController!.close(); + } catch { + // Controller might already be closed + } + } + }); + + return new Response(readable, { headers }); + } catch (error) { + this.onerror?.(error as Error); + return this.createJsonErrorResponse(500, -32000, 'Error replaying events'); + } + } + + /** + * Writes an event to an SSE stream via controller with proper formatting + */ + private writeSSEEvent( + controller: ReadableStreamDefaultController, + encoder: TextEncoder, + message: JSONRPCMessage, + eventId?: string + ): boolean { + try { + let eventData = `event: message\n`; + // Include event ID if provided - this is important for resumability + if (eventId) { + eventData += `id: ${eventId}\n`; + } + eventData += `data: ${JSON.stringify(message)}\n\n`; + controller.enqueue(encoder.encode(eventData)); + return true; + } catch { + return false; + } + } + + /** + * Handles unsupported requests (PUT, PATCH, etc.) + */ + private handleUnsupportedRequest(): Response { + return new Response( + JSON.stringify({ + jsonrpc: '2.0', + error: { + code: -32000, + message: 'Method not allowed.' + }, + id: null + }), + { + status: 405, + headers: { + Allow: 'GET, POST, DELETE', + 'Content-Type': 'application/json' + } + } + ); + } + + /** + * Handles POST requests containing JSON-RPC messages + */ + private async handlePostRequest(req: Request): Promise { + try { + // Validate the Accept header + const acceptHeader = req.headers.get('accept'); + // The client MUST include an Accept header, listing both application/json and text/event-stream as supported content types. + if (!acceptHeader?.includes('application/json') || !acceptHeader.includes('text/event-stream')) { + return this.createJsonErrorResponse( + 406, + -32000, + 'Not Acceptable: Client must accept both application/json and text/event-stream' + ); + } + + const ct = req.headers.get('content-type'); + if (!ct || !ct.includes('application/json')) { + return this.createJsonErrorResponse(415, -32000, 'Unsupported Media Type: Content-Type must be application/json'); + } + + const requestInfo: RequestInfo = { + headers: Object.fromEntries(req.headers.entries()) + }; + + let rawMessage; + try { + rawMessage = await req.json(); + } catch { + return this.createJsonErrorResponse(400, -32700, 'Parse error: Invalid JSON'); + } + + let messages: JSONRPCMessage[]; + + // handle batch and single messages + try { + if (Array.isArray(rawMessage)) { + messages = rawMessage.map(msg => JSONRPCMessageSchema.parse(msg)); + } else { + messages = [JSONRPCMessageSchema.parse(rawMessage)]; + } + } catch { + return this.createJsonErrorResponse(400, -32700, 'Parse error: Invalid JSON-RPC message'); + } + + // Check if this is an initialization request + // https://spec.modelcontextprotocol.io/specification/2025-03-26/basic/lifecycle/ + const isInitializationRequest = messages.some(isInitializeRequest); + if (isInitializationRequest) { + // If it's a server with session management and the session ID is already set we should reject the request + // to avoid re-initialization. + if (this._initialized && this.sessionId !== undefined) { + return this.createJsonErrorResponse(400, -32600, 'Invalid Request: Server already initialized'); + } + if (messages.length > 1) { + return this.createJsonErrorResponse(400, -32600, 'Invalid Request: Only one initialization request is allowed'); + } + this.sessionId = this.sessionIdGenerator?.(); + this._initialized = true; + + // Persist session state to external store if configured + if (this.sessionId && this._sessionStore) { + const protocolVersion = req.headers.get('mcp-protocol-version') ?? DEFAULT_NEGOTIATED_PROTOCOL_VERSION; + await this._sessionStore.save(this.sessionId, { + initialized: true, + protocolVersion, + createdAt: Date.now() + }); + } + + // If we have a session ID and an onsessioninitialized handler, call it immediately + // This is needed in cases where the server needs to keep track of multiple sessions + if (this.sessionId && this._onsessioninitialized) { + await Promise.resolve(this._onsessioninitialized(this.sessionId)); + } + } + if (!isInitializationRequest) { + // If an Mcp-Session-Id is returned by the server during initialization, + // clients using the Streamable HTTP transport MUST include it + // in the Mcp-Session-Id header on all of their subsequent HTTP requests. + const sessionError = await this.validateSession(req); + if (sessionError) { + return sessionError; + } + // Mcp-Protocol-Version header is required for all requests after initialization. + const protocolError = this.validateProtocolVersion(req); + if (protocolError) { + return protocolError; + } + } + + // check if it contains requests + const hasRequests = messages.some(isJSONRPCRequest); + + if (!hasRequests) { + // if it only contains notifications or responses, return 202 + for (const message of messages) { + this.onmessage?.(message, { requestInfo }); + } + return new Response(null, { status: 202 }); + } + + // The default behavior is to use SSE streaming + // but in some cases server will return JSON responses + const streamId = crypto.randomUUID(); + + if (this._enableJsonResponse) { + // For JSON response mode, return a Promise that resolves when all responses are ready + return new Promise(resolve => { + this._streamMapping.set(streamId, { + resolveJson: resolve, + cleanup: () => { + this._streamMapping.delete(streamId); + } + }); + + for (const message of messages) { + if (isJSONRPCRequest(message)) { + this._requestToStreamMapping.set(message.id, streamId); + } + } + + for (const message of messages) { + this.onmessage?.(message, { requestInfo }); + } + }); + } + + // SSE streaming mode - use ReadableStream with controller for more reliable data pushing + const encoder = new TextEncoder(); + let streamController: ReadableStreamDefaultController; + + const readable = new ReadableStream({ + start: controller => { + streamController = controller; + }, + cancel: () => { + // Stream was cancelled by client + this._streamMapping.delete(streamId); + } + }); + + const headers: Record = { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive' + }; + + // After initialization, always include the session ID if we have one + if (this.sessionId !== undefined) { + headers['mcp-session-id'] = this.sessionId; + } + + // Store the response for this request to send messages back through this connection + // We need to track by request ID to maintain the connection + for (const message of messages) { + if (isJSONRPCRequest(message)) { + this._streamMapping.set(streamId, { + controller: streamController!, + encoder, + cleanup: () => { + this._streamMapping.delete(streamId); + try { + streamController!.close(); + } catch { + // Controller might already be closed + } + } + }); + this._requestToStreamMapping.set(message.id, streamId); + } + } + + // Write priming event if event store is configured (after mapping is set up) + await this.writePrimingEvent(streamController!, encoder, streamId); + + // handle each message + for (const message of messages) { + // Build closeSSEStream callback for requests when eventStore is configured + let closeSSEStream: (() => void) | undefined; + let closeStandaloneSSEStream: (() => void) | undefined; + if (isJSONRPCRequest(message) && this._eventStore) { + closeSSEStream = () => { + this.closeSSEStream(message.id); + }; + closeStandaloneSSEStream = () => { + this.closeStandaloneSSEStream(); + }; + } + + this.onmessage?.(message, { requestInfo, closeSSEStream, closeStandaloneSSEStream }); + } + // The server SHOULD NOT close the SSE stream before sending all JSON-RPC responses + // This will be handled by the send() method when responses are ready + + return new Response(readable, { status: 200, headers }); + } catch (error) { + // return JSON-RPC formatted error + this.onerror?.(error as Error); + return this.createJsonErrorResponse(400, -32700, 'Parse error'); + } + } + + /** + * Handles DELETE requests to terminate sessions + */ + private async handleDeleteRequest(req: Request): Promise { + const sessionError = await this.validateSession(req); + if (sessionError) { + return sessionError; + } + const protocolError = this.validateProtocolVersion(req); + if (protocolError) { + return protocolError; + } + + // Delete session from external store if configured + if (this.sessionId && this._sessionStore) { + await this._sessionStore.delete(this.sessionId); + } + + await Promise.resolve(this._onsessionclosed?.(this.sessionId!)); + await this.close(); + return new Response(null, { status: 200 }); + } + + /** + * Validates session ID for non-initialization requests. + * In serverless mode with sessionStore, this will hydrate session state from the store. + * Returns Response error if invalid, undefined otherwise + */ + private async validateSession(req: Request): Promise { + if (this.sessionIdGenerator === undefined) { + // If the sessionIdGenerator ID is not set, the session management is disabled + // and we don't need to validate the session ID + return undefined; + } + + const sessionId = req.headers.get('mcp-session-id'); + + if (!sessionId) { + // Non-initialization requests without a session ID should return 400 Bad Request + return this.createJsonErrorResponse(400, -32000, 'Bad Request: Mcp-Session-Id header is required'); + } + + // If sessionStore is configured, try to hydrate session from external store + // This enables serverless mode where each request may be on a fresh instance + if (this._sessionStore) { + const sessionState = await this._sessionStore.get(sessionId); + if (sessionState && sessionState.initialized) { + // Hydrate this transport instance with the session state + this.sessionId = sessionId; + this._initialized = true; + return undefined; + } + // Session not found in store + return this.createJsonErrorResponse(404, -32001, 'Session not found'); + } + + // In-memory mode: check local state + if (!this._initialized) { + // If the server has not been initialized yet, reject all requests + return this.createJsonErrorResponse(400, -32000, 'Bad Request: Server not initialized'); + } + + if (sessionId !== this.sessionId) { + // Reject requests with invalid session ID with 404 Not Found + return this.createJsonErrorResponse(404, -32001, 'Session not found'); + } + + return undefined; + } + + private validateProtocolVersion(req: Request): Response | undefined { + const protocolVersion = req.headers.get('mcp-protocol-version') ?? DEFAULT_NEGOTIATED_PROTOCOL_VERSION; + + if (!SUPPORTED_PROTOCOL_VERSIONS.includes(protocolVersion)) { + return this.createJsonErrorResponse( + 400, + -32000, + `Bad Request: Unsupported protocol version (supported versions: ${SUPPORTED_PROTOCOL_VERSIONS.join(', ')})` + ); + } + return undefined; + } + + async close(): Promise { + // Close all SSE connections + this._streamMapping.forEach(({ cleanup }) => { + cleanup(); + }); + this._streamMapping.clear(); + + // Clear any pending responses + this._requestResponseMap.clear(); + this.onclose?.(); + } + + /** + * Close an SSE stream for a specific request, triggering client reconnection. + * Use this to implement polling behavior during long-running operations - + * client will reconnect after the retry interval specified in the priming event. + */ + closeSSEStream(requestId: RequestId): void { + const streamId = this._requestToStreamMapping.get(requestId); + if (!streamId) return; + + const stream = this._streamMapping.get(streamId); + if (stream) { + stream.cleanup(); + } + } + + /** + * Close the standalone GET SSE stream, triggering client reconnection. + * Use this to implement polling behavior for server-initiated notifications. + */ + closeStandaloneSSEStream(): void { + const stream = this._streamMapping.get(this._standaloneSseStreamId); + if (stream) { + stream.cleanup(); + } + } + + async send(message: JSONRPCMessage, options?: { relatedRequestId?: RequestId }): Promise { + let requestId = options?.relatedRequestId; + if (isJSONRPCResponse(message) || isJSONRPCError(message)) { + // If the message is a response, use the request ID from the message + requestId = message.id; + } + + // Check if this message should be sent on the standalone SSE stream (no request ID) + // Ignore notifications from tools (which have relatedRequestId set) + // Those will be sent via dedicated response SSE streams + if (requestId === undefined) { + // For standalone SSE streams, we can only send requests and notifications + if (isJSONRPCResponse(message) || isJSONRPCError(message)) { + throw new Error('Cannot send a response on a standalone SSE stream unless resuming a previous client request'); + } + + // Generate and store event ID if event store is provided + // Store even if stream is disconnected so events can be replayed on reconnect + let eventId: string | undefined; + if (this._eventStore) { + // Stores the event and gets the generated event ID + eventId = await this._eventStore.storeEvent(this._standaloneSseStreamId, message); + } + + const standaloneSse = this._streamMapping.get(this._standaloneSseStreamId); + if (standaloneSse === undefined) { + // Stream is disconnected - event is stored for replay, nothing more to do + return; + } + + // Send the message to the standalone SSE stream + if (standaloneSse.controller && standaloneSse.encoder) { + this.writeSSEEvent(standaloneSse.controller, standaloneSse.encoder, message, eventId); + } + return; + } + + // Get the response for this request + const streamId = this._requestToStreamMapping.get(requestId); + if (!streamId) { + throw new Error(`No connection established for request ID: ${String(requestId)}`); + } + + const stream = this._streamMapping.get(streamId); + + if (!this._enableJsonResponse && stream?.controller && stream?.encoder) { + // For SSE responses, generate event ID if event store is provided + let eventId: string | undefined; + + if (this._eventStore) { + eventId = await this._eventStore.storeEvent(streamId, message); + } + // Write the event to the response stream + this.writeSSEEvent(stream.controller, stream.encoder, message, eventId); + } + + if (isJSONRPCResponse(message) || isJSONRPCError(message)) { + this._requestResponseMap.set(requestId, message); + const relatedIds = Array.from(this._requestToStreamMapping.entries()) + .filter(([_, sid]) => sid === streamId) + .map(([id]) => id); + + // Check if we have responses for all requests using this connection + const allResponsesReady = relatedIds.every(id => this._requestResponseMap.has(id)); + + if (allResponsesReady) { + if (!stream) { + throw new Error(`No connection established for request ID: ${String(requestId)}`); + } + if (this._enableJsonResponse && stream.resolveJson) { + // All responses ready, send as JSON + const headers: Record = { + 'Content-Type': 'application/json' + }; + if (this.sessionId !== undefined) { + headers['mcp-session-id'] = this.sessionId; + } + + const responses = relatedIds.map(id => this._requestResponseMap.get(id)!); + + if (responses.length === 1) { + stream.resolveJson(new Response(JSON.stringify(responses[0]), { status: 200, headers })); + } else { + stream.resolveJson(new Response(JSON.stringify(responses), { status: 200, headers })); + } + } else { + // End the SSE stream + stream.cleanup(); + } + // Clean up + for (const id of relatedIds) { + this._requestResponseMap.delete(id); + this._requestToStreamMapping.delete(id); + } + } + } + } +} diff --git a/src/experimental/fetch-streamable-http/index.ts b/src/experimental/fetch-streamable-http/index.ts new file mode 100644 index 000000000..e96d561a6 --- /dev/null +++ b/src/experimental/fetch-streamable-http/index.ts @@ -0,0 +1,18 @@ +/** + * Fetch Streamable HTTP Transport Module + * + * This module provides a Streamable HTTP server transport implementation + * using Web Standard APIs (Request, Response, ReadableStream) instead of Node.js HTTP. + * + * @experimental + */ + +export { + FetchStreamableHTTPServerTransport, + type FetchStreamableHTTPServerTransportOptions, + type EventStore, + type EventId, + type StreamId, + type SessionStore, + type SessionState +} from './fetchStreamableHttpServerTransport.js'; diff --git a/src/experimental/index.ts b/src/experimental/index.ts index 55dd44ed0..3164e3338 100644 --- a/src/experimental/index.ts +++ b/src/experimental/index.ts @@ -5,9 +5,11 @@ * Import experimental features from this module: * ```typescript * import { TaskStore, InMemoryTaskStore } from '@modelcontextprotocol/sdk/experimental'; + * import { FetchStreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/experimental'; * ``` * * @experimental */ export * from './tasks/index.js'; +export * from './fetch-streamable-http/index.js';