From b67660ebb87a4b76693a55dd1ad0d0acd378393a Mon Sep 17 00:00:00 2001 From: 0xrusowsky <0xrusowsky@proton.me> Date: Thu, 30 Apr 2026 15:19:41 +0200 Subject: [PATCH 1/6] feat: index ed25519 consensus proposer pubkey per block (TIP-1031) Adds a `consensus_proposer` column to the `blocks` table on both PostgreSQL (BYTEA, 32-byte CHECK) and ClickHouse (Nullable(String)), populated from the optional consensus_context.proposer field embedded in the Tempo block header by TIP-1031. Pre-fork blocks store NULL. To deserialize the new field, the Block/Transaction type aliases now use TempoNetwork::BlockResponse from tempo-alloy directly, picking up TempoHeaderResponse with its consensus context. Header field accesses move to BlockHeader trait methods since TempoHeader no longer exposes alloy_consensus::Header fields directly. Existing rows self-heal from NULL to populated on any later reinsert via `ON CONFLICT DO UPDATE ... WHERE consensus_proposer IS NULL` so no separate backfill is required after upgrading past T4. A `tidx_blocks_consensus_context_total{proposer="present"|"absent"}` counter gives operators a deploy-day signal that the field is flowing through. 
Refs: tempoxyz/tempo TIP-1031 --- Cargo.toml | 9 ++- README.md | 1 + db/blocks.sql | 1 + db/clickhouse/blocks.sql | 3 +- ...20260430_add_blocks_consensus_proposer.sql | 8 +++ ...20260430_add_blocks_consensus_proposer.sql | 21 ++++++ src/db/schema.rs | 6 ++ src/metrics.rs | 9 +++ src/sync/ch_sink.rs | 12 ++++ src/sync/decoder.rs | 69 ++++++++++++++++--- src/sync/engine.rs | 23 ++++--- src/sync/sink.rs | 3 +- src/sync/writer.rs | 30 ++++++-- src/tempo.rs | 13 +++- src/types.rs | 3 + tests/clickhouse_test.rs | 1 + tests/common/seed.rs | 1 + tests/sync_optimizations_test.rs | 1 + 18 files changed, 180 insertions(+), 34 deletions(-) create mode 100644 db/clickhouse/migrations/20260430_add_blocks_consensus_proposer.sql create mode 100644 db/migrations/20260430_add_blocks_consensus_proposer.sql diff --git a/Cargo.toml b/Cargo.toml index 47e3b254..0495f421 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,7 +35,12 @@ postgres-types = { version = "0.2", features = ["derive"] } # Ethereum/Tempo primitives alloy = { version = "2", features = ["full"] } tempo-alloy = { git = "https://github.com/tempoxyz/tempo", default-features = false } -tempo-primitives = { git = "https://github.com/tempoxyz/tempo", package = "tempo-primitives", default-features = false } +# `default-features = false` avoids tempo-primitives' default `reth`/`std` +# features, which transitively pull `revm` -> `c-kzg 2.1.7` and conflict with +# the `c-kzg 2.1.5` locked by `alloy-consensus 2.0`. We only need `serde` for +# `TempoConsensusContext` deserialization; `std` is satisfied transitively +# through tempo-alloy's dependency on tempo-primitives with `serde`+`rpc`. 
+tempo-primitives = { git = "https://github.com/tempoxyz/tempo", package = "tempo-primitives", default-features = false, features = ["serde"] } # URL parsing form_urlencoded = "1" @@ -65,7 +70,7 @@ thiserror = "2" anyhow = "1" # Time -chrono = { version = "0.4", features = ["serde"] } +chrono = { version = "0.4", features = ["std", "serde"] } # ClickHouse clickhouse = { version = "0.14", features = ["lz4", "chrono", "rustls-tls"] } diff --git a/README.md b/README.md index f5901f5d..f6fcce45 100644 --- a/README.md +++ b/README.md @@ -470,6 +470,7 @@ All tables use composite primary keys with timestamps for efficient range querie | `gas_used` | `INT8` | Gas used | | `miner` | `BYTEA` | Block producer | | `extra_data` | `BYTEA` | Extra data field | +| `consensus_proposer` | `BYTEA` | Ed25519 consensus proposer pubkey (TIP-1031, NULL pre-fork) | ### txs diff --git a/db/blocks.sql b/db/blocks.sql index f980c8dc..42f4651a 100644 --- a/db/blocks.sql +++ b/db/blocks.sql @@ -8,6 +8,7 @@ CREATE TABLE IF NOT EXISTS blocks ( gas_used INT8 NOT NULL, miner BYTEA NOT NULL, extra_data BYTEA, + consensus_proposer BYTEA CHECK (consensus_proposer IS NULL OR octet_length(consensus_proposer) = 32), PRIMARY KEY (timestamp, num) ); diff --git a/db/clickhouse/blocks.sql b/db/clickhouse/blocks.sql index ee428128..0992f070 100644 --- a/db/clickhouse/blocks.sql +++ b/db/clickhouse/blocks.sql @@ -7,7 +7,8 @@ CREATE TABLE IF NOT EXISTS blocks ( gas_limit Int64, gas_used Int64, miner String, - extra_data Nullable(String) + extra_data Nullable(String), + consensus_proposer Nullable(String) ) ENGINE = ReplacingMergeTree() PARTITION BY toYYYYMM(timestamp) ORDER BY (num) diff --git a/db/clickhouse/migrations/20260430_add_blocks_consensus_proposer.sql b/db/clickhouse/migrations/20260430_add_blocks_consensus_proposer.sql new file mode 100644 index 00000000..cf1ea8b2 --- /dev/null +++ b/db/clickhouse/migrations/20260430_add_blocks_consensus_proposer.sql @@ -0,0 +1,8 @@ +-- TIP-1031: store the ed25519 
consensus proposer pubkey for each block. +-- +-- Existing ClickHouse deployments created before this change already have a +-- `blocks` table, so `CREATE TABLE IF NOT EXISTS` in db/clickhouse/blocks.sql +-- will not add the new column. Run this migration once during upgrade, or let +-- tidx apply it via `ClickHouseSink::ensure_schema()` on startup. +ALTER TABLE blocks + ADD COLUMN IF NOT EXISTS consensus_proposer Nullable(String); diff --git a/db/migrations/20260430_add_blocks_consensus_proposer.sql b/db/migrations/20260430_add_blocks_consensus_proposer.sql new file mode 100644 index 00000000..c0eecc53 --- /dev/null +++ b/db/migrations/20260430_add_blocks_consensus_proposer.sql @@ -0,0 +1,21 @@ +-- TIP-1031: store the ed25519 consensus proposer pubkey for each block. +-- +-- Existing PostgreSQL deployments created before this change already have a +-- `blocks` table, so `CREATE TABLE IF NOT EXISTS` in db/blocks.sql will not +-- add the new column. Run this migration once during upgrade, or let tidx +-- apply it via `run_migrations()` on startup. +ALTER TABLE blocks + ADD COLUMN IF NOT EXISTS consensus_proposer BYTEA; + +-- Pubkey must be exactly 32 bytes (or NULL for pre-fork blocks). +-- Add the constraint defensively in case the column already existed without it. +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'blocks_consensus_proposer_len' + ) THEN + ALTER TABLE blocks + ADD CONSTRAINT blocks_consensus_proposer_len + CHECK (consensus_proposer IS NULL OR octet_length(consensus_proposer) = 32); + END IF; +END$$; diff --git a/src/db/schema.rs b/src/db/schema.rs index 7daecb78..bcdbb549 100644 --- a/src/db/schema.rs +++ b/src/db/schema.rs @@ -54,6 +54,12 @@ pub async fn run_migrations(pool: &Pool) -> Result<()> { )) .await?; + // TIP-1031: ed25519 consensus proposer pubkey on blocks. 
+ conn.batch_execute(include_str!( + "../../db/migrations/20260430_add_blocks_consensus_proposer.sql" + )) + .await?; + // Heavyweight upgrades such as concurrent index creation run in a // best-effort post-startup task so normal boot isn't blocked. Production // should still apply them in a pre-deploy migration flow. diff --git a/src/metrics.rs b/src/metrics.rs index 8f6a89b9..92e1f028 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -8,6 +8,15 @@ pub fn record_blocks_indexed(chain_id: u64, count: u64) { counter!("tidx_blocks_indexed_total", &labels).increment(count); } +/// TIP-1031: count of decoded block headers broken down by whether they +/// carried a consensus context. After T4 activation `present` should rise +/// monotonically and `absent` should plateau. Operators can correlate +/// against `tidx_blocks_indexed_total{chain_id=...}` for per-chain breakdown. +pub fn record_block_consensus_context(present: bool) { + let labels = [("proposer", if present { "present" } else { "absent" })]; + counter!("tidx_blocks_consensus_context_total", &labels).increment(1); +} + pub fn record_txs_indexed(chain_id: u64, count: u64) { let labels = [("chain_id", chain_id.to_string())]; counter!("tidx_txs_indexed_total", &labels).increment(count); diff --git a/src/sync/ch_sink.rs b/src/sync/ch_sink.rs index 45a638f2..32d7f0e2 100644 --- a/src/sync/ch_sink.rs +++ b/src/sync/ch_sink.rs @@ -20,6 +20,8 @@ const LOGS_MIGRATION_20260416: &str = include_str!("../../db/clickhouse/migrations/20260416_add_is_virtual_forward.sql"); const LOGS_MIGRATION_20260417: &str = include_str!("../../db/clickhouse/migrations/20260417_add_logs_virtual_forward_index.sql"); +const BLOCKS_MIGRATION_20260430: &str = + include_str!("../../db/clickhouse/migrations/20260430_add_blocks_consensus_proposer.sql"); const RECEIPTS_SCHEMA: &str = include_str!("../../db/clickhouse/receipts.sql"); /// Max rows per ClickHouse INSERT to avoid unbounded memory growth during backfills. 
@@ -114,6 +116,13 @@ impl ClickHouseSink { .await .map_err(|e| anyhow!("Failed to run ClickHouse logs migration 20260417: {e}"))?; + // TIP-1031: ed25519 consensus proposer pubkey on blocks. + self.client + .query(BLOCKS_MIGRATION_20260430) + .execute() + .await + .map_err(|e| anyhow!("Failed to run ClickHouse blocks migration 20260430: {e}"))?; + info!(database = %self.database, "ClickHouse schema ready"); Ok(()) } @@ -345,6 +354,7 @@ struct ChBlockWire { gas_used: i64, miner: String, extra_data: Option, + consensus_proposer: Option, } impl ChBlockWire { @@ -359,6 +369,7 @@ impl ChBlockWire { gas_used: b.gas_used, miner: hex_encode(&b.miner), extra_data: b.extra_data.as_ref().map(|v| hex_encode(v)), + consensus_proposer: b.consensus_proposer.as_ref().map(|v| hex_encode(v)), } } } @@ -553,6 +564,7 @@ mod tests { gas_used: 15_000_000, miner: vec![0xee; 20], extra_data: None, + consensus_proposer: None, }; let wire = ChBlockWire::from_row(&block); diff --git a/src/sync/decoder.rs b/src/sync/decoder.rs index 881d35b9..6a0aadb1 100644 --- a/src/sync/decoder.rs +++ b/src/sync/decoder.rs @@ -1,12 +1,22 @@ use alloy::consensus::transaction::Recovered; -use alloy::consensus::{Transaction as TransactionTrait, Typed2718}; +use alloy::consensus::{BlockHeader as _, Transaction as TransactionTrait, Typed2718}; use alloy::network::{ReceiptResponse, TransactionResponse}; +use alloy::primitives::B256; use chrono::{DateTime, TimeZone, Utc}; use tempo_alloy::primitives::transaction::SignatureType; use crate::tempo::{Block, Log, Receipt, TempoTxEnvelope, Transaction}; use crate::types::{BlockRow, LogRow, ReceiptRow, TxRow}; +use tempo_primitives::TempoConsensusContext; + +/// TIP-1031: extract the 32-byte ed25519 proposer pubkey from an optional +/// consensus context. Returns `None` for pre-fork blocks where the header +/// carries no consensus context. 
+fn extract_consensus_proposer(ctx: Option<&TempoConsensusContext>) -> Option> { + ctx.map(|c| B256::from(&c.proposer).0.to_vec()) +} + pub fn timestamp_from_secs(secs: u64) -> DateTime { Utc.timestamp_opt(secs as i64, 0) .single() @@ -14,25 +24,30 @@ pub fn timestamp_from_secs(secs: u64) -> DateTime { } pub fn decode_block(block: &Block) -> BlockRow { - let timestamp_secs = block.header.timestamp; + let header = &block.header; + let timestamp_secs = header.timestamp(); let timestamp = timestamp_from_secs(timestamp_secs); let timestamp_ms = (timestamp_secs * 1000) as i64; + let consensus_proposer = extract_consensus_proposer(header.consensus_context.as_ref()); + crate::metrics::record_block_consensus_context(consensus_proposer.is_some()); + BlockRow { - num: block.header.number as i64, - hash: block.header.hash.as_slice().to_vec(), - parent_hash: block.header.parent_hash.as_slice().to_vec(), + num: header.number() as i64, + hash: header.hash.as_slice().to_vec(), + parent_hash: header.parent_hash().as_slice().to_vec(), timestamp, timestamp_ms, - gas_limit: block.header.gas_limit as i64, - gas_used: block.header.gas_used as i64, - miner: block.header.beneficiary.as_slice().to_vec(), - extra_data: Some(block.header.extra_data.to_vec()), + gas_limit: header.gas_limit() as i64, + gas_used: header.gas_used() as i64, + miner: header.beneficiary().as_slice().to_vec(), + extra_data: Some(header.extra_data().to_vec()), + consensus_proposer, } } pub fn decode_transaction(tx: &Transaction, block: &Block, idx: u32) -> TxRow { - let block_timestamp = timestamp_from_secs(block.header.timestamp); + let block_timestamp = timestamp_from_secs(block.header.timestamp()); let inner: &Recovered = &tx.inner; // Extract Tempo-specific fields if this is a 0x76 transaction @@ -57,7 +72,7 @@ pub fn decode_transaction(tx: &Transaction, block: &Block, idx: u32) -> TxRow { }; TxRow { - block_num: block.header.number as i64, + block_num: block.header.number() as i64, block_timestamp, idx: idx as 
i32, hash: tx.tx_hash().as_slice().to_vec(), @@ -144,6 +159,38 @@ pub fn decode_receipt(receipt: &Receipt, block_timestamp: DateTime) -> Rece #[cfg(test)] mod tests { use super::*; + use tempo_primitives::ed25519::PublicKey; + + /// RFC 8032 ed25519 test vector #1 — known-valid 32-byte verification key. + /// Hardcoded as bytes (rather than constructed via `PublicKey::from_seed`) + /// because that constructor is gated on the `arbitrary` feature in + /// tempo-primitives, which we don't enable from tidx. + const RFC8032_TEST_VECTOR_1_PUBKEY: [u8; 32] = [ + 0xd7, 0x5a, 0x98, 0x01, 0x82, 0xb1, 0x0a, 0xb7, 0xd5, 0x4b, 0xfe, 0xd3, + 0xc9, 0x64, 0x07, 0x3a, 0x0e, 0xe1, 0x72, 0xf3, 0xda, 0xa6, 0x23, 0x25, + 0xaf, 0x02, 0x1a, 0x68, 0xf7, 0x07, 0x51, 0x1a, + ]; + + #[test] + fn extract_consensus_proposer_none_when_pre_fork() { + assert_eq!(extract_consensus_proposer(None), None); + } + + #[test] + fn extract_consensus_proposer_returns_raw_32_bytes_when_present() { + let proposer: PublicKey = B256::from(RFC8032_TEST_VECTOR_1_PUBKEY) + .try_into() + .expect("valid ed25519 pubkey from RFC 8032 test vector 1"); + let ctx = TempoConsensusContext { + epoch: 7, + view: 42, + parent_view: 41, + proposer, + }; + let out = extract_consensus_proposer(Some(&ctx)).expect("some"); + assert_eq!(out.len(), 32); + assert_eq!(out.as_slice(), RFC8032_TEST_VECTOR_1_PUBKEY.as_slice()); + } fn make_tx(block_num: i64, idx: i32) -> TxRow { TxRow { diff --git a/src/sync/engine.rs b/src/sync/engine.rs index c24fd4c8..16f0ba37 100644 --- a/src/sync/engine.rs +++ b/src/sync/engine.rs @@ -1,3 +1,4 @@ +use alloy::consensus::BlockHeader as _; use alloy::network::ReceiptResponse; use anyhow::Result; use std::collections::HashMap; @@ -409,14 +410,14 @@ impl SyncEngine { for block in &blocks { broadcaster.send(BlockUpdate { chain_id: self.chain_id, - block_num: block.header.number, + block_num: block.header.number(), block_hash: format!("0x{}", hex::encode(block.header.hash)), tx_count: 
block.transactions.len() as u64, log_count: logs_per_block - .get(&(block.header.number as i64)) + .get(&(block.header.number() as i64)) .copied() .unwrap_or(0), - timestamp: block.header.timestamp as i64, + timestamp: block.header.timestamp() as i64, }); } } @@ -446,7 +447,7 @@ impl SyncEngine { } let first_block = &blocks[0]; - let first_num = first_block.header.number; + let first_num = first_block.header.number(); // Check parent hash against stored block (if not genesis) if first_num > 0 @@ -455,7 +456,7 @@ impl SyncEngine { let expected_parent: [u8; 32] = stored_hash .try_into() .map_err(|_| anyhow::anyhow!("Invalid stored hash length"))?; - if first_block.header.parent_hash.0 != expected_parent { + if first_block.header.parent_hash().0 != expected_parent { // Reorg detected - handle it automatically return self.handle_reorg(first_num).await; } @@ -463,11 +464,11 @@ impl SyncEngine { // Validate internal chain continuity for window in blocks.windows(2) { - if window[1].header.parent_hash != window[0].header.hash { + if window[1].header.parent_hash() != window[0].header.hash { return Err(anyhow::anyhow!( "Internal chain break at block {}: parent_hash {:?} != prev hash {:?}", - window[1].header.number, - hex::encode(window[1].header.parent_hash.0), + window[1].header.number(), + hex::encode(window[1].header.parent_hash().0), hex::encode(window[0].header.hash.0) )); } @@ -561,7 +562,7 @@ impl SyncEngine { let block_timestamps: HashMap = blocks .iter() - .map(|b| (b.header.number, timestamp_from_secs(b.header.timestamp))) + .map(|b| (b.header.number(), timestamp_from_secs(b.header.timestamp()))) .collect(); let block_rows: Vec<_> = blocks.iter().map(decode_block).collect(); @@ -636,7 +637,7 @@ impl SyncEngine { )?; let block_row = decode_block(&block); - let block_ts = timestamp_from_secs(block.header.timestamp); + let block_ts = timestamp_from_secs(block.header.timestamp()); let mut txs: Vec<_> = block .transactions .txns() @@ -1356,7 +1357,7 @@ async fn 
sync_range_standalone(sinks: &SinkSet, rpc: &RpcClient, from: u64, to: let block_timestamps: HashMap = blocks .iter() - .map(|b| (b.header.number, timestamp_from_secs(b.header.timestamp))) + .map(|b| (b.header.number(), timestamp_from_secs(b.header.timestamp()))) .collect(); let block_rows: Vec<_> = blocks.iter().map(decode_block).collect(); diff --git a/src/sync/sink.rs b/src/sync/sink.rs index d238cf24..024d74d3 100644 --- a/src/sync/sink.rs +++ b/src/sync/sink.rs @@ -318,7 +318,7 @@ async fn fetch_blocks( ) -> Result> { let rows = conn .query( - "SELECT num, hash, parent_hash, timestamp, timestamp_ms, gas_limit, gas_used, miner, extra_data \ + "SELECT num, hash, parent_hash, timestamp, timestamp_ms, gas_limit, gas_used, miner, extra_data, consensus_proposer \ FROM blocks WHERE num >= $1 AND num <= $2 ORDER BY num", &[&from, &to], ) @@ -336,6 +336,7 @@ async fn fetch_blocks( gas_used: r.get(6), miner: r.get(7), extra_data: r.get(8), + consensus_proposer: r.get(9), }) .collect()) } diff --git a/src/sync/writer.rs b/src/sync/writer.rs index 268ff3db..05581ade 100644 --- a/src/sync/writer.rs +++ b/src/sync/writer.rs @@ -46,7 +46,8 @@ pub async fn write_blocks(pool: &Pool, blocks: &[BlockRow]) -> Result<()> { tx.execute( "CREATE TEMP TABLE _staging_blocks ( num INT8, hash BYTEA, parent_hash BYTEA, timestamp TIMESTAMPTZ, - timestamp_ms INT8, gas_limit INT8, gas_used INT8, miner BYTEA, extra_data BYTEA + timestamp_ms INT8, gas_limit INT8, gas_used INT8, miner BYTEA, extra_data BYTEA, + consensus_proposer BYTEA ) ON COMMIT DROP", &[], ) @@ -62,11 +63,12 @@ pub async fn write_blocks(pool: &Pool, blocks: &[BlockRow]) -> Result<()> { Type::INT8, // gas_used Type::BYTEA, // miner Type::BYTEA, // extra_data + Type::BYTEA, // consensus_proposer ]; let sink = tx .copy_in( - "COPY _staging_blocks (num, hash, parent_hash, timestamp, timestamp_ms, gas_limit, gas_used, miner, extra_data) FROM STDIN BINARY", + "COPY _staging_blocks (num, hash, parent_hash, timestamp, timestamp_ms, 
gas_limit, gas_used, miner, extra_data, consensus_proposer) FROM STDIN BINARY", ) .await?; @@ -86,14 +88,23 @@ pub async fn write_blocks(pool: &Pool, blocks: &[BlockRow]) -> Result<()> { &block.gas_used, &block.miner, &block.extra_data as &(dyn tokio_postgres::types::ToSql + Sync), + &block.consensus_proposer as &(dyn tokio_postgres::types::ToSql + Sync), ]) .await?; } pinned_writer.as_mut().finish().await?; + // Use DO UPDATE for `consensus_proposer` so post-T4 blocks indexed by an + // older tidx version (which stored NULL) self-heal to the populated value + // on any later reinsert (reorg, refetch). All other columns stay + // immutable: the WHERE clause makes this a no-op once `consensus_proposer` + // is set, preserving existing reorg-idempotent semantics. tx.execute( - "INSERT INTO blocks SELECT * FROM _staging_blocks ON CONFLICT (timestamp, num) DO NOTHING", + "INSERT INTO blocks SELECT * FROM _staging_blocks \ + ON CONFLICT (timestamp, num) DO UPDATE \ + SET consensus_proposer = EXCLUDED.consensus_proposer \ + WHERE blocks.consensus_proposer IS NULL AND EXCLUDED.consensus_proposer IS NOT NULL", &[], ) .await?; @@ -420,7 +431,8 @@ pub async fn write_batch( tx.execute( "CREATE TEMP TABLE _staging_blocks ( num INT8, hash BYTEA, parent_hash BYTEA, timestamp TIMESTAMPTZ, - timestamp_ms INT8, gas_limit INT8, gas_used INT8, miner BYTEA, extra_data BYTEA + timestamp_ms INT8, gas_limit INT8, gas_used INT8, miner BYTEA, extra_data BYTEA, + consensus_proposer BYTEA ) ON COMMIT DROP", &[], ) @@ -436,11 +448,12 @@ pub async fn write_batch( Type::INT8, // gas_used Type::BYTEA, // miner Type::BYTEA, // extra_data + Type::BYTEA, // consensus_proposer ]; let sink = tx .copy_in( - "COPY _staging_blocks (num, hash, parent_hash, timestamp, timestamp_ms, gas_limit, gas_used, miner, extra_data) FROM STDIN BINARY", + "COPY _staging_blocks (num, hash, parent_hash, timestamp, timestamp_ms, gas_limit, gas_used, miner, extra_data, consensus_proposer) FROM STDIN BINARY", ) .await?; @@ 
-460,14 +473,19 @@ pub async fn write_batch( &block.gas_used, &block.miner, &block.extra_data as &(dyn tokio_postgres::types::ToSql + Sync), + &block.consensus_proposer as &(dyn tokio_postgres::types::ToSql + Sync), ]) .await?; } pinned_writer.as_mut().finish().await?; + // See `write_blocks` for rationale; keep both paths in lockstep. tx.execute( - "INSERT INTO blocks SELECT * FROM _staging_blocks ON CONFLICT (timestamp, num) DO NOTHING", + "INSERT INTO blocks SELECT * FROM _staging_blocks \ + ON CONFLICT (timestamp, num) DO UPDATE \ + SET consensus_proposer = EXCLUDED.consensus_proposer \ + WHERE blocks.consensus_proposer IS NULL AND EXCLUDED.consensus_proposer IS NOT NULL", &[], ) .await?; diff --git a/src/tempo.rs b/src/tempo.rs index 70fb6b9e..e3754363 100644 --- a/src/tempo.rs +++ b/src/tempo.rs @@ -1,8 +1,16 @@ +//! Tempo type aliases. +//! +//! JSON-RPC payloads decode into upstream `TempoHeaderResponse` / `TempoHeader` +//! so the TIP-1031 consensus context, millisecond timestamp, and Tempo-specific +//! gas limits are preserved. + +use alloy::network::Network; + pub use tempo_alloy::TempoNetwork; pub use tempo_alloy::primitives::TempoTxEnvelope; pub use tempo_alloy::rpc::TempoTransactionReceipt; -pub type Block = alloy::rpc::types::Block; -pub type Transaction = alloy::rpc::types::Transaction; +pub type Block = <TempoNetwork as Network>::BlockResponse; +pub type Transaction = <TempoNetwork as Network>::TransactionResponse; pub type Log = alloy::rpc::types::Log; pub type Receipt = TempoTransactionReceipt; diff --git a/src/types.rs b/src/types.rs index 578b6c1f..56b307d1 100644 --- a/src/types.rs +++ b/src/types.rs @@ -12,6 +12,9 @@ pub struct BlockRow { pub gas_used: i64, pub miner: Vec<u8>, pub extra_data: Option<Vec<u8>>, + /// Ed25519 public key of the consensus proposer for this block (TIP-1031). + /// `None` for pre-fork blocks where the header carries no consensus context. 
+ pub consensus_proposer: Option>, } #[derive(Debug, Clone, Default)] diff --git a/tests/clickhouse_test.rs b/tests/clickhouse_test.rs index 725b246f..443a64e3 100644 --- a/tests/clickhouse_test.rs +++ b/tests/clickhouse_test.rs @@ -827,6 +827,7 @@ fn make_block(num: i64) -> BlockRow { gas_used: 21_000 * num, miner: vec![0xaa; 20], extra_data: Some(vec![0xbb, 0xcc]), + consensus_proposer: None, } } diff --git a/tests/common/seed.rs b/tests/common/seed.rs index b03c26c0..ce3f8c20 100644 --- a/tests/common/seed.rs +++ b/tests/common/seed.rs @@ -79,6 +79,7 @@ pub async fn seed(pool: &Pool, config: &SeedConfig) -> Result<(u64, i64, u64)> { gas_used: rng.random_range(10_000_000..25_000_000), miner: generate_address(block_num as u64), extra_data: None, + consensus_proposer: None, }); prev_block_hash = block_hash; diff --git a/tests/sync_optimizations_test.rs b/tests/sync_optimizations_test.rs index 8f24ebab..5f625dc4 100644 --- a/tests/sync_optimizations_test.rs +++ b/tests/sync_optimizations_test.rs @@ -24,6 +24,7 @@ fn generate_blocks(count: usize, offset: i64) -> Vec { gas_used: 15_000_000, miner: vec![0u8; 20], extra_data: Some(vec![0u8; 32]), + consensus_proposer: None, } }) .collect() From 21b374d692ed076d82d2a4c3263ad48e9ac19a36 Mon Sep 17 00:00:00 2001 From: Centaur AI Date: Thu, 30 Apr 2026 15:49:33 +0000 Subject: [PATCH 2/6] fix: address proposer review feedback Amp-Thread-ID: https://ampcode.com/threads/T-019dded9-09b3-756a-a669-839a8901a90c Co-authored-by: Amp --- Cargo.toml | 6 ------ src/metrics.rs | 9 --------- src/sync/decoder.rs | 49 ++++----------------------------------------- src/sync/writer.rs | 16 ++------------- src/types.rs | 3 +-- 5 files changed, 7 insertions(+), 76 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 0495f421..99d544bb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,11 +35,6 @@ postgres-types = { version = "0.2", features = ["derive"] } # Ethereum/Tempo primitives alloy = { version = "2", features = ["full"] } tempo-alloy = 
{ git = "https://github.com/tempoxyz/tempo", default-features = false } -# `default-features = false` avoids tempo-primitives' default `reth`/`std` -# features, which transitively pull `revm` -> `c-kzg 2.1.7` and conflict with -# the `c-kzg 2.1.5` locked by `alloy-consensus 2.0`. We only need `serde` for -# `TempoConsensusContext` deserialization; `std` is satisfied transitively -# through tempo-alloy's dependency on tempo-primitives with `serde`+`rpc`. tempo-primitives = { git = "https://github.com/tempoxyz/tempo", package = "tempo-primitives", default-features = false, features = ["serde"] } # URL parsing @@ -165,4 +160,3 @@ needless_range_loop = "allow" format_push_string = "allow" unnecessary_cast = "allow" cast_lossless = "allow" - diff --git a/src/metrics.rs b/src/metrics.rs index 92e1f028..8f6a89b9 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -8,15 +8,6 @@ pub fn record_blocks_indexed(chain_id: u64, count: u64) { counter!("tidx_blocks_indexed_total", &labels).increment(count); } -/// TIP-1031: count of decoded block headers broken down by whether they -/// carried a consensus context. After T4 activation `present` should rise -/// monotonically and `absent` should plateau. Operators can correlate -/// against `tidx_blocks_indexed_total{chain_id=...}` for per-chain breakdown. 
-pub fn record_block_consensus_context(present: bool) { - let labels = [("proposer", if present { "present" } else { "absent" })]; - counter!("tidx_blocks_consensus_context_total", &labels).increment(1); -} - pub fn record_txs_indexed(chain_id: u64, count: u64) { let labels = [("chain_id", chain_id.to_string())]; counter!("tidx_txs_indexed_total", &labels).increment(count); diff --git a/src/sync/decoder.rs b/src/sync/decoder.rs index 6a0aadb1..db4ec7ef 100644 --- a/src/sync/decoder.rs +++ b/src/sync/decoder.rs @@ -8,15 +8,6 @@ use tempo_alloy::primitives::transaction::SignatureType; use crate::tempo::{Block, Log, Receipt, TempoTxEnvelope, Transaction}; use crate::types::{BlockRow, LogRow, ReceiptRow, TxRow}; -use tempo_primitives::TempoConsensusContext; - -/// TIP-1031: extract the 32-byte ed25519 proposer pubkey from an optional -/// consensus context. Returns `None` for pre-fork blocks where the header -/// carries no consensus context. -fn extract_consensus_proposer(ctx: Option<&TempoConsensusContext>) -> Option> { - ctx.map(|c| B256::from(&c.proposer).0.to_vec()) -} - pub fn timestamp_from_secs(secs: u64) -> DateTime { Utc.timestamp_opt(secs as i64, 0) .single() @@ -29,9 +20,6 @@ pub fn decode_block(block: &Block) -> BlockRow { let timestamp = timestamp_from_secs(timestamp_secs); let timestamp_ms = (timestamp_secs * 1000) as i64; - let consensus_proposer = extract_consensus_proposer(header.consensus_context.as_ref()); - crate::metrics::record_block_consensus_context(consensus_proposer.is_some()); - BlockRow { num: header.number() as i64, hash: header.hash.as_slice().to_vec(), @@ -42,7 +30,10 @@ pub fn decode_block(block: &Block) -> BlockRow { gas_used: header.gas_used() as i64, miner: header.beneficiary().as_slice().to_vec(), extra_data: Some(header.extra_data().to_vec()), - consensus_proposer, + consensus_proposer: header + .consensus_context + .as_ref() + .map(|consensus_context| B256::from(&consensus_context.proposer).0.to_vec()), } } @@ -159,38 +150,6 @@ 
pub fn decode_receipt(receipt: &Receipt, block_timestamp: DateTime) -> Rece #[cfg(test)] mod tests { use super::*; - use tempo_primitives::ed25519::PublicKey; - - /// RFC 8032 ed25519 test vector #1 — known-valid 32-byte verification key. - /// Hardcoded as bytes (rather than constructed via `PublicKey::from_seed`) - /// because that constructor is gated on the `arbitrary` feature in - /// tempo-primitives, which we don't enable from tidx. - const RFC8032_TEST_VECTOR_1_PUBKEY: [u8; 32] = [ - 0xd7, 0x5a, 0x98, 0x01, 0x82, 0xb1, 0x0a, 0xb7, 0xd5, 0x4b, 0xfe, 0xd3, - 0xc9, 0x64, 0x07, 0x3a, 0x0e, 0xe1, 0x72, 0xf3, 0xda, 0xa6, 0x23, 0x25, - 0xaf, 0x02, 0x1a, 0x68, 0xf7, 0x07, 0x51, 0x1a, - ]; - - #[test] - fn extract_consensus_proposer_none_when_pre_fork() { - assert_eq!(extract_consensus_proposer(None), None); - } - - #[test] - fn extract_consensus_proposer_returns_raw_32_bytes_when_present() { - let proposer: PublicKey = B256::from(RFC8032_TEST_VECTOR_1_PUBKEY) - .try_into() - .expect("valid ed25519 pubkey from RFC 8032 test vector 1"); - let ctx = TempoConsensusContext { - epoch: 7, - view: 42, - parent_view: 41, - proposer, - }; - let out = extract_consensus_proposer(Some(&ctx)).expect("some"); - assert_eq!(out.len(), 32); - assert_eq!(out.as_slice(), RFC8032_TEST_VECTOR_1_PUBKEY.as_slice()); - } fn make_tx(block_num: i64, idx: i32) -> TxRow { TxRow { diff --git a/src/sync/writer.rs b/src/sync/writer.rs index 05581ade..e1f115c7 100644 --- a/src/sync/writer.rs +++ b/src/sync/writer.rs @@ -95,16 +95,8 @@ pub async fn write_blocks(pool: &Pool, blocks: &[BlockRow]) -> Result<()> { pinned_writer.as_mut().finish().await?; - // Use DO UPDATE for `consensus_proposer` so post-T4 blocks indexed by an - // older tidx version (which stored NULL) self-heal to the populated value - // on any later reinsert (reorg, refetch). 
All other columns stay - // immutable: the WHERE clause makes this a no-op once `consensus_proposer` - // is set, preserving existing reorg-idempotent semantics. tx.execute( - "INSERT INTO blocks SELECT * FROM _staging_blocks \ - ON CONFLICT (timestamp, num) DO UPDATE \ - SET consensus_proposer = EXCLUDED.consensus_proposer \ - WHERE blocks.consensus_proposer IS NULL AND EXCLUDED.consensus_proposer IS NOT NULL", + "INSERT INTO blocks SELECT * FROM _staging_blocks ON CONFLICT (timestamp, num) DO NOTHING", &[], ) .await?; @@ -480,12 +472,8 @@ pub async fn write_batch( pinned_writer.as_mut().finish().await?; - // See `write_blocks` for rationale; keep both paths in lockstep. tx.execute( - "INSERT INTO blocks SELECT * FROM _staging_blocks \ - ON CONFLICT (timestamp, num) DO UPDATE \ - SET consensus_proposer = EXCLUDED.consensus_proposer \ - WHERE blocks.consensus_proposer IS NULL AND EXCLUDED.consensus_proposer IS NOT NULL", + "INSERT INTO blocks SELECT * FROM _staging_blocks ON CONFLICT (timestamp, num) DO NOTHING", &[], ) .await?; diff --git a/src/types.rs b/src/types.rs index 56b307d1..b270c1ce 100644 --- a/src/types.rs +++ b/src/types.rs @@ -12,8 +12,7 @@ pub struct BlockRow { pub gas_used: i64, pub miner: Vec, pub extra_data: Option>, - /// Ed25519 public key of the consensus proposer for this block (TIP-1031). - /// `None` for pre-fork blocks where the header carries no consensus context. + /// T5+: Ed25519 public key of the consensus proposer for this block. Previously `None`. 
pub consensus_proposer: Option>, } From b8945224ae9c20749bd372fcc40f1e9fa8124e6f Mon Sep 17 00:00:00 2001 From: Centaur AI Date: Thu, 30 Apr 2026 19:04:04 +0000 Subject: [PATCH 3/6] fix: update benches for proposer field Amp-Thread-ID: https://ampcode.com/threads/T-019dded9-09b3-756a-a669-839a8901a90c Co-authored-by: Amp --- benches/sync_bench.rs | 1 + benches/write_bench.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/benches/sync_bench.rs b/benches/sync_bench.rs index bf60d707..625fbef8 100644 --- a/benches/sync_bench.rs +++ b/benches/sync_bench.rs @@ -19,6 +19,7 @@ fn generate_blocks(count: usize, offset: usize) -> Vec { gas_used: 15_000_000, miner: vec![0u8; 20], extra_data: Some(vec![0u8; 32]), + consensus_proposer: None, } }) .collect() diff --git a/benches/write_bench.rs b/benches/write_bench.rs index 0fea1e60..6fd8cce9 100644 --- a/benches/write_bench.rs +++ b/benches/write_bench.rs @@ -16,6 +16,7 @@ fn generate_blocks(count: usize) -> Vec { gas_used: 15_000_000, miner: vec![0u8; 20], extra_data: Some(vec![0u8; 32]), + consensus_proposer: None, }) .collect() } From 026d0f967773fba2d7d3d8ea955a42852d91dd65 Mon Sep 17 00:00:00 2001 From: 0xrusowsky <0xrusowsky@proton.me> Date: Mon, 4 May 2026 09:25:43 +0200 Subject: [PATCH 4/6] feat(pgroll): add migration for blocks.consensus_proposer Mirror db/migrations/20260430_add_blocks_consensus_proposer.sql as a pgroll migration so production deploys via pgroll pick up the new column. Bump expected final pgroll version in compose files and the pgroll README. 
--- compose.yml | 2 +- ...20260430_add_blocks_consensus_proposer.json | 18 ++++++++++++++++++ db/pgroll/README.md | 3 ++- docker/prod/docker-compose.yml | 2 +- 4 files changed, 22 insertions(+), 3 deletions(-) create mode 100644 db/pgroll/20260430_add_blocks_consensus_proposer.json diff --git a/compose.yml b/compose.yml index c989361e..d4733d52 100644 --- a/compose.yml +++ b/compose.yml @@ -144,7 +144,7 @@ services: sleep 1 done status="$$(tidx pgroll status --postgres-url "$${POSTGRES_URL}" 2>/dev/null || true)" - if echo "$${status}" | grep -q '"version": "20260417_add_logs_virtual_forward_indexes"'; then + if echo "$${status}" | grep -q '"version": "20260430_add_blocks_consensus_proposer"'; then echo 'pgroll migrations already complete' exit 0 fi diff --git a/db/pgroll/20260430_add_blocks_consensus_proposer.json b/db/pgroll/20260430_add_blocks_consensus_proposer.json new file mode 100644 index 00000000..c0030ea1 --- /dev/null +++ b/db/pgroll/20260430_add_blocks_consensus_proposer.json @@ -0,0 +1,18 @@ +{ + "operations": [ + { + "add_column": { + "table": "blocks", + "column": { + "name": "consensus_proposer", + "type": "bytea", + "nullable": true, + "check": { + "name": "blocks_consensus_proposer_len", + "constraint": "consensus_proposer IS NULL OR octet_length(consensus_proposer) = 32" + } + } + } + } + ] +} diff --git a/db/pgroll/README.md b/db/pgroll/README.md index 20ff9682..6e9a46f2 100644 --- a/db/pgroll/README.md +++ b/db/pgroll/README.md @@ -40,7 +40,7 @@ Production Docker commands from `docker/prod`: - `PGROLL_POSTGRES_URLS="postgres://.../tidx_moderato?sslmode=disable postgres://.../tidx_mainnet?sslmode=disable" docker compose --profile migrations run --rm pgroll-bootstrap` - Normal migration flow before starting/updating `tidx`: - `PGROLL_POSTGRES_URLS="postgres://.../tidx_moderato?sslmode=disable postgres://.../tidx_mainnet?sslmode=disable" docker compose --profile migrations run --rm pgroll-migrate` - - `pgroll-migrate` verifies the final pgroll 
version and exits non-zero if migrations do not reach `20260417_add_logs_virtual_forward_indexes` (for example, if bootstrap was skipped on a non-empty database). + - `pgroll-migrate` verifies the final pgroll version and exits non-zero if migrations do not reach `20260430_add_blocks_consensus_proposer` (for example, if bootstrap was skipped on a non-empty database). The production image published by GitHub Actions includes both `tidx pgroll` and the `/db/pgroll` migration files. @@ -48,3 +48,4 @@ Post-baseline migrations in this directory: - `20260416_add_is_virtual_forward_column.json` - `20260417_add_logs_virtual_forward_indexes.json` +- `20260430_add_blocks_consensus_proposer.json` diff --git a/docker/prod/docker-compose.yml b/docker/prod/docker-compose.yml index 53c9157f..b51c4bd0 100644 --- a/docker/prod/docker-compose.yml +++ b/docker/prod/docker-compose.yml @@ -88,7 +88,7 @@ services: tidx pgroll init --postgres-url "$${postgres_url}" || true tidx pgroll migrate /db/pgroll --complete --postgres-url "$${postgres_url}" status="$$(tidx pgroll status --postgres-url "$${postgres_url}")" - if ! echo "$${status}" | grep -q '"version": "20260417_add_logs_virtual_forward_indexes"'; then + if ! 
echo "$${status}" | grep -q '"version": "20260430_add_blocks_consensus_proposer"'; then echo "pgroll migration did not reach expected version for $${postgres_url}:" >&2 echo "$${status}" >&2 exit 1 From ab3c642d0295b1fa3f2cb9e3b1e789b60f95469c Mon Sep 17 00:00:00 2001 From: o-az Date: Mon, 4 May 2026 22:57:18 -0700 Subject: [PATCH 5/6] fix: remove manual pg migrations and keep the pgroll one --- Cargo.lock | 2 +- ...20260430_add_blocks_consensus_proposer.sql | 21 ------------------- src/db/schema.rs | 6 ------ src/tempo.rs | 1 - tests/migration_test.rs | 5 ++++- 5 files changed, 5 insertions(+), 30 deletions(-) delete mode 100644 db/migrations/20260430_add_blocks_consensus_proposer.sql diff --git a/Cargo.lock b/Cargo.lock index 57f08a26..12cc0ec2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6719,7 +6719,7 @@ dependencies = [ [[package]] name = "tidx" -version = "0.5.4" +version = "0.5.5" dependencies = [ "alloy", "anyhow", diff --git a/db/migrations/20260430_add_blocks_consensus_proposer.sql b/db/migrations/20260430_add_blocks_consensus_proposer.sql deleted file mode 100644 index c0eecc53..00000000 --- a/db/migrations/20260430_add_blocks_consensus_proposer.sql +++ /dev/null @@ -1,21 +0,0 @@ --- TIP-1031: store the ed25519 consensus proposer pubkey for each block. --- --- Existing PostgreSQL deployments created before this change already have a --- `blocks` table, so `CREATE TABLE IF NOT EXISTS` in db/blocks.sql will not --- add the new column. Run this migration once during upgrade, or let tidx --- apply it via `run_migrations()` on startup. -ALTER TABLE blocks - ADD COLUMN IF NOT EXISTS consensus_proposer BYTEA; - --- Pubkey must be exactly 32 bytes (or NULL for pre-fork blocks). --- Add the constraint defensively in case the column already existed without it.
-DO $$ -BEGIN - IF NOT EXISTS ( - SELECT 1 FROM pg_constraint WHERE conname = 'blocks_consensus_proposer_len' - ) THEN - ALTER TABLE blocks - ADD CONSTRAINT blocks_consensus_proposer_len - CHECK (consensus_proposer IS NULL OR octet_length(consensus_proposer) = 32); - END IF; -END$$; diff --git a/src/db/schema.rs b/src/db/schema.rs index bcdbb549..7daecb78 100644 --- a/src/db/schema.rs +++ b/src/db/schema.rs @@ -54,12 +54,6 @@ pub async fn run_migrations(pool: &Pool) -> Result<()> { )) .await?; - // TIP-1031: ed25519 consensus proposer pubkey on blocks. - conn.batch_execute(include_str!( - "../../db/migrations/20260430_add_blocks_consensus_proposer.sql" - )) - .await?; - // Heavyweight upgrades such as concurrent index creation run in a // best-effort post-startup task so normal boot isn't blocked. Production // should still apply them in a pre-deploy migration flow. diff --git a/src/tempo.rs b/src/tempo.rs index e3754363..aae844e4 100644 --- a/src/tempo.rs +++ b/src/tempo.rs @@ -7,7 +7,6 @@ use alloy::network::Network; use tempo_alloy::TempoNetwork; -pub use tempo_alloy::TempoNetwork; pub use tempo_alloy::primitives::TempoTxEnvelope; pub use tempo_alloy::rpc::TempoTransactionReceipt; diff --git a/tests/migration_test.rs b/tests/migration_test.rs index cfa36591..47b8f9ac 100644 --- a/tests/migration_test.rs +++ b/tests/migration_test.rs @@ -1,6 +1,6 @@ use futures::FutureExt; use std::panic::AssertUnwindSafe; -use tidx::db::{create_pool, run_migrations}; +use tidx::db::{create_pool, run_migrations, run_post_startup_migrations}; use tokio_postgres::NoTls; use url::Url; @@ -57,6 +57,9 @@ async fn test_pg_upgrade_adds_virtual_forward_column_before_indexes() { run_migrations(&pool) .await .expect("Failed to run migrations against old logs schema"); + run_post_startup_migrations(&pool) + .await + .expect("Failed to run post-startup migrations against old logs schema"); let conn = pool.get().await.expect("Failed to get post-migration connection"); From 
8e3b47e9315d157623d300226c9c98b455c6e156 Mon Sep 17 00:00:00 2001 From: o-az Date: Mon, 4 May 2026 23:02:18 -0700 Subject: [PATCH 6/6] chore: changeset --- .changelog/happy-pigs-write.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changelog/happy-pigs-write.md diff --git a/.changelog/happy-pigs-write.md b/.changelog/happy-pigs-write.md new file mode 100644 index 00000000..d6ee5968 --- /dev/null +++ b/.changelog/happy-pigs-write.md @@ -0,0 +1,5 @@ +--- +tidx: patch +--- + +Added a `consensus_proposer` column to the `blocks` table for `TIP-1031`