From e5d6db9da58e82c92bdea33af9f72d0487477a6a Mon Sep 17 00:00:00 2001 From: Serhii Volovyk Date: Tue, 24 Feb 2026 13:48:18 +0200 Subject: [PATCH 01/16] do not treat failed syncs as successful --- .../node/src/protocol/sync/mod.rs | 35 ++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/chain-signatures/node/src/protocol/sync/mod.rs b/chain-signatures/node/src/protocol/sync/mod.rs index b503bc24e..1f4723ead 100644 --- a/chain-signatures/node/src/protocol/sync/mod.rs +++ b/chain-signatures/node/src/protocol/sync/mod.rs @@ -304,7 +304,16 @@ async fn broadcast_sync( } else { None }; - if sync_tx.send(p).await.is_err() { + let sync_succeeded = match &sync_view { + Some(Ok(_)) => true, + Some(Err(err)) => { + tracing::warn!(?p, ?err, "failed to sync peer"); + false + } + // No RPC sync is attempted for self (`p == me`); treat as successful no-op. + None => true, + }; + if sync_succeeded && sync_tx.send(p).await.is_err() { tracing::error!("sync reporter is down: state sync will no longer work") } (p, sync_view) @@ -419,4 +428,28 @@ mod tests { assert_eq!(received, expected); assert!(rx.recv().await.is_none()); } + + #[tokio::test] + async fn test_broadcast_sync_does_not_report_synced_on_failure() { + let client = NodeClient::new(&NodeClientOptions::default()); + let update = SyncUpdate { + from: Participant::from(0u32), + triples: vec![TripleId::from(1u64)], + presignatures: vec![PresignatureId::from(1u64)], + }; + let (tx, mut rx) = mpsc::channel(4); + + let participants = vec![(Participant::from(1u32), ParticipantInfo::new(1))]; + + broadcast_sync( + client, + update, + participants.into_iter(), + tx, + Participant::from(0u32), + ) + .await; + + assert!(rx.recv().await.is_none()); + } } From abcbd9936af7c11bcb700d9d15f9294e7d0aae9e Mon Sep 17 00:00:00 2001 From: Serhii Volovyk Date: Tue, 24 Feb 2026 23:37:12 +0200 Subject: [PATCH 02/16] broadcast_sync refactor, bug fixes, dummy storage layer sync processing --- 
.../node/src/protocol/sync/mod.rs | 270 +++++++++++------- .../node/src/storage/protocol_storage.rs | 28 ++ 2 files changed, 202 insertions(+), 96 deletions(-) diff --git a/chain-signatures/node/src/protocol/sync/mod.rs b/chain-signatures/node/src/protocol/sync/mod.rs index 1f4723ead..f056a9260 100644 --- a/chain-signatures/node/src/protocol/sync/mod.rs +++ b/chain-signatures/node/src/protocol/sync/mod.rs @@ -27,6 +27,9 @@ pub const RECURRING_SYNC_INTERVAL: Duration = Duration::from_secs(3600 * 24); /// Timeout for waiting for a sync response from the sync task const SYNC_RESPONSE_TIMEOUT: Duration = Duration::from_secs(5); +/// Timeout for the entire broadcast operation (waiting for all peers to respond) +const BROADCAST_TIMEOUT: Duration = Duration::from_secs(10); + #[derive(Debug, thiserror::Error)] pub enum SyncError { #[error("failed to queue sync request")] @@ -70,33 +73,25 @@ impl SyncRequest { ) { let start = Instant::now(); - let outdated_triples = if !self.update.triples.is_empty() { - match triples - .remove_outdated(self.update.from, &self.update.triples) - .await - { - Ok(result) => result, - Err(err) => { - let _ = self.response_tx.send(Err(err)); - return; - } + let outdated_triples = match triples + .remove_outdated(self.update.from, &self.update.triples) + .await + { + Ok(result) => result, + Err(err) => { + let _ = self.response_tx.send(Err(err)); + return; } - } else { - Default::default() }; - let outdated_presignatures = if !self.update.presignatures.is_empty() { - match presignatures - .remove_outdated(self.update.from, &self.update.presignatures) - .await - { - Ok(result) => result, - Err(err) => { - let _ = self.response_tx.send(Err(err)); - return; - } + let outdated_presignatures = match presignatures + .remove_outdated(self.update.from, &self.update.presignatures) + .await + { + Ok(result) => result, + Err(err) => { + let _ = self.response_tx.send(Err(err)); + return; } - } else { - Default::default() }; tracing::info!( @@ -156,22 +151,25 
@@ impl SyncTask { } pub async fn run(mut self) { - tracing::info!("task has been started"); + tracing::info!("sync task has been started"); + // Poll for our participant info from contract state + // TODO: constantly watch for changes on node state after this initial one so we can start/stop sync running. let mut watcher_interval = tokio::time::interval(Duration::from_millis(500)); + // Trigger sync broadcasts to peers in need_sync state let mut sync_interval = tokio::time::interval(Duration::from_millis(200)); - // Broadcast should generally not be necessary. + // Periodic full sync broadcast to all active peers (TODO: should not be necessary) let mut broadcast_interval = tokio::time::interval(RECURRING_SYNC_INTERVAL); - let mut broadcast_check_interval = tokio::time::interval(Duration::from_millis(100)); + // Poll whether any ongoing sync task has completed (from either sync_interval or broadcast_interval) + let mut sync_check_interval = tokio::time::interval(Duration::from_millis(100)); - // Do NOT start until we have our own participant info. - // TODO: constantly watch for changes on node state after this initial one so we can start/stop sync running. - let (_threshold, me) = loop { + // Do NOT start until we have our own participant info + let (threshold, me) = loop { watcher_interval.tick().await; if let Some(info) = self.contract.info().await { break info; } }; - tracing::info!(?me, "mpc network ready, running..."); + tracing::info!(?me, "starting sync loop..."); let mut broadcast = Option::<(Instant, JoinHandle<_>)>::None; loop { @@ -198,7 +196,6 @@ impl SyncTask { self.client.clone(), update, receivers.into_iter(), - self.synced_peer_tx.clone(), me, )); broadcast = Some((start, task)); @@ -218,13 +215,12 @@ impl SyncTask { self.client.clone(), update, active.into_iter(), - self.synced_peer_tx.clone(), me )); broadcast = Some((start, task)); } // check that our broadcast has completed, and if so process the result. 
- _ = broadcast_check_interval.tick() => { + _ = sync_check_interval.tick() => { let Some((start, handle)) = broadcast.take() else { continue; }; @@ -234,10 +230,15 @@ impl SyncTask { continue; } - if let Err(err) = handle.await { - tracing::warn!(?err, "broadcast task failed"); - } else { - tracing::debug!(elapsed = ?start.elapsed(), "processed broadcast"); + match handle.await { + Ok(responses) => { + // Process sync responses: update artifact participants based on not_found data + self.process_sync_responses(responses).await; + tracing::debug!(elapsed = ?start.elapsed(), "processed broadcast"); + } + Err(err) => { + tracing::warn!(?err, "broadcast task failed"); + } } } Some(sync_req) = self.requests.updates.recv() => { @@ -259,6 +260,103 @@ impl SyncTask { } } + /// Process sync responses: + /// 1. Remove peers from artifact participants if they're missing data + /// 2. Send synced peer notifications to mesh (for status transitions) + async fn process_sync_responses( + &self, + responses: Vec<(Participant, Option>)>, + ) { + for (peer, result) in responses { + match result { + // No RPC was performed (self-peer), treat as successful + None => { + if self.synced_peer_tx.send(peer).await.is_err() { + tracing::error!("sync reporter is down: state sync will no longer work"); + return; + } + } + // RPC succeeded, process the not_found data + Some(Ok(response)) => { + tracing::debug!( + ?peer, + not_found_triples = response.triples.len(), + not_found_presignatures = response.presignatures.len(), + "received sync response" + ); + + // Update replica tracking: remove peer from artifacts they don't have + for triple_id in response.triples { + if let Some(mut pair) = self.triples.fetch(triple_id).await { + let mut updated = false; + + // Remove peer from triple0 participants + if let Some(pos) = pair + .triple0 + .public + .participants + .iter() + .position(|p| *p == peer) + { + pair.triple0.public.participants.remove(pos); + updated = true; + } + + // Remove peer from 
triple1 participants + if let Some(pos) = pair + .triple1 + .public + .participants + .iter() + .position(|p| *p == peer) + { + pair.triple1.public.participants.remove(pos); + updated = true; + } + + if updated { + self.triples.update(triple_id, pair).await; + tracing::debug!( + ?triple_id, + ?peer, + "updated triple participants: removed peer" + ); + } + } + } + + // Update presignatures: remove peer from participants if they don't have it + for presig_id in response.presignatures { + if let Some(mut presig) = self.presignatures.fetch(presig_id).await { + if let Some(pos) = presig.participants.iter().position(|p| *p == peer) { + presig.participants.remove(pos); + self.presignatures.update(presig_id, presig).await; + tracing::debug!( + ?presig_id, + ?peer, + "updated presignature participants: removed peer" + ); + } + } + } + + // Notify mesh that peer is synced (after processing) - for Active state transition + if self.synced_peer_tx.send(peer).await.is_err() { + tracing::error!( + ?peer, + "sync reporter is down: state sync will no longer work" + ); + return; + } + } + // RPC failed, don't notify mesh (peer stays in Syncing state) + Some(Err(err)) => { + tracing::warn!(?peer, ?err, "failed to sync peer"); + } + } + } + } + /// Channel for communicating back from the sync task which nodes are now updated. pub fn synced_nodes_channel() -> (mpsc::Sender, mpsc::Receiver) { mpsc::channel(MAX_SYNC_UPDATE_REQUESTS) @@ -266,84 +364,64 @@ impl SyncTask { } /// Broadcast an update to all participants specified by `receivers`. +/// Returns results for all peers that complete within BROADCAST_TIMEOUT. +/// Peers that don't respond are not included in results and will be retried later. 
async fn broadcast_sync( client: NodeClient, update: SyncUpdate, receivers: impl Iterator, - synced_peer_tx: mpsc::Sender, me: Participant, -) { - if update.is_empty() { - for (participant, _) in receivers { - if synced_peer_tx.send(participant).await.is_err() { - tracing::error!( - ?participant, - "sync reporter is down: state sync will no longer work" - ); - } - } - return; - } - - let start = Instant::now(); +) -> Vec<(Participant, Option>)> { let mut tasks = JoinSet::new(); let update = Arc::new(update); + for (p, info) in receivers { let client = client.clone(); let update = update.clone(); let url = info.url; - let sync_tx = synced_peer_tx.clone(); tasks.spawn(async move { - // Only actually do the sync on other peers, not on self. (Hack) We - // still want to send the message to synced_peer_tx though, since - // the mesh does not currently understand which node is self, so it - // will trigger a sync to self. - let sync_view = if p != me { - let res = client.sync(&url, &update).await; - Some(res) + // Only actually do the sync on other peers, not on self. + let sync_result = if p != me { + Some(client.sync(&url, &update).await.map_err(|e| e.to_string())) } else { + // No RPC sync is attempted for self (`p == me`); return None to indicate no-op. None }; - let sync_succeeded = match &sync_view { - Some(Ok(_)) => true, - Some(Err(err)) => { - tracing::warn!(?p, ?err, "failed to sync peer"); - false - } - // No RPC sync is attempted for self (`p == me`); treat as successful no-op. 
- None => true, - }; - if sync_succeeded && sync_tx.send(p).await.is_err() { - tracing::error!("sync reporter is down: state sync will no longer work") - } - (p, sync_view) + (p, sync_result) }); } - let resps = tasks - .join_all() - .await - .into_iter() - .filter_map(|(p, view)| { - if let Some(Ok(_response)) = view { - tracing::debug!( - ?p, - not_found_triples = _response.triples.len(), - not_found_presignatures = _response.presignatures.len(), - "received sync response" - ); - Some(p) - } else { - None + let deadline = Instant::now() + BROADCAST_TIMEOUT; + let mut results = Vec::new(); + while !tasks.is_empty() { + let now = Instant::now(); + if now >= deadline { + break; + } + + tokio::select! { + res = tasks.join_next() => { + match res { + Some(Ok((p, sync_result))) => { + results.push((p, sync_result)); + } + Some(Err(err)) => { + tracing::warn!(?err, "sync task failed"); + } + None => break, + } } - }) - .collect::>(); + _ = tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)) => { + break; + } + } + } + + if !tasks.is_empty() { + tasks.abort_all(); + } - tracing::debug!( - elapsed = ?start.elapsed(), - responded = ?resps, - "broadcast completed", - ); + results } #[derive(Clone)] diff --git a/chain-signatures/node/src/storage/protocol_storage.rs b/chain-signatures/node/src/storage/protocol_storage.rs index a2ce74feb..af2307640 100644 --- a/chain-signatures/node/src/storage/protocol_storage.rs +++ b/chain-signatures/node/src/storage/protocol_storage.rs @@ -433,6 +433,34 @@ impl ProtocolStorage { self.used.read().await.contains(&id) } + /// Fetch an artifact from storage without removing it. + pub async fn fetch(&self, id: A::Id) -> Option { + let Some(mut conn) = self.connect().await else { + return None; + }; + match conn.hget(&self.artifact_key, id).await { + Ok(artifact) => Some(artifact), + Err(err) => { + tracing::warn!(id, ?err, "failed to fetch artifact"); + None + } + } + } + + /// Update an artifact in storage (read-modify-write). 
+ pub async fn update(&self, id: A::Id, artifact: A) -> bool { + let Some(mut conn) = self.connect().await else { + return false; + }; + match conn.hset(&self.artifact_key, id, artifact).await { + Ok(()) => true, + Err(err) => { + tracing::warn!(id, ?err, "failed to update artifact"); + false + } + } + } + pub async fn take(&self, id: A::Id, owner: Participant) -> Option> { const SCRIPT: &str = r#" local artifact_key = KEYS[1] From 5840d53eb939b6ae2492931dfcec92a39eca3d1b Mon Sep 17 00:00:00 2001 From: Serhii Volovyk Date: Tue, 24 Feb 2026 23:54:15 +0200 Subject: [PATCH 03/16] remove sync unit tests (tmp) --- .../node/src/protocol/sync/mod.rs | 60 ------------------- 1 file changed, 60 deletions(-) diff --git a/chain-signatures/node/src/protocol/sync/mod.rs b/chain-signatures/node/src/protocol/sync/mod.rs index f056a9260..8c5133bc2 100644 --- a/chain-signatures/node/src/protocol/sync/mod.rs +++ b/chain-signatures/node/src/protocol/sync/mod.rs @@ -471,63 +471,3 @@ impl SyncChannel { }) } } - -#[cfg(test)] -mod tests { - use super::*; - use crate::node_client::Options as NodeClientOptions; - - #[tokio::test] - async fn test_broadcast_sync_on_empty_update() { - let client = NodeClient::new(&NodeClientOptions::default()); - let update = SyncUpdate::empty(); - let (tx, mut rx) = mpsc::channel(4); - - let participants = vec![ - (Participant::from(1u32), ParticipantInfo::new(1)), - (Participant::from(2u32), ParticipantInfo::new(2)), - ]; - - broadcast_sync( - client, - update, - participants.clone().into_iter(), - tx, - Participant::from(0u32), - ) - .await; - - let mut received = Vec::new(); - for _ in 0..participants.len() { - received.push(rx.recv().await.expect("missing synced participant")); - } - - let expected: Vec<_> = participants.iter().map(|(p, _)| *p).collect(); - assert_eq!(received, expected); - assert!(rx.recv().await.is_none()); - } - - #[tokio::test] - async fn test_broadcast_sync_does_not_report_synced_on_failure() { - let client = 
NodeClient::new(&NodeClientOptions::default()); - let update = SyncUpdate { - from: Participant::from(0u32), - triples: vec![TripleId::from(1u64)], - presignatures: vec![PresignatureId::from(1u64)], - }; - let (tx, mut rx) = mpsc::channel(4); - - let participants = vec![(Participant::from(1u32), ParticipantInfo::new(1))]; - - broadcast_sync( - client, - update, - participants.into_iter(), - tx, - Participant::from(0u32), - ) - .await; - - assert!(rx.recv().await.is_none()); - } -} From 97bb642ca71ff3d51948ae309be61b325d704c2f Mon Sep 17 00:00:00 2001 From: Serhii Volovyk Date: Thu, 26 Feb 2026 11:40:40 +0200 Subject: [PATCH 04/16] naive prune and bidirectional sync test implementation --- .../node/src/protocol/sync/mod.rs | 138 +++++- integration-tests/tests/cases/sync.rs | 439 ++++++++++++++++-- 2 files changed, 531 insertions(+), 46 deletions(-) diff --git a/chain-signatures/node/src/protocol/sync/mod.rs b/chain-signatures/node/src/protocol/sync/mod.rs index 8c5133bc2..8cc685f03 100644 --- a/chain-signatures/node/src/protocol/sync/mod.rs +++ b/chain-signatures/node/src/protocol/sync/mod.rs @@ -1,3 +1,4 @@ +use std::collections::HashSet; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -233,7 +234,9 @@ impl SyncTask { match handle.await { Ok(responses) => { // Process sync responses: update artifact participants based on not_found data - self.process_sync_responses(responses).await; + if let Err(err) = self.process_sync_responses(responses, me, threshold).await { + tracing::warn!(?err, "failed to process sync responses"); + } tracing::debug!(elapsed = ?start.elapsed(), "processed broadcast"); } Err(err) => { @@ -266,14 +269,19 @@ impl SyncTask { async fn process_sync_responses( &self, responses: Vec<(Participant, Option>)>, - ) { + me: Participant, + threshold: usize, + ) -> Result<(), String> { + let mut triples_to_prune = HashSet::new(); + let mut presignatures_to_prune = HashSet::new(); + for (peer, result) in responses { match result { // No RPC was 
performed (self-peer), treat as successful None => { if self.synced_peer_tx.send(peer).await.is_err() { tracing::error!("sync reporter is down: state sync will no longer work"); - return; + return Err("sync reporter is down".to_string()); } } // RPC succeeded, process the not_found data @@ -287,6 +295,13 @@ impl SyncTask { // Update replica tracking: remove peer from artifacts they don't have for triple_id in response.triples { + let is_owned_by_me = self.triples.contains_by_owner(triple_id, me).await; + if !is_owned_by_me { + return Err(format!( + "received non-owned triple in sync response: triple_id={triple_id:?}, peer={peer:?}, me={me:?}" + )); + } + if let Some(mut pair) = self.triples.fetch(triple_id).await { let mut updated = false; @@ -315,27 +330,64 @@ impl SyncTask { } if updated { - self.triples.update(triple_id, pair).await; - tracing::debug!( - ?triple_id, - ?peer, - "updated triple participants: removed peer" - ); + let remaining_holders = pair + .triple0 + .public + .participants + .len() + .min(pair.triple1.public.participants.len()); + + if remaining_holders < threshold { + triples_to_prune.insert(triple_id); + tracing::info!( + ?triple_id, + ?peer, + remaining_holders, + threshold, + "triple dropped below threshold: scheduling owned artifact removal" + ); + } else { + self.triples.update(triple_id, pair).await; + tracing::debug!( + ?triple_id, + ?peer, + "updated triple participants: removed peer" + ); + } } } } // Update presignatures: remove peer from participants if they don't have it for presig_id in response.presignatures { + let is_owned_by_me = self.presignatures.contains_by_owner(presig_id, me).await; + if !is_owned_by_me { + return Err(format!( + "received non-owned presignature in sync response: presig_id={presig_id:?}, peer={peer:?}, me={me:?}" + )); + } + if let Some(mut presig) = self.presignatures.fetch(presig_id).await { if let Some(pos) = presig.participants.iter().position(|p| *p == peer) { presig.participants.remove(pos); - 
self.presignatures.update(presig_id, presig).await; - tracing::debug!( - ?presig_id, - ?peer, - "updated presignature participants: removed peer" - ); + + if presig.participants.len() < threshold { + presignatures_to_prune.insert(presig_id); + tracing::info!( + ?presig_id, + ?peer, + remaining_holders = presig.participants.len(), + threshold, + "presignature dropped below threshold: scheduling owned artifact removal" + ); + } else { + self.presignatures.update(presig_id, presig).await; + tracing::debug!( + ?presig_id, + ?peer, + "updated presignature participants: removed peer" + ); + } } } } @@ -346,7 +398,7 @@ impl SyncTask { ?peer, "sync reporter is down: state sync will no longer work" ); - return; + return Err("sync reporter is down".to_string()); } } // RPC failed, don't notify mesh (peer stays in Syncing state) @@ -355,6 +407,60 @@ impl SyncTask { } } } + + self.prune_owned_triples(me, &triples_to_prune).await; + self.prune_owned_presignatures(me, &presignatures_to_prune) + .await; + + Ok(()) + } + + async fn prune_owned_triples(&self, me: Participant, to_prune: &HashSet) { + if to_prune.is_empty() { + return; + } + + let mut keep = self.triples.fetch_owned(me).await; + keep.retain(|id| !to_prune.contains(id)); + + match self.triples.remove_outdated(me, &keep).await { + Ok(result) => { + tracing::info!( + removed = result.removed.len(), + not_found = result.not_found.len(), + "removed owned triples that dropped below threshold" + ); + } + Err(err) => { + tracing::warn!(?err, "failed to remove owned triples below threshold"); + } + } + } + + async fn prune_owned_presignatures( + &self, + me: Participant, + to_prune: &HashSet, + ) { + if to_prune.is_empty() { + return; + } + + let mut keep = self.presignatures.fetch_owned(me).await; + keep.retain(|id| !to_prune.contains(id)); + + match self.presignatures.remove_outdated(me, &keep).await { + Ok(result) => { + tracing::info!( + removed = result.removed.len(), + not_found = result.not_found.len(), + "removed 
owned presignatures that dropped below threshold" + ); + } + Err(err) => { + tracing::warn!(?err, "failed to remove owned presignatures below threshold"); + } + } } /// Channel for communicating back from the sync task which nodes are now updated. diff --git a/integration-tests/tests/cases/sync.rs b/integration-tests/tests/cases/sync.rs index f231d48e6..be9d88dff 100644 --- a/integration-tests/tests/cases/sync.rs +++ b/integration-tests/tests/cases/sync.rs @@ -28,21 +28,37 @@ async fn test_state_sync_update() -> anyhow::Result<()> { .unwrap(); let redis = spawner.spawn_redis().await; - let num_nodes = 1; + let num_nodes = 3; let threshold = 2; - let node0_account_id = "p0_test.near".parse().unwrap(); + let node0 = Participant::from(0); let node1 = Participant::from(1); + let node2 = Participant::from(2); + + let node0_account_id = "p0_test.near".parse().unwrap(); + let node1_account_id = "p1_test.near".parse().unwrap(); + let node2_account_id = "p2_test.near".parse().unwrap(); let sk = k256::SecretKey::random(&mut rand::thread_rng()); let pk = sk.public_key(); let ping_interval = Duration::from_millis(300); let client = NodeClient::new(&node_client::Options::default()); let participants = participants(num_nodes); + let node0_triples = redis.triple_storage(&node0_account_id); let node0_presignatures = redis.presignature_storage(&node0_account_id); + let node1_triples = redis.triple_storage(&node1_account_id); + let node1_presignatures = redis.presignature_storage(&node1_account_id); + let node2_triples = redis.triple_storage(&node2_account_id); + let node2_presignatures = redis.presignature_storage(&node2_account_id); - let (contract_watcher, _contract_tx) = ContractStateWatcher::with( - &node0_account_id, + node0_triples.clear().await; + node0_presignatures.clear().await; + node1_triples.clear().await; + node1_presignatures.clear().await; + node2_triples.clear().await; + node2_presignatures.clear().await; + + let running_state = || { 
ProtocolState::Running(RunningContractState { epoch: 0, public_key: *pk.as_affine(), @@ -51,50 +67,413 @@ async fn test_state_sync_update() -> anyhow::Result<()> { join_votes: Default::default(), leave_votes: Default::default(), threshold, - }), + }) + }; + + let node0_ctx = start_sync_node( + &client, + &node0_account_id, + node0_triples.clone(), + node0_presignatures.clone(), + running_state(), + ping_interval, + ); + let node1_ctx = start_sync_node( + &client, + &node1_account_id, + node1_triples.clone(), + node1_presignatures.clone(), + running_state(), + ping_interval, ); + let node2_ctx = start_sync_node( + &client, + &node2_account_id, + node2_triples.clone(), + node2_presignatures.clone(), + running_state(), + ping_interval, + ); + + // 9 artifacts in the network. Owner of `id` is `id % 3`. + // Each node HOLDS all 9 artifacts, but OWNS only 3 by owner metadata. + for id in 0u64..9u64 { + let owner = owner_for_id(id); + for (triples, presignatures) in [ + (&node0_triples, &node0_presignatures), + (&node1_triples, &node1_presignatures), + (&node2_triples, &node2_presignatures), + ] { + triples + .reserve(id) + .await + .unwrap() + .insert(dummy_pair(id), owner) + .await; + presignatures + .reserve(id) + .await + .unwrap() + .insert(dummy_presignature(id), owner) + .await; + } + } + + // Initial assertions: each node holds all 9, and each owner has exactly 3 on each holder. + assert_full_replication_and_ownership(&node0_triples, &node0_presignatures).await; + assert_full_replication_and_ownership(&node1_triples, &node1_presignatures).await; + assert_full_replication_and_ownership(&node2_triples, &node2_presignatures).await; + + // Pairwise sync requests: sender i syncs to all other nodes j != i. + // Since all holders are in sync already, responses must be empty not_found lists. 
+ let senders = [ + ( + node0, + &node0_triples, + &node0_presignatures, + &node1_ctx.sync_channel, + &node2_ctx.sync_channel, + ), + ( + node1, + &node1_triples, + &node1_presignatures, + &node0_ctx.sync_channel, + &node2_ctx.sync_channel, + ), + ( + node2, + &node2_triples, + &node2_presignatures, + &node0_ctx.sync_channel, + &node1_ctx.sync_channel, + ), + ]; + + for (sender, sender_triples, sender_presignatures, rx_a, rx_b) in senders { + let mut owned_triples = sender_triples.fetch_owned(sender).await; + owned_triples.sort(); + assert_eq!(owned_triples, owned_ids(sender)); + + let mut owned_presignatures = sender_presignatures.fetch_owned(sender).await; + owned_presignatures.sort(); + assert_eq!(owned_presignatures, owned_ids(sender)); + + let update = SyncUpdate { + from: sender, + triples: owned_triples, + presignatures: owned_presignatures, + }; + + let response_a = rx_a.request_update(update.clone()).await?; + assert!(response_a.triples.is_empty()); + assert!(response_a.presignatures.is_empty()); + + let response_b = rx_b.request_update(update).await?; + assert!(response_b.triples.is_empty()); + assert!(response_b.presignatures.is_empty()); + } + + // Phase 1 final assertions: nothing was removed; all holders still keep all artifacts. + assert_full_replication_and_ownership(&node0_triples, &node0_presignatures).await; + assert_full_replication_and_ownership(&node1_triples, &node1_presignatures).await; + assert_full_replication_and_ownership(&node2_triples, &node2_presignatures).await; + + // Phase 2: node0 loses all triples/presignatures. + node0_triples.clear().await; + node0_presignatures.clear().await; + + assert_eq!(node0_triples.len_generated().await, 0); + assert_eq!(node0_presignatures.len_generated().await, 0); + + // Run pairwise syncs again. 
+ let senders_after_loss = [ + ( + node0, + &node0_triples, + &node0_presignatures, + &node1_ctx.sync_channel, + &node2_ctx.sync_channel, + ), + ( + node1, + &node1_triples, + &node1_presignatures, + &node0_ctx.sync_channel, + &node2_ctx.sync_channel, + ), + ( + node2, + &node2_triples, + &node2_presignatures, + &node0_ctx.sync_channel, + &node1_ctx.sync_channel, + ), + ]; + + for (sender, sender_triples, sender_presignatures, rx_a, rx_b) in senders_after_loss { + let mut owned_triples = sender_triples.fetch_owned(sender).await; + owned_triples.sort(); + + let mut owned_presignatures = sender_presignatures.fetch_owned(sender).await; + owned_presignatures.sort(); + + let update = SyncUpdate { + from: sender, + triples: owned_triples.clone(), + presignatures: owned_presignatures.clone(), + }; + + let response_a = rx_a.request_update(update.clone()).await?; + let response_b = rx_b.request_update(update).await?; + + // Node0 has no data anymore, so when others sync to node0, node0 should report all sender-owned ids as not_found. + if sender == node1 || sender == node2 { + let expected_owned = owned_ids(sender); + assert_eq!(response_a.triples, expected_owned); + assert_eq!(response_a.presignatures, expected_owned); + assert!(response_b.triples.is_empty()); + assert!(response_b.presignatures.is_empty()); + } else { + // sender == node0 has no owned data now, so receivers should not report missing sender-owned ids. + assert!(response_a.triples.is_empty()); + assert!(response_a.presignatures.is_empty()); + assert!(response_b.triples.is_empty()); + assert!(response_b.presignatures.is_empty()); + } + } + + // node1 and node2 should remove artifacts owned by node0 after syncing with empty node0 update. + assert_owner_absent(&node1_triples, &node1_presignatures, node0).await; + assert_owner_absent(&node2_triples, &node2_presignatures, node0).await; + + // Explicit prune checks: each of node1/node2 should keep 6 artifacts total after pruning owner0's 3 ids. 
+ assert_eq!(node1_triples.len_generated().await, 6); + assert_eq!(node1_presignatures.len_generated().await, 6); + assert_eq!(node2_triples.len_generated().await, 6); + assert_eq!(node2_presignatures.len_generated().await, 6); + + // node1 and node2 must still keep artifacts owned by node1 and node2. + assert_owner_present(&node1_triples, &node1_presignatures, node1).await; + assert_owner_present(&node1_triples, &node1_presignatures, node2).await; + assert_owner_present(&node2_triples, &node2_presignatures, node1).await; + assert_owner_present(&node2_triples, &node2_presignatures, node2).await; + + // node0 stays empty. + for id in 0u64..9u64 { + assert!(!node0_triples.contains(id).await, "node0 should not hold triple={id}"); + assert!( + !node0_presignatures.contains(id).await, + "node0 should not hold presignature={id}" + ); + } + + // Phase 3: node1 loses one node2-owned artifact. node2 syncs, detects low holder count, + // and removes that owned artifact from its own storage. + let node2_owned = owned_ids(node2); + let victim = node2_owned[0]; + let keep_on_node1: Vec = node2_owned + .iter() + .copied() + .filter(|id| *id != victim) + .collect(); + + // Force node1 to lose one node2-owned triple/presignature. + node1_ctx + .sync_channel + .request_update(SyncUpdate { + from: node2, + triples: keep_on_node1.clone(), + presignatures: keep_on_node1.clone(), + }) + .await?; + + assert!(!node1_triples.contains_by_owner(victim, node2).await); + assert!(!node1_presignatures.contains_by_owner(victim, node2).await); + + // Node2 syncs to node0 and node1 and observes what they don't have. 
+ let response_from_node0 = node0_ctx + .sync_channel + .request_update(SyncUpdate { + from: node2, + triples: node2_owned.clone(), + presignatures: node2_owned.clone(), + }) + .await?; + let response_from_node1 = node1_ctx + .sync_channel + .request_update(SyncUpdate { + from: node2, + triples: node2_owned.clone(), + presignatures: node2_owned.clone(), + }) + .await?; + + // Compute surviving owner2 ids based on holder count >= threshold. + // Holders = node2 itself + peers that did NOT report not_found for the id. + let node0_missing_t: std::collections::HashSet = + response_from_node0.triples.into_iter().collect(); + let node1_missing_t: std::collections::HashSet = + response_from_node1.triples.into_iter().collect(); + let node0_missing_p: std::collections::HashSet = + response_from_node0.presignatures.into_iter().collect(); + let node1_missing_p: std::collections::HashSet = + response_from_node1.presignatures.into_iter().collect(); + + let keep_node2_triples: Vec = node2_owned + .iter() + .copied() + .filter(|id| { + let holders = 1 + + usize::from(!node0_missing_t.contains(id)) + + usize::from(!node1_missing_t.contains(id)); + holders >= threshold + }) + .collect(); + + let keep_node2_presignatures: Vec = node2_owned + .iter() + .copied() + .filter(|id| { + let holders = 1 + + usize::from(!node0_missing_p.contains(id)) + + usize::from(!node1_missing_p.contains(id)); + holders >= threshold + }) + .collect(); + + // Apply the prune decision to node2 as owner. + node2_ctx + .sync_channel + .request_update(SyncUpdate { + from: node2, + triples: keep_node2_triples.clone(), + presignatures: keep_node2_presignatures.clone(), + }) + .await?; + + // victim must be pruned on node2; the other two owner2 ids remain. 
+ assert!(!node2_triples.contains_by_owner(victim, node2).await); + assert!(!node2_presignatures.contains_by_owner(victim, node2).await); + + for id in keep_on_node1 { + assert!(node2_triples.contains_by_owner(id, node2).await); + assert!(node2_presignatures.contains_by_owner(id, node2).await); + } + + Ok(()) +} + +struct SyncNodeCtx { + sync_channel: mpc_node::protocol::sync::SyncChannel, +} + +fn start_sync_node( + client: &NodeClient, + account_id: &near_sdk::AccountId, + triples: TripleStorage, + presignatures: PresignatureStorage, + state: ProtocolState, + ping_interval: Duration, +) -> SyncNodeCtx { + let (contract_watcher, _contract_tx) = ContractStateWatcher::with(account_id, state); let (synced_peer_tx, synced_peer_rx) = SyncTask::synced_nodes_channel(); let mesh = Mesh::new( - &client, + client, mpc_node::mesh::Options { ping_interval: ping_interval.as_millis() as u64, }, - &node0_account_id, + account_id, synced_peer_rx, ); - let (sync_channel, sync) = SyncTask::new( - &client, - node0_triples.clone(), - node0_presignatures.clone(), + let (sync_channel, sync_task) = SyncTask::new( + client, + triples, + presignatures, mesh.watch(), contract_watcher, synced_peer_tx, ); - tokio::spawn(sync.run()); + tokio::spawn(sync_task.run()); - // insert shares of triples/presignatures to node0, that belong to node1 - insert_triples(&node0_triples, node1, 0..=5).await; - insert_presignatures(&node0_presignatures, node1, 0..=5).await; + SyncNodeCtx { sync_channel } +} - // Create an update where node1 is trying to sync with node0, where node1 only has - // triples/presignatures 0 to 3, so 4 and 5 should be deleted from node0. 
- let valid = vec![0, 1, 2, 3]; - let invalid = vec![4, 5]; +fn owner_for_id(id: u64) -> Participant { + Participant::from((id % 3) as u32) +} - let update = SyncUpdate { - from: node1, - triples: valid.clone(), - presignatures: valid.clone(), - }; - let _ = sync_channel.request_update(update).await; - // Give it some time for sync to process the update - tokio::time::sleep(Duration::from_secs(3)).await; +fn owned_ids(owner: Participant) -> Vec<u64> { + let owner_idx: u64 = Into::<u32>::into(owner) as u64; + vec![owner_idx, owner_idx + 3, owner_idx + 6] +} - validate_triples(&node0_triples, node1, &valid, &invalid).await; - validate_presignatures(&node0_presignatures, node1, &valid, &invalid).await; +async fn assert_full_replication_and_ownership( + triples: &TripleStorage, + presignatures: &PresignatureStorage, +) { + assert_eq!(triples.len_generated().await, 9); + assert_eq!(presignatures.len_generated().await, 9); - Ok(()) + for id in 0u64..9u64 { + assert!(triples.contains(id).await, "missing triple={id}"); + assert!( + presignatures.contains(id).await, + "missing presignature={id}" + ); + } + + for owner in [ + Participant::from(0u32), + Participant::from(1u32), + Participant::from(2u32), + ] { + for id in owned_ids(owner) { + assert!( + triples.contains_by_owner(id, owner).await, + "triple={id} should be owned by {owner:?}" + ); + assert!( + presignatures.contains_by_owner(id, owner).await, + "presignature={id} should be owned by {owner:?}" + ); + } + } +} + +async fn assert_owner_present( + triples: &TripleStorage, + presignatures: &PresignatureStorage, + owner: Participant, +) { + for id in owned_ids(owner) { + assert!( + triples.contains_by_owner(id, owner).await, + "triple={id} should be owned by {owner:?}" + ); + assert!( + presignatures.contains_by_owner(id, owner).await, + "presignature={id} should be owned by {owner:?}" + ); + } +} + +async fn assert_owner_absent( + triples: &TripleStorage, + presignatures: &PresignatureStorage, + owner: Participant, +) { + for 
id in owned_ids(owner) { + assert!( + !triples.contains_by_owner(id, owner).await, + "triple={id} should not be owned by {owner:?}" + ); + assert!( + !presignatures.contains_by_owner(id, owner).await, + "presignature={id} should not be owned by {owner:?}" + ); + } } #[test_log::test(tokio::test)] From a33b4161b23942eab57fc68607cb3dc31e59b494 Mon Sep 17 00:00:00 2001 From: Serhii Volovyk Date: Thu, 26 Feb 2026 12:52:50 +0200 Subject: [PATCH 05/16] refactor helpers, remove old sync test - extract shared dummy triple/presignature helpers - move triple insertion/assert helper usage into shared module - move participants test helper into shared helpers --- .../node/src/protocol/sync/mod.rs | 9 +- integration-tests/tests/cases/helpers.rs | 133 ++++ integration-tests/tests/cases/mod.rs | 1 + integration-tests/tests/cases/store.rs | 48 +- integration-tests/tests/cases/sync.rs | 627 +----------------- 5 files changed, 155 insertions(+), 663 deletions(-) create mode 100644 integration-tests/tests/cases/helpers.rs diff --git a/chain-signatures/node/src/protocol/sync/mod.rs b/chain-signatures/node/src/protocol/sync/mod.rs index 8cc685f03..4f2e1855f 100644 --- a/chain-signatures/node/src/protocol/sync/mod.rs +++ b/chain-signatures/node/src/protocol/sync/mod.rs @@ -360,7 +360,8 @@ impl SyncTask { // Update presignatures: remove peer from participants if they don't have it for presig_id in response.presignatures { - let is_owned_by_me = self.presignatures.contains_by_owner(presig_id, me).await; + let is_owned_by_me = + self.presignatures.contains_by_owner(presig_id, me).await; if !is_owned_by_me { return Err(format!( "received non-owned presignature in sync response: presig_id={presig_id:?}, peer={peer:?}, me={me:?}" @@ -437,11 +438,7 @@ impl SyncTask { } } - async fn prune_owned_presignatures( - &self, - me: Participant, - to_prune: &HashSet, - ) { + async fn prune_owned_presignatures(&self, me: Participant, to_prune: &HashSet) { if to_prune.is_empty() { return; } diff --git 
a/integration-tests/tests/cases/helpers.rs new file mode 100644 index 000000000..b4d172ca7 --- /dev/null +++ b/integration-tests/tests/cases/helpers.rs @@ -0,0 +1,133 @@ +use cait_sith::protocol::Participant; +use cait_sith::triples::{TriplePub, TripleShare}; +use cait_sith::PresignOutput; +use elliptic_curve::CurveArithmetic; +use k256::Secp256k1; +use mpc_node::protocol::presignature::Presignature; +use mpc_node::protocol::triple::Triple; +use mpc_node::storage::triple_storage::TriplePair; +use mpc_node::storage::{PresignatureStorage, TripleStorage}; + +pub(crate) fn dummy_presignature(id: u64) -> Presignature { + dummy_presignature_with_holders(id, vec![Participant::from(1), Participant::from(2)]) +} + +pub(crate) fn dummy_presignature_with_holders( + id: u64, + participants: Vec<Participant>, +) -> Presignature { + Presignature { + id, + output: PresignOutput { + big_r: <Secp256k1 as CurveArithmetic>::AffinePoint::default(), + k: <Secp256k1 as CurveArithmetic>::Scalar::ZERO, + sigma: <Secp256k1 as CurveArithmetic>::Scalar::ONE, + }, + participants, + } +} + +pub(crate) fn dummy_pair(id: u64) -> TriplePair { + dummy_pair_with_holders(id, vec![Participant::from(1), Participant::from(2)]) +} + +pub(crate) fn dummy_pair_with_holders(id: u64, participants: Vec<Participant>) -> TriplePair { + TriplePair { + id, + triple0: dummy_triple_with_holders(participants.clone()), + triple1: dummy_triple_with_holders(participants), + } +} + +pub(crate) fn dummy_triple_with_holders(participants: Vec<Participant>) -> Triple { + Triple { + share: TripleShare { + a: <Secp256k1 as CurveArithmetic>::Scalar::ZERO, + b: <Secp256k1 as CurveArithmetic>::Scalar::ZERO, + c: <Secp256k1 as CurveArithmetic>::Scalar::ZERO, + }, + public: TriplePub { + big_a: <Secp256k1 as CurveArithmetic>::AffinePoint::default(), + big_b: <Secp256k1 as CurveArithmetic>::AffinePoint::default(), + big_c: <Secp256k1 as CurveArithmetic>::AffinePoint::default(), + participants, + threshold: 5, + }, + } +} + +pub(crate) async fn insert_triples_for_owner( + triples: &TripleStorage, + owner: Participant, + holders: &[Participant], + ids: impl IntoIterator<Item = u64>, +) { + let holders = holders.to_vec(); + for id in ids { + triples + .reserve(id) + .await + .unwrap() + 
.insert(dummy_pair_with_holders(id, holders.clone()), owner) + .await; + } +} + +pub(crate) async fn insert_presignatures_for_owner( + presignatures: &PresignatureStorage, + owner: Participant, + holders: &[Participant], + ids: impl IntoIterator<Item = u64>, +) { + let holders = holders.to_vec(); + for id in ids { + presignatures + .reserve(id) + .await + .unwrap() + .insert(dummy_presignature_with_holders(id, holders.clone()), owner) + .await; + } +} + +pub(crate) async fn assert_triples_owned_state( + triples: &TripleStorage, + owner: Participant, + expected_present: &[u64], + expected_absent: &[u64], +) { + for id in expected_present { + assert!( + triples.contains_by_owner(*id, owner).await, + "triple={id} should be present for owner={owner:?}" + ); + } + + for id in expected_absent { + assert!( + !triples.contains_by_owner(*id, owner).await, + "triple={id} should be absent for owner={owner:?}" + ); + } +} + +pub(crate) async fn assert_presignatures_owned_state( + presignatures: &PresignatureStorage, + owner: Participant, + expected_present: &[u64], + expected_absent: &[u64], +) { + for id in expected_present { + assert!( + presignatures.contains_by_owner(*id, owner).await, + "presignature={id} should be present for owner={owner:?}" + ); + } + + for id in expected_absent { + assert!( + !presignatures.contains_by_owner(*id, owner).await, + "presignature={id} should be absent for owner={owner:?}" + ); + } +} diff --git a/integration-tests/tests/cases/mod.rs b/integration-tests/tests/cases/mod.rs index dda6e61cb..136d822db 100644 --- a/integration-tests/tests/cases/mod.rs +++ b/integration-tests/tests/cases/mod.rs @@ -19,6 +19,7 @@ pub mod chains; pub mod compat; pub mod ethereum; pub mod ethereum_stream; +pub mod helpers; pub mod mpc; pub mod nightly; pub mod solana; diff --git a/integration-tests/tests/cases/store.rs b/integration-tests/tests/cases/store.rs index 9e4ec6c84..0e3a9cf5f 100644 --- a/integration-tests/tests/cases/store.rs +++ 
b/integration-tests/tests/cases/store.rs @@ -1,18 +1,15 @@ use cait_sith::protocol::Participant; -use cait_sith::triples::{TriplePub, TripleShare}; -use cait_sith::PresignOutput; -use elliptic_curve::CurveArithmetic; use integration_tests::cluster::spawner::ClusterSpawner; use integration_tests::containers; -use k256::Secp256k1; use mpc_crypto::PublicKey; -use mpc_node::protocol::presignature::{Presignature, PresignatureSpawner}; -use mpc_node::protocol::triple::{Triple, TripleSpawner}; +use mpc_node::protocol::presignature::PresignatureSpawner; +use mpc_node::protocol::triple::TripleSpawner; use mpc_node::protocol::MessageChannel; -use mpc_node::storage::triple_storage::TriplePair; use mpc_node::types::SecretKeyShare; use test_log::test; +use super::helpers::{dummy_pair, dummy_presignature}; + #[test(tokio::test)] async fn test_triple_persistence() -> anyhow::Result<()> { let spawner = ClusterSpawner::default() @@ -314,40 +311,3 @@ async fn test_presignature_persistence() -> anyhow::Result<()> { Ok(()) } - -fn dummy_presignature(id: u64) -> Presignature { - Presignature { - id, - output: PresignOutput { - big_r: ::AffinePoint::default(), - k: ::Scalar::ZERO, - sigma: ::Scalar::ONE, - }, - participants: vec![Participant::from(1), Participant::from(2)], - } -} - -fn dummy_pair(id: u64) -> TriplePair { - TriplePair { - id, - triple0: dummy_triple(), - triple1: dummy_triple(), - } -} - -fn dummy_triple() -> Triple { - Triple { - share: TripleShare { - a: ::Scalar::ZERO, - b: ::Scalar::ZERO, - c: ::Scalar::ZERO, - }, - public: TriplePub { - big_a: ::AffinePoint::default(), - big_b: ::AffinePoint::default(), - big_c: ::AffinePoint::default(), - participants: vec![Participant::from(1), Participant::from(2)], - threshold: 5, - }, - } -} diff --git a/integration-tests/tests/cases/sync.rs b/integration-tests/tests/cases/sync.rs index be9d88dff..34052ad70 100644 --- a/integration-tests/tests/cases/sync.rs +++ b/integration-tests/tests/cases/sync.rs @@ -1,480 +1,12 @@ use 
std::time::Duration; use cait_sith::protocol::Participant; -use cait_sith::triples::{TriplePub, TripleShare}; -use cait_sith::PresignOutput; -use elliptic_curve::CurveArithmetic; use integration_tests::cluster; -use k256::Secp256k1; -use integration_tests::cluster::spawner::ClusterSpawner; -use mpc_node::mesh::Mesh; -use mpc_node::node_client::{self, NodeClient}; -use mpc_node::protocol::contract::primitives::Participants; -use mpc_node::protocol::contract::RunningContractState; -use mpc_node::protocol::presignature::Presignature; -use mpc_node::protocol::sync::{SyncTask, SyncUpdate}; -use mpc_node::protocol::triple::Triple; -use mpc_node::protocol::{ParticipantInfo, ProtocolState}; -use mpc_node::rpc::ContractStateWatcher; -use mpc_node::storage::{triple_storage::TriplePair, PresignatureStorage, TripleStorage}; - -#[test_log::test(tokio::test)] -async fn test_state_sync_update() -> anyhow::Result<()> { - let spawner = ClusterSpawner::default() - .network("protocol-sync") - .init_network() - .await - .unwrap(); - - let redis = spawner.spawn_redis().await; - let num_nodes = 3; - let threshold = 2; - let node0 = Participant::from(0); - let node1 = Participant::from(1); - let node2 = Participant::from(2); - - let node0_account_id = "p0_test.near".parse().unwrap(); - let node1_account_id = "p1_test.near".parse().unwrap(); - let node2_account_id = "p2_test.near".parse().unwrap(); - - let sk = k256::SecretKey::random(&mut rand::thread_rng()); - let pk = sk.public_key(); - let ping_interval = Duration::from_millis(300); - let client = NodeClient::new(&node_client::Options::default()); - let participants = participants(num_nodes); - - let node0_triples = redis.triple_storage(&node0_account_id); - let node0_presignatures = redis.presignature_storage(&node0_account_id); - let node1_triples = redis.triple_storage(&node1_account_id); - let node1_presignatures = redis.presignature_storage(&node1_account_id); - let node2_triples = redis.triple_storage(&node2_account_id); - let 
node2_presignatures = redis.presignature_storage(&node2_account_id); - - node0_triples.clear().await; - node0_presignatures.clear().await; - node1_triples.clear().await; - node1_presignatures.clear().await; - node2_triples.clear().await; - node2_presignatures.clear().await; - - let running_state = || { - ProtocolState::Running(RunningContractState { - epoch: 0, - public_key: *pk.as_affine(), - participants: participants.clone(), - candidates: Default::default(), - join_votes: Default::default(), - leave_votes: Default::default(), - threshold, - }) - }; - - let node0_ctx = start_sync_node( - &client, - &node0_account_id, - node0_triples.clone(), - node0_presignatures.clone(), - running_state(), - ping_interval, - ); - let node1_ctx = start_sync_node( - &client, - &node1_account_id, - node1_triples.clone(), - node1_presignatures.clone(), - running_state(), - ping_interval, - ); - let node2_ctx = start_sync_node( - &client, - &node2_account_id, - node2_triples.clone(), - node2_presignatures.clone(), - running_state(), - ping_interval, - ); - - // 9 artifacts in the network. Owner of `id` is `id % 3`. - // Each node HOLDS all 9 artifacts, but OWNS only 3 by owner metadata. - for id in 0u64..9u64 { - let owner = owner_for_id(id); - for (triples, presignatures) in [ - (&node0_triples, &node0_presignatures), - (&node1_triples, &node1_presignatures), - (&node2_triples, &node2_presignatures), - ] { - triples - .reserve(id) - .await - .unwrap() - .insert(dummy_pair(id), owner) - .await; - presignatures - .reserve(id) - .await - .unwrap() - .insert(dummy_presignature(id), owner) - .await; - } - } - - // Initial assertions: each node holds all 9, and each owner has exactly 3 on each holder. 
- assert_full_replication_and_ownership(&node0_triples, &node0_presignatures).await; - assert_full_replication_and_ownership(&node1_triples, &node1_presignatures).await; - assert_full_replication_and_ownership(&node2_triples, &node2_presignatures).await; - - // Pairwise sync requests: sender i syncs to all other nodes j != i. - // Since all holders are in sync already, responses must be empty not_found lists. - let senders = [ - ( - node0, - &node0_triples, - &node0_presignatures, - &node1_ctx.sync_channel, - &node2_ctx.sync_channel, - ), - ( - node1, - &node1_triples, - &node1_presignatures, - &node0_ctx.sync_channel, - &node2_ctx.sync_channel, - ), - ( - node2, - &node2_triples, - &node2_presignatures, - &node0_ctx.sync_channel, - &node1_ctx.sync_channel, - ), - ]; - - for (sender, sender_triples, sender_presignatures, rx_a, rx_b) in senders { - let mut owned_triples = sender_triples.fetch_owned(sender).await; - owned_triples.sort(); - assert_eq!(owned_triples, owned_ids(sender)); - - let mut owned_presignatures = sender_presignatures.fetch_owned(sender).await; - owned_presignatures.sort(); - assert_eq!(owned_presignatures, owned_ids(sender)); - - let update = SyncUpdate { - from: sender, - triples: owned_triples, - presignatures: owned_presignatures, - }; - - let response_a = rx_a.request_update(update.clone()).await?; - assert!(response_a.triples.is_empty()); - assert!(response_a.presignatures.is_empty()); - - let response_b = rx_b.request_update(update).await?; - assert!(response_b.triples.is_empty()); - assert!(response_b.presignatures.is_empty()); - } - - // Phase 1 final assertions: nothing was removed; all holders still keep all artifacts. - assert_full_replication_and_ownership(&node0_triples, &node0_presignatures).await; - assert_full_replication_and_ownership(&node1_triples, &node1_presignatures).await; - assert_full_replication_and_ownership(&node2_triples, &node2_presignatures).await; - - // Phase 2: node0 loses all triples/presignatures. 
- node0_triples.clear().await; - node0_presignatures.clear().await; - - assert_eq!(node0_triples.len_generated().await, 0); - assert_eq!(node0_presignatures.len_generated().await, 0); - - // Run pairwise syncs again. - let senders_after_loss = [ - ( - node0, - &node0_triples, - &node0_presignatures, - &node1_ctx.sync_channel, - &node2_ctx.sync_channel, - ), - ( - node1, - &node1_triples, - &node1_presignatures, - &node0_ctx.sync_channel, - &node2_ctx.sync_channel, - ), - ( - node2, - &node2_triples, - &node2_presignatures, - &node0_ctx.sync_channel, - &node1_ctx.sync_channel, - ), - ]; - - for (sender, sender_triples, sender_presignatures, rx_a, rx_b) in senders_after_loss { - let mut owned_triples = sender_triples.fetch_owned(sender).await; - owned_triples.sort(); - - let mut owned_presignatures = sender_presignatures.fetch_owned(sender).await; - owned_presignatures.sort(); - - let update = SyncUpdate { - from: sender, - triples: owned_triples.clone(), - presignatures: owned_presignatures.clone(), - }; - - let response_a = rx_a.request_update(update.clone()).await?; - let response_b = rx_b.request_update(update).await?; - - // Node0 has no data anymore, so when others sync to node0, node0 should report all sender-owned ids as not_found. - if sender == node1 || sender == node2 { - let expected_owned = owned_ids(sender); - assert_eq!(response_a.triples, expected_owned); - assert_eq!(response_a.presignatures, expected_owned); - assert!(response_b.triples.is_empty()); - assert!(response_b.presignatures.is_empty()); - } else { - // sender == node0 has no owned data now, so receivers should not report missing sender-owned ids. - assert!(response_a.triples.is_empty()); - assert!(response_a.presignatures.is_empty()); - assert!(response_b.triples.is_empty()); - assert!(response_b.presignatures.is_empty()); - } - } - - // node1 and node2 should remove artifacts owned by node0 after syncing with empty node0 update. 
- assert_owner_absent(&node1_triples, &node1_presignatures, node0).await; - assert_owner_absent(&node2_triples, &node2_presignatures, node0).await; - - // Explicit prune checks: each of node1/node2 should keep 6 artifacts total after pruning owner0's 3 ids. - assert_eq!(node1_triples.len_generated().await, 6); - assert_eq!(node1_presignatures.len_generated().await, 6); - assert_eq!(node2_triples.len_generated().await, 6); - assert_eq!(node2_presignatures.len_generated().await, 6); - - // node1 and node2 must still keep artifacts owned by node1 and node2. - assert_owner_present(&node1_triples, &node1_presignatures, node1).await; - assert_owner_present(&node1_triples, &node1_presignatures, node2).await; - assert_owner_present(&node2_triples, &node2_presignatures, node1).await; - assert_owner_present(&node2_triples, &node2_presignatures, node2).await; - - // node0 stays empty. - for id in 0u64..9u64 { - assert!(!node0_triples.contains(id).await, "node0 should not hold triple={id}"); - assert!( - !node0_presignatures.contains(id).await, - "node0 should not hold presignature={id}" - ); - } - - // Phase 3: node1 loses one node2-owned artifact. node2 syncs, detects low holder count, - // and removes that owned artifact from its own storage. - let node2_owned = owned_ids(node2); - let victim = node2_owned[0]; - let keep_on_node1: Vec = node2_owned - .iter() - .copied() - .filter(|id| *id != victim) - .collect(); - - // Force node1 to lose one node2-owned triple/presignature. - node1_ctx - .sync_channel - .request_update(SyncUpdate { - from: node2, - triples: keep_on_node1.clone(), - presignatures: keep_on_node1.clone(), - }) - .await?; - - assert!(!node1_triples.contains_by_owner(victim, node2).await); - assert!(!node1_presignatures.contains_by_owner(victim, node2).await); - - // Node2 syncs to node0 and node1 and observes what they don't have. 
- let response_from_node0 = node0_ctx - .sync_channel - .request_update(SyncUpdate { - from: node2, - triples: node2_owned.clone(), - presignatures: node2_owned.clone(), - }) - .await?; - let response_from_node1 = node1_ctx - .sync_channel - .request_update(SyncUpdate { - from: node2, - triples: node2_owned.clone(), - presignatures: node2_owned.clone(), - }) - .await?; - - // Compute surviving owner2 ids based on holder count >= threshold. - // Holders = node2 itself + peers that did NOT report not_found for the id. - let node0_missing_t: std::collections::HashSet<u64> = - response_from_node0.triples.into_iter().collect(); - let node1_missing_t: std::collections::HashSet<u64> = - response_from_node1.triples.into_iter().collect(); - - let node0_missing_p: std::collections::HashSet<u64> = - response_from_node0.presignatures.into_iter().collect(); - let node1_missing_p: std::collections::HashSet<u64> = - response_from_node1.presignatures.into_iter().collect(); - - let keep_node2_triples: Vec<u64> = node2_owned - .iter() - .copied() - .filter(|id| { - let holders = 1 - + usize::from(!node0_missing_t.contains(id)) - + usize::from(!node1_missing_t.contains(id)); - holders >= threshold - }) - .collect(); - - let keep_node2_presignatures: Vec<u64> = node2_owned - .iter() - .copied() - .filter(|id| { - let holders = 1 - + usize::from(!node0_missing_p.contains(id)) - + usize::from(!node1_missing_p.contains(id)); - holders >= threshold - }) - .collect(); - - // Apply the prune decision to node2 as owner. - node2_ctx - .sync_channel - .request_update(SyncUpdate { - from: node2, - triples: keep_node2_triples.clone(), - presignatures: keep_node2_presignatures.clone(), - }) - .await?; - - // victim must be pruned on node2; the other two owner2 ids remain. 
- assert!(!node2_triples.contains_by_owner(victim, node2).await); - assert!(!node2_presignatures.contains_by_owner(victim, node2).await); - - for id in keep_on_node1 { - assert!(node2_triples.contains_by_owner(id, node2).await); - assert!(node2_presignatures.contains_by_owner(id, node2).await); - } - - Ok(()) -} - -struct SyncNodeCtx { - sync_channel: mpc_node::protocol::sync::SyncChannel, -} - -fn start_sync_node( - client: &NodeClient, - account_id: &near_sdk::AccountId, - triples: TripleStorage, - presignatures: PresignatureStorage, - state: ProtocolState, - ping_interval: Duration, -) -> SyncNodeCtx { - let (contract_watcher, _contract_tx) = ContractStateWatcher::with(account_id, state); - let (synced_peer_tx, synced_peer_rx) = SyncTask::synced_nodes_channel(); - let mesh = Mesh::new( - client, - mpc_node::mesh::Options { - ping_interval: ping_interval.as_millis() as u64, - }, - account_id, - synced_peer_rx, - ); - let (sync_channel, sync_task) = SyncTask::new( - client, - triples, - presignatures, - mesh.watch(), - contract_watcher, - synced_peer_tx, - ); - tokio::spawn(sync_task.run()); - - SyncNodeCtx { sync_channel } -} - -fn owner_for_id(id: u64) -> Participant { - Participant::from((id % 3) as u32) -} - -fn owned_ids(owner: Participant) -> Vec<u64> { - let owner_idx: u64 = Into::<u32>::into(owner) as u64; - vec![owner_idx, owner_idx + 3, owner_idx + 6] -} - -async fn assert_full_replication_and_ownership( - triples: &TripleStorage, - presignatures: &PresignatureStorage, -) { - assert_eq!(triples.len_generated().await, 9); - assert_eq!(presignatures.len_generated().await, 9); - - for id in 0u64..9u64 { - assert!(triples.contains(id).await, "missing triple={id}"); - assert!( - presignatures.contains(id).await, - "missing presignature={id}" - ); - } - - for owner in [ - Participant::from(0u32), - Participant::from(1u32), - Participant::from(2u32), - ] { - for id in owned_ids(owner) { - assert!( - triples.contains_by_owner(id, owner).await, - "triple={id} should be 
owned by {owner:?}" - ); - assert!( - presignatures.contains_by_owner(id, owner).await, - "presignature={id} should be owned by {owner:?}" - ); - } - } -} - -async fn assert_owner_present( - triples: &TripleStorage, - presignatures: &PresignatureStorage, - owner: Participant, -) { - for id in owned_ids(owner) { - assert!( - triples.contains_by_owner(id, owner).await, - "triple={id} should be owned by {owner:?}" - ); - assert!( - presignatures.contains_by_owner(id, owner).await, - "presignature={id} should be owned by {owner:?}" - ); - } -} - -async fn assert_owner_absent( - triples: &TripleStorage, - presignatures: &PresignatureStorage, - owner: Participant, -) { - for id in owned_ids(owner) { - assert!( - !triples.contains_by_owner(id, owner).await, - "triple={id} should not be owned by {owner:?}" - ); - assert!( - !presignatures.contains_by_owner(id, owner).await, - "presignature={id} should not be owned by {owner:?}" - ); - } -} +use super::helpers::{ + assert_presignatures_owned_state, assert_triples_owned_state, insert_presignatures_for_owner, + insert_triples_for_owner, +}; #[test_log::test(tokio::test)] async fn test_state_sync_e2e_large_outdated_stockpile() { @@ -490,6 +22,7 @@ async fn test_state_sync_e2e_large_outdated_stockpile() { let node0_account_id = spawner.account_id(Into::::into(node0) as usize); let node1 = Participant::from(1); let node1_account_id = spawner.account_id(Into::::into(node1) as usize); + let holders = vec![node0, node1]; let redis = spawner.prespawn_redis().await; // immediately add to triples/presignatures storage the triples/presignatures we want to invalidate. @@ -500,10 +33,10 @@ async fn test_state_sync_e2e_large_outdated_stockpile() { // insert triples that will be invalidated after a sync, since nobody else has them. // node0 is saying that they have 0 to 5, but node1 will sync and say they have 4 and 5 only. 
- insert_triples(&node0_triples, node1, 0..=10000).await; - insert_triples(&node1_triples, node1, 0..=5).await; - insert_presignatures(&node0_presignatures, node1, 0..=10000).await; - insert_presignatures(&node1_presignatures, node1, 0..=5).await; + insert_triples_for_owner(&node0_triples, node1, &holders, 0..=10000).await; + insert_triples_for_owner(&node1_triples, node1, &holders, 0..=5).await; + insert_presignatures_for_owner(&node0_presignatures, node1, &holders, 0..=10000).await; + insert_presignatures_for_owner(&node1_presignatures, node1, &holders, 0..=5).await; let _nodes = spawner .disable_prestockpile() @@ -520,28 +53,28 @@ async fn test_state_sync_e2e_large_outdated_stockpile() { // Give some time for the first sync broadcast to finish. tokio::time::sleep(Duration::from_secs(5)).await; - validate_triples( + assert_triples_owned_state( &node0_triples, node1, &[0, 1, 2, 3, 4, 5], &[6, 100, 500, 2030, 1337, 10000], ) .await; - validate_triples( + assert_triples_owned_state( &node1_triples, node1, &[0, 1, 2, 3, 4, 5], &[6, 100, 500, 2030, 1337, 10000], ) .await; - validate_presignatures( - &node0_presignatures, + assert_presignatures_owned_state( + &node1_presignatures, node1, &[0, 1, 2, 3, 4, 5], &[6, 100, 500, 2030, 1337, 10000], ) .await; - validate_presignatures( + assert_presignatures_owned_state( &node0_presignatures, node1, &[0, 1, 2, 3, 4, 5], @@ -554,135 +87,3 @@ async fn test_state_sync_e2e_large_outdated_stockpile() { // nodes.wait().signable().await.unwrap(); // nodes.sign().await.unwrap(); } - -async fn insert_triples( - triples: &TripleStorage, - node: Participant, - range: impl IntoIterator, -) { - for id in range { - triples - .reserve(id) - .await - .unwrap() - .insert(dummy_pair(id), node) - .await; - } -} - -async fn validate_triples( - triples: &TripleStorage, - owner: Participant, - valid: &[u64], - invalid: &[u64], -) { - for id in valid { - assert!( - triples.contains_by_owner(*id, owner).await, - "triple={id} should be valid" - ); - } 
- - for id in invalid { - assert!( - !triples.contains_by_owner(*id, owner).await, - "triple={id} should be invalid" - ); - } -} - -async fn insert_presignatures( - presignatures: &PresignatureStorage, - node: Participant, - range: impl IntoIterator, -) { - for id in range { - presignatures - .reserve(id) - .await - .unwrap() - .insert(dummy_presignature(id), node) - .await; - } -} - -async fn validate_presignatures( - presignatures: &PresignatureStorage, - owner: Participant, - valid: &[u64], - invalid: &[u64], -) { - for id in valid { - assert!( - presignatures.contains_by_owner(*id, owner).await, - "presignature={id} should be valid" - ); - } - - for id in invalid { - assert!( - !presignatures.contains_by_owner(*id, owner).await, - "presignature={id} should be invalid" - ); - } -} - -// TODO: cleanup and move this to a common test utils module -fn dummy_presignature(id: u64) -> Presignature { - Presignature { - id, - output: PresignOutput { - big_r: ::AffinePoint::default(), - k: ::Scalar::ZERO, - sigma: ::Scalar::ONE, - }, - participants: vec![Participant::from(1), Participant::from(2)], - } -} - -// TODO: cleanup and move this to a common test utils module -fn dummy_triple() -> Triple { - Triple { - share: TripleShare { - a: ::Scalar::ZERO, - b: ::Scalar::ZERO, - c: ::Scalar::ZERO, - }, - public: TriplePub { - big_a: ::AffinePoint::default(), - big_b: ::AffinePoint::default(), - big_c: ::AffinePoint::default(), - participants: vec![Participant::from(1), Participant::from(2)], - threshold: 5, - }, - } -} - -// TODO: cleanup and move this to a common test utils module -fn participants(num_nodes: usize) -> Participants { - let (_cipher_sk, cipher_pk) = mpc_keys::hpke::generate(); - let sign_sk = near_crypto::SecretKey::from_seed(near_crypto::KeyType::ED25519, "sign-encrypt0"); - let mut participants = Participants::default(); - for i in 0..num_nodes { - let id = Participant::from(i as u32); - participants.insert( - &id, - ParticipantInfo { - sign_pk: 
sign_sk.public_key(), - cipher_pk: cipher_pk.clone(), - id: id.into(), - url: "http://localhost:3030".to_string(), - account_id: format!("p{i}_test.near").parse().unwrap(), - }, - ); - } - participants -} - -fn dummy_pair(id: u64) -> TriplePair { - TriplePair { - id, - triple0: dummy_triple(), - triple1: dummy_triple(), - } -} From aab9394200aae19a1e7c6509ff4dce88e87afefb Mon Sep 17 00:00:00 2001 From: Serhii Volovyk Date: Thu, 26 Feb 2026 15:16:15 +0200 Subject: [PATCH 06/16] e2e sync integration test --- integration-tests/tests/cases/helpers.rs | 2 +- integration-tests/tests/cases/sync.rs | 114 ++++++++++++++++++++++- 2 files changed, 112 insertions(+), 4 deletions(-) diff --git a/integration-tests/tests/cases/helpers.rs b/integration-tests/tests/cases/helpers.rs index b4d172ca7..5702ad956 100644 --- a/integration-tests/tests/cases/helpers.rs +++ b/integration-tests/tests/cases/helpers.rs @@ -111,7 +111,7 @@ pub(crate) async fn assert_triples_owned_state( } } -pub(crate) async fn assert_presignatures_owned_state( +pub(crate) async fn assert_presig_owned_state( presignatures: &PresignatureStorage, owner: Participant, expected_present: &[u64], diff --git a/integration-tests/tests/cases/sync.rs b/integration-tests/tests/cases/sync.rs index 34052ad70..526d2607b 100644 --- a/integration-tests/tests/cases/sync.rs +++ b/integration-tests/tests/cases/sync.rs @@ -4,7 +4,7 @@ use cait_sith::protocol::Participant; use integration_tests::cluster; use super::helpers::{ - assert_presignatures_owned_state, assert_triples_owned_state, insert_presignatures_for_owner, + assert_presig_owned_state, assert_triples_owned_state, insert_presignatures_for_owner, insert_triples_for_owner, }; @@ -67,14 +67,14 @@ async fn test_state_sync_e2e_large_outdated_stockpile() { &[6, 100, 500, 2030, 1337, 10000], ) .await; - assert_presignatures_owned_state( + assert_presig_owned_state( &node1_presignatures, node1, &[0, 1, 2, 3, 4, 5], &[6, 100, 500, 2030, 1337, 10000], ) .await; - 
assert_presignatures_owned_state( + assert_presig_owned_state( &node0_presignatures, node1, &[0, 1, 2, 3, 4, 5], @@ -87,3 +87,111 @@ async fn test_state_sync_e2e_large_outdated_stockpile() { // nodes.wait().signable().await.unwrap(); // nodes.sign().await.unwrap(); } + +#[test_log::test(tokio::test)] +async fn test_state_sync_e2e() { + // Setup 3 nodes with T=2 (default cluster setup). + let mut spawner = cluster::spawn(); + { + let worker = spawner.prespawn_sandbox().await.unwrap().clone(); + spawner.create_accounts(&worker).await; + } + + let node0 = Participant::from(0); + let node1 = Participant::from(1); + let node2 = Participant::from(2); + let node0_account_id = spawner.account_id(Into::::into(node0) as usize); + let node1_account_id = spawner.account_id(Into::::into(node1) as usize); + let node2_account_id = spawner.account_id(Into::::into(node2) as usize); + let holders = vec![node0, node1, node2]; + let redis = spawner.prespawn_redis().await; + + // Wait for Redis to be fully accepting connections on the host port. + tokio::time::sleep(Duration::from_secs(1)).await; + + // Get triple/presignature storage for each node. + let node0_triples = redis.triple_storage(&node0_account_id); + let node0_presignatures = redis.presignature_storage(&node0_account_id); + let node1_triples = redis.triple_storage(&node1_account_id); + let node1_presignatures = redis.presignature_storage(&node1_account_id); + let node2_triples = redis.triple_storage(&node2_account_id); + let node2_presignatures = redis.presignature_storage(&node2_account_id); + + // Populate 3 triples and 3 presignatures: each node owns 1, all nodes hold shares. 
+ for storage in [&node0_triples, &node1_triples, &node2_triples] { + insert_triples_for_owner(storage, node0, &holders, 0..=0).await; + insert_triples_for_owner(storage, node1, &holders, 1..=1).await; + insert_triples_for_owner(storage, node2, &holders, 2..=2).await; + } + for storage in [ + &node0_presignatures, + &node1_presignatures, + &node2_presignatures, + ] { + insert_presignatures_for_owner(storage, node0, &holders, 0..=0).await; + insert_presignatures_for_owner(storage, node1, &holders, 1..=1).await; + insert_presignatures_for_owner(storage, node2, &holders, 2..=2).await; + } + + // Add 1 extra T and P owned by node0, only on node0's storage. + // After sync, node0 will learn that node1 and node2 don't have id=99, + // dropping it below threshold (T=2), so it should be pruned. + insert_triples_for_owner(&node0_triples, node0, &holders, 99..=99).await; + insert_presignatures_for_owner(&node0_presignatures, node0, &holders, 99..=99).await; + + // Add 1 extra T and P owned by node1, on node0 and node1 only (not node2). + // After sync, node1 learns node2 doesn't have id=88, removing node2 from participants. + // But node0 still has it, so 2 holders remain (= threshold), and it should survive. + for storage in [&node0_triples, &node1_triples] { + insert_triples_for_owner(storage, node1, &holders, 88..=88).await; + } + for storage in [&node0_presignatures, &node1_presignatures] { + insert_presignatures_for_owner(storage, node1, &holders, 88..=88).await; + } + + // Add 1 extra T and P owned by node0, but only on node1 and node2 (not on node0 itself). + // When node0 broadcasts its owned IDs, id=77 won't be included (node0 doesn't have it), + // so node1 and node2 will remove it via remove_outdated. 
+ for storage in [&node1_triples, &node2_triples] { + insert_triples_for_owner(storage, node0, &holders, 77..=77).await; + } + for storage in [&node1_presignatures, &node2_presignatures] { + insert_presignatures_for_owner(storage, node0, &holders, 77..=77).await; + } + + let _cluster = spawner + .disable_prestockpile() + .with_config(|cfg| { + cfg.protocol.triple.min_triples = 1; + cfg.protocol.triple.max_triples = 1; + cfg.protocol.presignature.min_presignatures = 1; + cfg.protocol.presignature.max_presignatures = 1; + }) + .await + .unwrap(); + + // Give some time for the first sync broadcast to finish. + tokio::time::sleep(Duration::from_secs(5)).await; + + // After sync, all 3 consistent triples/presignatures should be present on every node. + // id=99 (only on node0) should have been pruned (below threshold). + // id=88 (on node0 and node1) should survive (exactly at threshold=2). + // id=77 (owned by node0, only on node1/node2) should be removed via remove_outdated. + for triples in [&node0_triples, &node1_triples] { + assert_triples_owned_state(triples, node0, &[0], &[1, 2, 77, 99]).await; + assert_triples_owned_state(triples, node1, &[1, 88], &[0, 2]).await; + assert_triples_owned_state(triples, node2, &[2], &[0, 1]).await; + } + assert_triples_owned_state(&node2_triples, node0, &[0], &[1, 2, 77, 99]).await; + assert_triples_owned_state(&node2_triples, node1, &[1], &[0, 2, 88]).await; + assert_triples_owned_state(&node2_triples, node2, &[2], &[0, 1]).await; + + for presignatures in [&node0_presignatures, &node1_presignatures] { + assert_presig_owned_state(presignatures, node0, &[0], &[1, 2, 77, 99]).await; + assert_presig_owned_state(presignatures, node1, &[1, 88], &[0, 2]).await; + assert_presig_owned_state(presignatures, node2, &[2], &[0, 1]).await; + } + assert_presig_owned_state(&node2_presignatures, node0, &[0], &[1, 2, 77, 99]).await; + assert_presig_owned_state(&node2_presignatures, node1, &[1], &[0, 2, 88]).await; + 
assert_presig_owned_state(&node2_presignatures, node2, &[2], &[0, 1]).await; +} From 78a46c49513c57f76b888a436e4e8aa86c13bbbc Mon Sep 17 00:00:00 2001 From: Serhii Volovyk Date: Thu, 26 Feb 2026 17:00:24 +0200 Subject: [PATCH 07/16] clippy --- chain-signatures/node/src/storage/protocol_storage.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/chain-signatures/node/src/storage/protocol_storage.rs b/chain-signatures/node/src/storage/protocol_storage.rs index af2307640..9064a52db 100644 --- a/chain-signatures/node/src/storage/protocol_storage.rs +++ b/chain-signatures/node/src/storage/protocol_storage.rs @@ -435,9 +435,7 @@ impl ProtocolStorage { /// Fetch an artifact from storage without removing it. pub async fn fetch(&self, id: A::Id) -> Option { - let Some(mut conn) = self.connect().await else { - return None; - }; + let mut conn = self.connect().await?; match conn.hget(&self.artifact_key, id).await { Ok(artifact) => Some(artifact), Err(err) => { From d5bb994bca861d292850ecc3ea9da108ca5deae9 Mon Sep 17 00:00:00 2001 From: Serhii Volovyk Date: Mon, 2 Mar 2026 18:28:31 +0200 Subject: [PATCH 08/16] use Redis to manage holders --- .gitignore | 1 + .../node/src/protocol/sync/mod.rs | 192 ++++-------------- .../node/src/storage/protocol_storage.rs | 88 +++++--- 3 files changed, 99 insertions(+), 182 deletions(-) diff --git a/.gitignore b/.gitignore index 2d8ff61c0..3106d8414 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ .direnv .DS_Store .idea +.vscode tmp *.log diff --git a/chain-signatures/node/src/protocol/sync/mod.rs b/chain-signatures/node/src/protocol/sync/mod.rs index 020348750..f871e173b 100644 --- a/chain-signatures/node/src/protocol/sync/mod.rs +++ b/chain-signatures/node/src/protocol/sync/mod.rs @@ -1,4 +1,3 @@ -use std::collections::HashSet; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -272,9 +271,6 @@ impl SyncTask { me: Participant, threshold: usize, ) -> Result<(), String> { - let mut triples_to_prune = 
HashSet::new(); - let mut presignatures_to_prune = HashSet::new(); - for (peer, result) in responses { match result { // No RPC was performed (self-peer), treat as successful @@ -293,114 +289,46 @@ impl SyncTask { "received sync response" ); - // Update replica tracking: remove peer from artifacts they don't have - for triple_id in response.triples { - let is_owned_by_me = self.triples.contains_by_owner(triple_id, me).await; - if !is_owned_by_me { - return Err(format!( - "received non-owned triple in sync response: triple_id={triple_id:?}, peer={peer:?}, me={me:?}" - )); - } - - if let Some(mut pair) = self.triples.fetch(triple_id).await { - let mut updated = false; - - // Remove peer from triple0 participants - if let Some(pos) = pair - .triple0 - .public - .participants - .iter() - .position(|p| *p == peer) - { - pair.triple0.public.participants.remove(pos); - updated = true; - } - - // Remove peer from triple1 participants - if let Some(pos) = pair - .triple1 - .public - .participants - .iter() - .position(|p| *p == peer) - { - pair.triple1.public.participants.remove(pos); - updated = true; + // Batch remove peer from all triples and prune + let triple_res = self + .triples + .remove_holder_and_prune(me, peer, threshold, &response.triples) + .await; + + // Batch remove peer from all presignatures and prune + let presig_res = self + .presignatures + .remove_holder_and_prune(me, peer, threshold, &response.presignatures) + .await; + + match (triple_res, presig_res) { + (Ok((t_removed, t_updated)), Ok((p_removed, p_updated))) => { + tracing::info!( + ?peer, + removed_triples = t_removed.len(), + updated_triples = t_updated.len(), + removed_presignatures = p_removed.len(), + updated_presignatures = p_updated.len(), + "batch removed peer from artifacts and pruned" + ); + // Only notify mesh if both succeeded + if self.synced_peer_tx.send(peer).await.is_err() { + tracing::error!( + ?peer, + "sync reporter is down: state sync will no longer work" + ); + return Err("sync 
reporter is down".to_string()); } - - if updated { - let remaining_holders = pair - .triple0 - .public - .participants - .len() - .min(pair.triple1.public.participants.len()); - - if remaining_holders < threshold { - triples_to_prune.insert(triple_id); - tracing::info!( - ?triple_id, - ?peer, - remaining_holders, - threshold, - "triple dropped below threshold: scheduling owned artifact removal" - ); - } else { - self.triples.update(triple_id, pair).await; - tracing::debug!( - ?triple_id, - ?peer, - "updated triple participants: removed peer" - ); - } - } - } - } - - // Update presignatures: remove peer from participants if they don't have it - for presig_id in response.presignatures { - let is_owned_by_me = - self.presignatures.contains_by_owner(presig_id, me).await; - if !is_owned_by_me { - return Err(format!( - "received non-owned presignature in sync response: presig_id={presig_id:?}, peer={peer:?}, me={me:?}" - )); } - - if let Some(mut presig) = self.presignatures.fetch(presig_id).await { - if let Some(pos) = presig.participants.iter().position(|p| *p == peer) { - presig.participants.remove(pos); - - if presig.participants.len() < threshold { - presignatures_to_prune.insert(presig_id); - tracing::info!( - ?presig_id, - ?peer, - remaining_holders = presig.participants.len(), - threshold, - "presignature dropped below threshold: scheduling owned artifact removal" - ); - } else { - self.presignatures.update(presig_id, presig).await; - tracing::debug!( - ?presig_id, - ?peer, - "updated presignature participants: removed peer" - ); - } - } + (triple_res, presig_res) => { + tracing::warn!( + ?peer, + ?triple_res, + ?presig_res, + "sync batch failed, not notifying mesh" + ); } } - - // Notify mesh that peer is synced (after processing) - for Active state transition - if self.synced_peer_tx.send(peer).await.is_err() { - tracing::error!( - ?peer, - "sync reporter is down: state sync will no longer work" - ); - return Err("sync reporter is down".to_string()); - } } // 
RPC failed, don't notify mesh (peer stays in Syncing state) Some(Err(err)) => { @@ -409,57 +337,9 @@ impl SyncTask { } } - self.prune_owned_triples(me, &triples_to_prune).await; - self.prune_owned_presignatures(me, &presignatures_to_prune) - .await; - Ok(()) } - async fn prune_owned_triples(&self, me: Participant, to_prune: &HashSet) { - if to_prune.is_empty() { - return; - } - - let mut keep = self.triples.fetch_owned(me).await; - keep.retain(|id| !to_prune.contains(id)); - - match self.triples.remove_outdated(me, &keep).await { - Ok(result) => { - tracing::info!( - removed = result.removed.len(), - not_found = result.not_found.len(), - "removed owned triples that dropped below threshold" - ); - } - Err(err) => { - tracing::warn!(?err, "failed to remove owned triples below threshold"); - } - } - } - - async fn prune_owned_presignatures(&self, me: Participant, to_prune: &HashSet) { - if to_prune.is_empty() { - return; - } - - let mut keep = self.presignatures.fetch_owned(me).await; - keep.retain(|id| !to_prune.contains(id)); - - match self.presignatures.remove_outdated(me, &keep).await { - Ok(result) => { - tracing::info!( - removed = result.removed.len(), - not_found = result.not_found.len(), - "removed owned presignatures that dropped below threshold" - ); - } - Err(err) => { - tracing::warn!(?err, "failed to remove owned presignatures below threshold"); - } - } - } - /// Channel for communicating back from the sync task which nodes are now updated. pub fn synced_nodes_channel() -> (mpsc::Sender, mpsc::Receiver) { mpsc::channel(MAX_SYNC_UPDATE_REQUESTS) diff --git a/chain-signatures/node/src/storage/protocol_storage.rs b/chain-signatures/node/src/storage/protocol_storage.rs index 9064a52db..eb951bf40 100644 --- a/chain-signatures/node/src/storage/protocol_storage.rs +++ b/chain-signatures/node/src/storage/protocol_storage.rs @@ -433,32 +433,6 @@ impl ProtocolStorage { self.used.read().await.contains(&id) } - /// Fetch an artifact from storage without removing it. 
- pub async fn fetch(&self, id: A::Id) -> Option { - let mut conn = self.connect().await?; - match conn.hget(&self.artifact_key, id).await { - Ok(artifact) => Some(artifact), - Err(err) => { - tracing::warn!(id, ?err, "failed to fetch artifact"); - None - } - } - } - - /// Update an artifact in storage (read-modify-write). - pub async fn update(&self, id: A::Id, artifact: A) -> bool { - let Some(mut conn) = self.connect().await else { - return false; - }; - match conn.hset(&self.artifact_key, id, artifact).await { - Ok(()) => true, - Err(err) => { - tracing::warn!(id, ?err, "failed to update artifact"); - false - } - } - } - pub async fn take(&self, id: A::Id, owner: Participant) -> Option> { const SCRIPT: &str = r#" local artifact_key = KEYS[1] @@ -709,4 +683,66 @@ impl ProtocolStorage { pub fn artifact_key(&self) -> &str { &self.artifact_key } + + /// Batch remove a peer from participants for a set of artifact IDs, and prune artifacts below threshold if owned by `me`. + /// Returns (Vec, Vec) + pub async fn remove_holder_and_prune( + &self, + me: Participant, + peer: Participant, + threshold: usize, + ids: &[A::Id], + ) -> Result<(Vec, Vec), StorageError> { + if ids.is_empty() { + return Ok((vec![], vec![])); + } + + // Lua script expects: KEYS[1]=artifact_key, KEYS[2]=owner_key, ARGV[1]=peer, ARGV[2]=threshold, ARGV[3...]=ids + const SCRIPT: &str = r#" + local artifact_key = KEYS[1] + local owner_key = KEYS[2] + local peer = ARGV[1] + local threshold = tonumber(ARGV[2]) + local removed = {} + local updated = {} + for i = 3, #ARGV do + local id = ARGV[i] + -- Error if 'me' does not own this artifact + if redis.call('SISMEMBER', owner_key, id) == 0 then + return redis.error_reply('OWNERSHIP_VIOLATION:' .. id) + end + -- Remove peer from participants (stored as a Redis set: artifact_key .. ':participants:' .. id) + local participants_key = artifact_key .. ':participants:' .. 
id + redis.call('SREM', participants_key, peer) + local count = redis.call('SCARD', participants_key) + if count < threshold then + -- Prune: remove artifact and participants set, and from owner set + redis.call('HDEL', artifact_key, id) + redis.call('DEL', participants_key) + redis.call('SREM', owner_key, id) + table.insert(removed, id) + else + table.insert(updated, id) + end + end + return {removed, updated} + "#; + + let Some(mut conn) = self.connect().await else { + return Err(StorageError::ConnectionFailed); + }; + let result: Result<(Vec, Vec), redis::RedisError> = + redis::Script::new(SCRIPT) + .key(&self.artifact_key) + .key(owner_key(&self.owner_keys, me)) + .arg(Into::::into(peer)) + .arg(threshold as i64) + .arg(ids) + .invoke_async(&mut conn) + .await; + match result { + Ok((removed, updated)) => Ok((removed, updated)), + Err(err) => Err(StorageError::RedisFailed(err.to_string())), + } + } } From e5f1dc38fa22a477dce2bceedd726e12e34e7a27 Mon Sep 17 00:00:00 2001 From: Serhii Volovyk Date: Mon, 2 Mar 2026 20:34:33 +0200 Subject: [PATCH 09/16] distinguish participants and holders --- .../node/src/protocol/presignature.rs | 16 ++- .../node/src/protocol/signature.rs | 10 +- chain-signatures/node/src/protocol/triple.rs | 1 + .../node/src/storage/presignature_storage.rs | 14 +++ .../node/src/storage/protocol_storage.rs | 109 +++++++++++++++--- .../node/src/storage/triple_storage.rs | 17 +++ integration-tests/benches/store.rs | 2 + integration-tests/src/containers.rs | 1 + integration-tests/src/mpc_fixture/builder.rs | 10 +- integration-tests/tests/cases/helpers.rs | 2 + 10 files changed, 159 insertions(+), 23 deletions(-) diff --git a/chain-signatures/node/src/protocol/presignature.rs b/chain-signatures/node/src/protocol/presignature.rs index b915b5671..fad9fd782 100644 --- a/chain-signatures/node/src/protocol/presignature.rs +++ b/chain-signatures/node/src/protocol/presignature.rs @@ -7,6 +7,7 @@ use crate::protocol::contract::primitives::intersect_vec; use 
crate::protocol::posit::PositInternalAction; use crate::protocol::MpcSignProtocol; use crate::storage::presignature_storage::{PresignatureSlot, PresignatureStorage}; +use crate::storage::protocol_storage::ProtocolArtifact; use crate::storage::triple_storage::{TriplesTaken, TriplesTakenDropper}; use crate::storage::TripleStorage; use crate::types::{PresignatureProtocol, SecretKeyShare}; @@ -56,7 +57,10 @@ impl FullPresignatureId { pub struct Presignature { pub id: PresignatureId, pub output: PresignOutput, + /// Original protocol participants pub participants: Vec, + /// Nodes still holding their share of the artifact + pub holders: Option>, } impl fmt::Debug for Presignature { @@ -64,6 +68,7 @@ impl fmt::Debug for Presignature { f.debug_struct("Presignature") .field("id", &self.id) .field("participants", &self.participants) + .field("holders", &self.holders) .finish() } } @@ -106,6 +111,7 @@ impl<'de> Deserialize<'de> for Presignature { k: fields.output_k, sigma: fields.output_sigma, }, + holders: None, participants: fields.participants, }) } @@ -262,6 +268,7 @@ impl PresignatureGenerator { id: self.id, output, participants: self.participants.clone(), + holders: Some(self.participants.clone()), }; if self.owner == me { tracing::info!(id = self.id, "assigning presignature to myself"); @@ -478,8 +485,12 @@ impl PresignatureSpawner { }; let pair_id = triples.artifact.id; - // note: only one of the pair's participants is needed since they are the same. - let participants = intersect_vec(&[active, &triples.artifact.triple0.public.participants]); + // use holders (not original participants) since some nodes may have lost the artifact. 
+ let Some(holders) = triples.artifact.holders() else { + tracing::error!(?pair_id, "holders not set on taken triple pair"); + return; + }; + let participants = intersect_vec(&[active, holders]); if participants.len() < self.threshold { tracing::warn!( ?pair_id, @@ -875,6 +886,7 @@ mod tests { sigma: ::Scalar::ONE, }, participants: vec![Participant::from(1), Participant::from(2)], + holders: None, }; // Serialize Presignature to JSON diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index 2bccc14c5..daa14ffc4 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -13,6 +13,7 @@ use crate::protocol::presignature::PresignatureId; use crate::protocol::Chain; use crate::rpc::{ContractStateWatcher, RpcChannel}; use crate::storage::presignature_storage::{PresignatureTaken, PresignatureTakenDropper}; +use crate::storage::protocol_storage::ProtocolArtifact; use crate::storage::PresignatureStorage; use crate::types::SignatureProtocol; use crate::util::{AffinePointExt, JoinMap, TimeoutBudget}; @@ -278,7 +279,14 @@ impl SignOrganizer { let fetch = tokio::time::timeout(remaining, async { loop { if let Some(taken) = ctx.presignatures.take_mine(ctx.me).await { - let participants = intersect_vec(&[&taken.artifact.participants, &active]); + let Some(holders) = taken.artifact.holders() else { + tracing::error!( + id = taken.artifact.id, + "holders not set on taken presignature" + ); + continue; + }; + let participants = intersect_vec(&[holders, &active]); if participants.len() < ctx.threshold { recycle.push(taken); continue; diff --git a/chain-signatures/node/src/protocol/triple.rs b/chain-signatures/node/src/protocol/triple.rs index 6b3e55b10..f24627a60 100644 --- a/chain-signatures/node/src/protocol/triple.rs +++ b/chain-signatures/node/src/protocol/triple.rs @@ -250,6 +250,7 @@ impl TripleGenerator { id: self.id, triple0: first, triple1: second, + holders: 
Some(self.participants.clone()), }; self.slot.insert(pair, triple_owner).await; break; diff --git a/chain-signatures/node/src/storage/presignature_storage.rs b/chain-signatures/node/src/storage/presignature_storage.rs index a8c3f0b90..393e5518e 100644 --- a/chain-signatures/node/src/storage/presignature_storage.rs +++ b/chain-signatures/node/src/storage/presignature_storage.rs @@ -2,6 +2,8 @@ use deadpool_redis::Pool; use near_sdk::AccountId; use redis::{FromRedisValue, RedisError, RedisWrite, ToRedisArgs}; +use cait_sith::protocol::Participant; + use super::protocol_storage::{ArtifactSlot, ArtifactTaken, ArtifactTakenDropper, ProtocolStorage}; use crate::protocol::presignature::{Presignature, PresignatureId}; use crate::storage::protocol_storage::ProtocolArtifact; @@ -24,6 +26,18 @@ impl ProtocolArtifact for Presignature { self.id } + fn participants(&self) -> &[Participant] { + &self.participants + } + + fn holders(&self) -> Option<&[Participant]> { + self.holders.as_deref() + } + + fn set_holders(&mut self, holders: Vec) { + self.holders = Some(holders); + } + const METRIC_LABEL: &'static str = "presignature"; } diff --git a/chain-signatures/node/src/storage/protocol_storage.rs b/chain-signatures/node/src/storage/protocol_storage.rs index eb951bf40..148245821 100644 --- a/chain-signatures/node/src/storage/protocol_storage.rs +++ b/chain-signatures/node/src/storage/protocol_storage.rs @@ -58,6 +58,14 @@ pub trait ProtocolArtifact: + 'static; fn id(&self) -> Self::Id; + + /// Original protocol participants (immutable) + fn participants(&self) -> &[Participant]; + + /// Nodes that still hold their share of the artifact + fn holders(&self) -> Option<&[Participant]>; + + fn set_holders(&mut self, holders: Vec); } /// A pre-reserved slot for an artifact that will eventually be inserted. 
@@ -278,6 +286,10 @@ impl ProtocolStorage { if #outdated >= 4096 then redis.call("SREM", owner_key, unpack(outdated)) redis.call("HDEL", artifact_key, unpack(outdated)) + -- also delete holders sets for each outdated artifact + for _, oid in ipairs(outdated) do + redis.call("DEL", artifact_key .. ':holders:' .. oid) + end -- clear the outdated list for the next batch outdated = {} end @@ -287,6 +299,10 @@ impl ProtocolStorage { if #outdated > 0 then redis.call("SREM", owner_key, unpack(outdated)) redis.call("HDEL", artifact_key, unpack(outdated)) + -- also delete holders sets for each outdated artifact + for _, oid in ipairs(outdated) do + redis.call("DEL", artifact_key .. ':holders:' .. oid) + end end -- find shares that were shared with us but not found in our storage @@ -344,8 +360,9 @@ impl ProtocolStorage { } } - /// Insert an artifact into the storage. If `mine` is true, the artifact will be - /// owned by the current node. If `back` is true, the artifact will be marked as unused. + /// Insert an artifact into storage under `owner`'s ownership set. + /// Holders must be set on the artifact before calling this; they are + /// persisted as a dedicated Redis set for later holder-tracking. pub async fn insert(&self, artifact: A, owner: Participant) -> bool { const SCRIPT: &str = r#" local artifact_key = KEYS[1] @@ -353,10 +370,18 @@ impl ProtocolStorage { local owner_key = KEYS[3] local artifact_id = ARGV[1] local artifact = ARGV[2] + local num_holders = tonumber(ARGV[3]) redis.call("SADD", owner_key, artifact_id) redis.call("SADD", owner_keys, owner_key) redis.call("HSET", artifact_key, artifact_id, artifact) + + -- Store holders in a dedicated Redis set + local holders_key = artifact_key .. ':holders:' .. 
artifact_id + redis.call("DEL", holders_key) + if num_holders > 0 then + redis.call("SADD", holders_key, unpack(ARGV, 4, 3 + num_holders)) + end "#; let start = Instant::now(); @@ -367,6 +392,13 @@ impl ProtocolStorage { return false; } + let holders: Vec = artifact + .holders() + .expect("holders must be set before insert") + .iter() + .map(|p| Into::::into(*p)) + .collect(); + let Some(mut conn) = self.connect().await else { tracing::warn!(id, "failed to insert artifact: connection failed"); return false; @@ -376,7 +408,9 @@ impl ProtocolStorage { .key(&self.owner_keys) .key(owner_key(&self.owner_keys, owner)) .arg(id) - .arg(artifact) + .arg(&artifact) + .arg(holders.len() as i64) + .arg(holders.as_slice()) .invoke_async(&mut conn) .await; drop(used); @@ -448,7 +482,13 @@ impl ProtocolStorage { return {err = "WARN artifact " .. artifact_id .. " not found"} end redis.call("HDEL", artifact_key, artifact_id) - return artifact + + -- Read and delete the holders set + local holders_key = artifact_key .. ':holders:' .. artifact_id + local holders = redis.call("SMEMBERS", holders_key) + redis.call("DEL", holders_key) + + return {artifact, holders} "#; let start = Instant::now(); @@ -462,7 +502,7 @@ impl ProtocolStorage { self.used.write().await.remove(&id); return None; }; - let result: Result = redis::Script::new(SCRIPT) + let result: Result<(A, Vec), _> = redis::Script::new(SCRIPT) .key(&self.artifact_key) .key(owner_key(&self.owner_keys, owner)) .arg(id) @@ -475,7 +515,9 @@ impl ProtocolStorage { .observe(elapsed.as_millis() as f64); match result { - Ok(artifact) => { + Ok((mut artifact, holders)) => { + let holders = holders.into_iter().map(Participant::from).collect(); + artifact.set_holders(holders); tracing::info!(id, ?elapsed, "took artifact"); Some(ArtifactTaken::new(artifact, self.clone())) } @@ -522,6 +564,7 @@ impl ProtocolStorage { /// Return true if successful, false otherwise. 
pub async fn clear(&self) -> bool { const SCRIPT: &str = r#" + local artifact_key = KEYS[2] local owner_keys = redis.call("SMEMBERS", KEYS[1]) local del = {} for _, key in ipairs(KEYS) do @@ -531,6 +574,12 @@ impl ProtocolStorage { table.insert(del, key) end + -- Also delete all holders sets for artifacts in the hash + local artifact_ids = redis.call("HKEYS", artifact_key) + for _, id in ipairs(artifact_ids) do + table.insert(del, artifact_key .. ':holders:' .. id) + end + redis.call("DEL", unpack(del)) "#; @@ -585,13 +634,18 @@ impl ProtocolStorage { -- delete the artifact from our self owner set redis.call("SREM", mine_key, id) - -- Return the artifact as a response - return artifact + -- Read and delete the holders set + local holders_key = artifact_key .. ':holders:' .. id + local holders = redis.call("SMEMBERS", holders_key) + redis.call("DEL", holders_key) + + -- Return the artifact and holders + return {artifact, holders} "#; let start = Instant::now(); let mut conn = self.connect().await?; - let result: Result, _> = redis::Script::new(SCRIPT) + let result: Result)>, _> = redis::Script::new(SCRIPT) .key(&self.artifact_key) .key(owner_key(&self.owner_keys, me)) .invoke_async(&mut conn) @@ -603,7 +657,9 @@ impl ProtocolStorage { .observe(elapsed.as_millis() as f64); match result { - Ok(Some(artifact)) => { + Ok(Some((mut artifact, holders))) => { + let holders = holders.into_iter().map(Participant::from).collect(); + artifact.set_holders(holders); // mark reserved and used in-memory so that it won't be reserved or reused locally let id = artifact.id(); self.reserved.write().await.insert(id); @@ -627,6 +683,7 @@ impl ProtocolStorage { local mine_key = KEYS[2] local artifact_id = ARGV[1] local artifact = ARGV[2] + local num_holders = tonumber(ARGV[3]) -- Add back to artifact hash map redis.call("HSET", artifact_key, artifact_id, artifact) @@ -634,6 +691,13 @@ impl ProtocolStorage { -- Add back to mine set redis.call("SADD", mine_key, artifact_id) + -- Restore 
holders set + local holders_key = artifact_key .. ':holders:' .. artifact_id + redis.call("DEL", holders_key) + if num_holders > 0 then + redis.call("SADD", holders_key, unpack(ARGV, 4, 3 + num_holders)) + end + return 1 "#; @@ -643,6 +707,13 @@ impl ProtocolStorage { dropper.dropper.take(); let id = artifact.id(); + let holders: Vec = artifact + .holders() + .expect("holders must be set before recycle") + .iter() + .map(|p| Into::::into(*p)) + .collect(); + let Some(mut conn) = self.connect().await else { tracing::warn!(id, "failed to return artifact: connection failed"); return false; @@ -652,7 +723,9 @@ impl ProtocolStorage { .key(&self.artifact_key) .key(owner_key(&self.owner_keys, me)) .arg(id) - .arg(artifact) + .arg(&artifact) + .arg(holders.len() as i64) + .arg(holders.as_slice()) .invoke_async(&mut conn) .await; @@ -684,7 +757,7 @@ impl ProtocolStorage { &self.artifact_key } - /// Batch remove a peer from participants for a set of artifact IDs, and prune artifacts below threshold if owned by `me`. + /// Batch remove a peer from holders for a set of artifact IDs, and prune artifacts below threshold if owned by `me`. /// Returns (Vec, Vec) pub async fn remove_holder_and_prune( &self, @@ -711,14 +784,14 @@ impl ProtocolStorage { if redis.call('SISMEMBER', owner_key, id) == 0 then return redis.error_reply('OWNERSHIP_VIOLATION:' .. id) end - -- Remove peer from participants (stored as a Redis set: artifact_key .. ':participants:' .. id) - local participants_key = artifact_key .. ':participants:' .. id - redis.call('SREM', participants_key, peer) - local count = redis.call('SCARD', participants_key) + -- Remove peer from holders set + local holders_key = artifact_key .. ':holders:' .. 
id + redis.call('SREM', holders_key, peer) + local count = redis.call('SCARD', holders_key) if count < threshold then - -- Prune: remove artifact and participants set, and from owner set + -- Prune: remove artifact, holders set, and owner set entry redis.call('HDEL', artifact_key, id) - redis.call('DEL', participants_key) + redis.call('DEL', holders_key) redis.call('SREM', owner_key, id) table.insert(removed, id) else diff --git a/chain-signatures/node/src/storage/triple_storage.rs b/chain-signatures/node/src/storage/triple_storage.rs index 90dd0a6b8..ec2652f28 100644 --- a/chain-signatures/node/src/storage/triple_storage.rs +++ b/chain-signatures/node/src/storage/triple_storage.rs @@ -3,6 +3,8 @@ use near_sdk::AccountId; use redis::{FromRedisValue, RedisError, RedisWrite, ToRedisArgs}; use serde::{Deserialize, Serialize}; +use cait_sith::protocol::Participant; + use crate::protocol::triple::{Triple, TripleId}; use super::protocol_storage::{ @@ -20,6 +22,9 @@ pub struct TriplePair { pub id: TripleId, pub triple0: Triple, pub triple1: Triple, + /// Nodes still holding this artifact + #[serde(skip, default)] + pub holders: Option>, } impl TriplePair { @@ -35,6 +40,18 @@ impl ProtocolArtifact for TriplePair { fn id(&self) -> Self::Id { self.id } + + fn participants(&self) -> &[Participant] { + &self.triple0.public.participants + } + + fn holders(&self) -> Option<&[Participant]> { + self.holders.as_deref() + } + + fn set_holders(&mut self, holders: Vec) { + self.holders = Some(holders); + } } impl ToRedisArgs for TriplePair { diff --git a/integration-tests/benches/store.rs b/integration-tests/benches/store.rs index b6be6923b..e6f3a1f9d 100644 --- a/integration-tests/benches/store.rs +++ b/integration-tests/benches/store.rs @@ -223,6 +223,7 @@ fn dummy_presignature(id: u64) -> Presignature { sigma: ::Scalar::ONE, }, participants: vec![Participant::from(1), Participant::from(2)], + holders: None, } } @@ -231,6 +232,7 @@ fn dummy_pair(id: u64) -> TriplePair { id, triple0: 
dummy_triple(), triple1: dummy_triple(), + holders: None, } } diff --git a/integration-tests/src/containers.rs b/integration-tests/src/containers.rs index 9d2ea7140..c85d4f62e 100644 --- a/integration-tests/src/containers.rs +++ b/integration-tests/src/containers.rs @@ -424,6 +424,7 @@ impl Redis { id: pair_id, triple0, triple1, + holders: None, }; storage .get(me) diff --git a/integration-tests/src/mpc_fixture/builder.rs b/integration-tests/src/mpc_fixture/builder.rs index 1517559af..c2aab8d1d 100644 --- a/integration-tests/src/mpc_fixture/builder.rs +++ b/integration-tests/src/mpc_fixture/builder.rs @@ -536,8 +536,11 @@ impl MpcFixtureNodeBuilder { // removing here because we can't clone a triple let my_shares = fixture_config.input.triples.remove(&self.me).unwrap(); for (owner, triple_shares) in my_shares { - for pair in triple_shares { + for mut pair in triple_shares { let pair_id = pair.id; + if pair.holders.is_none() { + pair.holders = Some(pair.triple0.public.participants.clone()); + } let mut slot = triple_storage.reserve(pair_id).await.unwrap(); slot.insert(pair, owner).await; } @@ -551,7 +554,10 @@ impl MpcFixtureNodeBuilder { // removing here because we can't clone a presignature let my_shares = fixture_config.input.presignatures.remove(&self.me).unwrap(); for (owner, presignature_shares) in my_shares { - for presignature_share in presignature_shares { + for mut presignature_share in presignature_shares { + if presignature_share.holders.is_none() { + presignature_share.holders = Some(presignature_share.participants.clone()); + } let mut slot = presignature_storage .reserve(presignature_share.id) .await diff --git a/integration-tests/tests/cases/helpers.rs b/integration-tests/tests/cases/helpers.rs index 5702ad956..fa3320997 100644 --- a/integration-tests/tests/cases/helpers.rs +++ b/integration-tests/tests/cases/helpers.rs @@ -23,6 +23,7 @@ pub(crate) fn dummy_presignature_with_holders( k: ::Scalar::ZERO, sigma: ::Scalar::ONE, }, + holders: None, 
participants, } } @@ -36,6 +37,7 @@ pub(crate) fn dummy_pair_with_holders(id: u64, participants: Vec) - id, triple0: dummy_triple_with_holders(participants.clone()), triple1: dummy_triple_with_holders(participants), + holders: None, } } From 69db15ca8bcd663eee9f5374abc461c0c3f8a023 Mon Sep 17 00:00:00 2001 From: Serhii Volovyk Date: Mon, 2 Mar 2026 20:40:28 +0200 Subject: [PATCH 10/16] clippy --- .../node/src/protocol/signature.rs | 6 +++--- .../node/src/storage/protocol_storage.rs | 18 +++++++++--------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index daa14ffc4..ec4639207 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -693,7 +693,7 @@ impl SignGenerating { ); let presignature_pending = if let Some(taken) = self.presignature.take() { - PendingPresignature::Available(taken) + PendingPresignature::Available(Box::new(taken)) } else { PendingPresignature::InStorage( self.presignature_id, @@ -1387,7 +1387,7 @@ impl Drop for SignatureSpawnerTask { } enum PendingPresignature { - Available(PresignatureTaken), + Available(Box), InStorage(PresignatureId, Participant, PresignatureStorage), } @@ -1401,7 +1401,7 @@ impl PendingPresignature { pub async fn fetch(self, timeout: Duration) -> Option { let (id, storage, owner) = match self { - PendingPresignature::Available(taken) => return Some(taken), + PendingPresignature::Available(taken) => return Some(*taken), PendingPresignature::InStorage(id, owner, storage) => (id, storage, owner), }; diff --git a/chain-signatures/node/src/storage/protocol_storage.rs b/chain-signatures/node/src/storage/protocol_storage.rs index 148245821..58fe57f67 100644 --- a/chain-signatures/node/src/storage/protocol_storage.rs +++ b/chain-signatures/node/src/storage/protocol_storage.rs @@ -804,15 +804,15 @@ impl ProtocolStorage { let Some(mut conn) = 
self.connect().await else { return Err(StorageError::ConnectionFailed); }; - let result: Result<(Vec, Vec), redis::RedisError> = - redis::Script::new(SCRIPT) - .key(&self.artifact_key) - .key(owner_key(&self.owner_keys, me)) - .arg(Into::::into(peer)) - .arg(threshold as i64) - .arg(ids) - .invoke_async(&mut conn) - .await; + type SyncResult = Result<(Vec, Vec), redis::RedisError>; + let result: SyncResult = redis::Script::new(SCRIPT) + .key(&self.artifact_key) + .key(owner_key(&self.owner_keys, me)) + .arg(Into::::into(peer)) + .arg(threshold as i64) + .arg(ids) + .invoke_async(&mut conn) + .await; match result { Ok((removed, updated)) => Ok((removed, updated)), Err(err) => Err(StorageError::RedisFailed(err.to_string())), From d93b9510ba69e65c0b090e03b7f45544450d941a Mon Sep 17 00:00:00 2001 From: Serhii Volovyk Date: Tue, 3 Mar 2026 11:29:04 +0200 Subject: [PATCH 11/16] set holders in tets --- integration-tests/benches/store.rs | 4 ++-- integration-tests/src/containers.rs | 2 +- integration-tests/tests/cases/helpers.rs | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/integration-tests/benches/store.rs b/integration-tests/benches/store.rs index e6f3a1f9d..bb6a84d28 100644 --- a/integration-tests/benches/store.rs +++ b/integration-tests/benches/store.rs @@ -223,7 +223,7 @@ fn dummy_presignature(id: u64) -> Presignature { sigma: ::Scalar::ONE, }, participants: vec![Participant::from(1), Participant::from(2)], - holders: None, + holders: Some(vec![Participant::from(1), Participant::from(2)]), } } @@ -232,7 +232,7 @@ fn dummy_pair(id: u64) -> TriplePair { id, triple0: dummy_triple(), triple1: dummy_triple(), - holders: None, + holders: Some(vec![Participant::from(1), Participant::from(2)]), } } diff --git a/integration-tests/src/containers.rs b/integration-tests/src/containers.rs index 770e68f8a..240785b78 100644 --- a/integration-tests/src/containers.rs +++ b/integration-tests/src/containers.rs @@ -424,7 +424,7 @@ impl Redis { id: pair_id, 
triple0, triple1, - holders: None, + holders: Some(participant_ids.clone()), }; storage .get(me) diff --git a/integration-tests/tests/cases/helpers.rs b/integration-tests/tests/cases/helpers.rs index fa3320997..aa6bb6ab2 100644 --- a/integration-tests/tests/cases/helpers.rs +++ b/integration-tests/tests/cases/helpers.rs @@ -23,7 +23,7 @@ pub(crate) fn dummy_presignature_with_holders( k: ::Scalar::ZERO, sigma: ::Scalar::ONE, }, - holders: None, + holders: Some(participants.clone()), participants, } } @@ -36,8 +36,8 @@ pub(crate) fn dummy_pair_with_holders(id: u64, participants: Vec) - TriplePair { id, triple0: dummy_triple_with_holders(participants.clone()), - triple1: dummy_triple_with_holders(participants), - holders: None, + triple1: dummy_triple_with_holders(participants.clone()), + holders: Some(participants), } } From 847e369302b6b0bb63bf4ec58663973dfa5999f9 Mon Sep 17 00:00:00 2001 From: Serhii Volovyk Date: Tue, 3 Mar 2026 12:15:55 +0200 Subject: [PATCH 12/16] return Result on fetch_owned --- .../node/src/protocol/sync/mod.rs | 36 +++++++++++++++---- .../node/src/storage/protocol_storage.rs | 12 +++---- integration-tests/benches/store.rs | 4 +-- integration-tests/tests/cases/mpc.rs | 8 +++-- 4 files changed, 43 insertions(+), 17 deletions(-) diff --git a/chain-signatures/node/src/protocol/sync/mod.rs b/chain-signatures/node/src/protocol/sync/mod.rs index f871e173b..bac51acf4 100644 --- a/chain-signatures/node/src/protocol/sync/mod.rs +++ b/chain-signatures/node/src/protocol/sync/mod.rs @@ -186,7 +186,9 @@ impl SyncTask { continue; } - let update = self.new_update(me).await; + let Some(update) = self.new_update(me).await else { + continue; + }; let start = Instant::now(); let receivers = need_sync .iter() @@ -207,7 +209,9 @@ impl SyncTask { continue; } - let update = self.new_update(me).await; + let Some(update) = self.new_update(me).await else { + continue; + }; let active = self.mesh_state.borrow().active().clone(); let start = Instant::now(); @@ -251,15 
+255,33 @@ impl SyncTask { } // TODO: use reserved values instead. Note that we cannot fetch our own triples via reserved - async fn new_update(&self, me: Participant) -> SyncUpdate { - let triples = self.triples.fetch_owned(me).await; - let presignatures = self.presignatures.fetch_owned(me).await; + async fn new_update(&self, me: Participant) -> Option { + let triples = match self.triples.fetch_owned(me).await { + Ok(ids) => ids, + Err(err) => { + tracing::warn!( + ?err, + "failed to fetch owned triples, skipping sync broadcast" + ); + return None; + } + }; + let presignatures = match self.presignatures.fetch_owned(me).await { + Ok(ids) => ids, + Err(err) => { + tracing::warn!( + ?err, + "failed to fetch owned presignatures, skipping sync broadcast" + ); + return None; + } + }; - SyncUpdate { + Some(SyncUpdate { from: me, triples, presignatures, - } + }) } /// Process sync responses: diff --git a/chain-signatures/node/src/storage/protocol_storage.rs b/chain-signatures/node/src/storage/protocol_storage.rs index 58fe57f67..bcc01058d 100644 --- a/chain-signatures/node/src/storage/protocol_storage.rs +++ b/chain-signatures/node/src/storage/protocol_storage.rs @@ -194,21 +194,21 @@ impl ProtocolStorage { .ok() } - pub async fn fetch_owned(&self, me: Participant) -> Vec { + pub async fn fetch_owned(&self, me: Participant) -> Result, StorageError> { let Some(mut conn) = self.connect().await else { - return Vec::new(); + return Err(StorageError::ConnectionFailed); }; // fetch owner set from redis and union with in-memory reservations let owned: HashSet = conn .smembers(owner_key(&self.owner_keys, me)) .await - .inspect_err(|err| { + .map_err(|err| { tracing::warn!(?err, "failed to fetch my owned artifacts"); - }) - .unwrap_or_default(); + StorageError::RedisFailed(err.to_string()) + })?; - owned.union(&*self.reserved.read().await).copied().collect() + Ok(owned.union(&*self.reserved.read().await).copied().collect()) } pub async fn reserve(&self, id: A::Id) -> Option> { 
diff --git a/integration-tests/benches/store.rs b/integration-tests/benches/store.rs index bb6a84d28..251338208 100644 --- a/integration-tests/benches/store.rs +++ b/integration-tests/benches/store.rs @@ -187,7 +187,7 @@ fn bench_load_keys(c: &mut Criterion) { c.bench_function("load 1024 mine triple keys", |b| { b.iter(|| { let task = || async { - env.triples.fetch_owned(env.me).await; + let _ = env.triples.fetch_owned(env.me).await; }; rt.block_on(task()); @@ -197,7 +197,7 @@ fn bench_load_keys(c: &mut Criterion) { c.bench_function("load 1024 mine presignature keys", |b| { b.iter(|| { let task = || async { - env.presignatures.fetch_owned(env.me).await; + let _ = env.presignatures.fetch_owned(env.me).await; }; rt.block_on(task()); diff --git a/integration-tests/tests/cases/mpc.rs b/integration-tests/tests/cases/mpc.rs index c407c93ab..da70f6111 100644 --- a/integration-tests/tests/cases/mpc.rs +++ b/integration-tests/tests/cases/mpc.rs @@ -86,7 +86,7 @@ async fn test_basic_generate_triples() { for node in &network.nodes { let mut nodes_shares = BTreeMap::new(); for peer in &network.nodes { - let triple_ids = node.triple_storage.fetch_owned(peer.me).await; + let triple_ids = node.triple_storage.fetch_owned(peer.me).await.unwrap(); let mut peer_triples = Vec::with_capacity(triple_ids.len()); for triple_id in triple_ids { let pair = conn @@ -127,7 +127,11 @@ async fn test_basic_generate_presignature() { for node in &network.nodes { let mut nodes_shares = BTreeMap::new(); for peer in &network.nodes { - let presignature_ids = node.presignature_storage.fetch_owned(peer.me).await; + let presignature_ids = node + .presignature_storage + .fetch_owned(peer.me) + .await + .unwrap(); let mut peer_presignatures = Vec::with_capacity(presignature_ids.len()); for presignature_id in presignature_ids { let t = conn From 46b1d6d54ac616aa5ec825eeec3a5c9332adcb6e Mon Sep 17 00:00:00 2001 From: Serhii Volovyk Date: Tue, 3 Mar 2026 13:40:09 +0200 Subject: [PATCH 13/16] define type for 
sync response --- .../node/src/protocol/sync/mod.rs | 32 ++++++++++++------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/chain-signatures/node/src/protocol/sync/mod.rs b/chain-signatures/node/src/protocol/sync/mod.rs index bac51acf4..3ffa0455c 100644 --- a/chain-signatures/node/src/protocol/sync/mod.rs +++ b/chain-signatures/node/src/protocol/sync/mod.rs @@ -38,6 +38,16 @@ pub enum SyncError { ResponseFailed, } +/// Result of a sync RPC to a single peer. +pub enum SyncPeerResponse { + /// Self-peer: no RPC was performed. + SelfPeer, + /// Peer responded successfully with its view of not_found artifacts. + Success(SyncUpdate), + /// RPC to peer failed. + Failed(String), +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct SyncUpdate { pub from: Participant, @@ -289,21 +299,19 @@ impl SyncTask { /// 2. Send synced peer notifications to mesh (for status transitions) async fn process_sync_responses( &self, - responses: Vec<(Participant, Option>)>, + responses: Vec<(Participant, SyncPeerResponse)>, me: Participant, threshold: usize, ) -> Result<(), String> { for (peer, result) in responses { match result { - // No RPC was performed (self-peer), treat as successful - None => { + SyncPeerResponse::SelfPeer => { if self.synced_peer_tx.send(peer).await.is_err() { tracing::error!("sync reporter is down: state sync will no longer work"); return Err("sync reporter is down".to_string()); } } - // RPC succeeded, process the not_found data - Some(Ok(response)) => { + SyncPeerResponse::Success(response) => { tracing::debug!( ?peer, not_found_triples = response.triples.len(), @@ -352,8 +360,7 @@ impl SyncTask { } } } - // RPC failed, don't notify mesh (peer stays in Syncing state) - Some(Err(err)) => { + SyncPeerResponse::Failed(err) => { tracing::warn!(?peer, ?err, "failed to sync peer"); } } @@ -376,7 +383,7 @@ async fn broadcast_sync( update: SyncUpdate, receivers: impl Iterator, me: Participant, -) -> Vec<(Participant, Option>)> { +) -> 
Vec<(Participant, SyncPeerResponse)> { let mut tasks = JoinSet::new(); let update = Arc::new(update); @@ -385,12 +392,13 @@ async fn broadcast_sync( let update = update.clone(); let url = info.url; tasks.spawn(async move { - // Only actually do the sync on other peers, not on self. let sync_result = if p != me { - Some(client.sync(&url, &update).await.map_err(|e| e.to_string())) + match client.sync(&url, &update).await { + Ok(response) => SyncPeerResponse::Success(response), + Err(err) => SyncPeerResponse::Failed(err.to_string()), + } } else { - // No RPC sync is attempted for self (`p == me`); return None to indicate no-op. - None + SyncPeerResponse::SelfPeer }; (p, sync_result) }); From 55a5004a0474c72bcd3e8158eaea25be9303a64d Mon Sep 17 00:00:00 2001 From: Serhii Volovyk Date: Wed, 4 Mar 2026 12:32:18 +0200 Subject: [PATCH 14/16] do not recycle presignatures, add a guard on active participants --- .../node/src/protocol/presignature.rs | 9 +++ .../node/src/protocol/signature.rs | 26 +++---- chain-signatures/node/src/protocol/triple.rs | 5 ++ .../node/src/storage/protocol_storage.rs | 74 +------------------ 4 files changed, 28 insertions(+), 86 deletions(-) diff --git a/chain-signatures/node/src/protocol/presignature.rs b/chain-signatures/node/src/protocol/presignature.rs index fad9fd782..f6f59d4c5 100644 --- a/chain-signatures/node/src/protocol/presignature.rs +++ b/chain-signatures/node/src/protocol/presignature.rs @@ -525,6 +525,15 @@ impl PresignatureSpawner { } async fn stockpile(&mut self, active: &[Participant], cfg: &ProtocolConfig) { + if active.len() < self.threshold { + tracing::warn!( + active = active.len(), + threshold = self.threshold, + "not enough active participants to generate presignatures" + ); + return; + } + let not_enough_presignatures = { // Stopgap to prevent too many presignatures in the system. 
This should be around min_presig*nodes*2 // for good measure so that we have enough presignatures to do sig generation while also maintain diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index 4ef33204e..ea712281a 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -273,7 +273,6 @@ impl SignOrganizer { let (presignature_id, presignature, active) = if is_proposer { tracing::info!(?sign_id, round = ?state.round, "proposer waiting for presignature"); let active = active.iter().copied().collect::>(); - let mut recycle = Vec::new(); let remaining = state.budget.remaining(); let fetch = tokio::time::timeout(remaining, async { loop { @@ -287,7 +286,15 @@ impl SignOrganizer { }; let participants = intersect_vec(&[holders, &active]); if participants.len() < ctx.threshold { - recycle.push(taken); + tracing::warn!( + ?sign_id, + id = taken.artifact.id, + ?participants, + ?active, + threshold = ctx.threshold, + "dropping presignature: not enough overlapping participants with active set" + ); + drop(taken); continue; } @@ -298,13 +305,6 @@ impl SignOrganizer { }) .await; - let presignatures = ctx.presignatures.clone(); - tokio::spawn(async move { - for taken in recycle { - presignatures.recycle_mine(me, taken).await; - } - }); - let (taken, participants) = match fetch { Ok(value) => value, Err(_) => { @@ -616,8 +616,8 @@ impl SignPositor { if counter.enough_rejects(ctx.threshold) { tracing::warn!(?sign_id, ?round, ?from, "received enough REJECTs, reorganizing"); if let Some(taken) = presignature { - tracing::warn!(?sign_id, "recycling presignature due to REJECTs"); - ctx.presignatures.recycle_mine(ctx.me, taken).await; + tracing::warn!(?sign_id, "dropping presignature due to REJECTs"); + drop(taken); } state.bump_round(); return SignPhase::Organizing(SignOrganizer); @@ -657,8 +657,8 @@ impl SignPositor { "proposer posit deadline reached, expiring 
round" ); if let Some(taken) = presignature { - tracing::warn!(?sign_id, "recycling presignature due to proposer timeout"); - ctx.presignatures.recycle_mine(ctx.me, taken).await; + tracing::warn!(?sign_id, "dropping presignature due to proposer timeout"); + drop(taken); } } else { tracing::warn!(?sign_id, "deliberator posit timeout waiting for Start, reorganizing"); diff --git a/chain-signatures/node/src/protocol/triple.rs b/chain-signatures/node/src/protocol/triple.rs index f24627a60..78975973b 100644 --- a/chain-signatures/node/src/protocol/triple.rs +++ b/chain-signatures/node/src/protocol/triple.rs @@ -529,6 +529,11 @@ impl TripleSpawner { /// and the maximum number of all ongoing generation protocols is below the maximum. async fn stockpile(&mut self, participants: &[Participant], cfg: &ProtocolConfig) { if participants.len() < self.threshold { + tracing::warn!( + active = participants.len(), + threshold = self.threshold, + "not enough active participants to generate triples" + ); return; } diff --git a/chain-signatures/node/src/storage/protocol_storage.rs b/chain-signatures/node/src/storage/protocol_storage.rs index bcc01058d..7de2753b3 100644 --- a/chain-signatures/node/src/storage/protocol_storage.rs +++ b/chain-signatures/node/src/storage/protocol_storage.rs @@ -109,7 +109,7 @@ pub struct ArtifactTaken { pub struct ArtifactTakenDropper { pub id: A::Id, - pub(crate) dropper: Option>, + dropper: Option>, } impl Drop for ArtifactTakenDropper { @@ -676,78 +676,6 @@ impl ProtocolStorage { } } - /// Return a taken artifact back to the available pool. 
- pub async fn recycle_mine(&self, me: Participant, taken: ArtifactTaken) -> bool { - const SCRIPT: &str = r#" - local artifact_key = KEYS[1] - local mine_key = KEYS[2] - local artifact_id = ARGV[1] - local artifact = ARGV[2] - local num_holders = tonumber(ARGV[3]) - - -- Add back to artifact hash map - redis.call("HSET", artifact_key, artifact_id, artifact) - - -- Add back to mine set - redis.call("SADD", mine_key, artifact_id) - - -- Restore holders set - local holders_key = artifact_key .. ':holders:' .. artifact_id - redis.call("DEL", holders_key) - if num_holders > 0 then - redis.call("SADD", holders_key, unpack(ARGV, 4, 3 + num_holders)) - end - - return 1 - "#; - - let start = Instant::now(); - let (artifact, mut dropper) = taken.take(); - // We manually handle the return, so we don't want the dropper to unreserve it. - dropper.dropper.take(); - - let id = artifact.id(); - let holders: Vec = artifact - .holders() - .expect("holders must be set before recycle") - .iter() - .map(|p| Into::::into(*p)) - .collect(); - - let Some(mut conn) = self.connect().await else { - tracing::warn!(id, "failed to return artifact: connection failed"); - return false; - }; - - let result: Result = redis::Script::new(SCRIPT) - .key(&self.artifact_key) - .key(owner_key(&self.owner_keys, me)) - .arg(id) - .arg(&artifact) - .arg(holders.len() as i64) - .arg(holders.as_slice()) - .invoke_async(&mut conn) - .await; - - let elapsed = start.elapsed(); - crate::metrics::storage::REDIS_LATENCY - .with_label_values(&[A::METRIC_LABEL, "return_mine"]) - .observe(elapsed.as_millis() as f64); - - match result { - Ok(_) => { - self.reserved.write().await.remove(&id); - self.used.write().await.remove(&id); - tracing::info!(id, ?elapsed, "returned mine artifact"); - true - } - Err(err) => { - tracing::warn!(id, ?err, ?elapsed, "failed to return mine artifact"); - false - } - } - } - /// Check if an artifact is reserved. 
pub async fn contains_reserved(&self, id: A::Id) -> bool { self.reserved.read().await.contains(&id) From 24c373d43b9773932639a28d6511f0b4b0d2a250 Mon Sep 17 00:00:00 2001 From: Serhii Volovyk Date: Mon, 9 Mar 2026 18:40:59 +0200 Subject: [PATCH 15/16] do not add reserved to owned during sync --- chain-signatures/node/src/storage/protocol_storage.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/chain-signatures/node/src/storage/protocol_storage.rs b/chain-signatures/node/src/storage/protocol_storage.rs index 7de2753b3..42b7c61ff 100644 --- a/chain-signatures/node/src/storage/protocol_storage.rs +++ b/chain-signatures/node/src/storage/protocol_storage.rs @@ -199,7 +199,6 @@ impl ProtocolStorage { return Err(StorageError::ConnectionFailed); }; - // fetch owner set from redis and union with in-memory reservations let owned: HashSet = conn .smembers(owner_key(&self.owner_keys, me)) .await @@ -208,7 +207,7 @@ impl ProtocolStorage { StorageError::RedisFailed(err.to_string()) })?; - Ok(owned.union(&*self.reserved.read().await).copied().collect()) + Ok(owned.into_iter().collect()) } pub async fn reserve(&self, id: A::Id) -> Option> { From 52138a85479956a5aa6a5b64f2298d047c82e953 Mon Sep 17 00:00:00 2001 From: Serhii Volovyk Date: Thu, 12 Mar 2026 13:02:46 +0200 Subject: [PATCH 16/16] Revert "do not recycle presignatures, add a guard on active participants" This reverts commit 55a5004a0474c72bcd3e8158eaea25be9303a64d. 
--- .../node/src/protocol/presignature.rs | 9 --- .../node/src/protocol/signature.rs | 26 +++---- chain-signatures/node/src/protocol/triple.rs | 5 -- .../node/src/storage/protocol_storage.rs | 74 ++++++++++++++++++- 4 files changed, 86 insertions(+), 28 deletions(-) diff --git a/chain-signatures/node/src/protocol/presignature.rs b/chain-signatures/node/src/protocol/presignature.rs index f6f59d4c5..fad9fd782 100644 --- a/chain-signatures/node/src/protocol/presignature.rs +++ b/chain-signatures/node/src/protocol/presignature.rs @@ -525,15 +525,6 @@ impl PresignatureSpawner { } async fn stockpile(&mut self, active: &[Participant], cfg: &ProtocolConfig) { - if active.len() < self.threshold { - tracing::warn!( - active = active.len(), - threshold = self.threshold, - "not enough active participants to generate presignatures" - ); - return; - } - let not_enough_presignatures = { // Stopgap to prevent too many presignatures in the system. This should be around min_presig*nodes*2 // for good measure so that we have enough presignatures to do sig generation while also maintain diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index e150ca6e5..18bd58c59 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -273,6 +273,7 @@ impl SignOrganizer { let (presignature_id, presignature, active) = if is_proposer { tracing::info!(?sign_id, round = ?state.round, "proposer waiting for presignature"); let active = active.iter().copied().collect::>(); + let mut recycle = Vec::new(); let remaining = state.budget.remaining(); let fetch = tokio::time::timeout(remaining, async { loop { @@ -286,15 +287,7 @@ impl SignOrganizer { }; let participants = intersect_vec(&[holders, &active]); if participants.len() < ctx.threshold { - tracing::warn!( - ?sign_id, - id = taken.artifact.id, - ?participants, - ?active, - threshold = ctx.threshold, - "dropping presignature: not enough 
overlapping participants with active set" - ); - drop(taken); + recycle.push(taken); continue; } @@ -305,6 +298,13 @@ impl SignOrganizer { }) .await; + let presignatures = ctx.presignatures.clone(); + tokio::spawn(async move { + for taken in recycle { + presignatures.recycle_mine(me, taken).await; + } + }); + let (taken, participants) = match fetch { Ok(value) => value, Err(_) => { @@ -631,8 +631,8 @@ impl SignPositor { if counter.enough_rejects(ctx.threshold) { tracing::warn!(?sign_id, ?round, ?from, "received enough REJECTs, reorganizing"); if let Some(taken) = presignature { - tracing::warn!(?sign_id, "dropping presignature due to REJECTs"); - drop(taken); + tracing::warn!(?sign_id, "recycling presignature due to REJECTs"); + ctx.presignatures.recycle_mine(ctx.me, taken).await; } state.bump_round(); return SignPhase::Organizing(SignOrganizer); @@ -673,8 +673,8 @@ impl SignPositor { "proposer posit deadline reached, expiring round" ); if let Some(taken) = presignature { - tracing::warn!(?sign_id, "dropping presignature due to proposer timeout"); - drop(taken); + tracing::warn!(?sign_id, "recycling presignature due to proposer timeout"); + ctx.presignatures.recycle_mine(ctx.me, taken).await; } } else { tracing::warn!(?sign_id, me=?ctx.me, ?proposer, "deliberator posit timeout waiting for Start, reorganizing"); diff --git a/chain-signatures/node/src/protocol/triple.rs b/chain-signatures/node/src/protocol/triple.rs index 8007241b8..e5ce1b0f6 100644 --- a/chain-signatures/node/src/protocol/triple.rs +++ b/chain-signatures/node/src/protocol/triple.rs @@ -558,11 +558,6 @@ impl TripleSpawner { /// and the maximum number of all ongoing generation protocols is below the maximum. 
async fn stockpile(&mut self, participants: &[Participant], cfg: &ProtocolConfig) { if participants.len() < self.threshold { - tracing::warn!( - active = participants.len(), - threshold = self.threshold, - "not enough active participants to generate triples" - ); return; } diff --git a/chain-signatures/node/src/storage/protocol_storage.rs b/chain-signatures/node/src/storage/protocol_storage.rs index 42b7c61ff..b926be7ba 100644 --- a/chain-signatures/node/src/storage/protocol_storage.rs +++ b/chain-signatures/node/src/storage/protocol_storage.rs @@ -109,7 +109,7 @@ pub struct ArtifactTaken { pub struct ArtifactTakenDropper { pub id: A::Id, - dropper: Option>, + pub(crate) dropper: Option>, } impl Drop for ArtifactTakenDropper { @@ -675,6 +675,78 @@ impl ProtocolStorage { } } + /// Return a taken artifact back to the available pool. + pub async fn recycle_mine(&self, me: Participant, taken: ArtifactTaken) -> bool { + const SCRIPT: &str = r#" + local artifact_key = KEYS[1] + local mine_key = KEYS[2] + local artifact_id = ARGV[1] + local artifact = ARGV[2] + local num_holders = tonumber(ARGV[3]) + + -- Add back to artifact hash map + redis.call("HSET", artifact_key, artifact_id, artifact) + + -- Add back to mine set + redis.call("SADD", mine_key, artifact_id) + + -- Restore holders set + local holders_key = artifact_key .. ':holders:' .. artifact_id + redis.call("DEL", holders_key) + if num_holders > 0 then + redis.call("SADD", holders_key, unpack(ARGV, 4, 3 + num_holders)) + end + + return 1 + "#; + + let start = Instant::now(); + let (artifact, mut dropper) = taken.take(); + // We manually handle the return, so we don't want the dropper to unreserve it. 
+ dropper.dropper.take(); + + let id = artifact.id(); + let holders: Vec = artifact + .holders() + .expect("holders must be set before recycle") + .iter() + .map(|p| Into::::into(*p)) + .collect(); + + let Some(mut conn) = self.connect().await else { + tracing::warn!(id, "failed to return artifact: connection failed"); + return false; + }; + + let result: Result = redis::Script::new(SCRIPT) + .key(&self.artifact_key) + .key(owner_key(&self.owner_keys, me)) + .arg(id) + .arg(&artifact) + .arg(holders.len() as i64) + .arg(holders.as_slice()) + .invoke_async(&mut conn) + .await; + + let elapsed = start.elapsed(); + crate::metrics::storage::REDIS_LATENCY + .with_label_values(&[A::METRIC_LABEL, "return_mine"]) + .observe(elapsed.as_millis() as f64); + + match result { + Ok(_) => { + self.reserved.write().await.remove(&id); + self.used.write().await.remove(&id); + tracing::info!(id, ?elapsed, "returned mine artifact"); + true + } + Err(err) => { + tracing::warn!(id, ?err, ?elapsed, "failed to return mine artifact"); + false + } + } + } + /// Check if an artifact is reserved. pub async fn contains_reserved(&self, id: A::Id) -> bool { self.reserved.read().await.contains(&id)