Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/relay/src/api/builder/api.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Arc;

use axum::{Extension, http::StatusCode, response::IntoResponse};
use bytes::Bytes;
use crossbeam_channel::Sender;
use flux::spine::StandaloneDCacheProducer;
use flux_utils::SharedVector;
Expand All @@ -23,6 +24,7 @@ pub struct BuilderApi<A: Api> {
pub api_provider: Arc<A::ApiProvider>,
pub producer: StandaloneDCacheProducer<NewBidSubmission>,
pub future_results: Arc<SharedVector<FutureBidSubmissionResult>>,
pub submission_payloads: Arc<SharedVector<Bytes>>,
pub web_socket_connections: Sender<RawWebSocket>,
}

Expand All @@ -36,6 +38,7 @@ impl<A: Api> BuilderApi<A> {
api_provider: Arc<A::ApiProvider>,
producer: StandaloneDCacheProducer<NewBidSubmission>,
future_results: Arc<SharedVector<FutureBidSubmissionResult>>,
submission_payloads: Arc<SharedVector<Bytes>>,
web_socket_connections: Sender<RawWebSocket>,
) -> Self {
Self {
Expand All @@ -47,6 +50,7 @@ impl<A: Api> BuilderApi<A> {
api_provider,
producer,
future_results,
submission_payloads,
web_socket_connections,
}
}
Expand Down
6 changes: 4 additions & 2 deletions crates/relay/src/api/builder/submit_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,19 @@ impl<A: Api> BuilderApi<A> {

let future_ix = api.future_results.push(FutureBidSubmissionResult::new());

let ix = api.submission_payloads.push(body);
let new_bid = NewBidSubmission {
payload_offset: 0,
header,
submission_ref: SubmissionRef::Http(future_ix),
trace,
expected_pubkey: None,
http_submission_ix: Some(ix),
};

if let Err(e) = api.producer.produce_with_ingestion(
if let Err(e) = api.producer.produce_with_ingestion::<fn(&mut [u8])>(
new_bid,
Some((body.len(), |buf: &mut [u8]| buf.copy_from_slice(&body))),
None,
IngestionTime::now(),
) {
tracing::error!("failed to write the request payload: {e}");
Expand Down
5 changes: 5 additions & 0 deletions crates/relay/src/api/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{
time::Duration,
};

use bytes::Bytes;
use crossbeam_channel::Sender;
use flux::spine::StandaloneDCacheProducer;
use flux_utils::SharedVector;
Expand Down Expand Up @@ -46,6 +47,7 @@ pub fn start_api_service<A: Api>(
registrations_handle: RegWorkerHandle,
bid_producer: StandaloneDCacheProducer<NewBidSubmission>,
future_results: Arc<SharedVector<FutureBidSubmissionResult>>,
http_submissions: Arc<SharedVector<Bytes>>,
web_socket_connections: Sender<RawWebSocket>,
) {
tokio::spawn(run_api_service::<A>(
Expand All @@ -65,6 +67,7 @@ pub fn start_api_service<A: Api>(
registrations_handle,
bid_producer,
future_results,
http_submissions,
web_socket_connections,
));
}
Expand All @@ -86,6 +89,7 @@ pub async fn run_api_service<A: Api>(
registrations_handle: RegWorkerHandle,
bid_producer: StandaloneDCacheProducer<NewBidSubmission>,
future_results: Arc<SharedVector<FutureBidSubmissionResult>>,
http_submissions: Arc<SharedVector<Bytes>>,
web_socket_connections: Sender<RawWebSocket>,
) {
let gossiper = Arc::new(
Expand All @@ -107,6 +111,7 @@ pub async fn run_api_service<A: Api>(
api_provider.clone(),
bid_producer,
future_results,
http_submissions,
web_socket_connections,
);
let builder_api = Arc::new(builder_api);
Expand Down
117 changes: 91 additions & 26 deletions crates/relay/src/bid_decoder/tile.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Arc;
use std::{cell::RefCell, sync::Arc};

use alloy_primitives::B256;
use bytes::Bytes;
use flux::{
spine::{DCacheRead, SpineProducers},
tile::Tile,
Expand All @@ -20,6 +21,7 @@ use helix_types::{
SignedBidSubmission, Submission, SubmissionVersion,
};
use tracing::trace;
use zstd::zstd_safe::WriteBuf;

use crate::{
HelixSpine,
Expand All @@ -29,7 +31,10 @@ use crate::{
send_submission_result,
},
bid_decoder::SubmissionDataWithSpan,
spine::messages::{DecodedSubmission, NewBidSubmission},
spine::{
HelixSpineProducers,
messages::{DecodedSubmission, NewBidSubmission},
},
};

pub struct DecoderTile {
Expand All @@ -38,50 +43,87 @@ pub struct DecoderTile {
config: RelayConfig,
decoded: Arc<SharedVector<SubmissionDataWithSpan>>,
future_results: Arc<SharedVector<FutureBidSubmissionResult>>,
buffer: Vec<u8>,
http_submissions: Arc<SharedVector<Bytes>>,
buffer: RefCell<Vec<u8>>,
core: usize,
}

impl Tile<HelixSpine> for DecoderTile {
fn loop_body(&mut self, adapter: &mut flux::spine::SpineAdapter<HelixSpine>) {
adapter.consume_with_dcache_collaborative_internal_message(
|new_bid: &InternalMessage<NewBidSubmission>, full_payload| {
let payload = &full_payload[new_bid.payload_offset..];
|new_bid: &InternalMessage<NewBidSubmission>, dcache_payload| {
let payload = &dcache_payload[new_bid.payload_offset..];
let sent_at = new_bid.tracking_timestamp().publish_t();
Self::handle_block_submission(
DecoderTile::handle_block_submission(
&self.cache,
&self.chain_info,
&self.config,
&new_bid.submission_ref,
&new_bid.header,
payload,
&mut self.buffer,
&mut self.buffer.borrow_mut(),
new_bid.trace,
sent_at,
new_bid.expected_pubkey.as_ref(),
)
},
|res, producers| match res {
DCacheRead::Ok((new_bid, result)) => match result {
Ok((submission, span)) => {
let ix = self.decoded.push(SubmissionDataWithSpan {
submission_data: submission,
span,
sent_at: new_bid.tracking_timestamp().publish_t(),
});
producers.produce(DecodedSubmission { ix });
}
Err(e) => {
send_submission_result(
DCacheRead::Ok((new_bid, result)) => {
let sent_at = new_bid.tracking_timestamp().publish_t();
Self::handle_result(
&self.decoded,
&self.future_results,
result,
sent_at,
new_bid.submission_ref,
producers,
);
}
DCacheRead::NoRef(new_bid) => {
let Some(payload) = new_bid
.http_submission_ix
.map(|ix| self.http_submissions.get(ix))
.flatten()
else {
tracing::error!(
"failed to find the payload for bid submission with id = {}",
new_bid.header.id
);
return send_submission_result(
producers,
&self.future_results,
new_bid.submission_ref,
Err(e),
Err(BuilderApiError::InternalError),
);
}
},
DCacheRead::Lost(new_bid) | DCacheRead::NoRef(new_bid) => {
tracing::error!("dcache read failed");
};

let sent_at = new_bid.tracking_timestamp().publish_t();
let result = DecoderTile::handle_block_submission(
&self.cache,
&self.chain_info,
&self.config,
&new_bid.submission_ref,
&new_bid.header,
payload.as_slice(),
&mut self.buffer.borrow_mut(),
new_bid.trace,
sent_at,
new_bid.expected_pubkey.as_ref(),
);
Self::handle_result(
&self.decoded,
&self.future_results,
result,
sent_at,
new_bid.submission_ref,
producers,
);
}
DCacheRead::Lost(new_bid) => {
tracing::error!(
"dcache read failed for bid submission with id {}",
new_bid.header.id
);
send_submission_result(
producers,
&self.future_results,
Expand All @@ -102,8 +144,6 @@ impl Tile<HelixSpine> for DecoderTile {
true
}

fn teardown(self, _adapter: &mut flux::spine::SpineAdapter<HelixSpine>) {}

fn name(&self) -> flux::tile::TileName {
let mut name = flux_utils::short_typename::<Self>();
name.push_str_truncate(self.core.to_string().as_str());
Expand All @@ -118,6 +158,7 @@ impl DecoderTile {
config: RelayConfig,
future_results: Arc<SharedVector<FutureBidSubmissionResult>>,
decoded: Arc<SharedVector<SubmissionDataWithSpan>>,
http_submissions: Arc<SharedVector<Bytes>>,
core: usize,
) -> Self {
Self {
Expand All @@ -126,7 +167,8 @@ impl DecoderTile {
config,
decoded,
future_results,
buffer: Vec::with_capacity(MAX_PAYLOAD_LENGTH),
http_submissions,
buffer: RefCell::new(Vec::with_capacity(MAX_PAYLOAD_LENGTH)),
core,
}
}
Expand Down Expand Up @@ -303,6 +345,29 @@ impl DecoderTile {
decoder_params,
))
}

fn handle_result(
decoded: &SharedVector<SubmissionDataWithSpan>,
future_results: &Arc<SharedVector<FutureBidSubmissionResult>>,
result: Result<(SubmissionData, tracing::Span), BuilderApiError>,
sent_at: Nanos,
submission_ref: SubmissionRef,
producers: &mut HelixSpineProducers,
) {
match result {
Ok((submission, span)) => {
let ix = decoded.push(SubmissionDataWithSpan {
submission_data: submission,
span,
sent_at,
});
producers.produce(DecodedSubmission { ix });
}
Err(e) => {
send_submission_result(producers, future_results, submission_ref, Err(e));
}
}
}
}

fn verify_and_validate(
Expand Down
6 changes: 6 additions & 0 deletions crates/relay/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
time::Duration,
};

use bytes::Bytes;
use eyre::eyre;
use flux::{
spine::FluxSpine,
Expand Down Expand Up @@ -175,6 +176,9 @@ async fn run(

let (web_socket_send, web_socket_recv) = crossbeam_channel::bounded(1024);

let http_submissions =
Arc::new(SharedVector::<Bytes>::with_capacity(MAX_SUBMISSIONS_PER_SLOT));

let future_results = Arc::new(SharedVector::<FutureBidSubmissionResult>::with_capacity(
MAX_SUBMISSIONS_PER_SLOT,
));
Expand All @@ -201,6 +205,7 @@ async fn run(
registrations_handle,
bid_producer,
future_results.clone(),
http_submissions.clone(),
web_socket_send,
);

Expand All @@ -225,6 +230,7 @@ async fn run(
config.clone(),
future_results.clone(),
decoded.clone(),
http_submissions.clone(),
*core,
);
attach_tile(decoder_tile, spine, TileConfig::new(*core, ThreadPriority::OSDefault));
Expand Down
1 change: 1 addition & 0 deletions crates/relay/src/spine/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub struct NewBidSubmission {
pub header: InternalBidSubmissionHeader,
pub trace: SubmissionTrace,
pub expected_pubkey: Option<BlsPublicKeyBytes>,
pub http_submission_ix: Option<usize>,
}

#[derive(Debug, Clone, Copy)]
Expand Down
1 change: 1 addition & 0 deletions crates/relay/src/tcp_bid_recv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ impl Tile<HelixSpine> for BidSubmissionTcpListener {
submission_ref,
trace,
expected_pubkey: Some(*expected_pubkey),
http_submission_ix: None,
})
} else {
match RegistrationMsg::from_ssz_bytes(payload) {
Expand Down
Loading