Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions chain-signatures/node/src/protocol/presignature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ impl PresignatureSpawner {
/// Starts a new presignature generation protocol.
async fn propose_posit(&mut self, active: &[Participant]) {
// To ensure there is no contention between different nodes we are only using triples
// that we proposed. This way in a non-BFT environment we are guaranteed to never try
// that we own. This way in a non-BFT environment we are guaranteed to never try
// to use the same triple as any other node.
// TODO: have all this part be a separate task such that finding a pair of triples is done in parallel instead
// of waiting for storage to respond here.
Expand Down Expand Up @@ -554,9 +554,9 @@ impl PresignatureSpawner {
timeout: Duration,
) -> Result<(), InitializationError> {
let (owner, triples) = match positor {
Positor::Proposer(proposer, triples) => (proposer, PendingTriples::Available(triples)),
Positor::Deliberator(proposer) => (
proposer,
Positor::Proposer(owner, triples) => (owner, PendingTriples::Available(triples)),
Positor::Deliberator(owner) => (
owner,
PendingTriples::InStorage(id.pair_id, self.triples.clone()),
),
};
Expand Down
53 changes: 21 additions & 32 deletions chain-signatures/node/src/protocol/triple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ use mpc_contract::config::ProtocolConfig;
use cait_sith::protocol::{Action, InitializationError, Participant};
use cait_sith::triples::{TriplePub, TripleShare};
use chrono::Utc;
use highway::{HighwayHash, HighwayHasher};
use k256::elliptic_curve::group::GroupEncoding;
use k256::Secp256k1;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, watch};
Expand All @@ -40,7 +38,7 @@ pub struct Triple {
struct TripleGenerator {
id: TripleId,
me: Participant,
proposer: Participant,
owner: Participant,
participants: Vec<Participant>,
/// Option to temporarily move it to a blocking task. Must be Some in all
/// other circumstances.
Expand All @@ -59,7 +57,7 @@ impl TripleGenerator {
pub async fn new(
id: TripleId,
me: Participant,
proposer: Participant,
owner: Participant,
threshold: usize,
participants: &[Participant],
timeout: Duration,
Expand All @@ -73,8 +71,7 @@ impl TripleGenerator {
let _ = _node_account_id;

let mut participants = participants.to_vec();
// Participants can be out of order, so let's sort them before doing anything. Critical
// for the triple_is_mine check:
// Participants can be out of order, so let's sort them before doing anything.
participants.sort();

let protocol =
Expand All @@ -84,7 +81,7 @@ impl TripleGenerator {
Ok(Self {
id,
me,
proposer,
owner,
participants,
protocol: Some(Box::new(protocol)),
timeout,
Expand Down Expand Up @@ -142,7 +139,7 @@ impl TripleGenerator {
}
Err(err) => {
crate::metrics::protocols::TRIPLE_GENERATOR_FAILURES.inc();
if self.proposer == self.me {
if self.owner == self.me {
crate::metrics::protocols::TRIPLE_GENERATOR_OWNED_FAILURES.inc();
}
tracing::warn!(
Expand All @@ -159,7 +156,7 @@ impl TripleGenerator {
Ok(action) => action,
Err(err) => {
crate::metrics::protocols::TRIPLE_GENERATOR_FAILURES.inc();
if self.proposer == self.me {
if self.owner == self.me {
crate::metrics::protocols::TRIPLE_GENERATOR_OWNED_FAILURES.inc();
}
tracing::warn!(
Expand All @@ -185,7 +182,7 @@ impl TripleGenerator {
// Wait for the next set of messages to arrive.
let Some(msg) = self.recv().await else {
crate::metrics::protocols::TRIPLE_GENERATOR_FAILURES.inc();
if self.proposer == self.me {
if self.owner == self.me {
crate::metrics::protocols::TRIPLE_GENERATOR_OWNED_FAILURES.inc();
}
break;
Expand Down Expand Up @@ -250,20 +247,12 @@ impl TripleGenerator {
public: second.1.clone(),
};

// For simplicity, assign both triples to the same owner based on the first triple
let triple_owner = {
let big_c = first.public.big_c;
let entropy = HighwayHasher::default().hash64(&big_c.to_bytes()) as usize;
let num_participants = self.participants.len();
self.participants[entropy % num_participants]
};
let pair_is_mine = triple_owner == self.me;
let pair_is_mine = self.owner == self.me;

tracing::debug!(
id = ?self.id,
me = ?self.me,
proposer = ?self.proposer,
?triple_owner,
owner = ?self.owner,
pair_is_mine,
participants = ?self.participants,
big_a0 = ?first.public.big_a.to_base58(),
Expand All @@ -272,7 +261,7 @@ impl TripleGenerator {
"completed triple pair generation"
);

if self.proposer == self.me {
if pair_is_mine {
crate::metrics::protocols::NUM_TOTAL_HISTORICAL_TRIPLE_GENERATIONS_OWNED_SUCCESS.inc();
}
let pair = TriplePair {
Expand All @@ -281,7 +270,7 @@ impl TripleGenerator {
triple1: second,
holders: Some(self.participants.clone()),
};
self.slot.insert(pair, triple_owner).await;
self.slot.insert(pair, self.owner).await;
break;
}
}
Expand Down Expand Up @@ -320,8 +309,8 @@ pub struct TripleSpawner {
/// through max introduction and concurrent generation in the system.
ongoing: JoinMap<TripleId, ()>,

/// The set of ongoing triples that were introduced to the system by the current node.
ongoing_introduced: HashSet<TripleId>,
/// The set of ongoing triples that are owned by the current node.
ongoing_owned: HashSet<TripleId>,

/// The protocol posits that are currently in progress.
posits: Posits<TripleId, ()>,
Expand All @@ -342,7 +331,7 @@ impl fmt::Debug for TripleSpawner {
.field("me", &self.me)
.field("threshold", &self.threshold)
.field("epoch", &self.epoch)
.field("ongoing_introduced", &self.ongoing_introduced)
.field("ongoing_owned", &self.ongoing_owned)
.finish()
}
}
Expand All @@ -367,7 +356,7 @@ impl TripleSpawner {
epoch,
triple_storage: storage.clone(),
ongoing: JoinMap::new(),
ongoing_introduced: HashSet::new(),
ongoing_owned: HashSet::new(),
posits: Posits::new(me),
msg,
node_account_id,
Expand Down Expand Up @@ -402,7 +391,7 @@ impl TripleSpawner {
}

pub fn len_introduced(&self) -> usize {
self.posits.len_proposed() + self.ongoing_introduced.len()
self.posits.len_proposed() + self.ongoing_owned.len()
}

/// Returns the number of unspent triples we will have in the manager once
Expand Down Expand Up @@ -502,14 +491,14 @@ impl TripleSpawner {
)
.await;
}
self.ongoing_introduced.insert(id);
self.ongoing_owned.insert(id);
}

if let Err(err) = self
.generate_with_id(id, &participants, positor.id(), timeout)
.await
{
self.ongoing_introduced.remove(&id);
self.ongoing_owned.remove(&id);
tracing::warn!(
id,
?participants,
Expand All @@ -524,7 +513,7 @@ impl TripleSpawner {
&mut self,
id: TripleId,
participants: &[Participant],
proposer: Participant,
owner: Participant,
timeout: Duration,
) -> Result<(), InitializationError> {
// Check if the `id` is already in the system. Error out and have the next cycle try again.
Expand All @@ -538,7 +527,7 @@ impl TripleSpawner {
let generator = TripleGenerator::new(
id,
self.me,
proposer,
owner,
self.threshold,
participants,
timeout,
Expand Down Expand Up @@ -615,7 +604,7 @@ impl TripleSpawner {
id
}
};
self.ongoing_introduced.remove(&id);
self.ongoing_owned.remove(&id);
let _ = ongoing_gen_tx.send(self.ongoing.len());
}
_ = stockpile_interval.tick(), if active.len() >= self.threshold => {
Expand Down
Loading