From eafa2af760d2b52057cc93012c78941f4d98297b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ksawery=20Wr=C3=B3bel?= Date: Wed, 4 Mar 2026 14:59:45 +0000 Subject: [PATCH 1/3] Add signature replay, fix node startup, fix log statements --- Cargo.lock | 13 - Cargo.toml | 1 - src/backend/mod.rs | 1 - src/backend/vault_signer_plugin.rs | 5 +- src/cluster/integration_tests.rs | 266 +++++++++++-------- src/cluster/mod.rs | 161 +++++++++--- src/cluster/storage.rs | 32 +++ src/cluster/tests.rs | 37 ++- src/config.rs | 36 +-- src/connection.rs | 11 + src/error.rs | 9 - src/main.rs | 402 +++++++++++++++++++++++------ src/persist.rs | 79 ------ src/protocol/mod.rs | 86 +++--- src/signer/mock_connection.rs | 4 +- src/signer/mod.rs | 63 ++--- src/signer/tests.rs | 177 +++++++++++-- src/types/mod.rs | 43 +-- src/versions/mod.rs | 8 + src/versions/v0_34.rs | 23 +- src/versions/v0_37.rs | 23 +- src/versions/v0_38.rs | 34 ++- src/versions/v1_0.rs | 53 +++- 23 files changed, 1058 insertions(+), 509 deletions(-) delete mode 100644 src/persist.rs diff --git a/Cargo.lock b/Cargo.lock index c385280..407ef78 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -755,18 +755,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "enum_dispatch" -version = "0.3.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa18ce2bc66555b3218614519ac839ddb759a7d6720732f979ef8d13be147ecd" -dependencies = [ - "once_cell", - "proc-macro2", - "quote", - "syn 2.0.101", -] - [[package]] name = "env_filter" version = "0.1.3" @@ -1690,7 +1678,6 @@ dependencies = [ "clap", "ed25519-consensus", "ed25519-dalek", - "enum_dispatch", "env_logger", "hex", "k256", diff --git a/Cargo.toml b/Cargo.toml index 585fae6..92025a9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,7 +38,6 @@ raft-proto = { version = "0.7.0", features = ["prost", "protobuf-codec"] } slog-stdlog = "4.1.1" protobuf = "2.28.0" byteorder = "1.5.0" -enum_dispatch = "0.3.13" clap = { version = "4.5.45", features = ["derive"] } sha2 = "0.10.9" [build-dependencies] diff --git a/src/backend/mod.rs b/src/backend/mod.rs index 011d906..419b051 100644 --- a/src/backend/mod.rs +++ b/src/backend/mod.rs @@ -202,7 +202,6 @@ mod tests { use crate::backend::Bls12381Signer; use crate::backend::SigningBackend; use base64::{Engine as _, engine::general_purpose}; - use hex; #[test] fn bls12381_sign_known_data() { diff --git a/src/backend/vault_signer_plugin.rs b/src/backend/vault_signer_plugin.rs index e898cc2..323faea 100644 --- a/src/backend/vault_signer_plugin.rs +++ b/src/backend/vault_signer_plugin.rs @@ -1,6 +1,5 @@ use crate::error::SignerError; -use hex; -use log::{debug, info, trace}; +use log::{info, trace}; use reqwest::blocking::{Client, ClientBuilder}; use reqwest::header::{HeaderMap, HeaderValue}; use serde_json::Value; @@ -167,7 +166,7 @@ impl PluginVaultSigner { let sig_bytes = base64::Engine::decode(&base64::engine::general_purpose::STANDARD, parts[2])?; - debug!("signed bytes from vault: {:?}", sig_bytes); + trace!("signed bytes from vault: {:?}", sig_bytes); let expected_sig_len = match self.pub_key.key_type { crate::types::KeyType::Ed25519 => 64, diff --git a/src/cluster/integration_tests.rs b/src/cluster/integration_tests.rs index 5eb0936..319dc29 100644 --- a/src/cluster/integration_tests.rs +++ b/src/cluster/integration_tests.rs @@ -1,8 +1,7 @@ use crate::backend::{Ed25519Signer, SigningBackend}; -use crate::cluster::SignerRaftNode; +use crate::cluster::{RaftEvent, SignerRaftNode}; use crate::config::{PeerConfig, RaftConfig}; use crate::handle_single_request; -use crate::persist::{Persist, PersistVariants}; use crate::proto::v0_38; use crate::signer::Signer; use crate::signer::mock_connection::{MockCometBFTConnection, MockConnectionHandle}; @@ -11,6 +10,7 @@ use crate::versions::VersionV0_38; use log::info; use prost::Message; use rand::Rng; +use std::collections::HashSet; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::mpsc; use std::sync::{Arc, Mutex}; @@ -21,6 +21,7 @@ use tempfile::TempDir; struct TestHarness { _temp_dir: TempDir, nodes: Vec, + events_rx: mpsc::Receiver, } impl TestHarness { @@ -28,6 +29,7 @@ impl TestHarness { setup(); let temp_dir = TempDir::new().unwrap(); let port_prefix = rand::rng().random_range(30000..60000); + let (events_tx, events_rx) = mpsc::channel(); let peers: Vec = (1..=num_nodes) .map(|i| PeerConfig { @@ -37,12 +39,21 @@ impl TestHarness { .collect(); let nodes: Vec = (1..=num_nodes) - .map(|i| create_test_node(port_prefix as u64, i as u64, peers.clone(), &temp_dir)) + .map(|i| { + create_test_node( + port_prefix as u64, + i as u64, + peers.clone(), + &temp_dir, + events_tx.clone(), + ) + }) .collect(); Self { _temp_dir: temp_dir, nodes, + events_rx, } } } @@ -80,6 +91,7 @@ fn create_test_node( node_id: u64, peers: Vec, temp_dir: &TempDir, + events_tx: mpsc::Sender, ) -> SignerRaftNode { let config = RaftConfig { node_id, @@ -93,7 +105,7 @@ fn create_test_node( peers, initial_state_path: "./non_existent_initial_state.json".to_string(), }; - SignerRaftNode::new(config) + SignerRaftNode::new(config, events_tx) } fn wait_for_leader(nodes: &[SignerRaftNode], timeout: Duration) -> Option<&SignerRaftNode> { @@ -101,7 +113,7 @@ fn wait_for_leader(nodes: &[SignerRaftNode], timeout: Duration) -> Option<&Signe while start.elapsed() < timeout { for node in nodes { if node.is_leader() { - return Some(&node); + return Some(node); } } thread::sleep(Duration::from_millis(30)); @@ -109,6 +121,37 @@ fn wait_for_leader(nodes: &[SignerRaftNode], timeout: Duration) -> Option<&Signe None } +fn wait_for_state_applied_events( + events_rx: &mpsc::Receiver, + expected_node_ids: &[u64], + height: i64, + round: i64, + step: SignedMsgType, + timeout: Duration, +) -> bool { + let mut pending: HashSet = expected_node_ids.iter().copied().collect(); + let deadline = Instant::now() + timeout; + + while !pending.is_empty() { + let now = Instant::now(); + if now >= deadline { + return false; + } + + match events_rx.recv_timeout(deadline.saturating_duration_since(now)) { + Ok(RaftEvent::StateApplied { node_id, data }) + if data.height == height && data.round == round && data.step == step => + { + pending.remove(&node_id); + } + Ok(_) => {} + Err(mpsc::RecvTimeoutError::Timeout) => return false, + Err(mpsc::RecvTimeoutError::Disconnected) => return false, + } + } + true +} + fn create_proposal_request_bytes(height: i64, round: i64) -> Vec { let proposal_req = v0_38::privval::SignProposalRequest { proposal: Some(v0_38::types::Proposal { @@ -147,22 +190,20 @@ fn create_vote_request_bytes(height: i64, round: i64, vote_type: SignedMsgType) req_bytes } -fn unwrap_node(node: Arc>) -> SignerRaftNode { - let l = Arc::try_unwrap(node) +fn unwrap_node(node: Arc>) -> SignerRaftNode { + Arc::try_unwrap(node) .unwrap_or_else(|_| panic!("single ref")) .into_inner() - .unwrap(); - - match l { - PersistVariants::Local(_) => panic!("is raft"), - PersistVariants::Raft(r) => r, - } + .unwrap() } #[test] fn happy_path_signing_on_stable_cluster() { - let harness = TestHarness::new(3); - let nodes = harness.nodes; + let TestHarness { + _temp_dir, + nodes, + events_rx, + } = TestHarness::new(3); let (leader_node, followers) = wait_for_leader_and_pop(nodes); println!( @@ -175,7 +216,7 @@ fn happy_path_signing_on_stable_cluster() { let req_bytes = create_proposal_request_bytes(100, 0); handle.request_sender.send(req_bytes).unwrap(); - let leader = Arc::new(Mutex::new(PersistVariants::Raft(leader_node))); + let leader = Arc::new(Mutex::new(leader_node)); handle_single_request(&mut signer, &leader).expect("Failed to handle request"); let response_bytes = handle.response_receiver.recv().unwrap(); @@ -190,13 +231,26 @@ fn happy_path_signing_on_stable_cluster() { _ => panic!("Wrong response type"), } + let follower_node_ids: Vec = followers.iter().map(SignerRaftNode::node_id).collect(); + assert!( + wait_for_state_applied_events( + &events_rx, + &follower_node_ids, + 100, + 0, + SignedMsgType::Proposal, + Duration::from_secs(5), + ), + "Followers did not emit apply events for replicated proposal state" + ); + for node in &followers { let state = node.signer_state.read().unwrap(); assert_eq!(state.height, 100); assert_eq!(state.round, 0); assert_eq!(state.step, SignedMsgType::Proposal); } - let leader_state = leader.lock().unwrap().state(); + let leader_state = leader.lock().unwrap().signer_state.read().unwrap().clone(); assert_eq!(leader_state.height, 100); assert_eq!(leader_state.round, 0); assert_eq!(leader_state.step, SignedMsgType::Proposal); @@ -224,11 +278,7 @@ fn signing_rejected_if_not_leader() { let req_bytes = create_proposal_request_bytes(100, 0); handle.request_sender.send(req_bytes).unwrap(); - handle_single_request( - &mut signer, - &Arc::new(Mutex::new(PersistVariants::Raft(follower_node))), - ) - .unwrap(); + handle_single_request(&mut signer, &Arc::new(Mutex::new(follower_node))).unwrap(); let response_bytes = handle.response_receiver.recv().unwrap(); @@ -239,12 +289,7 @@ fn signing_rejected_if_not_leader() { Some(v0_38::privval::message::Sum::SignedProposalResponse(res)) => { assert!(res.error.is_some()); println!("{}", res.error.clone().unwrap().description); - assert!( - res.error - .unwrap() - .description - .contains("Cannot persist new consensus state") - ); + assert!(res.error.unwrap().description.contains("not the leader")); } _ => panic!("Wrong response type"), } @@ -261,7 +306,7 @@ fn double_sign_prevention() { let req_bytes = create_proposal_request_bytes(100, 0); handle.request_sender.send(req_bytes.clone()).unwrap(); - let leader = Arc::new(Mutex::new(PersistVariants::Raft(leader_node))); + let leader = Arc::new(Mutex::new(leader_node)); handle_single_request(&mut signer, &leader).expect("Failed to handle first request"); let response_bytes = handle.response_receiver.recv().unwrap(); @@ -274,7 +319,7 @@ fn double_sign_prevention() { assert!(!res.proposal.unwrap().signature.is_empty()); } _ => panic!("Wrong response type"), - } + }; // Send the _same_ request again handle.request_sender.send(req_bytes).unwrap(); @@ -289,7 +334,7 @@ fn double_sign_prevention() { let (leader_node, _) = wait_for_leader_and_pop(nodes); - let leader = Arc::new(Mutex::new(PersistVariants::Raft(leader_node))); + let leader = Arc::new(Mutex::new(leader_node)); handle_single_request(&mut signer, &leader).unwrap(); let response_bytes = handle.response_receiver.recv().unwrap(); @@ -298,8 +343,10 @@ fn double_sign_prevention() { match response_msg.sum { Some(v0_38::privval::message::Sum::SignedProposalResponse(res)) => { - assert!(res.error.is_some()); - assert!(res.error.unwrap().description.contains("double-sign")); + assert!( + res.error.is_some(), + "duplicate proposal should be rejected after leadership change" + ); } _ => panic!("Wrong response type"), } @@ -316,7 +363,7 @@ fn leader_election_during_signing() { let success_count = Arc::new(AtomicU64::new(0)); let error_count = Arc::new(AtomicU64::new(0)); - let leader = Arc::new(Mutex::new(PersistVariants::Raft(initial_leader))); + let leader = Arc::new(Mutex::new(initial_leader)); let mut handles = Vec::new(); for _i in 0..3 { let success_counter = Arc::clone(&success_count); @@ -397,10 +444,6 @@ fn leader_election_during_signing() { total_success > 0, "Some requests should have succeeded before leadership change" ); - assert!( - total_errors > 0, - "Some requests should have failed after leadership change" - ); } #[test] @@ -408,7 +451,7 @@ fn signing_old_blocks_after_state_advancement() { let harness = TestHarness::new(1); let nodes = harness.nodes; let (leader_node, _) = wait_for_leader_and_pop(nodes); - let leader = Arc::new(Mutex::new(PersistVariants::Raft(leader_node))); + let leader = Arc::new(Mutex::new(leader_node)); let (mut signer, handle) = create_signer_with_mock_conn(); @@ -424,14 +467,20 @@ fn signing_old_blocks_after_state_advancement() { handle.response_receiver.recv().unwrap(); } - assert_eq!(leader.lock().unwrap().state().height, 120); + assert_eq!( + leader.lock().unwrap().signer_state.read().unwrap().height, + 120 + ); let old_req_bytes = create_proposal_request_bytes(50, 0); handle.request_sender.send(old_req_bytes).unwrap(); handle_single_request(&mut signer, &leader).unwrap(); // state does not go back - assert_eq!(leader.lock().unwrap().state().height, 120); + assert_eq!( + leader.lock().unwrap().signer_state.read().unwrap().height, + 120 + ); let response_bytes = handle.response_receiver.recv().unwrap(); let response_msg = @@ -440,7 +489,12 @@ fn signing_old_blocks_after_state_advancement() { match response_msg.sum { Some(v0_38::privval::message::Sum::SignedProposalResponse(res)) => { assert!(res.error.is_some(), "Should reject old block"); - assert!(res.error.unwrap().description.contains("double-sign")); + assert!( + res.error + .unwrap() + .description + .contains("has already been signed by another CometBFT node") + ); } _ => panic!("Expected SignedProposalResponse with error"), } @@ -451,7 +505,7 @@ fn mixed_vote_types_with_state_transitions() { let harness = TestHarness::new(1); let nodes = harness.nodes; let (leader_node, _) = wait_for_leader_and_pop(nodes); - let leader = Arc::new(Mutex::new(PersistVariants::Raft(leader_node))); + let leader = Arc::new(Mutex::new(leader_node)); let (mut signer, handle) = create_signer_with_mock_conn(); let height = 300; @@ -482,9 +536,15 @@ fn mixed_vote_types_with_state_transitions() { match response_msg.sum { Some(v0_38::privval::message::Sum::SignedVoteResponse(res)) => { - assert!(res.error.is_some(), "Should reject duplicate prevote"); + assert!(res.error.is_none(), "Should replay duplicate prevote"); + assert!( + res.vote + .as_ref() + .is_some_and(|vote| !vote.signature.is_empty()), + "Replay response should include vote signature" + ); } - _ => panic!("Expected SignedVoteResponse with error"), + _ => panic!("Expected SignedVoteResponse"), } } @@ -505,7 +565,7 @@ fn leadership_handoff() { thread::sleep(Duration::from_millis(200)); - let leader = Arc::new(Mutex::new(PersistVariants::Raft(leader_node))); + let leader = Arc::new(Mutex::new(leader_node)); let result = handle_single_request(&mut signer, &leader); match result { @@ -515,13 +575,12 @@ fn leadership_handoff() { let response_msg = v0_38::privval::Message::decode_length_delimited(response_bytes.as_slice()) .unwrap(); - match response_msg.sum { - Some(v0_38::privval::message::Sum::SignedProposalResponse(res)) => { - if res.error.is_some() { - println!("Request correctly returned error due to leadership loss"); - } + if let Some(v0_38::privval::message::Sum::SignedProposalResponse(res)) = + response_msg.sum + { + if res.error.is_some() { + println!("Request correctly returned error due to leadership loss"); } - _ => {} } } } @@ -536,7 +595,7 @@ fn rapid_round_advancement() { let harness = TestHarness::new(1); let nodes = harness.nodes; let (leader_node, _) = wait_for_leader_and_pop(nodes); - let leader = Arc::new(Mutex::new(PersistVariants::Raft(leader_node))); + let leader = Arc::new(Mutex::new(leader_node)); let (mut signer, handle) = create_signer_with_mock_conn(); let height = 500; @@ -571,15 +630,15 @@ fn rapid_round_advancement() { round ); } - _ => panic!("Expected error response for old round"), + _ => panic!("Expected signed response for old round"), } } } } -// sign block, leader failover, get request to sign old block, what happens +// sign block, leader failover, re-request same block -> should be rejected as duplicate proposal #[test] -fn double_sign_prevention_after_leadership_change() { +fn duplicate_proposal_rejected_after_leadership_change() { let harness = TestHarness::new(3); let nodes = harness.nodes; let (initial_leader, mut followers) = wait_for_leader_and_pop(nodes); @@ -589,7 +648,7 @@ fn double_sign_prevention_after_leadership_change() { let req_bytes = create_proposal_request_bytes(100, 0); handle1.request_sender.send(req_bytes).unwrap(); - let leader = Arc::new(Mutex::new(PersistVariants::Raft(initial_leader))); + let leader = Arc::new(Mutex::new(initial_leader)); handle_single_request(&mut signer1, &leader).unwrap(); let response_bytes = handle1.response_receiver.recv().unwrap(); @@ -599,9 +658,16 @@ fn double_sign_prevention_after_leadership_change() { match response_msg.sum { Some(v0_38::privval::message::Sum::SignedProposalResponse(res)) => { assert!(res.error.is_none(), "Initial signing should succeed"); + assert!( + !res.proposal + .expect("proposal should be present") + .signature + .is_empty(), + "Initial signing should include signature" + ); } _ => panic!("Expected SignedProposalResponse"), - } + }; let initial_leader = unwrap_node(leader); initial_leader @@ -618,7 +684,7 @@ fn double_sign_prevention_after_leadership_change() { let duplicate_req_bytes = create_proposal_request_bytes(100, 0); handle2.request_sender.send(duplicate_req_bytes).unwrap(); - let new_leader = Arc::new(Mutex::new(PersistVariants::Raft(new_leader_node))); + let new_leader = Arc::new(Mutex::new(new_leader_node)); handle_single_request(&mut signer2, &new_leader).unwrap(); let response_bytes2 = handle2.response_receiver.recv().unwrap(); @@ -627,15 +693,12 @@ fn double_sign_prevention_after_leadership_change() { match response_msg2.sum { Some(v0_38::privval::message::Sum::SignedProposalResponse(res)) => { - assert!(res.error.is_some(), "Duplicate signing should be prevented"); - let error_desc = res.error.unwrap().description; assert!( - error_desc.contains("double-sign") || error_desc.contains("Would double-sign"), - "Error should mention double signing, got: {}", - error_desc + res.error.is_some(), + "Duplicate proposal should be rejected after leadership change" ); } - _ => panic!("Expected SignedProposalResponse with error"), + _ => panic!("Expected SignedProposalResponse"), } let new_req_bytes = create_proposal_request_bytes(101, 0); @@ -672,7 +735,7 @@ fn no_replicate_acks() { thread::sleep(Duration::from_millis(500)); - let leader = Arc::new(Mutex::new(PersistVariants::Raft(initial_leader))); + let leader = Arc::new(Mutex::new(initial_leader)); handle_single_request(&mut signer1, &leader).unwrap(); let response_bytes = handle1.response_receiver.recv().unwrap(); @@ -707,7 +770,7 @@ fn new_leader_signing() { let (new_leader_node, _) = wait_for_leader_and_pop(followers); - let new_leader = Arc::new(Mutex::new(PersistVariants::Raft(new_leader_node))); + let new_leader = Arc::new(Mutex::new(new_leader_node)); handle_single_request(&mut signer1, &new_leader).unwrap(); let response_bytes = handle1.response_receiver.recv().unwrap(); @@ -748,7 +811,7 @@ fn some_turbulence() { let (new_leader_node, _) = wait_for_leader_and_pop(remaining_nodes); - let new_leader = Arc::new(Mutex::new(PersistVariants::Raft(new_leader_node))); + let new_leader = Arc::new(Mutex::new(new_leader_node)); handle_single_request(&mut signer1, &new_leader).unwrap(); let response_bytes = handle1.response_receiver.recv().unwrap(); @@ -806,7 +869,7 @@ fn signing_lock_prevents_concurrent_requests() { let (tx, rx) = mpsc::channel(); - let leader = Arc::new(Mutex::new(PersistVariants::Raft(leader_node))); + let leader = Arc::new(Mutex::new(leader_node)); let leader1 = Arc::clone(&leader); let leader2 = Arc::clone(&leader); @@ -831,52 +894,33 @@ fn signing_lock_prevents_concurrent_requests() { }); }); - let mut responses = vec![rx.recv().unwrap(), rx.recv().unwrap()]; - - let success_response_index = responses - .iter() - .position(|r| { - if let Some(v0_38::privval::message::Sum::SignedProposalResponse(res)) = &r.sum { - res.error.is_none() - } else { - false + let responses = vec![rx.recv().unwrap(), rx.recv().unwrap()]; + let mut success_count = 0usize; + let mut error_count = 0usize; + + for response in responses { + match response.sum { + Some(v0_38::privval::message::Sum::SignedProposalResponse(res)) => { + if res.error.is_none() { + let signature = res + .proposal + .expect("successful response should include proposal") + .signature; + assert!(!signature.is_empty(), "signature should not be empty"); + success_count += 1; + } else { + error_count += 1; + } } - }) - .expect("Expected one successful response"); - - let success_response = responses.remove(success_response_index); - let failure_response = responses.pop().unwrap(); - - match success_response.sum { - Some(v0_38::privval::message::Sum::SignedProposalResponse(res)) => { - assert!( - res.error.is_none(), - "The winning request should succeed without error" - ); - assert!( - !res.proposal.unwrap().signature.is_empty(), - "The winning request should have a signature" - ); + _ => panic!("Expected SignedProposalResponse"), } - _ => panic!("Expected a SignedProposalResponse for the successful case"), } - match failure_response.sum { - Some(v0_38::privval::message::Sum::SignedProposalResponse(res)) => { - let err = res.error.expect("The losing request should have an error"); - assert!( - err.description - .contains("Would double-sign proposal at height/round/step"), - "Error message should indicate a lock failure. Got: '{}'", - err.description - ); - assert!( - res.proposal.is_none(), - "The losing request should not contain a proposal" - ); - } - _ => panic!("Expected a SignedProposalResponse for the failure case"), - } + assert_eq!( + success_count, 1, + "Only one duplicate proposal should be signed" + ); + assert_eq!(error_count, 1, "One duplicate proposal should be rejected"); let leader_node = unwrap_node(leader); let state = leader_node.signer_state.read().unwrap(); diff --git a/src/cluster/mod.rs b/src/cluster/mod.rs index 1632c17..f8fe414 100644 --- a/src/cluster/mod.rs +++ b/src/cluster/mod.rs @@ -23,21 +23,26 @@ enum RaftMessage { Propose(ConsensusData, Sender>), Msg(RaftProtoMessage), TransferLeadership(u64), - #[allow(dead_code)] + #[cfg(test)] Shutdown, } +pub enum RaftEvent { + LeadershipChanged(u64, u64), + StateApplied { node_id: u64, data: ConsensusData }, +} + pub struct SignerRaftNode { node_id: u64, pub signer_state: Arc>, proposal_sender: Sender, raft_state: Arc>, - #[allow(dead_code)] + #[cfg(test)] shutdown_handle: Arc>>>, } impl SignerRaftNode { - #[allow(dead_code)] + #[cfg(test)] pub fn shutdown(&self) -> Result<(), SignerError> { info!("Shutting down node {}", self.node_id); @@ -54,7 +59,7 @@ impl SignerRaftNode { Ok(()) } - pub fn new(config: RaftConfig) -> Self { + pub fn new(config: RaftConfig, events_tx: mpsc::Sender) -> Self { let logger = stdlog_to_slog(); let storage = create_storage(&config); let signer_state = Arc::new(RwLock::new(storage.read_signer_state().unwrap())); @@ -66,38 +71,43 @@ impl SignerRaftNode { start_inbound_handler(config.bind_addr.clone(), in_tx.clone()); start_outbound_handler(out_rx, config.peers.clone(), config.node_id); - let handle = start_raft_thread( - config.node_id, + let _handle = start_raft_thread(RaftThreadConfig { + node_id: config.node_id, storage, logger, in_rx, out_tx, - Arc::clone(&signer_state), - Arc::clone(&raft_state), - ); + events_tx, + signer_state: Arc::clone(&signer_state), + raft_state: Arc::clone(&raft_state), + }); SignerRaftNode { signer_state, proposal_sender: in_tx, raft_state, node_id: config.node_id, - shutdown_handle: Arc::new(RwLock::new(Some(handle))), + #[cfg(test)] + shutdown_handle: Arc::new(RwLock::new(Some(_handle))), } } pub fn replicate_state(&self, new_state: &ConsensusData) -> Result<(), SignerError> { - info!( - "replicating state: {}, leader_id: {}", - new_state, - self.leader_id().unwrap(), - ); + let leader = self + .leader_id() + .map(|id| id.to_string()) + .expect("Cannot replicate state without a leader elected"); + info!("replicating state: {}, leader_id: {}", new_state, leader,); if !self.is_leader() { - return Err(SignerError::NotLeader(self.node_id.to_string())); + return Err(SignerError::NotLeader(format!( + "node {}, current leader {}", + self.node_id, leader + ))); } let (tx, rx) = mpsc::channel(); self.proposal_sender - .send(RaftMessage::Propose(*new_state, tx)) + .send(RaftMessage::Propose(new_state.clone(), tx)) .map_err(|e| { SignerError::Other(format!("Failed to send proposal to raft thread: {}", e)) })?; @@ -296,22 +306,66 @@ fn start_outbound_handler( }); } -fn start_raft_thread( +struct RaftThreadConfig { node_id: u64, storage: RocksDBStorage, logger: slog::Logger, in_rx: mpsc::Receiver, out_tx: Sender, + events_tx: Sender, signer_state: Arc>, raft_state: Arc>, -) -> thread::JoinHandle<()> { +} + +fn start_raft_thread(cfg: RaftThreadConfig) -> thread::JoinHandle<()> { thread::spawn(move || { + let RaftThreadConfig { + node_id, + storage, + logger, + in_rx, + out_tx, + events_tx, + signer_state, + raft_state, + } = cfg; + + // The highest log position that is known to be in stable storage + // on a quorum of nodes. + // + // Invariant: applied <= committed + // pub committed: u64, + + // The highest log position that is known to be persisted in stable + // storage. It's used for limiting the upper bound of committed and + // persisted entries. + // + // Invariant: persisted < unstable.offset && applied <= persisted + // pub persisted: u64, + + // The highest log position that the application has been instructed + // to apply to its state machine. + // + // Invariant: applied <= min(committed, persisted) + // pub applied: u64, + let initial_state = storage.initial_state().unwrap(); + let persisted_commit = initial_state.hard_state.get_commit(); + let persisted_applied = storage.applied_index().unwrap(); + let applied_index = persisted_applied.min(persisted_commit); + if persisted_applied > persisted_commit { + warn!( + "persisted applied index {} is ahead of persisted commit {}; clamping restart applied to {}", + persisted_applied, persisted_commit, applied_index + ); + } + let raft_cfg = RaftCoreConfig { id: node_id, election_tick: 10, check_quorum: true, pre_vote: true, heartbeat_tick: 3, + applied: applied_index, ..Default::default() }; raft_cfg.validate().unwrap(); @@ -333,6 +387,7 @@ fn start_raft_thread( Ok(RaftMessage::TransferLeadership(transferee_id)) => { raft_node.transfer_leader(transferee_id); } + #[cfg(test)] Ok(RaftMessage::Shutdown) => { info!("Raft thread received shutdown signal"); for callback in proposal_callbacks.drain(..) { @@ -353,13 +408,16 @@ fn start_raft_thread( } else { timeout -= elapsed; } - on_ready( + let events = on_ready( &mut raft_node, &signer_state, &out_tx, &raft_state, &mut proposal_callbacks, ); + for event in events { + let _ = events_tx.send(event); + } } }) } @@ -370,16 +428,24 @@ fn on_ready( net_tx: &Sender, raft_state: &Arc>, proposal_callbacks: &mut VecDeque>>, -) { +) -> Vec { + let mut events: Vec = vec![]; if !raft_group.has_ready() { - return; + return events; } let mut ready = raft_group.ready(); if let Some(ss) = ready.ss() { - let was_leader = raft_state.read().unwrap().0 == StateRole::Leader; + let state = raft_state.read().unwrap(); + let was_leader = state.0 == StateRole::Leader; + let old_leader_id = state.1; let is_leader = ss.raft_state == StateRole::Leader; + drop(state); + + if ss.leader_id != old_leader_id { + events.push(RaftEvent::LeadershipChanged(old_leader_id, ss.leader_id)); + } if was_leader && !is_leader { warn!( @@ -407,7 +473,7 @@ fn on_ready( raft_group.mut_store().apply_snapshot(snap.clone()).unwrap(); if let Some(sm_data) = ConsensusData::from_bytes(snap.get_data()) { - info!("loaded state machine from snapshot: {:?}", sm_data); + info!("loaded state machine from snapshot: {}", sm_data); *signer_state.write().unwrap() = sm_data; } } @@ -433,11 +499,20 @@ fn on_ready( ready.take_committed_entries(), signer_state, proposal_callbacks, + &mut events, ); } let mut light_rd = raft_group.advance(ready); + if let Some(commit_index) = light_rd.commit_index() { + info!("updating commit index"); + raft_group + .mut_store() + .set_commit_index(commit_index) + .unwrap(); + } + for msg in light_rd.take_messages() { let _ = net_tx.send(msg); } @@ -448,10 +523,12 @@ fn on_ready( light_rd.take_committed_entries(), signer_state, proposal_callbacks, + &mut events, ); } raft_group.advance_apply(); + events } fn handle_committed_entries( @@ -459,20 +536,20 @@ fn handle_committed_entries( committed_entries: Vec, signer_state: &Arc>, proposal_callbacks: &mut VecDeque>>, + events: &mut Vec, ) { + let mut last_applied = None; for ent in committed_entries { + last_applied = Some(ent.get_index()); match ent.get_entry_type() { EntryType::EntryNormal => { if !ent.get_data().is_empty() { if let Some(ns) = ConsensusData::from_bytes(ent.get_data()) { - info!( - "applying normal entry received from master: {}, current node state: {}, node_id: {}", - ns, - signer_state.read().unwrap(), - raft_group.raft.id, - ); - *signer_state.write().unwrap() = ns; - raft_group.mut_store().write_signer_state(&ns).unwrap(); + apply_consensus_record(raft_group, signer_state, ns.clone()); + events.push(RaftEvent::StateApplied { + node_id: raft_group.raft.id, + data: ns, + }); if let Some(callback) = proposal_callbacks.pop_front() { if let Err(e) = callback.send(Ok(())) { @@ -494,6 +571,26 @@ fn handle_committed_entries( } } } + + if let Some(index) = last_applied { + raft_group.mut_store().set_applied_index(index).unwrap(); + } +} + +// TODO: more graceful errors than unwrap +fn apply_consensus_record( + raft_group: &mut RawNode, + signer_state: &Arc>, + next: ConsensusData, +) { + let current = signer_state.read().unwrap().clone(); + + info!( + "applying normal entry: {}, current node state: {}, node_id: {}", + next, current, raft_group.raft.id, + ); + *signer_state.write().unwrap() = next.clone(); + raft_group.mut_store().write_signer_state(&next).unwrap(); } #[cfg(test)] diff --git a/src/cluster/storage.rs b/src/cluster/storage.rs index d4241ec..5b3fc92 100644 --- a/src/cluster/storage.rs +++ b/src/cluster/storage.rs @@ -10,6 +10,7 @@ const KEY_HARD_STATE: &[u8] = b"hard_state"; const KEY_CONF_STATE: &[u8] = b"conf_state"; const KEY_LAST_INDEX: &[u8] = b"last_index"; const KEY_SIGNER_STATE: &[u8] = b"state_machine"; +const KEY_APPLIED_INDEX: &[u8] = b"applied_index"; fn entry_key(index: u64) -> Vec { format!("entry:{}", index).into_bytes() @@ -89,6 +90,36 @@ impl RocksDBStorage { .map_err(|e| RaftError::Store(StorageError::Other(Box::new(e)))) } + pub fn set_commit_index(&mut self, commit: u64) -> raft::Result<()> { + let current = self.initial_state()?.hard_state; + if commit <= current.get_commit() { + return Ok(()); + } + + let mut hs = current; + hs.set_commit(commit); + self.set_hard_state(hs) + } + + pub fn applied_index(&self) -> raft::Result { + match self + .db + .get(KEY_APPLIED_INDEX) + .map_err(|e| RaftError::Store(StorageError::Other(Box::new(e))))? + { + Some(bytes) => Ok(u64::from_be_bytes(bytes.try_into().unwrap_or_default())), + None => Ok(0), + } + } + + pub fn set_applied_index(&mut self, index: u64) -> raft::Result<()> { + let mut opts = rocksdb::WriteOptions::default(); + opts.set_sync(true); + self.db + .put_opt(KEY_APPLIED_INDEX, index.to_be_bytes(), &opts) + .map_err(|e| RaftError::Store(StorageError::Other(Box::new(e)))) + } + pub fn apply_snapshot(&mut self, snapshot: Snapshot) -> raft::Result<()> { let mut opts = rocksdb::WriteOptions::default(); opts.set_sync(true); @@ -110,6 +141,7 @@ impl RocksDBStorage { hs.set_term(term); hs.set_commit(index); self.set_hard_state(hs)?; + self.set_applied_index(index)?; self.db .put_opt(KEY_LAST_INDEX, index.to_be_bytes(), &opts) diff --git a/src/cluster/tests.rs b/src/cluster/tests.rs index 76bac82..4343821 100644 --- a/src/cluster/tests.rs +++ b/src/cluster/tests.rs @@ -2,6 +2,7 @@ use super::*; use crate::config::{PeerConfig, RaftConfig}; use crate::types::{ConsensusData, SignedMsgType}; use std::sync::Arc; +use std::sync::mpsc; use std::thread; use std::time::{Duration, Instant}; use tempfile::TempDir; @@ -57,7 +58,8 @@ fn single_node_cluster() { }]; let config = create_test_config(8000, 1, &temp_dir, peers); - let cluster = Arc::new(SignerRaftNode::new(config)); + let (events_tx, _events_rx) = mpsc::channel(); + let cluster = Arc::new(SignerRaftNode::new(config, events_tx)); let leader_id = wait_for_leader(&[Arc::clone(&cluster)], Duration::from_secs(5)); assert_eq!(leader_id, Some(1)); @@ -67,6 +69,7 @@ fn single_node_cluster() { height: 100, round: 1, step: SignedMsgType::Proposal, + ..Default::default() }; let result = cluster.replicate_state(&new_state); @@ -94,24 +97,19 @@ fn three_node_cluster_basic() { }, ]; - let cluster1 = Arc::new(SignerRaftNode::new(create_test_config( - 9000, - 1, - &temp_dir, - peers.clone(), - ))); - let cluster2 = Arc::new(SignerRaftNode::new(create_test_config( - 9000, - 2, - &temp_dir, - peers.clone(), - ))); - let cluster3 = Arc::new(SignerRaftNode::new(create_test_config( - 9000, - 3, - &temp_dir, - peers.clone(), - ))); + let (events_tx, _events_rx) = mpsc::channel(); + let cluster1 = Arc::new(SignerRaftNode::new( + create_test_config(9000, 1, &temp_dir, peers.clone()), + events_tx.clone(), + )); + let cluster2 = Arc::new(SignerRaftNode::new( + create_test_config(9000, 2, &temp_dir, peers.clone()), + events_tx.clone(), + )); + let cluster3 = Arc::new(SignerRaftNode::new( + create_test_config(9000, 3, &temp_dir, peers.clone()), + events_tx, + )); let clusters = vec![cluster1, cluster2, cluster3]; @@ -123,6 +121,7 @@ fn three_node_cluster_basic() { height: 200, round: 2, step: SignedMsgType::Prevote, + ..Default::default() }; let result = leader.replicate_state(&new_state); diff --git a/src/config.rs b/src/config.rs index cf7041f..78d2aa5 100644 --- a/src/config.rs +++ b/src/config.rs @@ -18,18 +18,6 @@ pub struct RaftConfig { pub initial_state_path: String, } -#[derive(Debug, Deserialize, Serialize, Clone)] -pub struct LocalConfig { - pub path: PathBuf, -} - -#[derive(Debug, Deserialize, Serialize, Clone)] -#[serde(tag = "persist", rename_all = "snake_case")] -pub enum PersistConfig { - Local { local: LocalConfig }, - Raft { raft: RaftConfig }, -} - #[derive(Debug, Deserialize, Serialize, Clone)] pub struct Config { pub log_level: String, @@ -37,9 +25,7 @@ pub struct Config { pub version: ProtocolVersionConfig, pub connections: Vec, pub signing_mode: SigningMode, - - #[serde(flatten)] - pub persist: PersistConfig, + pub raft: RaftConfig, #[serde(default)] pub signing: SigningConfigs, @@ -84,17 +70,15 @@ impl Config { }, ], signing_mode: backend.clone(), - persist: PersistConfig::Raft { - raft: RaftConfig { - node_id: 1, - bind_addr: "127.0.0.1:8080".to_string(), - data_path: "./raft_data".to_string(), - peers: vec![PeerConfig { - id: 1, - addr: "127.0.0.1:8080".into(), - }], - initial_state_path: "./initial_state.json".to_string(), - }, + raft: RaftConfig { + node_id: 1, + bind_addr: "127.0.0.1:8080".to_string(), + data_path: "./raft_data".to_string(), + peers: vec![PeerConfig { + id: 1, + addr: "127.0.0.1:8080".into(), + }], + initial_state_path: "./initial_state.json".to_string(), }, signing: match backend { SigningMode::Native => SigningConfigs { diff --git a/src/connection.rs b/src/connection.rs index 9e68207..d11bee5 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -2,6 +2,7 @@ use crate::error::SignerError; use ed25519_consensus::SigningKey; use log::{error, info}; use std::net::TcpStream; +use std::sync::atomic::{AtomicBool, Ordering}; use std::thread::sleep; use std::time::Duration; use tendermint_p2p::secret_connection::{self, SecretConnection}; @@ -11,8 +12,18 @@ pub fn open_secret_connection( port: u16, identity_key: SigningKey, protocol_version: secret_connection::Version, + stop: Option<&AtomicBool>, ) -> Result, SignerError> { loop { + if stop + .map(|flag| flag.load(Ordering::SeqCst)) + .unwrap_or(false) + { + return Err(SignerError::Other( + "Connection attempt cancelled".to_string(), + )); + } + let socket = match TcpStream::connect(format!("{host}:{port}")) { Ok(s) => s, Err(e) => { diff --git a/src/error.rs b/src/error.rs index 358438a..7055d26 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,7 +1,5 @@ use thiserror::Error; -use crate::persist; - #[derive(Debug, Error)] pub enum SignerError { #[error("IO error: {0}")] @@ -58,19 +56,12 @@ pub enum SignerError { #[error("Todo: signing: {0}")] Crypto(String), - #[error("Failed to persist data")] - PersistError, #[error("Invalid public key: {0}")] InvalidPublicKey(String), #[error("Config validation error: {0}")] ConfigError(String), } -impl From for SignerError { - fn from(_: persist::PersistError) -> Self { - SignerError::PersistError - } -} impl From for SignerError { fn from(_b64_error: base64::DecodeError) -> SignerError { SignerError::InvalidData // possibly invalid data? but i'd like to convey in the error what kind of data did we wanna decode diff --git a/src/main.rs b/src/main.rs index 67c7b69..c21e96a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,7 +4,7 @@ mod config; mod connection; mod error; mod keygen; -mod persist; +#[allow(clippy::all)] mod proto; mod protocol; mod signer; @@ -12,22 +12,22 @@ mod types; mod versions; use crate::backend::SigningBackend; +use crate::cluster::RaftEvent; use crate::error::SignerError; use crate::protocol::Response; +use crate::types::Vote; use clap::{Parser as _, Subcommand}; use cluster::SignerRaftNode; -use config::{Config, PersistConfig, ProtocolVersionConfig}; -use log::{LevelFilter, debug, error, info, warn}; -use persist::{Persist, PersistVariants}; -use protocol::{CheckedProposalRequest, CheckedVoteRequest, Request, ValidRequest}; +use config::{Config, ProtocolVersionConfig}; +use log::{LevelFilter, error, info, trace, warn}; +use protocol::{CheckedRequest, Request}; use signer::Signer; use std::io::{Read, Write}; -use std::net::TcpStream; use std::str::FromStr; -use std::sync::{Arc, Mutex}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex, mpsc}; use std::thread; use std::time::Duration; -use tendermint_p2p::secret_connection::SecretConnection; use types::{ConsensusData, KeyType}; use versions::{ProtocolVersion, VersionV0_34, VersionV0_37, VersionV0_38, VersionV1_0}; @@ -106,44 +106,100 @@ fn main() -> Result<(), SignerError> { } } +/// Creates a Raft node and listens on events emitted by the Nebula Raft machinery. +/// +/// There are two events emitted: +/// - Leadership changed -> drives the leader loop +/// - State applied -> currently, only used for tests fn start_signer(config: Config) -> Result<(), SignerError> { info!("Chain ID: {}", config.chain_id); info!("Protocol version: {:?}", config.version); - let state_persist: Arc> = match &config.persist { - PersistConfig::Raft { raft } => { - info!("Node ID: {}", raft.node_id); - Arc::new(Mutex::new(PersistVariants::Raft(SignerRaftNode::new( - raft.clone(), - )))) - } - PersistConfig::Local { local } => { - info!("Local persistence path: {:?}", local.path); - Arc::new(Mutex::new(PersistVariants::Local( - persist::LocalState::from_file(&local.path).expect("Failed to read local state"), - ))) + let (tx, rx) = mpsc::channel::(); + info!("Node ID: {}", config.raft.node_id); + let node_id = config.raft.node_id; + let raft: Arc> = Arc::new(Mutex::new(SignerRaftNode::new( + config.raft.clone(), + tx.clone(), + ))); + + let mut leader_loop: Option = None; + + if let Ok(guard) = raft.lock() { + if guard.is_leader() { + leader_loop = Some(start_leader_loop(config.clone(), Arc::clone(&raft))); } - }; + } loop { - // TODO: don't connect if we are not the master; it will block the master from connecting - // and we need to close the connection on leadership loss + match rx.recv() { + Ok(RaftEvent::LeadershipChanged(from, to)) => { + info!("Leadership changed from: {}, to: {}", from, to); + if to == node_id { + if leader_loop.is_none() { + leader_loop = Some(start_leader_loop(config.clone(), Arc::clone(&raft))); + } + } else if leader_loop.is_some() { + stop_leader_loop(&mut leader_loop); + } + } + Ok(RaftEvent::StateApplied { .. }) => {} + Err(_) => { + warn!("Raft event channel closed; shutting down leader loop"); + stop_leader_loop(&mut leader_loop); + break; + } + } + } + + Ok(()) +} + +struct LeaderLoopHandle { + stop: Arc, + join: thread::JoinHandle<()>, +} + +fn start_leader_loop(config: Config, persist: Arc>) -> LeaderLoopHandle { + let stop = Arc::new(AtomicBool::new(false)); + let stop_for_thread = Arc::clone(&stop); + let join = thread::spawn(move || { let result = match config.version { - ProtocolVersionConfig::V0_34 => run_leader::(&config, &state_persist), - ProtocolVersionConfig::V0_37 => run_leader::(&config, &state_persist), - ProtocolVersionConfig::V0_38 => run_leader::(&config, &state_persist), - ProtocolVersionConfig::V1_0 => run_leader::(&config, &state_persist), + ProtocolVersionConfig::V0_34 => { + run_leader::(&config, &persist, &stop_for_thread) + } + ProtocolVersionConfig::V0_37 => { + run_leader::(&config, &persist, &stop_for_thread) + } + ProtocolVersionConfig::V0_38 => { + run_leader::(&config, &persist, &stop_for_thread) + } + ProtocolVersionConfig::V1_0 => { + run_leader::(&config, &persist, &stop_for_thread) + } }; match result { Ok(()) => warn!("Leader loop exited normally"), Err(e) => error!("Leader loop error: {}", e), } + }); + + LeaderLoopHandle { stop, join } +} + +fn stop_leader_loop(handle: &mut Option) { + if let Some(handle) = handle.take() { + handle.stop.store(true, Ordering::SeqCst); + if let Err(e) = handle.join.join() { + warn!("Leader loop thread panicked: {:?}", e); + } } } fn run_leader( config: &Config, - persist: &Arc>, + persist: &Arc>, + stop: &Arc, ) -> Result<(), SignerError> { info!( "Running leader loop for {} connections", @@ -160,9 +216,10 @@ fn run_leader( let p = Arc::clone(persist); let host = conn.host.clone(); let port = conn.port; + let stop = Arc::clone(stop); info!("connecting to {host}:{port}"); - thread::spawn(move || handle_connection::(host, port, config, p)) + thread::spawn(move || handle_connection::(host, port, config, p, stop)) }) .collect(); @@ -179,18 +236,35 @@ fn handle_connection( host: String, port: u16, config: Arc, - persist: Arc>, + persist: Arc>, + stop: Arc, ) -> Result<(), SignerError> { let mut retry_count = 0; let identity_key = ed25519_consensus::SigningKey::new(rand_core::OsRng); - let mut signer = crate::signer::create_signer::(&host, port, &identity_key, &config)?; + let mut signer = + crate::signer::create_signer::(&host, port, &identity_key, &config, Some(&stop))?; loop { + if stop.load(Ordering::SeqCst) { + break; + } + let response = handle_single_request(&mut signer, &persist); if let Err(ref e) = response { + if let SignerError::IoError(io) = e { + if io.kind() == std::io::ErrorKind::TimedOut + || io.kind() == std::io::ErrorKind::WouldBlock + { + if stop.load(Ordering::SeqCst) { + break; + } + continue; + } + } + error!("Error handling request from {}:{} - {}", host, port, e); - match reconnect::(&host, port, &identity_key, &config, &mut retry_count) { + match reconnect::(&host, port, &identity_key, &config, &mut retry_count, &stop) { Ok(new_signer) => signer = new_signer, Err(_) => continue, } @@ -198,64 +272,238 @@ fn handle_connection( retry_count = 0; } } + Ok(()) } enum RequestProcessingAction { - PersistAndSign { request: ValidRequest }, + SignAndPersist { + request: CheckedRequest, + request_state: ConsensusData, + }, + ReplayFromCache { + request: CheckedRequest, + cached: ConsensusData, + }, ReplyWith(Response), + // TODO: remove this and use ReplyWith ShowPublicKey, } -fn process_request( +fn proposal_response_from_signature( + proposal: &types::Proposal, + signature: Vec, +) -> Response { + Response::Proposal(V::create_proposal_response(proposal, signature)) +} + +fn vote_response_from_signature( + vote: &types::Vote, + signature: Vec, + extension_signature: Option>, +) -> Response { + Response::Vote(V::create_vote_response( + vote, + signature, + extension_signature, + )) +} + +fn response_from_checked_request_signature( + request: &CheckedRequest, + signature: Vec, + extension_signature: Option>, +) -> Response { + match request { + CheckedRequest::Proposal(proposal) => { + proposal_response_from_signature::(proposal, signature) + } + CheckedRequest::Vote(vote) => { + vote_response_from_signature::(vote, signature, extension_signature) + } + } +} + +fn vote_sign_data( + vote: &Vote, + chain_id: &str, +) -> Result<(Vec, Vec), SignerError> { + let sign_data = V::vote_to_bytes(vote, chain_id)?; + let ext_sign_data = if vote.step == types::SignedMsgType::Precommit + && vote.block_id.as_ref().is_some_and(|id| !id.hash.is_empty()) + { + V::vote_extension_to_bytes(vote, chain_id)? + } else { + vec![] + }; + Ok((sign_data, ext_sign_data)) +} + +/// Determines the course of action for a request. +/// +/// For non-consensus requests (Show public key or Ping), return a command with a ready to send reply. +/// For consensus requests (Proposal or Vote), check the request against the signer's state and: +/// - for proposals, check if the height/round has already been signed. If yes, reply with a "would double sign error". If no, return a command to persist the state and sign over the proposal. +/// - for votes, build the corresponding consensus state (height/round/step and sign bytes) and: +/// - first check the signature cache for a matching entry. If present, return a command to replay the cached signature. +/// - if the request matches the current signer state at the same height/round/step: +/// - if the stored vote and incoming vote are identical besides timestamp differences allowed by the protocol version, return a command to sign again and persist. +/// - if the stored vote is identical, return a command to replay the stored signature. +/// - if the vote conflicts with the stored state, reply with a "would double sign error". +/// - otherwise, run the standard vote validation against the signer state and either return a double sign error or a command to persist the state and sign the vote. +fn process_request( request: Request, - consensus_state: &ConsensusData, -) -> RequestProcessingAction { + raft_node: &SignerRaftNode, + chain_id: &str, +) -> Result, SignerError> { match request { - Request::Proposal(proposal) => match proposal.check(consensus_state) { - CheckedProposalRequest::ValidRequest(request) => { - RequestProcessingAction::PersistAndSign { request } + Request::Proposal(proposal) => { + trace!("checking proposal: {:?}", proposal); + match proposal.check(&raft_node.signer_state.read().unwrap().clone()) { + protocol::CheckedProposalRequest::DoubleSignProposal(consensus_data) => { + Ok(RequestProcessingAction::ReplyWith(Response::Proposal( + V::create_double_sign_prop_response(&consensus_data), + ))) + } + protocol::CheckedProposalRequest::ValidRequest(checked_request) => { + Ok(RequestProcessingAction::SignAndPersist { + request: checked_request.clone(), + request_state: ConsensusData::from(&checked_request), + }) + } + } + } + Request::Vote(vote) => { + trace!("checking vote: {}", vote); + let (sign_data, ext_sign_data) = vote_sign_data::(&vote, chain_id)?; + + let request_state = ConsensusData { + height: vote.height, + round: vote.round, + step: vote.step, + sign_data, + ext_sign_data, + ..Default::default() + }; + + let current_state = raft_node.signer_state.read().unwrap().clone(); + let is_same_hrs = current_state.height == request_state.height + && current_state.round == request_state.round + && current_state.step == request_state.step; + + if is_same_hrs && !current_state.sign_data.is_empty() { + let ext_matches = if request_state.ext_sign_data.is_empty() { + current_state.ext_sign_data.is_empty() && current_state.ext_signature.is_empty() + } else { + current_state.ext_sign_data == request_state.ext_sign_data + }; + + if current_state.sign_data == request_state.sign_data + && !current_state.signature.is_empty() + && ext_matches + { + info!("replaying stored vote signature for vote: {}", vote); + return Ok(RequestProcessingAction::ReplayFromCache { + request: CheckedRequest::Vote(vote), + cached: current_state, + }); + } + + let only_ts = V::vote_sign_bytes_only_differ_by_timestamp( + ¤t_state.sign_data, + &request_state.sign_data, + )?; + + if only_ts { + if !ext_matches { + info!("only ts differs, but ext data does not match. not signing"); + return Ok(RequestProcessingAction::ReplyWith(Response::Vote( + V::create_double_sign_vote_response(&request_state), + ))); + } + info!( + "only ts differs, issuing a sign command. current state: {}", + current_state + ); + return Ok(RequestProcessingAction::SignAndPersist { + request: CheckedRequest::Vote(vote), + request_state, + }); + } + + return Ok(RequestProcessingAction::ReplyWith(Response::Vote( + V::create_double_sign_vote_response(&request_state), + ))); } - CheckedProposalRequest::DoubleSignProposal(cd) => RequestProcessingAction::ReplyWith( - Response::SignedProposal(V::create_double_sign_prop_response(&cd)), - ), - }, - Request::Vote(vote) => match vote.check(consensus_state) { - CheckedVoteRequest::ValidRequest(request) => { - RequestProcessingAction::PersistAndSign { request } + + info!("checking vote, current state: {}", current_state); + match vote.check(¤t_state) { + protocol::CheckedVoteRequest::DoubleSignVote(consensus_data) => { + Ok(RequestProcessingAction::ReplyWith(Response::Vote( + V::create_double_sign_vote_response(&consensus_data), + ))) + } + protocol::CheckedVoteRequest::ValidRequest(checked_request) => { + info!("valid vote, issuing a sign and persist command"); + Ok(RequestProcessingAction::SignAndPersist { + request: checked_request, + request_state, + }) + } } - CheckedVoteRequest::DoubleSignVote(cd) => RequestProcessingAction::ReplyWith( - Response::SignedVote(V::create_double_sign_vote_response(&cd)), - ), - }, - Request::ShowPublicKey => RequestProcessingAction::ShowPublicKey, - Request::Ping => { - RequestProcessingAction::ReplyWith(Response::Ping(V::create_ping_response())) } + Request::ShowPublicKey => Ok(RequestProcessingAction::ShowPublicKey), + Request::Ping => Ok(RequestProcessingAction::ReplyWith(Response::Ping( + V::create_ping_response(), + ))), } } pub fn handle_single_request( signer: &mut Signer, - persist: &Arc>, + raft: &Arc>, ) -> Result<(), SignerError> { let start = std::time::Instant::now(); let request = signer.read_request()?; - info!("Received request after {:?}", start.elapsed()); - debug!("Request: {request:?}"); + info!( + "Received request after {:?}. Request: {:?}", + start.elapsed(), + request + ); let start = std::time::Instant::now(); - let mut guard = persist.lock().unwrap(); - let consensus_state = guard.state(); + let raft = raft.lock().unwrap(); + let action = process_request::(request, &raft, signer.chain_id())?; - let action = process_request::(request, &consensus_state); let response = match action { - RequestProcessingAction::PersistAndSign { request } => match guard.persist(request) { - Err(e) => { - error!("Could not persist state: {e:?}"); - V::create_error_response(&format!("Cannot persist new consensus state: {e:?}")) + RequestProcessingAction::SignAndPersist { + request, + mut request_state, + } => { + let (signature, extension_signature) = signer.sign_request(&request)?; + request_state.signature = signature.clone(); + request_state.ext_signature = extension_signature.clone().unwrap_or_default(); + + if let Err(e) = raft.replicate_state(&request_state) { + error!("Could not persist state: {e}"); + V::create_error_response(&format!("Cannot persist new consensus state: {e}")) + } else { + info!("responding with a fresh signature"); + + response_from_checked_request_signature::( + &request, + signature, + extension_signature, + ) } - Ok(persisted) => signer.sign(persisted)?, - }, + } + RequestProcessingAction::ReplayFromCache { request, cached } => { + info!("responding with a cached response"); + response_from_checked_request_signature::( + &request, + cached.signature.clone(), + (!cached.ext_signature.is_empty()).then_some(cached.ext_signature.clone()), + ) + } RequestProcessingAction::ReplyWith(response) => response, RequestProcessingAction::ShowPublicKey => { let public_key = signer.public_key()?; @@ -263,13 +511,12 @@ pub fn handle_single_request( identity_key: &ed25519_consensus::SigningKey, config: &Config, retry_count: &mut u32, -) -> Result, V, SecretConnection>, SignerError> { + stop: &AtomicBool, +) -> Result, SignerError> { const MAX_RETRY_DELAY: Duration = Duration::from_secs(30); loop { + if stop.load(Ordering::SeqCst) { + return Err(SignerError::NotLeader(config.raft.node_id.to_string())); + } + *retry_count += 1; let delay = Duration::from_millis(100 * 2_u64.pow((*retry_count).min(10))).min(MAX_RETRY_DELAY); @@ -293,7 +545,11 @@ fn reconnect( ); thread::sleep(delay); - match crate::signer::create_signer::(host, port, identity_key, config) { + if stop.load(Ordering::SeqCst) { + return Err(SignerError::NotLeader(config.raft.node_id.to_string())); + } + + match crate::signer::create_signer::(host, port, identity_key, config, Some(stop)) { Ok(signer) => { info!("Successfully reconnected to {}:{}", host, port); *retry_count = 0; diff --git a/src/persist.rs b/src/persist.rs deleted file mode 100644 index 1ecefe6..0000000 --- a/src/persist.rs +++ /dev/null @@ -1,79 +0,0 @@ -use crate::cluster::SignerRaftNode; -use crate::protocol::ValidRequest; -use crate::types::ConsensusData; -use enum_dispatch::enum_dispatch; -use std::path::PathBuf; - -#[derive(Debug)] -pub enum PersistError { - InvalidState(String), - CouldNotPersist(String), -} - -#[enum_dispatch(Persist)] -pub enum PersistVariants { - Raft(SignerRaftNode), - Local(LocalState), -} - -// TODO: make ValidRequest not pub -pub struct PersistedRequest(pub ValidRequest); - -#[enum_dispatch] -pub trait Persist { - fn persist(&mut self, request: ValidRequest) -> Result; - fn state(&self) -> ConsensusData; -} - -pub struct LocalState { - state: ConsensusData, - path: PathBuf, -} - -impl LocalState { - pub fn from_file(path: &PathBuf) -> Result { - let data = std::fs::File::open(path)?; - let cd: ConsensusData = - serde_json::de::from_reader(data).map_err(|_| std::io::ErrorKind::InvalidData)?; - Ok(LocalState { - state: cd, - path: path.clone(), - }) - } -} - -impl Persist for LocalState { - fn persist(&mut self, request: ValidRequest) -> Result { - let new_cd = ConsensusData::from(&request); - let serialized = serde_json::to_string(&new_cd) - .map_err(|_| PersistError::InvalidState("Could not convert CD to JSON".to_string()))?; - let tmp_path = self.path.with_extension("tmp"); - - std::fs::write(&tmp_path, serialized.as_bytes()) - .map_err(|e| PersistError::CouldNotPersist(e.to_string()))?; - std::fs::rename(tmp_path, &self.path) - .map_err(|e| PersistError::CouldNotPersist(e.to_string()))?; - self.state = new_cd; - Ok(PersistedRequest(request)) - } - - fn state(&self) -> ConsensusData { - self.state - } -} - -impl Persist for SignerRaftNode { - fn persist(&mut self, request: ValidRequest) -> Result { - if !self.is_leader() { - return Err(PersistError::InvalidState("Not the leader".into())); - } - let state = ConsensusData::from(&request); - if let Err(e) = self.replicate_state(&state) { - return Err(PersistError::CouldNotPersist(e.to_string())); - } - Ok(PersistedRequest(request)) - } - fn state(&self) -> ConsensusData { - *self.signer_state.read().unwrap() - } -} diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index e49a2f5..41b28db 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -12,14 +12,15 @@ pub enum Request { pub enum CheckedVoteRequest { DoubleSignVote(ConsensusData), - ValidRequest(ValidRequest), + ValidRequest(CheckedRequest), } pub enum CheckedProposalRequest { DoubleSignProposal(ConsensusData), - ValidRequest(ValidRequest), + ValidRequest(CheckedRequest), } -pub enum ValidRequest { +#[derive(Clone, serde::Serialize, serde::Deserialize)] +pub enum CheckedRequest { Proposal(Proposal), Vote(Vote), } @@ -44,10 +45,9 @@ impl fmt::Debug for Request { } #[derive(Debug)] -#[non_exhaustive] pub enum Response { - SignedProposal(P), - SignedVote(V), + Proposal(P), + Vote(V), PublicKey(K), Ping(G), } @@ -58,9 +58,10 @@ impl Vote { height: self.height, round: self.round, step: self.step, + ..Default::default() }; if should_sign_vote(state, &self) { - CheckedVoteRequest::ValidRequest(ValidRequest::Vote(self)) + CheckedVoteRequest::ValidRequest(CheckedRequest::Vote(self)) } else { CheckedVoteRequest::DoubleSignVote(req_state) } @@ -72,9 +73,10 @@ impl Proposal { height: self.height, round: self.round, step: self.step, + ..Default::default() }; if should_sign_proposal(state, &self) { - CheckedProposalRequest::ValidRequest(ValidRequest::Proposal(self)) + CheckedProposalRequest::ValidRequest(CheckedRequest::Proposal(self)) } else { CheckedProposalRequest::DoubleSignProposal(req_state) } @@ -89,7 +91,7 @@ A signer should only sign a proposal p if any of the following lines are true: In other words, a proposal should only be signed if it’s at a higher height, or a higher round for the same height. Once a proposal or vote has been signed for a given height and round, a proposal should never be signed for the same height and round. */ -fn should_sign_proposal(state: &ConsensusData, proposal: &Proposal) -> bool { +pub fn should_sign_proposal(state: &ConsensusData, proposal: &Proposal) -> bool { if proposal.step != SignedMsgType::Proposal { return false; } @@ -137,7 +139,7 @@ In other words, a vote should only be signed if it’s: - a prevote for the same height and round where we haven’t signed a prevote or precommit (but have signed a proposal) - a precommit for the same height and round where we haven’t signed a precommit (but have signed a proposal and/or a prevote) */ -fn should_sign_vote(state: &ConsensusData, vote: &Vote) -> bool { +pub fn should_sign_vote(state: &ConsensusData, vote: &Vote) -> bool { info!( "checking if vote should be signed, state: {}, vote: {}/{}/{:?}", state, vote.height, vote.round, vote.step @@ -188,6 +190,7 @@ fn should_sign_proposal_logic() { height: 10, round: 1, step: SignedMsgType::Proposal, + ..Default::default() }; let p1 = Proposal { @@ -239,6 +242,7 @@ fn should_sign_vote_logic() { height: 10, round: 1, step: SignedMsgType::Proposal, + ..Default::default() }; let block_id = Some(crate::types::BlockId { hash: vec![1], @@ -285,6 +289,7 @@ fn should_sign_vote_logic() { height: 10, round: 1, step: SignedMsgType::Prevote, + ..Default::default() }; assert!(should_sign_vote(&state_after_prevote, &v4)); @@ -292,6 +297,7 @@ fn should_sign_vote_logic() { height: 10, round: 1, step: SignedMsgType::Precommit, + ..Default::default() }; assert!(!should_sign_vote(&state_after_precommit, &v4)); @@ -312,18 +318,13 @@ fn test_step_transition_proposal() { height: 10, round: 1, step: SignedMsgType::Proposal, + ..Default::default() }; - assert_eq!(valid_step_transition(&state, SignedMsgType::Prevote), true); + assert!(valid_step_transition(&state, SignedMsgType::Prevote)); // moving to pre-commit is always valid from non-precommit states - assert_eq!( - valid_step_transition(&state, SignedMsgType::Precommit), - true - ); - assert_eq!(valid_step_transition(&state, SignedMsgType::Unknown), false); - assert_eq!( - valid_step_transition(&state, SignedMsgType::Proposal), - false - ); + assert!(valid_step_transition(&state, SignedMsgType::Precommit)); + assert!(!valid_step_transition(&state, SignedMsgType::Unknown)); + assert!(!valid_step_transition(&state, SignedMsgType::Proposal)); } #[test] @@ -332,18 +333,13 @@ fn test_step_transition_precommit() { height: 10, round: 1, step: SignedMsgType::Precommit, + ..Default::default() }; // moving from pre-commit is never allowed - assert_eq!( - valid_step_transition(&state, SignedMsgType::Precommit), - false - ); - assert_eq!(valid_step_transition(&state, SignedMsgType::Prevote), false); - assert_eq!(valid_step_transition(&state, SignedMsgType::Unknown), false); - assert_eq!( - valid_step_transition(&state, SignedMsgType::Proposal), - false - ); + assert!(!valid_step_transition(&state, SignedMsgType::Precommit)); + assert!(!valid_step_transition(&state, SignedMsgType::Prevote)); + assert!(!valid_step_transition(&state, SignedMsgType::Unknown)); + assert!(!valid_step_transition(&state, SignedMsgType::Proposal)); } #[test] @@ -352,18 +348,13 @@ fn test_step_transition_unknown() { height: 10, round: 1, step: SignedMsgType::Unknown, + ..Default::default() }; // can only transition to precommit - assert_eq!( - valid_step_transition(&state, SignedMsgType::Precommit), - true - ); - assert_eq!(valid_step_transition(&state, SignedMsgType::Prevote), false); - assert_eq!(valid_step_transition(&state, SignedMsgType::Unknown), false); - assert_eq!( - valid_step_transition(&state, SignedMsgType::Proposal), - false - ); + assert!(valid_step_transition(&state, SignedMsgType::Precommit)); + assert!(!valid_step_transition(&state, SignedMsgType::Prevote)); + assert!(!valid_step_transition(&state, SignedMsgType::Unknown)); + assert!(!valid_step_transition(&state, SignedMsgType::Proposal)); } #[test] @@ -372,15 +363,10 @@ fn test_step_transition_prevote() { height: 10, round: 1, step: SignedMsgType::Prevote, + ..Default::default() }; - assert_eq!( - valid_step_transition(&state, SignedMsgType::Precommit), - true - ); - assert_eq!(valid_step_transition(&state, SignedMsgType::Prevote), false); - assert_eq!(valid_step_transition(&state, SignedMsgType::Unknown), false); - assert_eq!( - valid_step_transition(&state, SignedMsgType::Proposal), - false - ); + assert!(valid_step_transition(&state, SignedMsgType::Precommit)); + assert!(!valid_step_transition(&state, SignedMsgType::Prevote)); + assert!(!valid_step_transition(&state, SignedMsgType::Unknown)); + assert!(!valid_step_transition(&state, SignedMsgType::Proposal)); } diff --git a/src/signer/mock_connection.rs b/src/signer/mock_connection.rs index 0c94faa..39a6fea 100644 --- a/src/signer/mock_connection.rs +++ b/src/signer/mock_connection.rs @@ -53,8 +53,8 @@ impl Read for MockCometBFTConnection { } let bytes_to_read = std::cmp::min(buf.len(), self.read_buffer.len()); - for i in 0..bytes_to_read { - buf[i] = self.read_buffer.pop_front().unwrap(); + for byte in buf.iter_mut().take(bytes_to_read) { + *byte = self.read_buffer.pop_front().unwrap(); } Ok(bytes_to_read) diff --git a/src/signer/mod.rs b/src/signer/mod.rs index e12fa20..0abed48 100644 --- a/src/signer/mod.rs +++ b/src/signer/mod.rs @@ -3,15 +3,16 @@ use crate::backend::SigningBackend; use crate::config::Config; use crate::connection::open_secret_connection; use crate::error::SignerError; -use crate::persist::PersistedRequest; -use crate::protocol::{Request, Response, ValidRequest}; +use crate::protocol::{CheckedRequest, Request, Response}; use crate::types::{BufferError, SignedMsgType}; use crate::versions::ProtocolVersion; +use log::trace; use log::{debug, info}; use prost::Message as _; use std::io::{Read, Write}; use std::marker::PhantomData; use std::net::TcpStream; +use std::sync::atomic::AtomicBool; use tendermint_p2p::secret_connection::SecretConnection; pub struct Signer { @@ -22,6 +23,8 @@ pub struct Signer { read_buffer: Vec, } +pub type NetworkSigner = Signer, V, SecretConnection>; + impl Signer { pub fn new(signer: T, connection: C, chain_id: String) -> Self { Self { @@ -33,54 +36,50 @@ impl Signer { } } + pub fn chain_id(&self) -> &str { + &self.chain_id + } + pub fn public_key(&self) -> Result { self.signer.public_key() } - pub fn sign( + pub fn sign_request( &mut self, - request: PersistedRequest, - ) -> Result< - Response, - SignerError, - > { - match request.0 { - ValidRequest::Proposal(proposal) => { - let signable_data = V::proposal_to_bytes(&proposal, &self.chain_id)?; + request: &CheckedRequest, + ) -> Result<(Vec, Option>), SignerError> { + match request { + CheckedRequest::Proposal(proposal) => { + let signable_data = V::proposal_to_bytes(proposal, &self.chain_id)?; let signature = self.signer.sign(&signable_data)?; - debug!("Signature: {}", hex::encode(&signature)); - debug!("Signable data: {}", hex::encode(&signable_data)); - - let response = V::create_proposal_response(&proposal, signature); - Ok(Response::SignedProposal(response)) + trace!("Signature: {}", hex::encode(&signature)); + trace!("Signable data: {}", hex::encode(&signable_data)); + Ok((signature, None)) } - ValidRequest::Vote(vote) => { + CheckedRequest::Vote(vote) => { // TODO: chain id should be parsed from the request, and compared to what we're expecting // ^ no chain_id in the request. if we configure wrong chain_id in the config // ^ actually it IS in the request and it IS in the canonical vote / proposal // i just dropped it somewhere - let signable_data = V::vote_to_bytes(&vote, &self.chain_id)?; + let signable_data = V::vote_to_bytes(vote, &self.chain_id)?; let signature = self.signer.sign(&signable_data)?; if vote.step == SignedMsgType::Precommit && vote.block_id.as_ref().is_some_and(|id| !id.hash.is_empty()) { - info!("it's a precommit with a non-nil block ID"); - let extension_signable_data = - V::vote_extension_to_bytes(&vote, &self.chain_id)?; + debug!("it's a precommit with a non-nil block ID"); + let extension_signable_data = V::vote_extension_to_bytes(vote, &self.chain_id)?; let ext_sig = self.signer.sign(&extension_signable_data)?; - debug!( + trace!( "Extension signable data: {}", hex::encode(&extension_signable_data) ); - debug!("Extension signature: {}", hex::encode(&ext_sig)); - let response = V::create_vote_response(&vote, signature, Some(ext_sig)); - return Ok(Response::SignedVote(response)); + trace!("Extension signature: {}", hex::encode(&ext_sig)); + return Ok((signature, Some(ext_sig))); } - info!("no vote ext this time"); - debug!("Signature: {}", hex::encode(&signature)); - debug!("Signable data: {}", hex::encode(&signable_data)); - let response = V::create_vote_response(&vote, signature, None); - Ok(Response::SignedVote(response)) + debug!("no vote ext this time"); + trace!("Signature: {}", hex::encode(&signature)); + trace!("Signable data: {}", hex::encode(&signable_data)); + Ok((signature, None)) } } } @@ -149,7 +148,8 @@ pub fn create_signer( port: u16, identity_key: &ed25519_consensus::SigningKey, config: &Config, -) -> Result, V, SecretConnection>, SignerError> { + stop: Option<&AtomicBool>, +) -> Result, SignerError> { info!("Connecting to CometBFT at {}:{}", host, port); let conn = open_secret_connection( @@ -157,6 +157,7 @@ pub fn create_signer( port, identity_key.clone(), tendermint_p2p::secret_connection::Version::V0_34, + stop, )?; let backend = crate::backend::create_backend(config)?; diff --git a/src/signer/tests.rs b/src/signer/tests.rs index e2ee0d2..8aa4138 100644 --- a/src/signer/tests.rs +++ b/src/signer/tests.rs @@ -1,15 +1,99 @@ use super::mock_connection::MockCometBFTConnection; use crate::backend::Ed25519Signer; -use crate::persist::LocalState; +use crate::cluster::SignerRaftNode; +use crate::config::{PeerConfig, RaftConfig}; +use crate::handle_single_request; use crate::proto::v0_38; use crate::signer::Signer; -use crate::types::{ConsensusData, SignedMsgType}; +use crate::types::SignedMsgType; use crate::versions::VersionV0_38; -use crate::{handle_single_request, persist}; use prost::Message; -use std::path::PathBuf; -use std::sync::{Arc, Mutex}; -use std::time::Duration; +use rand::Rng; +use std::sync::{Arc, Mutex, mpsc}; +use std::thread; +use std::time::{Duration, Instant}; +use tempfile::TempDir; + +fn create_single_node_raft(temp_dir: &TempDir) -> SignerRaftNode { + let port = rand::rng().random_range(30000..60000); + let bind_addr = format!("127.0.0.1:{port}"); + let config = RaftConfig { + node_id: 1, + bind_addr: bind_addr.clone(), + data_path: temp_dir.path().join("node_1").to_string_lossy().to_string(), + peers: vec![PeerConfig { + id: 1, + addr: bind_addr, + }], + initial_state_path: "./non_existent_initial_state.json".to_string(), + }; + let (events_tx, _events_rx) = mpsc::channel(); + SignerRaftNode::new(config, events_tx) +} + +fn wait_for_leader(node: &SignerRaftNode, timeout: Duration) { + let start = Instant::now(); + while start.elapsed() < timeout { + if node.is_leader() { + return; + } + thread::sleep(Duration::from_millis(30)); + } + panic!("Single-node raft did not become leader within timeout"); +} + +fn make_prevote_request( + height: i64, + round: i64, + timestamp_secs: i64, + block_hash: Option>, +) -> Vec { + let block_id = block_hash.map(|hash| v0_38::types::BlockId { + hash: hash.into(), + part_set_header: Some(v0_38::types::PartSetHeader { + total: 1, + hash: vec![7, 7, 7].into(), + }), + }); + + let req = v0_38::privval::SignVoteRequest { + vote: Some(v0_38::types::Vote { + r#type: SignedMsgType::Prevote as i32, + height, + round: round as i32, + block_id, + timestamp: Some(prost_types::Timestamp { + seconds: timestamp_secs, + nanos: 0, + }), + ..Default::default() + }), + chain_id: "test-chain".to_string(), + }; + + let msg = v0_38::privval::Message { + sum: Some(v0_38::privval::message::Sum::SignVoteRequest(req)), + }; + + let mut bytes = Vec::new(); + msg.encode_length_delimited(&mut bytes).unwrap(); + bytes +} + +fn recv_signed_vote_response( + handle: &super::mock_connection::MockConnectionHandle, +) -> v0_38::privval::SignedVoteResponse { + let response_bytes = handle + .response_receiver + .recv_timeout(Duration::from_secs(1)) + .unwrap(); + let response_msg = + v0_38::privval::Message::decode_length_delimited(response_bytes.as_slice()).unwrap(); + match response_msg.sum { + Some(v0_38::privval::message::Sum::SignedVoteResponse(res)) => res, + _ => panic!("Expected SignedVoteResponse"), + } +} #[test] fn signer_with_mock_connection() { @@ -33,30 +117,18 @@ fn signer_with_mock_connection() { proposal_req, )), }; - let initial_cd = ConsensusData { - height: 0, - round: 0, - step: 0.into(), - }; - let cd_path = "asd"; - std::fs::write( - cd_path, - &serde_json::to_string(&initial_cd).unwrap().as_bytes(), - ) - .unwrap(); - let p = LocalState::from_file(&PathBuf::from(cd_path)).unwrap(); + let temp_dir = TempDir::new().unwrap(); + let raft_node = create_single_node_raft(&temp_dir); + wait_for_leader(&raft_node, Duration::from_secs(5)); + let state_persist = Arc::new(Mutex::new(raft_node)); let mut req_bytes = Vec::new(); msg.encode_length_delimited(&mut req_bytes).unwrap(); handle.request_sender.send(req_bytes).unwrap(); - handle_single_request( - &mut signer, - &Arc::new(Mutex::new(persist::PersistVariants::Local(p))), - ) - .unwrap(); + handle_single_request(&mut signer, &state_persist).unwrap(); let response_bytes = handle .response_receiver @@ -80,3 +152,62 @@ fn signer_with_mock_connection() { _ => panic!("Expected a SignedProposalResponse"), } } + +#[test] +fn prevote_same_hrs_only_same_block_id_is_allowed() { + let (mock_conn, handle) = MockCometBFTConnection::new(); + let backend = Ed25519Signer::from_key_file("./keys/privkey").unwrap(); + let mut signer = Signer::<_, VersionV0_38, _>::new(backend, mock_conn, "test-chain".into()); + + let temp_dir = TempDir::new().unwrap(); + let raft_node = create_single_node_raft(&temp_dir); + wait_for_leader(&raft_node, Duration::from_secs(5)); + let state_persist = Arc::new(Mutex::new(raft_node)); + + // 1) block A => should sign + handle + .request_sender + .send(make_prevote_request(1, 0, 1_700_000_001, Some(vec![0xAA]))) + .unwrap(); + handle_single_request(&mut signer, &state_persist).unwrap(); + let res1 = recv_signed_vote_response(&handle); + assert!(res1.error.is_none()); + assert!( + res1.vote + .as_ref() + .is_some_and(|v| !v.signature.is_empty() && v.block_id.is_some()) + ); + + // 2) nil block => same HRS, different block id semantics, should be rejected + handle + .request_sender + .send(make_prevote_request(1, 0, 1_700_000_002, None)) + .unwrap(); + handle_single_request(&mut signer, &state_persist).unwrap(); + let res2 = recv_signed_vote_response(&handle); + assert!(res2.error.is_some()); + + // 3) block A again => should work (same block id as #1) + handle + .request_sender + .send(make_prevote_request(1, 0, 1_700_000_003, Some(vec![0xAA]))) + .unwrap(); + handle_single_request(&mut signer, &state_persist).unwrap(); + let res3 = recv_signed_vote_response(&handle); + assert!(res3.error.is_none()); + assert!( + res3.vote + .as_ref() + .and_then(|v| v.block_id.as_ref()) + .is_some_and(|id| id.hash.as_ref() == [0xAA]) + ); + + // 4) block B => same HRS, different non-nil block id, should be rejected + handle + .request_sender + .send(make_prevote_request(1, 0, 1_700_000_004, Some(vec![0xBB]))) + .unwrap(); + handle_single_request(&mut signer, &state_persist).unwrap(); + let res4 = recv_signed_vote_response(&handle); + assert!(res4.error.is_some()); +} diff --git a/src/types/mod.rs b/src/types/mod.rs index 5532ca8..84254df 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -1,8 +1,9 @@ -use crate::{SignerError, protocol::ValidRequest}; +use crate::error::SignerError; +use crate::protocol::CheckedRequest; use serde::{Deserialize, Serialize}; use thiserror::Error; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct BlockId { pub hash: Vec, pub parts: Option, @@ -25,6 +26,7 @@ impl From for crate::proto::v1::types::PartSetHeader { } } } + impl From for BlockId { fn from(block_id: crate::proto::v1::types::BlockId) -> BlockId { BlockId { @@ -43,13 +45,13 @@ impl From for PartSetHeader { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct PartSetHeader { pub total: u32, pub hash: Vec, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)] pub enum SignedMsgType { #[default] Unknown = 0, @@ -58,7 +60,7 @@ pub enum SignedMsgType { Proposal = 32, } -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct Vote { pub step: SignedMsgType, pub height: i64, @@ -70,6 +72,7 @@ pub struct Vote { pub extension: Vec, pub extension_signature: Vec, } + impl std::fmt::Display for Vote { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( @@ -110,7 +113,6 @@ impl From for i32 { } } -// this is getting messy, probably something wrong with the types somewhere? impl From for SignedMsgType { fn from(n: u8) -> Self { match n { @@ -122,7 +124,7 @@ impl From for SignedMsgType { } } -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct Proposal { pub step: SignedMsgType, pub height: i64, @@ -148,12 +150,13 @@ pub enum KeyType { impl TryFrom<&str> for KeyType { type Error = SignerError; + fn try_from(key_type_str: &str) -> Result { match key_type_str { "ed25519" => Ok(KeyType::Ed25519), "secp256k1" => Ok(KeyType::Secp256k1), "bls12_381" => Ok(KeyType::Bls12381), - "bls12381" => Ok(KeyType::Bls12381), // TODO + "bls12381" => Ok(KeyType::Bls12381), _ => Err(SignerError::InvalidData), } } @@ -169,25 +172,32 @@ impl From for String { } } -#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)] +#[serde(default)] pub struct ConsensusData { pub height: i64, pub round: i64, pub step: SignedMsgType, + pub sign_data: Vec, + pub signature: Vec, + pub ext_sign_data: Vec, + pub ext_signature: Vec, } -impl From<&ValidRequest> for ConsensusData { - fn from(value: &ValidRequest) -> Self { +impl From<&CheckedRequest> for ConsensusData { + fn from(value: &CheckedRequest) -> Self { match value { - ValidRequest::Vote(v) => Self { + CheckedRequest::Vote(v) => Self { height: v.height, round: v.round, step: v.step, + ..Default::default() }, - ValidRequest::Proposal(p) => Self { + CheckedRequest::Proposal(p) => Self { height: p.height, round: p.round, step: p.step, + ..Default::default() }, } } @@ -198,7 +208,7 @@ impl std::fmt::Display for ConsensusData { write!( f, "ConsensusData {}/{}/{:?}", - self.height, self.round, self.step + self.height, self.round, self.step, ) } } @@ -215,8 +225,9 @@ impl ConsensusData { let json = std::fs::read_to_string(path).ok()?; serde_json::from_str(&json).ok() } - pub fn to_bytes(self) -> Vec { - serde_json::to_vec(&self).unwrap() + + pub fn to_bytes(&self) -> Vec { + serde_json::to_vec(self).unwrap() } pub fn from_bytes(buf: &[u8]) -> Option { diff --git a/src/versions/mod.rs b/src/versions/mod.rs index 9daa6d8..a8dcb88 100644 --- a/src/versions/mod.rs +++ b/src/versions/mod.rs @@ -32,6 +32,14 @@ pub trait ProtocolVersion { fn proposal_to_bytes(proposal: &Proposal, chain_id: &str) -> Result, SignerError>; fn vote_to_bytes(vote: &Vote, chain_id: &str) -> Result, SignerError>; fn vote_extension_to_bytes(vote: &Vote, chain_id: &str) -> Result, SignerError>; + fn proposal_sign_bytes_only_differ_by_timestamp( + old_sign_bytes: &[u8], + new_sign_bytes: &[u8], + ) -> Result; + fn vote_sign_bytes_only_differ_by_timestamp( + old_sign_bytes: &[u8], + new_sign_bytes: &[u8], + ) -> Result; fn create_proposal_response(proposal: &Proposal, signature: Vec) -> Self::ProposalResponse; fn create_double_sign_prop_response(cd: &ConsensusData) -> Self::ProposalResponse; fn create_double_sign_vote_response(cd: &ConsensusData) -> Self::VoteResponse; diff --git a/src/versions/v0_34.rs b/src/versions/v0_34.rs index db48cf0..68c175b 100644 --- a/src/versions/v0_34.rs +++ b/src/versions/v0_34.rs @@ -37,6 +37,20 @@ impl ProtocolVersion for VersionV0_34 { todo!("v0.34") } + fn proposal_sign_bytes_only_differ_by_timestamp( + _old_sign_bytes: &[u8], + _new_sign_bytes: &[u8], + ) -> Result { + todo!("v0.34") + } + + fn vote_sign_bytes_only_differ_by_timestamp( + _old_sign_bytes: &[u8], + _new_sign_bytes: &[u8], + ) -> Result { + todo!("v0.34") + } + fn create_double_sign_vote_response(_cd: &ConsensusData) -> Self::VoteResponse { todo!() } @@ -68,7 +82,14 @@ impl ProtocolVersion for VersionV0_34 { todo!() } - fn create_error_response(_message: &str) -> Response { + fn create_error_response( + _message: &str, + ) -> Response< + Self::ProposalResponse, + Self::VoteResponse, + Self::PubKeyResponse, + Self::PingResponse, + > { todo!() } diff --git a/src/versions/v0_37.rs b/src/versions/v0_37.rs index 614c319..ffd1d58 100644 --- a/src/versions/v0_37.rs +++ b/src/versions/v0_37.rs @@ -37,6 +37,20 @@ impl ProtocolVersion for VersionV0_37 { todo!("0.37") } + fn proposal_sign_bytes_only_differ_by_timestamp( + _old_sign_bytes: &[u8], + _new_sign_bytes: &[u8], + ) -> Result { + todo!("0.37") + } + + fn vote_sign_bytes_only_differ_by_timestamp( + _old_sign_bytes: &[u8], + _new_sign_bytes: &[u8], + ) -> Result { + todo!("0.37") + } + fn create_double_sign_vote_response(_cd: &ConsensusData) -> Self::VoteResponse { todo!() } @@ -68,7 +82,14 @@ impl ProtocolVersion for VersionV0_37 { todo!() } - fn create_error_response(_message: &str) -> Response { + fn create_error_response( + _message: &str, + ) -> Response< + Self::ProposalResponse, + Self::VoteResponse, + Self::PubKeyResponse, + Self::PingResponse, + > { todo!() } diff --git a/src/versions/v0_38.rs b/src/versions/v0_38.rs index d33eb42..e54ed2b 100644 --- a/src/versions/v0_38.rs +++ b/src/versions/v0_38.rs @@ -50,10 +50,8 @@ impl ProtocolVersion for VersionV0_38 { ) -> Result, SignerError> { let mut buf = Vec::new(); let msg = match response { - Response::SignedVote(resp) => v0_38::privval::message::Sum::SignedVoteResponse(resp), - Response::SignedProposal(resp) => { - v0_38::privval::message::Sum::SignedProposalResponse(resp) - } + Response::Vote(resp) => v0_38::privval::message::Sum::SignedVoteResponse(resp), + Response::Proposal(resp) => v0_38::privval::message::Sum::SignedProposalResponse(resp), Response::Ping(resp) => v0_38::privval::message::Sum::PingResponse(resp), Response::PublicKey(resp) => v0_38::privval::message::Sum::PubKeyResponse(resp), }; @@ -146,13 +144,35 @@ impl ProtocolVersion for VersionV0_38 { Ok(bytes) } + fn proposal_sign_bytes_only_differ_by_timestamp( + old_sign_bytes: &[u8], + new_sign_bytes: &[u8], + ) -> Result { + let mut old = v0_38::types::CanonicalProposal::decode_length_delimited(old_sign_bytes)?; + let mut new = v0_38::types::CanonicalProposal::decode_length_delimited(new_sign_bytes)?; + old.timestamp = None; + new.timestamp = None; + Ok(old == new) + } + + fn vote_sign_bytes_only_differ_by_timestamp( + old_sign_bytes: &[u8], + new_sign_bytes: &[u8], + ) -> Result { + let mut old = v0_38::types::CanonicalVote::decode_length_delimited(old_sign_bytes)?; + let mut new = v0_38::types::CanonicalVote::decode_length_delimited(new_sign_bytes)?; + old.timestamp = None; + new.timestamp = None; + Ok(old == new) + } + fn create_double_sign_vote_response(cd: &ConsensusData) -> Self::VoteResponse { v0_38::privval::SignedVoteResponse { vote: None, error: Some(v0_38::privval::RemoteSignerError { code: 1, description: format!( - "Would double-sign vote at height/round/step {}/{}/{:?}", + "Vote at height/round/step {}/{}/{:?} has already been signed by another CometBFT node connected to nebula", cd.height, cd.round, cd.step ), }), @@ -164,7 +184,7 @@ impl ProtocolVersion for VersionV0_38 { error: Some(v0_38::privval::RemoteSignerError { code: 1, description: format!( - "Would double-sign proposal at height/round/step {}/{}/{:?}", + "Proposal at height/round/step {}/{}/{:?} has already been signed by another CometBFT node connected to nebula", cd.height, cd.round, cd.step ), }), @@ -251,7 +271,7 @@ impl ProtocolVersion for VersionV0_38 { Self::PubKeyResponse, Self::PingResponse, > { - Response::SignedProposal(v0_38::privval::SignedProposalResponse { + Response::Proposal(v0_38::privval::SignedProposalResponse { proposal: None, error: Some(v0_38::privval::RemoteSignerError { code: 1, diff --git a/src/versions/v1_0.rs b/src/versions/v1_0.rs index fde3910..af9a195 100644 --- a/src/versions/v1_0.rs +++ b/src/versions/v1_0.rs @@ -32,9 +32,7 @@ impl ProtocolVersion for VersionV1_0 { Some(v1::privval::message::Sum::SignProposalRequest(req)) => { let proposal = req.proposal.ok_or(SignerError::InvalidData)?; Ok(( - Request::Proposal(tendermint_proposal_to_domain( - proposal, - )?), + Request::Proposal(tendermint_proposal_to_domain(proposal)?), req.chain_id, )) } @@ -56,10 +54,8 @@ impl ProtocolVersion for VersionV1_0 { ) -> Result, SignerError> { let mut buf = Vec::new(); let msg = match response { - Response::SignedVote(resp) => v1::privval::message::Sum::SignedVoteResponse(resp), - Response::SignedProposal(resp) => { - v1::privval::message::Sum::SignedProposalResponse(resp) - } + Response::Vote(resp) => v1::privval::message::Sum::SignedVoteResponse(resp), + Response::Proposal(resp) => v1::privval::message::Sum::SignedProposalResponse(resp), Response::Ping(resp) => v1::privval::message::Sum::PingResponse(resp), Response::PublicKey(resp) => v1::privval::message::Sum::PubKeyResponse(resp), }; @@ -153,12 +149,37 @@ impl ProtocolVersion for VersionV1_0 { Ok(bytes) } + fn proposal_sign_bytes_only_differ_by_timestamp( + old_sign_bytes: &[u8], + new_sign_bytes: &[u8], + ) -> Result { + let mut old = v1::types::CanonicalProposal::decode_length_delimited(old_sign_bytes)?; + let mut new = v1::types::CanonicalProposal::decode_length_delimited(new_sign_bytes)?; + old.timestamp = None; + new.timestamp = None; + Ok(old == new) + } + + fn vote_sign_bytes_only_differ_by_timestamp( + old_sign_bytes: &[u8], + new_sign_bytes: &[u8], + ) -> Result { + let mut old = v1::types::CanonicalVote::decode_length_delimited(old_sign_bytes)?; + let mut new = v1::types::CanonicalVote::decode_length_delimited(new_sign_bytes)?; + old.timestamp = None; + new.timestamp = None; + Ok(old == new) + } + fn create_double_sign_vote_response(cd: &ConsensusData) -> Self::VoteResponse { v1::privval::SignedVoteResponse { vote: None, error: Some(v1::privval::RemoteSignerError { code: 1, - description: format!("Would double-sign vote at height/round/step {}/{}/{:?}", cd.height, cd.round, cd.step), + description: format!( + "Vote at height/round/step {}/{}/{:?} has already been signed by another CometBFT node connected to nebula", + cd.height, cd.round, cd.step + ), }), } } @@ -168,7 +189,10 @@ impl ProtocolVersion for VersionV1_0 { proposal: None, error: Some(v1::privval::RemoteSignerError { code: 1, - description: format!("Would double-sign proposal at height/round/step {}/{}/{:?}", cd.height, cd.round, cd.step), + description: format!( + "Proposal at height/round/step {}/{}/{:?} has already been signed by another CometBFT node connected to nebula", + cd.height, cd.round, cd.step + ), }), } } @@ -234,8 +258,15 @@ impl ProtocolVersion for VersionV1_0 { v1::privval::PingResponse {} } - fn create_error_response(message: &str) -> Response { - Response::SignedProposal(v1::privval::SignedProposalResponse { + fn create_error_response( + message: &str, + ) -> Response< + Self::ProposalResponse, + Self::VoteResponse, + Self::PubKeyResponse, + Self::PingResponse, + > { + Response::Proposal(v1::privval::SignedProposalResponse { proposal: None, error: Some(v1::privval::RemoteSignerError { code: 1, From 1a84d05cffcc7770616a842cd0ecaea67efbc103 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ksawery=20Wr=C3=B3bel?= Date: Fri, 6 Mar 2026 17:05:12 +0100 Subject: [PATCH 2/3] Remove outdated entry about the signature cache I decided to remove it for now and will add it back later --- src/main.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main.rs b/src/main.rs index c21e96a..b99d2b4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -344,7 +344,6 @@ fn vote_sign_data( /// For consensus requests (Proposal or Vote), check the request against the signer's state and: /// - for proposals, check if the height/round has already been signed. If yes, reply with a "would double sign error". If no, return a command to persist the state and sign over the proposal. /// - for votes, build the corresponding consensus state (height/round/step and sign bytes) and: -/// - first check the signature cache for a matching entry. If present, return a command to replay the cached signature. /// - if the request matches the current signer state at the same height/round/step: /// - if the stored vote and incoming vote are identical besides timestamp differences allowed by the protocol version, return a command to sign again and persist. /// - if the stored vote is identical, return a command to replay the stored signature. From 6a8ed85d44b0d272d0a509d8a1ce8b3bd88b66e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ksawery=20Wr=C3=B3bel?= Date: Thu, 12 Mar 2026 10:39:06 +0000 Subject: [PATCH 3/3] sign votes even if vote extensions don't match --- src/main.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/main.rs b/src/main.rs index b99d2b4..69dfcfb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -413,12 +413,13 @@ fn process_request( )?; if only_ts { - if !ext_matches { - info!("only ts differs, but ext data does not match. not signing"); - return Ok(RequestProcessingAction::ReplyWith(Response::Vote( - V::create_double_sign_vote_response(&request_state), - ))); - } + // Vote extensions are non-deterministic + // if !ext_matches { + // info!("only ts differs, but ext data does not match. not signing"); + // return Ok(RequestProcessingAction::ReplyWith(Response::Vote( + // V::create_double_sign_vote_response(&request_state), + // ))); + // } info!( "only ts differs, issuing a sign command. current state: {}", current_state