
feat: impl generic catchup and livestream#743

Open
ChaoticTempest wants to merge 16 commits into develop from phuong/feat/catchup-and-livestream

Conversation

@ChaoticTempest
Contributor

This implements catchup then livestream in a generic way for all chains, though realistically it is only implemented for Ethereum. This is outlined in #663 in the generalized catchup section.

The issue with the previous Ethereum catchup and live/final block polling was that all three ran concurrently, so ordering was not guaranteed and we could end up with out-of-order events. In this PR, Ethereum processes blocks linearly.

Solana will be implemented if we like what we see here.

The basic gist of the generalized catchup & livestream is the following:

let ethereum_indexer = ethereum_stream.start();

task::spawn({
    let buffered_stream = ethereum_indexer.livestream();
    let anchor_block = buffered_stream.initial();
    let anchor_height = ethereum_indexer.height(anchor_block);
    let catchup_range = ethereum_indexer.catchup_range(anchor_height);
    for i in catchup_range {
        ethereum_indexer.process_height(i);
    }
    while let Some(block) = buffered_stream.next() {
        ethereum_indexer.process_block(block);
    }
})
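The ordering above can be sketched as a small runnable (synchronous) example, with a hypothetical `MockIndexer` standing in for the real chain indexer and a `VecDeque` standing in for the live-block buffer; all names here are illustrative, not the PR's actual types:

```rust
use std::collections::VecDeque;

// Hypothetical in-memory indexer; `last_processed` models the persisted
// height from a previous run.
struct MockIndexer {
    processed: Vec<u64>,
    last_processed: u64,
}

impl MockIndexer {
    // Heights strictly between the persisted height and the anchor.
    fn catchup_range(&self, anchor_height: u64) -> std::ops::Range<u64> {
        self.last_processed + 1..anchor_height
    }
    fn process_height(&mut self, height: u64) {
        self.processed.push(height);
    }
}

fn run(indexer: &mut MockIndexer, mut buffered: VecDeque<u64>) {
    // The first buffered block anchors the boundary between catchup and live.
    let anchor_block = buffered.pop_front().expect("buffer is primed");
    // 1) Catch up linearly to just below the anchor height.
    for height in indexer.catchup_range(anchor_block) {
        indexer.process_height(height);
    }
    // 2) Then drain the buffered live blocks, in order.
    indexer.process_height(anchor_block);
    while let Some(block) = buffered.pop_front() {
        indexer.process_height(block);
    }
}
```

With `last_processed = 5` and a buffer of `[10, 11, 12]`, catchup covers heights 6 through 9 and the buffered blocks follow, so every height is processed exactly once and in order.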

Contributor

Copilot AI left a comment


Pull request overview

This PR refactors chain indexing to support a generic “catchup then livestream” flow (intended to guarantee linear block processing), updates the ChainStream interface accordingly, and adapts Ethereum/Solana streams plus integration tests to the new model.

Changes:

  • Introduces ChainIndexer / ChainBufferedStream abstractions and a shared catchup_then_livestream runner; removes ChainEvent::CatchupCompleted.
  • Refactors Ethereum indexing to buffer live blocks, run catchup to an anchor height, and then process buffered blocks in order, emitting events only after the chosen buffering/finality policy.
  • Updates Solana/Ethereum streams and integration tests to use the new ChainStream::start() -> Result<Indexer> API and indexer spawning helpers.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 7 comments.

Summary per file:

  • integration-tests/tests/cases/solana_stream.rs: Updates tests for new fallible ChainStream::start() signature.
  • integration-tests/tests/cases/ethereum_stream.rs: Adds linear catchup/resume integration tests and uses spawn_stream_indexer helper.
  • chain-signatures/node/src/stream/mod.rs: Adds generic catchup+livestream orchestration and requeue gating via oneshot completion signal.
  • chain-signatures/node/src/indexer_sol.rs: Updates Solana stream to new ChainStream API and disables indexer via DisabledChainIndexer.
  • chain-signatures/node/src/indexer_eth/mod.rs: Refactors Ethereum indexer for buffered livestream + catchup + ordered emission (incl. finality gating).


Comment on lines +174 to +189
let buffered = match indexer.livestream().await {
Ok(buffered) => buffered,
Err(err) => {
tracing::error!(?err, %chain, "failed to initialize livestream");
return;
}
};
let Some(mut buffered) = buffered else {
let _ = catchup_completed_tx.send(());
return;
};

let Some(anchor_block) = buffered.initial().await else {
tracing::warn!(%chain, "buffered livestream ended before anchor block");
return;
};

Copilot AI Apr 8, 2026


catchup_then_livestream can return early (e.g., livestream() returns Err, or buffered.initial() returns None) without notifying run_stream that catchup is “done”. For Ethereum this can leave RecoveryRequeueMode::AfterCatchup entries never requeued, effectively stalling recovered requests. Consider ensuring the catchup completion signal is always sent on all exit paths (including errors / early stream end), or propagate an explicit error so run_stream can decide how to handle deferred requeueing.
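One way to guarantee the completion signal on every exit path is a send-on-drop guard. This is a hypothetical sketch using a std channel in place of the tokio oneshot the PR actually uses; the guard fires even on early returns:

```rust
use std::sync::mpsc;

// Guard that sends the completion signal when dropped, covering every
// exit path (early return, error, end of scope).
struct CatchupDone(Option<mpsc::Sender<()>>);

impl Drop for CatchupDone {
    fn drop(&mut self) {
        if let Some(tx) = self.0.take() {
            let _ = tx.send(());
        }
    }
}

// Illustrative stand-in for the catchup/livestream task body.
fn catchup_task(tx: mpsc::Sender<()>, fail_early: bool) {
    let _guard = CatchupDone(Some(tx));
    if fail_early {
        return; // early exit: the guard still fires in Drop
    }
    // ... normal catchup work would go here ...
}
```

Whether the task returns early or runs to completion, the receiver side always observes a signal instead of a cancellation.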

Comment on lines +208 to +217
let mut next_block = anchor_block;
loop {
if let Err(err) = indexer.process_buffered_block(next_block).await {
tracing::warn!(?err, %chain, "buffered block processing failed");
}

match buffered.next().await {
Some(block) => next_block = block,
None => break,
}

Copilot AI Apr 8, 2026


Buffered/live block processing logs errors but still advances to the next buffered block. That can skip heights and break the PR’s stated guarantee of linear block processing (and can also keep running after the event channel is closed, doing useless work). Consider retrying process_buffered_block with the same backoff semantics as catchup (or treating certain errors as fatal and breaking) instead of moving on to buffered.next() immediately.
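A minimal sketch of the retry-instead-of-advance idea, with a hypothetical `process` closure and a bounded attempt budget (the real code would await a retry delay between attempts):

```rust
// Retry the SAME block until it succeeds or the attempt budget is spent,
// rather than advancing past a failure and skipping a height.
fn process_with_retry<F>(
    block: u64,
    max_attempts: u32,
    mut process: F,
) -> Result<(), String>
where
    F: FnMut(u64) -> Result<(), String>,
{
    let mut attempts = 0;
    loop {
        match process(block) {
            Ok(()) => return Ok(()),
            Err(_transient) if attempts + 1 < max_attempts => {
                // real code would: tokio::time::sleep(retry_delay).await
                attempts += 1;
            }
            Err(err) => return Err(err), // budget exhausted: surface the error
        }
    }
}
```

An unbounded variant (loop forever on `Err`) would match the "stall so we can push a fix" behavior the author settles on later in this thread.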

Comment on lines 271 to 285
@@ -161,36 +283,67 @@ pub async fn run_stream<S: ChainStream>(
.await;
recovered.pending.clear();
}

Copilot AI Apr 8, 2026


run_stream only requeues recovered requests when catchup_completed_rx resolves with Ok(()). If the catchup/livestream task exits early (e.g., initialization error) and drops the oneshot sender, result will be Err(Canceled) and the deferred AfterCatchup requeue never happens, leaving recovered requests stuck indefinitely. Consider requeueing on both Ok and Err (possibly with different logging/behavior), or otherwise ensuring the sender always sends on failure paths.
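The suggested handling can be sketched as treating both outcomes of the completion channel as a requeue trigger (a std channel here for illustration; the real code uses a tokio oneshot):

```rust
use std::sync::mpsc;

// Requeue on Ok (catchup finished) AND on Err (sender dropped because the
// task died early), so deferred requests are never left stuck.
fn should_requeue(rx: mpsc::Receiver<()>) -> bool {
    match rx.recv() {
        Ok(()) => true,
        Err(_) => {
            // real code would log that catchup exited abnormally
            true
        }
    }
}
```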

))
.await
else {
tracing::warn!(block_number, "ethereum live block not yet available");

Copilot AI Apr 8, 2026


In buffer_live_blocks, if get_block(block_number) returns None, the loop breaks without updating next_block_number. On the next outer iteration, block_number falls back to latest_block_number, which can skip the missing height permanently if latest has advanced. Consider keeping next_block_number = Some(block_number) (and retrying) on missing blocks so the stream stays contiguous and never skips heights.

Suggested change
- tracing::warn!(block_number, "ethereum live block not yet available");
+ tracing::warn!(block_number, "ethereum live block not yet available");
+ next_block_number = Some(block_number);

);
Self::add_failed_block(blocks_failed_send.clone(), block).await;

let mut block_number = next_block_number.unwrap_or(latest_block_number);

Copilot AI Apr 8, 2026


buffer_live_blocks defaults to starting at latest_block_number when next_block_number is unset. If the node restarts and the persisted processed_block equals the current latest height, the initial buffered “anchor” will replay an already-processed block. Re-emitting SignRequest for that block can overwrite existing backlog entries (resetting status/execution) because Backlog::insert replaces entries by id. Consider seeding the live buffer start at processed_block + 1 (or skipping buffered blocks <= processed_block) to avoid replaying the last processed height.

Suggested change
- let mut block_number = next_block_number.unwrap_or(latest_block_number);
+ let mut block_number =
+     next_block_number.unwrap_or_else(|| latest_block_number.saturating_add(1));

Comment on lines +1104 to +1107
anyhow::bail!(
"block {block_number} hash mismatch: expected {block_hash:?}, got {:?}",
block.header.hash
);

Copilot AI Apr 8, 2026


On block hash mismatch during emit_processed_block, this function bail!s with the original block_hash from process_block. In the catchup path this error is retried for the same height, but the stored block_hash never changes, so a reorg can lead to an infinite retry loop (catchup never completes, deferred requeue never happens). Consider handling mismatch by re-fetching the canonical block and re-running process_block (or updating the expected hash) rather than retrying with a stale BlockAndRequests payload.

Suggested change
- anyhow::bail!(
-     "block {block_number} hash mismatch: expected {block_hash:?}, got {:?}",
-     block.header.hash
- );
+ // The block was reorged after `process_block` produced this payload.
+ // Do not emit stale events for a different canonical block, but also do
+ // not return an error that would cause the catchup path to retry this
+ // same stale payload forever.
+ return Ok(());

Comment on lines +1155 to 1182
let new_final_block_number = finalized_block.header.number;
if last_final_block_number.is_none_or(|n| new_final_block_number > n) {
    tracing::debug!(
        new_final_block_number,
        last_final_block_number,
        "New finalized block number"
    );
    last_final_block_number.replace(new_final_block_number);
    crate::metrics::indexers::LATEST_BLOCK_NUMBER
        .with_label_values(&[Chain::Ethereum.as_str(), "finalized"])
        .set(new_final_block_number as i64);
    continue;
}

let Some(last_final_block_number) = last_final_block_number else {
    continue;
};

if new_final_block_number < last_final_block_number {
    tracing::warn!(
        new_final_block_number,
        last_final_block_number,
        "new finalized block number overflowed range of u64 and has wrapped around!"
    );
}

if new_final_block_number == last_final_block_number {
    tracing::debug!(new_final_block_number, "no new finalized block");
}

Copilot AI Apr 8, 2026


wait_for_finalized_block updates last_final_block_number before later comparing new_final_block_number against it, which makes the subsequent < / == comparisons effectively compare the value to itself (so the overflow/logging branches aren’t meaningful). Consider storing the previous finalized height separately (e.g., prev_final = last_final_block_number.replace(new_final_block_number)) and doing comparisons against prev_final to keep the logic correct and easier to reason about.
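The suggested restructuring can be sketched with `Option::replace`, which returns the previous value, so the comparisons run against the old height rather than the freshly stored one (function shape is illustrative, not the PR's code):

```rust
// Observe a new finalized height and classify it relative to the
// previously stored height, which `replace` hands back.
fn observe_finalized(last: &mut Option<u64>, new_height: u64) -> &'static str {
    let prev = last.replace(new_height); // returns the old value
    match prev {
        None => "first",
        Some(p) if new_height > p => "advanced",
        Some(p) if new_height < p => "wrapped or reorged",
        Some(_) => "no progress",
    }
}
```

Because `prev` is captured before the overwrite, the wrap-around and no-progress branches actually compare old against new instead of a value against itself.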

Contributor

@jakmeier jakmeier left a comment


Okay, I just spent 2h+ looking at this and didn't even get to the tests yet. Your PR description is quite clear, but the code somehow doesn't click for me.

Handling old blocks as catchup, while concurrently fetching blocks for new incoming events and buffering them, just seems a bit overcomplicated. And if I got that right, you start both before even defining the boundary between them (called the anchor height in the code).
Wouldn't it be easier to just finish catchup first? I would only subscribe to new blocks after that, i.e. do catchup until processed_block_height == latest_block_height, then subscribe to new blocks from latest_block_height + 1.
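The sequential ordering proposed here can be sketched as a loop that catches up to the tip before handing over to the subscription (hypothetical closures stand in for the client and block processing; not the PR's code):

```rust
// Catch up until the processed height reaches the chain tip, then return
// the height from which the live subscription should start.
fn catchup_until_tip(
    mut processed: u64,
    latest: impl Fn() -> u64,
    process: &mut impl FnMut(u64),
) -> u64 {
    loop {
        let tip = latest();
        if processed >= tip {
            // caller subscribes to new blocks from this height onward
            return processed + 1;
        }
        while processed < tip {
            processed += 1;
            process(processed);
        }
        // loop again: the tip may have advanced while we were catching up
    }
}
```

Re-reading the tip each pass handles the chain advancing during catchup; no buffering is needed because nothing is subscribed yet.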

Frankly, I think this might also need some refactoring. (Or maybe I just need a brain reset before taking another look.) Please also go through the Copilot comments.

Please re-request a review when I should take another look.

Comment on lines +755 to +758

let mut block_number = next_block_number.unwrap_or(latest_block_number);
if block_number > latest_block_number {
tokio::time::sleep(Duration::from_millis(500)).await;
Contributor


Suggested change
- let mut block_number = next_block_number.unwrap_or(latest_block_number);
- if block_number > latest_block_number {
-     tokio::time::sleep(Duration::from_millis(500)).await;
+ if next_block_number.is_some_and(|block_number| block_number > latest_block_number) {
+     tokio::time::sleep(Duration::from_millis(500)).await;

Comment on lines +778 to +779
next_block_number = Some(block_number.saturating_add(1));
block_number = block_number.saturating_add(1);
Contributor


Suggested change
- next_block_number = Some(block_number.saturating_add(1));
- block_number = block_number.saturating_add(1);
+ block_number = block_number.saturating_add(1);
+ next_block_number = Some(block_number);

crate::metrics::indexers::LATEST_BLOCK_NUMBER
.with_label_values(&[Chain::Ethereum.as_str(), "indexed"])
.set(block_number as i64);
tokio::time::sleep(Duration::from_millis(500)).await;
Contributor


why sleep here?

Contributor Author


the diff blocks are a bit everywhere. There was an interval.tick() before, but I'll just go back to the interval since it's clearer

let mut next_block = anchor_block;
loop {
if let Err(err) = indexer.process_buffered_block(next_block).await {
tracing::warn!(?err, %chain, "buffered block processing failed");
Contributor


In this case we miss a block, don't we? Seems like this should be an error, not a warning. Or ideally, we shouldn't move on at all.

Contributor Author

@ChaoticTempest ChaoticTempest Apr 9, 2026


yeah, this is a case I forgot to point out. Should we stall on not being able to process a block? In that case, our parsing logic is incorrect, or maybe something else happened? Would retrying be ideal here?

Actually, thinking about it again, it's best to stall/retry so we can push a fix to unstick an indexer if it ever becomes a problem. That way we know which block it got stuck at

Comment on lines +113 to +114
async fn initial(&mut self) -> Option<Self::Block>;
async fn next(&mut self) -> Option<Self::Block>;
Contributor


From looking the rest of the PR, it seems to me these two are functionally identical, just different names. Wouldn't it be less confusing to have only one method?

Contributor Author


yeah, you're right. Initially I just had it to be clearer with naming, but it could just be a variable name
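A sketch of the collapsed interface, where the anchor is simply the first item pulled from a single `next` method (the trait shape here is illustrative, not the PR's actual definition):

```rust
// Single-method stream trait: `initial` becomes "the first `next`".
trait BufferedStream {
    type Block;
    fn next(&mut self) -> Option<Self::Block>;
}

// Toy implementation backed by a Vec, for illustration.
struct VecStream(std::vec::IntoIter<u64>);

impl BufferedStream for VecStream {
    type Block = u64;
    fn next(&mut self) -> Option<u64> {
        self.0.next()
    }
}

// The anchor is just a local variable holding the first pulled block.
fn anchor_and_rest(mut s: impl BufferedStream<Block = u64>) -> (Option<u64>, Vec<u64>) {
    let anchor = s.next(); // formerly `initial()`
    let mut rest = Vec::new();
    while let Some(b) = s.next() {
        rest.push(b);
    }
    (anchor, rest)
}
```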

Comment on lines +181 to +184
let Some(mut buffered) = buffered else {
let _ = catchup_completed_tx.send(());
return;
};
Contributor


I don't understand. If the stream doesn't implement buffering, we immediately claim catchup to be completed without doing anything?

Contributor Author


yep, I probably should put a comment here. It's because I haven't implemented this for Solana, and Solana does neither catchup nor the livestream logic here since it does its own livestream of blocks in its indexer right now. So this is a way of disabling the catchup_and_livestream feature for Solana

Comment on lines +195 to +203
loop {
match indexer.process_catchup_on_height(height).await {
Ok(()) => break,
Err(err) => {
tracing::warn!(?err, %chain, height, "catchup height processing failed; retrying");
tokio::time::sleep(indexer.retry_delay()).await;
}
}
}
Contributor


Suggested change
- loop {
-     match indexer.process_catchup_on_height(height).await {
-         Ok(()) => break,
-         Err(err) => {
-             tracing::warn!(?err, %chain, height, "catchup height processing failed; retrying");
-             tokio::time::sleep(indexer.retry_delay()).await;
-         }
-     }
- }
+ while let Err(err) = indexer.process_catchup_on_height(height).await {
+     tracing::warn!(?err, %chain, height, "catchup height processing failed; retrying");
+     tokio::time::sleep(indexer.retry_delay()).await;
+ }
  }

Comment on lines +743 to +782
async fn buffer_live_blocks(
    client: Arc<EthereumClient>,
    live_blocks: mpsc::Sender<alloy::rpc::types::Block>,
) {
    tracing::info!("buffering ethereum live blocks");
    let mut next_block_number: Option<u64> = None;

    loop {
        let Some(latest_block_number) = client.get_latest_block_number().await else {
            tokio::time::sleep(Duration::from_millis(500)).await;
            continue;
        };

        let mut block_number = next_block_number.unwrap_or(latest_block_number);
        if block_number > latest_block_number {
            tokio::time::sleep(Duration::from_millis(500)).await;
            continue;
        }

        while block_number <= latest_block_number {
            let Some(block) = client
                .get_block(alloy::rpc::types::BlockId::Number(
                    BlockNumberOrTag::Number(block_number),
                ))
                .await
            else {
                tracing::warn!(block_number, "ethereum live block not yet available");
                break;
            };

            if let Err(err) = live_blocks.send(block).await {
                tracing::warn!(?err, block_number, "failed to buffer ethereum live block");
                return;
            }

            next_block_number = Some(block_number.saturating_add(1));
            block_number = block_number.saturating_add(1);
        }

        crate::metrics::indexers::LATEST_BLOCK_NUMBER
            .with_label_values(&[Chain::Ethereum.as_str(), "indexed"])
            .set(block_number as i64);
        tokio::time::sleep(Duration::from_millis(500)).await;
Contributor


Separate variables for block_number, next_block_number, latest_block_number makes this code rather hard to read.

Shouldn't it be enough to have one variable for where we are now (e.g. buffer_tip) and compare that to client.get_latest_block_number() directly?

@jakmeier
Contributor

jakmeier commented Apr 9, 2026

jakmeier approved these changes 1 minute ago

oops, sorry, I didn't mean to approve, just wanted to comment 🤷

@ChaoticTempest
Contributor Author

> Handling old blocks as catchup, while concurrently fetching blocks for new incoming events and buffering them, just seems a bit overcomplicated. And if I got that right, you start both before even defining the boundary between them (called anchor height in the code). Wouldn't it be easier to just finish catchup first? I would only subscribe to new blocks after that. I.e. do catchup until processed_block_height == latest_block_height. Then subscribe to new blocks from latest_block_height+1.

There are a couple of issues this abstraction tries to solve:

  • whether we have a live feed or have to fetch blocks one by one (the current eth client gets blocks one by one; solana has a live feed)
  • if we have a live feed, we need to buffer because we can't stop once we start the solana websocket subscription
  • if we get blocks one by one via height, we don't need to buffer. I can simplify this part for ethereum: currently in this PR, we get the latest block and use it as the anchor height, but we don't need to query the next block yet
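The two acquisition modes described in these bullets could be modeled along these lines (a hypothetical enum for illustration, not code from the PR):

```rust
// Two ways blocks arrive: a pushed live feed that must be buffered because
// the subscription cannot be paused (e.g. a websocket), vs. pull-by-height
// polling that needs no buffer because we fetch on our own schedule.
enum Acquisition {
    LiveFeed { buffer: std::collections::VecDeque<u64> },
    ByHeight { next: u64 },
}

impl Acquisition {
    fn needs_buffering(&self) -> bool {
        matches!(self, Acquisition::LiveFeed { .. })
    }
}
```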

