From f18bd7333b578493497c27a45b0d38870d5da259 Mon Sep 17 00:00:00 2001 From: William Swanson Date: Mon, 3 Mar 2025 15:40:36 -0800 Subject: [PATCH 1/2] wip: Implement blockbook plugin --- src/jsonRpc.ts | 35 ++++--- src/plugins/allPlugins.ts | 55 +++++++++++ src/plugins/blockbook.ts | 87 +++++++++++++++++ src/plugins/blocktest.ts | 16 ++++ src/plugins/ethereum.ts | 15 +++ src/serverConfig.ts | 5 +- src/types/blockbookProtocol.ts | 164 +++++++++++++++++++++++++++++++++ 7 files changed, 365 insertions(+), 12 deletions(-) create mode 100644 src/plugins/blockbook.ts create mode 100644 src/plugins/blocktest.ts create mode 100644 src/plugins/ethereum.ts create mode 100644 src/types/blockbookProtocol.ts diff --git a/src/jsonRpc.ts b/src/jsonRpc.ts index d9603ce..956ae35 100644 --- a/src/jsonRpc.ts +++ b/src/jsonRpc.ts @@ -91,15 +91,26 @@ export function makeRpcProtocol< * Methods supported on the client side. */ clientMethods: ClientCleaners + + /** + * Optionally used if the protocol differs from strict JSON-RPC 2.0. + */ + asCall?: Cleaner + asReturn?: Cleaner }): RpcProtocol, Methods> { - const { serverMethods, clientMethods } = opts + const { + serverMethods, + clientMethods, + asCall = asJsonRpcCall, + asReturn = asJsonRpcReturn + } = opts return { makeServerCodec(opts) { - return makeCodec(serverMethods, clientMethods, opts) + return makeCodec(serverMethods, clientMethods, asCall, asReturn, opts) }, makeClientCodec(opts) { - return makeCodec(clientMethods, serverMethods, opts) + return makeCodec(clientMethods, serverMethods, asCall, asReturn, opts) } } } @@ -123,9 +134,13 @@ type Methods = { function makeCodec( localCleaners: MethodCleaners, remoteCleaners: MethodCleaners, + asCall: Cleaner, + asReturn: Cleaner, opts: RpcCodecOpts ): RpcCodec { const { handleError, handleSend, localMethods } = opts + const wasCall = uncleaner(asCall) + const wasReturn = uncleaner(asReturn) const sendError = async ( code: number, @@ -134,7 +149,7 @@ function makeCodec( ): Promise => await handleSend( JSON.stringify( - wasJsonRpcReturn({ + wasReturn({ jsonrpc: '2.0', result: undefined, error: { code, message, data: undefined }, @@ -160,7 +175,7 @@ function makeCodec( remoteMethods[name] = (params: unknown): void => { handleSend( JSON.stringify( - wasJsonRpcCall({ + wasCall({ jsonrpc: '2.0', method: name, params: wasParams(params), @@ -183,7 +198,7 @@ function makeCodec( handleSend( JSON.stringify( - wasJsonRpcCall({ + wasCall({ jsonrpc: '2.0', method: name, params: wasParams(params), @@ -208,8 +223,8 @@ function makeCodec( } // TODO: We need to add support for batch calls. - const call = asMaybe(asJsonRpcCall)(json) - const response = asMaybe(asJsonRpcReturn)(json) + const call = asMaybe(asCall)(json) + const response = asMaybe(asReturn)(json) if (call != null) { const { method, id, params } = call @@ -251,7 +266,7 @@ function makeCodec( (result: unknown) => { handleSend( JSON.stringify( - wasJsonRpcReturn({ + wasReturn({ jsonrpc: '2.0', result: wasResult(result), error: undefined, @@ -350,7 +365,6 @@ const asJsonRpcCall = asObject({ params: asUnknown, id: asOptional(asRpcId) }) -const wasJsonRpcCall = uncleaner(asJsonRpcCall) const asJsonRpcReturn = asObject({ jsonrpc: asValue('2.0'), @@ -364,4 +378,3 @@ const asJsonRpcReturn = asObject({ ), id: asRpcId }) -const wasJsonRpcReturn = uncleaner(asJsonRpcReturn) diff --git a/src/plugins/allPlugins.ts b/src/plugins/allPlugins.ts index c256435..c7f8411 100644 --- a/src/plugins/allPlugins.ts +++ b/src/plugins/allPlugins.ts @@ -1,6 +1,61 @@ +import { serverConfig } from '../server-config' +import { AddressPlugin } from '../types/addressPlugin' +import { BlockbookOptions, makeBlockbook } from './blockbook' import { makeFakePlugin } from './fakePlugin' +function makeNowNode(opts: BlockbookOptions): AddressPlugin { + return makeBlockbook({ + ...opts, + safeUrl: opts.url, + url: opts.url + '/' + serverConfig.nowNodesApiKey + }) +} + export const allPlugins = [ + // Bitcoin family: + makeNowNode({ + pluginId: 'bitcoin', + url: 'wss://btcbook.nownodes.io/wss' + }), + makeNowNode({ + pluginId: 'bitcoincash', + url: 'wss://bchbook.nownodes.io/wss' + }), + makeNowNode({ + pluginId: 'dogecoin', + url: 'wss://dogebook.nownodes.io/wss' + }), + makeNowNode({ + pluginId: 'litecoin', + url: 'wss://ltcbook.nownodes.io/wss' + }), + makeNowNode({ + pluginId: 'qtum', + url: 'wss://qtum-blockbook.nownodes.io/wss' + }), + + // Ethereum family: + makeNowNode({ + pluginId: 'arbitrum', + url: 'wss://arb-blockbook.nownodes.io/wss' + }), + makeNowNode({ + pluginId: 'avalanche', + url: 'wss://avax-blockbook.nownodes.io/wss' + }), + makeNowNode({ + pluginId: 'base', + url: 'wss://base-blockbook.nownodes.io/wss' + }), + makeNowNode({ + pluginId: 'ethereum', + url: 'wss://eth-blockbook.nownodes.io/wss' + }), + makeNowNode({ + pluginId: 'polygon', + url: 'wss://maticbook.nownodes.io/wss' + }), + // Testing: makeFakePlugin() ] diff --git a/src/plugins/blockbook.ts b/src/plugins/blockbook.ts new file mode 100644 index 0000000..d126d61 --- /dev/null +++ b/src/plugins/blockbook.ts @@ -0,0 +1,87 @@ +import WebSocket from 'ws' +import { makeEvents } from 'yavent' + +import { messageToString } from '../messageToString' +import { AddressPlugin, PluginEvents } from '../types/addressPlugin' +import { blockbookProtocol } from '../types/blockbookProtocol' + +export interface BlockbookOptions { + pluginId: string + + /** A clean URL for logging */ + safeUrl?: string + + /** The actual connection URL */ + url: string +} + +export function makeBlockbook(opts: BlockbookOptions): AddressPlugin { + const { pluginId, url } = opts + + const ws = new WebSocket(url) + const [on, emit] = makeEvents() + const codec = blockbookProtocol.makeClientCodec({ + handleError(error) { + console.log(error) + }, + async handleSend(text) { + console.log('send', text) + ws.send(text) + }, + localMethods: { + subscribeAddresses({ address }) { + emit('update', { address }) + } + } + }) + + ws.on('message', message => { + const text = messageToString(message) + console.log(text) + codec.handleMessage(text) + }) + ws.on('open', () => emit('connect', undefined)) + ws.on('close', () => { + codec.handleClose() + emit('disconnect', undefined) + }) + + return { + pluginId, + on, + + subscribe(address) { + codec.remoteMethods + .subscribeAddresses({ addresses: [address] }) + .catch(error => console.log(error)) + }, + + unsubscribe(address) { + codec.remoteMethods + .unsubscribeAddresses({ addresses: [address] }) + .catch(error => console.log(error)) + }, + + async scanAddress(address, checkpoint): Promise { + const out = await codec.remoteMethods.getAccountInfo({ + descriptor: address, // Address or xpub + details: 'txids', + tokens: undefined, // 'derived', + from: 860728, // checkpoint == null ? checkpoint : parseInt(checkpoint), + to: undefined, // checkpoint == null ? checkpoint : parseInt(checkpoint), + page: undefined, + pageSize: undefined, + contractFilter: undefined, + secondaryCurrency: undefined, + gap: undefined + }) + + console.log(out) + // return out.unconfirmedTxs > 0 || out.txids?.length > 0 + + if (out.unconfirmedTxs > 0) return true + if (out.txids != null && out.txids.length > 0) return true + return false + } + } +} diff --git a/src/plugins/blocktest.ts b/src/plugins/blocktest.ts new file mode 100644 index 0000000..07e3d38 --- /dev/null +++ b/src/plugins/blocktest.ts @@ -0,0 +1,16 @@ +import { allPlugins } from './allPlugins' + +const bitcoin = allPlugins.find(plugin => plugin.pluginId === 'bitcoin') +if (bitcoin != null) { + bitcoin.on('connect', () => { + bitcoin.subscribe('bc1qmgwnfjlda4ns3g6g3yz74w6scnn9yu2ts82yyc') + bitcoin + .scanAddress?.( + 'bc1qmgwnfjlda4ns3g6g3yz74w6scnn9yu2ts82yyc' /* '860728' */ + ) + .then( + x => console.log(x), + e => console.log(e) + ) + }) +} diff --git a/src/plugins/ethereum.ts b/src/plugins/ethereum.ts new file mode 100644 index 0000000..f53a13b --- /dev/null +++ b/src/plugins/ethereum.ts @@ -0,0 +1,15 @@ +import { AddressPlugin } from '../types/addressPlugin' + +export function makeEthereum(opts: {}): AddressPlugin { + return { + pluginId: 'ethereum', + + on(event, callback) { + return () => {} + }, + + subscribe() {}, + + unsubscribe(address) {} + } +} diff --git a/src/serverConfig.ts b/src/serverConfig.ts index f490c18..444f118 100644 --- a/src/serverConfig.ts +++ b/src/serverConfig.ts @@ -15,7 +15,10 @@ const asServerConfig = asObject({ listenPort: asOptional(asNumber, 8008), metricsHost: asOptional(asString, '127.0.0.1'), metricsPort: asOptional(asNumber, 8009), - publicUri: asOptional(asString, 'https://address1.edge.app') + publicUri: asOptional(asString, 'https://address1.edge.app'), + + // Resources: + nowNodesApiKey: asOptional(asString, '') }) export const serverConfig = makeConfig( diff --git a/src/types/blockbookProtocol.ts b/src/types/blockbookProtocol.ts new file mode 100644 index 0000000..abba258 --- /dev/null +++ b/src/types/blockbookProtocol.ts @@ -0,0 +1,164 @@ +import { + asArray, + asBoolean, + asCodec, + asMaybe, + asNumber, + asObject, + asOptional, + asString, + asUnknown, + asValue, + Cleaner +} from 'cleaners' + +import { JsonRpcCall, JsonRpcReturn, makeRpcProtocol } from '../jsonRpc' + +// function asNullable(cleaner: Cleaner): Cleaner { +// return function asNullable(raw) { +// if (raw == null) return raw +// return cleaner(raw) +// } +// } + +export const blockbookProtocol = makeRpcProtocol({ + serverMethods: { + getAccountInfo: { + asParams: asObject({ + descriptor: asString, // Address or XPub + details: asValue( + 'basic', + 'tokenBalances', + 'tokens', + 'txids', + 'txs', + 'txslight' + ), + tokens: asOptional(asValue('derived', 'nonzero', 'used')), + page: asOptional(asNumber), + pageSize: asOptional(asNumber), + from: asOptional(asNumber), // Lowest block height, inclusive + to: asOptional(asNumber), // Highest block height + contractFilter: asOptional(asString), // For token queries + secondaryCurrency: asOptional(asString), // For fiat balances + gap: asOptional(asNumber) // Gap limit for xpub's + }), + asResult: asObject({ + address: asString, + balance: asString, + totalReceived: asString, + totalSent: asString, + unconfirmedBalance: asString, + unconfirmedTxs: asNumber, + txs: asNumber, + // For paged queries: + page: asOptional(asNumber), + totalPages: asOptional(asNumber), + itemsOnPage: asOptional(asNumber), + transactions: asOptional(asArray(asUnknown)), + txids: asOptional(asArray(asString)) + }) + }, + + subscribeAddresses: { + asParams: asObject({ addresses: asArray(asString) }), + asResult: asObject({ subscribed: asBoolean }) + }, + + unsubscribeAddresses: { + asParams: asObject({ addresses: asArray(asString) }), + asResult: asObject({ subscribed: asBoolean }) + } + + // estimateFee + // getBalanceHistory + // getBlockHash + // getCurrentFiatRates + // getFiatRatesForTimestamps + // getFiatRatesTickersList + // getInfo + // getTransaction + // getTransactionSpecific + // sendTransaction + // subscribeFiatRates + // subscribeNewBlock + // subscribeNewTransaction + // unsubscribeFiatRates + // unsubscribeNewBlock + // unsubscribeNewTransaction + }, + + clientMethods: { + subscribeAddresses: { + asParams: asObject({ address: asString }) + } + + // subscribeFiatRates + // subscribeNewBlock + // subscribeNewTransaction + }, + + // Differences from JSON-RPC: + // - There is no `jsonrpc` version field. + // - The `id` cannot be a number, only a string. + // - Responses come back with `data` instead of `result`. + // - Subscription updates updates look like return values, + // but with the `id` as the method name. + + asCall: asCodec( + raw => { + // Blockbook notifications look like return values: + const notification = asMaybe(asBlockbookReturn)(raw) + if (notification != null && typeof notification.id === 'string') { + return { + jsonrpc: '2.0', + method: notification.id, + params: notification.data + } + } + const { id, method, params } = asBlockbookCall(raw) + return { + id, + jsonrpc: '2.0', + method, + params + } + }, + ({ id, method, params }) => ({ + id: String(id), + method, + params + }) + ), + + asReturn: asCodec( + raw => { + const { id, data } = asBlockbookReturn(raw) + return { + id, + jsonrpc: '2.0', + result: data + } + }, + ({ id, result }) => ({ + id: String(id), + data: result + }) + ) +}) + +const asBlockbookId: Cleaner = raw => { + const clean = asString(raw) + return /^[0-9]+$/.test(clean) ? parseInt(clean) : clean +} + +const asBlockbookCall = asObject({ + id: asBlockbookId, + method: asString, + params: asUnknown +}) + +const asBlockbookReturn = asObject({ + id: asBlockbookId, + data: asUnknown +}) From 74a159911c2d4d1023b98397e3eb9dee7ae15f08 Mon Sep 17 00:00:00 2001 From: William Swanson Date: Mon, 3 Mar 2025 15:09:50 -0800 Subject: [PATCH 2/2] Meeting notes --- src/hub.ts | 16 ++++++++++++++ src/types/changeProtocol.ts | 43 +++++++++++++++++++++++++++---------- 2 files changed, 48 insertions(+), 11 deletions(-) diff --git a/src/hub.ts b/src/hub.ts index 51433d0..10b386b 100644 --- a/src/hub.ts +++ b/src/hub.ts @@ -10,6 +10,12 @@ import { SubscribeResult } from './types/changeProtocol' +const pluginGauge = new Gauge({ + name: 'change_plugin_count', + help: 'Active change-server plugins', + labelNames: ['pluginId'] as const +}) + const connectionGauge = new Gauge({ name: 'change_connection_count', help: 'Active websocket connections' @@ -69,6 +75,16 @@ export function makeAddressHub(plugins: AddressPlugin[]): AddressHub { codec?.remoteMethods.update([pluginId, address, checkpoint]) } }) + + plugin.on('connect', () => { + pluginGauge.inc({ pluginId }) + // TODO: Tell clients + }) + plugin.on('disconnect', () => { + pluginGauge.dec({ pluginId }) + // TODO: Tell clients + // Clear active addresses list + }) } function handleConnection(ws: WebSocket): void { diff --git a/src/types/changeProtocol.ts b/src/types/changeProtocol.ts index 867fcaf..7429faf 100644 --- a/src/types/changeProtocol.ts +++ b/src/types/changeProtocol.ts @@ -1,10 +1,11 @@ import { asArray, + asBoolean, + asObject, asOptional, asString, asTuple, - asValue, - Cleaner + asValue } from 'cleaners' import { makeRpcProtocol } from '../jsonRpc' @@ -29,21 +30,21 @@ const asAddress = asTuple( asOptional(asString) ) -export type SubscribeResult = - /** Subscribe failed (unsupported chain) */ - | 0 - /** Subscribe succeeded, no changes */ - | 1 - /** Subscribed succeeded, changes present */ - | 2 +// export type SubscribeResult = +// /** Subscribe failed */ +// | 0 +// /** Subscribe succeeded, no changes */ +// | 1 +// /** Subscribed succeeded, changes present */ +// | 2 -const asSubscribeResult: Cleaner = asValue(0, 1, 2) +// const asSubscribeResult: Cleaner = asValue(0, 1, 2) export const changeProtocol = makeRpcProtocol({ serverMethods: { subscribe: { asParams: asArray(asAddress), - asResult: asArray(asSubscribeResult) + asResult: asArray(asBoolean) }, unsubscribe: { @@ -55,6 +56,26 @@ export const changeProtocol = makeRpcProtocol({ clientMethods: { update: { asParams: asAddress + }, + pluginConnect: { + asParams: asObject({ pluginIds: asArray(asString) }) + }, + pluginDisconnect: { + asParams: asObject({ pluginIds: asArray(asString) }) } } }) + +// core: +// ws +// codec +// activePluginIds +// subcriptions: Map +// +// 1. Core connects +// 2. Server sends pluginConnect +// 3. Core foreach pluginId with wallets, subscribe +// ... +// 1. Server sends pluginDisconnect +// 2. Core keeps updating subscriptions as if nothing happened +// 3. Server sends pluginConnect