diff --git a/package.json b/package.json index 7f19b77..b6e69ac 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@snapshot-labs/checkpoint", - "version": "0.1.0-beta.68", + "version": "0.1.0-beta.69", "license": "MIT", "bin": { "checkpoint": "dist/src/bin/index.js" diff --git a/src/container.ts b/src/container.ts index a663c30..57b50ab 100644 --- a/src/container.ts +++ b/src/container.ts @@ -239,13 +239,12 @@ export class Container implements Instance { if (this.preloadedBlocks.length > 0) return this.preloadedBlocks.shift() as number; + const providerRange = this.indexer.getProvider().getPreloadRange(); let currentBlock = blockNum; while (currentBlock <= this.preloadEndBlock) { - const endBlock = Math.min( - currentBlock + this.preloadStep, - this.preloadEndBlock - ); + const step = providerRange ?? this.preloadStep; + const endBlock = Math.min(currentBlock + step, this.preloadEndBlock); let checkpoints: CheckpointRecord[]; try { this.log.info( @@ -264,14 +263,16 @@ export class Container implements Instance { continue; } - const increase = - checkpoints.length > BLOCK_PRELOAD_TARGET - ? -BLOCK_PRELOAD_STEP - : +BLOCK_PRELOAD_STEP; - this.preloadStep = Math.max( - BLOCK_RELOAD_MIN_RANGE, - this.preloadStep + increase - ); + if (!providerRange) { + const increase = + checkpoints.length > BLOCK_PRELOAD_TARGET + ? -BLOCK_PRELOAD_STEP + : +BLOCK_PRELOAD_STEP; + this.preloadStep = Math.max( + BLOCK_RELOAD_MIN_RANGE, + this.preloadStep + increase + ); + } if (checkpoints.length > 0) { this.preloadedBlocks = [ diff --git a/src/providers/base.ts b/src/providers/base.ts index 0f47128..863a253 100644 --- a/src/providers/base.ts +++ b/src/providers/base.ts @@ -90,6 +90,10 @@ export class BaseProvider { ); } + getPreloadRange(): number | null { + return null; + } + async getCheckpointsRange( fromBlock: number, toBlock: number diff --git a/src/providers/evm/hypersync-indexer.ts b/src/providers/evm/hypersync-indexer.ts new file mode 100644 index 0000000..e95a8f6 --- /dev/null +++ b/src/providers/evm/hypersync-indexer.ts @@ -0,0 +1,44 @@ +import { Logger } from '../../utils/logger'; +import { BaseIndexer, Instance } from '../base'; +import { HyperSyncEvmProvider } from './hypersync-provider'; +import { Writer } from './types'; + +export class HyperSyncEvmIndexer extends BaseIndexer { + private writers: Record; + private options: { apiToken: string }; + + constructor(writers: Record, options: { apiToken: string }) { + super(); + + if (!options.apiToken) { + throw new Error('HyperSync API token is required'); + } + + this.writers = writers; + this.options = options; + } + + init({ + instance, + log, + abis + }: { + instance: Instance; + log: Logger; + abis?: Record; + }) { + log.info('using HyperSync provider'); + + this.provider = new HyperSyncEvmProvider({ + instance, + log, + abis, + writers: this.writers, + apiToken: this.options.apiToken + }); + } + + public getHandlers(): string[] { + return Object.keys(this.writers); + } +} diff --git a/src/providers/evm/hypersync-provider.ts b/src/providers/evm/hypersync-provider.ts new file mode 100644 index 0000000..c47f01b --- /dev/null +++ b/src/providers/evm/hypersync-provider.ts @@ -0,0 +1,220 @@ +import { Log } from 'viem'; +import { EvmProvider } from './provider'; +import { Block } from './types'; +import { CheckpointRecord } from '../../stores/checkpoints'; +import { ContractSourceConfig } from '../../types'; + +type FetchedBlock = { + number: number; + hash: string; + parentHash: string; + timestamp: number; +}; + +type HyperSyncBlock = { + number: number; + timestamp: number; + hash: string; + parent_hash: string; +}; + +type HyperSyncLog = { + block_number: number; + log_index: number; + transaction_index: number; + transaction_hash: string; + block_hash: string; + address: string; + data: string; + topic0: string | null; + topic1: string | null; + topic2: string | null; + topic3: string | null; + removed: boolean; +}; + +type HyperSyncResponse = { + next_block: number; + data: { + blocks?: HyperSyncBlock[]; + logs?: HyperSyncLog[]; + }[]; +}; + +const FIELD_SELECTION = { + block: ['number', 'timestamp', 'hash', 'parent_hash'], + log: [ + 'block_number', + 'log_index', + 'transaction_index', + 'transaction_hash', + 'block_hash', + 'address', + 'data', + 'topic0', + 'topic1', + 'topic2', + 'topic3', + 'removed' + ] +}; + +export class HyperSyncEvmProvider extends EvmProvider { + private readonly apiToken: string; + private hyperSyncUrl?: string; + private blockCache = new Map(); + + constructor( + params: ConstructorParameters[0] & { + apiToken: string; + } + ) { + super(params); + this.apiToken = params.apiToken; + } + + getPreloadRange(): number { + return Infinity; + } + + async getCheckpointsRange( + fromBlock: number, + toBlock: number + ): Promise { + const sources = this.instance.getCurrentSources(toBlock); + this.blockCache.clear(); + + const { logs, blocks } = await this.queryCheckpointsRange( + fromBlock, + toBlock, + sources + ); + + for (const block of blocks) { + this.blockCache.set(block.number, block); + } + + for (const log of logs) { + if (log.blockNumber === null) continue; + + if (!this.logsCache.has(log.blockNumber)) { + this.logsCache.set(log.blockNumber, []); + } + + this.logsCache.get(log.blockNumber)?.push(log); + } + + return logs.map(log => ({ + blockNumber: Number(log.blockNumber), + contractAddress: log.address + })); + } + + protected async fetchBlock(blockNumber: number): Promise { + const cached = this.blockCache.get(blockNumber); + if (cached) { + this.blockCache.delete(blockNumber); + return { + number: BigInt(cached.number), + hash: cached.hash, + parentHash: cached.parentHash, + timestamp: BigInt(cached.timestamp) + } as Block; + } + + return super.fetchBlock(blockNumber); + } + + private async queryCheckpointsRange( + fromBlock: number, + toBlock: number, + sources: ContractSourceConfig[] + ): Promise<{ logs: Log[]; blocks: FetchedBlock[] }> { + const allLogs: Log[] = []; + const allBlocks: FetchedBlock[] = []; + + const addresses = sources.map(source => source.contract); + const topic0 = sources.flatMap(source => + source.events.map(event => this.getEventHash(event.name)) + ); + + let currentFrom = fromBlock; + const exclusiveTo = toBlock + 1; + + while (currentFrom < exclusiveTo) { + const response = await this.query({ + from_block: currentFrom, + to_block: exclusiveTo, + logs: [{ address: addresses, topics: [topic0] }], + field_selection: FIELD_SELECTION + }); + + for (const chunk of response.data) { + // NOTE: do not replace for/push with spread — spread causes stack overflow on large arrays + for (const block of chunk.blocks ?? []) { + allBlocks.push({ + number: block.number, + hash: block.hash, + parentHash: block.parent_hash, + timestamp: block.timestamp + }); + } + + for (const log of chunk.logs ?? []) { + const topics = [ + log.topic0, + log.topic1, + log.topic2, + log.topic3 + ].filter((t): t is string => !!t) as `0x${string}`[]; + + allLogs.push({ + address: log.address as `0x${string}`, + blockHash: log.block_hash as `0x${string}`, + blockNumber: BigInt(log.block_number), + data: log.data as `0x${string}`, + logIndex: log.log_index, + transactionHash: log.transaction_hash as `0x${string}`, + transactionIndex: log.transaction_index, + removed: log.removed, + topics + } as Log); + } + } + + if (response.next_block >= exclusiveTo) break; + currentFrom = response.next_block; + } + + return { logs: allLogs, blocks: allBlocks }; + } + + private async getHyperSyncUrl(): Promise { + if (!this.hyperSyncUrl) { + const chainId = await this.getChainId(); + this.hyperSyncUrl = `https://${chainId}.hypersync.xyz`; + } + return this.hyperSyncUrl; + } + + private async query( + body: Record + ): Promise { + const url = await this.getHyperSyncUrl(); + + const res = await fetch(`${url}/query`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${this.apiToken}` + }, + body: JSON.stringify(body) + }); + + if (!res.ok) { + throw new Error(`HyperSync query failed: ${res.statusText}`); + } + + return res.json(); + } +} diff --git a/src/providers/evm/index.ts b/src/providers/evm/index.ts index e5c759a..8f301c7 100644 --- a/src/providers/evm/index.ts +++ b/src/providers/evm/index.ts @@ -1,3 +1,5 @@ -export { EvmProvider } from './provider'; export { EvmIndexer } from './indexer'; +export { EvmProvider } from './provider'; +export { HyperSyncEvmIndexer } from './hypersync-indexer'; +export { HyperSyncEvmProvider } from './hypersync-provider'; export * from './types'; diff --git a/src/providers/evm/provider.ts b/src/providers/evm/provider.ts index f8207ab..625edd1 100644 --- a/src/providers/evm/provider.ts +++ b/src/providers/evm/provider.ts @@ -40,7 +40,7 @@ export class EvmProvider extends BaseProvider { private readonly writers: Record; private sourceHashes = new Map(); - private logsCache = new Map(); + protected logsCache = new Map(); constructor({ instance, @@ -95,9 +95,7 @@ export class EvmProvider extends BaseProvider { try { if (!hasPreloadedBlockEvents) { - block = await this.client.getBlock({ - blockNumber: BigInt(blockNumber) - }); + block = await this.fetchBlock(blockNumber); } } catch (err) { this.log.error({ blockNumber, err }, 'getting block failed... retrying'); @@ -477,36 +475,6 @@ export class EvmProvider extends BaseProvider { return result; } - async getLogsForSources({ - fromBlock, - toBlock, - sources - }: { - fromBlock: number; - toBlock: number; - sources: ContractSourceConfig[]; - }): Promise { - const chunks: ContractSourceConfig[][] = []; - for (let i = 0; i < sources.length; i += 20) { - chunks.push(sources.slice(i, i + 20)); - } - - let events: Log[] = []; - for (const chunk of chunks) { - const address = chunk.map(source => source.contract); - const topics = chunk.flatMap(source => - source.events.map(event => this.getEventHash(event.name)) - ); - - const chunkEvents = await this.getLogs(fromBlock, toBlock, address, [ - topics - ]); - events = events.concat(chunkEvents); - } - - return events; - } - async getCheckpointsRange( fromBlock: number, toBlock: number @@ -549,4 +517,44 @@ export class EvmProvider extends BaseProvider { this.log.info('new source added, clearing logs cache'); this.logsCache.clear(); } + + protected async getChainId(): Promise { + return this.client.getChainId(); + } + + protected async fetchBlock(blockNumber: number): Promise { + return this.client.getBlock({ + blockNumber: BigInt(blockNumber) + }); + } + + protected async getLogsForSources({ + fromBlock, + toBlock, + sources + }: { + fromBlock: number; + toBlock: number; + sources: ContractSourceConfig[]; + }): Promise { + const chunks: ContractSourceConfig[][] = []; + for (let i = 0; i < sources.length; i += 20) { + chunks.push(sources.slice(i, i + 20)); + } + + let events: Log[] = []; + for (const chunk of chunks) { + const address = chunk.map(source => source.contract); + const topics = chunk.flatMap(source => + source.events.map(event => this.getEventHash(event.name)) + ); + + const chunkEvents = await this.getLogs(fromBlock, toBlock, address, [ + topics + ]); + events = events.concat(chunkEvents); + } + + return events; + } }