Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changelog/happy-pigs-write.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
tidx: patch
---

Added a nullable `consensus_proposer` column to the `blocks` table for `TIP-1031`.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ postgres-types = { version = "0.2", features = ["derive"] }
# Ethereum/Tempo primitives
alloy = { version = "2", features = ["full"] }
tempo-alloy = { git = "https://github.com/tempoxyz/tempo", default-features = false }
tempo-primitives = { git = "https://github.com/tempoxyz/tempo", package = "tempo-primitives", default-features = false }
tempo-primitives = { git = "https://github.com/tempoxyz/tempo", package = "tempo-primitives", default-features = false, features = ["serde"] }

# URL parsing
form_urlencoded = "1"
Expand Down Expand Up @@ -65,7 +65,7 @@ thiserror = "2"
anyhow = "1"

# Time
chrono = { version = "0.4", features = ["serde"] }
chrono = { version = "0.4", features = ["std", "serde"] }

# ClickHouse
clickhouse = { version = "0.14", features = ["lz4", "chrono", "rustls-tls"] }
Expand Down Expand Up @@ -160,4 +160,3 @@ needless_range_loop = "allow"
format_push_string = "allow"
unnecessary_cast = "allow"
cast_lossless = "allow"

1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ All tables use composite primary keys with timestamps for efficient range querie
| `gas_used` | `INT8` | Gas used |
| `miner` | `BYTEA` | Block producer |
| `extra_data` | `BYTEA` | Extra data field |
| `consensus_proposer` | `BYTEA` | Ed25519 consensus proposer pubkey (TIP-1031, NULL pre-fork) |

### txs

Expand Down
1 change: 1 addition & 0 deletions benches/sync_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ fn generate_blocks(count: usize, offset: usize) -> Vec<BlockRow> {
gas_used: 15_000_000,
miner: vec![0u8; 20],
extra_data: Some(vec![0u8; 32]),
consensus_proposer: None,
}
})
.collect()
Expand Down
1 change: 1 addition & 0 deletions benches/write_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ fn generate_blocks(count: usize) -> Vec<BlockRow> {
gas_used: 15_000_000,
miner: vec![0u8; 20],
extra_data: Some(vec![0u8; 32]),
consensus_proposer: None,
})
.collect()
}
Expand Down
2 changes: 1 addition & 1 deletion compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ services:
sleep 1
done
status="$$(tidx pgroll status --postgres-url "$${POSTGRES_URL}" 2>/dev/null || true)"
if echo "$${status}" | grep -q '"version": "20260417_add_logs_virtual_forward_indexes"'; then
if echo "$${status}" | grep -q '"version": "20260430_add_blocks_consensus_proposer"'; then
echo 'pgroll migrations already complete'
exit 0
fi
Expand Down
1 change: 1 addition & 0 deletions db/blocks.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ CREATE TABLE IF NOT EXISTS blocks (
gas_used INT8 NOT NULL,
miner BYTEA NOT NULL,
extra_data BYTEA,
consensus_proposer BYTEA CHECK (consensus_proposer IS NULL OR octet_length(consensus_proposer) = 32),
PRIMARY KEY (timestamp, num)
);

Expand Down
3 changes: 2 additions & 1 deletion db/clickhouse/blocks.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ CREATE TABLE IF NOT EXISTS blocks (
gas_limit Int64,
gas_used Int64,
miner String,
extra_data Nullable(String)
extra_data Nullable(String),
consensus_proposer Nullable(String)
) ENGINE = ReplacingMergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (num)
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- TIP-1031: record each block's ed25519 consensus proposer public key.
--
-- ClickHouse deployments that predate this change already have a `blocks`
-- table, so the `CREATE TABLE IF NOT EXISTS` statement in
-- db/clickhouse/blocks.sql cannot introduce the new column there. Apply this
-- migration once when upgrading, or allow tidx to run it automatically on
-- startup via `ClickHouseSink::ensure_schema()`. The operation is idempotent.
ALTER TABLE blocks ADD COLUMN IF NOT EXISTS consensus_proposer Nullable(String);
18 changes: 18 additions & 0 deletions db/pgroll/20260430_add_blocks_consensus_proposer.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"operations": [
{
"add_column": {
"table": "blocks",
"column": {
"name": "consensus_proposer",
"type": "bytea",
"nullable": true,
"check": {
"name": "blocks_consensus_proposer_len",
"constraint": "consensus_proposer IS NULL OR octet_length(consensus_proposer) = 32"
}
}
}
}
]
}
3 changes: 2 additions & 1 deletion db/pgroll/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,12 @@ Production Docker commands from `docker/prod`:
- `PGROLL_POSTGRES_URLS="postgres://.../tidx_moderato?sslmode=disable postgres://.../tidx_mainnet?sslmode=disable" docker compose --profile migrations run --rm pgroll-bootstrap`
- Normal migration flow before starting/updating `tidx`:
- `PGROLL_POSTGRES_URLS="postgres://.../tidx_moderato?sslmode=disable postgres://.../tidx_mainnet?sslmode=disable" docker compose --profile migrations run --rm pgroll-migrate`
- `pgroll-migrate` verifies the final pgroll version and exits non-zero if migrations do not reach `20260417_add_logs_virtual_forward_indexes` (for example, if bootstrap was skipped on a non-empty database).
- `pgroll-migrate` verifies the final pgroll version and exits non-zero if migrations do not reach `20260430_add_blocks_consensus_proposer` (for example, if bootstrap was skipped on a non-empty database).

