Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

## Unreleased

- added: Added flexible service key matching with URL templating.
- added: Added support for WebSocket transports for evmRpc plugin.
- changed: Migrated to JSON-based logs using Pino library.
- changed: Use serviceKeys for nownodes API key.
- removed: Removed plugins to reduce load: abstract, amoy, arbitrum, avalanche, base, bobevm, celo, ethereumclassic, ethereumpow, fantom, filecoinfevm, filecoinfevmcalibration, holesky, hyperevm, pulsechain, rsk, sepolia, sonic.

## 0.2.6 (2025-12-10)

## 0.2.5 (2025-12-04)
Expand Down
75 changes: 75 additions & 0 deletions docs/logging.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Change Server Logging

## Implementation

Uses Pino with a base logger and child loggers for scoping. Located in `src/util/logger.ts`.

### API

```typescript
import { logger, makeLogger } from './util/logger'

// Direct use of base logger for simple cases:
logger.info({ port: 3000 }, 'server started')

// For code plugins (blockbook, evmRpc):
const pluginLogger = makeLogger('blockbook', 'ethereum') // scope, chainPluginId

// For non-plugin code:
const socketLogger = makeLogger('socket') // scope only

// Returns a pino.Logger - use standard Pino API:
pluginLogger.info('message')
pluginLogger.warn('message')
pluginLogger.error('message')

// Pass an object with extra fields:
pluginLogger.info({ ip: '1.2.3.4' }, 'connected')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You pass an object with a msg field in a ton of places, but your own docs suggest that this is the wrong approach. Should you be passing the msg field as a second string parameter? If so, that applies to a bunch of places.

pluginLogger.info({ blockNum: '12345' }, 'block')
```

### Output Format

JSON with these fields:

- `level` - numeric level (30=info, 40=warn, 50=error)
- `time` - numeric epoch milliseconds
- `scope` - scope identifier (code plugin name: "blockbook", "evmRpc", "socket", "server")
- `chainPluginId` - chain plugin ID (optional, e.g., "ethereum", "arbitrum")
- `pid` - process ID (for socket events)
- `sid` - socket ID (for socket events, identifies the connection)
- `msg` - text message (Pino default key)
- Additional fields from passed objects

Example:

```json
{"level":30,"time":1735654281000,"scope":"blockbook","chainPluginId":"bitcoin","blockNum":"876543","msg":"block"}
{"level":30,"time":1735654281000,"scope":"socket","pid":20812,"sid":396,"ip":"192.168.1.1","msg":"connected"}
{"level":50,"time":1735654281000,"scope":"evmRpc","chainPluginId":"ethereum","msg":"watchBlocks error: connection timeout"}
```

### pino-pretty Support

Logs are compatible with pino-pretty for human-readable output during development:

```bash
node src/index.js | pino-pretty --translateTime
# Or use -t shorthand:
node src/index.js | pino-pretty -t
```

## Logged Events

1. **WebSocket connection established** - scope: `socket`, includes pid, sid, IP
2. **Addresses subscribed** - scope: `socket`, includes pid, sid, IP, first 6 chars of addresses, pluginId, checkpoint
3. **Block found** - scope: code plugin, includes `chainPluginId`, block number
4. **Transaction detected** - scope: code plugin, includes `chainPluginId`, first 6 chars of address and txid
5. **Update sent** - scope: `socket`, includes pid, sid, IP, pluginId, address (6 chars), checkpoint

## Notes

- All output goes to stdout for logrotate compatibility
- Uses Pino's `.child()` pattern for efficient scoped logging
- Logging is disabled when `NODE_ENV=test`
- Reserved fields (`time`, `scope`) passed in log objects are automatically renamed with a `_` suffix to avoid conflicts (e.g., `time` becomes `time_`)
4 changes: 3 additions & 1 deletion jest.setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,7 @@ jest.mock('./src/serverConfig', () => ({
'api.etherscan.io': ['JYMB141VYKJ2KPVMYJUZC8PXGWKUFVFX8N'],
'eth.blockscout.com': []
}
}
},
// Pass through URL unchanged in tests (no replacements)
replaceUrlParams: (url: string) => url
}))
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"clipanion": "^3.2.0-rc.3",
"edge-server-tools": "^0.2.19",
"node-fetch": "^2.6.0",
"pino": "^10.2.0",
"prom-client": "^15.1.0",
"serverlet": "^0.1.1",
"viem": "^2.27.0",
Expand Down
122 changes: 83 additions & 39 deletions src/hub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ import { Counter, Gauge } from 'prom-client'
import WebSocket from 'ws'

import { messageToString } from './messageToString'
import { Logger } from './types'
import { AddressPlugin } from './types/addressPlugin'
import {
changeProtocol,
SubscribeParams,
SubscribeResult
} from './types/changeProtocol'
import { getAddressPrefix } from './util/addressUtils'
import { makeLogger } from './util/logger'
import { stackify } from './util/stackify'

