Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 20 additions & 14 deletions chain-signatures/node/src/protocol/presignature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,7 @@ impl PresignatureSpawner {
mut cfg: watch::Receiver<Config>,
ongoing_gen_tx: watch::Sender<usize>,
) {
let mut last_active_warn: Option<Instant> = None;
let mut stockpile_interval = time::interval(Duration::from_millis(100));
let mut expiration_interval = tokio::time::interval(Duration::from_secs(1));
let mut posits = self.msg.subscribe_presignature_posit().await;
Expand Down Expand Up @@ -732,21 +733,26 @@ impl PresignatureSpawner {
self.ongoing_owned.remove(&id);
let _ = ongoing_gen_tx.send(self.ongoing.len());
}
_ = stockpile_interval.tick(), if active.len() >= self.threshold => {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logic is the same here. I've only added logging.

self.stockpile(&active, &protocol).await;
let _ = ongoing_gen_tx.send(self.ongoing.len());

crate::metrics::storage::NUM_PRESIGNATURES_MINE

.set(self.len_mine().await as i64);
crate::metrics::storage::NUM_PRESIGNATURES_TOTAL

.set(self.len_generated().await as i64);
crate::metrics::protocols::NUM_PRESIGNATURE_GENERATORS_TOTAL

.set(
self.len_potential().await as i64 - self.len_generated().await as i64,
_ = stockpile_interval.tick() => {
if active.len() >= self.threshold {
last_active_warn = None;
self.stockpile(&active, &protocol).await;
let _ = ongoing_gen_tx.send(self.ongoing.len());

crate::metrics::storage::NUM_PRESIGNATURES_MINE
.set(self.len_mine().await as i64);
crate::metrics::storage::NUM_PRESIGNATURES_TOTAL
.set(self.len_generated().await as i64);
crate::metrics::protocols::NUM_PRESIGNATURE_GENERATORS_TOTAL
.set(self.len_potential().await as i64 - self.len_generated().await as i64);
} else if last_active_warn.is_none_or(|i: Instant| i.elapsed() > Duration::from_secs(60)) {
tracing::warn!(
?active,
threshold = self.threshold,
"not enough active participants to generate presignatures"
);
last_active_warn = Some(Instant::now());
}
}
Ok(()) = cfg.changed() => {
protocol = cfg.borrow().protocol.clone();
Expand Down
26 changes: 11 additions & 15 deletions chain-signatures/node/src/protocol/signature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,6 @@ impl SignOrganizer {
let (presignature_id, presignature, active) = if is_proposer {
tracing::info!(?sign_id, round = ?state.round, "proposer waiting for presignature");
let active = active.iter().copied().collect::<Vec<_>>();
let mut recycle = Vec::new();
let remaining = state.budget.remaining();
let fetch = tokio::time::timeout(remaining, async {
loop {
Expand All @@ -287,7 +286,13 @@ impl SignOrganizer {
};
let participants = intersect_vec(&[holders, &active]);
if participants.len() < ctx.threshold {
recycle.push(taken);
tracing::warn!(
?sign_id,
id = taken.artifact.id,
?holders,
?active,
"discarding presignature due to inactive participants"
);
continue;
}

Expand All @@ -298,13 +303,6 @@ impl SignOrganizer {
})
.await;

let presignatures = ctx.presignatures.clone();
tokio::spawn(async move {
for taken in recycle {
presignatures.recycle_mine(me, taken).await;
}
});

let (taken, participants) = match fetch {
Ok(value) => value,
Err(_) => {
Expand Down Expand Up @@ -630,9 +628,8 @@ impl SignPositor {

if counter.enough_rejects(ctx.threshold) {
tracing::warn!(?sign_id, ?round, ?from, "received enough REJECTs, reorganizing");
if let Some(taken) = presignature {
tracing::warn!(?sign_id, "recycling presignature due to REJECTs");
ctx.presignatures.recycle_mine(ctx.me, taken).await;
if let Some(_taken) = presignature {
tracing::warn!(?sign_id, "discarding presignature due to REJECTs");
}
state.bump_round();
return SignPhase::Organizing(SignOrganizer);
Expand Down Expand Up @@ -672,9 +669,8 @@ impl SignPositor {
?round,
"proposer posit deadline reached, expiring round"
);
if let Some(taken) = presignature {
tracing::warn!(?sign_id, "recycling presignature due to proposer timeout");
ctx.presignatures.recycle_mine(ctx.me, taken).await;
if let Some(_taken) = presignature {
tracing::warn!(?sign_id, "discarding presignature due to proposer timeout");
}
} else {
tracing::warn!(?sign_id, me=?ctx.me, ?proposer, "deliberator posit timeout waiting for Start, reorganizing");
Expand Down
43 changes: 23 additions & 20 deletions chain-signatures/node/src/protocol/triple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -557,10 +557,6 @@ impl TripleSpawner {
/// Generate new triples if this node owns fewer than the per-node minimum
/// (`min_triples`) and the network-wide total hasn't reached the cap (`max_triples`).
async fn stockpile(&mut self, participants: &[Participant], cfg: &ProtocolConfig) {
if participants.len() < self.threshold {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check is already done before calling the stockpile() function

return;
}

let not_enough_triples = {
// Network-wide cap: stop generating once total potential triples reach max_triples.
if self.len_potential().await >= cfg.triple.max_triples as usize {
Expand Down Expand Up @@ -590,6 +586,7 @@ impl TripleSpawner {

let mut active = mesh_state.borrow().active().keys_vec();
let mut protocol = cfg.borrow().protocol.clone();
let mut last_active_warn = None;

loop {
tokio::select! {
Expand Down Expand Up @@ -618,22 +615,28 @@ impl TripleSpawner {
self.ongoing_introduced.remove(&id);
let _ = ongoing_gen_tx.send(self.ongoing.len());
}
_ = stockpile_interval.tick(), if active.len() >= self.threshold => {
self.stockpile(&active, &protocol).await;
let _ = ongoing_gen_tx.send(self.ongoing.len());

crate::metrics::storage::NUM_TRIPLES_MINE

.set(self.len_mine().await as i64);
crate::metrics::storage::NUM_TRIPLES_TOTAL

.set(self.triple_storage.len_generated().await as i64);
crate::metrics::protocols::NUM_TRIPLE_GENERATORS_INTRODUCED

.set(self.len_introduced() as i64);
crate::metrics::protocols::NUM_TRIPLE_GENERATORS_TOTAL

.set(self.len_ongoing() as i64);
_ = stockpile_interval.tick() => {
if active.len() >= self.threshold {
last_active_warn = None;
self.stockpile(&active, &protocol).await;
let _ = ongoing_gen_tx.send(self.ongoing.len());

crate::metrics::storage::NUM_TRIPLES_MINE
.set(self.len_mine().await as i64);
crate::metrics::storage::NUM_TRIPLES_TOTAL
.set(self.triple_storage.len_generated().await as i64);
crate::metrics::protocols::NUM_TRIPLE_GENERATORS_INTRODUCED
.set(self.len_introduced() as i64);
crate::metrics::protocols::NUM_TRIPLE_GENERATORS_TOTAL
.set(self.len_ongoing() as i64);
} else if last_active_warn.is_none_or(|i: Instant| i.elapsed() > Duration::from_secs(60)) {
tracing::warn!(
?active,
threshold = self.threshold,
"not enough active participants to generate triples"
);
last_active_warn = Some(Instant::now());
}
}
Ok(()) = cfg.changed() => {
protocol = cfg.borrow().protocol.clone();
Expand Down
72 changes: 0 additions & 72 deletions chain-signatures/node/src/storage/protocol_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -675,78 +675,6 @@ impl<A: ProtocolArtifact> ProtocolStorage<A> {
}
}

/// Return a taken artifact back to the available pool.
///
/// Consumes `taken`, then runs a single Redis Lua script that writes the
/// artifact back into the artifact hash, re-adds its id to the owner ("mine")
/// set for `me`, and rebuilds the per-artifact holders set. On script success
/// the id is also removed from the in-memory `reserved` and `used` sets.
///
/// Returns `true` when the script ran successfully, and `false` when either
/// the Redis connection could not be obtained or the script invocation failed.
///
/// # Panics
///
/// Panics if `artifact.holders()` returns `None` ("holders must be set before
/// recycle").
///
/// NOTE(review): the dropper is disarmed before the connection attempt, and
/// `reserved`/`used` are only cleaned up in the `Ok` branch. On either failure
/// path the id therefore looks like it stays in those sets with nothing left
/// to unreserve it -- confirm this is intended.
pub async fn recycle_mine(&self, me: Participant, taken: ArtifactTaken<A>) -> bool {
// Lua script executed by Redis. KEYS[1] is the artifact hash key and KEYS[2]
// the owner set key; ARGV[1] is the artifact id, ARGV[2] the serialized
// artifact, ARGV[3] the holder count, and holder ids are read from ARGV[4]
// onward. The holders key is derived as `<artifact_key>:holders:<id>`.
const SCRIPT: &str = r#"
local artifact_key = KEYS[1]
local mine_key = KEYS[2]
local artifact_id = ARGV[1]
local artifact = ARGV[2]
local num_holders = tonumber(ARGV[3])

-- Add back to artifact hash map
redis.call("HSET", artifact_key, artifact_id, artifact)

-- Add back to mine set
redis.call("SADD", mine_key, artifact_id)

-- Restore holders set
local holders_key = artifact_key .. ':holders:' .. artifact_id
redis.call("DEL", holders_key)
if num_holders > 0 then
redis.call("SADD", holders_key, unpack(ARGV, 4, 3 + num_holders))
end

return 1
"#;

// The timer starts before the connection is acquired, so the recorded
// latency covers connection setup as well as the script round trip.
let start = Instant::now();
let (artifact, mut dropper) = taken.take();
// We manually handle the return, so we don't want the dropper to unreserve it.
dropper.dropper.take();

let id = artifact.id();
// Holders are converted to `u32` so they can be passed as script arguments.
let holders: Vec<u32> = artifact
.holders()
.expect("holders must be set before recycle")
.iter()
.map(|p| Into::<u32>::into(*p))
.collect();

// Early return on connection failure; no latency sample is recorded here.
let Some(mut conn) = self.connect().await else {
tracing::warn!(id, "failed to return artifact: connection failed");
return false;
};

// Argument order must match the ARGV indices the script reads.
// NOTE(review): presumably the redis client expands the holders slice into
// one ARGV entry per holder, as the script's `unpack` expects -- confirm.
let result: Result<i32, _> = redis::Script::new(SCRIPT)
.key(&self.artifact_key)
.key(owner_key(&self.owner_keys, me))
.arg(id)
.arg(&artifact)
.arg(holders.len() as i64)
.arg(holders.as_slice())
.invoke_async(&mut conn)
.await;

// Latency is observed in milliseconds under the "return_mine" label,
// regardless of whether the script succeeded.
let elapsed = start.elapsed();
crate::metrics::storage::REDIS_LATENCY
.with_label_values(&[A::METRIC_LABEL, "return_mine"])
.observe(elapsed.as_millis() as f64);

match result {
Ok(_) => {
// Only forget the local reserved/used bookkeeping once Redis has
// accepted the artifact back.
self.reserved.write().await.remove(&id);
self.used.write().await.remove(&id);
tracing::info!(id, ?elapsed, "returned mine artifact");
true
}
Err(err) => {
tracing::warn!(id, ?err, ?elapsed, "failed to return mine artifact");
false
}
}
}

/// Check if an artifact is reserved.
pub async fn contains_reserved(&self, id: A::Id) -> bool {
self.reserved.read().await.contains(&id)
Expand Down
Loading