diff --git a/chain-signatures/node/src/protocol/consensus.rs b/chain-signatures/node/src/protocol/consensus.rs index bce96aa8a..11d8f62fb 100644 --- a/chain-signatures/node/src/protocol/consensus.rs +++ b/chain-signatures/node/src/protocol/consensus.rs @@ -97,6 +97,9 @@ impl ConsensusProtocol for StartedState { ); let threshold = contract_state.threshold; + // Initialize identity for storage; this is an entry point into Running. + ctx.triple_storage.set_me(me); + ctx.presignature_storage.set_me(me); let triple_task = TripleSpawnerTask::run(me, threshold, epoch, ctx); let presign_task = PresignatureSpawnerTask::run( me, @@ -399,6 +402,9 @@ impl ConsensusProtocol for WaitingForConsensusState { return NodeState::WaitingForConsensus(self); }; + // Initialize identity for storage; this is an entry point into Running. + ctx.triple_storage.set_me(me); + ctx.presignature_storage.set_me(me); let triple_task = TripleSpawnerTask::run(me, self.threshold, self.epoch, ctx); let presign_task = PresignatureSpawnerTask::run( me, diff --git a/chain-signatures/node/src/protocol/presignature.rs b/chain-signatures/node/src/protocol/presignature.rs index 35d0777d9..e95d3c903 100644 --- a/chain-signatures/node/src/protocol/presignature.rs +++ b/chain-signatures/node/src/protocol/presignature.rs @@ -375,10 +375,6 @@ impl PresignatureSpawner { self.ongoing.contains_key(&id) } - pub async fn contains_used(&self, id: PresignatureId) -> bool { - self.presignatures.contains_used(id).await - } - /// Returns the number of unspent presignatures available in the manager. pub async fn len_generated(&self) -> usize { self.presignatures.len_generated().await @@ -480,7 +476,7 @@ impl PresignatureSpawner { // to use the same triple as any other node. // TODO: have all this part be a separate task such that finding a pair of triples is done in parallel instead // of waiting for storage to respond here. - let Some(triples) = self.triples.take_mine(self.me).await else { + let Some(triples) = self.triples.take_mine().await else { return; }; @@ -566,9 +562,10 @@ impl PresignatureSpawner { "starting protocol to generate a new presignature", ); - let Some(slot) = self.presignatures.reserve(id.id).await else { + let Some(slot) = self.presignatures.create_slot(id.id, owner).await else { return Err(InitializationError::BadParameters(format!( - "id collision: presignature_id={id:?}" + "presignature {} is already generating, in use, or stored", + id.id ))); }; @@ -696,6 +693,7 @@ impl PresignatureSpawner { ) { let mut last_active_warn: Option = None; let mut stockpile_interval = time::interval(Duration::from_millis(100)); + stockpile_interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip); let mut expiration_interval = tokio::time::interval(Duration::from_secs(1)); let mut posits = self.msg.subscribe_presignature_posit().await; diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index 7f89a32fb..6ec47964e 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -328,7 +328,7 @@ impl SignOrganizer { let remaining = state.budget.remaining(); let fetch = tokio::time::timeout(remaining, async { loop { - if let Some(taken) = ctx.presignatures.take_mine(ctx.me).await { + if let Some(taken) = ctx.presignatures.take_mine().await { let Some(holders) = taken.artifact.holders() else { tracing::error!( id = taken.artifact.id, diff --git a/chain-signatures/node/src/protocol/sync/mod.rs b/chain-signatures/node/src/protocol/sync/mod.rs index 106e4dc81..d8b5391cc 100644 --- a/chain-signatures/node/src/protocol/sync/mod.rs +++ b/chain-signatures/node/src/protocol/sync/mod.rs @@ -219,7 +219,7 @@ impl SyncTask { match handle.await { Ok(responses) => { // Process sync responses: update artifact participants based on not_found data - if let Err(err) = self.process_sync_responses(responses, me, threshold).await { + if let Err(err) = self.process_sync_responses(responses, threshold).await { tracing::warn!(?err, "failed to process sync responses"); } tracing::debug!(elapsed = ?start.elapsed(), "processed broadcast"); @@ -236,9 +236,8 @@ impl SyncTask { } } - // TODO: use reserved values instead. Note that we cannot fetch our own triples via reserved async fn new_update(&self, me: Participant) -> Option { - let triples = match self.triples.fetch_owned(me).await { + let triples = match self.triples.fetch_owned_with_reserved().await { Ok(ids) => ids, Err(err) => { tracing::warn!( @@ -248,7 +247,7 @@ impl SyncTask { return None; } }; - let presignatures = match self.presignatures.fetch_owned(me).await { + let presignatures = match self.presignatures.fetch_owned_with_reserved().await { Ok(ids) => ids, Err(err) => { tracing::warn!( @@ -272,7 +271,6 @@ impl SyncTask { async fn process_sync_responses( &self, responses: Vec<(Participant, SyncPeerResponse)>, - me: Participant, threshold: usize, ) -> Result<(), String> { for (peer, result) in responses { @@ -294,13 +292,13 @@ impl SyncTask { // Batch remove peer from all triples and prune let triple_res = self .triples - .remove_holder_and_prune(me, peer, threshold, &response.triples) + .remove_holder_and_prune(peer, threshold, &response.triples) .await; // Batch remove peer from all presignatures and prune let presig_res = self .presignatures - .remove_holder_and_prune(me, peer, threshold, &response.presignatures) + .remove_holder_and_prune(peer, threshold, &response.presignatures) .await; match (triple_res, presig_res) { diff --git a/chain-signatures/node/src/protocol/triple.rs b/chain-signatures/node/src/protocol/triple.rs index d6b4d9c0c..237397440 100644 --- a/chain-signatures/node/src/protocol/triple.rs +++ b/chain-signatures/node/src/protocol/triple.rs @@ -388,10 +388,6 @@ impl TripleSpawner { self.ongoing.contains_key(&id) } - pub async fn contains_used(&self, id: TripleId) -> bool { - self.triple_storage.contains_used(id).await - } - /// Returns the number of unspent triples assigned to this node. pub async fn len_mine(&self) -> usize { self.triple_storage.len_by_owner(self.me).await @@ -528,9 +524,9 @@ impl TripleSpawner { timeout: Duration, ) -> Result<(), InitializationError> { // Check if the `id` is already in the system. Error out and have the next cycle try again. - let Some(slot) = self.triple_storage.reserve(id).await else { + let Some(slot) = self.triple_storage.create_slot(id, proposer).await else { return Err(InitializationError::BadParameters(format!( - "id collision: pair_id={id}" + "triple {id} is already generating, in use, or stored" ))); }; @@ -581,6 +577,7 @@ impl TripleSpawner { ongoing_gen_tx: watch::Sender, ) { let mut stockpile_interval = tokio::time::interval(Duration::from_millis(100)); + stockpile_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); let mut expiration_interval = tokio::time::interval(Duration::from_secs(1)); let mut posits = self.msg.subscribe_triple_posit().await; diff --git a/chain-signatures/node/src/storage/protocol_storage.rs b/chain-signatures/node/src/storage/protocol_storage.rs index b07c049cd..b54dac002 100644 --- a/chain-signatures/node/src/storage/protocol_storage.rs +++ b/chain-signatures/node/src/storage/protocol_storage.rs @@ -2,11 +2,10 @@ use cait_sith::protocol::Participant; use deadpool_redis::{Connection, Pool}; use near_sdk::AccountId; use redis::{AsyncCommands, FromRedisValue, ToRedisArgs}; -use std::collections::HashSet; -use std::sync::Arc; +use std::collections::{HashMap, HashSet}; +use std::sync::{Arc, OnceLock}; use std::{fmt, time::Instant}; use tokio::sync::RwLock; -use tokio::task::JoinHandle; use tracing; use super::{owner_key, STORAGE_VERSION}; @@ -17,6 +16,8 @@ pub enum StorageError { ConnectionFailed, #[error("redis operation failed: {0}")] RedisFailed(String), + #[error("ProtocolStorage::set_me() was not called")] + NotInitialized, } #[derive(Debug, Clone)] @@ -68,37 +69,23 @@ pub trait ProtocolArtifact: fn set_holders(&mut self, holders: Vec); } -/// A pre-reserved slot for an artifact that will eventually be inserted. +/// A handle for inserting a generated artifact into storage. +/// Tracks the artifact ID in the `generating` set until insertion or drop. pub struct ArtifactSlot { id: A::Id, storage: ProtocolStorage, - stored: bool, } impl ArtifactSlot { pub async fn insert(&mut self, artifact: A, owner: Participant) -> bool { - self.stored = self.storage.insert(artifact, owner).await; - self.stored - } - - pub fn unreserve(&self) -> Option> { - if self.stored { - return None; - } - - let storage = self.storage.clone(); - let id = self.id; - let task = tokio::spawn(async move { - tracing::info!(id, "unreserving artifact"); - storage.unreserve(id).await; - }); - Some(task) + self.storage.insert(artifact, owner).await } } impl Drop for ArtifactSlot { fn drop(&mut self) { - self.unreserve(); + self.storage + .remove_reserved(self.id, ReservedKind::Generating); } } @@ -115,10 +102,7 @@ pub struct ArtifactTakenDropper { impl Drop for ArtifactTakenDropper { fn drop(&mut self) { if let Some(storage) = self.dropper.take() { - let id = self.id; - tokio::spawn(async move { - storage.unreserve(id).await; - }); + storage.remove_reserved(self.id, ReservedKind::Using); } } } @@ -139,14 +123,48 @@ impl ArtifactTaken { } } +#[derive(Debug, Clone, Copy)] +enum ReservedKind { + Generating, + Using, +} + +/// Tracks artifact IDs that are in-flight but not yet in Redis. +/// Protected by a single `RwLock` to avoid multi-lock ordering issues. +#[derive(Debug)] +struct ReservedState { + /// IDs currently being generated. Value is `true` if this node is the owner/proposer. + generating: HashMap, + /// IDs taken from Redis and actively consumed by a protocol. + /// Value is `true` if this node is the owner of the artifact. + using: HashMap, +} + +impl ReservedState { + fn new() -> Self { + Self { + generating: HashMap::new(), + using: HashMap::new(), + } + } + + fn contains_generating(&self, id: &Id) -> bool { + self.generating.contains_key(id) + } + + fn contains_reserved(&self, id: &Id) -> bool { + self.generating.contains_key(id) || self.using.contains_key(id) + } +} + #[derive(Debug)] pub struct ProtocolStorage { redis_pool: Pool, artifact_key: String, - used: Arc>>, - reserved: Arc>>, + reserved: Arc>>, owner_keys: String, account_id: AccountId, + me: Arc>, _phantom: std::marker::PhantomData, } @@ -155,10 +173,10 @@ impl Clone for ProtocolStorage { Self { redis_pool: self.redis_pool.clone(), artifact_key: self.artifact_key.clone(), - used: self.used.clone(), reserved: self.reserved.clone(), owner_keys: self.owner_keys.clone(), account_id: self.account_id.clone(), + me: self.me.clone(), _phantom: std::marker::PhantomData, } } @@ -167,23 +185,63 @@ impl Clone for ProtocolStorage { impl ProtocolStorage { pub fn new(pool: &Pool, account_id: &AccountId, base_prefix: &str) -> Self { let artifact_key = format!("{base_prefix}:{STORAGE_VERSION}:{account_id}"); - let used = Arc::new(RwLock::new(HashSet::new())); - let reserved = Arc::new(RwLock::new(HashSet::new())); + let state = Arc::new(RwLock::new(ReservedState::new())); let owner_keys = format!("{base_prefix}_owners:{STORAGE_VERSION}:{account_id}"); Self { redis_pool: pool.clone(), artifact_key, - used, - reserved, + reserved: state, owner_keys, account_id: account_id.clone(), + me: Arc::new(OnceLock::new()), _phantom: std::marker::PhantomData, } } } impl ProtocolStorage { + /// Set this node's participant identity. Must be called before using + /// methods that depend on ownership + pub fn set_me(&self, me: Participant) { + let _ = self.me.set(me); + } + + fn me(&self) -> Result { + self.me.get().copied().ok_or_else(|| { + tracing::error!("ProtocolStorage::set_me() was not called"); + StorageError::NotInitialized + }) + } + + /// Remove an ID from the reserved state, trying synchronous lock first. + /// Falls back to spawning an async task if the lock is contended. + fn remove_reserved(&self, id: A::Id, kind: ReservedKind) { + let reserved = self.reserved.clone(); + if let Ok(mut state) = reserved.try_write() { + match kind { + ReservedKind::Generating => state.generating.remove(&id), + ReservedKind::Using => state.using.remove(&id), + }; + return; + } + if let Ok(handle) = tokio::runtime::Handle::try_current() { + handle.spawn(async move { + let mut state = reserved.write().await; + match kind { + ReservedKind::Generating => state.generating.remove(&id), + ReservedKind::Using => state.using.remove(&id), + }; + }); + } else { + tracing::warn!( + id, + ?kind, + "dropped with contended lock outside tokio runtime; id may remain reserved" + ); + } + } + async fn connect(&self) -> Option { self.redis_pool .get() @@ -194,13 +252,17 @@ impl ProtocolStorage { .ok() } - pub async fn fetch_owned(&self, me: Participant) -> Result, StorageError> { + pub async fn fetch_owned(&self) -> Result, StorageError> { + self.fetch_owned_by(self.me()?).await + } + + pub async fn fetch_owned_by(&self, owner: Participant) -> Result, StorageError> { let Some(mut conn) = self.connect().await else { return Err(StorageError::ConnectionFailed); }; let owned: HashSet = conn - .smembers(owner_key(&self.owner_keys, me)) + .smembers(owner_key(&self.owner_keys, owner)) .await .map_err(|err| { tracing::warn!(?err, "failed to fetch my owned artifacts"); @@ -210,51 +272,65 @@ impl ProtocolStorage { Ok(owned.into_iter().collect()) } - pub async fn reserve(&self, id: A::Id) -> Option> { - let used = self.used.read().await; - if used.contains(&id) { - return None; + /// Create a slot for generating an artifact with the given ID. + /// Tracks the ID in the `generating` set until the slot is inserted or dropped. + /// Returns `None` if the ID is already generating, in use, or stored in Redis. + pub async fn create_slot(&self, id: A::Id, owner: Participant) -> Option> { + { + let mut state = self.reserved.write().await; + if state.using.contains_key(&id) { + tracing::error!(id, "cannot create slot: artifact is currently in use"); + return None; + } + if state.contains_generating(&id) { + tracing::error!( + id, + "cannot create slot: artifact is already being generated" + ); + return None; + } + let me = self.me().ok()?; + state.generating.insert(id, owner == me); } - if !self.reserved.write().await.insert(id) { + if self.contains(id).await { + self.reserved.write().await.generating.remove(&id); + tracing::error!(id, "cannot create slot: artifact already exists in storage"); return None; } - drop(used); - - let start = Instant::now(); - let Some(mut conn) = self.connect().await else { - self.reserved.write().await.remove(&id); - return None; - }; + Some(ArtifactSlot { + id, + storage: self.clone(), + }) + } - // Check directly whether the artifact is already stored in Redis. - let artifact_exists: Result = conn.hexists(&self.artifact_key, id).await; - let elapsed = start.elapsed(); - crate::metrics::storage::REDIS_LATENCY - .with_label_values(&[A::METRIC_LABEL, "reserve"]) - .observe(elapsed.as_millis() as f64); + /// Check if an artifact is currently being generated (mine or peer). + pub async fn contains_generating(&self, id: A::Id) -> bool { + self.reserved.read().await.contains_generating(&id) + } - match artifact_exists { - Ok(true) => { - // artifact already stored, reserve cannot be done, remove reservation - self.reserved.write().await.remove(&id); - None - } - // artifact does not exist, reservation successful - Ok(false) => Some(ArtifactSlot { - id, - storage: self.clone(), - stored: false, - }), - Err(err) => { - self.reserved.write().await.remove(&id); - tracing::warn!(id, ?err, ?elapsed, "failed to reserve artifact"); - None - } - } + pub async fn contains_reserved(&self, id: A::Id) -> bool { + self.reserved.read().await.contains_reserved(&id) } - async fn unreserve(&self, id: A::Id) -> bool { - self.reserved.write().await.remove(&id) + /// Owned artifacts in Redis plus owned using and owned generating. + /// This is the full set that should be advertised during state sync to prevent + /// peers from pruning artifacts that are still actively in use. + pub async fn fetch_owned_with_reserved(&self) -> Result, StorageError> { + let mut ids = self.fetch_owned().await?; + let state = self.reserved.read().await; + ids.extend( + state + .generating + .iter() + .filter_map(|(&id, &mine)| mine.then_some(id)), + ); + ids.extend( + state + .using + .iter() + .filter_map(|(&id, &mine)| mine.then_some(id)), + ); + Ok(ids) } pub async fn remove_outdated( @@ -336,20 +412,14 @@ impl ProtocolStorage { match result { Ok((outdated, not_found)) => { - if !outdated.is_empty() { - tracing::info!(?outdated, ?elapsed, "removed outdated artifacts"); - // remove outdated entries from our in-memory reserved set - let mut reserved = self.reserved.write().await; - for id in outdated.iter() { - reserved.remove(id); - } - drop(reserved); - // remove outdated entries from our in-memory used set - let mut used = self.used.write().await; - for id in outdated.iter() { - used.remove(id); - } - } + // Filter out artifacts that are on this node but not in Redis: + // - `generating`: being generated, not yet persisted + // - `using`: taken from Redis, actively consumed by a protocol + let state = self.reserved.read().await; + let not_found: Vec<_> = not_found + .into_iter() + .filter(|id| !state.contains_reserved(id)) + .collect(); Ok(RemoveOutdatedResult::new(outdated, not_found)) } Err(err) => { @@ -362,7 +432,8 @@ impl ProtocolStorage { /// Insert an artifact into storage under `owner`'s ownership set. /// Holders must be set on the artifact before calling this; they are /// persisted as a dedicated Redis set for later holder-tracking. - pub async fn insert(&self, artifact: A, owner: Participant) -> bool { + /// Private: callers must use `create_slot()` + `ArtifactSlot::insert()`. + async fn insert(&self, artifact: A, owner: Participant) -> bool { const SCRIPT: &str = r#" local artifact_key = KEYS[1] local owner_keys = KEYS[2] @@ -385,11 +456,6 @@ impl ProtocolStorage { let start = Instant::now(); let id = artifact.id(); - let used = self.used.read().await; - if used.contains(&id) { - tracing::warn!(id, "artifact already marked used"); - return false; - } let holders: Vec = artifact .holders() @@ -412,7 +478,6 @@ impl ProtocolStorage { .arg(holders.as_slice()) .invoke_async(&mut conn) .await; - drop(used); let elapsed = start.elapsed(); crate::metrics::storage::REDIS_LATENCY @@ -420,10 +485,7 @@ impl ProtocolStorage { .observe(elapsed.as_millis() as f64); match outcome { - Ok(()) => { - self.reserved.write().await.remove(&id); - true - } + Ok(()) => true, Err(err) => { tracing::warn!(id, ?err, ?elapsed, "failed to insert artifact"); false @@ -462,8 +524,14 @@ impl ProtocolStorage { } } - pub async fn contains_used(&self, id: A::Id) -> bool { - self.used.read().await.contains(&id) + /// Check if an artifact is currently being consumed by an active protocol. + pub async fn contains_using(&self, id: A::Id) -> bool { + self.reserved.read().await.using.contains_key(&id) + } + + /// Returns the set of artifact IDs currently being consumed by active protocols. + pub async fn using_ids(&self) -> HashSet { + self.reserved.read().await.using.keys().copied().collect() } pub async fn take(&self, id: A::Id, owner: Participant) -> Option> { @@ -491,14 +559,15 @@ impl ProtocolStorage { "#; let start = Instant::now(); - if !self.used.write().await.insert(id) { - tracing::warn!(id, "taking artifact that is already used"); + let mine = owner == self.me().ok()?; + if self.reserved.write().await.using.insert(id, mine).is_some() { + tracing::warn!(id, "taking artifact that is already in use"); return None; } let Some(mut conn) = self.connect().await else { tracing::warn!(id, "failed to take artifact: connection failed"); - self.used.write().await.remove(&id); + self.reserved.write().await.using.remove(&id); return None; }; let result: Result<(A, Vec), _> = redis::Script::new(SCRIPT) @@ -521,7 +590,7 @@ impl ProtocolStorage { Some(ArtifactTaken::new(artifact, self.clone())) } Err(err) => { - self.used.write().await.remove(&id); + self.reserved.write().await.using.remove(&id); tracing::warn!(id, ?err, ?elapsed, "failed to take artifact"); None } @@ -602,8 +671,9 @@ impl ProtocolStorage { .with_label_values(&[A::METRIC_LABEL, "clear"]) .observe(elapsed.as_millis() as f64); - self.reserved.write().await.clear(); - self.used.write().await.clear(); + let mut state = self.reserved.write().await; + state.generating.clear(); + state.using.clear(); // if the outcome is None, it means the script failed or there was an error. outcome.is_some() @@ -612,7 +682,7 @@ impl ProtocolStorage { /// Take one artifact owned by the given participant. /// It is very important to NOT reuse the same artifact twice for two different /// protocols. - pub async fn take_mine(&self, me: Participant) -> Option> { + pub async fn take_mine(&self) -> Option> { const SCRIPT: &str = r#" local artifact_key = KEYS[1] local mine_key = KEYS[2] @@ -644,6 +714,7 @@ impl ProtocolStorage { let start = Instant::now(); let mut conn = self.connect().await?; + let me = self.me().ok()?; let result: Result)>, _> = redis::Script::new(SCRIPT) .key(&self.artifact_key) .key(owner_key(&self.owner_keys, me)) @@ -659,10 +730,8 @@ impl ProtocolStorage { Ok(Some((mut artifact, holders))) => { let holders = holders.into_iter().map(Participant::from).collect(); artifact.set_holders(holders); - // mark reserved and used in-memory so that it won't be reserved or reused locally let id = artifact.id(); - self.reserved.write().await.insert(id); - self.used.write().await.insert(id); + self.reserved.write().await.using.insert(id, true); let taken = ArtifactTaken::new(artifact, self.clone()); tracing::debug!(id, ?elapsed, "took mine artifact"); Some(taken) @@ -675,20 +744,16 @@ impl ProtocolStorage { } } - /// Check if an artifact is reserved. - pub async fn contains_reserved(&self, id: A::Id) -> bool { - self.reserved.read().await.contains(&id) - } - pub fn artifact_key(&self) -> &str { &self.artifact_key } - /// Batch remove a peer from holders for a set of artifact IDs, and prune artifacts below threshold if owned by `me`. + /// Batch remove a peer from holders for a set of artifact IDs, and prune + /// artifacts that fall below the holder threshold. + /// Assumes the given IDs are owned by this node for ownership-set cleanup. /// Returns (Vec, Vec) pub async fn remove_holder_and_prune( &self, - me: Participant, peer: Participant, threshold: usize, ids: &[A::Id], @@ -697,7 +762,8 @@ impl ProtocolStorage { return Ok((vec![], vec![])); } - // Lua script expects: KEYS[1]=artifact_key, KEYS[2]=owner_key, ARGV[1]=peer, ARGV[2]=threshold, ARGV[3...]=ids + // Lua script expects: KEYS[1]=artifact_key, KEYS[2]=owner_key, + // ARGV[1]=peer, ARGV[2]=threshold, ARGV[3...]=ids const SCRIPT: &str = r#" local artifact_key = KEYS[1] local owner_key = KEYS[2] @@ -707,22 +773,21 @@ impl ProtocolStorage { local updated = {} for i = 3, #ARGV do local id = ARGV[i] - -- Error if 'me' does not own this artifact + -- Skip if not owned by me (defense against malicious/buggy peer responses) if redis.call('SISMEMBER', owner_key, id) == 0 then - return redis.error_reply('OWNERSHIP_VIOLATION:' .. id) - end - -- Remove peer from holders set - local holders_key = artifact_key .. ':holders:' .. id - redis.call('SREM', holders_key, peer) - local count = redis.call('SCARD', holders_key) - if count < threshold then - -- Prune: remove artifact, holders set, and owner set entry - redis.call('HDEL', artifact_key, id) - redis.call('DEL', holders_key) - redis.call('SREM', owner_key, id) - table.insert(removed, id) - else - table.insert(updated, id) + -- noop: not our artifact + elseif redis.call('EXISTS', artifact_key .. ':holders:' .. id) == 1 then + local holders_key = artifact_key .. ':holders:' .. id + redis.call('SREM', holders_key, peer) + local count = redis.call('SCARD', holders_key) + if count < threshold then + redis.call('HDEL', artifact_key, id) + redis.call('DEL', holders_key) + redis.call('SREM', owner_key, id) + table.insert(removed, id) + else + table.insert(updated, id) + end end end return {removed, updated} @@ -734,7 +799,7 @@ impl ProtocolStorage { type SyncResult = Result<(Vec, Vec), redis::RedisError>; let result: SyncResult = redis::Script::new(SCRIPT) .key(&self.artifact_key) - .key(owner_key(&self.owner_keys, me)) + .key(owner_key(&self.owner_keys, self.me()?)) .arg(Into::::into(peer)) .arg(threshold as i64) .arg(ids) diff --git a/integration-tests/benches/store.rs b/integration-tests/benches/store.rs index 2f14f6519..311b48a90 100644 --- a/integration-tests/benches/store.rs +++ b/integration-tests/benches/store.rs @@ -87,8 +87,8 @@ fn env() -> (Runtime, SyncEnv) { .unwrap(); let redis = spawner.spawn_redis().await; - let triples = redis.triple_storage(&node_id); - let presignatures = redis.presignature_storage(&node_id); + let triples = redis.triple_storage(&node_id, me); + let presignatures = redis.presignature_storage(&node_id, me); { let participants = into_contract_participants(&participants); redis @@ -158,7 +158,7 @@ fn bench_load_keys(c: &mut Criterion) { for i in 0..1000 { let t = dummy_pair(i); env.triples - .reserve(t.id) + .create_slot(t.id, env.me) .await .unwrap() .insert(t, env.me) @@ -174,7 +174,7 @@ fn bench_load_keys(c: &mut Criterion) { for i in 0..1000 { let p = dummy_presignature(i); env.presignatures - .reserve(p.id) + .create_slot(p.id, env.me) .await .unwrap() .insert(p, env.me) @@ -187,7 +187,7 @@ fn bench_load_keys(c: &mut Criterion) { c.bench_function("load 1024 mine triple keys", |b| { b.iter(|| { let task = || async { - let _ = env.triples.fetch_owned(env.me).await; + let _ = env.triples.fetch_owned().await; }; rt.block_on(task()); @@ -197,7 +197,7 @@ fn bench_load_keys(c: &mut Criterion) { c.bench_function("load 1024 mine presignature keys", |b| { b.iter(|| { let task = || async { - let _ = env.presignatures.fetch_owned(env.me).await; + let _ = env.presignatures.fetch_owned().await; }; rt.block_on(task()); diff --git a/integration-tests/src/cluster/mod.rs b/integration-tests/src/cluster/mod.rs index 21a45a1b7..489e8889c 100644 --- a/integration-tests/src/cluster/mod.rs +++ b/integration-tests/src/cluster/mod.rs @@ -3,7 +3,6 @@ pub mod spawner; use std::collections::{HashMap, HashSet}; use mpc_contract::primitives::Participants; -use mpc_node::storage::{PresignatureStorage, TripleStorage}; use mpc_primitives::{Chain, Checkpoint}; use near_workspaces::network::Sandbox; use near_workspaces::types::{Finality, NearToken}; @@ -95,17 +94,6 @@ impl Cluster { self.nodes.contract() } - pub fn triples(&self, idx: usize) -> TripleStorage { - self.nodes.ctx().redis.triple_storage(self.account_id(idx)) - } - - pub fn presignatures(&self, idx: usize) -> PresignatureStorage { - self.nodes - .ctx() - .redis - .presignature_storage(self.account_id(idx)) - } - pub async fn contract_state(&self) -> anyhow::Result { let state: ProtocolContractState = self .contract() diff --git a/integration-tests/src/containers.rs b/integration-tests/src/containers.rs index 13b22dd1b..3a15554f1 100644 --- a/integration-tests/src/containers.rs +++ b/integration-tests/src/containers.rs @@ -389,12 +389,24 @@ impl Redis { .unwrap() } - pub fn triple_storage(&self, id: &AccountId) -> mpc_node::storage::TripleStorage { - TriplePair::storage(&self.pool(), id) + pub fn triple_storage( + &self, + id: &AccountId, + me: Participant, + ) -> mpc_node::storage::TripleStorage { + let storage = TriplePair::storage(&self.pool(), id); + storage.set_me(me); + storage } - pub fn presignature_storage(&self, id: &AccountId) -> mpc_node::storage::PresignatureStorage { - Presignature::storage(&self.pool(), id) + pub fn presignature_storage( + &self, + id: &AccountId, + me: Participant, + ) -> mpc_node::storage::PresignatureStorage { + let storage = Presignature::storage(&self.pool(), id); + storage.set_me(me); + storage } pub async fn stockpile_triples(&self, cfg: &NodeConfig, participants: &Participants, mul: u32) { @@ -403,15 +415,15 @@ impl Redis { .participants .keys() .map(|account_id| { - ( - Participant::from( - *participants - .account_to_participant_id - .get(account_id) - .unwrap(), - ), - TriplePair::storage(&pool, account_id), - ) + let me = Participant::from( + *participants + .account_to_participant_id + .get(account_id) + .unwrap(), + ); + let storage = TriplePair::storage(&pool, account_id); + storage.set_me(me); + (me, storage) }) .collect::>(); @@ -444,7 +456,7 @@ impl Redis { storage .get(me) .unwrap() - .reserve(pair_id) + .create_slot(pair_id, *owner) .await .unwrap() .insert(pair, *owner) diff --git a/integration-tests/src/lib.rs b/integration-tests/src/lib.rs index 4b8efce2a..45ce23baf 100644 --- a/integration-tests/src/lib.rs +++ b/integration-tests/src/lib.rs @@ -16,7 +16,6 @@ use crate::containers::DockerClient; use anyhow::Context as _; use cluster::spawner::ClusterSpawner; -use deadpool_redis::Pool; use ethers::types::{Address, U256}; use mpc_contract::config::{PresignatureConfig, ProtocolConfig, TripleConfig}; use mpc_contract::primitives::CandidateInfo; @@ -24,7 +23,6 @@ use mpc_node::gcp::GcpService; use mpc_node::indexer_eth::EthConfig; use mpc_node::indexer_hydration::HydrationConfig; use mpc_node::indexer_sol::SolConfig; -use mpc_node::storage::triple_storage::{TriplePair, TripleStorage}; use mpc_node::{logs, mesh, node_client, storage}; use mpc_primitives::{Chain, Checkpoint}; use near_workspaces::network::Sandbox; @@ -238,10 +236,6 @@ impl Nodes { Ok(()) } - pub async fn triple_storage(&self, redis_pool: &Pool, account_id: &AccountId) -> TripleStorage { - TriplePair::storage(redis_pool, account_id) - } - pub async fn gcp_services(&self) -> anyhow::Result> { let mut gcp_services = Vec::new(); match self { diff --git a/integration-tests/src/mpc_fixture/builder.rs b/integration-tests/src/mpc_fixture/builder.rs index 4e0038ea3..d535b33b8 100644 --- a/integration-tests/src/mpc_fixture/builder.rs +++ b/integration-tests/src/mpc_fixture/builder.rs @@ -570,6 +570,7 @@ impl MpcFixtureNodeBuilder { let triple_storage = TriplePair::storage(&context.redis_pool, &self.participant_info.account_id); + triple_storage.set_me(self.me); if fixture_config.use_preshared_triples { // removing here because we can't clone a triple @@ -580,7 +581,7 @@ impl MpcFixtureNodeBuilder { if pair.holders.is_none() { pair.holders = Some(pair.triple0.public.participants.clone()); } - let mut slot = triple_storage.reserve(pair_id).await.unwrap(); + let mut slot = triple_storage.create_slot(pair_id, owner).await.unwrap(); slot.insert(pair, owner).await; } } @@ -588,6 +589,7 @@ impl MpcFixtureNodeBuilder { let presignature_storage = Presignature::storage(&context.redis_pool, &self.participant_info.account_id); + presignature_storage.set_me(self.me); if fixture_config.use_preshared_presignatures { // removing here because we can't clone a presignature @@ -598,7 +600,7 @@ impl MpcFixtureNodeBuilder { presignature_share.holders = Some(presignature_share.participants.clone()); } let mut slot = presignature_storage - .reserve(presignature_share.id) + .create_slot(presignature_share.id, owner) .await .unwrap(); slot.insert(presignature_share, owner).await; diff --git a/integration-tests/src/mpc_fixture/fixture_interface.rs b/integration-tests/src/mpc_fixture/fixture_interface.rs index 47c76d001..28acba891 100644 --- a/integration-tests/src/mpc_fixture/fixture_interface.rs +++ b/integration-tests/src/mpc_fixture/fixture_interface.rs @@ -183,16 +183,34 @@ impl MpcFixtureNode { /// Get the list of triple IDs this node owns in storage (sorted). pub async fn owned_triples(&self) -> Vec { - let mut ids = self.triple_storage.fetch_owned(self.me).await.unwrap(); + let mut ids = self.triple_storage.fetch_owned().await.unwrap(); ids.sort(); ids } /// Get the list of presignature IDs this node owns in storage (sorted). pub async fn owned_presignatures(&self) -> Vec { + let mut ids = self.presignature_storage.fetch_owned().await.unwrap(); + ids.sort(); + ids + } + + /// Owned + owned using + owned generating, sorted. + pub async fn owned_triples_with_reserved(&self) -> Vec { + let mut ids = self + .triple_storage + .fetch_owned_with_reserved() + .await + .unwrap(); + ids.sort(); + ids + } + + /// Owned + owned using + owned generating, sorted. + pub async fn owned_presignatures_with_reserved(&self) -> Vec { let mut ids = self .presignature_storage - .fetch_owned(self.me) + .fetch_owned_with_reserved() .await .unwrap(); ids.sort(); @@ -208,11 +226,11 @@ impl MpcFixtureNode { response: &mpc_node::protocol::sync::SyncUpdate, ) { self.triple_storage - .remove_holder_and_prune(self.me, peer, threshold, &response.triples) + .remove_holder_and_prune(peer, threshold, &response.triples) .await .expect("remove_holder_and_prune triples failed"); self.presignature_storage - .remove_holder_and_prune(self.me, peer, threshold, &response.presignatures) + .remove_holder_and_prune(peer, threshold, &response.presignatures) .await .expect("remove_holder_and_prune presignatures failed"); } diff --git a/integration-tests/tests/cases/helpers.rs b/integration-tests/tests/cases/helpers.rs index aa6bb6ab2..0e6f2c9ea 100644 --- a/integration-tests/tests/cases/helpers.rs +++ b/integration-tests/tests/cases/helpers.rs @@ -67,7 +67,7 @@ pub(crate) async fn insert_triples_for_owner( let holders = holders.to_vec(); for id in ids { triples - .reserve(id) + .create_slot(id, owner) .await .unwrap() .insert(dummy_pair_with_holders(id, holders.clone()), owner) @@ -84,7 +84,7 @@ pub(crate) async fn insert_presignatures_for_owner( let holders = holders.to_vec(); for id in ids { presignatures - .reserve(id) + .create_slot(id, owner) .await .unwrap() .insert(dummy_presignature_with_holders(id, holders.clone()), owner) diff --git a/integration-tests/tests/cases/mod.rs b/integration-tests/tests/cases/mod.rs index 441b5b77f..5588b164e 100644 --- a/integration-tests/tests/cases/mod.rs +++ b/integration-tests/tests/cases/mod.rs @@ -24,6 +24,7 @@ pub mod mpc; pub mod nightly; pub mod solana; pub mod solana_stream; +pub mod state_sync; pub mod store; pub mod sync; diff --git a/integration-tests/tests/cases/mpc.rs b/integration-tests/tests/cases/mpc.rs index 3cf6409b3..5a0897870 100644 --- a/integration-tests/tests/cases/mpc.rs +++ b/integration-tests/tests/cases/mpc.rs @@ -27,282 +27,6 @@ const PRESIGNATURES_FILE: &str = "tmp/presignatures.json"; const TRIPLE_PAIRS_PER_OWNER: usize = 50; const PRESIGNATURES_PER_OWNER: usize = 25; -use super::helpers::{ - assert_presig_owned_state, assert_triples_owned_state, insert_presignatures_for_owner, - insert_triples_for_owner, -}; - -#[test(tokio::test(flavor = "multi_thread"))] -async fn test_sync_noop_when_fully_synced() { - let fixture = MpcFixtureBuilder::default() - .only_generate_signatures() - .build() - .await; - - let node0 = &fixture.nodes[0]; - let node1 = &fixture.nodes[1]; - - // Snapshot node1's owned artifacts before sync. - let node1_triples_before = node1.owned_triples().await; - let node1_presigs_before = node1.owned_presignatures().await; - assert!( - !node1_triples_before.is_empty(), - "node1 should own triples from fixture" - ); - assert!( - !node1_presigs_before.is_empty(), - "node1 should own presignatures from fixture" - ); - - let node0_triples = node0.owned_triples().await; - let node0_presigs = node0.owned_presignatures().await; - - // Responder side: node1 receives node0's directory and reports what it's missing. - let response = node1 - .sync(node0.me, node0_triples.clone(), node0_presigs.clone()) - .await; - assert!( - response.triples.is_empty(), - "node1 should have all of node0's triples" - ); - assert!( - response.presignatures.is_empty(), - "node1 should have all of node0's presignatures" - ); - - // Caller side: node0 processes the response (nothing to remove since response is empty). - node0.process_sync_response(node1.me, 2, &response).await; - - // Verify node1's artifact state is exactly unchanged after sync. - let node1_triples_after = node1.owned_triples().await; - let node1_presigs_after = node1.owned_presignatures().await; - assert_eq!( - node1_triples_after, node1_triples_before, - "node1 triples should be unchanged after sync" - ); - assert_eq!( - node1_presigs_after, node1_presigs_before, - "node1 presignatures should be unchanged after sync" - ); - - // Verify node0's artifact state is also unchanged. - let node0_triples_after = node0.owned_triples().await; - let node0_presigs_after = node0.owned_presignatures().await; - assert_eq!( - node0_triples_after, node0_triples, - "node0 triples should be unchanged after sync" - ); - assert_eq!( - node0_presigs_after, node0_presigs, - "node0 presignatures should be unchanged after sync" - ); - - // Verify holders are full (all 3 nodes) for every artifact on both nodes. - let all_participants = fixture.sorted_participants(); - for node in [node0, node1] { - for id in &node.owned_triples().await { - let holders = node.triple_storage.fetch_holders(*id).await; - assert_eq!( - holders, all_participants, - "triple {id} on {:?} should have all participants as holders", - node.me - ); - } - for id in &node.owned_presignatures().await { - let holders = node.presignature_storage.fetch_holders(*id).await; - assert_eq!( - holders, all_participants, - "presignature {id} on {:?} should have all participants as holders", - node.me - ); - } - } -} - -#[test(tokio::test(flavor = "multi_thread"))] -async fn test_sync_prune_below_threshold() { - let fixture = MpcFixtureBuilder::default() - .only_generate_signatures() - .build() - .await; - - let node0 = &fixture.nodes[0]; - let node1 = &fixture.nodes[1]; - let node2 = &fixture.nodes[2]; - let all_participants = fixture.sorted_participants(); - - // Snapshot fixture artifacts before any mutations. - let node0_triples_before = node0.owned_triples().await; - let node0_presigs_before = node0.owned_presignatures().await; - - // Insert artifact id=99 only on node0's storage (claimed by all 3 holders). - insert_triples_for_owner(&node0.triple_storage, node0.me, &all_participants, 99..=99).await; - insert_presignatures_for_owner( - &node0.presignature_storage, - node0.me, - &all_participants, - 99..=99, - ) - .await; - - // node0 tells node1 "I own id=99", node1 responds "I don't have it". - let response = node1.sync(node0.me, vec![99], vec![99]).await; - assert_eq!(response.triples, vec![99]); - assert_eq!(response.presignatures, vec![99]); - - // node2 also doesn't have it. - let response2 = node2.sync(node0.me, vec![99], vec![99]).await; - assert_eq!(response2.triples, vec![99]); - assert_eq!(response2.presignatures, vec![99]); - - // Process node1's response first (removes node1, 2 holders remain = threshold, survives). - node0.process_sync_response(node1.me, 2, &response).await; - assert_triples_owned_state(&node0.triple_storage, node0.me, &[99], &[]).await; - assert_presig_owned_state(&node0.presignature_storage, node0.me, &[99], &[]).await; - - // Verify holders of 99: node1 removed, only node0 + node2 remain. - let mut expected_holders = vec![node0.me, node2.me]; - expected_holders.sort(); - let holders_99 = node0.triple_storage.fetch_holders(99).await; - assert_eq!( - holders_99, expected_holders, - "triple 99 should have node0+node2 as holders after first sync" - ); - let holders_99 = node0.presignature_storage.fetch_holders(99).await; - assert_eq!( - holders_99, expected_holders, - "presig 99 should have node0+node2 as holders after first sync" - ); - - // Process node2's response (removes node2, 1 holder < threshold → pruned). - node0.process_sync_response(node2.me, 2, &response2).await; - assert_triples_owned_state(&node0.triple_storage, node0.me, &[], &[99]).await; - assert_presig_owned_state(&node0.presignature_storage, node0.me, &[], &[99]).await; - - // Verify fixture artifacts on node0 are unchanged (only id=99 was pruned). - let node0_triples_after = node0.owned_triples().await; - let node0_presigs_after = node0.owned_presignatures().await; - assert_eq!( - node0_triples_after, node0_triples_before, - "node0 fixture triples should be unchanged" - ); - assert_eq!( - node0_presigs_after, node0_presigs_before, - "node0 fixture presignatures should be unchanged" - ); - - // Verify holders of fixture artifacts are still full on node0. - for id in &node0_triples_after { - let holders = node0.triple_storage.fetch_holders(*id).await; - assert_eq!( - holders, all_participants, - "fixture triple {id} should still have all holders" - ); - } - for id in &node0_presigs_after { - let holders = node0.presignature_storage.fetch_holders(*id).await; - assert_eq!( - holders, all_participants, - "fixture presig {id} should still have all holders" - ); - } -} - -/// Orphaned artifact: owner doesn't have id=77 but other nodes do. -/// When owner broadcasts its directory (without id=77), responders remove it -/// via remove_outdated. -#[test(tokio::test(flavor = "multi_thread"))] -async fn test_sync_remove_outdated_orphan() { - let fixture = MpcFixtureBuilder::default() - .only_generate_signatures() - .build() - .await; - - let node0 = &fixture.nodes[0]; - let node1 = &fixture.nodes[1]; - let all_participants = fixture.sorted_participants(); - - // Snapshot fixture artifacts on node1 before mutations. - let node1_triples_before = node1.owned_triples().await; - let node1_presigs_before = node1.owned_presignatures().await; - - // Insert id=77 owned by node0, but only on node1 (NOT on node0). - insert_triples_for_owner(&node1.triple_storage, node0.me, &all_participants, 77..=77).await; - insert_presignatures_for_owner( - &node1.presignature_storage, - node0.me, - &all_participants, - 77..=77, - ) - .await; - - // Verify node1 has id=77 before sync. - assert_triples_owned_state(&node1.triple_storage, node0.me, &[77], &[]).await; - assert_presig_owned_state(&node1.presignature_storage, node0.me, &[77], &[]).await; - - // Verify holders of id=77 are all participants. - let holders = node1.triple_storage.fetch_holders(77).await; - assert_eq!( - holders, all_participants, - "triple 77 should have all participants as holders before sync" - ); - let holders = node1.presignature_storage.fetch_holders(77).await; - assert_eq!( - holders, all_participants, - "presig 77 should have all participants as holders before sync" - ); - - // node0 broadcasts its directory (which does NOT include id=77 since node0 doesn't have it). - // Responder runs remove_outdated, which removes id=77. - let node0_triples = node0.owned_triples().await; - let node0_presigs = node0.owned_presignatures().await; - assert!(!node0_triples.contains(&77), "node0 should not own id=77"); - - let response = node1 - .sync(node0.me, node0_triples.clone(), node0_presigs.clone()) - .await; - assert!( - response.triples.is_empty(), - "node1 should not report any missing triples (remove_outdated handles orphans)" - ); - assert!( - response.presignatures.is_empty(), - "node1 should not report any missing presignatures" - ); - - // After sync, id=77 should be removed from node1 (via remove_outdated). - assert_triples_owned_state(&node1.triple_storage, node0.me, &[], &[77]).await; - assert_presig_owned_state(&node1.presignature_storage, node0.me, &[], &[77]).await; - - // Verify fixture artifacts on node1 are unchanged (only id=77 was removed). - let node1_triples_after = node1.owned_triples().await; - let node1_presigs_after = node1.owned_presignatures().await; - assert_eq!( - node1_triples_after, node1_triples_before, - "node1 fixture triples should be unchanged" - ); - assert_eq!( - node1_presigs_after, node1_presigs_before, - "node1 fixture presignatures should be unchanged" - ); - - // Verify holders of fixture artifacts are still full on node1. - for id in &node1_triples_after { - let holders = node1.triple_storage.fetch_holders(*id).await; - assert_eq!( - holders, all_participants, - "fixture triple {id} should still have all holders" - ); - } - for id in &node1_presigs_after { - let holders = node1.presignature_storage.fetch_holders(*id).await; - assert_eq!( - holders, all_participants, - "fixture presig {id} should still have all holders" - ); - } -} - #[test(tokio::test(flavor = "multi_thread"))] async fn test_basic_generate_keys() { let network = MpcFixtureBuilder::new(5, 4).build().await; @@ -373,7 +97,7 @@ async fn test_basic_generate_triples() { for node in &network.nodes { let mut nodes_shares = BTreeMap::new(); for peer in &network.nodes { - let triple_ids = node.triple_storage.fetch_owned(peer.me).await.unwrap(); + let triple_ids = node.triple_storage.fetch_owned_by(peer.me).await.unwrap(); let mut peer_triples = Vec::with_capacity(triple_ids.len()); for triple_id in triple_ids { let pair = conn @@ -427,7 +151,7 @@ async fn test_basic_generate_presignature() { for peer in &network.nodes { let presignature_ids = node .presignature_storage - .fetch_owned(peer.me) + .fetch_owned_by(peer.me) .await .unwrap(); let mut peer_presignatures = Vec::with_capacity(presignature_ids.len()); diff --git a/integration-tests/tests/cases/state_sync.rs b/integration-tests/tests/cases/state_sync.rs new file mode 100644 index 000000000..94a9ec69b --- /dev/null +++ b/integration-tests/tests/cases/state_sync.rs @@ -0,0 +1,490 @@ +use integration_tests::mpc_fixture::MpcFixtureBuilder; +use test_log::test; + +use std::time::Duration; + +use super::helpers::{ + assert_presig_owned_state, assert_triples_owned_state, insert_presignatures_for_owner, + insert_triples_for_owner, +}; + +#[test(tokio::test(flavor = "multi_thread"))] +async fn test_sync_noop_when_fully_synced() { + let fixture = MpcFixtureBuilder::default() + .only_generate_signatures() + .build() + .await; + + let node0 = &fixture.nodes[0]; + let node1 = &fixture.nodes[1]; + + // Snapshot node1's owned artifacts before sync. + let node1_triples_before = node1.owned_triples().await; + let node1_presigs_before = node1.owned_presignatures().await; + assert!( + !node1_triples_before.is_empty(), + "node1 should own triples from fixture" + ); + assert!( + !node1_presigs_before.is_empty(), + "node1 should own presignatures from fixture" + ); + + let node0_triples = node0.owned_triples().await; + let node0_presigs = node0.owned_presignatures().await; + + // Responder side: node1 receives node0's sync update and reports what it's missing. + let response = node1 + .sync(node0.me, node0_triples.clone(), node0_presigs.clone()) + .await; + assert!( + response.triples.is_empty(), + "node1 should have all of node0's triples" + ); + assert!( + response.presignatures.is_empty(), + "node1 should have all of node0's presignatures" + ); + + // Caller side: node0 processes the response (nothing to remove since response is empty). + node0.process_sync_response(node1.me, 2, &response).await; + + // Verify node1's artifact state is exactly unchanged after sync. + let node1_triples_after = node1.owned_triples().await; + let node1_presigs_after = node1.owned_presignatures().await; + assert_eq!( + node1_triples_after, node1_triples_before, + "node1 triples should be unchanged after sync" + ); + assert_eq!( + node1_presigs_after, node1_presigs_before, + "node1 presignatures should be unchanged after sync" + ); + + // Verify node0's artifact state is also unchanged. + let node0_triples_after = node0.owned_triples().await; + let node0_presigs_after = node0.owned_presignatures().await; + assert_eq!( + node0_triples_after, node0_triples, + "node0 triples should be unchanged after sync" + ); + assert_eq!( + node0_presigs_after, node0_presigs, + "node0 presignatures should be unchanged after sync" + ); + + // Verify holders are full (all 3 nodes) for every artifact on both nodes. + let all_participants = fixture.sorted_participants(); + for node in [node0, node1] { + for id in &node.owned_triples().await { + let holders = node.triple_storage.fetch_holders(*id).await; + assert_eq!( + holders, all_participants, + "triple {id} on {:?} should have all participants as holders", + node.me + ); + } + for id in &node.owned_presignatures().await { + let holders = node.presignature_storage.fetch_holders(*id).await; + assert_eq!( + holders, all_participants, + "presignature {id} on {:?} should have all participants as holders", + node.me + ); + } + } +} + +#[test(tokio::test(flavor = "multi_thread"))] +async fn test_sync_prune_below_threshold() { + let fixture = MpcFixtureBuilder::default() + .only_generate_signatures() + .build() + .await; + + let node0 = &fixture.nodes[0]; + let node1 = &fixture.nodes[1]; + let node2 = &fixture.nodes[2]; + let all_participants = fixture.sorted_participants(); + + // Snapshot fixture artifacts before any mutations. + let node0_triples_before = node0.owned_triples().await; + let node0_presigs_before = node0.owned_presignatures().await; + + // Insert artifact id=99 only on node0's storage (claimed by all 3 holders). + insert_triples_for_owner(&node0.triple_storage, node0.me, &all_participants, 99..=99).await; + insert_presignatures_for_owner( + &node0.presignature_storage, + node0.me, + &all_participants, + 99..=99, + ) + .await; + + // node0 tells node1 "I own id=99", node1 responds "I don't have it". + let response = node1.sync(node0.me, vec![99], vec![99]).await; + assert_eq!(response.triples, vec![99]); + assert_eq!(response.presignatures, vec![99]); + + // node2 also doesn't have it. + let response2 = node2.sync(node0.me, vec![99], vec![99]).await; + assert_eq!(response2.triples, vec![99]); + assert_eq!(response2.presignatures, vec![99]); + + // Process node1's response first (removes node1, 2 holders remain = threshold, survives). + node0.process_sync_response(node1.me, 2, &response).await; + assert_triples_owned_state(&node0.triple_storage, node0.me, &[99], &[]).await; + assert_presig_owned_state(&node0.presignature_storage, node0.me, &[99], &[]).await; + + // Verify holders of 99: node1 removed, only node0 + node2 remain. + let mut expected_holders = vec![node0.me, node2.me]; + expected_holders.sort(); + let holders_99 = node0.triple_storage.fetch_holders(99).await; + assert_eq!( + holders_99, expected_holders, + "triple 99 should have node0+node2 as holders after first sync" + ); + let holders_99 = node0.presignature_storage.fetch_holders(99).await; + assert_eq!( + holders_99, expected_holders, + "presig 99 should have node0+node2 as holders after first sync" + ); + + // Process node2's response (removes node2, 1 holder < threshold → pruned). + node0.process_sync_response(node2.me, 2, &response2).await; + assert_triples_owned_state(&node0.triple_storage, node0.me, &[], &[99]).await; + assert_presig_owned_state(&node0.presignature_storage, node0.me, &[], &[99]).await; + + // Verify fixture artifacts on node0 are unchanged (only id=99 was pruned). + let node0_triples_after = node0.owned_triples().await; + let node0_presigs_after = node0.owned_presignatures().await; + assert_eq!( + node0_triples_after, node0_triples_before, + "node0 fixture triples should be unchanged" + ); + assert_eq!( + node0_presigs_after, node0_presigs_before, + "node0 fixture presignatures should be unchanged" + ); + + // Verify holders of fixture artifacts are still full on node0. + for id in &node0_triples_after { + let holders = node0.triple_storage.fetch_holders(*id).await; + assert_eq!( + holders, all_participants, + "fixture triple {id} should still have all holders" + ); + } + for id in &node0_presigs_after { + let holders = node0.presignature_storage.fetch_holders(*id).await; + assert_eq!( + holders, all_participants, + "fixture presig {id} should still have all holders" + ); + } +} + +/// Orphaned artifact: owner doesn't have id=77 but other nodes do. +/// When owner broadcasts its sync update (without id=77), responders remove it +/// via remove_outdated. +#[test(tokio::test(flavor = "multi_thread"))] +async fn test_sync_remove_outdated_orphan() { + let fixture = MpcFixtureBuilder::default() + .only_generate_signatures() + .build() + .await; + + let node0 = &fixture.nodes[0]; + let node1 = &fixture.nodes[1]; + let all_participants = fixture.sorted_participants(); + + // Snapshot fixture artifacts on node1 before mutations. + let node1_triples_before = node1.owned_triples().await; + let node1_presigs_before = node1.owned_presignatures().await; + + // Insert id=77 owned by node0, but only on node1 (NOT on node0). + insert_triples_for_owner(&node1.triple_storage, node0.me, &all_participants, 77..=77).await; + insert_presignatures_for_owner( + &node1.presignature_storage, + node0.me, + &all_participants, + 77..=77, + ) + .await; + + // Verify node1 has id=77 before sync. + assert_triples_owned_state(&node1.triple_storage, node0.me, &[77], &[]).await; + assert_presig_owned_state(&node1.presignature_storage, node0.me, &[77], &[]).await; + + // Verify holders of id=77 are all participants. + let holders = node1.triple_storage.fetch_holders(77).await; + assert_eq!( + holders, all_participants, + "triple 77 should have all participants as holders before sync" + ); + let holders = node1.presignature_storage.fetch_holders(77).await; + assert_eq!( + holders, all_participants, + "presig 77 should have all participants as holders before sync" + ); + + // node0 broadcasts its sync update (which does NOT include id=77 since node0 doesn't have it). + // Responder runs remove_outdated, which removes id=77. + let node0_triples = node0.owned_triples().await; + let node0_presigs = node0.owned_presignatures().await; + assert!(!node0_triples.contains(&77), "node0 should not own id=77"); + + let response = node1 + .sync(node0.me, node0_triples.clone(), node0_presigs.clone()) + .await; + assert!( + response.triples.is_empty(), + "node1 should not report any missing triples (remove_outdated handles orphans)" + ); + assert!( + response.presignatures.is_empty(), + "node1 should not report any missing presignatures" + ); + + // After sync, id=77 should be removed from node1 (via remove_outdated). + assert_triples_owned_state(&node1.triple_storage, node0.me, &[], &[77]).await; + assert_presig_owned_state(&node1.presignature_storage, node0.me, &[], &[77]).await; + + // Verify fixture artifacts on node1 are unchanged (only id=77 was removed). + let node1_triples_after = node1.owned_triples().await; + let node1_presigs_after = node1.owned_presignatures().await; + assert_eq!( + node1_triples_after, node1_triples_before, + "node1 fixture triples should be unchanged" + ); + assert_eq!( + node1_presigs_after, node1_presigs_before, + "node1 fixture presignatures should be unchanged" + ); + + // Verify holders of fixture artifacts are still full on node1. + for id in &node1_triples_after { + let holders = node1.triple_storage.fetch_holders(*id).await; + assert_eq!( + holders, all_participants, + "fixture triple {id} should still have all holders" + ); + } + for id in &node1_presigs_after { + let holders = node1.presignature_storage.fetch_holders(*id).await; + assert_eq!( + holders, all_participants, + "fixture presig {id} should still have all holders" + ); + } +} + +#[test(tokio::test(flavor = "multi_thread"))] +async fn test_sync_matrix() { + #[derive(Debug, Clone, Copy)] + enum ArtifactState { + Generating, + Stored, + Using, + None, + } + + struct ExpectedCallerState { + /// Should the artifact appear in the caller's sync update? + in_update: bool, + /// Should the caller still have it in Redis after process_sync_response? + stored_after: bool, + } + + struct ExpectedResponderState { + /// Should the responder report the artifact as not_found (missing)? + missing: bool, + /// Should the responder still have it in Redis after sync? + stored_after: bool, + } + + struct Case { + caller: ArtifactState, + responder: ArtifactState, + expected_caller: ExpectedCallerState, + expected_responder: ExpectedResponderState, + } + + // Caller is always the owner (mine=true), responder is the peer (mine=false). + #[rustfmt::skip] // cargo fmt makes the matrix unreadable + let test_matrix = [ + // Caller: Stored (in update) ────────────────────────────── + Case { caller: ArtifactState::Stored, responder: ArtifactState::Stored, expected_caller: ExpectedCallerState { in_update: true, stored_after: true }, expected_responder: ExpectedResponderState { missing: false, stored_after: true } }, + Case { caller: ArtifactState::Stored, responder: ArtifactState::Generating, expected_caller: ExpectedCallerState { in_update: true, stored_after: true }, expected_responder: ExpectedResponderState { missing: false, stored_after: false } }, + Case { caller: ArtifactState::Stored, responder: ArtifactState::Using, expected_caller: ExpectedCallerState { in_update: true, stored_after: true }, expected_responder: ExpectedResponderState { missing: false, stored_after: false } }, + Case { caller: ArtifactState::Stored, responder: ArtifactState::None, expected_caller: ExpectedCallerState { in_update: true, stored_after: true }, expected_responder: ExpectedResponderState { missing: true, stored_after: false } }, + // Caller: Generating (in update) ────────────────────────── + Case { caller: ArtifactState::Generating, responder: ArtifactState::Stored, expected_caller: ExpectedCallerState { in_update: true, stored_after: false }, expected_responder: ExpectedResponderState { missing: false, stored_after: true } }, + Case { caller: ArtifactState::Generating, responder: ArtifactState::Generating, expected_caller: ExpectedCallerState { in_update: true, stored_after: false }, expected_responder: ExpectedResponderState { missing: false, stored_after: false } }, + Case { caller: ArtifactState::Generating, responder: ArtifactState::Using, expected_caller: ExpectedCallerState { in_update: true, stored_after: false }, expected_responder: ExpectedResponderState { missing: false, stored_after: false } }, + Case { caller: ArtifactState::Generating, responder: ArtifactState::None, expected_caller: ExpectedCallerState { in_update: true, stored_after: false }, expected_responder: ExpectedResponderState { missing: true, stored_after: false } }, + // Caller: Using (in update) ────────────────────────────── + Case { caller: ArtifactState::Using, responder: ArtifactState::Stored, expected_caller: ExpectedCallerState { in_update: true, stored_after: false }, expected_responder: ExpectedResponderState { missing: false, stored_after: true } }, + Case { caller: ArtifactState::Using, responder: ArtifactState::Generating, expected_caller: ExpectedCallerState { in_update: true, stored_after: false }, expected_responder: ExpectedResponderState { missing: false, stored_after: false } }, + Case { caller: ArtifactState::Using, responder: ArtifactState::Using, expected_caller: ExpectedCallerState { in_update: true, stored_after: false }, expected_responder: ExpectedResponderState { missing: false, stored_after: false } }, + Case { caller: ArtifactState::Using, responder: ArtifactState::None, expected_caller: ExpectedCallerState { in_update: true, stored_after: false }, expected_responder: ExpectedResponderState { missing: true, stored_after: false } }, + // Caller: None (NOT in update) ──────────────────────────── + Case { caller: ArtifactState::None, responder: ArtifactState::Stored, expected_caller: ExpectedCallerState { in_update: false, stored_after: false }, expected_responder: ExpectedResponderState { missing: false, stored_after: false } }, + Case { caller: ArtifactState::None, responder: ArtifactState::Generating, expected_caller: ExpectedCallerState { in_update: false, stored_after: false }, expected_responder: ExpectedResponderState { missing: false, stored_after: false } }, + Case { caller: ArtifactState::None, responder: ArtifactState::Using, expected_caller: ExpectedCallerState { in_update: false, stored_after: false }, expected_responder: ExpectedResponderState { missing: false, stored_after: false } }, + Case { caller: ArtifactState::None, responder: ArtifactState::None, expected_caller: ExpectedCallerState { in_update: false, stored_after: false }, expected_responder: ExpectedResponderState { missing: false, stored_after: false } }, + ]; + + let fixture = MpcFixtureBuilder::default() + .only_generate_signatures() + .build() + .await; + + let caller = &fixture.nodes[0]; + let responder = &fixture.nodes[1]; + let all_participants = fixture.sorted_participants(); + + for (i, case) in test_matrix.iter().enumerate() { + let id: u64 = 800 + i as u64; + tracing::info!(id, ?case.caller, ?case.responder, "=== case {i} ==="); + + // Hold slots/taken artifacts alive until assertions are done. + let mut caller_slot = None; + let mut caller_taken = None; + let mut responder_slot = None; + let mut responder_taken = None; + + // --- Set up caller (owner) state --- + match case.caller { + ArtifactState::Stored => { + insert_triples_for_owner( + &caller.triple_storage, + caller.me, + &all_participants, + id..=id, + ) + .await; + } + ArtifactState::Generating => { + caller_slot = Some( + caller + .triple_storage + .create_slot(id, caller.me) + .await + .unwrap(), + ); + } + ArtifactState::Using => { + insert_triples_for_owner( + &caller.triple_storage, + caller.me, + &all_participants, + id..=id, + ) + .await; + caller_taken = Some(caller.triple_storage.take(id, caller.me).await.unwrap()); + } + ArtifactState::None => {} + } + + // --- Set up responder (peer) state --- + match case.responder { + ArtifactState::Stored => { + insert_triples_for_owner( + &responder.triple_storage, + caller.me, + &all_participants, + id..=id, + ) + .await; + } + ArtifactState::Generating => { + responder_slot = Some( + responder + .triple_storage + .create_slot(id, caller.me) + .await + .unwrap(), + ); + } + ArtifactState::Using => { + insert_triples_for_owner( + &responder.triple_storage, + caller.me, + &all_participants, + id..=id, + ) + .await; + responder_taken = Some(responder.triple_storage.take(id, caller.me).await.unwrap()); + } + ArtifactState::None => {} + } + + // --- Build caller's sync update --- + let caller_update = caller.owned_triples_with_reserved().await; + assert_eq!( + caller_update.contains(&id), + case.expected_caller.in_update, + "case {i}: caller={:?} → expected in_update={}", + case.caller, + case.expected_caller.in_update, + ); + + // --- Responder processes the sync update --- + let response = responder + .sync( + caller.me, + caller_update, + vec![], // this matrix only tests triples + ) + .await; + + // Verify the full SyncUpdate response from the responder. + assert_eq!( + response.from, responder.me, + "case {i}: response.from should be the responder", + ); + assert_eq!( + response.triples.contains(&id), + case.expected_responder.missing, + "case {i}: caller={:?}, responder={:?} → expected missing={}", + case.caller, + case.responder, + case.expected_responder.missing, + ); + + // --- Verify responder's Redis state after sync --- + assert_eq!( + responder + .triple_storage + .contains_by_owner(id, caller.me) + .await, + case.expected_responder.stored_after, + "case {i}: caller={:?}, responder={:?} → expected responder stored_after={}", + case.caller, + case.responder, + case.expected_responder.stored_after, + ); + + // --- Caller processes the sync response (remove holder / prune) --- + caller + .process_sync_response(responder.me, 2, &response) + .await; + assert_eq!( + caller.triple_storage.contains_by_owner(id, caller.me).await, + case.expected_caller.stored_after, + "case {i}: caller={:?}, responder={:?} → expected caller stored_after={}", + case.caller, + case.responder, + case.expected_caller.stored_after, + ); + + // Clean up held slots/taken before next iteration. + drop(caller_slot); + drop(caller_taken); + drop(responder_slot); + drop(responder_taken); + // Wait for async Drop cleanup. + tokio::time::sleep(Duration::from_millis(50)).await; + } +} diff --git a/integration-tests/tests/cases/store.rs b/integration-tests/tests/cases/store.rs index 0e3a9cf5f..67f36e8d0 100644 --- a/integration-tests/tests/cases/store.rs +++ b/integration-tests/tests/cases/store.rs @@ -22,7 +22,7 @@ async fn test_triple_persistence() -> anyhow::Result<()> { let (_, _, msg) = MessageChannel::new(); let node0_id = "party0.near".parse().unwrap(); let redis = containers::Redis::run(&spawner).await; - let triple_storage = redis.triple_storage(&node0_id); + let triple_storage = redis.triple_storage(&node0_id, node0); let triple_spawner = TripleSpawner::new(node0, 5, 123, &triple_storage, msg, node0_id.to_string()); @@ -38,13 +38,13 @@ async fn test_triple_persistence() -> anyhow::Result<()> { assert_eq!(triple_spawner.len_potential().await, 0); triple_storage - .reserve(triple_id1) + .create_slot(triple_id1, node1) .await .unwrap() .insert(dummy_pair(triple_id1), node1) .await; triple_storage - .reserve(triple_id2) + .create_slot(triple_id2, node1) .await .unwrap() .insert(dummy_pair(triple_id2), node1) @@ -59,9 +59,9 @@ async fn test_triple_persistence() -> anyhow::Result<()> { assert_eq!(triple_storage.len_by_owner(node0).await, 0); assert_eq!(triple_spawner.len_potential().await, 2); - // Take triple pairs and check that they are removed from the storage and added to used set - triple_storage.take(triple_id1, node1).await.unwrap(); - triple_storage.take(triple_id2, node1).await.unwrap(); + // Take triple pairs and check that they are removed from the storage and marked as using + let _taken1 = triple_storage.take(triple_id1, node1).await.unwrap(); + let _taken2 = triple_storage.take(triple_id2, node1).await.unwrap(); assert!(!triple_spawner.contains(triple_id1).await); assert!(!triple_spawner.contains(triple_id2).await); assert!(!triple_spawner.contains_mine(triple_id1).await); @@ -69,33 +69,31 @@ async fn test_triple_persistence() -> anyhow::Result<()> { assert_eq!(triple_storage.len_generated().await, 0); assert_eq!(triple_spawner.len_mine().await, 0); assert_eq!(triple_spawner.len_potential().await, 0); - assert!(triple_storage.contains_used(triple_id1).await); - assert!(triple_storage.contains_used(triple_id2).await); + assert!(triple_storage.contains_using(triple_id1).await); + assert!(triple_storage.contains_using(triple_id2).await); - // Attempt to re-reserve used triples and check that it cannot be reserved since it is used. - assert!(triple_storage.reserve(triple_id1).await.is_none()); - assert!(triple_storage.reserve(triple_id2).await.is_none()); - assert!(!triple_spawner.contains(triple_id1).await); - assert!(!triple_spawner.contains(triple_id2).await); + // Attempt to re-create slot for in-use triples and check that it fails + assert!(triple_storage + .create_slot(triple_id1, node1) + .await + .is_none()); + assert!(triple_storage + .create_slot(triple_id2, node1) + .await + .is_none()); let id3 = 3; let id4: u64 = 4; - // check that reserve and unreserve works: - let slot = triple_storage.reserve(id3).await.unwrap(); - if let Some(task) = slot.unreserve() { - task.await.unwrap(); - } - // Add mine triple and check that it is in the storage triple_storage - .reserve(id3) + .create_slot(id3, node0) .await .unwrap() .insert(dummy_pair(id3), node0) .await; triple_storage - .reserve(id4) + .create_slot(id4, node0) .await .unwrap() .insert(dummy_pair(id4), node0) @@ -108,9 +106,9 @@ async fn test_triple_persistence() -> anyhow::Result<()> { assert_eq!(triple_spawner.len_mine().await, 2); assert_eq!(triple_spawner.len_potential().await, 2); - // Take mine triple pairs and check that they are removed from the storage and added to used set - triple_storage.take_mine(node0).await.unwrap(); - triple_storage.take_mine(node0).await.unwrap(); + // Take mine triple pairs and check that they are removed from the storage and marked as using + let _taken3 = triple_storage.take_mine().await.unwrap(); + let _taken4 = triple_storage.take_mine().await.unwrap(); assert!(!triple_spawner.contains(id3).await); assert!(!triple_spawner.contains(id4).await); assert!(!triple_spawner.contains_mine(id3).await); @@ -119,20 +117,18 @@ async fn test_triple_persistence() -> anyhow::Result<()> { assert_eq!(triple_spawner.len_mine().await, 0); assert!(triple_storage.is_empty().await); assert_eq!(triple_spawner.len_potential().await, 0); - assert!(triple_storage.contains_used(id3).await); - assert!(triple_storage.contains_used(id4).await); + assert!(triple_storage.contains_using(id3).await); + assert!(triple_storage.contains_using(id4).await); - // Attempt to re-insert used mine triples and check that it fails - assert!(triple_storage.reserve(id3).await.is_none()); - assert!(triple_storage.reserve(id4).await.is_none()); - assert!(!triple_spawner.contains(id3).await); - assert!(!triple_spawner.contains(id4).await); + // Attempt to re-create slot for in-use mine triples and check that it fails + assert!(triple_storage.create_slot(id3, node0).await.is_none()); + assert!(triple_storage.create_slot(id4, node0).await.is_none()); assert!(triple_storage.clear().await); // Have our node0 observe shares for triples 10 to 15 where node1 is owner. for id in 10..=15 { triple_storage - .reserve(id) + .create_slot(id, node1) .await .unwrap() .insert(dummy_pair(id), node1) @@ -142,7 +138,7 @@ async fn test_triple_persistence() -> anyhow::Result<()> { // Have our node0 own 16 to 20 for id in 16..=20 { triple_storage - .reserve(id) + .create_slot(id, node0) .await .unwrap() .insert(dummy_pair(id), node0) @@ -179,8 +175,8 @@ async fn test_presignature_persistence() -> anyhow::Result<()> { let (_, _, msg) = MessageChannel::new(); let node0_id = "party0.near".parse().unwrap(); let redis = containers::Redis::run(&spawner).await; - let triple_storage = redis.triple_storage(&node0_id); - let presignature_storage = redis.presignature_storage(&node0_id); + let triple_storage = redis.triple_storage(&node0_id, node0); + let presignature_storage = redis.presignature_storage(&node0_id, node0); let presignature_spawner = PresignatureSpawner::new( Participant::from(0), 5, @@ -204,16 +200,10 @@ async fn test_presignature_persistence() -> anyhow::Result<()> { assert!(presignature_storage.is_empty().await); assert_eq!(presignature_spawner.len_potential().await, 0); - // check that reserve then dropping unreserves the slot: - let slot = presignature_storage.reserve(presignature.id).await.unwrap(); - if let Some(task) = slot.unreserve() { - task.await.unwrap(); - } - // Insert presignature owned by node1, with our node0 view being that it is a foreign presignature assert!( presignature_storage - .reserve(presignature.id) + .create_slot(presignature.id, node1) .await .unwrap() .insert(presignature, node1) @@ -227,18 +217,17 @@ async fn test_presignature_persistence() -> anyhow::Result<()> { assert_eq!(presignature_spawner.len_mine().await, 0); assert_eq!(presignature_spawner.len_potential().await, 1); - // Take presignature and check that it is removed from the storage and added to used set - presignature_storage.take(id, node1).await.unwrap(); + // Take presignature and check that it is removed from the storage and marked as using + let _taken_ps1 = presignature_storage.take(id, node1).await.unwrap(); assert!(!presignature_storage.contains(id).await); assert!(!presignature_spawner.contains_mine(id).await); assert_eq!(presignature_storage.len_generated().await, 0); assert_eq!(presignature_spawner.len_mine().await, 0); assert_eq!(presignature_spawner.len_potential().await, 0); - assert!(presignature_storage.contains_used(id).await); + assert!(presignature_storage.contains_using(id).await); - // Attempt to re-insert used presignature and check that it fails - assert!(presignature_storage.reserve(id).await.is_none()); - assert!(!presignature_spawner.contains(id).await); + // Attempt to re-create slot for in-use presignature and check that it fails + assert!(presignature_storage.create_slot(id, node1).await.is_none()); let id2 = 2; let mine_presignature = dummy_presignature(id2); @@ -246,7 +235,7 @@ async fn test_presignature_persistence() -> anyhow::Result<()> { // Add a presignature to our own node0 assert!( presignature_storage - .reserve(id2) + .create_slot(id2, node0) .await .unwrap() .insert(mine_presignature, node0) @@ -259,25 +248,24 @@ async fn test_presignature_persistence() -> anyhow::Result<()> { assert_eq!(presignature_spawner.len_mine().await, 1); assert_eq!(presignature_spawner.len_potential().await, 1); - // Take mine presignature and check that it is removed from the storage and added to used set - presignature_storage.take_mine(node0).await.unwrap(); + // Take mine presignature and check that it is removed from the storage and marked as using + let _taken_ps2 = presignature_storage.take_mine().await.unwrap(); assert!(!presignature_storage.contains(id2).await); assert!(!presignature_spawner.contains_mine(id2).await); assert_eq!(presignature_storage.len_generated().await, 0); assert_eq!(presignature_spawner.len_mine().await, 0); assert!(presignature_storage.is_empty().await); assert_eq!(presignature_spawner.len_potential().await, 0); - assert!(presignature_storage.contains_used(id2).await); + assert!(presignature_storage.contains_using(id2).await); - // Attempt to re-insert used mine presignature and check that it fails - assert!(presignature_storage.reserve(id2).await.is_none()); - assert!(!presignature_spawner.contains(id2).await); + // Attempt to re-create slot for in-use mine presignature and check that it fails + assert!(presignature_storage.create_slot(id2, node0).await.is_none()); presignature_storage.clear().await; // Have our node0 observe shares for triples 10 to 15 where node1 is owner. for id in 10..=15 { presignature_storage - .reserve(id) + .create_slot(id, node1) .await .unwrap() .insert(dummy_presignature(id), node1) @@ -287,7 +275,7 @@ async fn test_presignature_persistence() -> anyhow::Result<()> { // Have our node0 own 16 to 20 for id in 16..=20 { presignature_storage - .reserve(id) + .create_slot(id, node0) .await .unwrap() .insert(dummy_presignature(id), node0) diff --git a/integration-tests/tests/cases/sync.rs b/integration-tests/tests/cases/sync.rs index 526d2607b..110e42430 100644 --- a/integration-tests/tests/cases/sync.rs +++ b/integration-tests/tests/cases/sync.rs @@ -26,10 +26,10 @@ async fn test_state_sync_e2e_large_outdated_stockpile() { let redis = spawner.prespawn_redis().await; // immediately add to triples/presignatures storage the triples/presignatures we want to invalidate. - let node0_triples = redis.triple_storage(&node0_account_id); - let node0_presignatures = redis.presignature_storage(&node0_account_id); - let node1_triples = redis.triple_storage(&node1_account_id); - let node1_presignatures = redis.presignature_storage(&node1_account_id); + let node0_triples = redis.triple_storage(&node0_account_id, node0); + let node0_presignatures = redis.presignature_storage(&node0_account_id, node0); + let node1_triples = redis.triple_storage(&node1_account_id, node1); + let node1_presignatures = redis.presignature_storage(&node1_account_id, node1); // insert triples that will be invalidated after a sync, since nobody else has them. // node0 is saying that they have 0 to 5, but node1 will sync and say they have 4 and 5 only. @@ -110,12 +110,12 @@ async fn test_state_sync_e2e() { tokio::time::sleep(Duration::from_secs(1)).await; // Get triple/presignature storage for each node. - let node0_triples = redis.triple_storage(&node0_account_id); - let node0_presignatures = redis.presignature_storage(&node0_account_id); - let node1_triples = redis.triple_storage(&node1_account_id); - let node1_presignatures = redis.presignature_storage(&node1_account_id); - let node2_triples = redis.triple_storage(&node2_account_id); - let node2_presignatures = redis.presignature_storage(&node2_account_id); + let node0_triples = redis.triple_storage(&node0_account_id, node0); + let node0_presignatures = redis.presignature_storage(&node0_account_id, node0); + let node1_triples = redis.triple_storage(&node1_account_id, node1); + let node1_presignatures = redis.presignature_storage(&node1_account_id, node1); + let node2_triples = redis.triple_storage(&node2_account_id, node2); + let node2_presignatures = redis.presignature_storage(&node2_account_id, node2); // Populate 3 triples and 3 presignatures: each node owns 1, all nodes hold shares. for storage in [&node0_triples, &node1_triples, &node2_triples] {