diff --git a/Cargo.lock b/Cargo.lock index 986b7f94d..c20bf39a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2419,6 +2419,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "bnum" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "119771309b95163ec7aaf79810da82f7cd0599c19722d48b9c03894dca833966" + [[package]] name = "borsh" version = "1.6.0" @@ -2482,6 +2488,15 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "bstr" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63044e1ae8e69f3b5a92c736ca6269b8d12fa7efe39bf34ddb06d102cf0e2cab" +dependencies = [ + "memchr", +] + [[package]] name = "bumpalo" version = "3.19.0" @@ -2744,6 +2759,12 @@ dependencies = [ "inout", ] +[[package]] +name = "cityhash-rs" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93a719913643003b84bd13022b4b7e703c09342cd03b679c4641c7d2e50dc34d" + [[package]] name = "clang-sys" version = "1.8.1" @@ -2795,6 +2816,53 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d" +[[package]] +name = "clickhouse" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d975a05171c6f8a453f60ec6287c0018c90911d5a8a46d9b6abe386ea359fab3" +dependencies = [ + "bnum", + "bstr", + "bytes", + "cityhash-rs", + "clickhouse-macros", + "clickhouse-types", + "futures-channel", + "futures-util", + "http-body-util", + "hyper 1.8.1", + "hyper-util", + "lz4_flex", + "polonius-the-crab", + "serde", + "thiserror 2.0.17", + "tokio", + "url", +] + +[[package]] +name = "clickhouse-macros" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff6669899e23cb87b43daf7996f0ea3b9c07d0fb933d745bb7b815b052515ae3" +dependencies = [ + "proc-macro2", + "quote", + "serde_derive_internals", + "syn 2.0.111", +] + +[[package]] +name = "clickhouse-types" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "358fbfd439fb0bed02a3e2ecc5131f6a9d039ba5639aed650cf0e845f6ebfc16" +dependencies = [ + "bytes", + "thiserror 2.0.17", +] + [[package]] name = "coins-bip32" version = "0.8.7" @@ -5663,6 +5731,7 @@ dependencies = [ "bincode", "bytes", "chrono", + "clickhouse", "crossbeam-channel", "dashmap 5.5.3", "deadpool-postgres", @@ -5884,6 +5953,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "higher-kinded-types" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e690f8474c6c5d8ff99656fcbc195a215acc3949481a8b0b3351c838972dc776" +dependencies = [ + "macro_rules_attribute", + "never-say-never", + "paste", +] + [[package]] name = "hkdf" version = "0.12.4" @@ -7192,6 +7272,22 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "macro_rules_attribute" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65049d7923698040cd0b1ddcced9b0eb14dd22c5f86ae59c3740eab64a676520" +dependencies = [ + "macro_rules_attribute-proc_macro", + "paste", +] + +[[package]] +name = "macro_rules_attribute-proc_macro" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "670fdfda89751bc4a84ac13eaa63e205cf0fd22b4c9a5fbfa085b63c1f1d3a30" + [[package]] name = "maplit" version = "1.0.2" @@ -7595,6 +7691,12 @@ dependencies = [ "tempfile", ] +[[package]] +name = "never-say-never" +version = "6.6.666" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf5a574dadd7941adeaa71823ecba5e28331b8313fb2e1c6a5c7e5981ea53ad6" + [[package]] name = "new_debug_unreachable" version = "1.0.6" @@ -8594,6 +8696,16 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "polonius-the-crab" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec242d7eccbb2fd8b3b5b6e3cf89f94a91a800f469005b44d154359609f8af72" +dependencies = [ + "higher-kinded-types", + "never-say-never", +] + [[package]] name = "polyval" version = "0.6.2" @@ -12961,6 +13073,17 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "serde_derive_internals" +version = "0.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.111", +] + [[package]] name = "serde_json" version = "1.0.145" diff --git a/Cargo.toml b/Cargo.toml index ea2aaf3e9..68eb3a677 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,6 +41,7 @@ ethereum_ssz_derive = "0.9" ethers = "2.0.14" eyre = "0.6.12" aws-sdk-s3 = "=1.35.0" +clickhouse = "0.14.2" flate2 = "1.0" flux = { git = "https://github.com/gattaca-com/flux", rev = "732c996c13f8fb5e2988dbf949e1d8a74261fca4" } diff --git a/crates/common/src/config.rs b/crates/common/src/config.rs index 8b8a710cc..eab565b33 100644 --- a/crates/common/src/config.rs +++ b/crates/common/src/config.rs @@ -43,10 +43,8 @@ pub struct RelayConfig { /// Configuration for block merging parameters. #[serde(default)] pub block_merging_config: BlockMergingConfig, - #[serde(default)] pub primev_config: Option, pub discord_webhook_url: Option, - #[serde(default)] pub alerts_config: Option, pub inclusion_list: Option, pub is_submission_instance: bool, @@ -63,10 +61,17 @@ pub struct RelayConfig { pub tcp_port: u16, #[serde(default = "default_usize::<512>")] pub tcp_max_connections: usize, - #[serde(default)] pub s3_config: Option, /// Directory for local cache snapshots (bincode). Enables fast startup. pub snapshot_dir: Option, + pub clickhouse: Option, +} + +#[derive(Serialize, Deserialize, Clone)] +pub struct ClickhouseConfig { + pub url: String, + pub database: String, + pub user: String, } impl RelayConfig { @@ -100,6 +105,7 @@ impl RelayConfig { decoder: vec![4], simulator: 5, top_bid: 1, + data_gatherer: 3, }, gossip_payload_on_header: false, api_port: 4040, @@ -107,6 +113,7 @@ impl RelayConfig { tcp_max_connections: 512, s3_config: None, snapshot_dir: None, + clickhouse: None, } } } @@ -153,6 +160,8 @@ pub struct CoresConfig { pub simulator: usize, #[serde(default)] pub top_bid: usize, + #[serde(default)] + pub data_gatherer: usize, } impl Default for WebsiteConfig { @@ -261,7 +270,6 @@ pub struct SimulatorConfig { #[serde(default = "default_usize::<32>")] pub max_concurrent_tasks: usize, /// If set, use the SSZ binary endpoint at this URL instead of JSON-RPC - #[serde(default)] pub ssz_url: Option, } diff --git a/crates/common/src/validator_preferences/mod.rs b/crates/common/src/validator_preferences/mod.rs index 3d9cf805e..ee42f9cd8 100644 --- a/crates/common/src/validator_preferences/mod.rs +++ b/crates/common/src/validator_preferences/mod.rs @@ -8,7 +8,6 @@ pub struct ValidatorPreferences { /// An optional list of BuilderIDs. If this is set, the relay will only accept /// submissions from builders whose public keys are linked to the IDs in this list. /// This allows for limiting submissions to a trusted set of builders. - #[serde(default)] pub trusted_builders: Option>, /// Allows validators to express a preference for whether a delay should be applied to get @@ -16,7 +15,6 @@ pub struct ValidatorPreferences { #[serde(default = "default_bool::")] pub header_delay: bool, - #[serde(default)] pub delay_ms: Option, #[serde(default)] diff --git a/crates/relay/Cargo.toml b/crates/relay/Cargo.toml index 6bd834923..950706b09 100644 --- a/crates/relay/Cargo.toml +++ b/crates/relay/Cargo.toml @@ -53,6 +53,7 @@ reqwest.workspace = true reqwest-eventsource.workspace = true rustc-hash.workspace = true bincode.workspace = true +clickhouse.workspace = true serde.workspace = true serde_json.workspace = true serial_test.workspace = true diff --git a/crates/relay/src/auctioneer/get_header.rs b/crates/relay/src/auctioneer/get_header.rs index 0fcc1bd68..4068c9abe 100644 --- a/crates/relay/src/auctioneer/get_header.rs +++ b/crates/relay/src/auctioneer/get_header.rs @@ -57,7 +57,7 @@ impl Context { .with_label_values(&[strategy]) .observe(start.elapsed().as_micros() as f64); - self.store_data(adjusted_bid.clone(), sim_request.is_optimistic); + self.store_data(adjusted_bid.clone(), sim_request.is_optimistic, producers); self.send_to_sim(sim_request, true, producers); if is_adjustable_slot { diff --git a/crates/relay/src/auctioneer/submit_block.rs b/crates/relay/src/auctioneer/submit_block.rs index 12a29a4da..5cdbe726c 100644 --- a/crates/relay/src/auctioneer/submit_block.rs +++ b/crates/relay/src/auctioneer/submit_block.rs @@ -22,7 +22,7 @@ use crate::{ simulator::{SimRequest, ValidationRequest, tile::ValidationResult}, spine::{ HelixSpineProducers, - messages::{ToSimKind, ToSimMsg}, + messages::{BidEvent, BidUpdate, ToSimKind, ToSimMsg}, }, }; @@ -122,7 +122,7 @@ impl Context { ); self.try_adjustments_dry_run(&entry, slot_data, producers); - self.store_data(entry, is_optimistic); + self.store_data(entry, is_optimistic, producers); self.try_merge_block(merging_data, producers); } @@ -164,7 +164,7 @@ impl Context { .with_label_values(&[strategy]) .observe(start.elapsed().as_micros()); - self.store_data(adjusted_block, sim_request.is_optimistic); + self.store_data(adjusted_block, sim_request.is_optimistic, producers); self.send_to_sim(sim_request, true, producers); } } @@ -217,6 +217,7 @@ impl Context { self.request_merged_block(producers); if need_send_result { + producers.produce(BidUpdate { block_hash, event: BidEvent::Live }); self.db.update_block_submission_live_ts(block_hash, Nanos::now().0); send_submission_result( producers, @@ -231,7 +232,12 @@ impl Context { need_send_result } - pub fn store_data(&mut self, entry: PayloadEntry, is_optimistic: bool) { + pub fn store_data( + &mut self, + entry: PayloadEntry, + is_optimistic: bool, + producers: &mut HelixSpineProducers, + ) { let block_hash = *entry.block_hash(); let is_adjusted = entry.is_adjusted(); @@ -243,7 +249,12 @@ impl Context { }; // For optimistic submissions the bid is live as soon as it is stored. // For non-optimistic, live_ts is updated when the simulation result arrives. - let live_ts = if is_optimistic { Some(Nanos::now().0) } else { None }; + let live_ts = if is_optimistic { + producers.produce(BidUpdate { block_hash, event: BidEvent::Live }); + Some(Nanos::now().0) + } else { + None + }; self.db.store_block_submission( s.signed_bid_submission.clone(), s.submission_trace, diff --git a/crates/relay/src/auctioneer/validation.rs b/crates/relay/src/auctioneer/validation.rs index 09de7c7ed..1045da71b 100644 --- a/crates/relay/src/auctioneer/validation.rs +++ b/crates/relay/src/auctioneer/validation.rs @@ -36,11 +36,11 @@ impl Context { }); }; - if let helix_types::Submission::Dehydrated(ref dehydrated) = *submission - && !self.hydration_cache.can_hydrate(dehydrated, self.chain_info.max_blobs_per_block()) - { - return Err(BlockValidationError::CannotHydrate); - } + if let helix_types::Submission::Dehydrated(ref dehydrated) = *submission && + !self.hydration_cache.can_hydrate(dehydrated, self.chain_info.max_blobs_per_block()) + { + return Err(BlockValidationError::CannotHydrate); + } self.staleness_check(submission.builder_pubkey(), submission_data.version)?; self.validate_submission_data( diff --git a/crates/relay/src/data_gatherer/clickhouse.rs b/crates/relay/src/data_gatherer/clickhouse.rs new file mode 100644 index 000000000..c8609deed --- /dev/null +++ b/crates/relay/src/data_gatherer/clickhouse.rs @@ -0,0 +1,139 @@ +#![allow(clippy::future_not_send)] + +use std::{collections::hash_map::Entry, future::Future, time::Duration}; + +use alloy_primitives::B256; +use flux_utils::ArrayStr; +use helix_common::{config::ClickhouseConfig, expect_env_var}; +use helix_types::BlsPublicKeyBytes; +use rustc_hash::FxHashMap; +use tracing::{error, info}; + +const TABLE: &str = "relay_bid_submission_data"; +const ENV_CLICKHOUSE_PASSWORD: &str = "CLICKHOUSE_PASSWORD"; + +fn serialize_str, S: serde::Serializer>(v: &T, s: S) -> Result { + s.serialize_str(v.as_ref()) +} + +#[derive(Default)] +pub struct BlockInfo { + pub builder_pubkey: BlsPublicKeyBytes, + pub slot: u64, + pub is_dehydrated: bool, + pub received_ns: i64, + pub read_body_ns: i64, + pub decoded_ns: Option, + pub live_ns: Option, + pub top_bid_ns: Option, +} + +#[derive(clickhouse::Row, serde::Serialize)] +pub struct BlockInfoRow { + #[serde(serialize_with = "serialize_str")] + pub instance_id: ArrayStr<64>, + pub slot: u64, + pub is_dehydrated: bool, + pub block_hash: String, + pub received_ns: i64, + pub read_body_ns: i64, + pub decoded_ns: Option, + pub live_ns: Option, + pub top_bid_ns: Option, + pub builder_pubkey: String, +} + +impl BlockInfoRow { + pub fn from(instance_id: ArrayStr<64>, block_hash: B256, info: BlockInfo) -> Self { + BlockInfoRow { + instance_id, + slot: info.slot, + block_hash: block_hash.to_string(), + is_dehydrated: info.is_dehydrated, + received_ns: info.received_ns, + read_body_ns: info.read_body_ns, + decoded_ns: info.decoded_ns, + live_ns: info.live_ns, + top_bid_ns: info.top_bid_ns, + builder_pubkey: info.builder_pubkey.to_string(), + } + } +} + +pub struct ClickhouseData { + client: clickhouse::Client, + instance_id: ArrayStr<64>, + map: FxHashMap, +} + +impl ClickhouseData { + pub fn new(config: &ClickhouseConfig, instance_id: String) -> Self { + let password = expect_env_var(ENV_CLICKHOUSE_PASSWORD); + let client = clickhouse::Client::default() + .with_url(&config.url) + .with_database(&config.database) + .with_user(&config.user) + .with_password(password); + Self { + client, + instance_id: ArrayStr::from_str_truncate(&instance_id), + map: FxHashMap::with_capacity_and_hasher(5000, Default::default()), + } + } + + pub fn insert(&mut self, hash: B256, info: BlockInfo) { + self.map.insert(hash, info); + } + + pub fn get_mut(&mut self, hash: &B256) -> Option<&mut BlockInfo> { + self.map.get_mut(hash) + } + + pub fn entry(&mut self, hash: B256) -> Entry<'_, B256, BlockInfo> { + self.map.entry(hash) + } + + pub fn publish_snapshot( + &mut self, + new_slot: u64, + ) -> Option + Send + 'static> { + if self.map.is_empty() { + return None; + } + + let rows = self + .map + .extract_if(|_, v| v.slot < new_slot) + .map(|(hash, info)| BlockInfoRow::from(self.instance_id, hash, info)) + .collect::>(); + + if rows.is_empty() { + return None; + } + + let client = self.client.clone(); + Some(async move { + match Self::insert_rows(&client, rows.into_iter()).await { + Ok(len) => info!("inserted {len} rows to {TABLE}"), + Err(err) => error!(?err, "failed to insert rows to {TABLE}"), + } + }) + } + + async fn insert_rows( + client: &clickhouse::Client, + rows: impl Iterator, + ) -> Result { + let mut insert = client + .insert::(TABLE) + .await? + .with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(5))); + let mut len = 0; + for row in rows { + insert.write(&row).await?; + len += 1; + } + insert.end().await?; + Ok(len) + } +} diff --git a/crates/relay/src/data_gatherer/mod.rs b/crates/relay/src/data_gatherer/mod.rs new file mode 100644 index 000000000..5e3a5feec --- /dev/null +++ b/crates/relay/src/data_gatherer/mod.rs @@ -0,0 +1,5 @@ +pub use tile::DataGatherer; + +mod clickhouse; +mod s3; +mod tile; diff --git a/crates/relay/src/data_gatherer/s3.rs b/crates/relay/src/data_gatherer/s3.rs new file mode 100644 index 000000000..83894686d --- /dev/null +++ b/crates/relay/src/data_gatherer/s3.rs @@ -0,0 +1,76 @@ +use std::future::Future; + +use aws_sdk_s3::{ + Client, + config::{BehaviorVersion, Credentials, Region}, + primitives::ByteStream, +}; +use chrono::Utc; +use helix_common::{S3Config, expect_env_var}; +use uuid::Uuid; + +use crate::auctioneer::InternalBidSubmissionHeader; + +const ENV_ACCESS_KEY_ID: &str = "S3_ACCESS_KEY_ID"; +const ENV_SECRET_ACCESS_KEY: &str = "S3_SECRET_ACCESS_KEY"; + +pub struct S3Data { + client: Client, + bucket: String, +} + +impl S3Data { + pub fn new(config: S3Config) -> Self { + let access_key_id = expect_env_var(ENV_ACCESS_KEY_ID); + let secret_access_key = expect_env_var(ENV_SECRET_ACCESS_KEY); + + let creds = Credentials::new(&access_key_id, &secret_access_key, None, None, "env"); + let sdk_config = aws_sdk_s3::Config::builder() + .behavior_version(BehaviorVersion::latest()) + .credentials_provider(creds) + .region(Region::new(config.region.clone())) + .build(); + let client = Client::from_conf(sdk_config); + + Self { client, bucket: config.bucket } + } + + pub fn upload_task( + &self, + header: InternalBidSubmissionHeader, + payload: &[u8], + ) -> impl Future + Send + 'static { + let id = header.id; + let header = header.to_bytes(); + let header_slice = header.as_slice(); + let header_len = header_slice.len() as u16; + + // format: [u16 LE header_len][header bytes][payload bytes] + let mut buf = bytes::BytesMut::with_capacity(2 + header_slice.len() + payload.len()); + buf.extend_from_slice(&header_len.to_le_bytes()); + buf.extend_from_slice(header_slice); + buf.extend_from_slice(payload); + let bytes = buf.freeze(); + + let client = self.client.clone(); + let bucket = self.bucket.clone(); + async move { + let key = Self::make_key(id); + if let Err(e) = client + .put_object() + .bucket(bucket) + .key(&key) + .body(ByteStream::from(bytes)) + .send() + .await + { + tracing::error!(%e, %key, "s3 upload failed"); + } + } + } + + fn make_key(id: Uuid) -> String { + let now = Utc::now().to_rfc3339(); + format!("{now}_{id}.bin") + } +} diff --git a/crates/relay/src/data_gatherer/tile.rs b/crates/relay/src/data_gatherer/tile.rs new file mode 100644 index 000000000..c73841ff6 --- /dev/null +++ b/crates/relay/src/data_gatherer/tile.rs @@ -0,0 +1,210 @@ +use std::{sync::Arc, time::Duration, u64}; + +use alloy_primitives::B256; +use flux::{spine::SpineAdapter, tile::Tile, timing::InternalMessage}; +use flux_utils::SharedVector; +use helix_common::{ + S3Config, api::builder_api::TopBidUpdate, config::ClickhouseConfig, decoder::Encoding, +}; +use helix_types::{BlsPublicKeyBytes, Compression, MergeType}; + +use crate::{ + HelixSpine, SubmissionDataWithSpan, + data_gatherer::{ + clickhouse::{BlockInfo, ClickhouseData}, + s3::S3Data, + }, + spine::messages::{BidEvent, BidUpdate, DecodedSubmission, NewBidSubmission}, +}; + +pub struct DataGatherer { + decoded: Arc>, + ch: Option, + s3: Option, + current_slot: u64, + rt: tokio::runtime::Runtime, +} + +impl DataGatherer { + pub fn new( + decoded: Arc>, + instance_id: String, + ch_config: Option<&ClickhouseConfig>, + s3_config: Option, + ) -> Self { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("failed to build data gatherer runtime"); + Self { + decoded, + ch: ch_config.map(|cfg| ClickhouseData::new(cfg, instance_id)), + s3: s3_config.map(S3Data::new), + current_slot: 0, + rt, + } + } + + pub fn on_new_slot(&mut self, new_slot: u64) { + self.current_slot = new_slot; + if let Some(ch) = self.ch.as_mut() && + let Some(future) = ch.publish_snapshot(new_slot) + { + self.rt.spawn(future); + } + } + + fn extract_block_hash_and_pubkey( + encoding: Encoding, + buf: &[u8], + has_mergeable_data: bool, + ) -> Option<(u64, B256, BlsPublicKeyBytes)> { + match encoding { + Encoding::Json => { + #[derive(serde::Deserialize)] + struct Outer { + submission: Bid, + } + #[derive(serde::Deserialize)] + struct Bid { + message: Message, + } + #[derive(serde::Deserialize)] + struct Message { + #[serde(with = "serde_utils::quoted_u64")] + slot: u64, + block_hash: B256, + builder_pubkey: BlsPublicKeyBytes, + } + + let bid: Bid = if has_mergeable_data { + serde_json::from_slice::(buf).ok()?.submission + } else { + serde_json::from_slice(buf).ok()? + }; + + Some((bid.message.slot, bid.message.block_hash, bid.message.builder_pubkey)) + } + Encoding::Ssz => { + const BLOCK_HASH_OFFSET: usize = 8 + /* slot */ + 32; /* parent_hash */ + const BUILDER_PUBKEY_OFFSET: usize = BLOCK_HASH_OFFSET + 32; /* block_hash */ + + if buf.len() < BUILDER_PUBKEY_OFFSET + BlsPublicKeyBytes::len_bytes() { + return None; + } + + let (slot, block_hash, builder_pubkey) = unsafe { + ( + u64::from_le_bytes(buf[0..8].try_into().unwrap()), + core::ptr::read_unaligned( + buf.as_ptr().add(BLOCK_HASH_OFFSET) as *const B256 + ), + core::ptr::read_unaligned( + buf.as_ptr().add(BUILDER_PUBKEY_OFFSET) as *const BlsPublicKeyBytes + ), + ) + }; + + Some((slot, block_hash, builder_pubkey)) + } + } + } +} + +impl Tile for DataGatherer { + fn loop_body(&mut self, adapter: &mut SpineAdapter) { + let mut max_slot = self.current_slot; + + adapter.consume_with_dcache_internal_message( + |bid: &InternalMessage, payload| { + let payload = &payload[bid.payload_offset..]; + if let Some(s3) = self.s3.as_ref() { + self.rt.spawn(s3.upload_task(bid.header, payload)); + } + + let is_mergeable = matches!(bid.header.merge_type, MergeType::Mergeable); + if let Compression::None = bid.header.compression { + if let Some((slot, block_hash, builder_pubkey)) = + Self::extract_block_hash_and_pubkey(bid.header.encoding, payload, is_mergeable) + { + max_slot = max_slot.max(slot); + if let Some(ch) = self.ch.as_mut() { + ch.insert(block_hash, BlockInfo { + builder_pubkey, + slot, + is_dehydrated: bid.header.flags.is_dehydrated(), + received_ns: bid.trace.receive_ns.0 as i64, + read_body_ns: bid.trace.read_body_ns.0 as i64, + ..Default::default() + }); + } + } else { + tracing::error!( + "failed to extract builder_pubkey & block hash from submission with id {}", + bid.header.id + ); + } + } + }, + |_, _| {}, + ); + + adapter.consume_internal_message(|msg: &mut InternalMessage, _| { + if let Some(bid) = self.decoded.get(msg.ix) { + max_slot = max_slot.max(bid.submission_data.bid_slot()); + if let Some(ch) = self.ch.as_mut() { + let info = + ch.entry(*bid.submission_data.block_hash()).or_insert_with(|| BlockInfo { + builder_pubkey: *bid.submission_data.builder_pubkey(), + slot: bid.submission_data.bid_slot(), + is_dehydrated: bid.submission_data.decoder_params.is_dehydrated, + received_ns: bid.submission_data.trace.receive_ns.0 as i64, + read_body_ns: bid.submission_data.trace.read_body_ns.0 as i64, + decoded_ns: None, + live_ns: None, + top_bid_ns: None, + }); + info.decoded_ns = Some(msg.ingestion_time().real().0 as i64); + } + } + }); + + adapter.consume_internal_message(|msg: &mut InternalMessage, _| { + if let Some(ch) = self.ch.as_mut() && + let Some(info) = ch.get_mut(&msg.block_hash) + { + // todo @nina - will we ever need other events? + #[allow(irrefutable_let_patterns)] + if let BidEvent::Live = msg.event { + info.live_ns = Some(msg.ingestion_time().real().0 as i64); + } + } + }); + + adapter.consume_internal_message(|msg: &mut InternalMessage, _| { + max_slot = max_slot.max(msg.slot); + if let Some(ch) = self.ch.as_mut() && + let Some(info) = ch.get_mut(&msg.block_hash) + { + info.top_bid_ns = Some(msg.ingestion_time().real().0 as i64); + } + }); + + if max_slot > self.current_slot { + self.on_new_slot(max_slot); + } + + // drive spawned S3 and clickhouse tasks + self.rt.block_on(tokio::time::sleep(Duration::from_micros(500))); + } + + fn teardown(mut self, adapter: &mut SpineAdapter) { + self.loop_body(adapter); + if let Some(mut ch) = self.ch && + let Some(fut) = ch.publish_snapshot(u64::MAX) + { + self.rt.block_on(fut); + } + } +} diff --git a/crates/relay/src/lib.rs b/crates/relay/src/lib.rs index 0df535580..fadfe07ee 100644 --- a/crates/relay/src/lib.rs +++ b/crates/relay/src/lib.rs @@ -2,6 +2,7 @@ mod api; mod auctioneer; mod beacon; mod bid_decoder; +mod data_gatherer; mod gossip; mod housekeeper; mod network; @@ -33,13 +34,14 @@ pub use crate::{ }, beacon::start_beacon_client, bid_decoder::{DecoderTile, SubmissionDataWithSpan}, + data_gatherer::DataGatherer, housekeeper::start_housekeeper, network::RelayNetworkManager, simulator::{SimRequest, SimResult}, spine::{HelixSpine, HelixSpineConfig, messages::NewBidSubmission}, tcp_bid_recv::{ BidSubmissionFlags, BidSubmissionHeader, BidSubmissionResponse, BidSubmissionTcpListener, - RegistrationMsg, S3PayloadSaver, + RegistrationMsg, }, website::WebsiteService, }; diff --git a/crates/relay/src/main.rs b/crates/relay/src/main.rs index e45a81bf0..9802fd556 100644 --- a/crates/relay/src/main.rs +++ b/crates/relay/src/main.rs @@ -26,9 +26,9 @@ use helix_common::{ utils::{init_panic_hook, init_tracing_log}, }; use helix_relay::{ - Api, Auctioneer, AuctioneerHandle, BidSorter, BidSubmissionTcpListener, DbHandle, DecoderTile, - DefaultBidAdjustor, FutureBidSubmissionResult, HelixSpine, HelixSpineConfig, NewBidSubmission, - RegWorker, RegWorkerHandle, RelayNetworkManager, S3PayloadSaver, SimRequest, SimResult, + Api, Auctioneer, AuctioneerHandle, BidSorter, BidSubmissionTcpListener, DataGatherer, DbHandle, + DecoderTile, DefaultBidAdjustor, FutureBidSubmissionResult, HelixSpine, HelixSpineConfig, + NewBidSubmission, RegWorker, RegWorkerHandle, RelayNetworkManager, SimRequest, SimResult, SimulatorTile, SubmissionDataWithSpan, TopBidTile, WebsiteService, spawn_tokio_monitoring, start_admin_service, start_api_service, start_beacon_client, start_db_service, start_housekeeper, @@ -85,12 +85,11 @@ fn main() { instance_id.clone(), )); - let app_id = config - .instance_id - .clone() - .unwrap_or_else(|| format!("RELAY-{}", config.postgres.region_name)); - - init_panic_hook(app_id, config.discord_webhook_url.clone(), config.logging.dir_path()); + init_panic_hook( + instance_id.clone(), + config.discord_webhook_url.clone(), + config.logging.dir_path(), + ); block_on(start_metrics_server(&config)); match block_on(run(instance_id, config, spine_config, keypair)) { @@ -223,6 +222,20 @@ async fn run( } if config.is_submission_instance { + if config.clickhouse.is_some() || config.s3_config.is_some() { + let data_gatherer = DataGatherer::new( + decoded.clone(), + instance_id, + config.clickhouse.as_ref(), + config.s3_config.clone(), + ); + attach_tile( + data_gatherer, + spine, + TileConfig::new(config.cores.data_gatherer, ThreadPriority::OSDefault), + ); + } + for core in &config.cores.decoder { let decoder_tile = DecoderTile::new( local_cache.as_ref().clone(), @@ -236,11 +249,6 @@ async fn run( attach_tile(decoder_tile, spine, TileConfig::new(*core, ThreadPriority::OSDefault)); } - if let Some(cfg) = config.s3_config.clone() { - let s3_saver = S3PayloadSaver::new(cfg); - attach_tile(s3_saver, spine, TileConfig::background(None, None)); - } - let sock_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, config.tcp_port)); let block_submission_tcp_listener = BidSubmissionTcpListener::new( diff --git a/crates/relay/src/spine/messages.rs b/crates/relay/src/spine/messages.rs index c58c23f07..7f759d4d0 100644 --- a/crates/relay/src/spine/messages.rs +++ b/crates/relay/src/spine/messages.rs @@ -1,3 +1,4 @@ +use alloy_primitives::B256; use flux_utils::ArrayStr; use helix_common::SubmissionTrace; // Re-export as also used as spine message. @@ -87,3 +88,14 @@ pub enum ToSimKind { pub struct FromSimMsg { pub ix: usize, } + +#[derive(Debug, Clone, Copy)] +pub enum BidEvent { + Live, +} + +#[derive(Debug, Clone, Copy)] +pub struct BidUpdate { + pub block_hash: B256, + pub event: BidEvent, +} diff --git a/crates/relay/src/spine/mod.rs b/crates/relay/src/spine/mod.rs index f8ce9ec89..5c0b98f7d 100644 --- a/crates/relay/src/spine/mod.rs +++ b/crates/relay/src/spine/mod.rs @@ -27,4 +27,7 @@ pub struct HelixSpine { /// Auctioneer → TopBidTile. #[queue(size(2usize.pow(16)))] pub top_bid: SpineQueue, + + #[queue(size(2usize.pow(16)))] + pub bid_update: SpineQueue, } diff --git a/crates/relay/src/tcp_bid_recv/mod.rs b/crates/relay/src/tcp_bid_recv/mod.rs index 074857beb..5591cb4e9 100644 --- a/crates/relay/src/tcp_bid_recv/mod.rs +++ b/crates/relay/src/tcp_bid_recv/mod.rs @@ -19,13 +19,11 @@ use crate::{ spine::messages::{NewBidSubmission, SubmissionResultWithRef}, }; -mod s3; pub mod types; pub use helix_tcp_types::{ BidSubmissionFlags, BidSubmissionHeader, BidSubmissionResponse, RegistrationMsg, }; -pub use s3::S3PayloadSaver; pub use crate::tcp_bid_recv::types::{ BidSubmissionError, response_from_bid_submission_error, response_from_submission_result, diff --git a/crates/relay/src/tcp_bid_recv/s3.rs b/crates/relay/src/tcp_bid_recv/s3.rs deleted file mode 100644 index 3af83f178..000000000 --- a/crates/relay/src/tcp_bid_recv/s3.rs +++ /dev/null @@ -1,96 +0,0 @@ -use std::sync::Arc; - -use aws_sdk_s3::{ - Client, - config::{BehaviorVersion, Credentials, Region}, - primitives::ByteStream, -}; -use bytes::Bytes; -use chrono::Utc; -use flux::tile::Tile; -use helix_common::S3Config; -use tokio::sync::mpsc; -use uuid::Uuid; - -use crate::{HelixSpine, spine::messages::NewBidSubmission}; - -const ENV_ACCESS_KEY_ID: &str = "S3_ACCESS_KEY_ID"; -const ENV_SECRET_ACCESS_KEY: &str = "S3_SECRET_ACCESS_KEY"; - -pub struct S3PayloadSaver { - tx: mpsc::Sender<(NewBidSubmission, Bytes)>, -} - -impl S3PayloadSaver { - pub fn new(config: S3Config) -> Self { - let access_key_id = std::env::var(ENV_ACCESS_KEY_ID) - .unwrap_or_else(|_| panic!("{ENV_ACCESS_KEY_ID} must be set")); - let secret_access_key = std::env::var(ENV_SECRET_ACCESS_KEY) - .unwrap_or_else(|_| panic!("{ENV_SECRET_ACCESS_KEY} must be set")); - - let creds = Credentials::new(&access_key_id, &secret_access_key, None, None, "env"); - let sdk_config = aws_sdk_s3::Config::builder() - .behavior_version(BehaviorVersion::latest()) - .credentials_provider(creds) - .region(Region::new(config.region.clone())) - .build(); - let client = Client::from_conf(sdk_config); - let bucket = Arc::::from(config.bucket.as_str()); - - let (tx, mut rx) = mpsc::channel::<(NewBidSubmission, Bytes)>(10_000); - tokio::spawn(async move { - while let Some((r, payload)) = rx.recv().await { - let key = make_key(r.header.id); - let client = client.clone(); - let bucket = Arc::clone(&bucket); - tokio::spawn(async move { - if let Err(e) = client - .put_object() - .bucket(bucket.as_ref()) - .key(&key) - .body(ByteStream::from(payload)) - .send() - .await - { - tracing::error!(%e, %key, "s3 upload failed"); - } - }); - } - }); - - Self { tx } - } -} - -impl Tile for S3PayloadSaver { - fn loop_body(&mut self, adapter: &mut flux::spine::SpineAdapter) { - adapter.consume_with_dcache( - |r: NewBidSubmission, full_payload| { - let header = r.header.to_bytes(); - let header_slice = header.as_slice(); - let header_len = header_slice.len() as u16; - let payload_offset = r.payload_offset; - - let payload = &full_payload[payload_offset..]; - - // format: [u16 LE header_len][header bytes][payload bytes] - let mut buf = - bytes::BytesMut::with_capacity(2 + header_slice.len() + payload.len()); - buf.extend_from_slice(&header_len.to_le_bytes()); - buf.extend_from_slice(header_slice); - buf.extend_from_slice(payload); - let bytes = buf.freeze(); - - if self.tx.try_send((r, bytes)).is_err() { - tracing::error!("s3 channel full, dropping payload"); - } - }, - |_, _| {}, - ); - } -} - -fn make_key(id: Uuid) -> String { - let now = Utc::now().to_rfc3339(); - format!("{now}_{id}.bin") -} diff --git a/crates/simulator/src/block_merging/types.rs b/crates/simulator/src/block_merging/types.rs index 1b276575f..baab83114 100644 --- a/crates/simulator/src/block_merging/types.rs +++ b/crates/simulator/src/block_merging/types.rs @@ -3,6 +3,7 @@ use std::collections::HashMap; use alloy_eips::{Decodable2718, eip2718::Eip2718Error}; use alloy_primitives::{Address, B256, Bytes, U256}; use alloy_rpc_types::{beacon::requests::ExecutionRequestsV4, engine::ExecutionPayloadV3}; +use helix_common::expect_env_var; use helix_types::{BuilderInclusionResult, MergedBlockTrace}; use reth_ethereum::{evm::EthEvmConfig, primitives::SignedTransaction, provider::ProviderError}; use reth_node_builder::ConfigureEvm; @@ -36,7 +37,7 @@ pub(crate) struct PrivateKeySigner( ); pub fn load_signer() -> PrivateKeySigner { - let signing_key_str = std::env::var("RELAY_KEY").expect("could not find RELAY_KEY in env"); + let signing_key_str = expect_env_var("RELAY_KEY"); let signing_key = signing_key_str .parse::() .expect("failed to parse RELAY_KEY"); diff --git a/crates/types/src/hydration.rs b/crates/types/src/hydration.rs index 48b6d5830..db8113546 100644 --- a/crates/types/src/hydration.rs +++ b/crates/types/src/hydration.rs @@ -515,5 +515,4 @@ pub enum HydrationError { #[error("too many blobs: blobs {blobs}, max {max}")] TooManyBlobs { blobs: usize, max: usize }, - }