From fe4d099a4e400e46bf48b7178454bd8ddf6c6bab Mon Sep 17 00:00:00 2001 From: Sam Holmes Date: Sat, 6 Dec 2025 16:48:17 -0800 Subject: [PATCH 1/8] fix(jsonRpc): clear remoteCalls and remoteSubscriptions maps on connection close The handleClose() function rejected pending calls but never cleared the remoteCalls map, and the remoteSubscriptions map was never cleared at all. This caused memory to accumulate over the lifetime of each codec instance: - remoteCalls retained rejected promise handlers - remoteSubscriptions grew unbounded with each subscription request Now both maps are explicitly cleared after rejecting pending calls, allowing the garbage collector to reclaim the memory. --- src/jsonRpc.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/jsonRpc.ts b/src/jsonRpc.ts index 20557ce..d2d6282 100644 --- a/src/jsonRpc.ts +++ b/src/jsonRpc.ts @@ -363,6 +363,8 @@ function makeCodec( for (const call of remoteCalls.values()) { call.reject(new Error('JSON-RPC connection closed')) } + remoteCalls.clear() + remoteSubscriptions.clear() } return { From e9a9d3d9a23dbcb33ba23246b3c7675dafddb7c6 Mon Sep 17 00:00:00 2001 From: Sam Holmes Date: Sat, 6 Dec 2025 16:48:58 -0800 Subject: [PATCH 2/8] fix(blockbook): clean up unconfirmedTxWatchlist on address removal The unconfirmedTxWatchlist Map tracks unconfirmed transactions per address, but entries were never removed when: - An address was explicitly unsubscribed via removeAddressConnection() - A WebSocket connection closed unexpectedly This caused orphaned transaction IDs to accumulate indefinitely, as transactions that are dropped from the mempool (never confirmed) would remain in the watchlist forever. Now the watchlist is cleaned up in both scenarios: - When removeAddressConnection() is called during normal unsubscribe - When a connection closes and addresses are cleaned up in the close handler --- src/plugins/blockbook.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/plugins/blockbook.ts b/src/plugins/blockbook.ts index 700c036..73627f2 100644 --- a/src/plugins/blockbook.ts +++ b/src/plugins/blockbook.ts @@ -139,9 +139,10 @@ export function makeBlockbook(opts: BlockbookOptions): AddressPlugin { codec.handleClose() // Remove connection from connections array connections.splice(connections.indexOf(connection), 1) - // Remove connection from addressToConnection map + // Remove connection from addressToConnection map and clean up watchlist for (const address of connection.addresses) { addressToConnection.delete(address) + unconfirmedTxWatchlist.delete(address) } emit('subLost', { addresses: connection.addresses }) } @@ -206,6 +207,7 @@ export function makeBlockbook(opts: BlockbookOptions): AddressPlugin { const addressIndex = connection.addresses.indexOf(address) connection.addresses.splice(addressIndex, 1) addressToConnection.delete(address) + unconfirmedTxWatchlist.delete(address) if (connection.addresses.length === 0) { connection.ws.close() connections.splice(connections.indexOf(connection), 1) From 6647102c799ab6d55593199fb6e555b795e042bc Mon Sep 17 00:00:00 2001 From: Sam Holmes Date: Sat, 6 Dec 2025 16:50:07 -0800 Subject: [PATCH 3/8] fix: add destroy lifecycle and graceful shutdown for plugin cleanup Plugins had resources that were never cleaned up: periodic timers, block watchers, and WebSocket connections. This caused resource leaks during server shutdown or plugin recreation. Changes: - Added optional 'destroy' method to AddressPlugin interface - Implemented destroy() in blockbook plugin: - Stops the pingTask periodic timer - Closes all WebSocket connections - Clears internal maps - Added 'destroyed' flag to prevent reconnection after destroy - Added destroy() to AddressHub that calls destroy() on all plugins - Added graceful shutdown in server workers (SIGTERM/SIGINT handlers) that closes connections and calls hub.destroy() --- src/hub.ts | 7 +++++++ src/index.ts | 28 +++++++++++++++++++++++++-- src/plugins/blockbook.ts | 39 +++++++++++++++++++++++++++++++------- src/types/addressPlugin.ts | 8 ++++++++ 4 files changed, 73 insertions(+), 9 deletions(-) diff --git a/src/hub.ts b/src/hub.ts index 79f4e7f..1dfb130 100644 --- a/src/hub.ts +++ b/src/hub.ts @@ -36,6 +36,7 @@ const eventCounter = new Counter({ export interface AddressHub { handleConnection: (ws: WebSocket) => void + destroy: () => void } export interface AddressHubOpts { @@ -278,6 +279,12 @@ export function makeAddressHub(opts: AddressHubOpts): AddressHub { ws.on('message', message => { codec.handleMessage(messageToString(message)) }) + }, + + destroy() { + for (const plugin of plugins) { + plugin.destroy?.() + } } } } diff --git a/src/index.ts b/src/index.ts index 7319ec3..36ef5fc 100644 --- a/src/index.ts +++ b/src/index.ts @@ -51,14 +51,38 @@ async function server(): Promise { const { allPlugins } = await import('./plugins/allPlugins') const { listenPort, listenHost } = serverConfig - const server = new WebSocket.Server({ + const wss = new WebSocket.Server({ port: listenPort, host: listenHost }) console.log(`WebSocket server listening on port ${listenPort}`) const hub = makeAddressHub({ plugins: allPlugins, logger: console }) - server.on('connection', ws => hub.handleConnection(ws)) + wss.on('connection', ws => hub.handleConnection(ws)) + + // Graceful shutdown handler + const shutdown = (): void => { + console.log(`Worker ${process.pid} shutting down...`) + + // Stop accepting new connections + wss.close(() => { + console.log(`Worker ${process.pid} WebSocket server closed`) + }) + + // Close all existing client connections + for (const client of wss.clients) { + client.close(1001, 'Server shutting down') + } + + // Clean up plugin resources (timers, WebSocket connections, etc.) + hub.destroy() + + console.log(`Worker ${process.pid} cleanup complete`) + process.exit(0) + } + + process.on('SIGTERM', shutdown) + process.on('SIGINT', shutdown) } main().catch(error => { diff --git a/src/plugins/blockbook.ts b/src/plugins/blockbook.ts index 73627f2..a4b18f7 100644 --- a/src/plugins/blockbook.ts +++ b/src/plugins/blockbook.ts @@ -65,6 +65,8 @@ export function makeBlockbook(opts: BlockbookOptions): AddressPlugin { const unconfirmedTxWatchlist = new Map>() // Global connection for block notifications let blockConnection: Connection | null = null + // Flag to prevent reconnection after destroy + let destroyed = false const getBlockConnectionReconnectDelay = (() => { const ROUGH_RECONNECTION_TIME = 3000 @@ -127,13 +129,15 @@ export function makeBlockbook(opts: BlockbookOptions): AddressPlugin { pluginDisconnectionCounter.inc({ pluginId, url: logUrl }) if (connection === blockConnection) { - // If this was the block connection, re-init it. + // If this was the block connection, re-init it (unless destroyed). blockConnection = null - snooze(getBlockConnectionReconnectDelay()) - .then(() => initBlockConnection()) - .catch(err => { - console.error('Failed to re-initialize block connection:', err) - }) + if (!destroyed) { + snooze(getBlockConnectionReconnectDelay()) + .then(() => initBlockConnection()) + .catch(err => { + console.error('Failed to re-initialize block connection:', err) + }) + } } else { // If this is a connection for a plugin, remove it and emit a subLost event. codec.handleClose() @@ -159,7 +163,7 @@ export function makeBlockbook(opts: BlockbookOptions): AddressPlugin { // Initialize a dedicated connection for block notifications function initBlockConnection(): void { - if (blockConnection !== null) return + if (destroyed || blockConnection !== null) return blockConnection = makeConnection() @@ -371,6 +375,27 @@ export function makeBlockbook(opts: BlockbookOptions): AddressPlugin { if (out.unconfirmedTxs > 0) return true if (out.transactions != null && out.transactions.length > 0) return true return false + }, + + destroy() { + destroyed = true + pingTask.stop() + + // Close all address connections + for (const connection of connections) { + connection.codec.handleClose() + connection.ws.close() + } + connections.length = 0 + addressToConnection.clear() + unconfirmedTxWatchlist.clear() + + // Close block connection + if (blockConnection !== null) { + blockConnection.codec.handleClose() + blockConnection.ws.close() + blockConnection = null + } } } diff --git a/src/types/addressPlugin.ts b/src/types/addressPlugin.ts index bd6d164..fe4676e 100644 --- a/src/types/addressPlugin.ts +++ b/src/types/addressPlugin.ts @@ -24,4 +24,12 @@ export interface AddressPlugin { * @returns `true` if the address has updates, `false` otherwise. */ scanAddress?: (address: string, checkpoint?: string) => Promise + + /** + * Clean up plugin resources (timers, connections, etc). + * + * This method is optional, because not all plugins require cleanup. + * Call this when shutting down the server or replacing plugins. + */ + destroy?: () => void } From bf7f8854cb81d0cd7808cd6fde7577dcf9a40731 Mon Sep 17 00:00:00 2001 From: Sam Holmes Date: Sat, 6 Dec 2025 16:50:53 -0800 Subject: [PATCH 4/8] fix(evmRpc): add destroy method to unsubscribe from block watcher The client.watchBlocks() call returns an unwatch function that was being ignored. This meant the block watcher would run indefinitely with no way to stop it, even if the plugin was recreated or the server shut down. If multiple plugin instances were created (e.g., during hot reload or testing), multiple watchers would be active simultaneously, each consuming resources and potentially causing duplicate event emissions. Now the unwatch function is stored and called in the new destroy() method, which also clears the subscribedAddresses map. --- src/plugins/evmRpc.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/plugins/evmRpc.ts b/src/plugins/evmRpc.ts index 335b022..87f37f0 100644 --- a/src/plugins/evmRpc.ts +++ b/src/plugins/evmRpc.ts @@ -99,7 +99,7 @@ export function makeEvmRpc(opts: EvmRpcOptions): AddressPlugin { ) } - client.watchBlocks({ + const unwatchBlocks = client.watchBlocks({ includeTransactions: true, emitMissed: true, onError: error => { @@ -253,6 +253,10 @@ export function makeEvmRpc(opts: EvmRpcOptions): AddressPlugin { const scanAdapter = pickRandom(scanAdapters) const adapter = getScanAdapter(scanAdapter, logger) return await adapter(address, checkpoint) + }, + destroy() { + unwatchBlocks() + subscribedAddresses.clear() } } From 89d2d662c3342d033ac0502651d2d848921fa0de Mon Sep 17 00:00:00 2001 From: Sam Holmes Date: Sat, 6 Dec 2025 16:51:26 -0800 Subject: [PATCH 5/8] fix(evmRpc): use WeakMap for transportUrlMap to allow garbage collection The transportUrlMap was using a regular Map to store transport instance to URL mappings. Since Map holds strong references to its keys, transport instances would never be garbage collected even after they were no longer in use by viem's fallback transport mechanism. By switching to WeakMap, transport instances that are no longer referenced elsewhere can be garbage collected, preventing gradual memory growth over the lifetime of the plugin. --- src/plugins/evmRpc.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/plugins/evmRpc.ts b/src/plugins/evmRpc.ts index 87f37f0..c372a65 100644 --- a/src/plugins/evmRpc.ts +++ b/src/plugins/evmRpc.ts @@ -55,8 +55,9 @@ export function makeEvmRpc(opts: EvmRpcOptions): AddressPlugin { // Track subscribed addresses (normalized lowercase address -> original address) const subscribedAddresses = new Map() - // Create a map to track which URL corresponds to which transport instance - const transportUrlMap = new Map() + // Create a map to track which URL corresponds to which transport instance. + // Using WeakMap so transport instances can be garbage collected when no longer used. + const transportUrlMap = new WeakMap() // Create fallback transport with all URLs const transports = urls.map(url => { From c58713dcc2caa611adf74eafa9863b4e5805c54f Mon Sep 17 00:00:00 2001 From: Sam Holmes Date: Sat, 6 Dec 2025 16:52:02 -0800 Subject: [PATCH 6/8] fix(fakePlugin): track and clear timeouts on unsubscribe and destroy The fake plugin's subscribe() method created a setTimeout that could not be cancelled. If unsubscribe() was called before the timeout fired: - The timeout would still execute and emit an update for an address that was no longer subscribed - This could cause unexpected behavior in tests Now the plugin: - Tracks pending timeouts in a Map keyed by address - Clears the timeout when unsubscribe() is called - Implements destroy() to clear all pending timeouts This makes the fake plugin behave more predictably in test scenarios and prevents orphaned timer callbacks. --- src/plugins/fakePlugin.ts | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/src/plugins/fakePlugin.ts b/src/plugins/fakePlugin.ts index 6c9663a..3906add 100644 --- a/src/plugins/fakePlugin.ts +++ b/src/plugins/fakePlugin.ts @@ -5,23 +5,46 @@ import { AddressPlugin, PluginEvents } from '../types/addressPlugin' export function makeFakePlugin(): AddressPlugin { const [on, emit] = makeEvents() + // Track pending timeouts so they can be cancelled on unsubscribe + const pendingTimeouts = new Map() + return { pluginId: 'fake', on, async subscribe(address) { - setTimeout(() => { + // Clear any existing timeout for this address to prevent leaks on re-subscribe + const existingTimeout = pendingTimeouts.get(address) + if (existingTimeout != null) { + clearTimeout(existingTimeout) + } + const timeout = setTimeout(() => { + pendingTimeouts.delete(address) emit('update', { address }) }, 1000) + pendingTimeouts.set(address, timeout) return true }, async unsubscribe(address) { + const timeout = pendingTimeouts.get(address) + if (timeout != null) { + clearTimeout(timeout) + pendingTimeouts.delete(address) + } return true }, async scanAddress(address, checkpoint): Promise { return false + }, + + destroy() { + // Clear all pending timeouts + for (const timeout of pendingTimeouts.values()) { + clearTimeout(timeout) + } + pendingTimeouts.clear() } } } From 4d73de563f15b32d6b843703a3e99f556bbf519c Mon Sep 17 00:00:00 2001 From: Sam Holmes Date: Sat, 6 Dec 2025 16:52:31 -0800 Subject: [PATCH 7/8] fix(cli): call codec.handleClose() when WebSocket disconnects The CLI tool was not calling codec.handleClose() when the WebSocket connection closed. This left pending RPC promises unresolved, which could cause the process to hang or produce confusing errors. Now handleClose() is called to properly reject any pending method calls before closing the readline interface and WebSocket. --- src/cli/cli.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/cli/cli.ts b/src/cli/cli.ts index e35dbaf..678dec1 100644 --- a/src/cli/cli.ts +++ b/src/cli/cli.ts @@ -113,6 +113,7 @@ async function main(): Promise { ws.on('close', () => { console.log('Disconnected') + codec.handleClose() close() }) From 856046eafe767c5f6c8a718336b11aad0657cca9 Mon Sep 17 00:00:00 2001 From: Sam Holmes Date: Sat, 6 Dec 2025 17:09:03 -0800 Subject: [PATCH 8/8] fix(jsonRpc): prevent resolve after reject when result cleaning fails When asResult() threw an exception, pendingCall.reject() was called but then pendingCall.resolve(cleanResult) was also called with undefined. This could cause unpredictable behavior since the promise would be both rejected and resolved. Now resolve() only executes if asResult() succeeds. --- src/jsonRpc.ts | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/jsonRpc.ts b/src/jsonRpc.ts index d2d6282..5d19dd9 100644 --- a/src/jsonRpc.ts +++ b/src/jsonRpc.ts @@ -344,13 +344,12 @@ function makeCodec( pendingCall.reject(new Error(error.message)) } else { const { asResult } = pendingCall - let cleanResult: unknown try { - cleanResult = asResult(result) + const cleanResult = asResult(result) + pendingCall.resolve(cleanResult) } catch (error) { pendingCall.reject(error) } - pendingCall.resolve(cleanResult) } } else { sendError(-32600, `Invalid JSON-RPC request / response`).catch(