Skip to content

Commit d73e594

Browse files
committed
refactor: split coder from blobber
1 parent d9b56b7 commit d73e594

File tree

13 files changed

+175
-122
lines changed

13 files changed

+175
-122
lines changed
File renamed without changes.

crates/blobber/src/cache.rs renamed to crates/blobber/src/blobs/cache.rs

Lines changed: 27 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
1-
use crate::{BlobberError, BlobberResult, Blobs, FetchResult};
1+
use crate::{BlobFetcher, BlobberError, BlobberResult, Blobs, FetchResult};
22
use alloy::consensus::{SidecarCoder, SimpleCoder, Transaction as _};
33
use alloy::eips::eip7691::MAX_BLOBS_PER_BLOCK_ELECTRA;
44
use alloy::eips::merge::EPOCH_SLOTS;
55
use alloy::primitives::{B256, Bytes, keccak256};
6+
use core::fmt;
67
use reth::transaction_pool::TransactionPool;
78
use reth::{network::cache::LruMap, primitives::Receipt};
89
use signet_extract::ExtractedEvent;
910
use signet_zenith::Zenith::BlockSubmitted;
1011
use signet_zenith::ZenithBlock;
12+
use std::marker::PhantomData;
1113
use std::{
1214
sync::{Arc, Mutex},
1315
time::Duration,
@@ -37,11 +39,13 @@ enum CacheInst {
3739

3840
/// Handle for the cache.
3941
#[derive(Debug, Clone)]
40-
pub struct CacheHandle {
42+
pub struct CacheHandle<Coder = SimpleCoder> {
4143
sender: mpsc::Sender<CacheInst>,
44+
45+
_coder: PhantomData<Coder>,
4246
}
4347

44-
impl CacheHandle {
48+
impl<Coder> CacheHandle<Coder> {
4549
/// Sends a cache instruction.
4650
async fn send(&self, inst: CacheInst) {
4751
let _ = self.sender.send(inst).await;
@@ -71,12 +75,14 @@ impl CacheHandle {
7175

7276
/// Fetch the blobs using [`Self::fetch_blobs`] and decode them to get the
7377
/// Zenith block data using the provided coder.
74-
pub async fn fetch_and_decode_with_coder<C: SidecarCoder>(
78+
pub async fn fetch_and_decode(
7579
&self,
7680
slot: usize,
7781
extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>,
78-
mut coder: C,
79-
) -> BlobberResult<Bytes> {
82+
) -> BlobberResult<Bytes>
83+
where
84+
Coder: SidecarCoder + Default,
85+
{
8086
let tx_hash = extract.tx_hash();
8187
let versioned_hashes = extract
8288
.tx
@@ -87,23 +93,13 @@ impl CacheHandle {
8793

8894
let blobs = self.fetch_blobs(slot, tx_hash, versioned_hashes.to_owned()).await?;
8995

90-
coder
96+
Coder::default()
9197
.decode_all(blobs.as_ref())
9298
.ok_or_else(BlobberError::blob_decode_error)?
9399
.into_iter()
94100
.find(|data| keccak256(data) == extract.block_data_hash())
95101
.map(Into::into)
96-
.ok_or_else(|| BlobberError::block_data_not_found(tx_hash))
97-
}
98-
99-
/// Fetch the blobs using [`Self::fetch_blobs`] and decode them using
100-
/// [`SimpleCoder`] to get the Zenith block data.
101-
pub async fn fech_and_decode(
102-
&self,
103-
slot: usize,
104-
extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>,
105-
) -> BlobberResult<Bytes> {
106-
self.fetch_and_decode_with_coder(slot, extract, SimpleCoder::default()).await
102+
.ok_or_else(|| BlobberError::block_data_not_found(extract.block_data_hash()))
107103
}
108104

109105
/// Fetch the blobs, decode them using the provided coder, and construct a
@@ -117,15 +113,17 @@ impl CacheHandle {
117113
/// decoded (e.g., due to a malformatted blob).
118114
/// - `Err(FetchError)` if there was an unrecoverable error fetching the
119115
/// blobs.
120-
pub async fn signet_block_with_coder<C: SidecarCoder>(
116+
pub async fn signet_block(
121117
&self,
122118
host_block_number: u64,
123119
slot: usize,
124120
extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>,
125-
coder: C,
126-
) -> FetchResult<ZenithBlock> {
121+
) -> FetchResult<ZenithBlock>
122+
where
123+
Coder: SidecarCoder + Default,
124+
{
127125
let header = extract.ru_header(host_block_number);
128-
let block_data = match self.fetch_and_decode_with_coder(slot, extract, coder).await {
126+
let block_data = match self.fetch_and_decode(slot, extract).await {
129127
Ok(buf) => buf,
130128
Err(BlobberError::Decode(_)) => {
131129
trace!("Failed to decode block data");
@@ -135,44 +133,24 @@ impl CacheHandle {
135133
};
136134
Ok(ZenithBlock::from_header_and_data(header, block_data))
137135
}
138-
139-
/// Fetch the blobs, decode them using [`SimpleCoder`], and construct a
140-
/// Zenith block from the header and data.
141-
///
142-
/// # Returns
143-
///
144-
/// - `Ok(ZenithBlock)` if the block was successfully fetched and
145-
/// decoded.
146-
/// - `Ok(ZenithBlock)` with an EMPTY BLOCK if the block_data could not be
147-
/// decoded (e.g., due to a malformatted blob).
148-
/// - `Err(FetchError)` if there was an unrecoverable error fetching the
149-
/// blobs.
150-
pub async fn signet_block(
151-
&self,
152-
host_block_number: u64,
153-
slot: usize,
154-
extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>,
155-
) -> FetchResult<ZenithBlock> {
156-
self.signet_block_with_coder(host_block_number, slot, extract, SimpleCoder::default()).await
157-
}
158136
}
159137

160138
/// Retrieves blobs and stores them in a cache for later use.
161139
pub struct BlobCacher<Pool> {
162-
fetcher: crate::BlobFetcher<Pool>,
140+
fetcher: BlobFetcher<Pool>,
163141

164142
cache: Mutex<LruMap<(usize, B256), Blobs>>,
165143
}
166144

167-
impl<Pool: core::fmt::Debug> core::fmt::Debug for BlobCacher<Pool> {
168-
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
145+
impl<Pool: fmt::Debug> fmt::Debug for BlobCacher<Pool> {
146+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
169147
f.debug_struct("BlobCacher").field("fetcher", &self.fetcher).finish_non_exhaustive()
170148
}
171149
}
172150

173151
impl<Pool: TransactionPool + 'static> BlobCacher<Pool> {
174152
/// Creates a new `BlobCacher` with the provided extractor and cache size.
175-
pub fn new(fetcher: crate::BlobFetcher<Pool>) -> Self {
153+
pub fn new(fetcher: BlobFetcher<Pool>) -> Self {
176154
Self { fetcher, cache: LruMap::new(BLOB_CACHE_SIZE).into() }
177155
}
178156

@@ -237,10 +215,10 @@ impl<Pool: TransactionPool + 'static> BlobCacher<Pool> {
237215
///
238216
/// # Panics
239217
/// This function will panic if the cache task fails to spawn.
240-
pub fn spawn(self) -> CacheHandle {
218+
pub fn spawn<C: SidecarCoder + Default>(self) -> CacheHandle<C> {
241219
let (sender, inst) = mpsc::channel(CACHE_REQUEST_CHANNEL_SIZE);
242220
tokio::spawn(Arc::new(self).task_future(inst));
243-
CacheHandle { sender }
221+
CacheHandle { sender, _coder: PhantomData }
244222
}
245223
}
246224

@@ -310,7 +288,7 @@ mod tests {
310288
.unwrap()
311289
.with_slot_calculator(calc)
312290
.build_cache()?;
313-
let handle = cache.spawn();
291+
let handle = cache.spawn::<SimpleCoder>();
314292

315293
let got = handle
316294
.fetch_blobs(
File renamed without changes.

crates/blobber/src/blobs/error.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
use alloy::primitives::B256;
2+
use reth::transaction_pool::BlobStoreError;
3+
4+
/// Result using [`FetchError`] as the default error type.
5+
pub type FetchResult<T> = Result<T, FetchError>;
6+
7+
/// Unrecoverable blob fetching errors. These result in the node shutting
8+
/// down. They occur when the blobstore is down or the sidecar is unretrievable.
9+
#[derive(Debug, thiserror::Error)]
10+
pub enum FetchError {
11+
/// Reqwest error
12+
#[error(transparent)]
13+
Reqwest(#[from] reqwest::Error),
14+
/// Missing sidecar error
15+
#[error("Cannot retrieve sidecar for {0} from any source")]
16+
MissingSidecar(B256),
17+
/// Reth blobstore error.
18+
#[error(transparent)]
19+
BlobStore(BlobStoreError),
20+
/// Url parse error.
21+
#[error(transparent)]
22+
UrlParse(#[from] url::ParseError),
23+
/// Consensus client URL not set error.
24+
#[error("Consensus client URL not set")]
25+
ConsensusClientUrlNotSet,
26+
/// Pylon client URL not set error.
27+
#[error("Pylon client URL not set")]
28+
PylonClientUrlNotSet,
29+
}
30+
31+
impl From<BlobStoreError> for FetchError {
32+
fn from(err: BlobStoreError) -> Self {
33+
match err {
34+
BlobStoreError::MissingSidecar(tx) => FetchError::MissingSidecar(tx),
35+
_ => FetchError::BlobStore(err),
36+
}
37+
}
38+
}

crates/blobber/src/fetch.rs renamed to crates/blobber/src/blobs/fetch.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::{
2-
BlobFetcherBuilder, BlobberError, BlobberResult, FetchResult, error::FetchError,
2+
BlobFetcherBuilder, BlobberError, BlobberResult, FetchError, FetchResult,
33
shim::ExtractableChainShim, utils::extract_blobs_from_bundle,
44
};
55
use alloy::{

crates/blobber/src/blobs/mod.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
mod builder;
2+
pub use builder::{BlobFetcherBuilder, BuilderError as BlobFetcherBuilderError};
3+
4+
mod cache;
5+
pub use cache::{BlobCacher, CacheHandle};
6+
7+
mod config;
8+
pub use config::BlobFetcherConfig;
9+
10+
mod error;
11+
pub use error::{FetchError, FetchResult};
12+
13+
mod fetch;
14+
pub use fetch::{BlobFetcher, Blobs};

crates/blobber/src/coder/error.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
use alloy::{eips::eip2718::Eip2718Error, primitives::B256};
2+
3+
/// Result using [`DecodeError`] as the default error type.
4+
pub type DecodeResult<T> = Result<T, DecodeError>;
5+
6+
/// Ignorable blob fetching errors. These result in the block being skipped.
7+
#[derive(Debug, thiserror::Error, Copy, Clone)]
8+
pub enum DecodeError {
9+
/// Incorrect transaction type error
10+
#[error("Non-4844 transaction")]
11+
Non4844Transaction,
12+
/// Decoding error from the internal [`SimpleCoder`]. This indicates the
13+
/// blobs are not formatted in the simple coder format.
14+
///
15+
/// [`SimpleCoder`]: alloy::consensus::SimpleCoder
16+
#[error("Decoding failed")]
17+
BlobDecodeError,
18+
/// Block data not found in decoded blob
19+
#[error("Block data not found in decoded blob. Expected block hash: {0}")]
20+
BlockDataNotFound(B256),
21+
/// Error while decoding block from blob
22+
#[error("Block decode error: {0}")]
23+
BlockDecodeError(#[from] Eip2718Error),
24+
}

crates/blobber/src/coder/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
mod error;
2+
pub use error::{DecodeError, DecodeResult};
3+
4+
mod r#trait;
5+
pub use r#trait::SignetBlockDecoder;

crates/blobber/src/coder/trait.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
use crate::{Blobs, DecodeError, DecodeResult};
2+
use alloy::{
3+
consensus::SidecarCoder,
4+
primitives::{B256, Bytes, keccak256},
5+
};
6+
use signet_zenith::{Zenith, ZenithBlock};
7+
8+
/// A trait for decoding blocks from blob data.
9+
pub trait SignetBlockDecoder {
10+
/// Decodes a block from the given blob bytes.
11+
fn decode_block(
12+
&mut self,
13+
blobs: Blobs,
14+
header: Zenith::BlockHeader,
15+
data_hash: B256,
16+
) -> DecodeResult<ZenithBlock>;
17+
18+
/// Decodes a block from the given blob bytes, or returns an empty block.
19+
fn decode_block_or_default(
20+
&mut self,
21+
blobs: Blobs,
22+
header: Zenith::BlockHeader,
23+
data_hash: B256,
24+
) -> ZenithBlock {
25+
self.decode_block(blobs, header, data_hash)
26+
.unwrap_or_else(|_| ZenithBlock::from_header_and_data(header, Bytes::new()))
27+
}
28+
}
29+
30+
impl<T> SignetBlockDecoder for T
31+
where
32+
T: SidecarCoder,
33+
{
34+
fn decode_block(
35+
&mut self,
36+
blobs: Blobs,
37+
header: Zenith::BlockHeader,
38+
data_hash: B256,
39+
) -> DecodeResult<ZenithBlock> {
40+
let block_data = self
41+
.decode_all(blobs.as_ref())
42+
.ok_or(DecodeError::BlobDecodeError)?
43+
.into_iter()
44+
.find(|data| keccak256(data) == data_hash)
45+
.map(Into::<Bytes>::into)
46+
.ok_or(DecodeError::BlockDataNotFound(data_hash))?;
47+
Ok(ZenithBlock::from_header_and_data(header, block_data))
48+
}
49+
}

crates/blobber/src/error.rs

Lines changed: 3 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,59 +1,10 @@
1+
use crate::{DecodeError, FetchError};
12
use alloy::{eips::eip2718::Eip2718Error, primitives::B256};
23
use reth::transaction_pool::BlobStoreError;
34

45
/// Result using [`BlobFetcherError`] as the default error type.
56
pub type BlobberResult<T, E = BlobberError> = std::result::Result<T, E>;
67

7-
/// Result using [`FetchError`] as the default error type.
8-
pub type FetchResult<T> = BlobberResult<T, FetchError>;
9-
10-
/// Result using [`DecodeError`] as the default error type.
11-
pub type DecodeResult<T> = BlobberResult<T, DecodeError>;
12-
13-
/// Unrecoverable blob fetching errors. These result in the node shutting
14-
/// down. They occur when the blobstore is down or the sidecar is unretrievable.
15-
#[derive(Debug, thiserror::Error)]
16-
pub enum FetchError {
17-
/// Reqwest error
18-
#[error(transparent)]
19-
Reqwest(#[from] reqwest::Error),
20-
/// Missing sidecar error
21-
#[error("Cannot retrieve sidecar for {0} from any source")]
22-
MissingSidecar(B256),
23-
/// Reth blobstore error.
24-
#[error(transparent)]
25-
BlobStore(BlobStoreError),
26-
/// Url parse error.
27-
#[error(transparent)]
28-
UrlParse(#[from] url::ParseError),
29-
/// Consensus client URL not set error.
30-
#[error("Consensus client URL not set")]
31-
ConsensusClientUrlNotSet,
32-
/// Pylon client URL not set error.
33-
#[error("Pylon client URL not set")]
34-
PylonClientUrlNotSet,
35-
}
36-
37-
/// Ignorable blob fetching errors. These result in the block being skipped.
38-
#[derive(Debug, thiserror::Error, Copy, Clone)]
39-
pub enum DecodeError {
40-
/// Incorrect transaction type error
41-
#[error("Non-4844 transaction")]
42-
Non4844Transaction,
43-
/// Decoding error from the internal [`SimpleCoder`]. This indicates the
44-
/// blobs are not formatted in the simple coder format.
45-
///
46-
/// [`SimpleCoder`]: alloy::consensus::SimpleCoder
47-
#[error("Decoding failed")]
48-
BlobDecodeError,
49-
/// Block data not found in decoded blob
50-
#[error("Block data not found in decoded blob. Expected block hash: {0}")]
51-
BlockDataNotFound(B256),
52-
/// Error while decoding block from blob
53-
#[error("Block decode error: {0}")]
54-
BlockDecodeError(#[from] Eip2718Error),
55-
}
56-
578
/// Blob fetching errors
589
#[derive(Debug, thiserror::Error)]
5910
pub enum BlobberError {
@@ -92,8 +43,8 @@ impl BlobberError {
9243
}
9344

9445
/// Blob decoded, but expected hash not found
95-
pub fn block_data_not_found(tx: B256) -> Self {
96-
DecodeError::BlockDataNotFound(tx).into()
46+
pub fn block_data_not_found(data_hash: B256) -> Self {
47+
DecodeError::BlockDataNotFound(data_hash).into()
9748
}
9849

9950
/// Missing sidecar error
@@ -107,15 +58,6 @@ impl BlobberError {
10758
}
10859
}
10960

110-
impl From<BlobStoreError> for FetchError {
111-
fn from(err: BlobStoreError) -> Self {
112-
match err {
113-
BlobStoreError::MissingSidecar(tx) => FetchError::MissingSidecar(tx),
114-
_ => FetchError::BlobStore(err),
115-
}
116-
}
117-
}
118-
11961
impl From<BlobStoreError> for BlobberError {
12062
fn from(err: BlobStoreError) -> Self {
12163
Self::Fetch(err.into())

0 commit comments

Comments
 (0)