diff --git a/.changelog/happy-pigs-write.md b/.changelog/happy-pigs-write.md new file mode 100644 index 00000000..d6ee5968 --- /dev/null +++ b/.changelog/happy-pigs-write.md @@ -0,0 +1,5 @@ +--- +tidx: patch +--- + +Added a `consensus_proposer` column to the `blocks` table for `TIP-1031` diff --git a/Cargo.lock b/Cargo.lock index 57f08a26..12cc0ec2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6719,7 +6719,7 @@ dependencies = [ [[package]] name = "tidx" -version = "0.5.4" +version = "0.5.5" dependencies = [ "alloy", "anyhow", diff --git a/Cargo.toml b/Cargo.toml index 47e3b254..99d544bb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,7 +35,7 @@ postgres-types = { version = "0.2", features = ["derive"] } # Ethereum/Tempo primitives alloy = { version = "2", features = ["full"] } tempo-alloy = { git = "https://github.com/tempoxyz/tempo", default-features = false } -tempo-primitives = { git = "https://github.com/tempoxyz/tempo", package = "tempo-primitives", default-features = false } +tempo-primitives = { git = "https://github.com/tempoxyz/tempo", package = "tempo-primitives", default-features = false, features = ["serde"] } # URL parsing form_urlencoded = "1" @@ -65,7 +65,7 @@ thiserror = "2" anyhow = "1" # Time -chrono = { version = "0.4", features = ["serde"] } +chrono = { version = "0.4", features = ["std", "serde"] } # ClickHouse clickhouse = { version = "0.14", features = ["lz4", "chrono", "rustls-tls"] } @@ -160,4 +160,3 @@ needless_range_loop = "allow" format_push_string = "allow" unnecessary_cast = "allow" cast_lossless = "allow" - diff --git a/README.md b/README.md index f5901f5d..f6fcce45 100644 --- a/README.md +++ b/README.md @@ -470,6 +470,7 @@ All tables use composite primary keys with timestamps for efficient range querie | `gas_used` | `INT8` | Gas used | | `miner` | `BYTEA` | Block producer | | `extra_data` | `BYTEA` | Extra data field | +| `consensus_proposer` | `BYTEA` | Ed25519 consensus proposer pubkey (TIP-1031, NULL pre-fork) | ### txs diff --git a/benches/sync_bench.rs b/benches/sync_bench.rs index bf60d707..625fbef8 100644 --- a/benches/sync_bench.rs +++ b/benches/sync_bench.rs @@ -19,6 +19,7 @@ fn generate_blocks(count: usize, offset: usize) -> Vec { gas_used: 15_000_000, miner: vec![0u8; 20], extra_data: Some(vec![0u8; 32]), + consensus_proposer: None, } }) .collect() diff --git a/benches/write_bench.rs b/benches/write_bench.rs index 0fea1e60..6fd8cce9 100644 --- a/benches/write_bench.rs +++ b/benches/write_bench.rs @@ -16,6 +16,7 @@ fn generate_blocks(count: usize) -> Vec { gas_used: 15_000_000, miner: vec![0u8; 20], extra_data: Some(vec![0u8; 32]), + consensus_proposer: None, }) .collect() } diff --git a/compose.yml b/compose.yml index c989361e..d4733d52 100644 --- a/compose.yml +++ b/compose.yml @@ -144,7 +144,7 @@ services: sleep 1 done status="$$(tidx pgroll status --postgres-url "$${POSTGRES_URL}" 2>/dev/null || true)" - if echo "$${status}" | grep -q '"version": "20260417_add_logs_virtual_forward_indexes"'; then + if echo "$${status}" | grep -q '"version": "20260430_add_blocks_consensus_proposer"'; then echo 'pgroll migrations already complete' exit 0 fi diff --git a/db/blocks.sql b/db/blocks.sql index f980c8dc..42f4651a 100644 --- a/db/blocks.sql +++ b/db/blocks.sql @@ -8,6 +8,7 @@ CREATE TABLE IF NOT EXISTS blocks ( gas_used INT8 NOT NULL, miner BYTEA NOT NULL, extra_data BYTEA, + consensus_proposer BYTEA CHECK (consensus_proposer IS NULL OR octet_length(consensus_proposer) = 32), PRIMARY KEY (timestamp, num) ); diff --git a/db/clickhouse/blocks.sql b/db/clickhouse/blocks.sql index ee428128..0992f070 100644 --- a/db/clickhouse/blocks.sql +++ b/db/clickhouse/blocks.sql @@ -7,7 +7,8 @@ CREATE TABLE IF NOT EXISTS blocks ( gas_limit Int64, gas_used Int64, miner String, - extra_data Nullable(String) + extra_data Nullable(String), + consensus_proposer Nullable(String) ) ENGINE = ReplacingMergeTree() PARTITION BY toYYYYMM(timestamp) ORDER BY (num) diff --git a/db/clickhouse/migrations/20260430_add_blocks_consensus_proposer.sql b/db/clickhouse/migrations/20260430_add_blocks_consensus_proposer.sql new file mode 100644 index 00000000..cf1ea8b2 --- /dev/null +++ b/db/clickhouse/migrations/20260430_add_blocks_consensus_proposer.sql @@ -0,0 +1,8 @@ +-- TIP-1031: store the ed25519 consensus proposer pubkey for each block. +-- +-- Existing ClickHouse deployments created before this change already have a +-- `blocks` table, so `CREATE TABLE IF NOT EXISTS` in db/clickhouse/blocks.sql +-- will not add the new column. Run this migration once during upgrade, or let +-- tidx apply it via `ClickHouseSink::ensure_schema()` on startup. +ALTER TABLE blocks + ADD COLUMN IF NOT EXISTS consensus_proposer Nullable(String); diff --git a/db/pgroll/20260430_add_blocks_consensus_proposer.json b/db/pgroll/20260430_add_blocks_consensus_proposer.json new file mode 100644 index 00000000..c0030ea1 --- /dev/null +++ b/db/pgroll/20260430_add_blocks_consensus_proposer.json @@ -0,0 +1,18 @@ +{ + "operations": [ + { + "add_column": { + "table": "blocks", + "column": { + "name": "consensus_proposer", + "type": "bytea", + "nullable": true, + "check": { + "name": "blocks_consensus_proposer_len", + "constraint": "consensus_proposer IS NULL OR octet_length(consensus_proposer) = 32" + } + } + } + } + ] +} diff --git a/db/pgroll/README.md b/db/pgroll/README.md index 20ff9682..6e9a46f2 100644 --- a/db/pgroll/README.md +++ b/db/pgroll/README.md @@ -40,7 +40,7 @@ Production Docker commands from `docker/prod`: - `PGROLL_POSTGRES_URLS="postgres://.../tidx_moderato?sslmode=disable postgres://.../tidx_mainnet?sslmode=disable" docker compose --profile migrations run --rm pgroll-bootstrap` - Normal migration flow before starting/updating `tidx`: - `PGROLL_POSTGRES_URLS="postgres://.../tidx_moderato?sslmode=disable postgres://.../tidx_mainnet?sslmode=disable" docker compose --profile migrations run --rm pgroll-migrate` - - `pgroll-migrate` verifies the final pgroll version and exits non-zero if migrations do not reach `20260417_add_logs_virtual_forward_indexes` (for example, if bootstrap was skipped on a non-empty database). + - `pgroll-migrate` verifies the final pgroll version and exits non-zero if migrations do not reach `20260430_add_blocks_consensus_proposer` (for example, if bootstrap was skipped on a non-empty database). The production image published by GitHub Actions includes both `tidx pgroll` and the `/db/pgroll` migration files. @@ -48,3 +48,4 @@ Post-baseline migrations in this directory: - `20260416_add_is_virtual_forward_column.json` - `20260417_add_logs_virtual_forward_indexes.json` +- `20260430_add_blocks_consensus_proposer.json` diff --git a/docker/prod/docker-compose.yml b/docker/prod/docker-compose.yml index 53c9157f..b51c4bd0 100644 --- a/docker/prod/docker-compose.yml +++ b/docker/prod/docker-compose.yml @@ -88,7 +88,7 @@ services: tidx pgroll init --postgres-url "$${postgres_url}" || true tidx pgroll migrate /db/pgroll --complete --postgres-url "$${postgres_url}" status="$$(tidx pgroll status --postgres-url "$${postgres_url}")" - if ! echo "$${status}" | grep -q '"version": "20260417_add_logs_virtual_forward_indexes"'; then + if ! echo "$${status}" | grep -q '"version": "20260430_add_blocks_consensus_proposer"'; then echo "pgroll migration did not reach expected version for $${postgres_url}:" >&2 echo "$${status}" >&2 exit 1 diff --git a/src/sync/ch_sink.rs b/src/sync/ch_sink.rs index 45a638f2..32d7f0e2 100644 --- a/src/sync/ch_sink.rs +++ b/src/sync/ch_sink.rs @@ -20,6 +20,8 @@ const LOGS_MIGRATION_20260416: &str = include_str!("../../db/clickhouse/migrations/20260416_add_is_virtual_forward.sql"); const LOGS_MIGRATION_20260417: &str = include_str!("../../db/clickhouse/migrations/20260417_add_logs_virtual_forward_index.sql"); +const BLOCKS_MIGRATION_20260430: &str = + include_str!("../../db/clickhouse/migrations/20260430_add_blocks_consensus_proposer.sql"); const RECEIPTS_SCHEMA: &str = include_str!("../../db/clickhouse/receipts.sql"); /// Max rows per ClickHouse INSERT to avoid unbounded memory growth during backfills. @@ -114,6 +116,13 @@ impl ClickHouseSink { .await .map_err(|e| anyhow!("Failed to run ClickHouse logs migration 20260417: {e}"))?; + // TIP-1031: ed25519 consensus proposer pubkey on blocks. + self.client + .query(BLOCKS_MIGRATION_20260430) + .execute() + .await + .map_err(|e| anyhow!("Failed to run ClickHouse blocks migration 20260430: {e}"))?; + info!(database = %self.database, "ClickHouse schema ready"); Ok(()) } @@ -345,6 +354,7 @@ struct ChBlockWire { gas_used: i64, miner: String, extra_data: Option, + consensus_proposer: Option, } impl ChBlockWire { @@ -359,6 +369,7 @@ impl ChBlockWire { gas_used: b.gas_used, miner: hex_encode(&b.miner), extra_data: b.extra_data.as_ref().map(|v| hex_encode(v)), + consensus_proposer: b.consensus_proposer.as_ref().map(|v| hex_encode(v)), } } } @@ -553,6 +564,7 @@ mod tests { gas_used: 15_000_000, miner: vec![0xee; 20], extra_data: None, + consensus_proposer: None, }; let wire = ChBlockWire::from_row(&block); diff --git a/src/sync/decoder.rs b/src/sync/decoder.rs index 881d35b9..db4ec7ef 100644 --- a/src/sync/decoder.rs +++ b/src/sync/decoder.rs @@ -1,6 +1,7 @@ use alloy::consensus::transaction::Recovered; -use alloy::consensus::{Transaction as TransactionTrait, Typed2718}; +use alloy::consensus::{BlockHeader as _, Transaction as TransactionTrait, Typed2718}; use alloy::network::{ReceiptResponse, TransactionResponse}; +use alloy::primitives::B256; use chrono::{DateTime, TimeZone, Utc}; use tempo_alloy::primitives::transaction::SignatureType; @@ -14,25 +15,30 @@ pub fn timestamp_from_secs(secs: u64) -> DateTime { } pub fn decode_block(block: &Block) -> BlockRow { - let timestamp_secs = block.header.timestamp; + let header = &block.header; + let timestamp_secs = header.timestamp(); let timestamp = timestamp_from_secs(timestamp_secs); let timestamp_ms = (timestamp_secs * 1000) as i64; BlockRow { - num: block.header.number as i64, - hash: block.header.hash.as_slice().to_vec(), - parent_hash: block.header.parent_hash.as_slice().to_vec(), + num: header.number() as i64, + hash: header.hash.as_slice().to_vec(), + parent_hash: header.parent_hash().as_slice().to_vec(), timestamp, timestamp_ms, - gas_limit: block.header.gas_limit as i64, - gas_used: block.header.gas_used as i64, - miner: block.header.beneficiary.as_slice().to_vec(), - extra_data: Some(block.header.extra_data.to_vec()), + gas_limit: header.gas_limit() as i64, + gas_used: header.gas_used() as i64, + miner: header.beneficiary().as_slice().to_vec(), + extra_data: Some(header.extra_data().to_vec()), + consensus_proposer: header + .consensus_context + .as_ref() + .map(|consensus_context| B256::from(&consensus_context.proposer).0.to_vec()), } } pub fn decode_transaction(tx: &Transaction, block: &Block, idx: u32) -> TxRow { - let block_timestamp = timestamp_from_secs(block.header.timestamp); + let block_timestamp = timestamp_from_secs(block.header.timestamp()); let inner: &Recovered = &tx.inner; // Extract Tempo-specific fields if this is a 0x76 transaction @@ -57,7 +63,7 @@ pub fn decode_transaction(tx: &Transaction, block: &Block, idx: u32) -> TxRow { }; TxRow { - block_num: block.header.number as i64, + block_num: block.header.number() as i64, block_timestamp, idx: idx as i32, hash: tx.tx_hash().as_slice().to_vec(), diff --git a/src/sync/engine.rs b/src/sync/engine.rs index c24fd4c8..16f0ba37 100644 --- a/src/sync/engine.rs +++ b/src/sync/engine.rs @@ -1,3 +1,4 @@ +use alloy::consensus::BlockHeader as _; use alloy::network::ReceiptResponse; use anyhow::Result; use std::collections::HashMap; @@ -409,14 +410,14 @@ impl SyncEngine { for block in &blocks { broadcaster.send(BlockUpdate { chain_id: self.chain_id, - block_num: block.header.number, + block_num: block.header.number(), block_hash: format!("0x{}", hex::encode(block.header.hash)), tx_count: block.transactions.len() as u64, log_count: logs_per_block - .get(&(block.header.number as i64)) + .get(&(block.header.number() as i64)) .copied() .unwrap_or(0), - timestamp: block.header.timestamp as i64, + timestamp: block.header.timestamp() as i64, }); } } @@ -446,7 +447,7 @@ impl SyncEngine { } let first_block = &blocks[0]; - let first_num = first_block.header.number; + let first_num = first_block.header.number(); // Check parent hash against stored block (if not genesis) if first_num > 0 @@ -455,7 +456,7 @@ impl SyncEngine { let expected_parent: [u8; 32] = stored_hash .try_into() .map_err(|_| anyhow::anyhow!("Invalid stored hash length"))?; - if first_block.header.parent_hash.0 != expected_parent { + if first_block.header.parent_hash().0 != expected_parent { // Reorg detected - handle it automatically return self.handle_reorg(first_num).await; } @@ -463,11 +464,11 @@ impl SyncEngine { // Validate internal chain continuity for window in blocks.windows(2) { - if window[1].header.parent_hash != window[0].header.hash { + if window[1].header.parent_hash() != window[0].header.hash { return Err(anyhow::anyhow!( "Internal chain break at block {}: parent_hash {:?} != prev hash {:?}", - window[1].header.number, - hex::encode(window[1].header.parent_hash.0), + window[1].header.number(), + hex::encode(window[1].header.parent_hash().0), hex::encode(window[0].header.hash.0) )); } @@ -561,7 +562,7 @@ impl SyncEngine { let block_timestamps: HashMap = blocks .iter() - .map(|b| (b.header.number, timestamp_from_secs(b.header.timestamp))) + .map(|b| (b.header.number(), timestamp_from_secs(b.header.timestamp()))) .collect(); let block_rows: Vec<_> = blocks.iter().map(decode_block).collect(); @@ -636,7 +637,7 @@ impl SyncEngine { )?; let block_row = decode_block(&block); - let block_ts = timestamp_from_secs(block.header.timestamp); + let block_ts = timestamp_from_secs(block.header.timestamp()); let mut txs: Vec<_> = block .transactions .txns() @@ -1356,7 +1357,7 @@ async fn sync_range_standalone(sinks: &SinkSet, rpc: &RpcClient, from: u64, to: let block_timestamps: HashMap = blocks .iter() - .map(|b| (b.header.number, timestamp_from_secs(b.header.timestamp))) + .map(|b| (b.header.number(), timestamp_from_secs(b.header.timestamp()))) .collect(); let block_rows: Vec<_> = blocks.iter().map(decode_block).collect(); diff --git a/src/sync/sink.rs b/src/sync/sink.rs index d238cf24..024d74d3 100644 --- a/src/sync/sink.rs +++ b/src/sync/sink.rs @@ -318,7 +318,7 @@ async fn fetch_blocks( ) -> Result> { let rows = conn .query( - "SELECT num, hash, parent_hash, timestamp, timestamp_ms, gas_limit, gas_used, miner, extra_data \ + "SELECT num, hash, parent_hash, timestamp, timestamp_ms, gas_limit, gas_used, miner, extra_data, consensus_proposer \ FROM blocks WHERE num >= $1 AND num <= $2 ORDER BY num", &[&from, &to], ) @@ -336,6 +336,7 @@ async fn fetch_blocks( gas_used: r.get(6), miner: r.get(7), extra_data: r.get(8), + consensus_proposer: r.get(9), }) .collect()) } diff --git a/src/sync/writer.rs b/src/sync/writer.rs index 268ff3db..e1f115c7 100644 --- a/src/sync/writer.rs +++ b/src/sync/writer.rs @@ -46,7 +46,8 @@ pub async fn write_blocks(pool: &Pool, blocks: &[BlockRow]) -> Result<()> { tx.execute( "CREATE TEMP TABLE _staging_blocks ( num INT8, hash BYTEA, parent_hash BYTEA, timestamp TIMESTAMPTZ, - timestamp_ms INT8, gas_limit INT8, gas_used INT8, miner BYTEA, extra_data BYTEA + timestamp_ms INT8, gas_limit INT8, gas_used INT8, miner BYTEA, extra_data BYTEA, + consensus_proposer BYTEA ) ON COMMIT DROP", &[], ) @@ -62,11 +63,12 @@ pub async fn write_blocks(pool: &Pool, blocks: &[BlockRow]) -> Result<()> { Type::INT8, // gas_used Type::BYTEA, // miner Type::BYTEA, // extra_data + Type::BYTEA, // consensus_proposer ]; let sink = tx .copy_in( - "COPY _staging_blocks (num, hash, parent_hash, timestamp, timestamp_ms, gas_limit, gas_used, miner, extra_data) FROM STDIN BINARY", + "COPY _staging_blocks (num, hash, parent_hash, timestamp, timestamp_ms, gas_limit, gas_used, miner, extra_data, consensus_proposer) FROM STDIN BINARY", ) .await?; @@ -86,6 +88,7 @@ pub async fn write_blocks(pool: &Pool, blocks: &[BlockRow]) -> Result<()> { &block.gas_used, &block.miner, &block.extra_data as &(dyn tokio_postgres::types::ToSql + Sync), + &block.consensus_proposer as &(dyn tokio_postgres::types::ToSql + Sync), ]) .await?; } @@ -420,7 +423,8 @@ pub async fn write_batch( tx.execute( "CREATE TEMP TABLE _staging_blocks ( num INT8, hash BYTEA, parent_hash BYTEA, timestamp TIMESTAMPTZ, - timestamp_ms INT8, gas_limit INT8, gas_used INT8, miner BYTEA, extra_data BYTEA + timestamp_ms INT8, gas_limit INT8, gas_used INT8, miner BYTEA, extra_data BYTEA, + consensus_proposer BYTEA ) ON COMMIT DROP", &[], ) @@ -436,11 +440,12 @@ pub async fn write_batch( Type::INT8, // gas_used Type::BYTEA, // miner Type::BYTEA, // extra_data + Type::BYTEA, // consensus_proposer ]; let sink = tx .copy_in( - "COPY _staging_blocks (num, hash, parent_hash, timestamp, timestamp_ms, gas_limit, gas_used, miner, extra_data) FROM STDIN BINARY", + "COPY _staging_blocks (num, hash, parent_hash, timestamp, timestamp_ms, gas_limit, gas_used, miner, extra_data, consensus_proposer) FROM STDIN BINARY", ) .await?; @@ -460,6 +465,7 @@ pub async fn write_batch( &block.gas_used, &block.miner, &block.extra_data as &(dyn tokio_postgres::types::ToSql + Sync), + &block.consensus_proposer as &(dyn tokio_postgres::types::ToSql + Sync), ]) .await?; } diff --git a/src/tempo.rs b/src/tempo.rs index 70fb6b9e..aae844e4 100644 --- a/src/tempo.rs +++ b/src/tempo.rs @@ -1,8 +1,16 @@ -pub use tempo_alloy::TempoNetwork; +//! Tempo type aliases. +//! +//! JSON-RPC payloads decode into upstream `TempoHeaderResponse` / `TempoHeader` +//! so the TIP-1031 consensus context, millisecond timestamp, and Tempo-specific +//! gas limits are preserved. + +use alloy::network::Network; +use tempo_alloy::TempoNetwork; + pub use tempo_alloy::primitives::TempoTxEnvelope; pub use tempo_alloy::rpc::TempoTransactionReceipt; -pub type Block = alloy::rpc::types::Block; -pub type Transaction = alloy::rpc::types::Transaction; +pub type Block = ::BlockResponse; +pub type Transaction = ::TransactionResponse; pub type Log = alloy::rpc::types::Log; pub type Receipt = TempoTransactionReceipt; diff --git a/src/types.rs b/src/types.rs index 578b6c1f..b270c1ce 100644 --- a/src/types.rs +++ b/src/types.rs @@ -12,6 +12,8 @@ pub struct BlockRow { pub gas_used: i64, pub miner: Vec, pub extra_data: Option>, + /// T5+: Ed25519 public key of the consensus proposer for this block. Previously `None`. + pub consensus_proposer: Option>, } #[derive(Debug, Clone, Default)] diff --git a/tests/clickhouse_test.rs b/tests/clickhouse_test.rs index 725b246f..443a64e3 100644 --- a/tests/clickhouse_test.rs +++ b/tests/clickhouse_test.rs @@ -827,6 +827,7 @@ fn make_block(num: i64) -> BlockRow { gas_used: 21_000 * num, miner: vec![0xaa; 20], extra_data: Some(vec![0xbb, 0xcc]), + consensus_proposer: None, } } diff --git a/tests/common/seed.rs b/tests/common/seed.rs index b03c26c0..ce3f8c20 100644 --- a/tests/common/seed.rs +++ b/tests/common/seed.rs @@ -79,6 +79,7 @@ pub async fn seed(pool: &Pool, config: &SeedConfig) -> Result<(u64, i64, u64)> { gas_used: rng.random_range(10_000_000..25_000_000), miner: generate_address(block_num as u64), extra_data: None, + consensus_proposer: None, }); prev_block_hash = block_hash; diff --git a/tests/migration_test.rs b/tests/migration_test.rs index cfa36591..47b8f9ac 100644 --- a/tests/migration_test.rs +++ b/tests/migration_test.rs @@ -1,6 +1,6 @@ use futures::FutureExt; use std::panic::AssertUnwindSafe; -use tidx::db::{create_pool, run_migrations}; +use tidx::db::{create_pool, run_migrations, run_post_startup_migrations}; use tokio_postgres::NoTls; use url::Url; @@ -57,6 +57,9 @@ async fn test_pg_upgrade_adds_virtual_forward_column_before_indexes() { run_migrations(&pool) .await .expect("Failed to run migrations against old logs schema"); + run_post_startup_migrations(&pool) + .await + .expect("Failed to run post-startup migrations against old logs schema"); let conn = pool.get().await.expect("Failed to get post-migration connection"); diff --git a/tests/sync_optimizations_test.rs b/tests/sync_optimizations_test.rs index 8f24ebab..5f625dc4 100644 --- a/tests/sync_optimizations_test.rs +++ b/tests/sync_optimizations_test.rs @@ -24,6 +24,7 @@ fn generate_blocks(count: usize, offset: i64) -> Vec { gas_used: 15_000_000, miner: vec![0u8; 20], extra_data: Some(vec![0u8; 32]), + consensus_proposer: None, } }) .collect()