From 13e469a084daa5d02ea0ee1a72610f13ba9c77b4 Mon Sep 17 00:00:00 2001 From: jxom <7336481+jxom@users.noreply.github.com> Date: Wed, 1 Apr 2026 09:41:13 +1300 Subject: [PATCH] fix: validate RPC batch responses Fix 4 High severity audit findings in src/sync/fetcher.rs: 1. JSON-RPC Batch Reordering: Match responses by JSON-RPC id field instead of assuming positional order. 2. RPC Block-Number Spoofing: Verify block.header.number matches requested height after id-based matching. 3. Truncated Batch Response: Verify response count matches request count; error if blocks are missing. 4. Per-Item Receipt Batch Errors: Check each batch response item for error field and propagate failures instead of silently dropping. Amp-Thread-ID: https://ampcode.com/threads/T-019d458d-01b9-76ca-9fb1-b08263a4d59c Co-authored-by: Amp --- src/sync/fetcher.rs | 458 ++++++++++++++++++++++++++++++++++++++------ 1 file changed, 396 insertions(+), 62 deletions(-) diff --git a/src/sync/fetcher.rs b/src/sync/fetcher.rs index e5db0c2f..5dbf70e3 100644 --- a/src/sync/fetcher.rs +++ b/src/sync/fetcher.rs @@ -33,6 +33,7 @@ struct RpcRequest<'a> { #[derive(Deserialize)] struct RpcResponse { + id: Option, result: Option, error: Option, } @@ -106,6 +107,8 @@ impl RpcClient { .await .map_err(|_| anyhow!("RPC semaphore closed"))?; + let expected_count = (range.end() - range.start() + 1) as usize; + let batch: Vec<_> = range .clone() .enumerate() @@ -155,12 +158,61 @@ impl RpcClient { anyhow!("Failed to decode blocks response: {}", e) })?; - responses + // Finding 3: Detect truncated batch responses + if responses.len() != expected_count { + anyhow::bail!( + "Truncated block batch response: expected {} items, got {} (range {}..={})", + expected_count, + responses.len(), + range.start(), + range.end() + ); + } + + // Finding 1: Match responses by JSON-RPC id field (not positional order) + let mut blocks: Vec> = vec![None; expected_count]; + for resp in responses { + if let Some(err) = resp.error { + anyhow::bail!( + "RPC error in block batch item id={:?}: {}", + resp.id, + err.message + ); + } + let id = resp + .id + .ok_or_else(|| anyhow!("Block batch response item missing id field"))? + as usize; + if id >= expected_count { + anyhow::bail!( + "Block batch response id {id} out of range (expected 0..{expected_count})" + ); + } + let block = resp.result.ok_or_else(|| { + anyhow!("Block {} not found", range.start() + id as u64) + })?; + // Finding 2: Verify block number matches the requested height + let expected_num = range.start() + id as u64; + if block.header.number != expected_num { + anyhow::bail!( + "Block number mismatch: requested {} but RPC returned block {}", + expected_num, + block.header.number + ); + } + blocks[id] = Some(block); + } + + blocks .into_iter() .enumerate() - .map(|(i, r)| { - r.result - .ok_or_else(|| anyhow!("Block {} not found", range.start() + i as u64)) + .map(|(i, b)| { + b.ok_or_else(|| { + anyhow!( + "Missing response for block {} (duplicate id in batch?)", + range.start() + i as u64 + ) + }) }) .collect() } @@ -188,6 +240,8 @@ impl RpcClient { .await .map_err(|_| anyhow!("RPC semaphore closed"))?; + let expected_count = (range.end() - range.start() + 1) as usize; + let batch: Vec<_> = range .clone() .enumerate() @@ -239,9 +293,54 @@ impl RpcClient { anyhow!("Failed to decode receipts response: {}", e) })?; - responses + // Detect truncated batch responses + if responses.len() != expected_count { + anyhow::bail!( + "Truncated receipt batch response: expected {} items, got {} (range {}..={})", + expected_count, + responses.len(), + range.start(), + range.end() + ); + } + + // Match responses by JSON-RPC id field and check per-item errors + let mut results: Vec>> = vec![None; expected_count]; + for resp in responses { + // Finding 4: Check per-item errors instead of silently dropping them + if let Some(err) = resp.error { + anyhow::bail!( + "RPC error in receipt batch item id={:?} (block {}): {}", + resp.id, + resp.id + .map(|id| range.start() + id) + .map_or("unknown".to_string(), |n| n.to_string()), + err.message + ); + } + let id = resp + .id + .ok_or_else(|| anyhow!("Receipt batch response item missing id field"))? + as usize; + if id >= expected_count { + anyhow::bail!( + "Receipt batch response id {id} out of range (expected 0..{expected_count})" + ); + } + results[id] = Some(resp.result.unwrap_or_default()); + } + + results .into_iter() - .map(|r| Ok(r.result.unwrap_or_default())) + .enumerate() + .map(|(i, r)| { + r.ok_or_else(|| { + anyhow!( + "Missing receipt response for block {} (duplicate id in batch?)", + range.start() + i as u64 + ) + }) + }) .collect() } @@ -418,84 +517,319 @@ impl RpcClient { #[cfg(test)] mod tests { use super::*; - use axum::{Json, Router, extract::State, response::IntoResponse, routing::post}; + use axum::{Json, Router, routing::post}; use serde_json::{Value, json}; use std::sync::Arc; use tokio::sync::Mutex; - #[derive(Clone)] - struct TestState { - request_sizes: Arc>>, + // ---- helpers ---- + + /// Minimal valid block JSON for a given number + fn fake_block_json(number: u64) -> Value { + json!({ + "hash": format!("0x{:064x}", number), + "number": format!("0x{:x}", number), + "parentHash": format!("0x{:064x}", number.wrapping_sub(1)), + "timestamp": "0x0", + "gasLimit": "0x0", + "gasUsed": "0x0", + "miner": "0x0000000000000000000000000000000000000000", + "extraData": "0x", + "baseFeePerGas": "0x0", + "difficulty": "0x0", + "mixHash": "0x0000000000000000000000000000000000000000000000000000000000000000", + "nonce": "0x0000000000000000", + "sha3Uncles": "0x0000000000000000000000000000000000000000000000000000000000000000", + "logsBloom": "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", + "receiptsRoot": "0x0000000000000000000000000000000000000000000000000000000000000000", + "stateRoot": "0x0000000000000000000000000000000000000000000000000000000000000000", + "transactionsRoot": "0x0000000000000000000000000000000000000000000000000000000000000000", + "size": "0x0", + "totalDifficulty": "0x0", + "uncles": [], + "transactions": [] + }) + } + + /// Spin up a test RPC server, returns (url, abort handle) + macro_rules! test_server { + ($app:expr) => {{ + let listener = tokio::net::TcpListener::bind("127.0.0.1:0") + .await + .expect("Failed to bind test RPC server"); + let addr = listener.local_addr().unwrap(); + let url = format!("http://127.0.0.1:{}", addr.port()); + let handle = tokio::spawn(async move { + axum::serve(listener, $app).await.expect("RPC server failed"); + }); + (url, handle) + }}; } - async fn rpc_handler( - State(state): State, - Json(body): Json, - ) -> impl IntoResponse { - let requests = body.as_array().expect("expected batch request"); - let batch_size = requests.len(); - state.request_sizes.lock().await.push(batch_size); - - if batch_size > 2 { - Json(json!({ - "jsonrpc": "2.0", - "id": 0, - "error": { - "code": -32000, - "message": "response size exceeded" + // ---- adaptive receipt split test ---- + + #[tokio::test] + async fn test_get_receipts_batch_adaptive_splits_and_preserves_order() { + let request_sizes: Arc>> = Arc::new(Mutex::new(Vec::new())); + let sizes_clone = request_sizes.clone(); + + let app = Router::new().route( + "/", + post(move |Json(body): Json| { + let sizes = sizes_clone.clone(); + async move { + let requests = body.as_array().expect("expected batch request"); + let batch_size = requests.len(); + sizes.lock().await.push(batch_size); + + if batch_size > 2 { + Json(json!({ + "jsonrpc": "2.0", + "id": 0, + "error": { "code": -32000, "message": "response size exceeded" } + })) + } else { + let responses: Vec = requests + .iter() + .map(|req| json!({ "jsonrpc": "2.0", "id": req["id"], "result": [] })) + .collect(); + Json(Value::Array(responses)) + } } - })) - } else { - let responses: Vec = requests - .iter() - .map(|req| { - json!({ - "jsonrpc": "2.0", - "id": req["id"].as_u64().unwrap(), - "result": [] + }), + ); + + let (url, server) = test_server!(app); + let client = RpcClient::new(&url); + let receipts = client + .get_receipts_batch_adaptive(1..=5) + .await + .expect("adaptive receipt fetch should succeed"); + + assert_eq!(receipts.len(), 5); + assert!(receipts.iter().all(Vec::is_empty)); + + let sizes = request_sizes.lock().await.clone(); + assert_eq!(sizes, vec![5, 3, 2, 1, 2]); + server.abort(); + } + + // ---- Finding 1: Out-of-order responses matched by id ---- + + #[tokio::test] + async fn test_blocks_batch_reordered_responses_matched_by_id() { + let app = Router::new().route( + "/", + post(|Json(body): Json| async move { + let requests = body.as_array().expect("expected batch"); + let mut responses: Vec = requests + .iter() + .map(|req| { + let id = req["id"].as_u64().unwrap(); + let block_hex = req["params"][0].as_str().unwrap(); + let num = u64::from_str_radix(block_hex.trim_start_matches("0x"), 16) + .unwrap(); + json!({ "jsonrpc": "2.0", "id": id, "result": fake_block_json(num) }) }) - }) - .collect(); - Json(Value::Array(responses)) + .collect(); + responses.reverse(); // respond in reverse order + Json(Value::Array(responses)) + }), + ); + + let (url, server) = test_server!(app); + let client = RpcClient::new(&url); + + let blocks = client + .get_blocks_batch(10..=12) + .await + .expect("should handle reordered responses"); + + assert_eq!(blocks.len(), 3); + assert_eq!(blocks[0].header.number, 10); + assert_eq!(blocks[1].header.number, 11); + assert_eq!(blocks[2].header.number, 12); + server.abort(); + } + + // ---- Happy path: in-order block responses ---- + + #[tokio::test] + async fn test_blocks_batch_happy_path_in_order() { + let app = Router::new().route( + "/", + post(|Json(body): Json| async move { + let requests = body.as_array().expect("expected batch"); + let responses: Vec = requests + .iter() + .map(|req| { + let id = req["id"].as_u64().unwrap(); + let block_hex = req["params"][0].as_str().unwrap(); + let num = u64::from_str_radix(block_hex.trim_start_matches("0x"), 16) + .unwrap(); + json!({ "jsonrpc": "2.0", "id": id, "result": fake_block_json(num) }) + }) + .collect(); + Json(Value::Array(responses)) + }), + ); + + let (url, server) = test_server!(app); + let client = RpcClient::new(&url); + + let blocks = client + .get_blocks_batch(5..=9) + .await + .expect("happy path should succeed"); + + assert_eq!(blocks.len(), 5); + for (i, block) in blocks.iter().enumerate() { + assert_eq!(block.header.number, 5 + i as u64); } + server.abort(); } + // ---- Finding 2: Block number mismatch detection ---- + #[tokio::test] - async fn test_get_receipts_batch_adaptive_splits_and_preserves_order() { - let request_sizes = Arc::new(Mutex::new(Vec::new())); - let state = TestState { - request_sizes: request_sizes.clone(), - }; + async fn test_blocks_batch_detects_number_mismatch() { + let app = Router::new().route( + "/", + post(|Json(body): Json| async move { + let requests = body.as_array().expect("expected batch"); + let responses: Vec = requests + .iter() + .map(|req| { + let id = req["id"].as_u64().unwrap(); + // Return wrong block number (always block 999) + json!({ "jsonrpc": "2.0", "id": id, "result": fake_block_json(999) }) + }) + .collect(); + Json(Value::Array(responses)) + }), + ); - let app = Router::new() - .route("/", post(rpc_handler)) - .with_state(state); + let (url, server) = test_server!(app); + let client = RpcClient::new(&url); - let listener = tokio::net::TcpListener::bind("127.0.0.1:0") + let err = client + .get_blocks_batch(10..=12) .await - .expect("Failed to bind test RPC server"); - let addr = listener.local_addr().unwrap(); + .expect_err("should detect block number mismatch"); - let server = tokio::spawn(async move { - axum::serve(listener, app).await.expect("RPC server failed"); - }); + assert!( + err.to_string().contains("Block number mismatch"), + "expected mismatch error, got: {err}" + ); + server.abort(); + } - let client = RpcClient::new(&format!("http://127.0.0.1:{}", addr.port())); - let receipts = client - .get_receipts_batch_adaptive(1..=5) + // ---- Finding 3: Truncated response detection ---- + + #[tokio::test] + async fn test_blocks_batch_detects_truncated_response() { + let app = Router::new().route( + "/", + post(|Json(body): Json| async move { + let requests = body.as_array().expect("expected batch"); + // Return only the first response + let req = &requests[0]; + let id = req["id"].as_u64().unwrap(); + let block_hex = req["params"][0].as_str().unwrap(); + let num = + u64::from_str_radix(block_hex.trim_start_matches("0x"), 16).unwrap(); + Json(json!([{ + "jsonrpc": "2.0", + "id": id, + "result": fake_block_json(num) + }])) + }), + ); + + let (url, server) = test_server!(app); + let client = RpcClient::new(&url); + + let err = client + .get_blocks_batch(10..=14) .await - .expect("adaptive receipt fetch should succeed"); + .expect_err("should detect truncated response"); - assert_eq!( - receipts.len(), - 5, - "should return one receipt vector per block" + assert!( + err.to_string().contains("Truncated block batch response"), + "expected truncation error, got: {err}" ); - assert!(receipts.iter().all(Vec::is_empty)); + server.abort(); + } - let sizes = request_sizes.lock().await.clone(); - assert_eq!(sizes, vec![5, 3, 2, 1, 2]); + // ---- Finding 4: Per-item receipt error detection ---- + #[tokio::test] + async fn test_receipts_batch_detects_per_item_error() { + let app = Router::new().route( + "/", + post(|Json(body): Json| async move { + let requests = body.as_array().expect("expected batch"); + let responses: Vec = requests + .iter() + .map(|req| { + let id = req["id"].as_u64().unwrap(); + if id == 1 { + json!({ + "jsonrpc": "2.0", + "id": id, + "error": { "code": -32000, "message": "block not available" } + }) + } else { + json!({ "jsonrpc": "2.0", "id": id, "result": [] }) + } + }) + .collect(); + Json(Value::Array(responses)) + }), + ); + + let (url, server) = test_server!(app); + let client = RpcClient::new(&url); + + let err = client + .get_receipts_batch(100..=102) + .await + .expect_err("should detect per-item receipt error"); + + assert!( + err.to_string().contains("block not available"), + "expected per-item error propagated, got: {err}" + ); + server.abort(); + } + + // ---- Receipt happy path with reordering ---- + + #[tokio::test] + async fn test_receipts_batch_reordered_responses() { + let app = Router::new().route( + "/", + post(|Json(body): Json| async move { + let requests = body.as_array().expect("expected batch"); + let mut responses: Vec = requests + .iter() + .map(|req| json!({ "jsonrpc": "2.0", "id": req["id"], "result": [] })) + .collect(); + responses.reverse(); + Json(Value::Array(responses)) + }), + ); + + let (url, server) = test_server!(app); + let client = RpcClient::new(&url); + + let receipts = client + .get_receipts_batch(1..=4) + .await + .expect("should handle reordered receipt responses"); + + assert_eq!(receipts.len(), 4); + assert!(receipts.iter().all(Vec::is_empty)); server.abort(); } }