Skip to content

Commit 652de96

Browse files
committed
express example
1 parent 6b27ef2 commit 652de96

File tree

1 file changed

+333
-0
lines changed

1 file changed

+333
-0
lines changed
Lines changed: 333 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,333 @@
1+
/**
2+
* Example MCP Server using Express with FetchStreamableHTTPServerTransport
3+
*
4+
* This example demonstrates how to use the experimental FetchStreamableHTTPServerTransport
5+
* with Express by converting between Node.js HTTP and Web Standard Request/Response.
6+
*
7+
* The FetchStreamableHTTPServerTransport uses Web Standard APIs, so we need adapter
8+
* functions to convert Express's req/res to Web Standard Request/Response.
9+
*
10+
* To run this example:
11+
* npx tsx src/examples/server/expressFetchStreamableHttp.ts
12+
*
13+
* Then test with curl:
14+
* # Initialize
15+
* curl -X POST http://localhost:3000/mcp \
16+
* -H "Content-Type: application/json" \
17+
* -H "Accept: application/json, text/event-stream" \
18+
* -d '{"jsonrpc":"2.0","method":"initialize","params":{"protocolVersion":"2025-03-26","clientInfo":{"name":"test","version":"1.0"},"capabilities":{}},"id":1}'
19+
*/
20+
21+
import express from 'express';
22+
import cors from 'cors';
23+
import { IncomingMessage, ServerResponse } from 'node:http';
24+
import { McpServer } from '../../server/mcp.js';
25+
import { FetchStreamableHTTPServerTransport } from '../../experimental/fetch-streamable-http/index.js';
26+
import { CallToolResult, GetPromptResult, ReadResourceResult } from '../../types.js';
27+
import { z } from 'zod';
28+
29+
// Create the Express app
30+
const app = express();
31+
32+
// Store active transports by session ID for session management
33+
const transports = new Map<string, FetchStreamableHTTPServerTransport>();
34+
35+
/**
36+
* Converts a Node.js IncomingMessage to a Web Standard Request
37+
*/
38+
async function nodeRequestToWebRequest(req: IncomingMessage, baseUrl: string): Promise<Request> {
39+
const url = new URL(req.url ?? '/', baseUrl);
40+
const headers = new Headers();
41+
42+
for (const [key, value] of Object.entries(req.headers)) {
43+
if (value) {
44+
if (Array.isArray(value)) {
45+
value.forEach(v => headers.append(key, v));
46+
} else {
47+
headers.set(key, value);
48+
}
49+
}
50+
}
51+
52+
// For requests with body (POST), we need to read the body
53+
let body: string | null = null;
54+
if (req.method === 'POST') {
55+
body = await new Promise<string>((resolve, reject) => {
56+
let data = '';
57+
req.on('data', chunk => {
58+
data += chunk;
59+
});
60+
req.on('end', () => resolve(data));
61+
req.on('error', reject);
62+
});
63+
}
64+
65+
return new Request(url.toString(), {
66+
method: req.method,
67+
headers,
68+
body: body,
69+
// @ts-expect-error duplex is required for streams but not in types
70+
duplex: 'half'
71+
});
72+
}
73+
74+
/**
75+
* Converts a Web Standard Response to a Node.js ServerResponse
76+
*/
77+
async function webResponseToNodeResponse(webResponse: Response, res: ServerResponse): Promise<void> {
78+
// Set status code
79+
res.statusCode = webResponse.status;
80+
81+
// Copy headers
82+
webResponse.headers.forEach((value, key) => {
83+
res.setHeader(key, value);
84+
});
85+
86+
// Handle streaming response (SSE)
87+
if (webResponse.body) {
88+
const reader = webResponse.body.getReader();
89+
const decoder = new TextDecoder();
90+
91+
// For SSE, we need to flush headers immediately
92+
if (webResponse.headers.get('content-type') === 'text/event-stream') {
93+
res.flushHeaders();
94+
}
95+
96+
try {
97+
while (true) {
98+
const { done, value } = await reader.read();
99+
if (done) break;
100+
101+
const chunk = decoder.decode(value, { stream: true });
102+
res.write(chunk);
103+
104+
// Flush for SSE to ensure real-time delivery
105+
if (typeof (res as NodeJS.WritableStream & { flush?: () => void }).flush === 'function') {
106+
(res as NodeJS.WritableStream & { flush?: () => void }).flush!();
107+
}
108+
}
109+
} catch {
110+
// Client disconnected or stream error
111+
} finally {
112+
res.end();
113+
}
114+
} else {
115+
res.end();
116+
}
117+
}
118+
119+
/**
120+
* Creates and configures an MCP server with example tools, resources, and prompts
121+
*/
122+
function createMcpServer(): McpServer {
123+
const server = new McpServer(
124+
{
125+
name: 'express-fetch-streamable-http-server',
126+
version: '1.0.0'
127+
},
128+
{ capabilities: { logging: {} } }
129+
);
130+
131+
// Register a simple tool
132+
server.registerTool(
133+
'greet',
134+
{
135+
description: 'Greets someone by name',
136+
inputSchema: {
137+
name: z.string().describe('The name to greet')
138+
}
139+
},
140+
async ({ name }): Promise<CallToolResult> => {
141+
return {
142+
content: [
143+
{
144+
type: 'text',
145+
text: `Hello, ${name}! Welcome to the Express MCP server.`
146+
}
147+
]
148+
};
149+
}
150+
);
151+
152+
// Register a calculator tool
153+
server.registerTool(
154+
'calculate',
155+
{
156+
description: 'Performs a simple calculation',
157+
inputSchema: {
158+
operation: z.enum(['add', 'subtract', 'multiply', 'divide']).describe('The operation to perform'),
159+
a: z.number().describe('First operand'),
160+
b: z.number().describe('Second operand')
161+
}
162+
},
163+
async ({ operation, a, b }): Promise<CallToolResult> => {
164+
let result: number;
165+
switch (operation) {
166+
case 'add':
167+
result = a + b;
168+
break;
169+
case 'subtract':
170+
result = a - b;
171+
break;
172+
case 'multiply':
173+
result = a * b;
174+
break;
175+
case 'divide':
176+
if (b === 0) {
177+
return {
178+
content: [{ type: 'text', text: 'Error: Division by zero' }],
179+
isError: true
180+
};
181+
}
182+
result = a / b;
183+
break;
184+
}
185+
return {
186+
content: [
187+
{
188+
type: 'text',
189+
text: `${a} ${operation} ${b} = ${result}`
190+
}
191+
]
192+
};
193+
}
194+
);
195+
196+
// Register a prompt
197+
server.registerPrompt(
198+
'code-review',
199+
{
200+
description: 'A prompt template for code review',
201+
argsSchema: {
202+
language: z.string().describe('Programming language'),
203+
code: z.string().describe('Code to review')
204+
}
205+
},
206+
async ({ language, code }): Promise<GetPromptResult> => {
207+
return {
208+
messages: [
209+
{
210+
role: 'user',
211+
content: {
212+
type: 'text',
213+
text: `Please review the following ${language} code:\n\n\`\`\`${language}\n${code}\n\`\`\``
214+
}
215+
}
216+
]
217+
};
218+
}
219+
);
220+
221+
// Register a resource
222+
server.registerResource(
223+
'server-info',
224+
'mcp://server/info',
225+
{
226+
description: 'Information about this MCP server',
227+
mimeType: 'application/json'
228+
},
229+
async (): Promise<ReadResourceResult> => {
230+
return {
231+
contents: [
232+
{
233+
uri: 'mcp://server/info',
234+
mimeType: 'application/json',
235+
text: JSON.stringify(
236+
{
237+
name: 'express-fetch-streamable-http-server',
238+
version: '1.0.0',
239+
runtime: 'Node.js',
240+
framework: 'Express',
241+
transport: 'FetchStreamableHTTPServerTransport',
242+
timestamp: new Date().toISOString()
243+
},
244+
null,
245+
2
246+
)
247+
}
248+
]
249+
};
250+
}
251+
);
252+
253+
return server;
254+
}
255+
256+
// Configure CORS middleware
257+
app.use(
258+
cors({
259+
origin: '*',
260+
methods: ['GET', 'POST', 'DELETE', 'OPTIONS'],
261+
allowedHeaders: ['Content-Type', 'Accept', 'mcp-session-id', 'last-event-id', 'mcp-protocol-version'],
262+
exposedHeaders: ['mcp-session-id']
263+
})
264+
);
265+
266+
// MCP endpoint - handles all methods
267+
app.all('/mcp', async (req, res) => {
268+
const baseUrl = `http://${req.headers.host}`;
269+
270+
// Check for existing session
271+
const sessionId = req.headers['mcp-session-id'] as string | undefined;
272+
273+
if (sessionId && transports.has(sessionId)) {
274+
// Reuse existing transport for this session
275+
const transport = transports.get(sessionId)!;
276+
const webRequest = await nodeRequestToWebRequest(req, baseUrl);
277+
const webResponse = await transport.handleRequest(webRequest);
278+
await webResponseToNodeResponse(webResponse, res);
279+
return;
280+
}
281+
282+
// For new sessions or initialization, create new transport and server
283+
const server = createMcpServer();
284+
const transport = new FetchStreamableHTTPServerTransport({
285+
sessionIdGenerator: () => crypto.randomUUID(),
286+
onsessioninitialized: sessionId => {
287+
// Store the transport for session reuse
288+
transports.set(sessionId, transport);
289+
console.log(`Session initialized: ${sessionId}`);
290+
},
291+
onsessionclosed: sessionId => {
292+
// Clean up when session closes
293+
transports.delete(sessionId);
294+
console.log(`Session closed: ${sessionId}`);
295+
}
296+
});
297+
298+
await server.connect(transport);
299+
300+
const webRequest = await nodeRequestToWebRequest(req, baseUrl);
301+
const webResponse = await transport.handleRequest(webRequest);
302+
await webResponseToNodeResponse(webResponse, res);
303+
});
304+
305+
// Health check endpoint
306+
app.get('/health', (_req, res) => {
307+
res.json({
308+
status: 'healthy',
309+
activeSessions: transports.size,
310+
timestamp: new Date().toISOString()
311+
});
312+
});
313+
314+
// Start the server
315+
const PORT = 3000;
316+
app.listen(PORT, () => {
317+
console.log(`MCP server running at http://localhost:${PORT}/mcp`);
318+
});
319+
320+
// Handle graceful shutdown
321+
process.on('SIGINT', async () => {
322+
console.log('\nShutting down server...');
323+
324+
// Close all active transports
325+
for (const [sessionId, transport] of transports) {
326+
console.log(`Closing session: ${sessionId}`);
327+
await transport.close();
328+
}
329+
transports.clear();
330+
331+
console.log('Server stopped.');
332+
process.exit(0);
333+
});

0 commit comments

Comments
 (0)