diff --git a/crates/hashi/src/db.rs b/crates/hashi/src/db.rs index d11776a67..2af550913 100644 --- a/crates/hashi/src/db.rs +++ b/crates/hashi/src/db.rs @@ -969,4 +969,35 @@ mod tests { // Should be a no-op, not an error. db.prune_messages_below(100).unwrap(); } + + #[test] + fn test_epoch_store_writes_at_explicit_epoch_not_self_epoch() { + use crate::storage::EpochPublicMessagesStore; + use crate::storage::PublicMessagesStore; + use std::collections::BTreeMap; + use std::num::NonZeroU16; + + let tmpdir = tempfile::Builder::new().tempdir().unwrap(); + let db = std::sync::Arc::new(Database::open(tmpdir.path()).unwrap()); + + let mut store = EpochPublicMessagesStore::new(db.clone(), 87); + + let dealer = Address::new([1u8; 32]); + let mut rotation_msgs: BTreeMap = BTreeMap::new(); + rotation_msgs.insert(NonZeroU16::new(1).unwrap(), create_test_message()); + + store + .store_rotation_messages(71, &dealer, &rotation_msgs) + .unwrap(); + + assert!( + store.get_rotation_messages(71, &dealer).unwrap().is_some(), + "rotation messages written with explicit epoch=71 must be readable at epoch=71" + ); + + assert!( + store.get_rotation_messages(87, &dealer).unwrap().is_none(), + "rotation messages must not leak to the store's self.epoch=87" + ); + } } diff --git a/crates/hashi/src/mpc/mpc_except_signing.rs b/crates/hashi/src/mpc/mpc_except_signing.rs index b585376ba..4998606f4 100644 --- a/crates/hashi/src/mpc/mpc_except_signing.rs +++ b/crates/hashi/src/mpc/mpc_except_signing.rs @@ -262,7 +262,7 @@ impl MpcManager { } let signature = match &request.messages { Messages::Dkg(msg) => { - self.store_dkg_message(sender, msg)?; + self.store_dkg_message(self.mpc_config.epoch, sender, msg)?; self.try_sign_dkg_message(sender, &request.messages)? } Messages::Rotation(msgs) => { @@ -270,11 +270,11 @@ impl MpcManager { .previous_output .clone() .ok_or_else(|| MpcError::NotReady("Rotation not started".into()))?; - self.store_rotation_messages(sender, msgs)?; + self.store_rotation_messages(self.mpc_config.epoch, sender, msgs)?; self.try_sign_rotation_messages(&previous, sender, &request.messages)? } Messages::NonceGeneration(nonce) => { - self.store_nonce_message(sender, nonce); + self.store_nonce_message(self.mpc_config.epoch, sender, nonce)?; self.try_sign_nonce_message(sender, &request.messages)? } }; @@ -1381,37 +1381,43 @@ impl MpcManager { dealer.create_message(rng) } - fn store_dkg_message(&mut self, dealer: Address, message: &avss::Message) -> MpcResult<()> { + fn store_dkg_message( + &mut self, + epoch: u64, + dealer: Address, + message: &avss::Message, + ) -> MpcResult<()> { self.dkg_messages.insert(dealer, message.clone()); self.public_messages_store - .store_dealer_message(&dealer, message) + .store_dealer_message(epoch, &dealer, message) .map_err(|e| MpcError::StorageError(e.to_string()))?; Ok(()) } fn store_rotation_messages( &mut self, + epoch: u64, dealer: Address, messages: &RotationMessages, ) -> MpcResult<()> { self.rotation_messages.insert(dealer, messages.clone()); self.public_messages_store - .store_rotation_messages(&dealer, messages) + .store_rotation_messages(epoch, &dealer, messages) .map_err(|e| MpcError::StorageError(e.to_string()))?; Ok(()) } - // TODO: Change return type to `MpcResult<()>` and propagate disk errors - // (mirroring `store_dkg_message` and `store_rotation_messages`). - fn store_nonce_message(&mut self, dealer: Address, nonce: &NonceMessage) { + fn store_nonce_message( + &mut self, + epoch: u64, + dealer: Address, + nonce: &NonceMessage, + ) -> MpcResult<()> { self.nonce_messages.insert(dealer, nonce.clone()); - if let Err(e) = self.public_messages_store.store_nonce_message( - nonce.batch_index, - &dealer, - &nonce.message, - ) { - tracing::error!("Failed to persist nonce message for dealer {dealer:?}: {e}"); - } + self.public_messages_store + .store_nonce_message(epoch, nonce.batch_index, &dealer, &nonce.message) + .map_err(|e| MpcError::StorageError(e.to_string()))?; + Ok(()) } fn needs_nonce_retrieval( @@ -1808,7 +1814,8 @@ impl MpcManager { ); }; let mut mgr = mpc_manager.write().unwrap(); - mgr.store_dkg_message(message.dealer_address, msg)?; + let epoch = mgr.mpc_config.epoch; + mgr.store_dkg_message(epoch, message.dealer_address, msg)?; return Ok(()); } tracing::info!( @@ -1869,7 +1876,8 @@ impl MpcManager { ); }; let mut mgr = mpc_manager.write().unwrap(); - mgr.store_nonce_message(message.dealer_address, nonce); + let epoch = mgr.mpc_config.epoch; + mgr.store_nonce_message(epoch, message.dealer_address, nonce)?; return Ok(()); } tracing::info!( @@ -1901,7 +1909,7 @@ impl MpcManager { Some(msg) => Messages::Dkg(msg.clone()), None => { let msg = self.create_dealer_message(rng); - self.store_dkg_message(self.address, &msg)?; + self.store_dkg_message(self.mpc_config.epoch, self.address, &msg)?; Messages::Dkg(msg) } }; @@ -1918,7 +1926,7 @@ impl MpcManager { Some(msgs) => Messages::Rotation(msgs.clone()), None => { let msgs = self.create_rotation_messages(previous, rng); - self.store_rotation_messages(self.address, &msgs)?; + self.store_rotation_messages(self.mpc_config.epoch, self.address, &msgs)?; Messages::Rotation(msgs) } }; @@ -1936,7 +1944,7 @@ impl MpcManager { None => { let msgs = self.create_nonce_dealer_message(batch_index, rng)?; if let Messages::NonceGeneration(ref nonce) = msgs { - self.store_nonce_message(self.address, nonce); + self.store_nonce_message(self.mpc_config.epoch, self.address, nonce)?; } msgs } @@ -2028,7 +2036,8 @@ impl MpcManager { continue; }; let mut mgr = mpc_manager.write().unwrap(); - mgr.store_rotation_messages(message.dealer_address, msgs)?; + let epoch = mgr.mpc_config.epoch; + mgr.store_rotation_messages(epoch, message.dealer_address, msgs)?; return Ok(()); } tracing::info!( @@ -3313,12 +3322,17 @@ impl MpcManager { let actual_hash = compute_messages_hash(&response.messages); if actual_hash == message.messages_hash { let mut mgr = mpc_manager.write().unwrap(); + let source_epoch = mgr.source_epoch; match &response.messages { Messages::Dkg(msg) => { - mgr.store_dkg_message(message.dealer_address, msg)?; + mgr.store_dkg_message(source_epoch, message.dealer_address, msg)?; } Messages::Rotation(msgs) => { - mgr.store_rotation_messages(message.dealer_address, msgs)?; + mgr.store_rotation_messages( + source_epoch, + message.dealer_address, + msgs, + )?; } _ => { tracing::warn!( diff --git a/crates/hashi/src/mpc/mpc_except_signing_tests.rs b/crates/hashi/src/mpc/mpc_except_signing_tests.rs index aaabe8e2e..5b66a2486 100644 --- a/crates/hashi/src/mpc/mpc_except_signing_tests.rs +++ b/crates/hashi/src/mpc/mpc_except_signing_tests.rs @@ -45,6 +45,7 @@ struct MockPublicMessagesStore; impl PublicMessagesStore for MockPublicMessagesStore { fn store_dealer_message( &mut self, + _epoch: u64, _dealer: &Address, _message: &avss::Message, ) -> anyhow::Result<()> { @@ -65,6 +66,7 @@ impl PublicMessagesStore for MockPublicMessagesStore { fn store_rotation_messages( &mut self, + _epoch: u64, _dealer: &Address, _messages: &RotationMessages, ) -> anyhow::Result<()> { @@ -85,6 +87,7 @@ impl PublicMessagesStore for MockPublicMessagesStore { fn store_nonce_message( &mut self, + _epoch: u64, _batch_index: u32, _dealer: &Address, _message: &batch_avss::Message, @@ -117,7 +120,7 @@ fn receive_dealer_messages( let Messages::Dkg(msg) = messages else { panic!("receive_dealer_messages called with rotation messages"); }; - manager.store_dkg_message(dealer, msg)?; + manager.store_dkg_message(manager.mpc_config.epoch, dealer, msg)?; let sig = manager.try_sign_dkg_message(dealer, messages)?; Ok(MemberSignature::new( manager.mpc_config.epoch, @@ -1032,6 +1035,7 @@ impl InMemoryPublicMessagesStore { impl PublicMessagesStore for InMemoryPublicMessagesStore { fn store_dealer_message( &mut self, + _epoch: u64, dealer: &Address, message: &avss::Message, ) -> anyhow::Result<()> { @@ -1057,6 +1061,7 @@ impl PublicMessagesStore for InMemoryPublicMessagesStore { fn store_rotation_messages( &mut self, + _epoch: u64, dealer: &Address, messages: &RotationMessages, ) -> anyhow::Result<()> { @@ -1082,6 +1087,7 @@ impl PublicMessagesStore for InMemoryPublicMessagesStore { fn store_nonce_message( &mut self, + _epoch: u64, batch_index: u32, dealer: &Address, message: &batch_avss::Message, @@ -1118,6 +1124,7 @@ struct FailingPublicMessagesStore; impl PublicMessagesStore for FailingPublicMessagesStore { fn store_dealer_message( &mut self, + _epoch: u64, _dealer: &Address, _message: &avss::Message, ) -> anyhow::Result<()> { @@ -1138,6 +1145,7 @@ impl PublicMessagesStore for FailingPublicMessagesStore { fn store_rotation_messages( &mut self, + _epoch: u64, _dealer: &Address, _messages: &RotationMessages, ) -> anyhow::Result<()> { @@ -1158,6 +1166,7 @@ impl PublicMessagesStore for FailingPublicMessagesStore { fn store_nonce_message( &mut self, + _epoch: u64, _batch_index: u32, _dealer: &Address, _message: &batch_avss::Message, @@ -1491,7 +1500,9 @@ async fn test_run_dkg_with_complaint_recovery() { let Messages::Dkg(msg) = messages else { unreachable!() }; - manager.store_dkg_message(dealer_addr, msg).unwrap(); + manager + .store_dkg_message(manager.mpc_config.epoch, dealer_addr, msg) + .unwrap(); continue; } let sig = receive_dealer_messages(manager, messages, dealer_addr).unwrap(); @@ -1831,7 +1842,7 @@ async fn test_run_as_party_recovers_shares_via_complaint() { // Party 2 stores dealer 1's cheating message and creates complaint during processing party_manager - .store_dkg_message(dealer_1_addr, &dealer_1_msg) + .store_dkg_message(party_manager.mpc_config.epoch, dealer_1_addr, &dealer_1_msg) .unwrap(); party_manager .process_certified_dkg_message(dealer_1_addr) @@ -2942,7 +2953,7 @@ fn test_handle_retrieve_messages_request_db_fallback_dkg() { let dealer_message = manager.create_dealer_message(&mut rng); manager .public_messages_store - .store_dealer_message(&dealer_address, &dealer_message) + .store_dealer_message(manager.mpc_config.epoch, &dealer_address, &dealer_message) .unwrap(); // Verify it's NOT in the in-memory map. assert!(!manager.dkg_messages.contains_key(&dealer_address)); @@ -2981,7 +2992,7 @@ fn test_handle_retrieve_messages_request_db_fallback_rotation() { let rotation_msgs = manager.create_rotation_messages(&dkg_output, &mut rng); manager .public_messages_store - .store_rotation_messages(&dealer_address, &rotation_msgs) + .store_rotation_messages(manager.mpc_config.epoch, &dealer_address, &rotation_msgs) .unwrap(); // Verify NOT in in-memory map. assert!(!manager.rotation_messages.contains_key(&dealer_address)); @@ -3012,7 +3023,7 @@ fn test_handle_retrieve_messages_request_skips_memory_for_different_epoch() { let prev_msgs = manager.create_rotation_messages(&dkg_output, &mut rng); manager .public_messages_store - .store_rotation_messages(&dealer_address, &prev_msgs) + .store_rotation_messages(prev_epoch, &dealer_address, &prev_msgs) .unwrap(); // Put DIFFERENT "current epoch" messages in the in-memory map. @@ -3050,7 +3061,12 @@ fn test_handle_retrieve_messages_request_nonce_db_fallback() { let nonce_msg = create_nonce_dealer_message(&setup, 0, 0, &mut rng); manager .public_messages_store - .store_nonce_message(0, &dealer_address, &nonce_msg.message) + .store_nonce_message( + manager.mpc_config.epoch, + 0, + &dealer_address, + &nonce_msg.message, + ) .unwrap(); assert!(!manager.nonce_messages.contains_key(&dealer_address)); @@ -3223,7 +3239,7 @@ async fn test_recover_shares_via_complaint_succeeds_with_exact_threshold() { unreachable!() }; party_manager - .store_dkg_message(dealer_addr, inner_msg) + .store_dkg_message(party_manager.mpc_config.epoch, dealer_addr, inner_msg) .unwrap(); party_manager .process_certified_dkg_message(dealer_addr) @@ -3298,7 +3314,7 @@ async fn test_recover_shares_via_complaint_skips_failed_signers() { unreachable!() }; party_manager - .store_dkg_message(dealer_addr, inner_msg) + .store_dkg_message(party_manager.mpc_config.epoch, dealer_addr, inner_msg) .unwrap(); party_manager .process_certified_dkg_message(dealer_addr) @@ -3478,7 +3494,7 @@ async fn test_recover_shares_via_complaint_insufficient_signers() { unreachable!() }; party_manager - .store_dkg_message(dealer_addr, inner_msg) + .store_dkg_message(party_manager.mpc_config.epoch, dealer_addr, inner_msg) .unwrap(); party_manager .process_certified_dkg_message(dealer_addr) @@ -3547,7 +3563,7 @@ async fn test_recover_shares_via_complaint_no_dealer_message() { unreachable!() }; party_manager - .store_dkg_message(dealer_addr, inner_msg) + .store_dkg_message(party_manager.mpc_config.epoch, dealer_addr, inner_msg) .unwrap(); party_manager .process_certified_dkg_message(dealer_addr) @@ -3607,7 +3623,7 @@ async fn test_recover_shares_via_complaint_crypto_error() { unreachable!() }; party_manager - .store_dkg_message(dealer_addr, inner_msg) + .store_dkg_message(party_manager.mpc_config.epoch, dealer_addr, inner_msg) .unwrap(); party_manager .process_certified_dkg_message(dealer_addr) @@ -4377,7 +4393,11 @@ async fn test_handle_send_messages_request_post_restart_reprocesses() { // reloaded the message but `message_responses` is empty. let mut receiver_manager = setup.create_manager(0); receiver_manager - .store_dkg_message(dealer_addr, &dealer_message) + .store_dkg_message( + receiver_manager.mpc_config.epoch, + dealer_addr, + &dealer_message, + ) .unwrap(); assert!( receiver_manager.dkg_messages.contains_key(&dealer_addr), @@ -4548,6 +4568,7 @@ impl TrackingPublicMessagesStore { impl PublicMessagesStore for TrackingPublicMessagesStore { fn store_dealer_message( &mut self, + _epoch: u64, dealer: &Address, message: &avss::Message, ) -> anyhow::Result<()> { @@ -4574,6 +4595,7 @@ impl PublicMessagesStore for TrackingPublicMessagesStore { fn store_rotation_messages( &mut self, + _epoch: u64, dealer: &Address, messages: &RotationMessages, ) -> anyhow::Result<()> { @@ -4599,6 +4621,7 @@ impl PublicMessagesStore for TrackingPublicMessagesStore { fn store_nonce_message( &mut self, + _epoch: u64, _batch_index: u32, _dealer: &Address, _message: &batch_avss::Message, @@ -6805,13 +6828,14 @@ impl SharedMemoryStore { impl PublicMessagesStore for SharedMemoryStore { fn store_dealer_message( &mut self, + _epoch: u64, dealer: &Address, message: &avss::Message, ) -> anyhow::Result<()> { self.inner .lock() .unwrap() - .store_dealer_message(dealer, message) + .store_dealer_message(0, dealer, message) } fn get_dealer_message( @@ -6828,13 +6852,14 @@ impl PublicMessagesStore for SharedMemoryStore { fn store_rotation_messages( &mut self, + _epoch: u64, dealer: &Address, messages: &RotationMessages, ) -> anyhow::Result<()> { self.inner .lock() .unwrap() - .store_rotation_messages(dealer, messages) + .store_rotation_messages(0, dealer, messages) } fn get_rotation_messages( @@ -6854,6 +6879,7 @@ impl PublicMessagesStore for SharedMemoryStore { fn store_nonce_message( &mut self, + _epoch: u64, batch_index: u32, dealer: &Address, message: &batch_avss::Message, @@ -6861,7 +6887,7 @@ impl PublicMessagesStore for SharedMemoryStore { self.inner .lock() .unwrap() - .store_nonce_message(batch_index, dealer, message) + .store_nonce_message(0, batch_index, dealer, message) } fn get_nonce_message( @@ -6919,7 +6945,7 @@ fn test_dealer_restart_reuses_stored_rotation_messages() { // Create and store rotation messages let msgs = dealer_manager.create_rotation_messages(&dkg_output, &mut rng); dealer_manager - .store_rotation_messages(dealer_addr, &msgs) + .store_rotation_messages(dealer_manager.mpc_config.epoch, dealer_addr, &msgs) .unwrap(); // Return the messages for comparison @@ -7045,7 +7071,7 @@ fn test_party_restart_uses_stored_rotation_messages() { .inner .lock() .unwrap() - .store_rotation_messages(dealer_addr, rotation_msgs) + .store_rotation_messages(0, dealer_addr, rotation_msgs) .unwrap(); } @@ -7220,7 +7246,7 @@ fn test_reconstruct_from_dkg_certificates_with_shifted_party_ids() { Messages::Dkg(m) => m, _ => panic!("Expected DKG message"), }; - store.store_dealer_message(&dealer_addr, msg).unwrap(); + store.store_dealer_message(0, &dealer_addr, msg).unwrap(); } // Create MpcManager for the shifted member with the target committee. @@ -7382,7 +7408,7 @@ fn test_reconstruct_from_dkg_certificates_stops_at_threshold() { let Messages::Dkg(inner) = msg else { unreachable!() }; - store.store_dealer_message(&dealer_addr, inner).unwrap(); + store.store_dealer_message(0, &dealer_addr, inner).unwrap(); } // Create manager for member 4 at target epoch. @@ -7579,7 +7605,7 @@ fn test_reconstruct_from_rotation_certificates_with_shifted_party_ids() { _ => panic!("Expected rotation messages"), }; store - .store_rotation_messages(dealer_addr, rotation_msgs) + .store_rotation_messages(0, dealer_addr, rotation_msgs) .unwrap(); } @@ -8832,7 +8858,13 @@ async fn test_recover_nonce_shares_via_complaint() { create_cheating_nonce_message(&setup, dealer_idx, batch_index, &mut rng); // Store the cheating message in test manager - test_manager.store_nonce_message(dealer_addr, &cheating_messages); + test_manager + .store_nonce_message( + test_manager.mpc_config.epoch, + dealer_addr, + &cheating_messages, + ) + .unwrap(); // Process cheating message → generates complaint test_manager @@ -8937,7 +8969,9 @@ async fn test_run_nonce_generation_with_complaint_recovery() { for (mgr_idx, manager) in managers.iter_mut().enumerate() { if dealer_idx == cheating_dealer_idx && mgr_idx == test_party_idx { // Validator 0 can't sign — just store the message - manager.store_nonce_message(dealer_addr, nonce_msg); + manager + .store_nonce_message(manager.mpc_config.epoch, dealer_addr, nonce_msg) + .unwrap(); continue; } let response = send_and_assert_ok(manager, dealer_addr, &messages); diff --git a/crates/hashi/src/storage/epoch_public_messages_store.rs b/crates/hashi/src/storage/epoch_public_messages_store.rs index c7b1ec044..41666231e 100644 --- a/crates/hashi/src/storage/epoch_public_messages_store.rs +++ b/crates/hashi/src/storage/epoch_public_messages_store.rs @@ -26,11 +26,12 @@ impl EpochPublicMessagesStore { impl PublicMessagesStore for EpochPublicMessagesStore { fn store_dealer_message( &mut self, + epoch: u64, dealer: &Address, message: &avss::Message, ) -> anyhow::Result<()> { self.db - .store_dealer_message(self.epoch, dealer, message) + .store_dealer_message(epoch, dealer, message) .map_err(|e| anyhow::anyhow!("failed to store dealer message: {e}")) } @@ -57,11 +58,12 @@ impl PublicMessagesStore for EpochPublicMessagesStore { fn store_rotation_messages( &mut self, + epoch: u64, dealer: &Address, messages: &RotationMessages, ) -> anyhow::Result<()> { self.db - .store_rotation_messages(self.epoch, dealer, messages) + .store_rotation_messages(epoch, dealer, messages) .map_err(|e| anyhow::anyhow!("failed to store rotation messages: {e}")) } @@ -88,12 +90,13 @@ impl PublicMessagesStore for EpochPublicMessagesStore { fn store_nonce_message( &mut self, + epoch: u64, batch_index: u32, dealer: &Address, message: &batch_avss::Message, ) -> anyhow::Result<()> { self.db - .store_nonce_message(self.epoch, batch_index, dealer, message) + .store_nonce_message(epoch, batch_index, dealer, message) .map_err(|e| anyhow::anyhow!("failed to store nonce message: {e}")) } diff --git a/crates/hashi/src/storage/interfaces.rs b/crates/hashi/src/storage/interfaces.rs index d79d13e48..3acad45c8 100644 --- a/crates/hashi/src/storage/interfaces.rs +++ b/crates/hashi/src/storage/interfaces.rs @@ -10,32 +10,32 @@ pub use crate::mpc::types::Messages; pub use crate::mpc::types::RotationMessages; pub trait PublicMessagesStore: Send + Sync { - /// Store a dealer's DKG message. - /// + /// Store a dealer's DKG message at the given epoch. /// If a message already exists for this dealer, it will be overwritten. - /// Old messages (for epochs < current_epoch - 1) are automatically cleaned up. - fn store_dealer_message(&mut self, dealer: &Address, message: &avss::Message) -> Result<()>; + fn store_dealer_message( + &mut self, + epoch: u64, + dealer: &Address, + message: &avss::Message, + ) -> Result<()>; /// Retrieve a dealer's DKG message for the given epoch. - /// /// Returns None if no message exists for this dealer. fn get_dealer_message(&self, epoch: u64, dealer: &Address) -> Result>; /// List all stored dealer messages for the current epoch. fn list_all_dealer_messages(&self) -> Result>; - /// Store a dealer's rotation messages. - /// + /// Store a dealer's rotation messages at the given epoch. /// If messages already exist for this dealer, they will be overwritten. - /// Old messages (for epochs < current_epoch - 1) are automatically cleaned up. fn store_rotation_messages( &mut self, + epoch: u64, dealer: &Address, messages: &RotationMessages, ) -> Result<()>; /// Retrieve a dealer's rotation messages for the given epoch. - /// /// Returns None if no messages exist for this dealer. fn get_rotation_messages( &self, @@ -46,11 +46,11 @@ pub trait PublicMessagesStore: Send + Sync { /// List all stored rotation messages for the current epoch. fn list_all_rotation_messages(&self) -> Result>; - /// Store a dealer's nonce message for presignature generation. - /// - /// Old messages (for epochs < current_epoch - 1) are automatically cleaned up. + /// Store a dealer's nonce message at the given epoch. + /// If a message already exists for this dealer and batch, it will be overwritten. fn store_nonce_message( &mut self, + epoch: u64, batch_index: u32, dealer: &Address, message: &batch_avss::Message,