Skip to content

Commit 0f4562a

Browse files
authored
fix: fee history filterer behavior and arc management (#27)
* fix: fee history filterer behavior and arc management * lint: clippy
1 parent 4857fbf commit 0f4562a

File tree

9 files changed

+294
-108
lines changed

9 files changed

+294
-108
lines changed

Cargo.toml

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ members = ["crates/*"]
33
resolver = "2"
44

55
[workspace.package]
6-
version = "0.10.0"
6+
version = "0.10.1"
77
edition = "2024"
88
rust-version = "1.88"
99
authors = ["init4"]
@@ -41,13 +41,14 @@ signet-rpc = { version = "0.10.0", path = "crates/rpc" }
4141

4242
init4-bin-base = { version = "0.11.0", features = ["alloy"] }
4343

44-
signet-bundle = "0.10.0"
45-
signet-constants = "0.10.0"
46-
signet-evm = "0.10.0"
47-
signet-extract = "0.10.0"
48-
signet-tx-cache = "0.10.0"
49-
signet-types = "0.10.0"
50-
signet-zenith = "0.10.0"
44+
signet-bundle = "0.10.1"
45+
signet-constants = "0.10.1"
46+
signet-evm = "0.10.1"
47+
signet-extract = "0.10.1"
48+
signet-journal = "0.10.1"
49+
signet-tx-cache = "0.10.1"
50+
signet-types = "0.10.1"
51+
signet-zenith = "0.10.1"
5152

5253
# ajj
5354
ajj = { version = "0.3.4" }

crates/db/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,10 @@ repository.workspace = true
1212
signet-node-types.workspace = true
1313

1414
signet-evm.workspace = true
15+
signet-journal.workspace = true
1516
signet-types.workspace = true
1617
signet-zenith.workspace = true
1718

18-
trevm.workspace = true
19-
2019
alloy.workspace = true
2120

2221
reth.workspace = true

crates/db/src/convert.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
use alloy::consensus::TxReceipt;
1111

1212
/// Trait for types that can be converted into other types as they're already compatible.
13-
/// Uswed for converting between alloy/reth/signet types.
13+
/// Used for converting between alloy/reth/signet types.
1414
pub trait DataCompat<Other: DataCompat<Self>>: Sized {
1515
/// Convert `self` into the target type.
1616
fn convert(self) -> Other;

crates/db/src/journal/ingestor.rs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
use crate::{SignetDbRw, journal::JournalDb};
2+
use futures_util::StreamExt;
3+
use reth::providers::ProviderResult;
4+
use signet_journal::{Journal, JournalStream};
5+
use signet_node_types::NodeTypesDbTrait;
6+
use std::sync::Arc;
7+
use tokio::task::JoinHandle;
8+
9+
/// A task that ingests journals into a reth database.
10+
#[derive(Debug)]
11+
pub struct JournalIngestor<Db: NodeTypesDbTrait> {
12+
db: Arc<SignetDbRw<Db>>,
13+
}
14+
15+
impl<Db: NodeTypesDbTrait> From<SignetDbRw<Db>> for JournalIngestor<Db> {
16+
fn from(value: SignetDbRw<Db>) -> Self {
17+
Self::new(value.into())
18+
}
19+
}
20+
21+
impl<Db: NodeTypesDbTrait> From<Arc<SignetDbRw<Db>>> for JournalIngestor<Db> {
22+
fn from(value: Arc<SignetDbRw<Db>>) -> Self {
23+
Self::new(value)
24+
}
25+
}
26+
27+
impl<Db: NodeTypesDbTrait> JournalIngestor<Db> {
28+
/// Create a new `JournalIngestor` with the given database provider.
29+
pub const fn new(db: Arc<SignetDbRw<Db>>) -> Self {
30+
Self { db }
31+
}
32+
33+
async fn task_future<S>(self, mut stream: S) -> ProviderResult<()>
34+
where
35+
S: JournalStream<'static> + Send + Unpin + 'static,
36+
{
37+
while let Some(Journal::V1(journal)) = stream.next().await {
38+
// FUTURE: Sanity check that the header height matches the update
39+
// height. Sanity check that both heights are 1 greater than the
40+
// last height in the database.
41+
42+
let db = self.db.clone();
43+
44+
// DB interaction is sync, so we spawn a blocking task for it. We
45+
// immediately await that task. This prevents blocking the worker
46+
// thread
47+
tokio::task::spawn_blocking(move || db.ingest(journal))
48+
.await
49+
.expect("ingestion should not panic")?;
50+
}
51+
// Stream has ended, return Ok
52+
Ok(())
53+
}
54+
55+
/// Spawn a task to ingest journals from the provided stream.
56+
pub fn spawn<S>(self, stream: S) -> JoinHandle<ProviderResult<()>>
57+
where
58+
S: JournalStream<'static> + Send + Unpin + 'static,
59+
{
60+
tokio::spawn(self.task_future(stream))
61+
}
62+
}
63+
64+
/// Ingest journals from a stream into a reth database.
65+
pub async fn ingest_journals<Db, S>(db: Arc<SignetDbRw<Db>>, stream: S) -> ProviderResult<()>
66+
where
67+
Db: NodeTypesDbTrait,
68+
S: JournalStream<'static> + Send + Unpin + 'static,
69+
{
70+
let ingestor = JournalIngestor::new(db);
71+
ingestor.task_future(stream).await
72+
}

crates/db/src/journal/mod.rs

Lines changed: 4 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -3,76 +3,8 @@
33
mod r#trait;
44
pub use r#trait::JournalDb;
55

6-
use crate::SignetDbRw;
7-
use futures_util::{Stream, StreamExt};
8-
use reth::providers::ProviderResult;
9-
use signet_node_types::NodeTypesDbTrait;
10-
use std::sync::Arc;
11-
use tokio::task::JoinHandle;
12-
use trevm::journal::BlockUpdate;
6+
mod provider;
7+
pub use provider::JournalProviderTask;
138

14-
/// A task that ingests journals into a reth database.
15-
#[derive(Debug)]
16-
pub struct JournalIngestor<Db: NodeTypesDbTrait> {
17-
db: Arc<SignetDbRw<Db>>,
18-
}
19-
20-
impl<Db: NodeTypesDbTrait> From<SignetDbRw<Db>> for JournalIngestor<Db> {
21-
fn from(value: SignetDbRw<Db>) -> Self {
22-
Self::new(value.into())
23-
}
24-
}
25-
26-
impl<Db: NodeTypesDbTrait> From<Arc<SignetDbRw<Db>>> for JournalIngestor<Db> {
27-
fn from(value: Arc<SignetDbRw<Db>>) -> Self {
28-
Self::new(value)
29-
}
30-
}
31-
32-
impl<Db: NodeTypesDbTrait> JournalIngestor<Db> {
33-
/// Create a new `JournalIngestor` with the given database provider.
34-
pub const fn new(db: Arc<SignetDbRw<Db>>) -> Self {
35-
Self { db }
36-
}
37-
38-
async fn task_future<S>(self, mut stream: S) -> ProviderResult<()>
39-
where
40-
S: Stream<Item = (alloy::consensus::Header, BlockUpdate<'static>)> + Send + Unpin + 'static,
41-
{
42-
while let Some(item) = stream.next().await {
43-
// FUTURE: Sanity check that the header height matches the update
44-
// height. Sanity check that both heights are 1 greater than the
45-
// last height in the database.
46-
47-
let db = self.db.clone();
48-
let (header, block_update) = item;
49-
50-
// DB interaction is sync, so we spawn a blocking task for it. We
51-
// immediately await that task. This prevents blocking the worker
52-
// thread
53-
tokio::task::spawn_blocking(move || db.ingest(&header, block_update))
54-
.await
55-
.expect("ingestion should not panic")?;
56-
}
57-
// Stream has ended, return Ok
58-
Ok(())
59-
}
60-
61-
/// Spawn a task to ingest journals from the provided stream.
62-
pub fn spawn<S>(self, stream: S) -> JoinHandle<ProviderResult<()>>
63-
where
64-
S: Stream<Item = (alloy::consensus::Header, BlockUpdate<'static>)> + Send + Unpin + 'static,
65-
{
66-
tokio::spawn(self.task_future(stream))
67-
}
68-
}
69-
70-
/// Ingest journals from a stream into a reth database.
71-
pub async fn ingest_journals<Db, S>(db: Arc<SignetDbRw<Db>>, stream: S) -> ProviderResult<()>
72-
where
73-
Db: NodeTypesDbTrait,
74-
S: Stream<Item = (alloy::consensus::Header, BlockUpdate<'static>)> + Send + Unpin + 'static,
75-
{
76-
let ingestor = JournalIngestor::new(db);
77-
ingestor.task_future(stream).await
78-
}
9+
mod ingestor;
10+
pub use ingestor::{JournalIngestor, ingest_journals};

crates/db/src/journal/provider.rs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
use crate::journal::JournalDb;
2+
use futures_util::StreamExt;
3+
use reth::{
4+
primitives::SealedHeader,
5+
providers::{
6+
CanonChainTracker, DatabaseProviderFactory, DatabaseProviderRW, ProviderResult,
7+
providers::BlockchainProvider,
8+
},
9+
rpc::types::engine::ForkchoiceState,
10+
};
11+
use signet_journal::{Journal, JournalStream};
12+
use signet_node_types::{NodeTypesDbTrait, SignetNodeTypes};
13+
use tokio::task::JoinHandle;
14+
15+
/// A task that processes journal updates for a specific database, and calls
16+
/// the appropriate methods on a [`BlockchainProvider`] to update the in-memory
17+
/// chain view.
18+
#[derive(Debug, Clone)]
19+
pub struct JournalProviderTask<Db: NodeTypesDbTrait> {
20+
provider: BlockchainProvider<SignetNodeTypes<Db>>,
21+
}
22+
23+
impl<Db: NodeTypesDbTrait> JournalProviderTask<Db> {
24+
/// Instantiate a new task.
25+
pub const fn new(provider: BlockchainProvider<SignetNodeTypes<Db>>) -> Self {
26+
Self { provider }
27+
}
28+
29+
/// Get a reference to the provider.
30+
pub const fn provider(&self) -> &BlockchainProvider<SignetNodeTypes<Db>> {
31+
&self.provider
32+
}
33+
34+
/// Deconstruct the task into its provider.
35+
pub fn into_inner(self) -> BlockchainProvider<SignetNodeTypes<Db>> {
36+
self.provider
37+
}
38+
39+
/// Create a future for the task, suitable for [`tokio::spawn`] or another
40+
/// task-spawning system.
41+
pub async fn task_future<S>(self, mut journals: S) -> ProviderResult<()>
42+
where
43+
S: JournalStream<'static> + Send + Unpin + 'static,
44+
{
45+
loop {
46+
let Some(Journal::V1(journal)) = journals.next().await else { break };
47+
48+
let rw = self.provider.database_provider_rw().map(DatabaseProviderRW);
49+
50+
let r_header = SealedHeader::new_unhashed(journal.header().clone());
51+
let block_hash = r_header.hash();
52+
53+
// DB interaction is sync, so we spawn a blocking task for it. We
54+
// immediately await that task. This prevents blocking the worker
55+
// thread
56+
tokio::task::spawn_blocking(move || rw?.ingest(journal))
57+
.await
58+
.expect("ingestion should not panic")?;
59+
60+
self.provider.set_canonical_head(r_header.clone());
61+
self.provider.set_safe(r_header.clone());
62+
self.provider.set_finalized(r_header);
63+
self.provider.on_forkchoice_update_received(&ForkchoiceState {
64+
head_block_hash: block_hash,
65+
safe_block_hash: block_hash,
66+
finalized_block_hash: block_hash,
67+
});
68+
}
69+
70+
Ok(())
71+
}
72+
73+
/// Spawn the journal provider task.
74+
pub fn spawn<S>(self, journals: S) -> JoinHandle<ProviderResult<()>>
75+
where
76+
S: JournalStream<'static> + Send + Unpin + 'static,
77+
{
78+
tokio::spawn(self.task_future(journals))
79+
}
80+
}

crates/db/src/journal/trait.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ use crate::RuWriter;
22
use alloy::consensus::{BlockHeader, Header};
33
use reth::{providers::ProviderResult, revm::db::BundleState};
44
use signet_evm::{BlockResult, ExecutionOutcome};
5+
use signet_journal::HostJournal;
56
use signet_types::primitives::{RecoveredBlock, SealedBlock, SealedHeader, TransactionSigned};
6-
use trevm::journal::BlockUpdate;
77

88
/// A database that can be updated with journals.
99
pub trait JournalDb: RuWriter {
@@ -18,20 +18,26 @@ pub trait JournalDb: RuWriter {
1818
///
1919
/// This is intended to be used for tx simulation, and other purposes that
2020
/// need fast state access WITHTOUT needing to retrieve historical data.
21-
fn ingest(&self, header: &Header, update: BlockUpdate<'_>) -> ProviderResult<()> {
22-
let journal_hash = update.journal_hash();
21+
fn ingest(&self, journal: HostJournal<'static>) -> ProviderResult<()> {
22+
let journal_hash = journal.journal_hash();
23+
24+
let (meta, bsi) = journal.into_parts();
25+
let (host_height, _, header) = meta.into_parts();
2326

2427
// TODO: remove the clone in future versions. This can be achieved by
2528
// _NOT_ making a `BlockResult` and instead manually updating relevan
2629
// tables. However, this means diverging more fro the underlying reth
2730
// logic that we are currently re-using.
28-
let bundle_state: BundleState = update.journal().clone().into();
31+
let bundle_state: BundleState = bsi.into();
2932
let execution_outcome = ExecutionOutcome::new(bundle_state, vec![], header.number());
3033

3134
let block: SealedBlock<TransactionSigned, Header> =
32-
SealedBlock { header: SealedHeader::new(header.to_owned()), body: Default::default() };
33-
let block_result =
34-
BlockResult { sealed_block: RecoveredBlock::new(block, vec![]), execution_outcome };
35+
SealedBlock { header: SealedHeader::new(header), body: Default::default() };
36+
let block_result = BlockResult {
37+
sealed_block: RecoveredBlock::new(block, vec![]),
38+
execution_outcome,
39+
host_height,
40+
};
3541

3642
self.append_host_block(
3743
None,
@@ -40,7 +46,9 @@ pub trait JournalDb: RuWriter {
4046
std::iter::empty(),
4147
&block_result,
4248
journal_hash,
43-
)
49+
)?;
50+
51+
Ok(())
4452
}
4553
}
4654

crates/db/src/provider.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,7 @@ where
417417
//
418418
// last reviewed at tag v1.5.1
419419

420-
let BlockResult { sealed_block: block, execution_outcome } = block_result;
420+
let BlockResult { sealed_block: block, execution_outcome, .. } = block_result;
421421

422422
let ru_height = block.number();
423423
self.insert_signet_block(header, block, journal_hash, StorageLocation::Database)?;

0 commit comments

Comments
 (0)