Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@snapshot-labs/checkpoint",
"version": "0.1.0-beta.65",
"version": "0.1.0-beta.66",
"license": "MIT",
"bin": {
"checkpoint": "dist/src/bin/index.js"
Expand All @@ -19,11 +19,6 @@
},
"prettier": "@snapshot-labs/prettier-config",
"dependencies": {
"@ethersproject/abi": "^5.7.0",
"@ethersproject/address": "^5.7.0",
"@ethersproject/keccak256": "^5.7.0",
"@ethersproject/providers": "^5.7.2",
"@ethersproject/strings": "^5.7.0",
"@graphql-tools/schema": "^8.5.1",
"@starknet-io/types-js": "^0.7.10",
"connection-string": "^4.3.5",
Expand All @@ -40,6 +35,7 @@
"pino-pretty": "^13.1.1",
"pluralize": "^8.0.0",
"starknet": "~5.19.3",
"viem": "^2.36.0",
"yargs": "^17.7.2",
"zod": "^3.21.4"
},
Expand Down
136 changes: 80 additions & 56 deletions src/providers/evm/provider.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import { Interface, LogDescription } from '@ethersproject/abi';
import { getAddress } from '@ethersproject/address';
import { keccak256 } from '@ethersproject/keccak256';
import {
Formatter,
createPublicClient,
formatLog,
getAddress,
http,
keccak256,
Log,
Provider,
StaticJsonRpcProvider
} from '@ethersproject/providers';
import { toUtf8Bytes } from '@ethersproject/strings';
parseEventLogs,
ParseEventLogsReturnType,
PublicClient,
RpcLog,
stringToBytes
} from 'viem';
import { getRangeHint } from './helpers';
import { Block, CustomJsonRpcError, EventsData, Writer } from './types';
import { CheckpointRecord } from '../../stores/checkpoints';
Expand All @@ -24,17 +27,20 @@ type GetLogsBlockRangeFilter = {
toBlock: number;
};

/**
* Timeout for client requests in milliseconds.
* This timeout is also used when fetching latest blocks in getLogs.
*/
const CLIENT_TIMEOUT = 5 * 1000;

const MAX_BLOCKS_PER_REQUEST = 10000;

