diff --git a/crates/relay/src/api/builder/api.rs b/crates/relay/src/api/builder/api.rs index 9a29e5e9..9eaa93a3 100644 --- a/crates/relay/src/api/builder/api.rs +++ b/crates/relay/src/api/builder/api.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use axum::{Extension, http::StatusCode, response::IntoResponse}; +use bytes::Bytes; use crossbeam_channel::Sender; use flux::spine::StandaloneDCacheProducer; use flux_utils::SharedVector; @@ -23,6 +24,7 @@ pub struct BuilderApi { pub api_provider: Arc, pub producer: StandaloneDCacheProducer, pub future_results: Arc>, + pub submission_payloads: Arc>, pub web_socket_connections: Sender, } @@ -36,6 +38,7 @@ impl BuilderApi { api_provider: Arc, producer: StandaloneDCacheProducer, future_results: Arc>, + submission_payloads: Arc>, web_socket_connections: Sender, ) -> Self { Self { @@ -47,6 +50,7 @@ impl BuilderApi { api_provider, producer, future_results, + submission_payloads, web_socket_connections, } } diff --git a/crates/relay/src/api/builder/submit_block.rs b/crates/relay/src/api/builder/submit_block.rs index 527dd534..c5684ea8 100644 --- a/crates/relay/src/api/builder/submit_block.rs +++ b/crates/relay/src/api/builder/submit_block.rs @@ -55,17 +55,19 @@ impl BuilderApi { let future_ix = api.future_results.push(FutureBidSubmissionResult::new()); + let ix = api.submission_payloads.push(body); let new_bid = NewBidSubmission { payload_offset: 0, header, submission_ref: SubmissionRef::Http(future_ix), trace, expected_pubkey: None, + http_submission_ix: Some(ix), }; - if let Err(e) = api.producer.produce_with_ingestion( + if let Err(e) = api.producer.produce_with_ingestion::( new_bid, - Some((body.len(), |buf: &mut [u8]| buf.copy_from_slice(&body))), + None, IngestionTime::now(), ) { tracing::error!("failed to write the request payload: {e}"); diff --git a/crates/relay/src/api/service.rs b/crates/relay/src/api/service.rs index e2b2be33..1a7f8a80 100644 --- a/crates/relay/src/api/service.rs +++ b/crates/relay/src/api/service.rs @@ -4,6 +4,7 @@ use std::{ time::Duration, }; +use bytes::Bytes; use crossbeam_channel::Sender; use flux::spine::StandaloneDCacheProducer; use flux_utils::SharedVector; @@ -46,6 +47,7 @@ pub fn start_api_service( registrations_handle: RegWorkerHandle, bid_producer: StandaloneDCacheProducer, future_results: Arc>, + http_submissions: Arc>, web_socket_connections: Sender, ) { tokio::spawn(run_api_service::( @@ -65,6 +67,7 @@ pub fn start_api_service( registrations_handle, bid_producer, future_results, + http_submissions, web_socket_connections, )); } @@ -86,6 +89,7 @@ pub async fn run_api_service( registrations_handle: RegWorkerHandle, bid_producer: StandaloneDCacheProducer, future_results: Arc>, + http_submissions: Arc>, web_socket_connections: Sender, ) { let gossiper = Arc::new( @@ -107,6 +111,7 @@ pub async fn run_api_service( api_provider.clone(), bid_producer, future_results, + http_submissions, web_socket_connections, ); let builder_api = Arc::new(builder_api); diff --git a/crates/relay/src/bid_decoder/tile.rs b/crates/relay/src/bid_decoder/tile.rs index aeada1f2..04cb6d64 100644 --- a/crates/relay/src/bid_decoder/tile.rs +++ b/crates/relay/src/bid_decoder/tile.rs @@ -1,6 +1,7 @@ -use std::sync::Arc; +use std::{cell::RefCell, sync::Arc}; use alloy_primitives::B256; +use bytes::Bytes; use flux::{ spine::{DCacheRead, SpineProducers}, tile::Tile, @@ -20,6 +21,7 @@ use helix_types::{ SignedBidSubmission, Submission, SubmissionVersion, }; use tracing::trace; +use zstd::zstd_safe::WriteBuf; use crate::{ HelixSpine, @@ -29,7 +31,10 @@ use crate::{ send_submission_result, }, bid_decoder::SubmissionDataWithSpan, - spine::messages::{DecodedSubmission, NewBidSubmission}, + spine::{ + HelixSpineProducers, + messages::{DecodedSubmission, NewBidSubmission}, + }, }; pub struct DecoderTile { @@ -38,50 +43,87 @@ pub struct DecoderTile { config: RelayConfig, decoded: Arc>, future_results: Arc>, - buffer: Vec, + http_submissions: Arc>, + buffer: RefCell>, core: usize, } impl Tile for DecoderTile { fn loop_body(&mut self, adapter: &mut flux::spine::SpineAdapter) { adapter.consume_with_dcache_collaborative_internal_message( - |new_bid: &InternalMessage, full_payload| { - let payload = &full_payload[new_bid.payload_offset..]; + |new_bid: &InternalMessage, dcache_payload| { + let payload = &dcache_payload[new_bid.payload_offset..]; let sent_at = new_bid.tracking_timestamp().publish_t(); - Self::handle_block_submission( + DecoderTile::handle_block_submission( &self.cache, &self.chain_info, &self.config, &new_bid.submission_ref, &new_bid.header, payload, - &mut self.buffer, + &mut self.buffer.borrow_mut(), new_bid.trace, sent_at, new_bid.expected_pubkey.as_ref(), ) }, |res, producers| match res { - DCacheRead::Ok((new_bid, result)) => match result { - Ok((submission, span)) => { - let ix = self.decoded.push(SubmissionDataWithSpan { - submission_data: submission, - span, - sent_at: new_bid.tracking_timestamp().publish_t(), - }); - producers.produce(DecodedSubmission { ix }); - } - Err(e) => { - send_submission_result( + DCacheRead::Ok((new_bid, result)) => { + let sent_at = new_bid.tracking_timestamp().publish_t(); + Self::handle_result( + &self.decoded, + &self.future_results, + result, + sent_at, + new_bid.submission_ref, + producers, + ); + } + DCacheRead::NoRef(new_bid) => { + let Some(payload) = new_bid + .http_submission_ix + .map(|ix| self.http_submissions.get(ix)) + .flatten() + else { + tracing::error!( + "failed to find the payload for bid submission with id = {}", + new_bid.header.id + ); + return send_submission_result( producers, &self.future_results, new_bid.submission_ref, - Err(e), + Err(BuilderApiError::InternalError), ); - } - }, - DCacheRead::Lost(new_bid) | DCacheRead::NoRef(new_bid) => { - tracing::error!("dcache read failed"); + }; + + let sent_at = new_bid.tracking_timestamp().publish_t(); + let result = DecoderTile::handle_block_submission( + &self.cache, + &self.chain_info, + &self.config, + &new_bid.submission_ref, + &new_bid.header, + payload.as_slice(), + &mut self.buffer.borrow_mut(), + new_bid.trace, + sent_at, + new_bid.expected_pubkey.as_ref(), + ); + Self::handle_result( + &self.decoded, + &self.future_results, + result, + sent_at, + new_bid.submission_ref, + producers, + ); + } + DCacheRead::Lost(new_bid) => { + tracing::error!( + "dcache read failed for bid submission with id {}", + new_bid.header.id + ); send_submission_result( producers, &self.future_results, @@ -102,8 +144,6 @@ impl Tile for DecoderTile { true } - fn teardown(self, _adapter: &mut flux::spine::SpineAdapter) {} - fn name(&self) -> flux::tile::TileName { let mut name = flux_utils::short_typename::(); name.push_str_truncate(self.core.to_string().as_str()); @@ -118,6 +158,7 @@ impl DecoderTile { config: RelayConfig, future_results: Arc>, decoded: Arc>, + http_submissions: Arc>, core: usize, ) -> Self { Self { @@ -126,7 +167,8 @@ impl DecoderTile { config, decoded, future_results, - buffer: Vec::with_capacity(MAX_PAYLOAD_LENGTH), + http_submissions, + buffer: RefCell::new(Vec::with_capacity(MAX_PAYLOAD_LENGTH)), core, } } @@ -303,6 +345,29 @@ impl DecoderTile { decoder_params, )) } + + fn handle_result( + decoded: &SharedVector, + future_results: &Arc>, + result: Result<(SubmissionData, tracing::Span), BuilderApiError>, + sent_at: Nanos, + submission_ref: SubmissionRef, + producers: &mut HelixSpineProducers, + ) { + match result { + Ok((submission, span)) => { + let ix = decoded.push(SubmissionDataWithSpan { + submission_data: submission, + span, + sent_at, + }); + producers.produce(DecodedSubmission { ix }); + } + Err(e) => { + send_submission_result(producers, future_results, submission_ref, Err(e)); + } + } + } } fn verify_and_validate( diff --git a/crates/relay/src/main.rs b/crates/relay/src/main.rs index 6b6549a0..95bbeec8 100644 --- a/crates/relay/src/main.rs +++ b/crates/relay/src/main.rs @@ -7,6 +7,7 @@ use std::{ time::Duration, }; +use bytes::Bytes; use eyre::eyre; use flux::{ spine::FluxSpine, @@ -175,6 +176,9 @@ async fn run( let (web_socket_send, web_socket_recv) = crossbeam_channel::bounded(1024); + let http_submissions = + Arc::new(SharedVector::::with_capacity(MAX_SUBMISSIONS_PER_SLOT)); + let future_results = Arc::new(SharedVector::::with_capacity( MAX_SUBMISSIONS_PER_SLOT, )); @@ -201,6 +205,7 @@ async fn run( registrations_handle, bid_producer, future_results.clone(), + http_submissions.clone(), web_socket_send, ); @@ -225,6 +230,7 @@ async fn run( config.clone(), future_results.clone(), decoded.clone(), + http_submissions.clone(), *core, ); attach_tile(decoder_tile, spine, TileConfig::new(*core, ThreadPriority::OSDefault)); diff --git a/crates/relay/src/spine/messages.rs b/crates/relay/src/spine/messages.rs index 8fd69bb1..c58c23f0 100644 --- a/crates/relay/src/spine/messages.rs +++ b/crates/relay/src/spine/messages.rs @@ -18,6 +18,7 @@ pub struct NewBidSubmission { pub header: InternalBidSubmissionHeader, pub trace: SubmissionTrace, pub expected_pubkey: Option, + pub http_submission_ix: Option, } #[derive(Debug, Clone, Copy)] diff --git a/crates/relay/src/tcp_bid_recv/mod.rs b/crates/relay/src/tcp_bid_recv/mod.rs index 2917946e..074857be 100644 --- a/crates/relay/src/tcp_bid_recv/mod.rs +++ b/crates/relay/src/tcp_bid_recv/mod.rs @@ -129,6 +129,7 @@ impl Tile for BidSubmissionTcpListener { submission_ref, trace, expected_pubkey: Some(*expected_pubkey), + http_submission_ix: None, }) } else { match RegistrationMsg::from_ssz_bytes(payload) {