From 4b441cd2db3529d9bb16cd2bac271f7a654cda6f Mon Sep 17 00:00:00 2001 From: Zhou Fang Date: Thu, 9 Apr 2026 21:54:13 -0700 Subject: [PATCH 1/4] fix!: skip key rotation dealer phase when local previous shares are empty --- crates/hashi/src/mpc/mpc_except_signing.rs | 12 +- .../hashi/src/mpc/mpc_except_signing_tests.rs | 154 +++++++++++++++++- 2 files changed, 161 insertions(+), 5 deletions(-) diff --git a/crates/hashi/src/mpc/mpc_except_signing.rs b/crates/hashi/src/mpc/mpc_except_signing.rs index f1b2edd7f..eb62a47d5 100644 --- a/crates/hashi/src/mpc/mpc_except_signing.rs +++ b/crates/hashi/src/mpc/mpc_except_signing.rs @@ -75,10 +75,6 @@ const EXPECT_THRESHOLD_VALIDATED: &str = "Threshold already validated"; const EXPECT_THRESHOLD_MET: &str = "Already checked earlier that threshold is met"; const EXPECT_SERIALIZATION_SUCCESS: &str = "Serialization should always succeed"; -// DKG protocol -// 1) A dealer sends out a message to all parties containing the encrypted shares and the public keys of the nonces. -// 2) Each party verifies the message and returns a signature. Once sufficient valid signatures are received from the parties, the dealer sends a certificate to Sui (TOB). -// 3) Once sufficient valid certificates are received, a party completes the protocol locally by aggregating the shares from the dealers. pub struct MpcManager { // Immutable during the epoch pub party_id: PartyId, @@ -546,7 +542,11 @@ impl MpcManager { } } } + // Optimization: a node that fell back to the new-member path has empty + // key shares and cannot generate valid rotation messages. + let has_previous_shares = !previous.key_shares.shares.is_empty(); if is_member_of_previous_committee + && has_previous_shares && { let certified = ordered_broadcast_channel.certified_dealers().await; let mgr = mpc_manager.read().unwrap(); @@ -559,6 +559,10 @@ impl MpcManager { let certified_share_count: usize = certified .iter() .filter_map(|d| { + let messages = mgr.rotation_messages.get(d)?; + if messages.is_empty() { + return None; + } let party_id = prev_committee.index_of(d)? as u16; prev_nodes.share_ids_of(party_id).ok() }) diff --git a/crates/hashi/src/mpc/mpc_except_signing_tests.rs b/crates/hashi/src/mpc/mpc_except_signing_tests.rs index 5da9ab538..d9b63faeb 100644 --- a/crates/hashi/src/mpc/mpc_except_signing_tests.rs +++ b/crates/hashi/src/mpc/mpc_except_signing_tests.rs @@ -5382,7 +5382,16 @@ async fn test_run_key_rotation_skips_dealer_phase() { let prev_output = manager.previous_output.clone().unwrap(); let msgs = manager.create_rotation_messages(&prev_output, &mut rng); let rotation_messages = Messages::Rotation(msgs.clone()); - manager.rotation_messages.insert(addr, msgs); + manager.rotation_messages.insert(addr, msgs.clone()); + // Simulate RPC delivery: test_manager (validator 0) needs to + // know about this dealer's messages so the dealer skip check + // counts them under the robust filter (which excludes + // dealers with empty rotation messages). + test_manager + .write() + .unwrap() + .rotation_messages + .insert(addr, msgs); let own_sig = manager .try_sign_rotation_messages(&prev_output, addr, &rotation_messages) .unwrap(); @@ -5441,6 +5450,149 @@ async fn test_run_key_rotation_skips_dealer_phase() { assert_eq!(new_output.public_key, test_dkg_output.public_key); } +#[tokio::test] +async fn test_run_key_rotation_excludes_empty_messages_from_share_count() { + let mut rng = rand::thread_rng(); + let rotation_setup = RotationTestSetup::new(); + // weights [3, 2, 4, 1, 2] (total = 12, threshold = 4) + + let (mut test_manager, test_dkg_output, _) = + rotation_setup.create_rotation_dealer_with_memory_store(0); + let test_addr = rotation_setup.setup.address(0); + test_manager.source_epoch = rotation_setup.setup.epoch(); + test_manager.previous_output = Some(test_dkg_output.clone()); + + let mut other_managers_map = HashMap::new(); + for i in 1..5 { + let (mut manager, output) = rotation_setup.create_receiver_with_memory_store(i); + manager.previous_output = Some(output); + other_managers_map.insert(rotation_setup.setup.address(i), manager); + } + let mock_p2p = MockP2PChannel::new(other_managers_map, test_addr); + + let validator_2_addr = rotation_setup.setup.address(2); + let validator_1_addr = rotation_setup.setup.address(1); + + let mut rotation_certificates = Vec::new(); + { + let mut other_managers = mock_p2p.managers.lock().unwrap(); + let (empty_rotation_messages, v2_own_sig, epoch) = { + let manager = other_managers.get_mut(&validator_2_addr).unwrap(); + let empty_msgs = BTreeMap::new(); + let rotation_messages = Messages::Rotation(empty_msgs.clone()); + manager + .rotation_messages + .insert(validator_2_addr, empty_msgs); + let prev_output = manager.previous_output.clone().unwrap(); + let own_sig = manager + .try_sign_rotation_messages(&prev_output, validator_2_addr, &rotation_messages) + .unwrap(); + (rotation_messages, own_sig, manager.mpc_config.epoch) + }; + let v2_signer_addr = rotation_setup.setup.address(3); + let v2_signer_sig = { + let signer = other_managers.get_mut(&v2_signer_addr).unwrap(); + let signer_prev = signer.previous_output.clone().unwrap(); + signer + .try_sign_rotation_messages( + &signer_prev, + validator_2_addr, + &empty_rotation_messages, + ) + .unwrap() + }; + let cert = create_rotation_test_certificate( + rotation_setup.setup.committee(), + &empty_rotation_messages, + validator_2_addr, + vec![ + MemberSignature::new(epoch, validator_2_addr, v2_own_sig), + MemberSignature::new(epoch, v2_signer_addr, v2_signer_sig), + ], + ) + .unwrap(); + rotation_certificates.push(CertificateV1::Rotation(cert)); + + // Validator 1: valid rotation messages (weight=2). + let (v1_rotation_messages, v1_own_sig, epoch) = { + let manager = other_managers.get_mut(&validator_1_addr).unwrap(); + let prev_output = manager.previous_output.clone().unwrap(); + let msgs = manager.create_rotation_messages(&prev_output, &mut rng); + let rotation_messages = Messages::Rotation(msgs.clone()); + manager + .rotation_messages + .insert(validator_1_addr, msgs.clone()); + let own_sig = manager + .try_sign_rotation_messages(&prev_output, validator_1_addr, &rotation_messages) + .unwrap(); + (rotation_messages, own_sig, manager.mpc_config.epoch) + }; + let v1_signer_addr = rotation_setup.setup.address(3); + let v1_signer_sig = { + let signer = other_managers.get_mut(&v1_signer_addr).unwrap(); + let signer_prev = signer.previous_output.clone().unwrap(); + if let Messages::Rotation(ref msgs) = v1_rotation_messages { + signer + .rotation_messages + .insert(validator_1_addr, msgs.clone()); + } + signer + .try_sign_rotation_messages(&signer_prev, validator_1_addr, &v1_rotation_messages) + .unwrap() + }; + let cert = create_rotation_test_certificate( + rotation_setup.setup.committee(), + &v1_rotation_messages, + validator_1_addr, + vec![ + MemberSignature::new(epoch, validator_1_addr, v1_own_sig), + MemberSignature::new(epoch, v1_signer_addr, v1_signer_sig), + ], + ) + .unwrap(); + rotation_certificates.push(CertificateV1::Rotation(cert)); + } + + // Populate test_manager's rotation_messages so the filter can inspect them. + { + // Validator 2: empty messages (the key scenario the filter must handle). + test_manager + .rotation_messages + .insert(validator_2_addr, BTreeMap::new()); + // Validator 1: valid messages. + let other_managers = mock_p2p.managers.lock().unwrap(); + let v1_msgs = other_managers[&validator_1_addr] + .rotation_messages + .get(&validator_1_addr) + .unwrap() + .clone(); + test_manager + .rotation_messages + .insert(validator_1_addr, v1_msgs); + } + let test_manager = Arc::new(RwLock::new(test_manager)); + + let mut mock_tob = MockOrderedBroadcastChannel::new(rotation_certificates); + + let new_output = MpcManager::run_key_rotation( + &test_manager, + &rotation_setup.certificates(), + &mock_p2p, + &mut mock_tob, + ) + .await + .unwrap(); + + // Dealer MUST have published: the filter excluded the empty-messages dealer + // (validator 2, weight=4) from the share count, leaving only + // validator 1's 2 shares — below threshold 4. + assert!( + mock_tob.published_count() > 0, + "Dealer phase must run when empty-messages dealers are excluded from share count" + ); + assert_eq!(new_output.public_key, test_dkg_output.public_key); +} + #[tokio::test] async fn test_run_key_rotation_recovers_from_hash_mismatch() { // Test that run_key_rotation_as_party retrieves the correct message and reprocesses From fdd76a1a7b800cdc9e465c0b65f2a14895e2ea00 Mon Sep 17 00:00:00 2001 From: Zhou Fang Date: Thu, 9 Apr 2026 22:28:00 -0700 Subject: [PATCH 2/4] fix!: sandbox reconstruction's complaint recovery to a local cache --- crates/hashi/src/mpc/mpc_except_signing.rs | 93 ++++++++++++------- .../hashi/src/mpc/mpc_except_signing_tests.rs | 54 ++++++----- crates/internal-tools/src/key_recovery.rs | 5 +- 3 files changed, 92 insertions(+), 60 deletions(-) diff --git a/crates/hashi/src/mpc/mpc_except_signing.rs b/crates/hashi/src/mpc/mpc_except_signing.rs index eb62a47d5..7cd4d9cc6 100644 --- a/crates/hashi/src/mpc/mpc_except_signing.rs +++ b/crates/hashi/src/mpc/mpc_except_signing.rs @@ -880,7 +880,7 @@ impl MpcManager { .expect("certificate verified above") }; let epoch = mpc_manager.read().unwrap().mpc_config.epoch; - Self::recover_shares_via_complaint( + let recovered = Self::recover_shares_via_complaint( mpc_manager, &dealer, signers, @@ -888,6 +888,13 @@ impl MpcManager { epoch, ) .await?; + { + let mut mgr = mpc_manager.write().unwrap(); + mgr.dealer_outputs + .insert(DealerOutputsKey::Dkg(dealer), recovered); + mgr.complaints_to_process + .remove(&ComplaintsToProcessKey::Dkg(dealer)); + } } let dealer_weight = { let mgr = mpc_manager.read().unwrap(); @@ -1106,7 +1113,7 @@ impl MpcManager { .expect("certificate verified above") }; let epoch = mpc_manager.read().unwrap().mpc_config.epoch; - Self::recover_rotation_shares_via_complaints( + let recovered = Self::recover_rotation_shares_via_complaints( mpc_manager, &dealer, previous, @@ -1115,6 +1122,15 @@ impl MpcManager { epoch, ) .await?; + { + let mut mgr = mpc_manager.write().unwrap(); + for (share_index, output) in recovered { + mgr.dealer_outputs + .insert(DealerOutputsKey::Rotation(share_index), output); + mgr.complaints_to_process + .remove(&ComplaintsToProcessKey::Rotation(dealer, share_index)); + } + } // Only add indices that have outputs (avoids adding indices for // dealers with empty rotation messages, e.g. a node that rejoined // with no shares from the new-member fallback). @@ -2039,7 +2055,7 @@ impl MpcManager { signers: Vec
, p2p_channel: &impl P2PChannel, epoch: u64, - ) -> MpcResult<()> { + ) -> MpcResult { let (complaint_request, receiver, message) = { let mgr = mpc_manager.read().unwrap(); let complaint = mgr @@ -2102,12 +2118,7 @@ impl MpcManager { }; match result { Ok(partial_output) => { - let mut mgr = mpc_manager.write().unwrap(); - mgr.dealer_outputs - .insert(DealerOutputsKey::Dkg(*dealer), partial_output); - mgr.complaints_to_process - .remove(&ComplaintsToProcessKey::Dkg(*dealer)); - return Ok(()); + return Ok(partial_output); } Err(FastCryptoError::InputTooShort(_)) => { continue; @@ -2229,7 +2240,7 @@ impl MpcManager { signers: Vec
, p2p_channel: &impl P2PChannel, epoch: u64, - ) -> MpcResult<()> { + ) -> MpcResult> { let (request, recovery_contexts) = { let mgr = mpc_manager.read().unwrap(); let Some(RotationComplainContext { @@ -2237,7 +2248,7 @@ impl MpcManager { recovery_contexts, }) = mgr.prepare_rotation_complain_request(dealer, previous_dkg_output, epoch)? else { - return Ok(()); + return Ok(HashMap::new()); }; tracing::info!( "Rotation complaint detected for dealer {:?}, recovering via Complain RPC", @@ -2256,6 +2267,7 @@ impl MpcManager { Vec>, > = HashMap::new(); let mut pending_shares: HashSet = HashSet::new(); + let mut recovered_outputs: HashMap = HashMap::new(); for &share_index in recovery_contexts.keys() { all_responses.insert(share_index, Vec::new()); pending_shares.insert(share_index); @@ -2291,15 +2303,7 @@ impl MpcManager { }; match result { Ok(partial_output) => { - let mut mgr = mpc_manager.write().unwrap(); - mgr.dealer_outputs.insert( - DealerOutputsKey::Rotation(share_index), - partial_output, - ); - mgr.complaints_to_process.remove( - &ComplaintsToProcessKey::Rotation(*dealer, share_index), - ); - drop(mgr); + recovered_outputs.insert(share_index, partial_output); pending_shares.remove(&share_index); } Err(FastCryptoError::InputTooShort(_)) => { @@ -2331,7 +2335,7 @@ impl MpcManager { dealer, pending_shares ))); } - Ok(()) + Ok(recovered_outputs) } fn load_stored_messages(&mut self) -> MpcResult<()> { @@ -2598,16 +2602,21 @@ impl MpcManager { pub fn reconstruct_previous_output( &self, certificates: &[CertificateV1], + complaint_cache: &HashMap, ) -> MpcResult { match certificates.first() { Some(CertificateV1::Dkg(_)) | None => { - self.reconstruct_from_dkg_certificates(certificates) + self.reconstruct_from_dkg_certificates(certificates, complaint_cache) } Some(CertificateV1::Rotation(_)) => { let previous_threshold = self.previous_threshold.ok_or_else(|| { MpcError::InvalidConfig("Key rotation requires previous threshold".into()) })?; - self.reconstruct_from_rotation_certificates(certificates, previous_threshold) + self.reconstruct_from_rotation_certificates( + certificates, + previous_threshold, + complaint_cache, + ) } Some(CertificateV1::NonceGeneration { .. }) => { unreachable!( @@ -2620,6 +2629,7 @@ impl MpcManager { fn reconstruct_from_dkg_certificates( &self, certificates: &[CertificateV1], + complaint_cache: &HashMap, ) -> MpcResult { let previous_committee = self.previous_committee.clone().ok_or_else(|| { MpcError::InvalidConfig("DKG reconstruction requires previous committee".into()) @@ -2676,11 +2686,7 @@ impl MpcManager { let session_id = source_session_id .dealer_session_id(&dealer_address) .to_vec(); - // Check for previously recovered output (from complaint recovery on a prior attempt). - if let Some(output) = self - .dealer_outputs - .get(&DealerOutputsKey::Dkg(dealer_address)) - { + if let Some(output) = complaint_cache.get(&DealerOutputsKey::Dkg(dealer_address)) { outputs.insert(dealer_party_id, output.clone()); let dealer_weight = previous_nodes .weight_of(dealer_party_id) @@ -2750,6 +2756,7 @@ impl MpcManager { &self, certificates: &[CertificateV1], previous_threshold: u16, + complaint_cache: &HashMap, ) -> MpcResult { let previous_nodes = self.previous_nodes.clone().ok_or_else(|| { MpcError::InvalidConfig("Rotation reconstruction requires previous nodes".into()) @@ -2797,13 +2804,10 @@ impl MpcManager { ))); } for (share_index, message) in rotation_msgs { - // Check for previously recovered output (from complaint recovery on a prior attempt). - if let Some(output) = self - .dealer_outputs - .get(&DealerOutputsKey::Rotation(share_index)) + if let Some(output) = complaint_cache.get(&DealerOutputsKey::Rotation(share_index)) { tracing::info!( - "reconstruct_from_rotation_certificates: cache hit for \ + "reconstruct_from_rotation_certificates: complaint cache hit for \ dealer {:?} share_index={share_index}", dealer_address, ); @@ -3024,12 +3028,14 @@ impl MpcManager { previous_certificates: &[CertificateV1], p2p_channel: &impl P2PChannel, ) -> MpcResult { + let mut complaint_cache: HashMap = HashMap::new(); loop { let mgr = Arc::clone(mpc_manager); let certs = previous_certificates.to_vec(); + let cache_snapshot = complaint_cache.clone(); match spawn_blocking(move || { let mgr = mgr.read().unwrap(); - mgr.reconstruct_previous_output(&certs) + mgr.reconstruct_previous_output(&certs, &cache_snapshot) }) .await? { @@ -3089,7 +3095,7 @@ impl MpcManager { match protocol_type { ProtocolTypeIndicator::Dkg => { let source_epoch = mpc_manager.read().unwrap().source_epoch; - Self::recover_shares_via_complaint( + let recovered = Self::recover_shares_via_complaint( mpc_manager, &dealer_address, signers, @@ -3097,6 +3103,13 @@ impl MpcManager { source_epoch, ) .await?; + complaint_cache + .insert(DealerOutputsKey::Dkg(dealer_address), recovered); + mpc_manager + .write() + .unwrap() + .complaints_to_process + .remove(&ComplaintsToProcessKey::Dkg(dealer_address)); } ProtocolTypeIndicator::KeyRotation => { let (previous_output, source_epoch) = { @@ -3108,7 +3121,7 @@ impl MpcManager { mgr.source_epoch, ) }; - Self::recover_rotation_shares_via_complaints( + let recovered = Self::recover_rotation_shares_via_complaints( mpc_manager, &dealer_address, &previous_output, @@ -3117,6 +3130,14 @@ impl MpcManager { source_epoch, ) .await?; + let mut mgr = mpc_manager.write().unwrap(); + for (share_index, output) in recovered { + complaint_cache + .insert(DealerOutputsKey::Rotation(share_index), output); + mgr.complaints_to_process.remove( + &ComplaintsToProcessKey::Rotation(dealer_address, share_index), + ); + } } ProtocolTypeIndicator::NonceGeneration => {} } diff --git a/crates/hashi/src/mpc/mpc_except_signing_tests.rs b/crates/hashi/src/mpc/mpc_except_signing_tests.rs index d9b63faeb..cf90ce876 100644 --- a/crates/hashi/src/mpc/mpc_except_signing_tests.rs +++ b/crates/hashi/src/mpc/mpc_except_signing_tests.rs @@ -3263,16 +3263,17 @@ async fn test_recover_shares_via_complaint_succeeds_with_exact_threshold() { "Recovery should succeed: {:?}", result.err() ); + let _recovered_output = result.unwrap(); let mgr = party_manager.read().unwrap(); - // DKG: outputs keyed by dealer address assert!( - mgr.dealer_outputs - .contains_key(&DealerOutputsKey::Dkg(dealer_addr)) + !mgr.dealer_outputs + .contains_key(&DealerOutputsKey::Dkg(dealer_addr)), + "recover_shares_via_complaint must not touch the global dealer_outputs" ); assert!( - !mgr.complaints_to_process + mgr.complaints_to_process .contains_key(&ComplaintsToProcessKey::Dkg(dealer_addr)), - "Complaint should be cleared after successful recovery" + "recover_shares_via_complaint must leave the complaint for the caller to clear atomically" ); } @@ -3341,16 +3342,17 @@ async fn test_recover_shares_via_complaint_skips_failed_signers() { "Recovery should succeed despite failed signer: {:?}", result.err() ); + let _recovered_output = result.unwrap(); let mgr = party_manager.read().unwrap(); - // DKG: outputs keyed by dealer address assert!( - mgr.dealer_outputs - .contains_key(&DealerOutputsKey::Dkg(dealer_addr)) + !mgr.dealer_outputs + .contains_key(&DealerOutputsKey::Dkg(dealer_addr)), + "recover_shares_via_complaint must not touch the global dealer_outputs" ); assert!( - !mgr.complaints_to_process + mgr.complaints_to_process .contains_key(&ComplaintsToProcessKey::Dkg(dealer_addr)), - "Complaint should be cleared after successful recovery" + "recover_shares_via_complaint must leave the complaint for the caller to clear atomically" ); } @@ -6558,23 +6560,25 @@ async fn test_recover_rotation_shares_via_complaint_success() { result.err() ); - // Verify complaint was removed + let recovered = result.unwrap(); + assert!( + recovered.contains_key(&first_share_index), + "Recovered output should be returned for the share index" + ); { let mgr = test_manager.read().unwrap(); assert!( - !mgr.complaints_to_process + !mgr.dealer_outputs + .contains_key(&DealerOutputsKey::Rotation(first_share_index)), + "recover_rotation_shares_via_complaints must not touch the global dealer_outputs" + ); + assert!( + mgr.complaints_to_process .contains_key(&ComplaintsToProcessKey::Rotation( dealer_addr, first_share_index )), - "Complaint should be removed after successful recovery" - ); - - // Verify output was created (Rotation: outputs keyed by share index) - assert!( - mgr.dealer_outputs - .contains_key(&DealerOutputsKey::Rotation(first_share_index)), - "Output should be created for recovered share" + "recover_rotation_shares_via_complaints must leave the complaint for the caller to clear atomically" ); } } @@ -7180,7 +7184,7 @@ fn test_reconstruct_from_dkg_certificates_with_shifted_party_ids() { // if previous committee parameters were not used for decryption. let reconstructed = unwrap_reconstruction_success( manager - .reconstruct_from_dkg_certificates(&certificates) + .reconstruct_from_dkg_certificates(&certificates, &HashMap::new()) .unwrap(), ); @@ -7331,7 +7335,7 @@ fn test_reconstruct_from_dkg_certificates_stops_at_threshold() { // and produces key_threshold. let reconstructed = unwrap_reconstruction_success( manager - .reconstruct_from_dkg_certificates(&certificates) + .reconstruct_from_dkg_certificates(&certificates, &HashMap::new()) .unwrap(), ); @@ -7541,7 +7545,11 @@ fn test_reconstruct_from_rotation_certificates_with_shifted_party_ids() { // This would panic with index-out-of-bounds if previous committee parameters were not used for decryption. let reconstructed = unwrap_reconstruction_success( manager - .reconstruct_from_rotation_certificates(&rotation_certificates, previous_threshold) + .reconstruct_from_rotation_certificates( + &rotation_certificates, + previous_threshold, + &HashMap::new(), + ) .unwrap(), ); diff --git a/crates/internal-tools/src/key_recovery.rs b/crates/internal-tools/src/key_recovery.rs index a697065df..8bd8f846d 100644 --- a/crates/internal-tools/src/key_recovery.rs +++ b/crates/internal-tools/src/key_recovery.rs @@ -133,8 +133,11 @@ pub async fn run(args: Args, onchain_state: &OnchainState, chain_id: &str) -> an // the on-chain epoch may have advanced past the backup. manager.set_source_epoch(source_epoch); + // Pass an empty complaint cache: the recovery tool runs reconstruction + // once without retrying through complaint recovery, so there are no + // recovered outputs to reuse across attempts. let outcome = manager - .reconstruct_previous_output(&certificates) + .reconstruct_previous_output(&certificates, &std::collections::HashMap::new()) .map_err(|e| anyhow!("reconstruction failed: {e}"))?; match outcome { From d1699b42539289fbe135f5e84baf202d450e5d8e Mon Sep 17 00:00:00 2001 From: Zhou Fang Date: Thu, 9 Apr 2026 23:44:36 -0700 Subject: [PATCH 3/4] fix!: re-process send_messages on post-restart cache miss --- crates/hashi/src/mpc/mpc_except_signing.rs | 12 +-- .../hashi/src/mpc/mpc_except_signing_tests.rs | 84 +++++++++++++++++-- 2 files changed, 84 insertions(+), 12 deletions(-) diff --git a/crates/hashi/src/mpc/mpc_except_signing.rs b/crates/hashi/src/mpc/mpc_except_signing.rs index 7cd4d9cc6..acc1300d2 100644 --- a/crates/hashi/src/mpc/mpc_except_signing.rs +++ b/crates/hashi/src/mpc/mpc_except_signing.rs @@ -94,7 +94,8 @@ pub struct MpcManager { previous_output: Option, pub batch_size_per_weight: u16, - // Mutable during the epoch + // TODO: Rename these fields so it is clear at the call site which are + // backed by persistent store and which live only in memory. pub dealer_outputs: HashMap, pub dkg_messages: HashMap, pub rotation_messages: HashMap, @@ -254,11 +255,10 @@ impl MpcManager { if let Some(response) = self.message_responses.get(&sender) { return Ok(response.clone()); } - return Err(MpcError::InvalidMessage { - sender, - reason: "Message previously received but no valid response was produced" - .to_string(), - }); + tracing::info!( + "handle_send_messages_request: existing message from {sender:?} but no \ + cached response (e.g. post-restart), re-processing" + ); } let signature = match &request.messages { Messages::Dkg(msg) => { diff --git a/crates/hashi/src/mpc/mpc_except_signing_tests.rs b/crates/hashi/src/mpc/mpc_except_signing_tests.rs index cf90ce876..aaabe8e2e 100644 --- a/crates/hashi/src/mpc/mpc_except_signing_tests.rs +++ b/crates/hashi/src/mpc/mpc_except_signing_tests.rs @@ -1418,10 +1418,15 @@ async fn test_run_dkg() { "Should have commitments equal to total weight" ); - // Verify all certificates were consumed from the TOB channel (only threshold needed) + // Verify the party phase consumed exactly threshold-many certs from TOB. + // run_dkg_as_dealer first publishes test_manager's own cert into the TOB + // (peers' handle_send_messages_request re-processes and returns valid + // signatures via the post-restart recovery path), so the TOB starts the + // party phase with `other_certificates_len + 1` messages. The party phase + // then consumes threshold-many to satisfy the dealer-weight check. assert_eq!( mock_tob.pending_messages(), - Some(other_certificates_len - threshold as usize), + Some((other_certificates_len + 1) - threshold as usize), "TOB should have consumed exactly threshold certificates" ); @@ -4302,16 +4307,20 @@ async fn test_handle_send_messages_request_invalid_shares_no_panic_on_retry() { _ => panic!("Expected InvalidMessage error"), } - // Second call: same message - should return error with "previously rejected" message + // Second call: same message — re-processes and returns the same + // "Invalid shares" error. The point of this test is that the second + // call must NOT panic; the specific error message is allowed to be + // either the original Complaint reason or a short-circuit reason. let result2 = receiver_manager.handle_send_messages_request(dealer_addr, &request); assert!(result2.is_err(), "Second call should also return error"); match result2.unwrap_err() { MpcError::InvalidMessage { sender, reason } => { assert_eq!(sender, dealer_addr); assert!( - reason.contains("previously received but no valid response"), - "Second call should indicate message was previously received with no valid response, got: {}", - reason + reason.contains("Invalid shares") + || reason.contains("previously received but no valid response"), + "Second call should return either the original Complaint error or a \ + 'previously received' short-circuit, got: {reason}" ); } _ => panic!("Expected InvalidMessage error"), @@ -4348,6 +4357,69 @@ async fn test_handle_send_messages_request_invalid_shares_no_panic_on_retry() { ); } +#[tokio::test] +async fn test_handle_send_messages_request_post_restart_reprocesses() { + let mut rng = rand::thread_rng(); + let setup = TestSetup::new(5); + + // Create dealer (party 1) and a valid DKG message. + let dealer_addr = setup.address(1); + let dealer_manager = setup.create_dealer_with_message(1, &mut rng); + let dealer_message = dealer_manager + .dkg_messages + .get(&dealer_addr) + .expect("dealer should have stored its own message") + .clone(); + let messages = Messages::Dkg(dealer_message.clone()); + + // Create receiver (party 0) and pre-populate `dkg_messages` to + // simulate the post-restart state where `load_stored_messages` has + // reloaded the message but `message_responses` is empty. + let mut receiver_manager = setup.create_manager(0); + receiver_manager + .store_dkg_message(dealer_addr, &dealer_message) + .unwrap(); + assert!( + receiver_manager.dkg_messages.contains_key(&dealer_addr), + "precondition: stored message present" + ); + assert!( + !receiver_manager + .message_responses + .contains_key(&dealer_addr), + "precondition: no cached response" + ); + + // Peer retries send_messages post-restart. + let request = SendMessagesRequest { messages }; + let response = receiver_manager + .handle_send_messages_request(dealer_addr, &request) + .expect("post-restart re-processing should succeed for a valid message"); + assert!( + !response.signature.as_ref().is_empty(), + "should return a non-empty BLS signature" + ); + + // After re-processing, the response is now cached in memory. + assert!( + receiver_manager + .message_responses + .contains_key(&dealer_addr), + "response should be cached after successful re-processing" + ); + + // Subsequent retries return the cached response (no further + // re-processing). + let response2 = receiver_manager + .handle_send_messages_request(dealer_addr, &request) + .unwrap(); + assert_eq!( + response.signature.as_ref(), + response2.signature.as_ref(), + "cached response should be returned on subsequent retries" + ); +} + #[tokio::test] async fn test_retrieve_stores_invalid_message_for_later_complaint() { // retrieve_dealer_message should store without validation From 98a1b02a11a9ee0c92cb3204c472953f406026b3 Mon Sep 17 00:00:00 2001 From: Zhou Fang Date: Fri, 10 Apr 2026 13:56:12 -0700 Subject: [PATCH 4/4] fix!: rebuild the `MpcManager` from persistent store on every retry --- crates/hashi/src/mpc/mpc_except_signing.rs | 2 ++ crates/hashi/src/mpc/service.rs | 40 ++++++++-------------- 2 files changed, 17 insertions(+), 25 deletions(-) diff --git a/crates/hashi/src/mpc/mpc_except_signing.rs b/crates/hashi/src/mpc/mpc_except_signing.rs index acc1300d2..b585376ba 100644 --- a/crates/hashi/src/mpc/mpc_except_signing.rs +++ b/crates/hashi/src/mpc/mpc_except_signing.rs @@ -1401,6 +1401,8 @@ impl MpcManager { Ok(()) } + // TODO: Change return type to `MpcResult<()>` and propagate disk errors + // (mirroring `store_dkg_message` and `store_rotation_messages`). fn store_nonce_message(&mut self, dealer: Address, nonce: &NonceMessage) { self.nonce_messages.insert(dealer, nonce.clone()); if let Err(e) = self.public_messages_store.store_nonce_message( diff --git a/crates/hashi/src/mpc/service.rs b/crates/hashi/src/mpc/service.rs index cd02efdb2..308c88229 100644 --- a/crates/hashi/src/mpc/service.rs +++ b/crates/hashi/src/mpc/service.rs @@ -39,7 +39,7 @@ const RETRY_INTERVAL: Duration = Duration::from_secs(10); const RPC_TIMEOUT: Duration = Duration::from_secs(5); const MAX_PROTOCOL_ATTEMPTS: u32 = 3; const START_RECONFIG_POLL_INTERVAL: Duration = Duration::from_millis(500); -const MPC_PROTOCOL_TIMEOUT: Duration = Duration::from_secs(1800); +const MPC_PROTOCOL_TIMEOUT: Duration = Duration::from_secs(1200); #[derive(Clone)] pub struct MpcHandle { @@ -532,8 +532,6 @@ impl MpcService { } async fn handle_reconfig(&self, target_epoch: u64) { - // Determine whether this is an initial DKG or a key rotation - // based on if we already have a committed mpc_public_key. let run_dkg = self .inner .onchain_state() @@ -542,33 +540,25 @@ impl MpcService { .committees .mpc_public_key() .is_empty(); - - info!("handle_reconfig: epoch={target_epoch}, run_dkg={run_dkg}",); - // Create the MpcManager once before the retry loop so retries reuse - // the same manager (and its accumulated messages) instead of generating - // fresh random dealer messages that conflict with previously sent ones. - if run_dkg { - if let Err(e) = self.setup_initial_dkg(target_epoch) { - error!( - "Failed to set up initial DKG for epoch {}: {e}", - target_epoch - ); - return; - } - } else if let Err(e) = self.setup_key_rotation(target_epoch) { - error!( - "Failed to set up key rotation for epoch {}: {e}", - target_epoch - ); - return; - } - - info!("handle_reconfig: setup complete for epoch {target_epoch}, entering retry loop",); + info!("handle_reconfig: epoch={target_epoch}, run_dkg={run_dkg}, entering retry loop",); let output = loop { if self.get_pending_epoch_change() != Some(target_epoch) { info!("handle_reconfig: epoch {target_epoch} no longer pending, aborting",); return; } + let setup_result = if run_dkg { + self.setup_initial_dkg(target_epoch) + } else { + self.setup_key_rotation(target_epoch) + }; + if let Err(e) = setup_result { + error!( + "Failed to set up MPC manager for epoch {}: {e}, retrying...", + target_epoch + ); + self.sleep_if_still_pending(target_epoch).await; + continue; + } let result = if run_dkg { tokio::time::timeout(MPC_PROTOCOL_TIMEOUT, self.run_dkg(target_epoch)) .await