The production image published by GitHub Actions includes both `tidx pgroll` and the `/db/pgroll` migration files.

Post-baseline migrations in this directory:

- `20260416_add_is_virtual_forward_column.json`
- `20260417_add_logs_virtual_forward_indexes.json`
- `20260430_add_blocks_consensus_proposer.json`
2 changes: 1 addition & 1 deletion docker/prod/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ services:
tidx pgroll init --postgres-url "$${postgres_url}" || true
tidx pgroll migrate /db/pgroll --complete --postgres-url "$${postgres_url}"
status="$$(tidx pgroll status --postgres-url "$${postgres_url}")"
if ! echo "$${status}" | grep -q '"version": "20260417_add_logs_virtual_forward_indexes"'; then
if ! echo "$${status}" | grep -q '"version": "20260430_add_blocks_consensus_proposer"'; then
echo "pgroll migration did not reach expected version for $${postgres_url}:" >&2
echo "$${status}" >&2
exit 1
Expand Down
12 changes: 12 additions & 0 deletions src/sync/ch_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ const LOGS_MIGRATION_20260416: &str =
include_str!("../../db/clickhouse/migrations/20260416_add_is_virtual_forward.sql");
const LOGS_MIGRATION_20260417: &str =
include_str!("../../db/clickhouse/migrations/20260417_add_logs_virtual_forward_index.sql");
const BLOCKS_MIGRATION_20260430: &str =
include_str!("../../db/clickhouse/migrations/20260430_add_blocks_consensus_proposer.sql");
const RECEIPTS_SCHEMA: &str = include_str!("../../db/clickhouse/receipts.sql");

/// Max rows per ClickHouse INSERT to avoid unbounded memory growth during backfills.
Expand Down Expand Up @@ -114,6 +116,13 @@ impl ClickHouseSink {
.await
.map_err(|e| anyhow!("Failed to run ClickHouse logs migration 20260417: {e}"))?;

// TIP-1031: ed25519 consensus proposer pubkey on blocks.
self.client
.query(BLOCKS_MIGRATION_20260430)
.execute()
.await
.map_err(|e| anyhow!("Failed to run ClickHouse blocks migration 20260430: {e}"))?;

info!(database = %self.database, "ClickHouse schema ready");
Ok(())
}
Expand Down Expand Up @@ -345,6 +354,7 @@ struct ChBlockWire {
gas_used: i64,
miner: String,
extra_data: Option<String>,
consensus_proposer: Option<String>,
}

impl ChBlockWire {
Expand All @@ -359,6 +369,7 @@ impl ChBlockWire {
gas_used: b.gas_used,
miner: hex_encode(&b.miner),
extra_data: b.extra_data.as_ref().map(|v| hex_encode(v)),
consensus_proposer: b.consensus_proposer.as_ref().map(|v| hex_encode(v)),
}
}
}
Expand Down Expand Up @@ -553,6 +564,7 @@ mod tests {
gas_used: 15_000_000,
miner: vec![0xee; 20],
extra_data: None,
consensus_proposer: None,
};

let wire = ChBlockWire::from_row(&block);
Expand Down
28 changes: 17 additions & 11 deletions src/sync/decoder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use alloy::consensus::transaction::Recovered;
use alloy::consensus::{Transaction as TransactionTrait, Typed2718};
use alloy::consensus::{BlockHeader as _, Transaction as TransactionTrait, Typed2718};
use alloy::network::{ReceiptResponse, TransactionResponse};
use alloy::primitives::B256;
use chrono::{DateTime, TimeZone, Utc};
use tempo_alloy::primitives::transaction::SignatureType;

Expand All @@ -14,25 +15,30 @@ pub fn timestamp_from_secs(secs: u64) -> DateTime<Utc> {
}

pub fn decode_block(block: &Block) -> BlockRow {
let timestamp_secs = block.header.timestamp;
let header = &block.header;
let timestamp_secs = header.timestamp();
let timestamp = timestamp_from_secs(timestamp_secs);
let timestamp_ms = (timestamp_secs * 1000) as i64;

BlockRow {
num: block.header.number as i64,
hash: block.header.hash.as_slice().to_vec(),
parent_hash: block.header.parent_hash.as_slice().to_vec(),
num: header.number() as i64,
hash: header.hash.as_slice().to_vec(),
parent_hash: header.parent_hash().as_slice().to_vec(),
timestamp,
timestamp_ms,
gas_limit: block.header.gas_limit as i64,
gas_used: block.header.gas_used as i64,
miner: block.header.beneficiary.as_slice().to_vec(),
extra_data: Some(block.header.extra_data.to_vec()),
gas_limit: header.gas_limit() as i64,
gas_used: header.gas_used() as i64,
miner: header.beneficiary().as_slice().to_vec(),
extra_data: Some(header.extra_data().to_vec()),
consensus_proposer: header
.consensus_context
.as_ref()
.map(|consensus_context| B256::from(&consensus_context.proposer).0.to_vec()),
}
}

