From b3b6bb87983823efaf41aca3b62ef1b5e09ecf45 Mon Sep 17 00:00:00 2001 From: Nina Date: Wed, 25 Mar 2026 10:20:39 +0000 Subject: [PATCH 1/4] init --- Cargo.lock | 123 ++++++++++ Cargo.toml | 1 + crates/common/src/config.rs | 16 +- .../common/src/validator_preferences/mod.rs | 2 - crates/relay/Cargo.toml | 1 + crates/relay/src/auctioneer/submit_block.rs | 6 +- crates/relay/src/auctioneer/validation.rs | 10 +- crates/relay/src/data_gatherer/clickhouse.rs | 64 +++++ crates/relay/src/data_gatherer/mod.rs | 5 + crates/relay/src/data_gatherer/s3.rs | 81 +++++++ crates/relay/src/data_gatherer/tile.rs | 228 ++++++++++++++++++ crates/relay/src/lib.rs | 4 +- crates/relay/src/main.rs | 25 +- crates/relay/src/spine/messages.rs | 13 + crates/relay/src/spine/mod.rs | 3 + crates/relay/src/tcp_bid_recv/mod.rs | 2 - crates/relay/src/tcp_bid_recv/s3.rs | 96 -------- crates/simulator/src/block_merging/types.rs | 3 +- crates/types/src/hydration.rs | 1 - 19 files changed, 562 insertions(+), 122 deletions(-) create mode 100644 crates/relay/src/data_gatherer/clickhouse.rs create mode 100644 crates/relay/src/data_gatherer/mod.rs create mode 100644 crates/relay/src/data_gatherer/s3.rs create mode 100644 crates/relay/src/data_gatherer/tile.rs delete mode 100644 crates/relay/src/tcp_bid_recv/s3.rs diff --git a/Cargo.lock b/Cargo.lock index 986b7f94d..c20bf39a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2419,6 +2419,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "bnum" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "119771309b95163ec7aaf79810da82f7cd0599c19722d48b9c03894dca833966" + [[package]] name = "borsh" version = "1.6.0" @@ -2482,6 +2488,15 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "bstr" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63044e1ae8e69f3b5a92c736ca6269b8d12fa7efe39bf34ddb06d102cf0e2cab" +dependencies = [ + "memchr", +] + [[package]] name = "bumpalo" version = "3.19.0" @@ -2744,6 +2759,12 @@ dependencies = [ "inout", ] +[[package]] +name = "cityhash-rs" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93a719913643003b84bd13022b4b7e703c09342cd03b679c4641c7d2e50dc34d" + [[package]] name = "clang-sys" version = "1.8.1" @@ -2795,6 +2816,53 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d" +[[package]] +name = "clickhouse" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d975a05171c6f8a453f60ec6287c0018c90911d5a8a46d9b6abe386ea359fab3" +dependencies = [ + "bnum", + "bstr", + "bytes", + "cityhash-rs", + "clickhouse-macros", + "clickhouse-types", + "futures-channel", + "futures-util", + "http-body-util", + "hyper 1.8.1", + "hyper-util", + "lz4_flex", + "polonius-the-crab", + "serde", + "thiserror 2.0.17", + "tokio", + "url", +] + +[[package]] +name = "clickhouse-macros" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff6669899e23cb87b43daf7996f0ea3b9c07d0fb933d745bb7b815b052515ae3" +dependencies = [ + "proc-macro2", + "quote", + "serde_derive_internals", + "syn 2.0.111", +] + +[[package]] +name = "clickhouse-types" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "358fbfd439fb0bed02a3e2ecc5131f6a9d039ba5639aed650cf0e845f6ebfc16" +dependencies = [ + "bytes", + "thiserror 2.0.17", +] + [[package]] name = "coins-bip32" version = "0.8.7" @@ -5663,6 +5731,7 @@ dependencies = [ "bincode", "bytes", "chrono", + "clickhouse", "crossbeam-channel", "dashmap 5.5.3", "deadpool-postgres", @@ -5884,6 +5953,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "higher-kinded-types" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e690f8474c6c5d8ff99656fcbc195a215acc3949481a8b0b3351c838972dc776" +dependencies = [ + "macro_rules_attribute", + "never-say-never", + "paste", +] + [[package]] name = "hkdf" version = "0.12.4" @@ -7192,6 +7272,22 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "macro_rules_attribute" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65049d7923698040cd0b1ddcced9b0eb14dd22c5f86ae59c3740eab64a676520" +dependencies = [ + "macro_rules_attribute-proc_macro", + "paste", +] + +[[package]] +name = "macro_rules_attribute-proc_macro" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "670fdfda89751bc4a84ac13eaa63e205cf0fd22b4c9a5fbfa085b63c1f1d3a30" + [[package]] name = "maplit" version = "1.0.2" @@ -7595,6 +7691,12 @@ dependencies = [ "tempfile", ] +[[package]] +name = "never-say-never" +version = "6.6.666" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf5a574dadd7941adeaa71823ecba5e28331b8313fb2e1c6a5c7e5981ea53ad6" + [[package]] name = "new_debug_unreachable" version = "1.0.6" @@ -8594,6 +8696,16 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "polonius-the-crab" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec242d7eccbb2fd8b3b5b6e3cf89f94a91a800f469005b44d154359609f8af72" +dependencies = [ + "higher-kinded-types", + "never-say-never", +] + [[package]] name = "polyval" version = "0.6.2" @@ -12961,6 +13073,17 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "serde_derive_internals" +version = "0.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.111", +] + [[package]] name = "serde_json" version = "1.0.145" diff --git a/Cargo.toml b/Cargo.toml index ea2aaf3e9..68eb3a677 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,6 +41,7 @@ ethereum_ssz_derive = "0.9" ethers = "2.0.14" eyre = "0.6.12" aws-sdk-s3 = "=1.35.0" +clickhouse = "0.14.2" flate2 = "1.0" flux = { git = "https://github.com/gattaca-com/flux", rev = "732c996c13f8fb5e2988dbf949e1d8a74261fca4" } diff --git a/crates/common/src/config.rs b/crates/common/src/config.rs index 8b8a710cc..eab565b33 100644 --- a/crates/common/src/config.rs +++ b/crates/common/src/config.rs @@ -43,10 +43,8 @@ pub struct RelayConfig { /// Configuration for block merging parameters. #[serde(default)] pub block_merging_config: BlockMergingConfig, - #[serde(default)] pub primev_config: Option, pub discord_webhook_url: Option, - #[serde(default)] pub alerts_config: Option, pub inclusion_list: Option, pub is_submission_instance: bool, @@ -63,10 +61,17 @@ pub struct RelayConfig { pub tcp_port: u16, #[serde(default = "default_usize::<512>")] pub tcp_max_connections: usize, - #[serde(default)] pub s3_config: Option, /// Directory for local cache snapshots (bincode). Enables fast startup. pub snapshot_dir: Option, + pub clickhouse: Option, +} + +#[derive(Serialize, Deserialize, Clone)] +pub struct ClickhouseConfig { + pub url: String, + pub database: String, + pub user: String, } impl RelayConfig { @@ -100,6 +105,7 @@ impl RelayConfig { decoder: vec![4], simulator: 5, top_bid: 1, + data_gatherer: 3, }, gossip_payload_on_header: false, api_port: 4040, @@ -107,6 +113,7 @@ impl RelayConfig { tcp_max_connections: 512, s3_config: None, snapshot_dir: None, + clickhouse: None, } } } @@ -153,6 +160,8 @@ pub struct CoresConfig { pub simulator: usize, #[serde(default)] pub top_bid: usize, + #[serde(default)] + pub data_gatherer: usize, } impl Default for WebsiteConfig { @@ -261,7 +270,6 @@ pub struct SimulatorConfig { #[serde(default = "default_usize::<32>")] pub max_concurrent_tasks: usize, /// If set, use the SSZ binary endpoint at this URL instead of JSON-RPC - #[serde(default)] pub ssz_url: Option, } diff --git a/crates/common/src/validator_preferences/mod.rs b/crates/common/src/validator_preferences/mod.rs index 3d9cf805e..ee42f9cd8 100644 --- a/crates/common/src/validator_preferences/mod.rs +++ b/crates/common/src/validator_preferences/mod.rs @@ -8,7 +8,6 @@ pub struct ValidatorPreferences { /// An optional list of BuilderIDs. If this is set, the relay will only accept /// submissions from builders whose public keys are linked to the IDs in this list. /// This allows for limiting submissions to a trusted set of builders. - #[serde(default)] pub trusted_builders: Option>, /// Allows validators to express a preference for whether a delay should be applied to get @@ -16,7 +15,6 @@ pub struct ValidatorPreferences { #[serde(default = "default_bool::")] pub header_delay: bool, - #[serde(default)] pub delay_ms: Option, #[serde(default)] diff --git a/crates/relay/Cargo.toml b/crates/relay/Cargo.toml index 6bd834923..950706b09 100644 --- a/crates/relay/Cargo.toml +++ b/crates/relay/Cargo.toml @@ -53,6 +53,7 @@ reqwest.workspace = true reqwest-eventsource.workspace = true rustc-hash.workspace = true bincode.workspace = true +clickhouse.workspace = true serde.workspace = true serde_json.workspace = true serial_test.workspace = true diff --git a/crates/relay/src/auctioneer/submit_block.rs b/crates/relay/src/auctioneer/submit_block.rs index 12a29a4da..e041965e9 100644 --- a/crates/relay/src/auctioneer/submit_block.rs +++ b/crates/relay/src/auctioneer/submit_block.rs @@ -22,7 +22,7 @@ use crate::{ simulator::{SimRequest, ValidationRequest, tile::ValidationResult}, spine::{ HelixSpineProducers, - messages::{ToSimKind, ToSimMsg}, + messages::{BidEvent, BidUpdate, ToSimKind, ToSimMsg}, }, }; @@ -217,7 +217,9 @@ impl Context { self.request_merged_block(producers); if need_send_result { - self.db.update_block_submission_live_ts(block_hash, Nanos::now().0); + let now = Nanos::now(); + producers.produce(BidUpdate { block_hash, event: BidEvent::Live(now) }); + self.db.update_block_submission_live_ts(block_hash, now.0); send_submission_result( producers, &self.future_results, diff --git a/crates/relay/src/auctioneer/validation.rs b/crates/relay/src/auctioneer/validation.rs index 09de7c7ed..1045da71b 100644 --- a/crates/relay/src/auctioneer/validation.rs +++ b/crates/relay/src/auctioneer/validation.rs @@ -36,11 +36,11 @@ impl Context { }); }; - if let helix_types::Submission::Dehydrated(ref dehydrated) = *submission - && !self.hydration_cache.can_hydrate(dehydrated, self.chain_info.max_blobs_per_block()) - { - return Err(BlockValidationError::CannotHydrate); - } + if let helix_types::Submission::Dehydrated(ref dehydrated) = *submission && + !self.hydration_cache.can_hydrate(dehydrated, self.chain_info.max_blobs_per_block()) + { + return Err(BlockValidationError::CannotHydrate); + } self.staleness_check(submission.builder_pubkey(), submission_data.version)?; self.validate_submission_data( diff --git a/crates/relay/src/data_gatherer/clickhouse.rs b/crates/relay/src/data_gatherer/clickhouse.rs new file mode 100644 index 000000000..4e378b0cb --- /dev/null +++ b/crates/relay/src/data_gatherer/clickhouse.rs @@ -0,0 +1,64 @@ +#![allow(clippy::future_not_send)] + +use std::time::Duration; + +use helix_common::{config::ClickhouseConfig, expect_env_var}; +use tracing::{error, info}; + +const TABLE: &str = "relay_bid_submission_data"; +const ENV_CLICKHOUSE_PASSWORD: &str = "CLICKHOUSE_PASSWORD"; + +#[derive(clickhouse::Row, serde::Serialize)] +pub struct BlockInfoRow { + pub instance_id: String, + pub slot: u64, + pub is_dehydrated: bool, + pub block_hash: String, + pub received_ns: i64, + pub read_body_ns: i64, + pub decoded_ns: Option, + pub live_ns: Option, + pub top_bid_ns: Option, + pub builder_pubkey: String, +} + +pub struct ClickhouseData { + client: clickhouse::Client, +} + +impl ClickhouseData { + pub fn new(config: &ClickhouseConfig) -> Self { + let password = expect_env_var(ENV_CLICKHOUSE_PASSWORD); + let client = clickhouse::Client::default() + .with_url(&config.url) + .with_database(&config.database) + .with_user(&config.user) + .with_password(password); + Self { client } + } + + pub async fn publish(&mut self, rows: impl Iterator) { + match self.insert_rows(rows).await { + Ok(len) => info!("inserted {len} rows to {TABLE}"), + Err(err) => error!(?err, "failed to insert rows to {TABLE}"), + } + } + + async fn insert_rows( + &self, + rows: impl Iterator, + ) -> Result { + let mut insert = self + .client + .insert::(TABLE) + .await? + .with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(5))); + let mut len = 0; + for row in rows { + insert.write(&row).await?; + len += 1; + } + insert.end().await?; + Ok(len) + } +} diff --git a/crates/relay/src/data_gatherer/mod.rs b/crates/relay/src/data_gatherer/mod.rs new file mode 100644 index 000000000..5e3a5feec --- /dev/null +++ b/crates/relay/src/data_gatherer/mod.rs @@ -0,0 +1,5 @@ +pub use tile::DataGatherer; + +mod clickhouse; +mod s3; +mod tile; diff --git a/crates/relay/src/data_gatherer/s3.rs b/crates/relay/src/data_gatherer/s3.rs new file mode 100644 index 000000000..3403eb51d --- /dev/null +++ b/crates/relay/src/data_gatherer/s3.rs @@ -0,0 +1,81 @@ +use aws_sdk_s3::{ + Client, + config::{BehaviorVersion, Credentials, Region}, + primitives::ByteStream, +}; +use bytes::Bytes; +use chrono::Utc; +use helix_common::{S3Config, expect_env_var}; +use uuid::Uuid; + +use crate::auctioneer::InternalBidSubmissionHeader; + +const ENV_ACCESS_KEY_ID: &str = "S3_ACCESS_KEY_ID"; +const ENV_SECRET_ACCESS_KEY: &str = "S3_SECRET_ACCESS_KEY"; + +pub struct S3Data { + client: Client, + bucket: String, + pending: Vec<(Uuid, Bytes)>, +} + +impl S3Data { + pub fn new(config: S3Config) -> Self { + let access_key_id = expect_env_var(ENV_ACCESS_KEY_ID); + let secret_access_key = expect_env_var(ENV_SECRET_ACCESS_KEY); + + let creds = Credentials::new(&access_key_id, &secret_access_key, None, None, "env"); + let sdk_config = aws_sdk_s3::Config::builder() + .behavior_version(BehaviorVersion::latest()) + .credentials_provider(creds) + .region(Region::new(config.region.clone())) + .build(); + let client = Client::from_conf(sdk_config); + + Self { client, bucket: config.bucket, pending: Vec::with_capacity(10_000) } + } + + pub fn push( + &mut self, + header: InternalBidSubmissionHeader, + payload: &[u8], + payload_offset: usize, + ) { + let id = header.id; + let header = header.to_bytes(); + let header_slice = header.as_slice(); + let header_len = header_slice.len() as u16; + + let payload = &payload[payload_offset..]; + // format: [u16 LE header_len][header bytes][payload bytes] + let mut buf = bytes::BytesMut::with_capacity(2 + header_slice.len() + payload.len()); + buf.extend_from_slice(&header_len.to_le_bytes()); + buf.extend_from_slice(header_slice); + buf.extend_from_slice(payload); + let bytes = buf.freeze(); + + self.pending.push((id, bytes)); + } + + pub async fn flush(&mut self) { + for (id, payload) in self.pending.drain(..) { + let key = Self::make_key(id); + if let Err(e) = self + .client + .put_object() + .bucket(self.bucket.clone()) + .key(&key) + .body(ByteStream::from(payload)) + .send() + .await + { + tracing::error!(%e, %key, "s3 upload failed"); + } + } + } + + fn make_key(id: Uuid) -> String { + let now = Utc::now().to_rfc3339(); + format!("{now}_{id}.bin") + } +} diff --git a/crates/relay/src/data_gatherer/tile.rs b/crates/relay/src/data_gatherer/tile.rs new file mode 100644 index 000000000..e140f6c45 --- /dev/null +++ b/crates/relay/src/data_gatherer/tile.rs @@ -0,0 +1,228 @@ +use std::sync::Arc; + +use alloy_primitives::B256; +use flux::{spine::SpineAdapter, tile::Tile, timing::InternalMessage}; +use flux_utils::SharedVector; +use helix_common::{ + S3Config, api::builder_api::TopBidUpdate, config::ClickhouseConfig, decoder::Encoding, + task::block_on, +}; +use helix_types::{BlsPublicKeyBytes, MergeType}; +use rustc_hash::FxHashMap; + +use crate::{ + HelixSpine, SubmissionDataWithSpan, + data_gatherer::{ + clickhouse::{BlockInfoRow, ClickhouseData}, + s3::S3Data, + }, + spine::messages::{BidEvent, BidUpdate, DecodedSubmission, NewBidSubmission}, +}; + +#[derive(Default)] +struct BlockInfo { + builder_pubkey: BlsPublicKeyBytes, + slot: u64, + is_dehydrated: bool, + received_ns: i64, + read_body_ns: i64, + decoded_ns: Option, + live_ns: Option, + top_bid_ns: Option, +} + +pub struct DataGatherer { + decoded: Arc>, + map: FxHashMap, + ch: Option, + s3: Option, + current_slot: u64, + instance_id: String, +} + +impl DataGatherer { + pub fn new( + decoded: Arc>, + instance_id: String, + ch_config: Option<&ClickhouseConfig>, + s3_config: Option, + ) -> Self { + Self { + decoded, + map: FxHashMap::with_capacity_and_hasher(5000, Default::default()), + ch: ch_config.map(ClickhouseData::new), + s3: s3_config.map(S3Data::new), + current_slot: 0, + instance_id, + } + } + + pub fn on_new_slot(&mut self, new_slot: u64) { + self.current_slot = new_slot; + + if let Some(ch) = self.ch.as_mut() { + let rows = self + .map + .extract_if(|_, v| v.slot < new_slot) + .map(|(hash, info)| Self::make_row(self.instance_id.clone(), hash, info)); + block_on(ch.publish(rows)); + } else { + self.map.retain(|_, v| v.slot >= new_slot); + } + + if let Some(s3) = self.s3.as_mut() { + block_on(s3.flush()); + } + } + + fn make_row(instance_id: String, hash: B256, info: BlockInfo) -> BlockInfoRow { + BlockInfoRow { + instance_id, + slot: info.slot, + block_hash: hash.to_string(), + is_dehydrated: info.is_dehydrated, + received_ns: info.received_ns, + read_body_ns: info.read_body_ns, + decoded_ns: info.decoded_ns, + live_ns: info.live_ns, + top_bid_ns: info.top_bid_ns, + builder_pubkey: info.builder_pubkey.to_string(), + } + } + + fn extract_block_hash_and_pubkey( + encoding: Encoding, + buf: &[u8], + has_mergeable_data: bool, + ) -> Option<(u64, B256, BlsPublicKeyBytes)> { + match encoding { + Encoding::Json => { + #[derive(serde::Deserialize)] + struct Outer { + submission: Bid, + } + #[derive(serde::Deserialize)] + struct Bid { + message: Message, + } + #[derive(serde::Deserialize)] + struct Message { + #[serde(with = "serde_utils::quoted_u64")] + slot: u64, + block_hash: B256, + builder_pubkey: BlsPublicKeyBytes, + } + + let bid: Bid = if has_mergeable_data { + serde_json::from_slice::(buf).ok()?.submission + } else { + serde_json::from_slice(buf).ok()? + }; + + Some((bid.message.slot, bid.message.block_hash, bid.message.builder_pubkey)) + } + Encoding::Ssz => { + const BLOCK_HASH_OFFSET: usize = 8 + /* slot */ + 32; /* parent_hash */ + const BUILDER_PUBKEY_OFFSET: usize = BLOCK_HASH_OFFSET + 32; /* block_hash */ + + if buf.len() < BUILDER_PUBKEY_OFFSET + BlsPublicKeyBytes::len_bytes() { + return None; + } + + let (slot, block_hash, builder_pubkey) = unsafe { + ( + core::ptr::read_unaligned(buf.as_ptr() as *const u64), + core::ptr::read_unaligned( + buf.as_ptr().add(BLOCK_HASH_OFFSET) as *const B256 + ), + core::ptr::read_unaligned( + buf.as_ptr().add(BUILDER_PUBKEY_OFFSET) as *const BlsPublicKeyBytes + ), + ) + }; + + Some((slot, block_hash, builder_pubkey)) + } + } + } +} + +impl Tile for DataGatherer { + fn loop_body(&mut self, adapter: &mut SpineAdapter) { + let mut max_slot = self.current_slot; + + adapter.consume_with_dcache_internal_message( + |bid: &InternalMessage, payload| { + if let Some(s3) = self.s3.as_mut() { + s3.push(bid.header, payload, bid.payload_offset); + } + + let is_mergeable = matches!(bid.header.merge_type, MergeType::Mergeable); + if let Some((slot, block_hash, builder_pubkey)) = + Self::extract_block_hash_and_pubkey(bid.header.encoding, payload, is_mergeable) + { + max_slot = max_slot.max(slot); + + let info = BlockInfo { + builder_pubkey, + slot, + is_dehydrated: bid.header.flags.is_dehydrated(), + received_ns: bid.trace.receive_ns.0 as i64, + read_body_ns: bid.trace.read_body_ns.0 as i64, + ..Default::default() + }; + + self.map.insert(block_hash, info); + } else { + tracing::error!( + "failed to extract builder_pubkey & block hash from submission with id {}", + bid.header.id + ); + } + }, + |_, _| {}, + ); + + adapter.consume_internal_message(|msg: &mut InternalMessage, _| { + if let Some(bid) = self.decoded.get(msg.ix) { + max_slot = max_slot.max(bid.submission_data.bid_slot()); + if let Some(info) = self.map.get_mut(bid.submission_data.block_hash()) { + info.decoded_ns = Some(msg.ingestion_time().real().0 as i64); + } + } + }); + + adapter.consume(|msg: BidUpdate, _| { + if let Some(info) = self.map.get_mut(&msg.block_hash) { + let BidEvent::Live(nanos) = msg.event; + info.live_ns = Some(nanos.0 as i64); + } + }); + + adapter.consume(|msg: TopBidUpdate, _| { + max_slot = max_slot.max(msg.slot); + if let Some(info) = self.map.get_mut(&msg.block_hash) { + info.top_bid_ns = Some(msg.timestamp as i64); + } + }); + + if max_slot > self.current_slot { + self.on_new_slot(max_slot); + } + } + + fn teardown(mut self, adapter: &mut SpineAdapter) { + self.loop_body(adapter); + if let Some(ch) = self.ch.as_mut() { + let rows = self + .map + .into_iter() + .map(|(hash, info)| Self::make_row(self.instance_id.clone(), hash, info)); + block_on(ch.publish(rows)); + } + if let Some(s3) = self.s3.as_mut() { + block_on(s3.flush()); + } + } +} diff --git a/crates/relay/src/lib.rs b/crates/relay/src/lib.rs index 0df535580..fadfe07ee 100644 --- a/crates/relay/src/lib.rs +++ b/crates/relay/src/lib.rs @@ -2,6 +2,7 @@ mod api; mod auctioneer; mod beacon; mod bid_decoder; +mod data_gatherer; mod gossip; mod housekeeper; mod network; @@ -33,13 +34,14 @@ pub use crate::{ }, beacon::start_beacon_client, bid_decoder::{DecoderTile, SubmissionDataWithSpan}, + data_gatherer::DataGatherer, housekeeper::start_housekeeper, network::RelayNetworkManager, simulator::{SimRequest, SimResult}, spine::{HelixSpine, HelixSpineConfig, messages::NewBidSubmission}, tcp_bid_recv::{ BidSubmissionFlags, BidSubmissionHeader, BidSubmissionResponse, BidSubmissionTcpListener, - RegistrationMsg, S3PayloadSaver, + RegistrationMsg, }, website::WebsiteService, }; diff --git a/crates/relay/src/main.rs b/crates/relay/src/main.rs index e45a81bf0..2cce9d385 100644 --- a/crates/relay/src/main.rs +++ b/crates/relay/src/main.rs @@ -26,9 +26,9 @@ use helix_common::{ utils::{init_panic_hook, init_tracing_log}, }; use helix_relay::{ - Api, Auctioneer, AuctioneerHandle, BidSorter, BidSubmissionTcpListener, DbHandle, DecoderTile, - DefaultBidAdjustor, FutureBidSubmissionResult, HelixSpine, HelixSpineConfig, NewBidSubmission, - RegWorker, RegWorkerHandle, RelayNetworkManager, S3PayloadSaver, SimRequest, SimResult, + Api, Auctioneer, AuctioneerHandle, BidSorter, BidSubmissionTcpListener, DataGatherer, DbHandle, + DecoderTile, DefaultBidAdjustor, FutureBidSubmissionResult, HelixSpine, HelixSpineConfig, + NewBidSubmission, RegWorker, RegWorkerHandle, RelayNetworkManager, SimRequest, SimResult, SimulatorTile, SubmissionDataWithSpan, TopBidTile, WebsiteService, spawn_tokio_monitoring, start_admin_service, start_api_service, start_beacon_client, start_db_service, start_housekeeper, @@ -223,6 +223,20 @@ async fn run( } if config.is_submission_instance { + if config.clickhouse.is_some() || config.s3_config.is_some() { + let data_gatherer = DataGatherer::new( + decoded.clone(), + instance_id.clone(), + config.clickhouse.as_ref(), + config.s3_config.clone(), + ); + attach_tile( + data_gatherer, + spine, + TileConfig::new(config.cores.data_gatherer, ThreadPriority::OSDefault), + ); + } + for core in &config.cores.decoder { let decoder_tile = DecoderTile::new( local_cache.as_ref().clone(), @@ -236,11 +250,6 @@ async fn run( attach_tile(decoder_tile, spine, TileConfig::new(*core, ThreadPriority::OSDefault)); } - if let Some(cfg) = config.s3_config.clone() { - let s3_saver = S3PayloadSaver::new(cfg); - attach_tile(s3_saver, spine, TileConfig::background(None, None)); - } - let sock_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, config.tcp_port)); let block_submission_tcp_listener = BidSubmissionTcpListener::new( diff --git a/crates/relay/src/spine/messages.rs b/crates/relay/src/spine/messages.rs index c58c23f07..462fe97c3 100644 --- a/crates/relay/src/spine/messages.rs +++ b/crates/relay/src/spine/messages.rs @@ -1,3 +1,5 @@ +use alloy_primitives::B256; +use flux::timing::Nanos; use flux_utils::ArrayStr; use helix_common::SubmissionTrace; // Re-export as also used as spine message. @@ -87,3 +89,14 @@ pub enum ToSimKind { pub struct FromSimMsg { pub ix: usize, } + +#[derive(Debug, Clone, Copy)] +pub enum BidEvent { + Live(Nanos), +} + +#[derive(Debug, Clone, Copy)] +pub struct BidUpdate { + pub block_hash: B256, + pub event: BidEvent, +} diff --git a/crates/relay/src/spine/mod.rs b/crates/relay/src/spine/mod.rs index f8ce9ec89..5c0b98f7d 100644 --- a/crates/relay/src/spine/mod.rs +++ b/crates/relay/src/spine/mod.rs @@ -27,4 +27,7 @@ pub struct HelixSpine { /// Auctioneer → TopBidTile. #[queue(size(2usize.pow(16)))] pub top_bid: SpineQueue, + + #[queue(size(2usize.pow(16)))] + pub bid_update: SpineQueue, } diff --git a/crates/relay/src/tcp_bid_recv/mod.rs b/crates/relay/src/tcp_bid_recv/mod.rs index 074857beb..5591cb4e9 100644 --- a/crates/relay/src/tcp_bid_recv/mod.rs +++ b/crates/relay/src/tcp_bid_recv/mod.rs @@ -19,13 +19,11 @@ use crate::{ spine::messages::{NewBidSubmission, SubmissionResultWithRef}, }; -mod s3; pub mod types; pub use helix_tcp_types::{ BidSubmissionFlags, BidSubmissionHeader, BidSubmissionResponse, RegistrationMsg, }; -pub use s3::S3PayloadSaver; pub use crate::tcp_bid_recv::types::{ BidSubmissionError, response_from_bid_submission_error, response_from_submission_result, diff --git a/crates/relay/src/tcp_bid_recv/s3.rs b/crates/relay/src/tcp_bid_recv/s3.rs deleted file mode 100644 index 3af83f178..000000000 --- a/crates/relay/src/tcp_bid_recv/s3.rs +++ /dev/null @@ -1,96 +0,0 @@ -use std::sync::Arc; - -use aws_sdk_s3::{ - Client, - config::{BehaviorVersion, Credentials, Region}, - primitives::ByteStream, -}; -use bytes::Bytes; -use chrono::Utc; -use flux::tile::Tile; -use helix_common::S3Config; -use tokio::sync::mpsc; -use uuid::Uuid; - -use crate::{HelixSpine, spine::messages::NewBidSubmission}; - -const ENV_ACCESS_KEY_ID: &str = "S3_ACCESS_KEY_ID"; -const ENV_SECRET_ACCESS_KEY: &str = "S3_SECRET_ACCESS_KEY"; - -pub struct S3PayloadSaver { - tx: mpsc::Sender<(NewBidSubmission, Bytes)>, -} - -impl S3PayloadSaver { - pub fn new(config: S3Config) -> Self { - let access_key_id = std::env::var(ENV_ACCESS_KEY_ID) - .unwrap_or_else(|_| panic!("{ENV_ACCESS_KEY_ID} must be set")); - let secret_access_key = std::env::var(ENV_SECRET_ACCESS_KEY) - .unwrap_or_else(|_| panic!("{ENV_SECRET_ACCESS_KEY} must be set")); - - let creds = Credentials::new(&access_key_id, &secret_access_key, None, None, "env"); - let sdk_config = aws_sdk_s3::Config::builder() - .behavior_version(BehaviorVersion::latest()) - .credentials_provider(creds) - .region(Region::new(config.region.clone())) - .build(); - let client = Client::from_conf(sdk_config); - let bucket = Arc::::from(config.bucket.as_str()); - - let (tx, mut rx) = mpsc::channel::<(NewBidSubmission, Bytes)>(10_000); - tokio::spawn(async move { - while let Some((r, payload)) = rx.recv().await { - let key = make_key(r.header.id); - let client = client.clone(); - let bucket = Arc::clone(&bucket); - tokio::spawn(async move { - if let Err(e) = client - .put_object() - .bucket(bucket.as_ref()) - .key(&key) - .body(ByteStream::from(payload)) - .send() - .await - { - tracing::error!(%e, %key, "s3 upload failed"); - } - }); - } - }); - - Self { tx } - } -} - -impl Tile for S3PayloadSaver { - fn loop_body(&mut self, adapter: &mut flux::spine::SpineAdapter) { - adapter.consume_with_dcache( - |r: NewBidSubmission, full_payload| { - let header = r.header.to_bytes(); - let header_slice = header.as_slice(); - let header_len = header_slice.len() as u16; - let payload_offset = r.payload_offset; - - let payload = &full_payload[payload_offset..]; - - // format: [u16 LE header_len][header bytes][payload bytes] - let mut buf = - bytes::BytesMut::with_capacity(2 + header_slice.len() + payload.len()); - buf.extend_from_slice(&header_len.to_le_bytes()); - buf.extend_from_slice(header_slice); - buf.extend_from_slice(payload); - let bytes = buf.freeze(); - - if self.tx.try_send((r, bytes)).is_err() { - tracing::error!("s3 channel full, dropping payload"); - } - }, - |_, _| {}, - ); - } -} - -fn make_key(id: Uuid) -> String { - let now = Utc::now().to_rfc3339(); - format!("{now}_{id}.bin") -} diff --git a/crates/simulator/src/block_merging/types.rs b/crates/simulator/src/block_merging/types.rs index 1b276575f..baab83114 100644 --- a/crates/simulator/src/block_merging/types.rs +++ b/crates/simulator/src/block_merging/types.rs @@ -3,6 +3,7 @@ use std::collections::HashMap; use alloy_eips::{Decodable2718, eip2718::Eip2718Error}; use alloy_primitives::{Address, B256, Bytes, U256}; use alloy_rpc_types::{beacon::requests::ExecutionRequestsV4, engine::ExecutionPayloadV3}; +use helix_common::expect_env_var; use helix_types::{BuilderInclusionResult, MergedBlockTrace}; use reth_ethereum::{evm::EthEvmConfig, primitives::SignedTransaction, provider::ProviderError}; use reth_node_builder::ConfigureEvm; @@ -36,7 +37,7 @@ pub(crate) struct PrivateKeySigner( ); pub fn load_signer() -> PrivateKeySigner { - let signing_key_str = std::env::var("RELAY_KEY").expect("could not find RELAY_KEY in env"); + let signing_key_str = expect_env_var("RELAY_KEY"); let signing_key = signing_key_str .parse::() .expect("failed to parse RELAY_KEY"); diff --git a/crates/types/src/hydration.rs b/crates/types/src/hydration.rs index 48b6d5830..db8113546 100644 --- a/crates/types/src/hydration.rs +++ b/crates/types/src/hydration.rs @@ -515,5 +515,4 @@ pub enum HydrationError { #[error("too many blobs: blobs {blobs}, max {max}")] TooManyBlobs { blobs: usize, max: usize }, - } From 2606dbb2aa14d861b52cb5ce6a7ff2dee537cf04 Mon Sep 17 00:00:00 2001 From: Nina Date: Fri, 27 Mar 2026 12:04:51 +0000 Subject: [PATCH 2/4] local runtime --- crates/relay/src/data_gatherer/s3.rs | 27 ++++++++++------------ crates/relay/src/data_gatherer/tile.rs | 32 ++++++++++++++------------ 2 files changed, 29 insertions(+), 30 deletions(-) diff --git a/crates/relay/src/data_gatherer/s3.rs b/crates/relay/src/data_gatherer/s3.rs index 3403eb51d..c9b211e4d 100644 --- a/crates/relay/src/data_gatherer/s3.rs +++ b/crates/relay/src/data_gatherer/s3.rs @@ -1,9 +1,10 @@ +use std::future::Future; + use aws_sdk_s3::{ Client, config::{BehaviorVersion, Credentials, Region}, primitives::ByteStream, }; -use bytes::Bytes; use chrono::Utc; use helix_common::{S3Config, expect_env_var}; use uuid::Uuid; @@ -16,7 +17,6 @@ const ENV_SECRET_ACCESS_KEY: &str = "S3_SECRET_ACCESS_KEY"; pub struct S3Data { client: Client, bucket: String, - pending: Vec<(Uuid, Bytes)>, } impl S3Data { @@ -32,15 +32,15 @@ impl S3Data { .build(); let client = Client::from_conf(sdk_config); - Self { client, bucket: config.bucket, pending: Vec::with_capacity(10_000) } + Self { client, bucket: config.bucket } } - pub fn push( - &mut self, + pub fn upload_task( + &self, header: InternalBidSubmissionHeader, payload: &[u8], payload_offset: usize, - ) { + ) -> impl Future + Send + 'static { let id = header.id; let header = header.to_bytes(); let header_slice = header.as_slice(); @@ -54,18 +54,15 @@ impl S3Data { buf.extend_from_slice(payload); let bytes = buf.freeze(); - self.pending.push((id, bytes)); - } - - pub async fn flush(&mut self) { - for (id, payload) in self.pending.drain(..) { + let client = self.client.clone(); + let bucket = self.bucket.clone(); + async move { let key = Self::make_key(id); - if let Err(e) = self - .client + if let Err(e) = client .put_object() - .bucket(self.bucket.clone()) + .bucket(bucket) .key(&key) - .body(ByteStream::from(payload)) + .body(ByteStream::from(bytes)) .send() .await { diff --git a/crates/relay/src/data_gatherer/tile.rs b/crates/relay/src/data_gatherer/tile.rs index e140f6c45..2f7be17cd 100644 --- a/crates/relay/src/data_gatherer/tile.rs +++ b/crates/relay/src/data_gatherer/tile.rs @@ -1,11 +1,10 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use alloy_primitives::B256; use flux::{spine::SpineAdapter, tile::Tile, timing::InternalMessage}; use flux_utils::SharedVector; use helix_common::{ S3Config, api::builder_api::TopBidUpdate, config::ClickhouseConfig, decoder::Encoding, - task::block_on, }; use helix_types::{BlsPublicKeyBytes, MergeType}; use rustc_hash::FxHashMap; @@ -38,6 +37,7 @@ pub struct DataGatherer { s3: Option, current_slot: u64, instance_id: String, + rt: tokio::runtime::Runtime, } impl DataGatherer { @@ -47,6 +47,10 @@ impl DataGatherer { ch_config: Option<&ClickhouseConfig>, s3_config: Option, ) -> Self { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("failed to build data gatherer runtime"); Self { decoded, map: FxHashMap::with_capacity_and_hasher(5000, Default::default()), @@ -54,6 +58,7 @@ impl DataGatherer { s3: s3_config.map(S3Data::new), current_slot: 0, instance_id, + rt, } } @@ -65,14 +70,10 @@ impl DataGatherer { .map .extract_if(|_, v| v.slot < new_slot) .map(|(hash, info)| Self::make_row(self.instance_id.clone(), hash, info)); - block_on(ch.publish(rows)); + self.rt.block_on(ch.publish(rows)); } else { self.map.retain(|_, v| v.slot >= new_slot); } - - if let Some(s3) = self.s3.as_mut() { - block_on(s3.flush()); - } } fn make_row(instance_id: String, hash: B256, info: BlockInfo) -> BlockInfoRow { @@ -132,7 +133,7 @@ impl DataGatherer { let (slot, block_hash, builder_pubkey) = unsafe { ( - core::ptr::read_unaligned(buf.as_ptr() as *const u64), + u64::from_le_bytes(buf[0..8].try_into().unwrap()), core::ptr::read_unaligned( buf.as_ptr().add(BLOCK_HASH_OFFSET) as *const B256 ), @@ -154,8 +155,8 @@ impl Tile for DataGatherer { adapter.consume_with_dcache_internal_message( |bid: &InternalMessage, payload| { - if let Some(s3) = self.s3.as_mut() { - s3.push(bid.header, payload, bid.payload_offset); + if let Some(s3) = self.s3.as_ref() { + self.rt.spawn(s3.upload_task(bid.header, payload, bid.payload_offset)); } let is_mergeable = matches!(bid.header.merge_type, MergeType::Mergeable); @@ -210,19 +211,20 @@ impl Tile for DataGatherer { if max_slot > self.current_slot { self.on_new_slot(max_slot); } + + // drive spawned S3 upload tasks + self.rt.block_on(tokio::time::sleep(Duration::from_micros(500))); } fn teardown(mut self, adapter: &mut SpineAdapter) { self.loop_body(adapter); if let Some(ch) = self.ch.as_mut() { + let instance_id = self.instance_id.clone(); let rows = self .map .into_iter() - .map(|(hash, info)| Self::make_row(self.instance_id.clone(), hash, info)); - block_on(ch.publish(rows)); - } - if let Some(s3) = self.s3.as_mut() { - block_on(s3.flush()); + .map(move |(hash, info)| Self::make_row(instance_id.clone(), hash, info)); + self.rt.block_on(ch.publish(rows)); } } } From 15a752c98ce42814317b7b8e5bcbe7ec366571bf Mon Sep 17 00:00:00 2001 From: Nina Date: Fri, 27 Mar 2026 16:47:24 +0000 Subject: [PATCH 3/4] fixes --- crates/relay/src/auctioneer/get_header.rs | 2 +- crates/relay/src/auctioneer/submit_block.rs | 23 ++-- crates/relay/src/data_gatherer/clickhouse.rs | 89 +++++++++++++-- crates/relay/src/data_gatherer/s3.rs | 2 - crates/relay/src/data_gatherer/tile.rs | 113 +++++++------------ crates/relay/src/main.rs | 13 +-- crates/relay/src/spine/messages.rs | 3 +- 7 files changed, 143 insertions(+), 102 deletions(-) diff --git a/crates/relay/src/auctioneer/get_header.rs b/crates/relay/src/auctioneer/get_header.rs index 0fcc1bd68..4068c9abe 100644 --- a/crates/relay/src/auctioneer/get_header.rs +++ b/crates/relay/src/auctioneer/get_header.rs @@ -57,7 +57,7 @@ impl Context { .with_label_values(&[strategy]) .observe(start.elapsed().as_micros() as f64); - self.store_data(adjusted_bid.clone(), sim_request.is_optimistic); + self.store_data(adjusted_bid.clone(), sim_request.is_optimistic, producers); self.send_to_sim(sim_request, true, producers); if is_adjustable_slot { diff --git a/crates/relay/src/auctioneer/submit_block.rs b/crates/relay/src/auctioneer/submit_block.rs index e041965e9..5cdbe726c 100644 --- a/crates/relay/src/auctioneer/submit_block.rs +++ b/crates/relay/src/auctioneer/submit_block.rs @@ -122,7 +122,7 @@ impl Context { ); self.try_adjustments_dry_run(&entry, slot_data, producers); - self.store_data(entry, is_optimistic); + self.store_data(entry, is_optimistic, producers); self.try_merge_block(merging_data, producers); } @@ -164,7 +164,7 @@ impl Context { .with_label_values(&[strategy]) .observe(start.elapsed().as_micros()); - self.store_data(adjusted_block, sim_request.is_optimistic); + self.store_data(adjusted_block, sim_request.is_optimistic, producers); self.send_to_sim(sim_request, true, producers); } } @@ -217,9 +217,8 @@ impl Context { self.request_merged_block(producers); if need_send_result { - let now = Nanos::now(); - producers.produce(BidUpdate { block_hash, event: BidEvent::Live(now) }); - self.db.update_block_submission_live_ts(block_hash, now.0); + producers.produce(BidUpdate { block_hash, event: BidEvent::Live }); + self.db.update_block_submission_live_ts(block_hash, Nanos::now().0); send_submission_result( producers, &self.future_results, @@ -233,7 +232,12 @@ impl Context { need_send_result } - pub fn store_data(&mut self, entry: PayloadEntry, is_optimistic: bool) { + pub fn store_data( + &mut self, + entry: PayloadEntry, + is_optimistic: bool, + producers: &mut HelixSpineProducers, + ) { let block_hash = *entry.block_hash(); let is_adjusted = entry.is_adjusted(); @@ -245,7 +249,12 @@ impl Context { }; // For optimistic submissions the bid is live as soon as it is stored. // For non-optimistic, live_ts is updated when the simulation result arrives. - let live_ts = if is_optimistic { Some(Nanos::now().0) } else { None }; + let live_ts = if is_optimistic { + producers.produce(BidUpdate { block_hash, event: BidEvent::Live }); + Some(Nanos::now().0) + } else { + None + }; self.db.store_block_submission( s.signed_bid_submission.clone(), s.submission_trace, diff --git a/crates/relay/src/data_gatherer/clickhouse.rs b/crates/relay/src/data_gatherer/clickhouse.rs index 4e378b0cb..98a9cb1f0 100644 --- a/crates/relay/src/data_gatherer/clickhouse.rs +++ b/crates/relay/src/data_gatherer/clickhouse.rs @@ -1,16 +1,37 @@ #![allow(clippy::future_not_send)] -use std::time::Duration; +use std::{future::Future, time::Duration}; +use alloy_primitives::B256; +use flux_utils::ArrayStr; use helix_common::{config::ClickhouseConfig, expect_env_var}; +use helix_types::BlsPublicKeyBytes; +use rustc_hash::FxHashMap; use tracing::{error, info}; const TABLE: &str = "relay_bid_submission_data"; const ENV_CLICKHOUSE_PASSWORD: &str = "CLICKHOUSE_PASSWORD"; +fn serialize_str, S: serde::Serializer>(v: &T, s: S) -> Result { + s.serialize_str(v.as_ref()) +} + +#[derive(Default)] +pub struct BlockInfo { + pub builder_pubkey: BlsPublicKeyBytes, + pub slot: u64, + pub is_dehydrated: bool, + pub received_ns: i64, + pub read_body_ns: i64, + pub decoded_ns: Option, + pub live_ns: Option, + pub top_bid_ns: Option, +} + #[derive(clickhouse::Row, serde::Serialize)] pub struct BlockInfoRow { - pub instance_id: String, + #[serde(serialize_with = "serialize_str")] + pub instance_id: ArrayStr<64>, pub slot: u64, pub is_dehydrated: bool, pub block_hash: String, @@ -22,34 +43,80 @@ pub struct BlockInfoRow { pub builder_pubkey: String, } +impl BlockInfoRow { + pub fn from(instance_id: ArrayStr<64>, block_hash: B256, info: BlockInfo) -> Self { + BlockInfoRow { + instance_id, + slot: info.slot, + block_hash: block_hash.to_string(), + is_dehydrated: info.is_dehydrated, + received_ns: info.received_ns, + read_body_ns: info.read_body_ns, + decoded_ns: info.decoded_ns, + live_ns: info.live_ns, + top_bid_ns: info.top_bid_ns, + builder_pubkey: info.builder_pubkey.to_string(), + } + } +} + pub struct ClickhouseData { client: clickhouse::Client, + instance_id: ArrayStr<64>, + map: FxHashMap, } impl ClickhouseData { - pub fn new(config: &ClickhouseConfig) -> Self { + pub fn new(config: &ClickhouseConfig, instance_id: String) -> Self { let password = expect_env_var(ENV_CLICKHOUSE_PASSWORD); let client = clickhouse::Client::default() .with_url(&config.url) .with_database(&config.database) .with_user(&config.user) .with_password(password); - Self { client } + Self { + client, + instance_id: ArrayStr::from_str_truncate(&instance_id), + map: FxHashMap::with_capacity_and_hasher(5000, Default::default()), + } } - pub async fn publish(&mut self, rows: impl Iterator) { - match self.insert_rows(rows).await { - Ok(len) => info!("inserted {len} rows to {TABLE}"), - Err(err) => error!(?err, "failed to insert rows to {TABLE}"), + pub fn insert(&mut self, hash: B256, info: BlockInfo) { + self.map.insert(hash, info); + } + + pub fn get_mut(&mut self, hash: &B256) -> Option<&mut BlockInfo> { + self.map.get_mut(hash) + } + + pub fn publish_snapshot( + &mut self, + new_slot: u64, + ) -> Option + Send + 'static> { + if self.map.is_empty() { + return None; } + + let rows = self + .map + .extract_if(|_, v| v.slot < new_slot) + .map(|(hash, info)| BlockInfoRow::from(self.instance_id, hash, info)) + .collect::>(); + + let client = self.client.clone(); + Some(async move { + match Self::insert_rows(&client, rows.into_iter()).await { + Ok(len) => info!("inserted {len} rows to {TABLE}"), + Err(err) => error!(?err, "failed to insert rows to {TABLE}"), + } + }) } async fn insert_rows( - &self, + client: &clickhouse::Client, rows: impl Iterator, ) -> Result { - let mut insert = self - .client + let mut insert = client .insert::(TABLE) .await? .with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(5))); diff --git a/crates/relay/src/data_gatherer/s3.rs b/crates/relay/src/data_gatherer/s3.rs index c9b211e4d..83894686d 100644 --- a/crates/relay/src/data_gatherer/s3.rs +++ b/crates/relay/src/data_gatherer/s3.rs @@ -39,14 +39,12 @@ impl S3Data { &self, header: InternalBidSubmissionHeader, payload: &[u8], - payload_offset: usize, ) -> impl Future + Send + 'static { let id = header.id; let header = header.to_bytes(); let header_slice = header.as_slice(); let header_len = header_slice.len() as u16; - let payload = &payload[payload_offset..]; // format: [u16 LE header_len][header bytes][payload bytes] let mut buf = bytes::BytesMut::with_capacity(2 + header_slice.len() + payload.len()); buf.extend_from_slice(&header_len.to_le_bytes()); diff --git a/crates/relay/src/data_gatherer/tile.rs b/crates/relay/src/data_gatherer/tile.rs index 2f7be17cd..52bcd433c 100644 --- a/crates/relay/src/data_gatherer/tile.rs +++ b/crates/relay/src/data_gatherer/tile.rs @@ -1,4 +1,4 @@ -use std::{sync::Arc, time::Duration}; +use std::{sync::Arc, time::Duration, u64}; use alloy_primitives::B256; use flux::{spine::SpineAdapter, tile::Tile, timing::InternalMessage}; @@ -7,36 +7,21 @@ use helix_common::{ S3Config, api::builder_api::TopBidUpdate, config::ClickhouseConfig, decoder::Encoding, }; use helix_types::{BlsPublicKeyBytes, MergeType}; -use rustc_hash::FxHashMap; use crate::{ HelixSpine, SubmissionDataWithSpan, data_gatherer::{ - clickhouse::{BlockInfoRow, ClickhouseData}, + clickhouse::{BlockInfo, ClickhouseData}, s3::S3Data, }, spine::messages::{BidEvent, BidUpdate, DecodedSubmission, NewBidSubmission}, }; -#[derive(Default)] -struct BlockInfo { - builder_pubkey: BlsPublicKeyBytes, - slot: u64, - is_dehydrated: bool, - received_ns: i64, - read_body_ns: i64, - decoded_ns: Option, - live_ns: Option, - top_bid_ns: Option, -} - pub struct DataGatherer { decoded: Arc>, - map: FxHashMap, ch: Option, s3: Option, current_slot: u64, - instance_id: String, rt: tokio::runtime::Runtime, } @@ -53,41 +38,19 @@ impl DataGatherer { .expect("failed to build data gatherer runtime"); Self { decoded, - map: FxHashMap::with_capacity_and_hasher(5000, Default::default()), - ch: ch_config.map(ClickhouseData::new), + ch: ch_config.map(|cfg| ClickhouseData::new(cfg, instance_id)), s3: s3_config.map(S3Data::new), current_slot: 0, - instance_id, rt, } } pub fn on_new_slot(&mut self, new_slot: u64) { self.current_slot = new_slot; - - if let Some(ch) = self.ch.as_mut() { - let rows = self - .map - .extract_if(|_, v| v.slot < new_slot) - .map(|(hash, info)| Self::make_row(self.instance_id.clone(), hash, info)); - self.rt.block_on(ch.publish(rows)); - } else { - self.map.retain(|_, v| v.slot >= new_slot); - } - } - - fn make_row(instance_id: String, hash: B256, info: BlockInfo) -> BlockInfoRow { - BlockInfoRow { - instance_id, - slot: info.slot, - block_hash: hash.to_string(), - is_dehydrated: info.is_dehydrated, - received_ns: info.received_ns, - read_body_ns: info.read_body_ns, - decoded_ns: info.decoded_ns, - live_ns: info.live_ns, - top_bid_ns: info.top_bid_ns, - builder_pubkey: info.builder_pubkey.to_string(), + if let Some(ch) = self.ch.as_mut() && + let Some(future) = ch.publish_snapshot(new_slot) + { + self.rt.spawn(future); } } @@ -155,8 +118,9 @@ impl Tile for DataGatherer { adapter.consume_with_dcache_internal_message( |bid: &InternalMessage, payload| { + let payload = &payload[bid.payload_offset..]; if let Some(s3) = self.s3.as_ref() { - self.rt.spawn(s3.upload_task(bid.header, payload, bid.payload_offset)); + self.rt.spawn(s3.upload_task(bid.header, payload)); } let is_mergeable = matches!(bid.header.merge_type, MergeType::Mergeable); @@ -164,17 +128,16 @@ impl Tile for DataGatherer { Self::extract_block_hash_and_pubkey(bid.header.encoding, payload, is_mergeable) { max_slot = max_slot.max(slot); - - let info = BlockInfo { - builder_pubkey, - slot, - is_dehydrated: bid.header.flags.is_dehydrated(), - received_ns: bid.trace.receive_ns.0 as i64, - read_body_ns: bid.trace.read_body_ns.0 as i64, - ..Default::default() - }; - - self.map.insert(block_hash, info); + if let Some(ch) = self.ch.as_mut() { + ch.insert(block_hash, BlockInfo { + builder_pubkey, + slot, + is_dehydrated: bid.header.flags.is_dehydrated(), + received_ns: bid.trace.receive_ns.0 as i64, + read_body_ns: bid.trace.read_body_ns.0 as i64, + ..Default::default() + }); + } } else { tracing::error!( "failed to extract builder_pubkey & block hash from submission with id {}", @@ -188,23 +151,32 @@ impl Tile for DataGatherer { adapter.consume_internal_message(|msg: &mut InternalMessage, _| { if let Some(bid) = self.decoded.get(msg.ix) { max_slot = max_slot.max(bid.submission_data.bid_slot()); - if let Some(info) = self.map.get_mut(bid.submission_data.block_hash()) { + if let Some(ch) = self.ch.as_mut() && + let Some(info) = ch.get_mut(bid.submission_data.block_hash()) + { info.decoded_ns = Some(msg.ingestion_time().real().0 as i64); } } }); - adapter.consume(|msg: BidUpdate, _| { - if let Some(info) = self.map.get_mut(&msg.block_hash) { - let BidEvent::Live(nanos) = msg.event; - info.live_ns = Some(nanos.0 as i64); + adapter.consume_internal_message(|msg: &mut InternalMessage, _| { + if let Some(ch) = self.ch.as_mut() && + let Some(info) = ch.get_mut(&msg.block_hash) + { + // todo @nina - will we ever need other events? + #[allow(irrefutable_let_patterns)] + if let BidEvent::Live = msg.event { + info.live_ns = Some(msg.ingestion_time().real().0 as i64); + } } }); - adapter.consume(|msg: TopBidUpdate, _| { + adapter.consume_internal_message(|msg: &mut InternalMessage, _| { max_slot = max_slot.max(msg.slot); - if let Some(info) = self.map.get_mut(&msg.block_hash) { - info.top_bid_ns = Some(msg.timestamp as i64); + if let Some(ch) = self.ch.as_mut() && + let Some(info) = ch.get_mut(&msg.block_hash) + { + info.top_bid_ns = Some(msg.ingestion_time().real().0 as i64); } }); @@ -212,19 +184,16 @@ impl Tile for DataGatherer { self.on_new_slot(max_slot); } - // drive spawned S3 upload tasks + // drive spawned S3 and clickhouse tasks self.rt.block_on(tokio::time::sleep(Duration::from_micros(500))); } fn teardown(mut self, adapter: &mut SpineAdapter) { self.loop_body(adapter); - if let Some(ch) = self.ch.as_mut() { - let instance_id = self.instance_id.clone(); - let rows = self - .map - .into_iter() - .map(move |(hash, info)| Self::make_row(instance_id.clone(), hash, info)); - self.rt.block_on(ch.publish(rows)); + if let Some(mut ch) = self.ch && + let Some(fut) = ch.publish_snapshot(u64::MAX) + { + self.rt.block_on(fut); } } } diff --git a/crates/relay/src/main.rs b/crates/relay/src/main.rs index 2cce9d385..9802fd556 100644 --- a/crates/relay/src/main.rs +++ b/crates/relay/src/main.rs @@ -85,12 +85,11 @@ fn main() { instance_id.clone(), )); - let app_id = config - .instance_id - .clone() - .unwrap_or_else(|| format!("RELAY-{}", config.postgres.region_name)); - - init_panic_hook(app_id, config.discord_webhook_url.clone(), config.logging.dir_path()); + init_panic_hook( + instance_id.clone(), + config.discord_webhook_url.clone(), + config.logging.dir_path(), + ); block_on(start_metrics_server(&config)); match block_on(run(instance_id, config, spine_config, keypair)) { @@ -226,7 +225,7 @@ async fn run( if config.clickhouse.is_some() || config.s3_config.is_some() { let data_gatherer = DataGatherer::new( decoded.clone(), - instance_id.clone(), + instance_id, config.clickhouse.as_ref(), config.s3_config.clone(), ); diff --git a/crates/relay/src/spine/messages.rs b/crates/relay/src/spine/messages.rs index 462fe97c3..7f759d4d0 100644 --- a/crates/relay/src/spine/messages.rs +++ b/crates/relay/src/spine/messages.rs @@ -1,5 +1,4 @@ use alloy_primitives::B256; -use flux::timing::Nanos; use flux_utils::ArrayStr; use helix_common::SubmissionTrace; // Re-export as also used as spine message. @@ -92,7 +91,7 @@ pub struct FromSimMsg { #[derive(Debug, Clone, Copy)] pub enum BidEvent { - Live(Nanos), + Live, } #[derive(Debug, Clone, Copy)] From 6bc326146dab40e15f5b2115d383524d124c4a07 Mon Sep 17 00:00:00 2001 From: Nina Date: Fri, 27 Mar 2026 17:44:23 +0000 Subject: [PATCH 4/4] fixes --- crates/relay/src/data_gatherer/clickhouse.rs | 10 +++- crates/relay/src/data_gatherer/tile.rs | 55 ++++++++++++-------- 2 files changed, 42 insertions(+), 23 deletions(-) diff --git a/crates/relay/src/data_gatherer/clickhouse.rs b/crates/relay/src/data_gatherer/clickhouse.rs index 98a9cb1f0..c8609deed 100644 --- a/crates/relay/src/data_gatherer/clickhouse.rs +++ b/crates/relay/src/data_gatherer/clickhouse.rs @@ -1,6 +1,6 @@ #![allow(clippy::future_not_send)] -use std::{future::Future, time::Duration}; +use std::{collections::hash_map::Entry, future::Future, time::Duration}; use alloy_primitives::B256; use flux_utils::ArrayStr; @@ -89,6 +89,10 @@ impl ClickhouseData { self.map.get_mut(hash) } + pub fn entry(&mut self, hash: B256) -> Entry<'_, B256, BlockInfo> { + self.map.entry(hash) + } + pub fn publish_snapshot( &mut self, new_slot: u64, @@ -103,6 +107,10 @@ impl ClickhouseData { .map(|(hash, info)| BlockInfoRow::from(self.instance_id, hash, info)) .collect::>(); + if rows.is_empty() { + return None; + } + let client = self.client.clone(); Some(async move { match Self::insert_rows(&client, rows.into_iter()).await { diff --git a/crates/relay/src/data_gatherer/tile.rs b/crates/relay/src/data_gatherer/tile.rs index 52bcd433c..c73841ff6 100644 --- a/crates/relay/src/data_gatherer/tile.rs +++ b/crates/relay/src/data_gatherer/tile.rs @@ -6,7 +6,7 @@ use flux_utils::SharedVector; use helix_common::{ S3Config, api::builder_api::TopBidUpdate, config::ClickhouseConfig, decoder::Encoding, }; -use helix_types::{BlsPublicKeyBytes, MergeType}; +use helix_types::{BlsPublicKeyBytes, Compression, MergeType}; use crate::{ HelixSpine, SubmissionDataWithSpan, @@ -124,25 +124,27 @@ impl Tile for DataGatherer { } let is_mergeable = matches!(bid.header.merge_type, MergeType::Mergeable); - if let Some((slot, block_hash, builder_pubkey)) = - Self::extract_block_hash_and_pubkey(bid.header.encoding, payload, is_mergeable) - { - max_slot = max_slot.max(slot); - if let Some(ch) = self.ch.as_mut() { - ch.insert(block_hash, BlockInfo { - builder_pubkey, - slot, - is_dehydrated: bid.header.flags.is_dehydrated(), - received_ns: bid.trace.receive_ns.0 as i64, - read_body_ns: bid.trace.read_body_ns.0 as i64, - ..Default::default() - }); + if let Compression::None = bid.header.compression { + if let Some((slot, block_hash, builder_pubkey)) = + Self::extract_block_hash_and_pubkey(bid.header.encoding, payload, is_mergeable) + { + max_slot = max_slot.max(slot); + if let Some(ch) = self.ch.as_mut() { + ch.insert(block_hash, BlockInfo { + builder_pubkey, + slot, + is_dehydrated: bid.header.flags.is_dehydrated(), + received_ns: bid.trace.receive_ns.0 as i64, + read_body_ns: bid.trace.read_body_ns.0 as i64, + ..Default::default() + }); + } + } else { + tracing::error!( + "failed to extract builder_pubkey & block hash from submission with id {}", + bid.header.id + ); } - } else { - tracing::error!( - "failed to extract builder_pubkey & block hash from submission with id {}", - bid.header.id - ); } }, |_, _| {}, @@ -151,9 +153,18 @@ impl Tile for DataGatherer { adapter.consume_internal_message(|msg: &mut InternalMessage, _| { if let Some(bid) = self.decoded.get(msg.ix) { max_slot = max_slot.max(bid.submission_data.bid_slot()); - if let Some(ch) = self.ch.as_mut() && - let Some(info) = ch.get_mut(bid.submission_data.block_hash()) - { + if let Some(ch) = self.ch.as_mut() { + let info = + ch.entry(*bid.submission_data.block_hash()).or_insert_with(|| BlockInfo { + builder_pubkey: *bid.submission_data.builder_pubkey(), + slot: bid.submission_data.bid_slot(), + is_dehydrated: bid.submission_data.decoder_params.is_dehydrated, + received_ns: bid.submission_data.trace.receive_ns.0 as i64, + read_body_ns: bid.submission_data.trace.read_body_ns.0 as i64, + decoded_ns: None, + live_ns: None, + top_bid_ns: None, + }); info.decoded_ns = Some(msg.ingestion_time().real().0 as i64); } }