diff --git a/Gemfile.lock b/Gemfile.lock index 3b0883b..1093461 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -115,7 +115,7 @@ GEM tzinfo coderay (1.1.3) concurrent-ruby (1.3.3) - connection_pool (2.4.1) + connection_pool (3.0.2) crass (1.0.6) csv (3.3.0) date (3.4.1) @@ -171,8 +171,8 @@ GEM multi_xml (0.7.1) bigdecimal (~> 3.1) mutex_m (0.3.0) - net-http-persistent (4.0.2) - connection_pool (~> 2.2) + net-http-persistent (4.0.8) + connection_pool (>= 2.2.4, < 4) net-imap (0.5.8) date net-protocol diff --git a/app/services/eth_block_importer.rb b/app/services/eth_block_importer.rb index 57b2403..644cde6 100644 --- a/app/services/eth_block_importer.rb +++ b/app/services/eth_block_importer.rb @@ -1,5 +1,3 @@ -require 'l1_rpc_prefetcher' - class EthBlockImporter include SysConfig include Memery @@ -15,7 +13,7 @@ def initialize @facet_block_cache = {} @eth_block_cache = {} - @ethereum_client ||= EthRpcClient.new(ENV.fetch('L1_RPC_URL')) + @ethereum_client ||= EthRpcClient.l1 @geth_driver = GethDriver @@ -26,7 +24,14 @@ def initialize set_eth_block_starting_points populate_facet_block_cache - @prefetcher = L1RpcPrefetcher.new(ethereum_client: @ethereum_client) + @prefetcher = L1RpcPrefetcher.new(ethereum_client: EthRpcClient.l1_prefetch) + + unless Rails.env.test? + max_block = current_max_eth_block_number + if max_block && max_block > 0 + @prefetcher.ensure_prefetched(max_block + 1) + end + end end def current_max_facet_block_number @@ -271,17 +276,17 @@ def import_single_block(block_number) start = Time.current # Fetch block data from prefetcher - response = prefetcher.fetch(block_number) + begin + response = prefetcher.fetch(block_number) + rescue L1RpcPrefetcher::BlockFetchError => e + raise BlockNotReadyToImportError.new(e.message) + end - # Handle cancellation, fetch failure, or block not ready + # Handle cancellation or fetch failure if response.nil? raise BlockNotReadyToImportError.new("Block #{block_number} fetch was cancelled or failed") end - if response[:error] == :not_ready - raise BlockNotReadyToImportError.new("Block #{block_number} not yet available on L1") - end - eth_block = response[:eth_block] facet_block = response[:facet_block] facet_txs = response[:facet_txs] @@ -329,9 +334,6 @@ def import_blocks(block_numbers) def import_next_block block_number = next_block_to_import - - prefetcher.ensure_prefetched(block_number) - import_single_block(block_number) end diff --git a/config/derive_facet_blocks.rb b/config/derive_facet_blocks.rb index 0efdb3e..b778c28 100644 --- a/config/derive_facet_blocks.rb +++ b/config/derive_facet_blocks.rb @@ -83,19 +83,24 @@ module Clockwork end end - every(6.seconds, 'import_blocks_until_done') do + every(2.seconds, 'import_blocks_until_done') do importer = EthBlockImporter.new - loop do - begin - importer.import_blocks_until_done - rescue EthBlockImporter::ReorgDetectedError - Rails.logger.warn 'Reorg detected – reinitialising EthBlockImporter' - importer = EthBlockImporter.new - retry - end + begin + loop do + begin + importer.import_blocks_until_done + rescue EthBlockImporter::ReorgDetectedError + Rails.logger.warn 'Reorg detected – reinitialising EthBlockImporter' + importer.shutdown + importer = EthBlockImporter.new + retry + end - sleep 6 + sleep 6 + end + ensure + importer&.shutdown end end end diff --git a/docker-compose/docker-compose.yml b/docker-compose/docker-compose.yml index 88fb1ed..4582d59 100644 --- a/docker-compose/docker-compose.yml +++ b/docker-compose/docker-compose.yml @@ -21,7 +21,7 @@ services: start_period: 10s node: - image: ghcr.io/0xfacet/facet-node:v2.0.1 + image: ghcr.io/0xfacet/facet-node:v2.0.2 environment: JWT_SECRET: ${JWT_SECRET} L1_NETWORK: ${L1_NETWORK} diff --git a/lib/eth_rpc_client.rb b/lib/eth_rpc_client.rb index 3f550fd..f5ace26 100644 --- a/lib/eth_rpc_client.rb +++ b/lib/eth_rpc_client.rb @@ -1,7 +1,9 @@ class EthRpcClient + include Memery + class HttpError < StandardError attr_reader :code, :http_message - + def initialize(code, http_message) @code = code @http_message = http_message @@ -9,21 +11,67 @@ def initialize(code, http_message) end end class ApiError < StandardError; end + class ExecutionRevertedError < StandardError; end class MethodRequiredError < StandardError; end - attr_accessor :base_url + attr_accessor :base_url, :http - def initialize(base_url = ENV['L1_RPC_URL']) + def initialize(base_url = ENV['L1_RPC_URL'], jwt_secret: nil, retry_config: {}) self.base_url = base_url + @request_id = 0 + @mutex = Mutex.new + + # JWT support (optional) + @jwt_secret = jwt_secret + @jwt_enabled = !jwt_secret.nil? + + if @jwt_enabled + @jwt_secret_decoded = ByteString.from_hex(jwt_secret).to_bin + end + + # Customizable retry configuration + @retry_config = { + tries: 7, + base_interval: 1, + max_interval: 32, + multiplier: 2, + rand_factor: 0.4 + }.merge(retry_config) + + # HTTP persistent connection pool + @uri = URI(base_url) + @http = Net::HTTP::Persistent.new( + name: "eth_rpc_#{@uri.host}:#{@uri.port}", + pool_size: 100 + ) + @http.open_timeout = 5 # 5 seconds to establish connection + @http.read_timeout = 10 # 10 seconds - allows multiple retries within prefetch timeout + @http.idle_timeout = 30 # Keep connections alive for 30 seconds end - + def self.l1 @_l1_client ||= new(ENV.fetch('L1_RPC_URL')) end + def self.l1_prefetch + # Fewer retries with shorter intervals for prefetching + @_l1_prefetch_client ||= new( + ENV.fetch('L1_RPC_URL'), + retry_config: { tries: 3, base_interval: 1, max_interval: 4 } + ) + end + def self.l2 @_l2_client ||= new(ENV.fetch('NON_AUTH_GETH_RPC_URL')) end + def self.l2_engine + @_l2_engine_client ||= new( + ENV.fetch('GETH_RPC_URL'), + jwt_secret: ENV.fetch('JWT_SECRET'), + retry_config: { tries: 5, base_interval: 0.5, max_interval: 4 } + ) + end + def get_block(block_number, include_txs = false) if block_number.is_a?(String) return query_api( @@ -31,24 +79,24 @@ def get_block(block_number, include_txs = false) params: [block_number, include_txs] ) end - + query_api( method: 'eth_getBlockByNumber', params: ['0x' + block_number.to_s(16), include_txs] ) end - + def get_nonce(address, block_number = "latest") query_api( method: 'eth_getTransactionCount', params: [address, block_number] ).to_i(16) end - + def get_chain_id query_api(method: 'eth_chainId').to_i(16) end - + def trace_block(block_number) query_api( method: 'debug_traceBlockByNumber', @@ -66,14 +114,14 @@ def trace_transaction(transaction_hash) def trace(tx_hash) trace_transaction(tx_hash) end - + def get_transaction(transaction_hash) query_api( method: 'eth_getTransactionByHash', params: [transaction_hash] ) end - + def get_transaction_receipts(block_number) if block_number.is_a?(String) return query_api( @@ -81,24 +129,24 @@ def get_transaction_receipts(block_number) params: [block_number] ) end - + query_api( method: 'eth_getBlockReceipts', params: ["0x" + block_number.to_s(16)] ) end - + def get_block_receipts(block_number) get_transaction_receipts(block_number) end - + def get_transaction_receipt(transaction_hash) query_api( method: 'eth_getTransactionReceipt', params: [transaction_hash] ) end - + def get_block_number query_api(method: 'eth_blockNumber').to_i(16) end @@ -108,65 +156,59 @@ def query_api(method = nil, params = [], **kwargs) method = kwargs[:method] params = kwargs[:params] end - + unless method raise MethodRequiredError, "Method is required" end - + data = { - id: 1, + id: next_request_id, jsonrpc: "2.0", method: method, params: params } - url = base_url - Retriable.retriable( - tries: 7, - base_interval: 1, - max_interval: 32, - multiplier: 2, - rand_factor: 0.4, - on: [Net::ReadTimeout, Net::OpenTimeout, HttpError, ApiError], + tries: @retry_config[:tries], + base_interval: @retry_config[:base_interval], + max_interval: @retry_config[:max_interval], + multiplier: @retry_config[:multiplier], + rand_factor: @retry_config[:rand_factor], + on: [Net::ReadTimeout, Net::OpenTimeout, HttpError, ApiError, Errno::EPIPE, EOFError, Errno::ECONNREFUSED], on_retry: ->(exception, try, elapsed_time, next_interval) { - Rails.logger.info "Retrying #{method} (attempt #{try}, next delay: #{next_interval.round(2)}s) - #{exception.message}" + Rails.logger.info "Retrying #{method} (attempt #{try}, next delay: #{next_interval&.round(2)}s) - #{exception.message}" } ) do - response = HTTParty.post(url, body: data.to_json, headers: headers) - - if response.code != 200 - raise HttpError.new(response.code, response.message) - end - - parsed_response = JSON.parse(response.body, max_nesting: false) - - if parsed_response['error'] - raise ApiError, "API error: #{parsed_response.dig('error', 'message') || 'Unknown API error'}" - end - - parsed_response['result'] + send_http_request_simple(data) end end def call(method, params = []) query_api(method: method, params: params) end - + def eth_call(to:, data:, block_number: "latest") query_api( method: 'eth_call', params: [{ to: to, data: data }, block_number] ) end - + def headers - { + h = { 'Accept' => 'application/json', 'Content-Type' => 'application/json' } + h['Authorization'] = "Bearer #{jwt}" if @jwt_enabled + h + end + + def jwt + return nil unless @jwt_enabled + JWT.encode({ iat: Time.now.to_i }, @jwt_secret_decoded, 'HS256') end - + memoize :jwt, ttl: 55 + def get_code(address, block_number = "latest") query_api( method: 'eth_getCode', @@ -180,4 +222,41 @@ def get_storage_at(address, slot, block_number = "latest") params: [address, slot, block_number] ) end + + private + + def send_http_request_simple(data) + request = Net::HTTP::Post.new(@uri) + request.body = data.to_json + headers.each { |key, value| request[key] = value } + + response = @http.request(@uri, request) + + if response.code.to_i != 200 + raise HttpError.new(response.code.to_i, response.message) + end + + parse_response_and_handle_errors(response.body) + end + + def parse_response_and_handle_errors(response_text) + parsed_response = JSON.parse(response_text, max_nesting: false) + + if parsed_response['error'] + error_message = parsed_response.dig('error', 'message') || 'Unknown API error' + + # Don't retry execution reverted errors as they're deterministic failures + if error_message.include?('execution reverted') + raise ExecutionRevertedError, "API error: #{error_message}" + end + + raise ApiError, "API error: #{error_message}" + end + + parsed_response['result'] + end + + def next_request_id + @mutex.synchronize { @request_id += 1 } + end end diff --git a/lib/fct_mint_simulator.rb b/lib/fct_mint_simulator.rb index 65ca355..1fb58f0 100644 --- a/lib/fct_mint_simulator.rb +++ b/lib/fct_mint_simulator.rb @@ -42,7 +42,7 @@ def initialize(initial_state: nil) @current_l2_block_number = nil # We'll fetch L1 blocks directly - @l1_client = EthRpcClient.new(ENV.fetch('L1_RPC_URL')) + @l1_client = EthRpcClient.l1 if initial_state # Use provided initial state diff --git a/lib/l1_rpc_prefetcher.rb b/lib/l1_rpc_prefetcher.rb index 004c79d..896c396 100644 --- a/lib/l1_rpc_prefetcher.rb +++ b/lib/l1_rpc_prefetcher.rb @@ -1,35 +1,43 @@ -require 'concurrent' -require 'retriable' - class L1RpcPrefetcher + include Memery + class BlockFetchError < StandardError; end + def initialize(ethereum_client:, ahead: ENV.fetch('L1_PREFETCH_FORWARD', Rails.env.test? ? 5 : 20).to_i, threads: ENV.fetch('L1_PREFETCH_THREADS', Rails.env.test? ? 2 : 2).to_i) @eth = ethereum_client @ahead = ahead @threads = threads + # 30s default - long enough for slow RPC calls, short enough to bail on stuck requests + @fetch_timeout = ENV.fetch('L1_PREFETCH_TIMEOUT', Rails.env.test? ? 5 : 30).to_i # Thread-safe collections and pool @pool = Concurrent::FixedThreadPool.new(threads) @promises = Concurrent::Map.new + @last_chain_tip = current_l1_block_number Rails.logger.info "L1RpcPrefetcher initialized with #{threads} threads" end def ensure_prefetched(from_block) - to_block = from_block + @ahead + distance_from_last_tip = @last_chain_tip - from_block + latest = if distance_from_last_tip > 10 + cached_l1_block_number + else + current_l1_block_number + end + + # Don't prefetch beyond chain tip + to_block = [from_block + @ahead, latest].min + # Only create promises for blocks we don't have yet blocks_to_fetch = (from_block..to_block).reject { |n| @promises.key?(n) } return if blocks_to_fetch.empty? - # Only enqueue a reasonable number at once to avoid overwhelming the promise system - max_to_enqueue = [@threads * 10, 50].min - - to_enqueue = blocks_to_fetch.first(max_to_enqueue) - Rails.logger.debug "Enqueueing #{to_enqueue.size} of #{blocks_to_fetch.size} blocks: #{to_enqueue.first}..#{to_enqueue.last}" + Rails.logger.debug "Enqueueing #{blocks_to_fetch.size} blocks: #{blocks_to_fetch.first}..#{blocks_to_fetch.last}" - to_enqueue.each { |block_number| enqueue_single(block_number) } + blocks_to_fetch.each { |block_number| enqueue_single(block_number) } end def fetch(block_number) @@ -38,26 +46,22 @@ def fetch(block_number) # Get or create promise promise = @promises[block_number] || enqueue_single(block_number) - # Wait for result - if it's already done, this returns immediately - timeout = Rails.env.test? ? 5 : 30 - - Rails.logger.debug "Fetching block #{block_number}, promise state: #{promise.state}" - + # Wait for result - value! returns nil on timeout, raises on rejection begin - result = promise.value!(timeout) - Rails.logger.debug "Got result for block #{block_number}" - - # Clean up :not_ready promises so they can be retried - if result[:error] == :not_ready - @promises.delete(block_number) - end + result = promise.value!(@fetch_timeout) + rescue => e + raise BlockFetchError.new("Block #{block_number} fetch failed: #{e.message}") + end - result - rescue Concurrent::TimeoutError => e - Rails.logger.error "Timeout fetching block #{block_number} after #{timeout}s" + if result.nil? || result == :not_ready_sentinel @promises.delete(block_number) - raise + message = result.nil? ? + "Block #{block_number} fetch timed out after #{@fetch_timeout}s" : + "Block #{block_number} not yet available on L1" + raise BlockFetchError.new(message) end + + result end def clear_older_than(min_keep) @@ -99,26 +103,37 @@ def stats def shutdown @pool.shutdown - if @pool.wait_for_termination(30) - Rails.logger.info "L1 RPC Prefetcher thread pool shut down successfully" - else - Rails.logger.warn "L1 RPC Prefetcher shutdown timed out, forcing kill" - @pool.kill + terminated = @pool.wait_for_termination(3) + @pool.kill unless terminated + @promises.each_pair do |_, promise| + begin + if promise.pending? && promise.respond_to?(:cancel) + promise.cancel + end + rescue StandardError => e + Rails.logger.warn "Failed cancelling promise during shutdown: #{e.message}" + end end + @promises.clear + Rails.logger.info( + terminated ? + 'L1 RPC Prefetcher thread pool shut down successfully' : + 'L1 RPC Prefetcher shutdown timed out after 3s, pool killed' + ) + terminated + rescue StandardError => e + Rails.logger.error("Error during L1RpcPrefetcher shutdown: #{e.message}\n#{e.backtrace.join("\n")}") + false end private def enqueue_single(block_number) @promises.compute_if_absent(block_number) do - Rails.logger.debug "Creating promise for block #{block_number}" - Concurrent::Promise.execute(executor: @pool) do - Rails.logger.debug "Executing fetch for block #{block_number}" fetch_job(block_number) end.rescue do |e| - Rails.logger.error "Prefetch failed for block #{block_number}: #{e.message}" - # Clean up failed promise so it can be retried + Rails.logger.error "[PREFETCH] Block #{block_number}: #{e.class} - #{e.message}" @promises.delete(block_number) raise e end @@ -126,29 +141,33 @@ def enqueue_single(block_number) end def fetch_job(block_number) - # Use shared persistent client (thread-safe with HTTParty) - client = @eth + block = @eth.get_block(block_number, true) - Retriable.retriable(tries: 3, base_interval: 1, max_interval: 4) do - block = client.get_block(block_number, true) + # Handle case where block doesn't exist yet (normal when caught up) + if block.nil? + Rails.logger.info "[PREFETCH] Block #{block_number} not yet available on L1" + return :not_ready_sentinel + end - # Handle case where block doesn't exist yet (normal when caught up) - if block.nil? - Rails.logger.debug "Block #{block_number} not yet available on L1" - return { error: :not_ready, block_number: block_number } - end + receipts = @eth.get_transaction_receipts(block_number) - receipts = client.get_transaction_receipts(block_number) + eth_block = EthBlock.from_rpc_result(block) + facet_block = FacetBlock.from_eth_block(eth_block) + facet_txs = EthTransaction.facet_txs_from_rpc_results(block, receipts) - eth_block = EthBlock.from_rpc_result(block) - facet_block = FacetBlock.from_eth_block(eth_block) - facet_txs = EthTransaction.facet_txs_from_rpc_results(block, receipts) + { + eth_block: eth_block, + facet_block: facet_block, + facet_txs: facet_txs + } + end - { - eth_block: eth_block, - facet_block: facet_block, - facet_txs: facet_txs - } - end + def current_l1_block_number + @last_chain_tip = @eth.get_block_number + end + + def cached_l1_block_number + current_l1_block_number end -end \ No newline at end of file + memoize :cached_l1_block_number, ttl: 12.seconds +end diff --git a/spec/l1_rpc_prefetcher_spec.rb b/spec/l1_rpc_prefetcher_spec.rb index eae0b91..a7705a1 100644 --- a/spec/l1_rpc_prefetcher_spec.rb +++ b/spec/l1_rpc_prefetcher_spec.rb @@ -7,8 +7,10 @@ before do allow(ethereum_client).to receive(:base_url).and_return('http://test.com') + allow(ethereum_client).to receive(:get_block_number).and_return(100) allow(Rails.logger).to receive(:debug) allow(Rails.logger).to receive(:info) + allow(Rails.logger).to receive(:warn) allow(Rails.logger).to receive(:error) end @@ -36,6 +38,23 @@ expect(ethereum_client).to receive(:get_transaction_receipts).with(1).and_return(receipts_data) prefetcher.fetch(1) end + + it 'raises BlockFetchError when block is not ready' do + allow(ethereum_client).to receive(:get_block).and_return(nil) + expect { prefetcher.fetch(1) }.to raise_error(L1RpcPrefetcher::BlockFetchError, /not yet available/) + end + + it 'does not prefetch beyond chain tip' do + allow(ethereum_client).to receive(:get_block_number).and_return(5) + allow(ethereum_client).to receive(:get_block).and_return(block_data) + allow(ethereum_client).to receive(:get_transaction_receipts).and_return(receipts_data) + + prefetcher.ensure_prefetched(1) + sleep(0.1) # Let promises start + + promises = prefetcher.instance_variable_get(:@promises) + expect(promises.keys.sort).to eq([1, 2, 3, 4, 5]) + end end describe '#stats' do