Skip to content
1 change: 1 addition & 0 deletions src/cli/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ async function main(): Promise<void> {

ws.on('close', () => {
console.log('Disconnected')
codec.handleClose()
close()
})

Expand Down
7 changes: 7 additions & 0 deletions src/hub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const eventCounter = new Counter({

export interface AddressHub {
handleConnection: (ws: WebSocket) => void
destroy: () => void
}

export interface AddressHubOpts {
Expand Down Expand Up @@ -278,6 +279,12 @@ export function makeAddressHub(opts: AddressHubOpts): AddressHub {
ws.on('message', message => {
codec.handleMessage(messageToString(message))
})
},

destroy() {
for (const plugin of plugins) {
plugin.destroy?.()
}
}
}
}
28 changes: 26 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,38 @@ async function server(): Promise<void> {
const { allPlugins } = await import('./plugins/allPlugins')
const { listenPort, listenHost } = serverConfig

const server = new WebSocket.Server({
const wss = new WebSocket.Server({
port: listenPort,
host: listenHost
})
console.log(`WebSocket server listening on port ${listenPort}`)

const hub = makeAddressHub({ plugins: allPlugins, logger: console })
server.on('connection', ws => hub.handleConnection(ws))
wss.on('connection', ws => hub.handleConnection(ws))

// Graceful shutdown handler
const shutdown = (): void => {
console.log(`Worker ${process.pid} shutting down...`)

// Stop accepting new connections
wss.close(() => {
console.log(`Worker ${process.pid} WebSocket server closed`)
})

// Close all existing client connections
for (const client of wss.clients) {
client.close(1001, 'Server shutting down')
}

// Clean up plugin resources (timers, WebSocket connections, etc.)
hub.destroy()

console.log(`Worker ${process.pid} cleanup complete`)
process.exit(0)
}

process.on('SIGTERM', shutdown)
process.on('SIGINT', shutdown)
}

main().catch(error => {
Expand Down
7 changes: 4 additions & 3 deletions src/jsonRpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -344,13 +344,12 @@ function makeCodec(
pendingCall.reject(new Error(error.message))
} else {
const { asResult } = pendingCall
let cleanResult: unknown
try {
cleanResult = asResult(result)
const cleanResult = asResult(result)
pendingCall.resolve(cleanResult)
} catch (error) {
pendingCall.reject(error)
}
pendingCall.resolve(cleanResult)
}
} else {
sendError(-32600, `Invalid JSON-RPC request / response`).catch(
Expand All @@ -363,6 +362,8 @@ function makeCodec(
for (const call of remoteCalls.values()) {
call.reject(new Error('JSON-RPC connection closed'))
}
remoteCalls.clear()
remoteSubscriptions.clear()
}

return {
Expand Down
43 changes: 35 additions & 8 deletions src/plugins/blockbook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ export function makeBlockbook(opts: BlockbookOptions): AddressPlugin {
const unconfirmedTxWatchlist = new Map<string, Set<string>>()
// Global connection for block notifications
let blockConnection: Connection | null = null
// Flag to prevent reconnection after destroy
let destroyed = false

const getBlockConnectionReconnectDelay = (() => {
const ROUGH_RECONNECTION_TIME = 3000
Expand Down Expand Up @@ -127,21 +129,24 @@ export function makeBlockbook(opts: BlockbookOptions): AddressPlugin {
pluginDisconnectionCounter.inc({ pluginId, url: logUrl })

if (connection === blockConnection) {
// If this was the block connection, re-init it.
// If this was the block connection, re-init it (unless destroyed).
blockConnection = null
snooze(getBlockConnectionReconnectDelay())
.then(() => initBlockConnection())
.catch(err => {
console.error('Failed to re-initialize block connection:', err)
})
if (!destroyed) {
snooze(getBlockConnectionReconnectDelay())
.then(() => initBlockConnection())
.catch(err => {
console.error('Failed to re-initialize block connection:', err)
})
}
} else {
// If this is a connection for a plugin, remove it and emit a subLost event.
codec.handleClose()
// Remove connection from connections array
connections.splice(connections.indexOf(connection), 1)
// Remove connection from addressToConnection map
// Remove connection from addressToConnection map and clean up watchlist
for (const address of connection.addresses) {
addressToConnection.delete(address)
unconfirmedTxWatchlist.delete(address)
}
emit('subLost', { addresses: connection.addresses })
}
Expand All @@ -158,7 +163,7 @@ export function makeBlockbook(opts: BlockbookOptions): AddressPlugin {

// Initialize a dedicated connection for block notifications
function initBlockConnection(): void {
if (blockConnection !== null) return
if (destroyed || blockConnection !== null) return

blockConnection = makeConnection()

Expand Down Expand Up @@ -206,6 +211,7 @@ export function makeBlockbook(opts: BlockbookOptions): AddressPlugin {
const addressIndex = connection.addresses.indexOf(address)
connection.addresses.splice(addressIndex, 1)
addressToConnection.delete(address)
unconfirmedTxWatchlist.delete(address)
if (connection.addresses.length === 0) {
connection.ws.close()
connections.splice(connections.indexOf(connection), 1)
Expand Down Expand Up @@ -369,6 +375,27 @@ export function makeBlockbook(opts: BlockbookOptions): AddressPlugin {
if (out.unconfirmedTxs > 0) return true
if (out.transactions != null && out.transactions.length > 0) return true
return false
},

destroy() {
destroyed = true
pingTask.stop()

// Close all address connections
for (const connection of connections) {
connection.codec.handleClose()
connection.ws.close()
}
connections.length = 0
addressToConnection.clear()
unconfirmedTxWatchlist.clear()

// Close block connection
if (blockConnection !== null) {
blockConnection.codec.handleClose()
blockConnection.ws.close()
blockConnection = null
}
}
}

Expand Down
11 changes: 8 additions & 3 deletions src/plugins/evmRpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ export function makeEvmRpc(opts: EvmRpcOptions): AddressPlugin {
// Track subscribed addresses (normalized lowercase address -> original address)
const subscribedAddresses = new Map<string, string>()

// Create a map to track which URL corresponds to which transport instance
const transportUrlMap = new Map<any, string>()
// Create a map to track which URL corresponds to which transport instance.
// Using WeakMap so transport instances can be garbage collected when no longer used.
const transportUrlMap = new WeakMap<object, string>()

// Create fallback transport with all URLs
const transports = urls.map(url => {
Expand Down Expand Up @@ -99,7 +100,7 @@ export function makeEvmRpc(opts: EvmRpcOptions): AddressPlugin {
)
}

client.watchBlocks({
const unwatchBlocks = client.watchBlocks({
includeTransactions: true,
emitMissed: true,
onError: error => {
Expand Down Expand Up @@ -253,6 +254,10 @@ export function makeEvmRpc(opts: EvmRpcOptions): AddressPlugin {
const scanAdapter = pickRandom(scanAdapters)
const adapter = getScanAdapter(scanAdapter, logger)
return await adapter(address, checkpoint)
},
destroy() {
unwatchBlocks()
subscribedAddresses.clear()
}
}

Expand Down
25 changes: 24 additions & 1 deletion src/plugins/fakePlugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,46 @@ import { AddressPlugin, PluginEvents } from '../types/addressPlugin'
export function makeFakePlugin(): AddressPlugin {
const [on, emit] = makeEvents<PluginEvents>()

// Track pending timeouts so they can be cancelled on unsubscribe
const pendingTimeouts = new Map<string, NodeJS.Timeout>()

return {
pluginId: 'fake',
on,

async subscribe(address) {
setTimeout(() => {
// Clear any existing timeout for this address to prevent leaks on re-subscribe
const existingTimeout = pendingTimeouts.get(address)
if (existingTimeout != null) {
clearTimeout(existingTimeout)
}
const timeout = setTimeout(() => {
pendingTimeouts.delete(address)
emit('update', { address })
}, 1000)
pendingTimeouts.set(address, timeout)
return true
},

async unsubscribe(address) {
const timeout = pendingTimeouts.get(address)
if (timeout != null) {
clearTimeout(timeout)
pendingTimeouts.delete(address)
}
return true
},

async scanAddress(address, checkpoint): Promise<boolean> {
return false
},

destroy() {
// Clear all pending timeouts
for (const timeout of pendingTimeouts.values()) {
clearTimeout(timeout)
}
pendingTimeouts.clear()
}
}
}
8 changes: 8 additions & 0 deletions src/types/addressPlugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,12 @@ export interface AddressPlugin {
* @returns `true` if the address has updates, `false` otherwise.
*/
scanAddress?: (address: string, checkpoint?: string) => Promise<boolean>

/**
* Clean up plugin resources (timers, connections, etc).
*
* This method is optional, because not all plugins require cleanup.
* Call this when shutting down the server or replacing plugins.
*/
destroy?: () => void
}
Loading