diff --git a/README.md b/README.md index bcdf0d841..e3d17e34d 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,7 @@ - [Eliciting User Input](#eliciting-user-input) - [Writing MCP Clients](#writing-mcp-clients) - [Proxy Authorization Requests Upstream](#proxy-authorization-requests-upstream) + - [Custom Reconnection Scheduling for Non-Persistent Environments](#custom-reconnection-scheduling-for-non-persistent-environments) - [Backwards Compatibility](#backwards-compatibility) - [Documentation](#documentation) - [Contributing](#contributing) @@ -1480,6 +1481,164 @@ This setup allows you to: - Provide custom documentation URLs - Maintain control over the OAuth flow while delegating to an external provider +### Custom Reconnection Scheduling for Non-Persistent Environments + +By default, the Streamable HTTP client transport uses `setTimeout` to schedule automatic reconnections after connection failures. However, this approach doesn't work well in non-persistent environments like serverless functions, mobile apps, or desktop applications that may be +suspended. + +The SDK allows you to provide a custom reconnection scheduler to handle these scenarios: + +#### Use Cases + +- **Serverless Functions**: Reconnect immediately on the next function invocation instead of waiting for a timer +- **Mobile Apps**: Use platform-specific background scheduling (iOS Background Fetch, Android WorkManager) +- **Desktop Apps**: Handle sleep/wake cycles with OS-aware scheduling +- **Edge Functions**: Optimize for short-lived execution contexts + +#### API + +```typescript +type ReconnectionScheduler = ( + reconnect: () => void, // Function to call to initiate reconnection + delay: number, // Suggested delay in milliseconds + attemptCount: number // Current reconnection attempt count (0-indexed) +) => void; +``` + +#### Example: Serverless Environment (Immediate Reconnection) + +```typescript +import { Client } from '@modelcontextprotocol/sdk/client/index.js'; +import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js'; + +// Serverless scheduler: reconnect immediately without setTimeout +const serverlessScheduler = (reconnect, delay, attemptCount) => { + // In serverless, timers don't persist across invocations + // Just reconnect immediately - the function will handle retries on next invocation + reconnect(); +}; + +const transport = new StreamableHTTPClientTransport(new URL('https://api.example.com/mcp'), { + reconnectionOptions: { + initialReconnectionDelay: 1000, + maxReconnectionDelay: 30000, + reconnectionDelayGrowFactor: 2, + maxRetries: 5 + }, + reconnectionScheduler: serverlessScheduler +}); + +const client = new Client({ + name: 'serverless-client', + version: '1.0.0' +}); + +await client.connect(transport); +``` + +#### Example: Serverless with Deferred Reconnection + +For true serverless environments where the function may terminate before reconnection, you can store the reconnection callback for the next invocation: + +```typescript +// Global or persistent storage for reconnection callback +let storedReconnect: (() => void) | undefined; + +const deferredScheduler = (reconnect, delay, attemptCount) => { + // Store the reconnect callback instead of calling it + // In a real app, persist this to a database or queue + storedReconnect = reconnect; + console.log(`Reconnection scheduled for next invocation (attempt ${attemptCount})`); +}; + +const transport = new StreamableHTTPClientTransport(new URL('https://api.example.com/mcp'), { + reconnectionScheduler: deferredScheduler +}); + +// Later, on next function invocation: +if (storedReconnect) { + console.log('Triggering stored reconnection'); + storedReconnect(); + storedReconnect = undefined; +} +``` + +#### Example: Mobile App with Platform Scheduling + +```typescript +// iOS/Android mobile app using platform-specific background tasks +const mobileScheduler = (reconnect, delay, attemptCount) => { + if (attemptCount > 3) { + console.log('Too many reconnection attempts, giving up'); + return; + } + + // Use native background task API (pseudocode) + BackgroundTaskManager.schedule({ + taskId: 'mcp-reconnect', + delay: delay, + callback: () => { + console.log(`Background reconnection attempt ${attemptCount}`); + reconnect(); + } + }); +}; + +const transport = new StreamableHTTPClientTransport(new URL('https://api.example.com/mcp'), { + reconnectionOptions: { + initialReconnectionDelay: 5000, + maxReconnectionDelay: 60000, + reconnectionDelayGrowFactor: 1.5, + maxRetries: 3 + }, + reconnectionScheduler: mobileScheduler +}); +``` + +#### Example: Desktop App with Power Management + +```typescript +// Desktop app that respects system sleep/wake cycles +const desktopScheduler = (reconnect, delay, attemptCount) => { + const timeoutId = setTimeout(() => { + // Check if system was sleeping + const actualElapsed = Date.now() - scheduleTime; + if (actualElapsed > delay * 1.5) { + console.log('System was likely sleeping, reconnecting immediately'); + reconnect(); + } else { + reconnect(); + } + }, delay); + + const scheduleTime = Date.now(); + + // Handle system wake events (pseudocode) + powerMonitor.on('resume', () => { + clearTimeout(timeoutId); + console.log('System resumed, reconnecting immediately'); + reconnect(); + }); +}; + +const transport = new StreamableHTTPClientTransport(new URL('https://api.example.com/mcp'), { + reconnectionScheduler: desktopScheduler +}); +``` + +#### Default Behavior + +If no custom scheduler is provided, the transport uses the default `setTimeout`-based scheduler: + +```typescript +// Default scheduler (built-in) +const defaultScheduler = (reconnect, delay, attemptCount) => { + setTimeout(reconnect, delay); +}; +``` + +This default scheduler works well for traditional long-running applications but may not be suitable for environments with lifecycle constraints. + ### Backwards Compatibility Clients and servers with StreamableHttp transport can maintain [backwards compatibility](https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#backwards-compatibility) with the deprecated HTTP+SSE transport (from protocol version 2024-11-05) as follows diff --git a/src/client/streamableHttp.test.ts b/src/client/streamableHttp.test.ts index 2799aa67e..38dee1b51 100644 --- a/src/client/streamableHttp.test.ts +++ b/src/client/streamableHttp.test.ts @@ -1372,4 +1372,230 @@ describe('StreamableHTTPClientTransport', () => { }); }); }); + + describe('Custom Reconnection Scheduler', () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it('should use custom reconnection scheduler when provided', async () => { + const customSchedulerSpy = vi.fn((reconnect, _, __) => { + // Immediately call reconnect without setTimeout + reconnect(); + }); + + transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { + reconnectionOptions: { + initialReconnectionDelay: 1000, + maxReconnectionDelay: 5000, + reconnectionDelayGrowFactor: 2, + maxRetries: 1 + }, + reconnectionScheduler: customSchedulerSpy + }); + + const errorSpy = vi.fn(); + transport.onerror = errorSpy; + + // Create a failing stream + const failingStream = new ReadableStream({ + start(controller) { + controller.error(new Error('Network failure')); + } + }); + + const fetchMock = global.fetch as Mock; + // First call: GET request that fails + fetchMock.mockResolvedValueOnce({ + ok: true, + status: 200, + headers: new Headers({ 'content-type': 'text/event-stream' }), + body: failingStream + }); + // Second call: reconnection attempt + fetchMock.mockResolvedValueOnce({ + ok: true, + status: 200, + headers: new Headers({ 'content-type': 'text/event-stream' }), + body: new ReadableStream() + }); + + await transport.start(); + await transport['_startOrAuthSse']({}); + + // Wait for async operations + await vi.advanceTimersByTimeAsync(50); + + // Verify custom scheduler was called + expect(customSchedulerSpy).toHaveBeenCalled(); + expect(customSchedulerSpy).toHaveBeenCalledWith(expect.any(Function), 1000, 0); + + // Verify reconnection occurred (fetch called twice) + expect(fetchMock).toHaveBeenCalledTimes(2); + }); + + it('should pass correct parameters to custom scheduler', async () => { + let capturedDelay: number | undefined; + let capturedAttemptCount: number | undefined; + + const customScheduler = vi.fn((reconnect, delay, attemptCount) => { + capturedDelay = delay; + capturedAttemptCount = attemptCount; + // Don't actually reconnect to avoid infinite loop + }); + + transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { + reconnectionOptions: { + initialReconnectionDelay: 2000, + maxReconnectionDelay: 10000, + reconnectionDelayGrowFactor: 1.5, + maxRetries: 1 + }, + reconnectionScheduler: customScheduler + }); + + const errorSpy = vi.fn(); + transport.onerror = errorSpy; + + const failingStream = new ReadableStream({ + start(controller) { + controller.error(new Error('Network failure')); + } + }); + + const fetchMock = global.fetch as Mock; + fetchMock.mockResolvedValueOnce({ + ok: true, + status: 200, + headers: new Headers({ 'content-type': 'text/event-stream' }), + body: failingStream + }); + + await transport.start(); + await transport['_startOrAuthSse']({}); + await vi.advanceTimersByTimeAsync(50); + + // Verify scheduler received correct delay (initial delay of 2000ms) + expect(capturedDelay).toBe(2000); + // Verify scheduler received correct attempt count (first attempt = 0) + expect(capturedAttemptCount).toBe(0); + }); + + it('should fall back to setTimeout when no custom scheduler provided', async () => { + transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { + reconnectionOptions: { + initialReconnectionDelay: 100, + maxReconnectionDelay: 5000, + reconnectionDelayGrowFactor: 2, + maxRetries: 1 + } + // No reconnectionScheduler provided + }); + + const errorSpy = vi.fn(); + transport.onerror = errorSpy; + + const failingStream = new ReadableStream({ + start(controller) { + controller.error(new Error('Network failure')); + } + }); + + const fetchMock = global.fetch as Mock; + // First call: fails + fetchMock.mockResolvedValueOnce({ + ok: true, + status: 200, + headers: new Headers({ 'content-type': 'text/event-stream' }), + body: failingStream + }); + // Second call: reconnection + fetchMock.mockResolvedValueOnce({ + ok: true, + status: 200, + headers: new Headers({ 'content-type': 'text/event-stream' }), + body: new ReadableStream() + }); + + await transport.start(); + await transport['_startOrAuthSse']({}); + + // Should not reconnect immediately (setTimeout used) + expect(fetchMock).toHaveBeenCalledTimes(1); + + // Advance time by delay amount + await vi.advanceTimersByTimeAsync(100); + + // Now should have attempted reconnection + expect(fetchMock).toHaveBeenCalledTimes(2); + }); + + it('should support serverless scenario - store reconnect callback for later invocation', async () => { + let storedReconnect: (() => void) | undefined; + + // Serverless scheduler: store callback instead of calling immediately + // In real serverless, this would be persisted and called on next function invocation + const serverlessScheduler = vi.fn((reconnect, _, __) => { + storedReconnect = reconnect; + // Do NOT call reconnect() - that's the key difference from regular scheduling + }); + + transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { + reconnectionOptions: { + initialReconnectionDelay: 5000, + maxReconnectionDelay: 30000, + reconnectionDelayGrowFactor: 2, + maxRetries: 1 + }, + reconnectionScheduler: serverlessScheduler + }); + + const errorSpy = vi.fn(); + transport.onerror = errorSpy; + + const failingStream = new ReadableStream({ + start(controller) { + controller.error(new Error('Network failure')); + } + }); + + const fetchMock = global.fetch as Mock; + fetchMock.mockResolvedValueOnce({ + ok: true, + status: 200, + headers: new Headers({ 'content-type': 'text/event-stream' }), + body: failingStream + }); + + await transport.start(); + await transport['_startOrAuthSse']({}); + await vi.advanceTimersByTimeAsync(10); + + // Verify scheduler was called and callback was stored + expect(serverlessScheduler).toHaveBeenCalled(); + expect(storedReconnect).toBeDefined(); + + // Only 1 fetch call so far (no automatic reconnection) + expect(fetchMock).toHaveBeenCalledTimes(1); + + // Simulate next serverless function invocation - manually trigger stored reconnection + fetchMock.mockResolvedValueOnce({ + ok: true, + status: 200, + headers: new Headers({ 'content-type': 'text/event-stream' }), + body: new ReadableStream() + }); + + storedReconnect!(); // Manual trigger simulates next invocation + + await vi.advanceTimersByTimeAsync(10); + + // Now should have 2 calls total (initial + manual reconnection) + expect(fetchMock).toHaveBeenCalledTimes(2); + }); + }); }); diff --git a/src/client/streamableHttp.ts b/src/client/streamableHttp.ts index 9d34c7b7d..f1d3a0d9c 100644 --- a/src/client/streamableHttp.ts +++ b/src/client/streamableHttp.ts @@ -20,6 +20,41 @@ export class StreamableHTTPError extends Error { } } +/** + * Custom reconnection scheduler function type. + * Allows environments to override the default setTimeout-based reconnection. + * + * @example Serverless environment (immediate reconnection) + * ```typescript + * const serverlessScheduler: ReconnectionScheduler = (reconnect) => { + * // Reconnect immediately without timer + * reconnect(); + * }; + * ``` + * + * @example Mobile environment (platform-specific scheduling) + * ```typescript + * const mobileScheduler: ReconnectionScheduler = (reconnect, delay) => { + * // Use native background task API + * BackgroundFetch.schedule({ delay, callback: reconnect }); + * }; + * ``` + */ +export type ReconnectionScheduler = ( + /** + * Function to call to initiate reconnection + */ + reconnect: () => void, + /** + * Suggested delay in milliseconds before reconnecting + */ + delay: number, + /** + * Current reconnection attempt count + */ + attemptCount: number +) => void; + /** * Options for starting or authenticating an SSE connection */ @@ -109,6 +144,18 @@ export type StreamableHTTPClientTransportOptions = { */ reconnectionOptions?: StreamableHTTPReconnectionOptions; + /** + * Custom reconnection scheduler for non-persistent client environments. + * + * Useful for: + * - Serverless functions (immediate reconnection on next invocation) + * - Mobile apps (platform-specific background scheduling) + * - Desktop apps (sleep/wake cycle handling) + * + * If not provided, defaults to setTimeout-based scheduling. + */ + reconnectionScheduler?: ReconnectionScheduler; + /** * Session ID for the connection. This is used to identify the session on the server. * When not provided and connecting to a server that supports session IDs, the server will generate a new session ID. @@ -136,6 +183,7 @@ export class StreamableHTTPClientTransport implements Transport { private _hasCompletedAuthFlow = false; // Circuit breaker: detect auth success followed by immediate 401 private _lastUpscopingHeader?: string; // Track last upscoping header to prevent infinite upscoping. private _serverRetryMs?: number; // Server-provided retry delay from SSE retry field + private _reconnectionScheduler?: ReconnectionScheduler; // Custom reconnection scheduler onclose?: () => void; onerror?: (error: Error) => void; @@ -151,6 +199,7 @@ export class StreamableHTTPClientTransport implements Transport { this._fetchWithInit = createFetchWithInit(opts?.fetch, opts?.requestInit); this._sessionId = opts?.sessionId; this._reconnectionOptions = opts?.reconnectionOptions ?? DEFAULT_STREAMABLE_HTTP_RECONNECTION_OPTIONS; + this._reconnectionScheduler = opts?.reconnectionScheduler; } private async _authThenStart(): Promise { @@ -284,15 +333,20 @@ export class StreamableHTTPClientTransport implements Transport { // Calculate next delay based on current attempt count const delay = this._getNextReconnectionDelay(attemptCount); - // Schedule the reconnection - setTimeout(() => { - // Use the last event ID to resume where we left off + // Determine reconnection strategy by custom scheduler or default setTimeout + const reconnect = () => { this._startOrAuthSse(options).catch(error => { this.onerror?.(new Error(`Failed to reconnect SSE stream: ${error instanceof Error ? error.message : String(error)}`)); - // Schedule another attempt if this one failed, incrementing the attempt counter this._scheduleReconnection(options, attemptCount + 1); }); - }, delay); + }; + + // Use custom scheduler if provided, otherwise default to setTimeout + if (this._reconnectionScheduler) { + this._reconnectionScheduler(reconnect, delay, attemptCount); + } else { + setTimeout(reconnect, delay); + } } private _handleSseStream(stream: ReadableStream | null, options: StartSSEOptions, isReconnectable: boolean): void {