const pluginGauge = new Gauge({
Expand All @@ -35,13 +36,12 @@ const eventCounter = new Counter({
})

export interface AddressHub {
handleConnection: (ws: WebSocket) => void
handleConnection: (ws: WebSocket, ip: string) => void
destroy: () => void
}

export interface AddressHubOpts {
plugins: AddressPlugin[]
logger?: Logger
}

interface PluginRow {
Expand All @@ -58,13 +58,15 @@ interface PluginRow {
*/
export function makeAddressHub(opts: AddressHubOpts): AddressHub {
const { plugins } = opts
const logger = makeLogger('socket')
let nextSocketId = 0

// Maps socketId to changeProtocol server codec:
const codecMap = new Map<
number,
ReturnType<typeof changeProtocol.makeServerCodec>
>()
// Maps socketId to codec and IP info:
interface SocketInfo {
codec: ReturnType<typeof changeProtocol.makeServerCodec>
ip: string
}
const codecMap = new Map<number, SocketInfo>()

// Maps pluginId to PluginRow:
const pluginMap = new Map<string, PluginRow>()
Expand All @@ -82,9 +84,18 @@ export function makeAddressHub(opts: AddressHubOpts): AddressHub {
const socketIds = pluginRow.addressSubscriptions.get(address)
if (socketIds == null) return
for (const socketId of socketIds) {
const codec = codecMap.get(socketId)
if (codec == null) continue
console.log(`WebSocket ${process.pid}.${socketId} update`)
const socketInfo = codecMap.get(socketId)
if (socketInfo == null) continue
const { codec, ip } = socketInfo
logger.info({
pid: process.pid,
sid: socketId,
ip,
pluginId,
addr: getAddressPrefix(address),
checkpoint,
msg: 'update'
})
codec.remoteMethods.update([pluginId, address, checkpoint])
}
})
Expand All @@ -95,8 +106,17 @@ export function makeAddressHub(opts: AddressHubOpts): AddressHub {
const socketIds = pluginRow.addressSubscriptions.get(address)
if (socketIds == null) continue
for (const socketId of socketIds) {
const codec = codecMap.get(socketId)
if (codec == null) continue
const socketInfo = codecMap.get(socketId)
if (socketInfo == null) continue
const { codec, ip } = socketInfo
logger.info({
pid: process.pid,
sid: socketId,
ip,
pluginId,
addr: getAddressPrefix(address),
msg: 'subLost'
})
codec.remoteMethods.subLost([pluginId, address])
}
pluginRow.addressSubscriptions.delete(address)
Expand Down Expand Up @@ -148,28 +168,17 @@ export function makeAddressHub(opts: AddressHubOpts): AddressHub {
}

return {
handleConnection(ws: WebSocket): void {
handleConnection(ws: WebSocket, ip: string): void {
const socketId = nextSocketId++

const logPrefix = `WebSocket ${process.pid}.${socketId} `
const logger: Logger = {
log: (...args: unknown[]): void => {
opts.logger?.log(logPrefix, ...args)
},
error: (...args: unknown[]): void => {
opts.logger?.error(logPrefix, ...args)
},
warn: (...args: unknown[]): void => {
opts.logger?.warn(logPrefix, ...args)
}
}

connectionGauge.inc()
logger.log('connected')
const pid = process.pid
const sid = socketId
logger.info({ pid, sid, ip, msg: 'connected' })

const codec = changeProtocol.makeServerCodec({
handleError(error) {
logger.error(`send error: ${String(error)}`)
logger.error({ pid, sid, ip, msg: `send error: ${String(error)}` })
ws.close(1011, 'Internal error')
},

Expand All @@ -181,7 +190,13 @@ export function makeAddressHub(opts: AddressHubOpts): AddressHub {
async subscribe(
params: SubscribeParams[]
): Promise<SubscribeResult[]> {
logger.log(`subscribing ${params.length}`)
// Log all subscriptions in one line
const subs = params.map(([pluginId, address, checkpoint]) => ({
pluginId,
addr: getAddressPrefix(address),
checkpoint
}))
logger.info({ pid, sid, ip, subs, msg: 'subscribe' })

// Do the initial scan:
const result = await Promise.all(
Expand All @@ -205,21 +220,30 @@ export function makeAddressHub(opts: AddressHubOpts): AddressHub {
const changed = await pluginRow.plugin
.scanAddress(address, checkpoint)
.catch(error => {
logger.warn('Scan address failed: ' + stackify(error))
logger.warn({
pid,
sid,
ip,
msg: 'Scan address failed: ' + stackify(error)
})
return true
})
return changed ? 2 : 1
}
)
)

logger.log(`subscribed ${params.length}`)

return result
},

async unsubscribe(params: SubscribeParams[]): Promise<undefined> {
logger.log(`unsubscribed ${params.length}`)
logger.info({
pid,
sid,
ip,
count: params.length,
msg: 'unsubscribe'
})

for (const param of params) {
const [pluginId, address] = param
Expand All @@ -241,11 +265,21 @@ export function makeAddressHub(opts: AddressHubOpts): AddressHub {
}
})

// Save the codec for notifications:
codecMap.set(socketId, codec)
// Save the codec and IP for notifications:
codecMap.set(socketId, { codec, ip })

ws.on('close', () => {
logger.log(`closed`)
// Collect subscriptions for logging before cleanup:
const subs: Array<{ pluginId: string; addr: string }> = []
for (const [pluginId, pluginRow] of pluginMap.entries()) {
for (const [address, socketIds] of pluginRow.addressSubscriptions) {
if (socketIds.has(socketId)) {
subs.push({ pluginId, addr: getAddressPrefix(address) })
}
}
}

logger.info({ pid, sid, ip, subs, msg: 'closed' })
connectionGauge.dec()

// Cleanup the server codec:
Expand All @@ -264,7 +298,12 @@ export function makeAddressHub(opts: AddressHubOpts): AddressHub {
if (socketIds.size < 1) {
unsubscribeClientsToPluginAddress(pluginRow, address).catch(
error => {
console.error('unsubscribe error:', error)
logger.error({
pid,
sid,
ip,
msg: `unsubscribe error: ${String(error)}`
})
}
)
}
Expand All @@ -273,7 +312,12 @@ export function makeAddressHub(opts: AddressHubOpts): AddressHub {
})

ws.on('error', error => {
logger.error(`connection error: ${String(error)}`)
logger.error({
pid,
sid,
ip,
msg: `connection error: ${String(error)}`
})
})

ws.on('message', message => {
Expand Down
Loading
Loading