Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions src/transports/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ export interface WebSocketTransportConfig<T extends WebsocketTransportGenerics>
desiredSubs: TypeFromDefinition<T['Parameters']>[],
) => Promise<ClientOptions> | ClientOptions

/**
* Optional heartbeat configuration to keep the connection alive.
* When enabled, sends periodic ping frames and updates lastMessageReceivedAt on pong responses.
* This prevents the connection from being closed due to inactivity when no data messages are being sent.
*/
heartbeatIntervalMs?: number

/** Map of handlers for different WS lifecycle events */
handlers: {
/**
Expand Down Expand Up @@ -149,6 +156,7 @@ export class WebSocketTransport<
currentUrl = ''
lastMessageReceivedAt = 0
connectionOpenedAt = 0
private heartbeatInterval?: ReturnType<typeof setInterval>

constructor(private config: WebSocketTransportConfig<T>) {
super()
Expand All @@ -169,6 +177,42 @@ export class WebSocketTransport<
return JSON.parse(data.toString()) as T['Provider']['WsMessage']
}

private startHeartbeat(connection: WebSocket): void {
// Only start heartbeat if configured
if (!this.config.heartbeatIntervalMs) {
return
}

// Clear any existing interval
this.stopHeartbeat()

logger.debug(
`Starting heartbeat with interval of ${this.config.heartbeatIntervalMs}ms`,
)

// Send periodic pings to keep the connection alive
this.heartbeatInterval = setInterval(() => {
if (connection.readyState === WebSocket.OPEN) {
connection.ping()
logger.trace('Sent heartbeat ping')
}
}, this.config.heartbeatIntervalMs)

// Update lastMessageReceivedAt when we receive a pong
connection.on('pong', () => {
this.lastMessageReceivedAt = Date.now()
logger.trace('Received heartbeat pong')
})
}

private stopHeartbeat(): void {
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval)
this.heartbeatInterval = undefined
logger.debug('Stopped heartbeat')
}
}

buildConnectionHandlers(
context: EndpointContext<T>,
connection: WebSocket,
Expand Down Expand Up @@ -296,6 +340,9 @@ export class WebSocketTransport<
connection.addEventListener('error', handlers.error)
connection.addEventListener('close', handlers.close)

// Start heartbeat mechanism if configured
this.startHeartbeat(connection)

return connection
}

Expand Down Expand Up @@ -378,6 +425,10 @@ export class WebSocketTransport<
)
await sleep(1000 - timeSinceConnectionOpened)
}

// Stop heartbeat before closing connection
this.stopHeartbeat()

this.wsConnection?.close(1000)
connectionClosed = true

Expand Down
206 changes: 206 additions & 0 deletions test/transports/websocket.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1100,3 +1100,209 @@
},
})
})

test.serial('heartbeat keeps connection alive when no data messages are sent', async (t) => {
const base = 'ETH'
const quote = 'DOGE'
const WS_SUBSCRIPTION_UNRESPONSIVE_TTL = 10000
const HEARTBEAT_INTERVAL = 2000

// Mock WS
mockWebSocketProvider(WebSocketClassProvider)
const mockWsServer = new Server(ENDPOINT_URL, { mock: false })
let connectionCounter = 0
let pongCounter = 0

mockWsServer.on('connection', (socket) => {
connectionCounter++

// Send initial message with data
socket.send(
JSON.stringify({
pair: `${base}/${quote}`,
value: price,
}),
)

// Listen for ping frames and respond with pong
socket.on('ping', () => {

Check failure on line 1128 in test/transports/websocket.test.ts

View workflow job for this annotation

GitHub Actions / test

Argument of type '"ping"' is not assignable to parameter of type 'keyof WebSocketCallbackMap'.
pongCounter++
socket.pong()

Check failure on line 1130 in test/transports/websocket.test.ts

View workflow job for this annotation

GitHub Actions / test

Property 'pong' does not exist on type 'Client'.
})
})

const transport = new WebSocketTransport<WebSocketTypes>({
url: () => ENDPOINT_URL,
heartbeatIntervalMs: HEARTBEAT_INTERVAL,
handlers: {
message(message) {
if (!message.pair) {
return []
}
const [curBase, curQuote] = message.pair.split('/')
return [
{
params: { base: curBase, quote: curQuote },
response: {
data: {
result: message.value,
},
result: message.value,
},
},
]
},
},
builders: {
subscribeMessage: (params) => `S:${params.base}/${params.quote}`,
},
})

const webSocketEndpoint = new AdapterEndpoint({
name: 'TEST',
transport: transport,
inputParameters,
})

const config = new AdapterConfig(
{},
{
envDefaultOverrides: {
BACKGROUND_EXECUTE_MS_WS,
WS_SUBSCRIPTION_TTL: 30000,
WS_SUBSCRIPTION_UNRESPONSIVE_TTL,
},
},
)

const adapter = new Adapter({
name: 'TEST',
defaultEndpoint: 'test',
config,
endpoints: [webSocketEndpoint],
})

const testAdapter = await TestAdapter.startWithMockedCache(adapter, t.context)

await testAdapter.startBackgroundExecuteThenGetResponse(t, {
requestData: { base, quote },
expectedResponse: {
data: {
result: price,
},
result: price,
statusCode: 200,
},
})

// Wait for multiple heartbeat intervals without the provider sending any data messages
// This should be longer than WS_SUBSCRIPTION_UNRESPONSIVE_TTL to verify the connection stays alive
await runAllUntilTime(t.context.clock, WS_SUBSCRIPTION_UNRESPONSIVE_TTL + HEARTBEAT_INTERVAL * 3)

// The connection should still be open (only opened once)
t.is(connectionCounter, 1, 'Connection should not have reconnected')

// We should have received multiple pongs (at least 5 based on the time elapsed)
t.true(pongCounter >= 5, `Should have received at least 5 pongs, got ${pongCounter}`)

testAdapter.api.close()
mockWsServer.close()
await t.context.clock.runToLastAsync()
})

test.serial('connection without heartbeat still reconnects when unresponsive', async (t) => {
const base = 'ETH'
const quote = 'DOGE'
const WS_SUBSCRIPTION_UNRESPONSIVE_TTL = 1000

// Mock WS
mockWebSocketProvider(WebSocketClassProvider)
const mockWsServer = new Server(ENDPOINT_URL, { mock: false })
let connectionCounter = 0

mockWsServer.on('connection', (socket) => {
connectionCounter++

// Send initial message then go silent
socket.send(
JSON.stringify({
pair: `${base}/${quote}`,
value: price,
}),
)
})

const transport = new WebSocketTransport<WebSocketTypes>({
url: () => ENDPOINT_URL,
// No heartbeatIntervalMs configured - connection should timeout
handlers: {
message(message) {
if (!message.pair) {
return []
}
const [curBase, curQuote] = message.pair.split('/')
return [
{
params: { base: curBase, quote: curQuote },
response: {
data: {
result: message.value,
},
result: message.value,
},
},
]
},
},
builders: {
subscribeMessage: (params) => `S:${params.base}/${params.quote}`,
},
})

const webSocketEndpoint = new AdapterEndpoint({
name: 'TEST',
transport: transport,
inputParameters,
})

const config = new AdapterConfig(
{},
{
envDefaultOverrides: {
BACKGROUND_EXECUTE_MS_WS,
WS_SUBSCRIPTION_TTL: 30000,
WS_SUBSCRIPTION_UNRESPONSIVE_TTL,
},
},
)

const adapter = new Adapter({
name: 'TEST',
defaultEndpoint: 'test',
config,
endpoints: [webSocketEndpoint],
})

const testAdapter = await TestAdapter.startWithMockedCache(adapter, t.context)

await testAdapter.startBackgroundExecuteThenGetResponse(t, {
requestData: { base, quote },
expectedResponse: {
data: {
result: price,
},
result: price,
statusCode: 200,
},
})

// Wait past the unresponsive TTL - connection should be closed and reopened
await runAllUntilTime(t.context.clock, BACKGROUND_EXECUTE_MS_WS * 2 + 100)

// The connection should have reconnected
t.is(connectionCounter, 2, 'Connection should have reconnected due to inactivity')

testAdapter.api.close()
mockWsServer.close()
await t.context.clock.runToLastAsync()
})
Loading