diff --git a/src/cli/cli.ts b/src/cli/cli.ts index e35dbaf..678dec1 100644 --- a/src/cli/cli.ts +++ b/src/cli/cli.ts @@ -113,6 +113,7 @@ async function main(): Promise { ws.on('close', () => { console.log('Disconnected') + codec.handleClose() close() }) diff --git a/src/hub.ts b/src/hub.ts index 79f4e7f..1dfb130 100644 --- a/src/hub.ts +++ b/src/hub.ts @@ -36,6 +36,7 @@ const eventCounter = new Counter({ export interface AddressHub { handleConnection: (ws: WebSocket) => void + destroy: () => void } export interface AddressHubOpts { @@ -278,6 +279,12 @@ export function makeAddressHub(opts: AddressHubOpts): AddressHub { ws.on('message', message => { codec.handleMessage(messageToString(message)) }) + }, + + destroy() { + for (const plugin of plugins) { + plugin.destroy?.() + } } } } diff --git a/src/index.ts b/src/index.ts index 7319ec3..36ef5fc 100644 --- a/src/index.ts +++ b/src/index.ts @@ -51,14 +51,38 @@ async function server(): Promise { const { allPlugins } = await import('./plugins/allPlugins') const { listenPort, listenHost } = serverConfig - const server = new WebSocket.Server({ + const wss = new WebSocket.Server({ port: listenPort, host: listenHost }) console.log(`WebSocket server listening on port ${listenPort}`) const hub = makeAddressHub({ plugins: allPlugins, logger: console }) - server.on('connection', ws => hub.handleConnection(ws)) + wss.on('connection', ws => hub.handleConnection(ws)) + + // Graceful shutdown handler + const shutdown = (): void => { + console.log(`Worker ${process.pid} shutting down...`) + + // Stop accepting new connections + wss.close(() => { + console.log(`Worker ${process.pid} WebSocket server closed`) + }) + + // Close all existing client connections + for (const client of wss.clients) { + client.close(1001, 'Server shutting down') + } + + // Clean up plugin resources (timers, WebSocket connections, etc.) + hub.destroy() + + console.log(`Worker ${process.pid} cleanup complete`) + process.exit(0) + } + + process.on('SIGTERM', shutdown) + process.on('SIGINT', shutdown) } main().catch(error => { diff --git a/src/jsonRpc.ts b/src/jsonRpc.ts index 20557ce..5d19dd9 100644 --- a/src/jsonRpc.ts +++ b/src/jsonRpc.ts @@ -344,13 +344,12 @@ function makeCodec( pendingCall.reject(new Error(error.message)) } else { const { asResult } = pendingCall - let cleanResult: unknown try { - cleanResult = asResult(result) + const cleanResult = asResult(result) + pendingCall.resolve(cleanResult) } catch (error) { pendingCall.reject(error) } - pendingCall.resolve(cleanResult) } } else { sendError(-32600, `Invalid JSON-RPC request / response`).catch( @@ -363,6 +362,8 @@ function makeCodec( for (const call of remoteCalls.values()) { call.reject(new Error('JSON-RPC connection closed')) } + remoteCalls.clear() + remoteSubscriptions.clear() } return { diff --git a/src/plugins/blockbook.ts b/src/plugins/blockbook.ts index 700c036..a4b18f7 100644 --- a/src/plugins/blockbook.ts +++ b/src/plugins/blockbook.ts @@ -65,6 +65,8 @@ export function makeBlockbook(opts: BlockbookOptions): AddressPlugin { const unconfirmedTxWatchlist = new Map>() // Global connection for block notifications let blockConnection: Connection | null = null + // Flag to prevent reconnection after destroy + let destroyed = false const getBlockConnectionReconnectDelay = (() => { const ROUGH_RECONNECTION_TIME = 3000 @@ -127,21 +129,24 @@ export function makeBlockbook(opts: BlockbookOptions): AddressPlugin { pluginDisconnectionCounter.inc({ pluginId, url: logUrl }) if (connection === blockConnection) { - // If this was the block connection, re-init it. + // If this was the block connection, re-init it (unless destroyed). blockConnection = null - snooze(getBlockConnectionReconnectDelay()) - .then(() => initBlockConnection()) - .catch(err => { - console.error('Failed to re-initialize block connection:', err) - }) + if (!destroyed) { + snooze(getBlockConnectionReconnectDelay()) + .then(() => initBlockConnection()) + .catch(err => { + console.error('Failed to re-initialize block connection:', err) + }) + } } else { // If this is a connection for a plugin, remove it and emit a subLost event. codec.handleClose() // Remove connection from connections array connections.splice(connections.indexOf(connection), 1) - // Remove connection from addressToConnection map + // Remove connection from addressToConnection map and clean up watchlist for (const address of connection.addresses) { addressToConnection.delete(address) + unconfirmedTxWatchlist.delete(address) } emit('subLost', { addresses: connection.addresses }) } @@ -158,7 +163,7 @@ export function makeBlockbook(opts: BlockbookOptions): AddressPlugin { // Initialize a dedicated connection for block notifications function initBlockConnection(): void { - if (blockConnection !== null) return + if (destroyed || blockConnection !== null) return blockConnection = makeConnection() @@ -206,6 +211,7 @@ export function makeBlockbook(opts: BlockbookOptions): AddressPlugin { const addressIndex = connection.addresses.indexOf(address) connection.addresses.splice(addressIndex, 1) addressToConnection.delete(address) + unconfirmedTxWatchlist.delete(address) if (connection.addresses.length === 0) { connection.ws.close() connections.splice(connections.indexOf(connection), 1) @@ -369,6 +375,27 @@ export function makeBlockbook(opts: BlockbookOptions): AddressPlugin { if (out.unconfirmedTxs > 0) return true if (out.transactions != null && out.transactions.length > 0) return true return false + }, + + destroy() { + destroyed = true + pingTask.stop() + + // Close all address connections + for (const connection of connections) { + connection.codec.handleClose() + connection.ws.close() + } + connections.length = 0 + addressToConnection.clear() + unconfirmedTxWatchlist.clear() + + // Close block connection + if (blockConnection !== null) { + blockConnection.codec.handleClose() + blockConnection.ws.close() + blockConnection = null + } } } diff --git a/src/plugins/evmRpc.ts b/src/plugins/evmRpc.ts index 335b022..c372a65 100644 --- a/src/plugins/evmRpc.ts +++ b/src/plugins/evmRpc.ts @@ -55,8 +55,9 @@ export function makeEvmRpc(opts: EvmRpcOptions): AddressPlugin { // Track subscribed addresses (normalized lowercase address -> original address) const subscribedAddresses = new Map() - // Create a map to track which URL corresponds to which transport instance - const transportUrlMap = new Map() + // Create a map to track which URL corresponds to which transport instance. + // Using WeakMap so transport instances can be garbage collected when no longer used. + const transportUrlMap = new WeakMap() // Create fallback transport with all URLs const transports = urls.map(url => { @@ -99,7 +100,7 @@ export function makeEvmRpc(opts: EvmRpcOptions): AddressPlugin { ) } - client.watchBlocks({ + const unwatchBlocks = client.watchBlocks({ includeTransactions: true, emitMissed: true, onError: error => { @@ -253,6 +254,10 @@ export function makeEvmRpc(opts: EvmRpcOptions): AddressPlugin { const scanAdapter = pickRandom(scanAdapters) const adapter = getScanAdapter(scanAdapter, logger) return await adapter(address, checkpoint) + }, + destroy() { + unwatchBlocks() + subscribedAddresses.clear() } } diff --git a/src/plugins/fakePlugin.ts b/src/plugins/fakePlugin.ts index 6c9663a..3906add 100644 --- a/src/plugins/fakePlugin.ts +++ b/src/plugins/fakePlugin.ts @@ -5,23 +5,46 @@ import { AddressPlugin, PluginEvents } from '../types/addressPlugin' export function makeFakePlugin(): AddressPlugin { const [on, emit] = makeEvents() + // Track pending timeouts so they can be cancelled on unsubscribe + const pendingTimeouts = new Map() + return { pluginId: 'fake', on, async subscribe(address) { - setTimeout(() => { + // Clear any existing timeout for this address to prevent leaks on re-subscribe + const existingTimeout = pendingTimeouts.get(address) + if (existingTimeout != null) { + clearTimeout(existingTimeout) + } + const timeout = setTimeout(() => { + pendingTimeouts.delete(address) emit('update', { address }) }, 1000) + pendingTimeouts.set(address, timeout) return true }, async unsubscribe(address) { + const timeout = pendingTimeouts.get(address) + if (timeout != null) { + clearTimeout(timeout) + pendingTimeouts.delete(address) + } return true }, async scanAddress(address, checkpoint): Promise { return false + }, + + destroy() { + // Clear all pending timeouts + for (const timeout of pendingTimeouts.values()) { + clearTimeout(timeout) + } + pendingTimeouts.clear() } } } diff --git a/src/types/addressPlugin.ts b/src/types/addressPlugin.ts index bd6d164..fe4676e 100644 --- a/src/types/addressPlugin.ts +++ b/src/types/addressPlugin.ts @@ -24,4 +24,12 @@ export interface AddressPlugin { * @returns `true` if the address has updates, `false` otherwise. */ scanAddress?: (address: string, checkpoint?: string) => Promise + + /** + * Clean up plugin resources (timers, connections, etc). + * + * This method is optional, because not all plugins require cleanup. + * Call this when shutting down the server or replacing plugins. + */ + destroy?: () => void }