export class EvmProvider extends BaseProvider {
private readonly provider: Provider;
/**
* Formatter instance from ethers.js used to format raw responses.
*/
private readonly formatter = new Formatter();
private readonly client: PublicClient;

private readonly writers: Record<string, Writer>;
private sourceHashes = new Map<string, string>();
private logsCache = new Map<number, Log[]>();
private logsCache = new Map<bigint, Log[]>();

constructor({
instance,
Expand All @@ -46,9 +52,12 @@ export class EvmProvider extends BaseProvider {
}) {
super({ instance, log, abis });

this.provider = new StaticJsonRpcProvider(
this.instance.config.network_node_url
);
this.client = createPublicClient({
transport: http(instance.config.network_node_url, {
timeout: CLIENT_TIMEOUT
})
});

this.writers = writers;
}

Expand All @@ -57,76 +66,78 @@ export class EvmProvider extends BaseProvider {
}

async getNetworkIdentifier(): Promise<string> {
const result = await this.provider.getNetwork();
return `evm_${result.chainId}`;
const chainId = await this.client.getChainId();

return `evm_${chainId}`;
}

async getLatestBlockNumber(): Promise<number> {
return this.provider.getBlockNumber();
const blockNumber = await this.client.getBlockNumber();

return Number(blockNumber);
}

async getBlockHash(blockNumber: number) {
const block = await this.provider.getBlock(blockNumber);
const block = await this.client.getBlock({
blockNumber: BigInt(blockNumber)
});

return block.hash;
}

async processBlock(blockNum: number, parentHash: string | null) {
async processBlock(blockNumber: number, parentHash: string | null) {
let block: Block | null = null;
let eventsData: EventsData;

const skipBlockFetching = this.instance.opts?.skipBlockFetching ?? false;
const hasPreloadedBlockEvents =
skipBlockFetching && this.logsCache.has(blockNum);
skipBlockFetching && this.logsCache.has(BigInt(blockNumber));

try {
if (!hasPreloadedBlockEvents) {
block = await this.provider.getBlock(blockNum);
block = await this.client.getBlock({
blockNumber: BigInt(blockNumber)
});
}
} catch (e) {
this.log.error(
{ blockNumber: blockNum, err: e },
'getting block failed... retrying'
);
throw e;
} catch (err) {
this.log.error({ blockNumber, err }, 'getting block failed... retrying');
throw err;
}

if (!hasPreloadedBlockEvents && block === null) {
this.log.info({ blockNumber: blockNum }, 'block not found');
this.log.info({ blockNumber }, 'block not found');
throw new BlockNotFoundError();
}

try {
eventsData = await this.getEvents({
blockNumber: blockNum,
blockNumber: BigInt(blockNumber),
blockHash: block?.hash ?? null
});
} catch (e: unknown) {
if (e instanceof CustomJsonRpcError && e.code === -32000) {
this.log.info({ blockNumber: blockNum }, 'block events not found');
} catch (err: unknown) {
if (err instanceof CustomJsonRpcError && err.code === -32000) {
this.log.info({ blockNumber }, 'block events not found');
throw new BlockNotFoundError();
}

this.log.error(
{ blockNumber: blockNum, err: e },
'getting events failed... retrying'
);
throw e;
this.log.error({ blockNumber, err }, 'getting events failed... retrying');
throw err;
}

if (block && parentHash && block.parentHash !== parentHash) {
this.log.error({ blockNumber: blockNum }, 'reorg detected');
this.log.error({ blockNumber }, 'reorg detected');
throw new ReorgDetectedError();
}

await this.handleBlock(blockNum, block, eventsData);
await this.handleBlock(blockNumber, block, eventsData);

if (block) {
await this.instance.setBlockHash(blockNum, block.hash);
await this.instance.setBlockHash(blockNumber, block.hash);
}

await this.instance.setLastIndexedBlock(blockNum);
await this.instance.setLastIndexedBlock(blockNumber);

return blockNum + 1;
return blockNumber + 1;
}

private async handleBlock(
Expand Down Expand Up @@ -160,7 +171,7 @@ export class EvmProvider extends BaseProvider {
) {
this.log.debug({ txId }, 'handling transaction');

const helpers = await this.instance.getWriterHelpers();
const helpers = this.instance.getWriterHelpers();

if (this.instance.config.tx_fn) {
await this.writers[this.instance.config.tx_fn]({
Expand All @@ -184,7 +195,10 @@ export class EvmProvider extends BaseProvider {
);

for (const event of logs) {
const handler = globalEventHandlers[event.topics[0]];
const eventHash = event.topics[0];
if (!eventHash) continue;

const handler = globalEventHandlers[eventHash];
if (!handler) continue;

this.log.info(
Expand Down Expand Up @@ -228,11 +242,15 @@ export class EvmProvider extends BaseProvider {
'found contract event'
);

let parsedEvent: LogDescription | undefined;
let parsedEvent: ParseEventLogsReturnType[number] | undefined;
if (source.abi && this.abis?.[source.abi]) {
const iface = new Interface(this.abis[source.abi]);
try {
parsedEvent = iface.parseLog(log);
const parsedLogs = parseEventLogs({
abi: this.abis[source.abi],
logs: [log]
});
parsedEvent =
parsedLogs[0] as ParseEventLogsReturnType[number];
} catch (err) {
this.log.warn(
{
Expand Down Expand Up @@ -298,7 +316,7 @@ export class EvmProvider extends BaseProvider {
blockNumber
}: {
blockHash: string | null;
blockNumber: number;
blockNumber: bigint;
}): Promise<EventsData> {
let isPreloaded = false;
let events: Log[] = [];
Expand All @@ -320,6 +338,8 @@ export class EvmProvider extends BaseProvider {
return {
isPreloaded,
events: events.reduce((acc, event) => {
if (event.transactionHash === null) return acc;

if (!acc[event.transactionHash]) acc[event.transactionHash] = [];

acc[event.transactionHash] = acc[event.transactionHash].concat(event);
Expand Down Expand Up @@ -350,7 +370,10 @@ export class EvmProvider extends BaseProvider {
topics?: (string | string[])[];
} = {};

let signal: AbortSignal | undefined;

if ('blockHash' in filter) {
signal = AbortSignal.timeout(CLIENT_TIMEOUT);
params.blockHash = filter.blockHash;
}

Expand All @@ -372,6 +395,7 @@ export class EvmProvider extends BaseProvider {

const res = await fetch(this.instance.config.network_node_url, {
method: 'POST',
signal,
headers: {
'Content-Type': 'application/json'
},
Expand All @@ -397,9 +421,7 @@ export class EvmProvider extends BaseProvider {
);
}

return Formatter.arrayOf(this.formatter.filterLog.bind(this.formatter))(
json.result
);
return json.result.map((log: RpcLog) => formatLog(log));
}

async getLogs(
Expand Down Expand Up @@ -491,6 +513,8 @@ export class EvmProvider extends BaseProvider {
});

for (const log of events) {
if (log.blockNumber === null) continue;

if (!this.logsCache.has(log.blockNumber)) {
this.logsCache.set(log.blockNumber, []);
}
Expand All @@ -499,14 +523,14 @@ export class EvmProvider extends BaseProvider {
}

return events.map(log => ({
blockNumber: log.blockNumber,
blockNumber: Number(log.blockNumber),
contractAddress: log.address
}));
}

getEventHash(eventName: string) {
if (!this.sourceHashes.has(eventName)) {
this.sourceHashes.set(eventName, keccak256(toUtf8Bytes(eventName)));
this.sourceHashes.set(eventName, keccak256(stringToBytes(eventName)));
}

return this.sourceHashes.get(eventName) as string;
Expand Down
18 changes: 13 additions & 5 deletions src/providers/evm/types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import { LogDescription } from '@ethersproject/abi';
import { Log, Provider } from '@ethersproject/providers';
import {
Abi,
ContractEventName,
GetBlockReturnType,
Log,
ParseEventLogsReturnType
} from 'viem';
import { BaseWriterParams } from '../../types';

export class CustomJsonRpcError extends Error {
Expand All @@ -22,13 +27,16 @@ export type EventsData = {
events: Record<string, Log[]>;
};

export type Block = Awaited<ReturnType<Provider['getBlock']>>;
export type Block = GetBlockReturnType;

export type Writer = (
export type Writer<
WriterAbi extends Abi = any,
EventName extends ContractEventName<WriterAbi> = any
> = (
args: {
txId: string;
block: Block | null;
rawEvent?: Log;
event?: LogDescription;
event?: ParseEventLogsReturnType<WriterAbi, EventName, true>[number];
} & BaseWriterParams
) => Promise<void>;
Loading