diff --git a/chain-signatures/node/src/protocol/presignature.rs b/chain-signatures/node/src/protocol/presignature.rs index 7b5357a3..35d0777d 100644 --- a/chain-signatures/node/src/protocol/presignature.rs +++ b/chain-signatures/node/src/protocol/presignature.rs @@ -694,6 +694,7 @@ impl PresignatureSpawner { mut cfg: watch::Receiver, ongoing_gen_tx: watch::Sender, ) { + let mut last_active_warn: Option = None; let mut stockpile_interval = time::interval(Duration::from_millis(100)); let mut expiration_interval = tokio::time::interval(Duration::from_secs(1)); let mut posits = self.msg.subscribe_presignature_posit().await; @@ -732,21 +733,26 @@ impl PresignatureSpawner { self.ongoing_owned.remove(&id); let _ = ongoing_gen_tx.send(self.ongoing.len()); } - _ = stockpile_interval.tick(), if active.len() >= self.threshold => { - self.stockpile(&active, &protocol).await; - let _ = ongoing_gen_tx.send(self.ongoing.len()); - - crate::metrics::storage::NUM_PRESIGNATURES_MINE - - .set(self.len_mine().await as i64); - crate::metrics::storage::NUM_PRESIGNATURES_TOTAL - - .set(self.len_generated().await as i64); - crate::metrics::protocols::NUM_PRESIGNATURE_GENERATORS_TOTAL - - .set( - self.len_potential().await as i64 - self.len_generated().await as i64, + _ = stockpile_interval.tick() => { + if active.len() >= self.threshold { + last_active_warn = None; + self.stockpile(&active, &protocol).await; + let _ = ongoing_gen_tx.send(self.ongoing.len()); + + crate::metrics::storage::NUM_PRESIGNATURES_MINE + .set(self.len_mine().await as i64); + crate::metrics::storage::NUM_PRESIGNATURES_TOTAL + .set(self.len_generated().await as i64); + crate::metrics::protocols::NUM_PRESIGNATURE_GENERATORS_TOTAL + .set(self.len_potential().await as i64 - self.len_generated().await as i64); + } else if last_active_warn.is_none_or(|i: Instant| i.elapsed() > Duration::from_secs(60)) { + tracing::warn!( + ?active, + threshold = self.threshold, + "not enough active participants to generate presignatures" ); + last_active_warn = Some(Instant::now()); + } } Ok(()) = cfg.changed() => { protocol = cfg.borrow().protocol.clone(); diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index 18bd58c5..2d2f7765 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -273,7 +273,6 @@ impl SignOrganizer { let (presignature_id, presignature, active) = if is_proposer { tracing::info!(?sign_id, round = ?state.round, "proposer waiting for presignature"); let active = active.iter().copied().collect::>(); - let mut recycle = Vec::new(); let remaining = state.budget.remaining(); let fetch = tokio::time::timeout(remaining, async { loop { @@ -287,7 +286,13 @@ impl SignOrganizer { }; let participants = intersect_vec(&[holders, &active]); if participants.len() < ctx.threshold { - recycle.push(taken); + tracing::warn!( + ?sign_id, + id = taken.artifact.id, + ?holders, + ?active, + "discarding presignature due to inactive participants" + ); continue; } @@ -298,13 +303,6 @@ impl SignOrganizer { }) .await; - let presignatures = ctx.presignatures.clone(); - tokio::spawn(async move { - for taken in recycle { - presignatures.recycle_mine(me, taken).await; - } - }); - let (taken, participants) = match fetch { Ok(value) => value, Err(_) => { @@ -630,9 +628,8 @@ impl SignPositor { if counter.enough_rejects(ctx.threshold) { tracing::warn!(?sign_id, ?round, ?from, "received enough REJECTs, reorganizing"); - if let Some(taken) = presignature { - tracing::warn!(?sign_id, "recycling presignature due to REJECTs"); - ctx.presignatures.recycle_mine(ctx.me, taken).await; + if let Some(_taken) = presignature { + tracing::warn!(?sign_id, "discarding presignature due to REJECTs"); } state.bump_round(); return SignPhase::Organizing(SignOrganizer); @@ -672,9 +669,8 @@ impl SignPositor { ?round, "proposer posit deadline reached, expiring round" ); - if let Some(taken) = presignature { - tracing::warn!(?sign_id, "recycling presignature due to proposer timeout"); - ctx.presignatures.recycle_mine(ctx.me, taken).await; + if let Some(_taken) = presignature { + tracing::warn!(?sign_id, "discarding presignature due to proposer timeout"); } } else { tracing::warn!(?sign_id, me=?ctx.me, ?proposer, "deliberator posit timeout waiting for Start, reorganizing"); diff --git a/chain-signatures/node/src/protocol/triple.rs b/chain-signatures/node/src/protocol/triple.rs index 414ae5ca..d6b4d9c0 100644 --- a/chain-signatures/node/src/protocol/triple.rs +++ b/chain-signatures/node/src/protocol/triple.rs @@ -557,10 +557,6 @@ impl TripleSpawner { /// Generate new triples if this node owns fewer than the per-node minimum /// (`min_triples`) and the network-wide total hasn't reached the cap (`max_triples`). async fn stockpile(&mut self, participants: &[Participant], cfg: &ProtocolConfig) { - if participants.len() < self.threshold { - return; - } - let not_enough_triples = { // Network-wide cap: stop generating once total potential triples reach max_triples. if self.len_potential().await >= cfg.triple.max_triples as usize { @@ -590,6 +586,7 @@ impl TripleSpawner { let mut active = mesh_state.borrow().active().keys_vec(); let mut protocol = cfg.borrow().protocol.clone(); + let mut last_active_warn = None; loop { tokio::select! { @@ -618,22 +615,28 @@ impl TripleSpawner { self.ongoing_introduced.remove(&id); let _ = ongoing_gen_tx.send(self.ongoing.len()); } - _ = stockpile_interval.tick(), if active.len() >= self.threshold => { - self.stockpile(&active, &protocol).await; - let _ = ongoing_gen_tx.send(self.ongoing.len()); - - crate::metrics::storage::NUM_TRIPLES_MINE - - .set(self.len_mine().await as i64); - crate::metrics::storage::NUM_TRIPLES_TOTAL - - .set(self.triple_storage.len_generated().await as i64); - crate::metrics::protocols::NUM_TRIPLE_GENERATORS_INTRODUCED - - .set(self.len_introduced() as i64); - crate::metrics::protocols::NUM_TRIPLE_GENERATORS_TOTAL - - .set(self.len_ongoing() as i64); + _ = stockpile_interval.tick() => { + if active.len() >= self.threshold { + last_active_warn = None; + self.stockpile(&active, &protocol).await; + let _ = ongoing_gen_tx.send(self.ongoing.len()); + + crate::metrics::storage::NUM_TRIPLES_MINE + .set(self.len_mine().await as i64); + crate::metrics::storage::NUM_TRIPLES_TOTAL + .set(self.triple_storage.len_generated().await as i64); + crate::metrics::protocols::NUM_TRIPLE_GENERATORS_INTRODUCED + .set(self.len_introduced() as i64); + crate::metrics::protocols::NUM_TRIPLE_GENERATORS_TOTAL + .set(self.len_ongoing() as i64); + } else if last_active_warn.is_none_or(|i: Instant| i.elapsed() > Duration::from_secs(60)) { + tracing::warn!( + ?active, + threshold = self.threshold, + "not enough active participants to generate triples" + ); + last_active_warn = Some(Instant::now()); + } } Ok(()) = cfg.changed() => { protocol = cfg.borrow().protocol.clone(); diff --git a/chain-signatures/node/src/storage/protocol_storage.rs b/chain-signatures/node/src/storage/protocol_storage.rs index b926be7b..ed352ca6 100644 --- a/chain-signatures/node/src/storage/protocol_storage.rs +++ b/chain-signatures/node/src/storage/protocol_storage.rs @@ -675,78 +675,6 @@ impl ProtocolStorage { } } - /// Return a taken artifact back to the available pool. - pub async fn recycle_mine(&self, me: Participant, taken: ArtifactTaken) -> bool { - const SCRIPT: &str = r#" - local artifact_key = KEYS[1] - local mine_key = KEYS[2] - local artifact_id = ARGV[1] - local artifact = ARGV[2] - local num_holders = tonumber(ARGV[3]) - - -- Add back to artifact hash map - redis.call("HSET", artifact_key, artifact_id, artifact) - - -- Add back to mine set - redis.call("SADD", mine_key, artifact_id) - - -- Restore holders set - local holders_key = artifact_key .. ':holders:' .. artifact_id - redis.call("DEL", holders_key) - if num_holders > 0 then - redis.call("SADD", holders_key, unpack(ARGV, 4, 3 + num_holders)) - end - - return 1 - "#; - - let start = Instant::now(); - let (artifact, mut dropper) = taken.take(); - // We manually handle the return, so we don't want the dropper to unreserve it. - dropper.dropper.take(); - - let id = artifact.id(); - let holders: Vec = artifact - .holders() - .expect("holders must be set before recycle") - .iter() - .map(|p| Into::::into(*p)) - .collect(); - - let Some(mut conn) = self.connect().await else { - tracing::warn!(id, "failed to return artifact: connection failed"); - return false; - }; - - let result: Result = redis::Script::new(SCRIPT) - .key(&self.artifact_key) - .key(owner_key(&self.owner_keys, me)) - .arg(id) - .arg(&artifact) - .arg(holders.len() as i64) - .arg(holders.as_slice()) - .invoke_async(&mut conn) - .await; - - let elapsed = start.elapsed(); - crate::metrics::storage::REDIS_LATENCY - .with_label_values(&[A::METRIC_LABEL, "return_mine"]) - .observe(elapsed.as_millis() as f64); - - match result { - Ok(_) => { - self.reserved.write().await.remove(&id); - self.used.write().await.remove(&id); - tracing::info!(id, ?elapsed, "returned mine artifact"); - true - } - Err(err) => { - tracing::warn!(id, ?err, ?elapsed, "failed to return mine artifact"); - false - } - } - } - /// Check if an artifact is reserved. pub async fn contains_reserved(&self, id: A::Id) -> bool { self.reserved.read().await.contains(&id)