diff --git a/.gitignore b/.gitignore index 2d8ff61c0..3106d8414 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ .direnv .DS_Store .idea +.vscode tmp *.log diff --git a/chain-signatures/node/src/protocol/presignature.rs b/chain-signatures/node/src/protocol/presignature.rs index b915b5671..fad9fd782 100644 --- a/chain-signatures/node/src/protocol/presignature.rs +++ b/chain-signatures/node/src/protocol/presignature.rs @@ -7,6 +7,7 @@ use crate::protocol::contract::primitives::intersect_vec; use crate::protocol::posit::PositInternalAction; use crate::protocol::MpcSignProtocol; use crate::storage::presignature_storage::{PresignatureSlot, PresignatureStorage}; +use crate::storage::protocol_storage::ProtocolArtifact; use crate::storage::triple_storage::{TriplesTaken, TriplesTakenDropper}; use crate::storage::TripleStorage; use crate::types::{PresignatureProtocol, SecretKeyShare}; @@ -56,7 +57,10 @@ impl FullPresignatureId { pub struct Presignature { pub id: PresignatureId, pub output: PresignOutput, + /// Original protocol participants pub participants: Vec, + /// Nodes still holding their share of the artifact + pub holders: Option>, } impl fmt::Debug for Presignature { @@ -64,6 +68,7 @@ impl fmt::Debug for Presignature { f.debug_struct("Presignature") .field("id", &self.id) .field("participants", &self.participants) + .field("holders", &self.holders) .finish() } } @@ -106,6 +111,7 @@ impl<'de> Deserialize<'de> for Presignature { k: fields.output_k, sigma: fields.output_sigma, }, + holders: None, participants: fields.participants, }) } @@ -262,6 +268,7 @@ impl PresignatureGenerator { id: self.id, output, participants: self.participants.clone(), + holders: Some(self.participants.clone()), }; if self.owner == me { tracing::info!(id = self.id, "assigning presignature to myself"); @@ -478,8 +485,12 @@ impl PresignatureSpawner { }; let pair_id = triples.artifact.id; - // note: only one of the pair's participants is needed since they are the same. 
- let participants = intersect_vec(&[active, &triples.artifact.triple0.public.participants]); + // use holders (not original participants) since some nodes may have lost the artifact. + let Some(holders) = triples.artifact.holders() else { + tracing::error!(?pair_id, "holders not set on taken triple pair"); + return; + }; + let participants = intersect_vec(&[active, holders]); if participants.len() < self.threshold { tracing::warn!( ?pair_id, @@ -875,6 +886,7 @@ mod tests { sigma: ::Scalar::ONE, }, participants: vec![Participant::from(1), Participant::from(2)], + holders: None, }; // Serialize Presignature to JSON diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index dfcac4e02..18bd58c59 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -13,6 +13,7 @@ use crate::protocol::presignature::PresignatureId; use crate::protocol::Chain; use crate::rpc::{ContractStateWatcher, RpcChannel}; use crate::storage::presignature_storage::{PresignatureTaken, PresignatureTakenDropper}; +use crate::storage::protocol_storage::ProtocolArtifact; use crate::storage::PresignatureStorage; use crate::types::SignatureProtocol; use crate::util::{AffinePointExt, JoinMap, TimeoutBudget}; @@ -277,7 +278,14 @@ impl SignOrganizer { let fetch = tokio::time::timeout(remaining, async { loop { if let Some(taken) = ctx.presignatures.take_mine(ctx.me).await { - let participants = intersect_vec(&[&taken.artifact.participants, &active]); + let Some(holders) = taken.artifact.holders() else { + tracing::error!( + id = taken.artifact.id, + "holders not set on taken presignature" + ); + continue; + }; + let participants = intersect_vec(&[holders, &active]); if participants.len() < ctx.threshold { recycle.push(taken); continue; @@ -700,7 +708,7 @@ impl SignGenerating { ); let presignature_pending = if let Some(taken) = self.presignature.take() { - 
PendingPresignature::Available(taken) + PendingPresignature::Available(Box::new(taken)) } else { PendingPresignature::InStorage( self.presignature_id, @@ -1394,7 +1402,7 @@ impl Drop for SignatureSpawnerTask { } enum PendingPresignature { - Available(PresignatureTaken), + Available(Box), InStorage(PresignatureId, Participant, PresignatureStorage), } @@ -1408,7 +1416,7 @@ impl PendingPresignature { pub async fn fetch(self, timeout: Duration) -> Option { let (id, storage, owner) = match self { - PendingPresignature::Available(taken) => return Some(taken), + PendingPresignature::Available(taken) => return Some(*taken), PendingPresignature::InStorage(id, owner, storage) => (id, storage, owner), }; diff --git a/chain-signatures/node/src/protocol/sync/mod.rs b/chain-signatures/node/src/protocol/sync/mod.rs index a8322bc9b..3ffa0455c 100644 --- a/chain-signatures/node/src/protocol/sync/mod.rs +++ b/chain-signatures/node/src/protocol/sync/mod.rs @@ -27,6 +27,9 @@ pub const RECURRING_SYNC_INTERVAL: Duration = Duration::from_secs(3600 * 24); /// Timeout for waiting for a sync response from the sync task const SYNC_RESPONSE_TIMEOUT: Duration = Duration::from_secs(5); +/// Timeout for the entire broadcast operation (waiting for all peers to respond) +const BROADCAST_TIMEOUT: Duration = Duration::from_secs(10); + #[derive(Debug, thiserror::Error)] pub enum SyncError { #[error("failed to queue sync request")] @@ -35,6 +38,16 @@ pub enum SyncError { ResponseFailed, } +/// Result of a sync RPC to a single peer. +pub enum SyncPeerResponse { + /// Self-peer: no RPC was performed. + SelfPeer, + /// Peer responded successfully with its view of not_found artifacts. + Success(SyncUpdate), + /// RPC to peer failed. 
+ Failed(String), +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct SyncUpdate { pub from: Participant, @@ -70,33 +83,25 @@ impl SyncRequest { ) { let start = Instant::now(); - let outdated_triples = if !self.update.triples.is_empty() { - match triples - .remove_outdated(self.update.from, &self.update.triples) - .await - { - Ok(result) => result, - Err(err) => { - let _ = self.response_tx.send(Err(err)); - return; - } + let outdated_triples = match triples + .remove_outdated(self.update.from, &self.update.triples) + .await + { + Ok(result) => result, + Err(err) => { + let _ = self.response_tx.send(Err(err)); + return; } - } else { - Default::default() }; - let outdated_presignatures = if !self.update.presignatures.is_empty() { - match presignatures - .remove_outdated(self.update.from, &self.update.presignatures) - .await - { - Ok(result) => result, - Err(err) => { - let _ = self.response_tx.send(Err(err)); - return; - } + let outdated_presignatures = match presignatures + .remove_outdated(self.update.from, &self.update.presignatures) + .await + { + Ok(result) => result, + Err(err) => { + let _ = self.response_tx.send(Err(err)); + return; } - } else { - Default::default() }; tracing::info!( @@ -156,22 +161,25 @@ impl SyncTask { } pub async fn run(mut self) { - tracing::info!("task has been started"); + tracing::info!("sync task has been started"); + // Poll for our participant info from contract state + // TODO: constantly watch for changes on node state after this initial one so we can start/stop sync running. let mut watcher_interval = tokio::time::interval(Duration::from_millis(500)); + // Trigger sync broadcasts to peers in need_sync state let mut sync_interval = tokio::time::interval(Duration::from_millis(200)); - // Broadcast should generally not be necessary. 
+ // Periodic full sync broadcast to all active peers (TODO: should not be necessary) let mut broadcast_interval = tokio::time::interval(RECURRING_SYNC_INTERVAL); - let mut broadcast_check_interval = tokio::time::interval(Duration::from_millis(100)); + // Poll whether any ongoing sync task has completed (from either sync_interval or broadcast_interval) + let mut sync_check_interval = tokio::time::interval(Duration::from_millis(100)); - // Do NOT start until we have our own participant info. - // TODO: constantly watch for changes on node state after this initial one so we can start/stop sync running. - let (_threshold, me) = loop { + // Do NOT start until we have our own participant info + let (threshold, me) = loop { watcher_interval.tick().await; if let Some(info) = self.contract.info().await { break info; } }; - tracing::info!(?me, "mpc network ready, running..."); + tracing::info!(?me, "starting sync loop..."); let mut broadcast = Option::<(Instant, JoinHandle<_>)>::None; loop { @@ -188,7 +196,9 @@ impl SyncTask { continue; } - let update = self.new_update(me).await; + let Some(update) = self.new_update(me).await else { + continue; + }; let start = Instant::now(); let receivers = need_sync .iter() @@ -198,7 +208,6 @@ impl SyncTask { self.client.clone(), update, receivers.into_iter(), - self.synced_peer_tx.clone(), me, )); broadcast = Some((start, task)); @@ -210,7 +219,9 @@ impl SyncTask { continue; } - let update = self.new_update(me).await; + let Some(update) = self.new_update(me).await else { + continue; + }; let active = self.mesh_state.borrow().active().clone(); let start = Instant::now(); @@ -218,13 +229,12 @@ impl SyncTask { self.client.clone(), update, active.into_iter(), - self.synced_peer_tx.clone(), me )); broadcast = Some((start, task)); } // check that our broadcast has completed, and if so process the result. 
- _ = broadcast_check_interval.tick() => { + _ = sync_check_interval.tick() => { let Some((start, handle)) = broadcast.take() else { continue; }; @@ -234,10 +244,17 @@ impl SyncTask { continue; } - if let Err(err) = handle.await { - tracing::warn!(?err, "broadcast task failed"); - } else { - tracing::debug!(elapsed = ?start.elapsed(), "processed broadcast"); + match handle.await { + Ok(responses) => { + // Process sync responses: update artifact participants based on not_found data + if let Err(err) = self.process_sync_responses(responses, me, threshold).await { + tracing::warn!(?err, "failed to process sync responses"); + } + tracing::debug!(elapsed = ?start.elapsed(), "processed broadcast"); + } + Err(err) => { + tracing::warn!(?err, "broadcast task failed"); + } } } Some(sync_req) = self.requests.updates.recv() => { @@ -248,15 +265,108 @@ impl SyncTask { } // TODO: use reserved values instead. Note that we cannot fetch our own triples via reserved - async fn new_update(&self, me: Participant) -> SyncUpdate { - let triples = self.triples.fetch_owned(me).await; - let presignatures = self.presignatures.fetch_owned(me).await; + async fn new_update(&self, me: Participant) -> Option { + let triples = match self.triples.fetch_owned(me).await { + Ok(ids) => ids, + Err(err) => { + tracing::warn!( + ?err, + "failed to fetch owned triples, skipping sync broadcast" + ); + return None; + } + }; + let presignatures = match self.presignatures.fetch_owned(me).await { + Ok(ids) => ids, + Err(err) => { + tracing::warn!( + ?err, + "failed to fetch owned presignatures, skipping sync broadcast" + ); + return None; + } + }; - SyncUpdate { + Some(SyncUpdate { from: me, triples, presignatures, + }) + } + + /// Process sync responses: + /// 1. Remove peers from artifact participants if they're missing data + /// 2. 
Send synced peer notifications to mesh (for status transitions) + async fn process_sync_responses( + &self, + responses: Vec<(Participant, SyncPeerResponse)>, + me: Participant, + threshold: usize, + ) -> Result<(), String> { + for (peer, result) in responses { + match result { + SyncPeerResponse::SelfPeer => { + if self.synced_peer_tx.send(peer).await.is_err() { + tracing::error!("sync reporter is down: state sync will no longer work"); + return Err("sync reporter is down".to_string()); + } + } + SyncPeerResponse::Success(response) => { + tracing::debug!( + ?peer, + not_found_triples = response.triples.len(), + not_found_presignatures = response.presignatures.len(), + "received sync response" + ); + + // Batch remove peer from all triples and prune + let triple_res = self + .triples + .remove_holder_and_prune(me, peer, threshold, &response.triples) + .await; + + // Batch remove peer from all presignatures and prune + let presig_res = self + .presignatures + .remove_holder_and_prune(me, peer, threshold, &response.presignatures) + .await; + + match (triple_res, presig_res) { + (Ok((t_removed, t_updated)), Ok((p_removed, p_updated))) => { + tracing::info!( + ?peer, + removed_triples = t_removed.len(), + updated_triples = t_updated.len(), + removed_presignatures = p_removed.len(), + updated_presignatures = p_updated.len(), + "batch removed peer from artifacts and pruned" + ); + // Only notify mesh if both succeeded + if self.synced_peer_tx.send(peer).await.is_err() { + tracing::error!( + ?peer, + "sync reporter is down: state sync will no longer work" + ); + return Err("sync reporter is down".to_string()); + } + } + (triple_res, presig_res) => { + tracing::warn!( + ?peer, + ?triple_res, + ?presig_res, + "sync batch failed, not notifying mesh" + ); + } + } + } + SyncPeerResponse::Failed(err) => { + tracing::warn!(?peer, ?err, "failed to sync peer"); + } + } } + + Ok(()) } /// Channel for communicating back from the sync task which nodes are now updated. 
@@ -266,75 +376,65 @@ impl SyncTask { } /// Broadcast an update to all participants specified by `receivers`. +/// Returns results for all peers that complete within BROADCAST_TIMEOUT. +/// Peers that don't respond are not included in results and will be retried later. async fn broadcast_sync( client: NodeClient, update: SyncUpdate, receivers: impl Iterator, - synced_peer_tx: mpsc::Sender, me: Participant, -) { - if update.is_empty() { - for (participant, _) in receivers { - if synced_peer_tx.send(participant).await.is_err() { - tracing::error!( - ?participant, - "sync reporter is down: state sync will no longer work" - ); - } - } - return; - } - - let start = Instant::now(); +) -> Vec<(Participant, SyncPeerResponse)> { let mut tasks = JoinSet::new(); let update = Arc::new(update); + for (p, info) in receivers { let client = client.clone(); let update = update.clone(); let url = info.url; - let sync_tx = synced_peer_tx.clone(); tasks.spawn(async move { - // Only actually do the sync on other peers, not on self. (Hack) We - // still want to send the message to synced_peer_tx though, since - // the mesh does not currently understand which node is self, so it - // will trigger a sync to self. 
- let sync_view = if p != me { - let res = client.sync(&url, &update).await; - Some(res) + let sync_result = if p != me { + match client.sync(&url, &update).await { + Ok(response) => SyncPeerResponse::Success(response), + Err(err) => SyncPeerResponse::Failed(err.to_string()), + } } else { - None + SyncPeerResponse::SelfPeer }; - if sync_tx.send(p).await.is_err() { - tracing::error!("sync reporter is down: state sync will no longer work") - } - (p, sync_view) + (p, sync_result) }); } - let resps = tasks - .join_all() - .await - .into_iter() - .filter_map(|(p, view)| { - if let Some(Ok(_response)) = view { - tracing::debug!( - ?p, - not_found_triples = _response.triples.len(), - not_found_presignatures = _response.presignatures.len(), - "received sync response" - ); - Some(p) - } else { - None + let deadline = Instant::now() + BROADCAST_TIMEOUT; + let mut results = Vec::new(); + while !tasks.is_empty() { + let now = Instant::now(); + if now >= deadline { + break; + } + + tokio::select! { + res = tasks.join_next() => { + match res { + Some(Ok((p, sync_result))) => { + results.push((p, sync_result)); + } + Some(Err(err)) => { + tracing::warn!(?err, "sync task failed"); + } + None => break, + } } - }) - .collect::>(); + _ = tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)) => { + break; + } + } + } - tracing::debug!( - elapsed = ?start.elapsed(), - responded = ?resps, - "broadcast completed", - ); + if !tasks.is_empty() { + tasks.abort_all(); + } + + results } #[derive(Clone)] @@ -384,39 +484,3 @@ impl SyncChannel { }) } } - -#[cfg(test)] -mod tests { - use super::*; - use crate::node_client::Options as NodeClientOptions; - - #[tokio::test] - async fn test_broadcast_sync_on_empty_update() { - let client = NodeClient::new(&NodeClientOptions::default()); - let update = SyncUpdate::empty(); - let (tx, mut rx) = mpsc::channel(4); - - let participants = vec![ - (Participant::from(1u32), ParticipantInfo::new(1)), - (Participant::from(2u32), 
ParticipantInfo::new(2)), - ]; - - broadcast_sync( - client, - update, - participants.clone().into_iter(), - tx, - Participant::from(0u32), - ) - .await; - - let mut received = Vec::new(); - for _ in 0..participants.len() { - received.push(rx.recv().await.expect("missing synced participant")); - } - - let expected: Vec<_> = participants.iter().map(|(p, _)| *p).collect(); - assert_eq!(received, expected); - assert!(rx.recv().await.is_none()); - } -} diff --git a/chain-signatures/node/src/protocol/triple.rs b/chain-signatures/node/src/protocol/triple.rs index 1ee0ae81a..e5ce1b0f6 100644 --- a/chain-signatures/node/src/protocol/triple.rs +++ b/chain-signatures/node/src/protocol/triple.rs @@ -279,6 +279,7 @@ impl TripleGenerator { id: self.id, triple0: first, triple1: second, + holders: Some(self.participants.clone()), }; self.slot.insert(pair, triple_owner).await; break; diff --git a/chain-signatures/node/src/storage/presignature_storage.rs b/chain-signatures/node/src/storage/presignature_storage.rs index a8c3f0b90..393e5518e 100644 --- a/chain-signatures/node/src/storage/presignature_storage.rs +++ b/chain-signatures/node/src/storage/presignature_storage.rs @@ -2,6 +2,8 @@ use deadpool_redis::Pool; use near_sdk::AccountId; use redis::{FromRedisValue, RedisError, RedisWrite, ToRedisArgs}; +use cait_sith::protocol::Participant; + use super::protocol_storage::{ArtifactSlot, ArtifactTaken, ArtifactTakenDropper, ProtocolStorage}; use crate::protocol::presignature::{Presignature, PresignatureId}; use crate::storage::protocol_storage::ProtocolArtifact; @@ -24,6 +26,18 @@ impl ProtocolArtifact for Presignature { self.id } + fn participants(&self) -> &[Participant] { + &self.participants + } + + fn holders(&self) -> Option<&[Participant]> { + self.holders.as_deref() + } + + fn set_holders(&mut self, holders: Vec) { + self.holders = Some(holders); + } + const METRIC_LABEL: &'static str = "presignature"; } diff --git a/chain-signatures/node/src/storage/protocol_storage.rs 
b/chain-signatures/node/src/storage/protocol_storage.rs index a2ce74feb..b926be7ba 100644 --- a/chain-signatures/node/src/storage/protocol_storage.rs +++ b/chain-signatures/node/src/storage/protocol_storage.rs @@ -58,6 +58,14 @@ pub trait ProtocolArtifact: + 'static; fn id(&self) -> Self::Id; + + /// Original protocol participants (immutable) + fn participants(&self) -> &[Participant]; + + /// Nodes that still hold their share of the artifact + fn holders(&self) -> Option<&[Participant]>; + + fn set_holders(&mut self, holders: Vec); } /// A pre-reserved slot for an artifact that will eventually be inserted. @@ -186,21 +194,20 @@ impl ProtocolStorage { .ok() } - pub async fn fetch_owned(&self, me: Participant) -> Vec { + pub async fn fetch_owned(&self, me: Participant) -> Result, StorageError> { let Some(mut conn) = self.connect().await else { - return Vec::new(); + return Err(StorageError::ConnectionFailed); }; - // fetch owner set from redis and union with in-memory reservations let owned: HashSet = conn .smembers(owner_key(&self.owner_keys, me)) .await - .inspect_err(|err| { + .map_err(|err| { tracing::warn!(?err, "failed to fetch my owned artifacts"); - }) - .unwrap_or_default(); + StorageError::RedisFailed(err.to_string()) + })?; - owned.union(&*self.reserved.read().await).copied().collect() + Ok(owned.into_iter().collect()) } pub async fn reserve(&self, id: A::Id) -> Option> { @@ -278,6 +285,10 @@ impl ProtocolStorage { if #outdated >= 4096 then redis.call("SREM", owner_key, unpack(outdated)) redis.call("HDEL", artifact_key, unpack(outdated)) + -- also delete holders sets for each outdated artifact + for _, oid in ipairs(outdated) do + redis.call("DEL", artifact_key .. ':holders:' .. 
oid) + end -- clear the outdated list for the next batch outdated = {} end @@ -287,6 +298,10 @@ impl ProtocolStorage { if #outdated > 0 then redis.call("SREM", owner_key, unpack(outdated)) redis.call("HDEL", artifact_key, unpack(outdated)) + -- also delete holders sets for each outdated artifact + for _, oid in ipairs(outdated) do + redis.call("DEL", artifact_key .. ':holders:' .. oid) + end end -- find shares that were shared with us but not found in our storage @@ -344,8 +359,9 @@ impl ProtocolStorage { } } - /// Insert an artifact into the storage. If `mine` is true, the artifact will be - /// owned by the current node. If `back` is true, the artifact will be marked as unused. + /// Insert an artifact into storage under `owner`'s ownership set. + /// Holders must be set on the artifact before calling this; they are + /// persisted as a dedicated Redis set for later holder-tracking. pub async fn insert(&self, artifact: A, owner: Participant) -> bool { const SCRIPT: &str = r#" local artifact_key = KEYS[1] @@ -353,10 +369,18 @@ impl ProtocolStorage { local owner_key = KEYS[3] local artifact_id = ARGV[1] local artifact = ARGV[2] + local num_holders = tonumber(ARGV[3]) redis.call("SADD", owner_key, artifact_id) redis.call("SADD", owner_keys, owner_key) redis.call("HSET", artifact_key, artifact_id, artifact) + + -- Store holders in a dedicated Redis set + local holders_key = artifact_key .. ':holders:' .. 
artifact_id + redis.call("DEL", holders_key) + if num_holders > 0 then + redis.call("SADD", holders_key, unpack(ARGV, 4, 3 + num_holders)) + end "#; let start = Instant::now(); @@ -367,6 +391,13 @@ impl ProtocolStorage { return false; } + let holders: Vec = artifact + .holders() + .expect("holders must be set before insert") + .iter() + .map(|p| Into::::into(*p)) + .collect(); + let Some(mut conn) = self.connect().await else { tracing::warn!(id, "failed to insert artifact: connection failed"); return false; @@ -376,7 +407,9 @@ impl ProtocolStorage { .key(&self.owner_keys) .key(owner_key(&self.owner_keys, owner)) .arg(id) - .arg(artifact) + .arg(&artifact) + .arg(holders.len() as i64) + .arg(holders.as_slice()) .invoke_async(&mut conn) .await; drop(used); @@ -448,7 +481,13 @@ impl ProtocolStorage { return {err = "WARN artifact " .. artifact_id .. " not found"} end redis.call("HDEL", artifact_key, artifact_id) - return artifact + + -- Read and delete the holders set + local holders_key = artifact_key .. ':holders:' .. artifact_id + local holders = redis.call("SMEMBERS", holders_key) + redis.call("DEL", holders_key) + + return {artifact, holders} "#; let start = Instant::now(); @@ -462,7 +501,7 @@ impl ProtocolStorage { self.used.write().await.remove(&id); return None; }; - let result: Result = redis::Script::new(SCRIPT) + let result: Result<(A, Vec), _> = redis::Script::new(SCRIPT) .key(&self.artifact_key) .key(owner_key(&self.owner_keys, owner)) .arg(id) @@ -475,7 +514,9 @@ impl ProtocolStorage { .observe(elapsed.as_millis() as f64); match result { - Ok(artifact) => { + Ok((mut artifact, holders)) => { + let holders = holders.into_iter().map(Participant::from).collect(); + artifact.set_holders(holders); tracing::info!(id, ?elapsed, "took artifact"); Some(ArtifactTaken::new(artifact, self.clone())) } @@ -522,6 +563,7 @@ impl ProtocolStorage { /// Return true if successful, false otherwise. 
pub async fn clear(&self) -> bool { const SCRIPT: &str = r#" + local artifact_key = KEYS[2] local owner_keys = redis.call("SMEMBERS", KEYS[1]) local del = {} for _, key in ipairs(KEYS) do @@ -531,6 +573,12 @@ impl ProtocolStorage { table.insert(del, key) end + -- Also delete all holders sets for artifacts in the hash + local artifact_ids = redis.call("HKEYS", artifact_key) + for _, id in ipairs(artifact_ids) do + table.insert(del, artifact_key .. ':holders:' .. id) + end + redis.call("DEL", unpack(del)) "#; @@ -585,13 +633,18 @@ impl ProtocolStorage { -- delete the artifact from our self owner set redis.call("SREM", mine_key, id) - -- Return the artifact as a response - return artifact + -- Read and delete the holders set + local holders_key = artifact_key .. ':holders:' .. id + local holders = redis.call("SMEMBERS", holders_key) + redis.call("DEL", holders_key) + + -- Return the artifact and holders + return {artifact, holders} "#; let start = Instant::now(); let mut conn = self.connect().await?; - let result: Result, _> = redis::Script::new(SCRIPT) + let result: Result)>, _> = redis::Script::new(SCRIPT) .key(&self.artifact_key) .key(owner_key(&self.owner_keys, me)) .invoke_async(&mut conn) @@ -603,7 +656,9 @@ impl ProtocolStorage { .observe(elapsed.as_millis() as f64); match result { - Ok(Some(artifact)) => { + Ok(Some((mut artifact, holders))) => { + let holders = holders.into_iter().map(Participant::from).collect(); + artifact.set_holders(holders); // mark reserved and used in-memory so that it won't be reserved or reused locally let id = artifact.id(); self.reserved.write().await.insert(id); @@ -627,6 +682,7 @@ impl ProtocolStorage { local mine_key = KEYS[2] local artifact_id = ARGV[1] local artifact = ARGV[2] + local num_holders = tonumber(ARGV[3]) -- Add back to artifact hash map redis.call("HSET", artifact_key, artifact_id, artifact) @@ -634,6 +690,13 @@ impl ProtocolStorage { -- Add back to mine set redis.call("SADD", mine_key, artifact_id) + -- Restore 
holders set + local holders_key = artifact_key .. ':holders:' .. artifact_id + redis.call("DEL", holders_key) + if num_holders > 0 then + redis.call("SADD", holders_key, unpack(ARGV, 4, 3 + num_holders)) + end + return 1 "#; @@ -643,6 +706,13 @@ impl ProtocolStorage { dropper.dropper.take(); let id = artifact.id(); + let holders: Vec = artifact + .holders() + .expect("holders must be set before recycle") + .iter() + .map(|p| Into::::into(*p)) + .collect(); + let Some(mut conn) = self.connect().await else { tracing::warn!(id, "failed to return artifact: connection failed"); return false; @@ -652,7 +722,9 @@ impl ProtocolStorage { .key(&self.artifact_key) .key(owner_key(&self.owner_keys, me)) .arg(id) - .arg(artifact) + .arg(&artifact) + .arg(holders.len() as i64) + .arg(holders.as_slice()) .invoke_async(&mut conn) .await; @@ -683,4 +755,66 @@ impl ProtocolStorage { pub fn artifact_key(&self) -> &str { &self.artifact_key } + + /// Batch remove a peer from holders for a set of artifact IDs, and prune artifacts below threshold if owned by `me`. + /// Returns (Vec, Vec) + pub async fn remove_holder_and_prune( + &self, + me: Participant, + peer: Participant, + threshold: usize, + ids: &[A::Id], + ) -> Result<(Vec, Vec), StorageError> { + if ids.is_empty() { + return Ok((vec![], vec![])); + } + + // Lua script expects: KEYS[1]=artifact_key, KEYS[2]=owner_key, ARGV[1]=peer, ARGV[2]=threshold, ARGV[3...]=ids + const SCRIPT: &str = r#" + local artifact_key = KEYS[1] + local owner_key = KEYS[2] + local peer = ARGV[1] + local threshold = tonumber(ARGV[2]) + local removed = {} + local updated = {} + for i = 3, #ARGV do + local id = ARGV[i] + -- Error if 'me' does not own this artifact + if redis.call('SISMEMBER', owner_key, id) == 0 then + return redis.error_reply('OWNERSHIP_VIOLATION:' .. id) + end + -- Remove peer from holders set + local holders_key = artifact_key .. ':holders:' .. 
id + redis.call('SREM', holders_key, peer) + local count = redis.call('SCARD', holders_key) + if count < threshold then + -- Prune: remove artifact, holders set, and owner set entry + redis.call('HDEL', artifact_key, id) + redis.call('DEL', holders_key) + redis.call('SREM', owner_key, id) + table.insert(removed, id) + else + table.insert(updated, id) + end + end + return {removed, updated} + "#; + + let Some(mut conn) = self.connect().await else { + return Err(StorageError::ConnectionFailed); + }; + type SyncResult = Result<(Vec, Vec), redis::RedisError>; + let result: SyncResult = redis::Script::new(SCRIPT) + .key(&self.artifact_key) + .key(owner_key(&self.owner_keys, me)) + .arg(Into::::into(peer)) + .arg(threshold as i64) + .arg(ids) + .invoke_async(&mut conn) + .await; + match result { + Ok((removed, updated)) => Ok((removed, updated)), + Err(err) => Err(StorageError::RedisFailed(err.to_string())), + } + } } diff --git a/chain-signatures/node/src/storage/triple_storage.rs b/chain-signatures/node/src/storage/triple_storage.rs index 90dd0a6b8..ec2652f28 100644 --- a/chain-signatures/node/src/storage/triple_storage.rs +++ b/chain-signatures/node/src/storage/triple_storage.rs @@ -3,6 +3,8 @@ use near_sdk::AccountId; use redis::{FromRedisValue, RedisError, RedisWrite, ToRedisArgs}; use serde::{Deserialize, Serialize}; +use cait_sith::protocol::Participant; + use crate::protocol::triple::{Triple, TripleId}; use super::protocol_storage::{ @@ -20,6 +22,9 @@ pub struct TriplePair { pub id: TripleId, pub triple0: Triple, pub triple1: Triple, + /// Nodes still holding this artifact + #[serde(skip, default)] + pub holders: Option>, } impl TriplePair { @@ -35,6 +40,18 @@ impl ProtocolArtifact for TriplePair { fn id(&self) -> Self::Id { self.id } + + fn participants(&self) -> &[Participant] { + &self.triple0.public.participants + } + + fn holders(&self) -> Option<&[Participant]> { + self.holders.as_deref() + } + + fn set_holders(&mut self, holders: Vec) { + self.holders = 
Some(holders); + } } impl ToRedisArgs for TriplePair { diff --git a/integration-tests/benches/store.rs b/integration-tests/benches/store.rs index b6be6923b..251338208 100644 --- a/integration-tests/benches/store.rs +++ b/integration-tests/benches/store.rs @@ -187,7 +187,7 @@ fn bench_load_keys(c: &mut Criterion) { c.bench_function("load 1024 mine triple keys", |b| { b.iter(|| { let task = || async { - env.triples.fetch_owned(env.me).await; + let _ = env.triples.fetch_owned(env.me).await; }; rt.block_on(task()); @@ -197,7 +197,7 @@ fn bench_load_keys(c: &mut Criterion) { c.bench_function("load 1024 mine presignature keys", |b| { b.iter(|| { let task = || async { - env.presignatures.fetch_owned(env.me).await; + let _ = env.presignatures.fetch_owned(env.me).await; }; rt.block_on(task()); @@ -223,6 +223,7 @@ fn dummy_presignature(id: u64) -> Presignature { sigma: ::Scalar::ONE, }, participants: vec![Participant::from(1), Participant::from(2)], + holders: Some(vec![Participant::from(1), Participant::from(2)]), } } @@ -231,6 +232,7 @@ fn dummy_pair(id: u64) -> TriplePair { id, triple0: dummy_triple(), triple1: dummy_triple(), + holders: Some(vec![Participant::from(1), Participant::from(2)]), } } diff --git a/integration-tests/src/containers.rs b/integration-tests/src/containers.rs index 8ae300078..240785b78 100644 --- a/integration-tests/src/containers.rs +++ b/integration-tests/src/containers.rs @@ -424,6 +424,7 @@ impl Redis { id: pair_id, triple0, triple1, + holders: Some(participant_ids.clone()), }; storage .get(me) diff --git a/integration-tests/src/mpc_fixture/builder.rs b/integration-tests/src/mpc_fixture/builder.rs index 1517559af..c2aab8d1d 100644 --- a/integration-tests/src/mpc_fixture/builder.rs +++ b/integration-tests/src/mpc_fixture/builder.rs @@ -536,8 +536,11 @@ impl MpcFixtureNodeBuilder { // removing here because we can't clone a triple let my_shares = fixture_config.input.triples.remove(&self.me).unwrap(); for (owner, triple_shares) in my_shares { - for 
pair in triple_shares { + for mut pair in triple_shares { let pair_id = pair.id; + if pair.holders.is_none() { + pair.holders = Some(pair.triple0.public.participants.clone()); + } let mut slot = triple_storage.reserve(pair_id).await.unwrap(); slot.insert(pair, owner).await; } @@ -551,7 +554,10 @@ impl MpcFixtureNodeBuilder { // removing here because we can't clone a presignature let my_shares = fixture_config.input.presignatures.remove(&self.me).unwrap(); for (owner, presignature_shares) in my_shares { - for presignature_share in presignature_shares { + for mut presignature_share in presignature_shares { + if presignature_share.holders.is_none() { + presignature_share.holders = Some(presignature_share.participants.clone()); + } let mut slot = presignature_storage .reserve(presignature_share.id) .await diff --git a/integration-tests/tests/cases/helpers.rs b/integration-tests/tests/cases/helpers.rs new file mode 100644 index 000000000..aa6bb6ab2 --- /dev/null +++ b/integration-tests/tests/cases/helpers.rs @@ -0,0 +1,135 @@ +use cait_sith::protocol::Participant; +use cait_sith::triples::{TriplePub, TripleShare}; +use cait_sith::PresignOutput; +use elliptic_curve::CurveArithmetic; +use k256::Secp256k1; +use mpc_node::protocol::presignature::Presignature; +use mpc_node::protocol::triple::Triple; +use mpc_node::storage::triple_storage::TriplePair; +use mpc_node::storage::{PresignatureStorage, TripleStorage}; + +pub(crate) fn dummy_presignature(id: u64) -> Presignature { + dummy_presignature_with_holders(id, vec![Participant::from(1), Participant::from(2)]) +} + +pub(crate) fn dummy_presignature_with_holders( + id: u64, + participants: Vec, +) -> Presignature { + Presignature { + id, + output: PresignOutput { + big_r: ::AffinePoint::default(), + k: ::Scalar::ZERO, + sigma: ::Scalar::ONE, + }, + holders: Some(participants.clone()), + participants, + } +} + +pub(crate) fn dummy_pair(id: u64) -> TriplePair { + dummy_pair_with_holders(id, vec![Participant::from(1), 
Participant::from(2)]) +} + +pub(crate) fn dummy_pair_with_holders(id: u64, participants: Vec) -> TriplePair { + TriplePair { + id, + triple0: dummy_triple_with_holders(participants.clone()), + triple1: dummy_triple_with_holders(participants.clone()), + holders: Some(participants), + } +} + +pub(crate) fn dummy_triple_with_holders(participants: Vec) -> Triple { + Triple { + share: TripleShare { + a: ::Scalar::ZERO, + b: ::Scalar::ZERO, + c: ::Scalar::ZERO, + }, + public: TriplePub { + big_a: ::AffinePoint::default(), + big_b: ::AffinePoint::default(), + big_c: ::AffinePoint::default(), + participants, + threshold: 5, + }, + } +} + +pub(crate) async fn insert_triples_for_owner( + triples: &TripleStorage, + owner: Participant, + holders: &[Participant], + ids: impl IntoIterator, +) { + let holders = holders.to_vec(); + for id in ids { + triples + .reserve(id) + .await + .unwrap() + .insert(dummy_pair_with_holders(id, holders.clone()), owner) + .await; + } +} + +pub(crate) async fn insert_presignatures_for_owner( + presignatures: &PresignatureStorage, + owner: Participant, + holders: &[Participant], + ids: impl IntoIterator, +) { + let holders = holders.to_vec(); + for id in ids { + presignatures + .reserve(id) + .await + .unwrap() + .insert(dummy_presignature_with_holders(id, holders.clone()), owner) + .await; + } +} + +pub(crate) async fn assert_triples_owned_state( + triples: &TripleStorage, + owner: Participant, + expected_present: &[u64], + expected_absent: &[u64], +) { + for id in expected_present { + assert!( + triples.contains_by_owner(*id, owner).await, + "triple={id} should be present for owner={owner:?}" + ); + } + + for id in expected_absent { + assert!( + !triples.contains_by_owner(*id, owner).await, + "triple={id} should be absent for owner={owner:?}" + ); + } +} + +pub(crate) async fn assert_presig_owned_state( + presignatures: &PresignatureStorage, + owner: Participant, + expected_present: &[u64], + expected_absent: &[u64], +) { + for id in 
expected_present { + assert!( + presignatures.contains_by_owner(*id, owner).await, + "presignature={id} should be present for owner={owner:?}" + ); + } + + for id in expected_absent { + assert!( + !presignatures.contains_by_owner(*id, owner).await, + "presignature={id} should be absent for owner={owner:?}" + ); + } +} diff --git a/integration-tests/tests/cases/mod.rs b/integration-tests/tests/cases/mod.rs index dda6e61cb..136d822db 100644 --- a/integration-tests/tests/cases/mod.rs +++ b/integration-tests/tests/cases/mod.rs @@ -19,6 +19,7 @@ pub mod chains; pub mod compat; pub mod ethereum; pub mod ethereum_stream; +pub mod helpers; pub mod mpc; pub mod nightly; pub mod solana; diff --git a/integration-tests/tests/cases/mpc.rs b/integration-tests/tests/cases/mpc.rs index 4157d522d..915d30dd6 100644 --- a/integration-tests/tests/cases/mpc.rs +++ b/integration-tests/tests/cases/mpc.rs @@ -86,7 +86,7 @@ async fn test_basic_generate_triples() { for node in &network.nodes { let mut nodes_shares = BTreeMap::new(); for peer in &network.nodes { - let triple_ids = node.triple_storage.fetch_owned(peer.me).await; + let triple_ids = node.triple_storage.fetch_owned(peer.me).await.unwrap(); let mut peer_triples = Vec::with_capacity(triple_ids.len()); for triple_id in triple_ids { let pair = conn @@ -127,7 +127,11 @@ async fn test_basic_generate_presignature() { for node in &network.nodes { let mut nodes_shares = BTreeMap::new(); for peer in &network.nodes { - let presignature_ids = node.presignature_storage.fetch_owned(peer.me).await; + let presignature_ids = node + .presignature_storage + .fetch_owned(peer.me) + .await + .unwrap(); let mut peer_presignatures = Vec::with_capacity(presignature_ids.len()); for presignature_id in presignature_ids { let t = conn diff --git a/integration-tests/tests/cases/store.rs b/integration-tests/tests/cases/store.rs index 9e4ec6c84..0e3a9cf5f 100644 --- a/integration-tests/tests/cases/store.rs +++ b/integration-tests/tests/cases/store.rs @@ -1,18 
+1,15 @@ use cait_sith::protocol::Participant; -use cait_sith::triples::{TriplePub, TripleShare}; -use cait_sith::PresignOutput; -use elliptic_curve::CurveArithmetic; use integration_tests::cluster::spawner::ClusterSpawner; use integration_tests::containers; -use k256::Secp256k1; use mpc_crypto::PublicKey; -use mpc_node::protocol::presignature::{Presignature, PresignatureSpawner}; -use mpc_node::protocol::triple::{Triple, TripleSpawner}; +use mpc_node::protocol::presignature::PresignatureSpawner; +use mpc_node::protocol::triple::TripleSpawner; use mpc_node::protocol::MessageChannel; -use mpc_node::storage::triple_storage::TriplePair; use mpc_node::types::SecretKeyShare; use test_log::test; +use super::helpers::{dummy_pair, dummy_presignature}; + #[test(tokio::test)] async fn test_triple_persistence() -> anyhow::Result<()> { let spawner = ClusterSpawner::default() @@ -314,40 +311,3 @@ async fn test_presignature_persistence() -> anyhow::Result<()> { Ok(()) } - -fn dummy_presignature(id: u64) -> Presignature { - Presignature { - id, - output: PresignOutput { - big_r: ::AffinePoint::default(), - k: ::Scalar::ZERO, - sigma: ::Scalar::ONE, - }, - participants: vec![Participant::from(1), Participant::from(2)], - } -} - -fn dummy_pair(id: u64) -> TriplePair { - TriplePair { - id, - triple0: dummy_triple(), - triple1: dummy_triple(), - } -} - -fn dummy_triple() -> Triple { - Triple { - share: TripleShare { - a: ::Scalar::ZERO, - b: ::Scalar::ZERO, - c: ::Scalar::ZERO, - }, - public: TriplePub { - big_a: ::AffinePoint::default(), - big_b: ::AffinePoint::default(), - big_c: ::AffinePoint::default(), - participants: vec![Participant::from(1), Participant::from(2)], - threshold: 5, - }, - } -} diff --git a/integration-tests/tests/cases/sync.rs b/integration-tests/tests/cases/sync.rs index f231d48e6..526d2607b 100644 --- a/integration-tests/tests/cases/sync.rs +++ b/integration-tests/tests/cases/sync.rs @@ -1,101 +1,12 @@ use std::time::Duration; use 
cait_sith::protocol::Participant; -use cait_sith::triples::{TriplePub, TripleShare}; -use cait_sith::PresignOutput; -use elliptic_curve::CurveArithmetic; use integration_tests::cluster; -use k256::Secp256k1; -use integration_tests::cluster::spawner::ClusterSpawner; -use mpc_node::mesh::Mesh; -use mpc_node::node_client::{self, NodeClient}; -use mpc_node::protocol::contract::primitives::Participants; -use mpc_node::protocol::contract::RunningContractState; -use mpc_node::protocol::presignature::Presignature; -use mpc_node::protocol::sync::{SyncTask, SyncUpdate}; -use mpc_node::protocol::triple::Triple; -use mpc_node::protocol::{ParticipantInfo, ProtocolState}; -use mpc_node::rpc::ContractStateWatcher; -use mpc_node::storage::{triple_storage::TriplePair, PresignatureStorage, TripleStorage}; - -#[test_log::test(tokio::test)] -async fn test_state_sync_update() -> anyhow::Result<()> { - let spawner = ClusterSpawner::default() - .network("protocol-sync") - .init_network() - .await - .unwrap(); - - let redis = spawner.spawn_redis().await; - let num_nodes = 1; - let threshold = 2; - let node0_account_id = "p0_test.near".parse().unwrap(); - let node1 = Participant::from(1); - - let sk = k256::SecretKey::random(&mut rand::thread_rng()); - let pk = sk.public_key(); - let ping_interval = Duration::from_millis(300); - let client = NodeClient::new(&node_client::Options::default()); - let participants = participants(num_nodes); - let node0_triples = redis.triple_storage(&node0_account_id); - let node0_presignatures = redis.presignature_storage(&node0_account_id); - - let (contract_watcher, _contract_tx) = ContractStateWatcher::with( - &node0_account_id, - ProtocolState::Running(RunningContractState { - epoch: 0, - public_key: *pk.as_affine(), - participants: participants.clone(), - candidates: Default::default(), - join_votes: Default::default(), - leave_votes: Default::default(), - threshold, - }), - ); - - let (synced_peer_tx, synced_peer_rx) = SyncTask::synced_nodes_channel(); 
- let mesh = Mesh::new( - &client, - mpc_node::mesh::Options { - ping_interval: ping_interval.as_millis() as u64, - }, - &node0_account_id, - synced_peer_rx, - ); - let (sync_channel, sync) = SyncTask::new( - &client, - node0_triples.clone(), - node0_presignatures.clone(), - mesh.watch(), - contract_watcher, - synced_peer_tx, - ); - tokio::spawn(sync.run()); - - // insert shares of triples/presignatures to node0, that belong to node1 - insert_triples(&node0_triples, node1, 0..=5).await; - insert_presignatures(&node0_presignatures, node1, 0..=5).await; - - // Create an update where node1 is trying to sync with node0, where node1 only has - // triples/presignatures 0 to 3, so 4 and 5 should be deleted from node0. - let valid = vec![0, 1, 2, 3]; - let invalid = vec![4, 5]; - - let update = SyncUpdate { - from: node1, - triples: valid.clone(), - presignatures: valid.clone(), - }; - let _ = sync_channel.request_update(update).await; - // Give it some time for sync to process the update - tokio::time::sleep(Duration::from_secs(3)).await; - - validate_triples(&node0_triples, node1, &valid, &invalid).await; - validate_presignatures(&node0_presignatures, node1, &valid, &invalid).await; - - Ok(()) -} +use super::helpers::{ + assert_presig_owned_state, assert_triples_owned_state, insert_presignatures_for_owner, + insert_triples_for_owner, +}; #[test_log::test(tokio::test)] async fn test_state_sync_e2e_large_outdated_stockpile() { @@ -111,6 +22,7 @@ async fn test_state_sync_e2e_large_outdated_stockpile() { let node0_account_id = spawner.account_id(Into::::into(node0) as usize); let node1 = Participant::from(1); let node1_account_id = spawner.account_id(Into::::into(node1) as usize); + let holders = vec![node0, node1]; let redis = spawner.prespawn_redis().await; // immediately add to triples/presignatures storage the triples/presignatures we want to invalidate. 
@@ -121,10 +33,10 @@ async fn test_state_sync_e2e_large_outdated_stockpile() { // insert triples that will be invalidated after a sync, since nobody else has them. // node0 is saying that they have 0 to 5, but node1 will sync and say they have 4 and 5 only. - insert_triples(&node0_triples, node1, 0..=10000).await; - insert_triples(&node1_triples, node1, 0..=5).await; - insert_presignatures(&node0_presignatures, node1, 0..=10000).await; - insert_presignatures(&node1_presignatures, node1, 0..=5).await; + insert_triples_for_owner(&node0_triples, node1, &holders, 0..=10000).await; + insert_triples_for_owner(&node1_triples, node1, &holders, 0..=5).await; + insert_presignatures_for_owner(&node0_presignatures, node1, &holders, 0..=10000).await; + insert_presignatures_for_owner(&node1_presignatures, node1, &holders, 0..=5).await; let _nodes = spawner .disable_prestockpile() @@ -141,28 +53,28 @@ async fn test_state_sync_e2e_large_outdated_stockpile() { // Give some time for the first sync broadcast to finish. 
tokio::time::sleep(Duration::from_secs(5)).await; - validate_triples( + assert_triples_owned_state( &node0_triples, node1, &[0, 1, 2, 3, 4, 5], &[6, 100, 500, 2030, 1337, 10000], ) .await; - validate_triples( + assert_triples_owned_state( &node1_triples, node1, &[0, 1, 2, 3, 4, 5], &[6, 100, 500, 2030, 1337, 10000], ) .await; - validate_presignatures( - &node0_presignatures, + assert_presig_owned_state( + &node1_presignatures, node1, &[0, 1, 2, 3, 4, 5], &[6, 100, 500, 2030, 1337, 10000], ) .await; - validate_presignatures( + assert_presig_owned_state( &node0_presignatures, node1, &[0, 1, 2, 3, 4, 5], @@ -176,134 +88,110 @@ async fn test_state_sync_e2e_large_outdated_stockpile() { // nodes.sign().await.unwrap(); } -async fn insert_triples( - triples: &TripleStorage, - node: Participant, - range: impl IntoIterator, -) { - for id in range { - triples - .reserve(id) - .await - .unwrap() - .insert(dummy_pair(id), node) - .await; +#[test_log::test(tokio::test)] +async fn test_state_sync_e2e() { + // Setup 3 nodes with T=2 (default cluster setup). 
+ let mut spawner = cluster::spawn(); + { + let worker = spawner.prespawn_sandbox().await.unwrap().clone(); + spawner.create_accounts(&worker).await; } -} -async fn validate_triples( - triples: &TripleStorage, - owner: Participant, - valid: &[u64], - invalid: &[u64], -) { - for id in valid { - assert!( - triples.contains_by_owner(*id, owner).await, - "triple={id} should be valid" - ); - } + let node0 = Participant::from(0); + let node1 = Participant::from(1); + let node2 = Participant::from(2); + let node0_account_id = spawner.account_id(Into::::into(node0) as usize); + let node1_account_id = spawner.account_id(Into::::into(node1) as usize); + let node2_account_id = spawner.account_id(Into::::into(node2) as usize); + let holders = vec![node0, node1, node2]; + let redis = spawner.prespawn_redis().await; - for id in invalid { - assert!( - !triples.contains_by_owner(*id, owner).await, - "triple={id} should be invalid" - ); - } -} + // Wait for Redis to be fully accepting connections on the host port. + tokio::time::sleep(Duration::from_secs(1)).await; -async fn insert_presignatures( - presignatures: &PresignatureStorage, - node: Participant, - range: impl IntoIterator, -) { - for id in range { - presignatures - .reserve(id) - .await - .unwrap() - .insert(dummy_presignature(id), node) - .await; + // Get triple/presignature storage for each node. + let node0_triples = redis.triple_storage(&node0_account_id); + let node0_presignatures = redis.presignature_storage(&node0_account_id); + let node1_triples = redis.triple_storage(&node1_account_id); + let node1_presignatures = redis.presignature_storage(&node1_account_id); + let node2_triples = redis.triple_storage(&node2_account_id); + let node2_presignatures = redis.presignature_storage(&node2_account_id); + + // Populate 3 triples and 3 presignatures: each node owns 1, all nodes hold shares. 
+ for storage in [&node0_triples, &node1_triples, &node2_triples] { + insert_triples_for_owner(storage, node0, &holders, 0..=0).await; + insert_triples_for_owner(storage, node1, &holders, 1..=1).await; + insert_triples_for_owner(storage, node2, &holders, 2..=2).await; } -} - -async fn validate_presignatures( - presignatures: &PresignatureStorage, - owner: Participant, - valid: &[u64], - invalid: &[u64], -) { - for id in valid { - assert!( - presignatures.contains_by_owner(*id, owner).await, - "presignature={id} should be valid" - ); + for storage in [ + &node0_presignatures, + &node1_presignatures, + &node2_presignatures, + ] { + insert_presignatures_for_owner(storage, node0, &holders, 0..=0).await; + insert_presignatures_for_owner(storage, node1, &holders, 1..=1).await; + insert_presignatures_for_owner(storage, node2, &holders, 2..=2).await; } - for id in invalid { - assert!( - !presignatures.contains_by_owner(*id, owner).await, - "presignature={id} should be invalid" - ); + // Add 1 extra T and P owned by node0, only on node0's storage. + // After sync, node0 will learn that node1 and node2 don't have id=99, + // dropping it below threshold (T=2), so it should be pruned. + insert_triples_for_owner(&node0_triples, node0, &holders, 99..=99).await; + insert_presignatures_for_owner(&node0_presignatures, node0, &holders, 99..=99).await; + + // Add 1 extra T and P owned by node1, on node0 and node1 only (not node2). + // After sync, node1 learns node2 doesn't have id=88, removing node2 from holders. + // But node0 still has it, so 2 holders remain (= threshold), and it should survive.
+ for storage in [&node0_triples, &node1_triples] { + insert_triples_for_owner(storage, node1, &holders, 88..=88).await; } -} - -// TODO: cleanup and move this to a common test utils module -fn dummy_presignature(id: u64) -> Presignature { - Presignature { - id, - output: PresignOutput { - big_r: ::AffinePoint::default(), - k: ::Scalar::ZERO, - sigma: ::Scalar::ONE, - }, - participants: vec![Participant::from(1), Participant::from(2)], + for storage in [&node0_presignatures, &node1_presignatures] { + insert_presignatures_for_owner(storage, node1, &holders, 88..=88).await; } -} -// TODO: cleanup and move this to a common test utils module -fn dummy_triple() -> Triple { - Triple { - share: TripleShare { - a: ::Scalar::ZERO, - b: ::Scalar::ZERO, - c: ::Scalar::ZERO, - }, - public: TriplePub { - big_a: ::AffinePoint::default(), - big_b: ::AffinePoint::default(), - big_c: ::AffinePoint::default(), - participants: vec![Participant::from(1), Participant::from(2)], - threshold: 5, - }, + // Add 1 extra T and P owned by node0, but only on node1 and node2 (not on node0 itself). + // When node0 broadcasts its owned IDs, id=77 won't be included (node0 doesn't have it), + // so node1 and node2 will remove it via remove_outdated. 
+ for storage in [&node1_triples, &node2_triples] { + insert_triples_for_owner(storage, node0, &holders, 77..=77).await; } -} - -// TODO: cleanup and move this to a common test utils module -fn participants(num_nodes: usize) -> Participants { - let (_cipher_sk, cipher_pk) = mpc_keys::hpke::generate(); - let sign_sk = near_crypto::SecretKey::from_seed(near_crypto::KeyType::ED25519, "sign-encrypt0"); - let mut participants = Participants::default(); - for i in 0..num_nodes { - let id = Participant::from(i as u32); - participants.insert( - &id, - ParticipantInfo { - sign_pk: sign_sk.public_key(), - cipher_pk: cipher_pk.clone(), - id: id.into(), - url: "http://localhost:3030".to_string(), - account_id: format!("p{i}_test.near").parse().unwrap(), - }, - ); + for storage in [&node1_presignatures, &node2_presignatures] { + insert_presignatures_for_owner(storage, node0, &holders, 77..=77).await; } - participants -} -fn dummy_pair(id: u64) -> TriplePair { - TriplePair { - id, - triple0: dummy_triple(), - triple1: dummy_triple(), + let _cluster = spawner + .disable_prestockpile() + .with_config(|cfg| { + cfg.protocol.triple.min_triples = 1; + cfg.protocol.triple.max_triples = 1; + cfg.protocol.presignature.min_presignatures = 1; + cfg.protocol.presignature.max_presignatures = 1; + }) + .await + .unwrap(); + + // Give some time for the first sync broadcast to finish. + tokio::time::sleep(Duration::from_secs(5)).await; + + // After sync, all 3 consistent triples/presignatures should be present on every node. + // id=99 (only on node0) should have been pruned (below threshold). + // id=88 (on node0 and node1) should survive (exactly at threshold=2). + // id=77 (owned by node0, only on node1/node2) should be removed via remove_outdated. 
+ for triples in [&node0_triples, &node1_triples] { + assert_triples_owned_state(triples, node0, &[0], &[1, 2, 77, 99]).await; + assert_triples_owned_state(triples, node1, &[1, 88], &[0, 2]).await; + assert_triples_owned_state(triples, node2, &[2], &[0, 1]).await; + } + assert_triples_owned_state(&node2_triples, node0, &[0], &[1, 2, 77, 99]).await; + assert_triples_owned_state(&node2_triples, node1, &[1], &[0, 2, 88]).await; + assert_triples_owned_state(&node2_triples, node2, &[2], &[0, 1]).await; + + for presignatures in [&node0_presignatures, &node1_presignatures] { + assert_presig_owned_state(presignatures, node0, &[0], &[1, 2, 77, 99]).await; + assert_presig_owned_state(presignatures, node1, &[1, 88], &[0, 2]).await; + assert_presig_owned_state(presignatures, node2, &[2], &[0, 1]).await; } + assert_presig_owned_state(&node2_presignatures, node0, &[0], &[1, 2, 77, 99]).await; + assert_presig_owned_state(&node2_presignatures, node1, &[1], &[0, 2, 88]).await; + assert_presig_owned_state(&node2_presignatures, node2, &[2], &[0, 1]).await; }