diff --git a/src/transports/websocket.ts b/src/transports/websocket.ts index 7842351d..c5cd39d8 100644 --- a/src/transports/websocket.ts +++ b/src/transports/websocket.ts @@ -47,6 +47,13 @@ export interface WebSocketTransportConfig desiredSubs: TypeFromDefinition[], ) => Promise | ClientOptions + /** + * Optional heartbeat configuration to keep the connection alive. + * When enabled, sends periodic ping frames and updates lastMessageReceivedAt on pong responses. + * This prevents the connection from being closed due to inactivity when no data messages are being sent. + */ + heartbeatIntervalMs?: number + /** Map of handlers for different WS lifecycle events */ handlers: { /** @@ -149,6 +156,7 @@ export class WebSocketTransport< currentUrl = '' lastMessageReceivedAt = 0 connectionOpenedAt = 0 + private heartbeatInterval?: ReturnType constructor(private config: WebSocketTransportConfig) { super() @@ -169,6 +177,42 @@ export class WebSocketTransport< return JSON.parse(data.toString()) as T['Provider']['WsMessage'] } + private startHeartbeat(connection: WebSocket): void { + // Only start heartbeat if configured + if (!this.config.heartbeatIntervalMs) { + return + } + + // Clear any existing interval + this.stopHeartbeat() + + logger.debug( + `Starting heartbeat with interval of ${this.config.heartbeatIntervalMs}ms`, + ) + + // Send periodic pings to keep the connection alive + this.heartbeatInterval = setInterval(() => { + if (connection.readyState === WebSocket.OPEN) { + connection.ping() + logger.trace('Sent heartbeat ping') + } + }, this.config.heartbeatIntervalMs) + + // Update lastMessageReceivedAt when we receive a pong + connection.on('pong', () => { + this.lastMessageReceivedAt = Date.now() + logger.trace('Received heartbeat pong') + }) + } + + private stopHeartbeat(): void { + if (this.heartbeatInterval) { + clearInterval(this.heartbeatInterval) + this.heartbeatInterval = undefined + logger.debug('Stopped heartbeat') + } + } + buildConnectionHandlers( context: EndpointContext, connection: WebSocket, @@ -296,6 +340,9 @@ export class WebSocketTransport< connection.addEventListener('error', handlers.error) connection.addEventListener('close', handlers.close) + // Start heartbeat mechanism if configured + this.startHeartbeat(connection) + return connection } @@ -378,6 +425,10 @@ export class WebSocketTransport< ) await sleep(1000 - timeSinceConnectionOpened) } + + // Stop heartbeat before closing connection + this.stopHeartbeat() + this.wsConnection?.close(1000) connectionClosed = true diff --git a/test/transports/websocket.test.ts b/test/transports/websocket.test.ts index 646023a0..e240b88b 100644 --- a/test/transports/websocket.test.ts +++ b/test/transports/websocket.test.ts @@ -1100,3 +1100,209 @@ test.serial('can set reverse mapping and read from it', async (t) => { }, }) }) + +test.serial('heartbeat keeps connection alive when no data messages are sent', async (t) => { + const base = 'ETH' + const quote = 'DOGE' + const WS_SUBSCRIPTION_UNRESPONSIVE_TTL = 10000 + const HEARTBEAT_INTERVAL = 2000 + + // Mock WS + mockWebSocketProvider(WebSocketClassProvider) + const mockWsServer = new Server(ENDPOINT_URL, { mock: false }) + let connectionCounter = 0 + let pongCounter = 0 + + mockWsServer.on('connection', (socket) => { + connectionCounter++ + + // Send initial message with data + socket.send( + JSON.stringify({ + pair: `${base}/${quote}`, + value: price, + }), + ) + + // Listen for ping frames and respond with pong + socket.on('ping', () => { + pongCounter++ + socket.pong() + }) + }) + + const transport = new WebSocketTransport({ + url: () => ENDPOINT_URL, + heartbeatIntervalMs: HEARTBEAT_INTERVAL, + handlers: { + message(message) { + if (!message.pair) { + return [] + } + const [curBase, curQuote] = message.pair.split('/') + return [ + { + params: { base: curBase, quote: curQuote }, + response: { + data: { + result: message.value, + }, + result: message.value, + }, + }, + ] + }, + }, + builders: { + subscribeMessage: (params) => `S:${params.base}/${params.quote}`, + }, + }) + + const webSocketEndpoint = new AdapterEndpoint({ + name: 'TEST', + transport: transport, + inputParameters, + }) + + const config = new AdapterConfig( + {}, + { + envDefaultOverrides: { + BACKGROUND_EXECUTE_MS_WS, + WS_SUBSCRIPTION_TTL: 30000, + WS_SUBSCRIPTION_UNRESPONSIVE_TTL, + }, + }, + ) + + const adapter = new Adapter({ + name: 'TEST', + defaultEndpoint: 'test', + config, + endpoints: [webSocketEndpoint], + }) + + const testAdapter = await TestAdapter.startWithMockedCache(adapter, t.context) + + await testAdapter.startBackgroundExecuteThenGetResponse(t, { + requestData: { base, quote }, + expectedResponse: { + data: { + result: price, + }, + result: price, + statusCode: 200, + }, + }) + + // Wait for multiple heartbeat intervals without the provider sending any data messages + // This should be longer than WS_SUBSCRIPTION_UNRESPONSIVE_TTL to verify the connection stays alive + await runAllUntilTime(t.context.clock, WS_SUBSCRIPTION_UNRESPONSIVE_TTL + HEARTBEAT_INTERVAL * 3) + + // The connection should still be open (only opened once) + t.is(connectionCounter, 1, 'Connection should not have reconnected') + + // We should have received multiple pongs (at least 5 based on the time elapsed) + t.true(pongCounter >= 5, `Should have received at least 5 pongs, got ${pongCounter}`) + + testAdapter.api.close() + mockWsServer.close() + await t.context.clock.runToLastAsync() +}) + +test.serial('connection without heartbeat still reconnects when unresponsive', async (t) => { + const base = 'ETH' + const quote = 'DOGE' + const WS_SUBSCRIPTION_UNRESPONSIVE_TTL = 1000 + + // Mock WS + mockWebSocketProvider(WebSocketClassProvider) + const mockWsServer = new Server(ENDPOINT_URL, { mock: false }) + let connectionCounter = 0 + + mockWsServer.on('connection', (socket) => { + connectionCounter++ + + // Send initial message then go silent + socket.send( + JSON.stringify({ + pair: `${base}/${quote}`, + value: price, + }), + ) + }) + + const transport = new WebSocketTransport({ + url: () => ENDPOINT_URL, + // No heartbeatIntervalMs configured - connection should timeout + handlers: { + message(message) { + if (!message.pair) { + return [] + } + const [curBase, curQuote] = message.pair.split('/') + return [ + { + params: { base: curBase, quote: curQuote }, + response: { + data: { + result: message.value, + }, + result: message.value, + }, + }, + ] + }, + }, + builders: { + subscribeMessage: (params) => `S:${params.base}/${params.quote}`, + }, + }) + + const webSocketEndpoint = new AdapterEndpoint({ + name: 'TEST', + transport: transport, + inputParameters, + }) + + const config = new AdapterConfig( + {}, + { + envDefaultOverrides: { + BACKGROUND_EXECUTE_MS_WS, + WS_SUBSCRIPTION_TTL: 30000, + WS_SUBSCRIPTION_UNRESPONSIVE_TTL, + }, + }, + ) + + const adapter = new Adapter({ + name: 'TEST', + defaultEndpoint: 'test', + config, + endpoints: [webSocketEndpoint], + }) + + const testAdapter = await TestAdapter.startWithMockedCache(adapter, t.context) + + await testAdapter.startBackgroundExecuteThenGetResponse(t, { + requestData: { base, quote }, + expectedResponse: { + data: { + result: price, + }, + result: price, + statusCode: 200, + }, + }) + + // Wait past the unresponsive TTL - connection should be closed and reopened + await runAllUntilTime(t.context.clock, BACKGROUND_EXECUTE_MS_WS * 2 + 100) + + // The connection should have reconnected + t.is(connectionCounter, 2, 'Connection should have reconnected due to inactivity') + + testAdapter.api.close() + mockWsServer.close() + await t.context.clock.runToLastAsync() +})