pub fn decode_transaction(tx: &Transaction, block: &Block, idx: u32) -> TxRow {
let block_timestamp = timestamp_from_secs(block.header.timestamp);
let block_timestamp = timestamp_from_secs(block.header.timestamp());
let inner: &Recovered<TempoTxEnvelope> = &tx.inner;

// Extract Tempo-specific fields if this is a 0x76 transaction
Expand All @@ -57,7 +63,7 @@ pub fn decode_transaction(tx: &Transaction, block: &Block, idx: u32) -> TxRow {
};

TxRow {
block_num: block.header.number as i64,
block_num: block.header.number() as i64,
block_timestamp,
idx: idx as i32,
hash: tx.tx_hash().as_slice().to_vec(),
Expand Down
23 changes: 12 additions & 11 deletions src/sync/engine.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use alloy::consensus::BlockHeader as _;
use alloy::network::ReceiptResponse;
use anyhow::Result;
use std::collections::HashMap;
Expand Down Expand Up @@ -409,14 +410,14 @@ impl SyncEngine {
for block in &blocks {
broadcaster.send(BlockUpdate {
chain_id: self.chain_id,
block_num: block.header.number,
block_num: block.header.number(),
block_hash: format!("0x{}", hex::encode(block.header.hash)),
tx_count: block.transactions.len() as u64,
log_count: logs_per_block
.get(&(block.header.number as i64))
.get(&(block.header.number() as i64))
.copied()
.unwrap_or(0),
timestamp: block.header.timestamp as i64,
timestamp: block.header.timestamp() as i64,
});
}
}
Expand Down Expand Up @@ -446,7 +447,7 @@ impl SyncEngine {
}

let first_block = &blocks[0];
let first_num = first_block.header.number;
let first_num = first_block.header.number();

// Check parent hash against stored block (if not genesis)
if first_num > 0
Expand All @@ -455,19 +456,19 @@ impl SyncEngine {
let expected_parent: [u8; 32] = stored_hash
.try_into()
.map_err(|_| anyhow::anyhow!("Invalid stored hash length"))?;
if first_block.header.parent_hash.0 != expected_parent {
if first_block.header.parent_hash().0 != expected_parent {
// Reorg detected - handle it automatically
return self.handle_reorg(first_num).await;
}
}

// Validate internal chain continuity
for window in blocks.windows(2) {
if window[1].header.parent_hash != window[0].header.hash {
if window[1].header.parent_hash() != window[0].header.hash {
return Err(anyhow::anyhow!(
"Internal chain break at block {}: parent_hash {:?} != prev hash {:?}",
window[1].header.number,
hex::encode(window[1].header.parent_hash.0),
window[1].header.number(),
hex::encode(window[1].header.parent_hash().0),
hex::encode(window[0].header.hash.0)
));
}
Expand Down Expand Up @@ -561,7 +562,7 @@ impl SyncEngine {

let block_timestamps: HashMap<u64, _> = blocks
.iter()
.map(|b| (b.header.number, timestamp_from_secs(b.header.timestamp)))
.map(|b| (b.header.number(), timestamp_from_secs(b.header.timestamp())))
.collect();

let block_rows: Vec<_> = blocks.iter().map(decode_block).collect();
Expand Down Expand Up @@ -636,7 +637,7 @@ impl SyncEngine {
)?;

let block_row = decode_block(&block);
let block_ts = timestamp_from_secs(block.header.timestamp);
let block_ts = timestamp_from_secs(block.header.timestamp());
let mut txs: Vec<_> = block
.transactions
.txns()
Expand Down Expand Up @@ -1356,7 +1357,7 @@ async fn sync_range_standalone(sinks: &SinkSet, rpc: &RpcClient, from: u64, to:

let block_timestamps: HashMap<u64, _> = blocks
.iter()
.map(|b| (b.header.number, timestamp_from_secs(b.header.timestamp)))
.map(|b| (b.header.number(), timestamp_from_secs(b.header.timestamp())))
.collect();

let block_rows: Vec<_> = blocks.iter().map(decode_block).collect();
Expand Down
3 changes: 2 additions & 1 deletion src/sync/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ async fn fetch_blocks(
) -> Result<Vec<BlockRow>> {
let rows = conn
.query(
"SELECT num, hash, parent_hash, timestamp, timestamp_ms, gas_limit, gas_used, miner, extra_data \
"SELECT num, hash, parent_hash, timestamp, timestamp_ms, gas_limit, gas_used, miner, extra_data, consensus_proposer \
FROM blocks WHERE num >= $1 AND num <= $2 ORDER BY num",
&[&from, &to],
)
Expand All @@ -336,6 +336,7 @@ async fn fetch_blocks(
gas_used: r.get(6),
miner: r.get(7),
extra_data: r.get(8),
consensus_proposer: r.get(9),
})
.collect())
}
Expand Down
Loading
Loading