From cafecbbb25665959d1c23259b4e21c6c70dfc1d5 Mon Sep 17 00:00:00 2001 From: Phuong Date: Fri, 13 Mar 2026 01:04:40 +0900 Subject: [PATCH 1/5] Add fix for backpressure on posit --- .../node/src/protocol/message/mod.rs | 92 ++++++++++++++++--- .../node/src/protocol/message/sub.rs | 66 ++++++++++--- .../node/src/protocol/signature.rs | 14 +-- 3 files changed, 141 insertions(+), 31 deletions(-) diff --git a/chain-signatures/node/src/protocol/message/mod.rs b/chain-signatures/node/src/protocol/message/mod.rs index e580b2931..14a520df4 100644 --- a/chain-signatures/node/src/protocol/message/mod.rs +++ b/chain-signatures/node/src/protocol/message/mod.rs @@ -2,7 +2,7 @@ mod filter; mod sub; mod types; -pub use sub::Subscriber; +pub use sub::{Subscriber, MAX_MESSAGE_POSIT_SUB_CHANNEL_SIZE}; use crate::protocol::message::sub::{ SubscribeId, SubscribeRequest, SubscribeRequestAction, SubscribeResponse, @@ -87,11 +87,17 @@ impl MessageInbox { resharing: Subscriber::unsubscribed(), ready: Subscriber::unsubscribed(), triple: HashMap::new(), - triple_init: Subscriber::unsubscribed(), + triple_init: Subscriber::unsubscribed_with_capacity( + sub::MAX_MESSAGE_POSIT_SUB_CHANNEL_SIZE, + ), presignature: HashMap::new(), - presignature_init: Subscriber::unsubscribed(), + presignature_init: Subscriber::unsubscribed_with_capacity( + sub::MAX_MESSAGE_POSIT_SUB_CHANNEL_SIZE, + ), signature: HashMap::new(), - signature_init: Subscriber::unsubscribed(), + signature_init: Subscriber::unsubscribed_with_capacity( + sub::MAX_MESSAGE_POSIT_SUB_CHANNEL_SIZE, + ), } } @@ -101,26 +107,24 @@ impl MessageInbox { PositProtocolId::Triple(id) => { let _ = self .triple_init - .send((id, message.from, message.action)) - .await; + .try_send_lossy((id, message.from, message.action), "triple_init"); } PositProtocolId::Presignature(id) => { let _ = self .presignature_init - .send((id, message.from, message.action)) - .await; + .try_send_lossy((id, message.from, message.action), "presignature_init"); } PositProtocolId::Signature(sign_id, presignature_id, round) => { - let _ = self - .signature_init - .send(( + let _ = self.signature_init.try_send_lossy( + ( sign_id, presignature_id, round, message.from, message.action, - )) - .await; + ), + "signature_init", + ); } }, Message::Generating(message) => { @@ -1470,4 +1474,66 @@ mod tests { inbox.abort(); } + + #[tokio::test] + async fn test_signature_posit_backpressure_does_not_block_ready_messages() { + let (_inbox_tx, inbox_rx) = mpsc::channel(1); + let (_filter_tx, filter_rx) = mpsc::channel(1); + let (_subscribe_tx, subscribe_rx) = mpsc::channel(1); + let mut inbox = MessageInbox::new(inbox_rx, filter_rx, subscribe_rx); + + inbox.signature_init = sub::Subscriber::unsubscribed_with_capacity(1); + + let (signature_req, signature_resp) = + sub::SubscribeRequest::subscribe(sub::SubscribeId::Signatures); + inbox.process_subscribe(signature_req); + let mut signature_posit_rx = match signature_resp.await.unwrap() { + sub::SubscribeResponse::SignaturePosit(rx) => rx, + _ => panic!("expected signature posit subscription"), + }; + + let (ready_req, ready_resp) = sub::SubscribeRequest::subscribe(sub::SubscribeId::Ready); + inbox.process_subscribe(ready_req); + let mut ready_rx = match ready_resp.await.unwrap() { + sub::SubscribeResponse::Ready(rx) => rx, + _ => panic!("expected ready subscription"), + }; + + let sign_id = SignId::new([9; 32]); + let from = Participant::from(0); + let mut messages = Vec::with_capacity(sub::MAX_MESSAGE_SUB_CHANNEL_SIZE + 2); + for round in 0..=sub::MAX_MESSAGE_SUB_CHANNEL_SIZE { + messages.push(Message::Posit(PositMessage { + id: PositProtocolId::Signature(sign_id, 77, round), + from, + action: PositAction::Accept, + })); + } + messages.push(Message::Ready(ReadyMessage { + epoch: 1, + from, + nonce: 1, + token: 1, + })); + + let publish_result = + tokio::time::timeout(Duration::from_millis(100), inbox.publish(messages)).await; + + assert!( + publish_result.is_ok(), + "signature posit backpressure should not stall unrelated inbox messages" + ); + + let ready_message = tokio::time::timeout(Duration::from_millis(100), ready_rx.recv()) + .await + .expect("ready message should not be blocked by signature posit backlog") + .expect("ready subscription unexpectedly closed"); + assert_eq!(ready_message.epoch, 1); + + let first_signature_posit = signature_posit_rx + .recv() + .await + .expect("signature posit subscription unexpectedly closed"); + assert_eq!(first_signature_posit.0, sign_id); + } } diff --git a/chain-signatures/node/src/protocol/message/sub.rs b/chain-signatures/node/src/protocol/message/sub.rs index e947dea18..bd0b5d581 100644 --- a/chain-signatures/node/src/protocol/message/sub.rs +++ b/chain-signatures/node/src/protocol/message/sub.rs @@ -13,6 +13,7 @@ use crate::protocol::triple::TripleId; /// This should be enough to hold a few messages in the inbox. pub const MAX_MESSAGE_SUB_CHANNEL_SIZE: usize = 4 * 1024; +pub const MAX_MESSAGE_POSIT_SUB_CHANNEL_SIZE: usize = 1 << 24; pub enum SubscribeId { Generating, @@ -73,20 +74,28 @@ pub enum Subscriber { /// way to convert from an Unsubscribed to a Subscribed subscription. Unknown, /// A subscribed channel where the subscriber has a handle to the receiver. - Subscribed(mpsc::Sender), + Subscribed(mpsc::Sender, usize), /// An unsubscribed channel where there's potentially messages that have yet to be sent. - Unsubscribed(mpsc::Sender, mpsc::Receiver), + Unsubscribed(mpsc::Sender, mpsc::Receiver, usize), } impl Subscriber { pub fn subscribed() -> (Self, mpsc::Receiver) { - let (tx, rx) = mpsc::channel(MAX_MESSAGE_SUB_CHANNEL_SIZE); - (Self::Subscribed(tx), rx) + Self::subscribed_with_capacity(MAX_MESSAGE_SUB_CHANNEL_SIZE) + } + + pub fn subscribed_with_capacity(capacity: usize) -> (Self, mpsc::Receiver) { + let (tx, rx) = mpsc::channel(capacity); + (Self::Subscribed(tx, capacity), rx) } pub fn unsubscribed() -> Self { - let (tx, rx) = mpsc::channel(MAX_MESSAGE_SUB_CHANNEL_SIZE); - Self::Unsubscribed(tx, rx) + Self::unsubscribed_with_capacity(MAX_MESSAGE_SUB_CHANNEL_SIZE) + } + + pub fn unsubscribed_with_capacity(capacity: usize) -> Self { + let (tx, rx) = mpsc::channel(capacity); + Self::Unsubscribed(tx, rx, capacity) } /// Convert this subscriber into a subscribed one, returning the receiver. @@ -94,8 +103,8 @@ impl Subscriber { pub fn subscribe(&mut self) -> mpsc::Receiver { let sub = std::mem::replace(self, Self::Unknown); let (sub, rx) = match sub { - Self::Subscribed(_) | Self::Unknown => Self::subscribed(), - Self::Unsubscribed(tx, rx) => (Self::Subscribed(tx), rx), + Self::Subscribed(_, _) | Self::Unknown => Self::subscribed(), + Self::Unsubscribed(tx, rx, capacity) => (Self::Subscribed(tx, capacity), rx), }; *self = sub; rx @@ -103,15 +112,50 @@ impl Subscriber { /// Unsubscribe from the subscriber, converting it into an unsubscribed one. pub fn unsubscribe(&mut self) { - if matches!(self, Self::Subscribed(_) | Self::Unknown) { + if matches!(self, Self::Unknown) { *self = Self::unsubscribed(); + return; + } + + let capacity = self.capacity(); + if matches!(self, Self::Subscribed(_, _)) { + *self = Self::unsubscribed_with_capacity(capacity); + } + } + + fn capacity(&self) -> usize { + match self { + Self::Subscribed(_, capacity) | Self::Unsubscribed(_, _, capacity) => *capacity, + Self::Unknown => MAX_MESSAGE_SUB_CHANNEL_SIZE, } } pub async fn send(&self, msg: T) -> Result<(), mpsc::error::SendError> { match self { - Self::Subscribed(tx) => tx.send(msg).await, - Self::Unsubscribed(tx, _) => tx.send(msg).await, + Self::Subscribed(tx, _) => tx.send(msg).await, + Self::Unsubscribed(tx, _, _) => tx.send(msg).await, + Self::Unknown => Ok(()), + } + } + + pub fn try_send_lossy( + &self, + msg: T, + name: &'static str, + ) -> Result<(), mpsc::error::SendError> { + match self { + Self::Subscribed(tx, _) | Self::Unsubscribed(tx, _, _) => match tx.try_send(msg) { + Ok(()) => Ok(()), + Err(mpsc::error::TrySendError::Full(_)) => { + tracing::warn!( + subscriber = name, + capacity = self.capacity(), + "dropping message because subscriber channel is full" + ); + Ok(()) + } + Err(mpsc::error::TrySendError::Closed(msg)) => Err(mpsc::error::SendError(msg)), + }, Self::Unknown => Ok(()), } } diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index dfcac4e02..c1a2c4d61 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -1206,17 +1206,17 @@ impl SignatureSpawner { if from == self.me { return; } - let _ = self - .inboxes - .entry(sign_id) - .or_default() - .send(SignTaskMessage::PositMessage { + if let Err(err) = self.inboxes.entry(sign_id).or_default().try_send_lossy( + SignTaskMessage::PositMessage { presignature_id, round, from, action, - }) - .await; + }, + "sign_task_posit", + ) { + tracing::error!(?err, ?sign_id, "failed to send posit message"); + } } fn handle_completion(&mut self, sign_id: SignId) { From 94ccac4eea31cad2c57cc0dec1d45d74310aff70 Mon Sep 17 00:00:00 2001 From: Phuong Date: Fri, 13 Mar 2026 01:14:42 +0900 Subject: [PATCH 2/5] Use try_send for regular message inbox --- .../node/src/protocol/message/mod.rs | 39 ++++++++----------- 1 file changed, 17 insertions(+), 22 deletions(-) diff --git a/chain-signatures/node/src/protocol/message/mod.rs b/chain-signatures/node/src/protocol/message/mod.rs index 14a520df4..17232b485 100644 --- a/chain-signatures/node/src/protocol/message/mod.rs +++ b/chain-signatures/node/src/protocol/message/mod.rs @@ -101,7 +101,7 @@ impl MessageInbox { } } - async fn send(&mut self, message: Message) { + fn send(&mut self, message: Message) { match message { Message::Posit(message) => match message.id { PositProtocolId::Triple(id) => { @@ -128,13 +128,13 @@ impl MessageInbox { } }, Message::Generating(message) => { - let _ = self.generating.send(message).await; + let _ = self.generating.try_send_lossy(message, "generating"); } Message::Resharing(message) => { - let _ = self.resharing.send(message).await; + let _ = self.resharing.try_send_lossy(message, "resharing"); } Message::Ready(message) => { - let _ = self.ready.send(message).await; + let _ = self.ready.try_send_lossy(message, "ready"); } Message::Triple(message) => { // NOTE: not logging the error because this is simply just channel closure. @@ -143,24 +143,21 @@ impl MessageInbox { .triple .entry(message.id) .or_default() - .send(message) - .await; + .try_send_lossy(message, "triple"); } Message::Presignature(message) => { let _ = self .presignature .entry(message.id) .or_default() - .send(message) - .await; + .try_send_lossy(message, "presignature"); } Message::Signature(message) => { let _ = self .signature .entry((message.id, message.presignature_id)) .or_default() - .send(message) - .await; + .try_send_lossy(message, "signature"); } Message::Unknown(entries) => { tracing::warn!( @@ -223,9 +220,9 @@ impl MessageInbox { } /// Publish messages to subscribers - async fn publish(&mut self, messages: Vec) { + fn publish(&mut self, messages: Vec) { for message in messages { - self.send(message).await; + self.send(message); } } @@ -362,7 +359,7 @@ impl MessageInbox { let messages = self.filter(messages); let messages_len = messages.len(); - self.publish(messages).await; + self.publish(messages); crate::metrics::messaging::NUM_RECEIVED_ENCRYPTED_TOTAL .inc_by(messages_len as f64); @@ -1030,12 +1027,17 @@ mod tests { use mpc_keys::hpke::{self, Ciphered}; use mpc_primitives::SignId; use serde::{de::DeserializeOwned, Deserialize, Serialize}; + use tokio::sync::mpsc; use crate::{ config::{Config, LocalConfig, NetworkConfig, OverrideConfig}, protocol::{ contract::primitives::{ParticipantMap, Participants}, - message::{GeneratingMessage, Message, SignatureMessage, SignedMessage, TripleMessage}, + message::{ + sub, GeneratingMessage, Message, MessageInbox, PositMessage, PositProtocolId, + ReadyMessage, SignatureMessage, SignedMessage, TripleMessage, + }, + posit::PositAction, ParticipantInfo, }, rpc::ContractStateWatcher, @@ -1516,14 +1518,7 @@ mod tests { token: 1, })); - let publish_result = - tokio::time::timeout(Duration::from_millis(100), inbox.publish(messages)).await; - - assert!( - publish_result.is_ok(), - "signature posit backpressure should not stall unrelated inbox messages" - ); - + inbox.publish(messages); let ready_message = tokio::time::timeout(Duration::from_millis(100), ready_rx.recv()) .await .expect("ready message should not be blocked by signature posit backlog") From 70f1c40853ba9ea093c7b6478b456c80685df70c Mon Sep 17 00:00:00 2001 From: Phuong Date: Mon, 30 Mar 2026 22:41:26 +0900 Subject: [PATCH 3/5] Add metrics for message channels --- .../node/src/metrics/messaging.rs | 21 +- .../node/src/protocol/message/filter.rs | 28 ++- .../node/src/protocol/message/mod.rs | 207 ++++++++++++++---- .../node/src/protocol/message/sub.rs | 163 ++++++++++---- .../node/src/protocol/signature.rs | 29 ++- chain-signatures/node/src/web/mod.rs | 4 +- 6 files changed, 358 insertions(+), 94 deletions(-) diff --git a/chain-signatures/node/src/metrics/messaging.rs b/chain-signatures/node/src/metrics/messaging.rs index 6439fa2d0..ef6d52fbe 100644 --- a/chain-signatures/node/src/metrics/messaging.rs +++ b/chain-signatures/node/src/metrics/messaging.rs @@ -1,6 +1,6 @@ use std::sync::LazyLock; -use prometheus::{exponential_buckets, Counter, Histogram, HistogramVec}; +use prometheus::{exponential_buckets, Counter, Histogram, HistogramVec, IntGaugeVec}; use super::{ try_create_counter_vec_with_node_account_id, try_create_histogram_vec_with_node_account_id, @@ -78,3 +78,22 @@ pub(crate) static WEB_ENDPOINT_LATENCY: LazyLock = LazyLock::new(| ) .unwrap() }); + +pub(crate) static CHANNEL_QUEUE_SIZE: LazyLock = LazyLock::new(|| { + super::try_create_int_gauge_vec_with_node_account_id( + "multichain_message_channel_queue_size", + "Estimated number of buffered messages queued per message channel", + &["channel", "channel_id"], + ) + .unwrap() +}); + +pub(crate) fn set_channel_queue_size(channel: &str, channel_id: &str, len: usize) { + CHANNEL_QUEUE_SIZE + .with_label_values(&[channel, channel_id]) + .set(len as i64); +} + +pub(crate) fn remove_channel_queue_size(channel: &str, channel_id: &str) { + let _ = CHANNEL_QUEUE_SIZE.remove_label_values(&[channel, channel_id]); +} diff --git a/chain-signatures/node/src/protocol/message/filter.rs b/chain-signatures/node/src/protocol/message/filter.rs index b98aa6255..e6605d0ac 100644 --- a/chain-signatures/node/src/protocol/message/filter.rs +++ b/chain-signatures/node/src/protocol/message/filter.rs @@ -13,13 +13,18 @@ pub const MAX_FILTER_SIZE: NonZeroUsize = NonZeroUsize::new(64 * 1024).unwrap(); #[derive(Debug)] pub(crate) struct MessageFilter { + filter_tx: mpsc::Sender<(Protocols, u64)>, filter_rx: mpsc::Receiver<(Protocols, u64)>, filter: lru::LruCache<(Protocols, u64), ()>, } impl MessageFilter { - pub fn new(filter_rx: mpsc::Receiver<(Protocols, u64)>) -> Self { + pub fn new( + filter_tx: mpsc::Sender<(Protocols, u64)>, + filter_rx: mpsc::Receiver<(Protocols, u64)>, + ) -> Self { Self { + filter_tx, filter_rx, filter: lru::LruCache::new(MAX_FILTER_SIZE), } @@ -37,15 +42,26 @@ impl MessageFilter { }; self.filter.put((msg_type, id), ()); + crate::metrics::messaging::set_channel_queue_size( + "filter", + "singleton", + self.filter_tx.max_capacity() - self.filter_tx.capacity(), + ); } pub fn try_update(&mut self) { - loop { - let (msg_type, id) = match self.filter_rx.try_recv() { - Ok(filter) => filter, - Err(TryRecvError::Empty | TryRecvError::Disconnected) => return, - }; + let mut updated = false; + while let Ok((msg_type, id)) = self.filter_rx.try_recv() { self.filter.put((msg_type, id), ()); + updated = true; + } + + if updated { + crate::metrics::messaging::set_channel_queue_size( + "filter", + "singleton", + self.filter_tx.max_capacity() - self.filter_tx.capacity(), + ); } } diff --git a/chain-signatures/node/src/protocol/message/mod.rs b/chain-signatures/node/src/protocol/message/mod.rs index 17232b485..a83506412 100644 --- a/chain-signatures/node/src/protocol/message/mod.rs +++ b/chain-signatures/node/src/protocol/message/mod.rs @@ -40,6 +40,36 @@ pub const MAX_MESSAGE_INCOMING: usize = 1024 * 1024; pub const MAX_MESSAGE_OUTGOING: usize = 1024 * 1024; pub const MAX_OUTBOX_PAYLOAD_LIMIT: usize = 256 * 1024; +const SINGLETON_CHANNEL_ID: &str = "singleton"; + +fn estimated_channel_queue_len(tx: &mpsc::Sender) -> usize { + tx.max_capacity() - tx.capacity() +} + +fn report_channel_queue_len(name: &'static str, channel_id: &str, tx: &mpsc::Sender) { + crate::metrics::messaging::set_channel_queue_size( + name, + channel_id, + estimated_channel_queue_len(tx), + ); +} + +fn report_singleton_channel_queue_len(name: &'static str, tx: &mpsc::Sender) { + report_channel_queue_len(name, SINGLETON_CHANNEL_ID, tx); +} + +fn triple_channel_id(id: TripleId) -> String { + id.to_string() +} + +fn presignature_channel_id(id: PresignatureId) -> String { + id.to_string() +} + +fn signature_channel_id(sign_id: SignId, presignature_id: PresignatureId) -> String { + format!("{}:{}", presignature_id, hex::encode(sign_id.request_id)) +} + pub struct MessageInbox { /// encrypted messages that are pending to be decrypted. These are messages that we received /// from other nodes that weren't able to be processed yet due to missing info such as the @@ -55,9 +85,11 @@ pub struct MessageInbox { filter: MessageFilter, /// Incoming messages that are pending to be processed. These are encrypted and signed. + inbox_tx: mpsc::Sender, inbox_rx: mpsc::Receiver, /// Subscription requests from MessageChannel + subscribe_tx: mpsc::Sender, subscribe_rx: mpsc::Receiver, generating: Subscriber, @@ -73,29 +105,40 @@ pub struct MessageInbox { impl MessageInbox { pub fn new( + inbox_tx: mpsc::Sender, inbox_rx: mpsc::Receiver, + filter_tx: mpsc::Sender<(Protocols, u64)>, filter_rx: mpsc::Receiver<(Protocols, u64)>, + subscribe_tx: mpsc::Sender, subscribe_rx: mpsc::Receiver, ) -> Self { Self { try_decrypt: VecDeque::new(), idempotent: lru::LruCache::new(MAX_FILTER_SIZE), - filter: MessageFilter::new(filter_rx), + filter: MessageFilter::new(filter_tx, filter_rx), + inbox_tx, inbox_rx, + subscribe_tx, subscribe_rx, - generating: Subscriber::unsubscribed(), - resharing: Subscriber::unsubscribed(), - ready: Subscriber::unsubscribed(), + generating: Subscriber::unsubscribed("generating", SINGLETON_CHANNEL_ID), + resharing: Subscriber::unsubscribed("resharing", SINGLETON_CHANNEL_ID), + ready: Subscriber::unsubscribed("ready", SINGLETON_CHANNEL_ID), triple: HashMap::new(), triple_init: Subscriber::unsubscribed_with_capacity( + "triple_init", + SINGLETON_CHANNEL_ID, sub::MAX_MESSAGE_POSIT_SUB_CHANNEL_SIZE, ), presignature: HashMap::new(), presignature_init: Subscriber::unsubscribed_with_capacity( + "presignature_init", + SINGLETON_CHANNEL_ID, sub::MAX_MESSAGE_POSIT_SUB_CHANNEL_SIZE, ), signature: HashMap::new(), signature_init: Subscriber::unsubscribed_with_capacity( + "signature_init", + SINGLETON_CHANNEL_ID, sub::MAX_MESSAGE_POSIT_SUB_CHANNEL_SIZE, ), } @@ -105,14 +148,12 @@ impl MessageInbox { match message { Message::Posit(message) => match message.id { PositProtocolId::Triple(id) => { - let _ = self - .triple_init - .try_send_lossy((id, message.from, message.action), "triple_init"); + let _ = self.triple_init.try_send_lossy((id, message.from, message.action)); } PositProtocolId::Presignature(id) => { let _ = self .presignature_init - .try_send_lossy((id, message.from, message.action), "presignature_init"); + .try_send_lossy((id, message.from, message.action)); } PositProtocolId::Signature(sign_id, presignature_id, round) => { let _ = self.signature_init.try_send_lossy( @@ -123,18 +164,17 @@ impl MessageInbox { message.from, message.action, ), - "signature_init", ); } }, Message::Generating(message) => { - let _ = self.generating.try_send_lossy(message, "generating"); + let _ = self.generating.try_send_lossy(message); } Message::Resharing(message) => { - let _ = self.resharing.try_send_lossy(message, "resharing"); + let _ = self.resharing.try_send_lossy(message); } Message::Ready(message) => { - let _ = self.ready.try_send_lossy(message, "ready"); + let _ = self.ready.try_send_lossy(message); } Message::Triple(message) => { // NOTE: not logging the error because this is simply just channel closure. @@ -142,22 +182,29 @@ impl MessageInbox { let _ = self .triple .entry(message.id) - .or_default() - .try_send_lossy(message, "triple"); + .or_insert_with(|| Subscriber::unsubscribed("triple", triple_channel_id(message.id))) + .try_send_lossy(message); } Message::Presignature(message) => { let _ = self .presignature .entry(message.id) - .or_default() - .try_send_lossy(message, "presignature"); + .or_insert_with(|| { + Subscriber::unsubscribed("presignature", presignature_channel_id(message.id)) + }) + .try_send_lossy(message); } Message::Signature(message) => { let _ = self .signature .entry((message.id, message.presignature_id)) - .or_default() - .try_send_lossy(message, "signature"); + .or_insert_with(|| { + Subscriber::unsubscribed( + "signature", + signature_channel_id(message.id, message.presignature_id), + ) + }) + .try_send_lossy(message); } Message::Unknown(entries) => { tracing::warn!( @@ -239,6 +286,7 @@ impl MessageInbox { SubscribeId::Generating => match sub.action { SubscribeRequestAction::Subscribe(resp) => { let rx = self.generating.subscribe(); + self.generating.report_queue_len(); let _ = resp.send(SubscribeResponse::Generating(rx)); } SubscribeRequestAction::Unsubscribe => { @@ -248,6 +296,7 @@ impl MessageInbox { SubscribeId::Resharing => match sub.action { SubscribeRequestAction::Subscribe(resp) => { let rx = self.resharing.subscribe(); + self.resharing.report_queue_len(); let _ = resp.send(SubscribeResponse::Resharing(rx)); } SubscribeRequestAction::Unsubscribe => { @@ -256,22 +305,35 @@ impl MessageInbox { }, SubscribeId::Triple(id) => match sub.action { SubscribeRequestAction::Subscribe(resp) => { - let rx = self.triple.entry(id).or_default().subscribe(); + let sub = self + .triple + .entry(id) + .or_insert_with(|| Subscriber::unsubscribed("triple", triple_channel_id(id))); + let rx = sub.subscribe(); + sub.report_queue_len(); let _ = resp.send(SubscribeResponse::Triple(rx)); } SubscribeRequestAction::Unsubscribe => { - if self.triple.remove(&id).is_none() { + if let Some(sub) = self.triple.remove(&id) { + sub.clear_queue_len_metric(); + } else { tracing::warn!(id, "trying to unsub from an unknown triple subscription"); } } }, SubscribeId::Presignature(id) => match sub.action { SubscribeRequestAction::Subscribe(resp) => { - let rx = self.presignature.entry(id).or_default().subscribe(); + let sub = self.presignature.entry(id).or_insert_with(|| { + Subscriber::unsubscribed("presignature", presignature_channel_id(id)) + }); + let rx = sub.subscribe(); + sub.report_queue_len(); let _ = resp.send(SubscribeResponse::Presignature(rx)); } SubscribeRequestAction::Unsubscribe => { - if self.presignature.remove(&id).is_none() { + if let Some(sub) = self.presignature.remove(&id) { + sub.clear_queue_len_metric(); + } else { tracing::warn!( id, "trying to unsub from an unknown presignature subscription" @@ -281,15 +343,20 @@ impl MessageInbox { }, SubscribeId::Signature(sign_id, presignature_id) => match sub.action { SubscribeRequestAction::Subscribe(resp) => { - let rx = self - .signature - .entry((sign_id, presignature_id)) - .or_default() - .subscribe(); + let sub = self.signature.entry((sign_id, presignature_id)).or_insert_with(|| { + Subscriber::unsubscribed( + "signature", + signature_channel_id(sign_id, presignature_id), + ) + }); + let rx = sub.subscribe(); + sub.report_queue_len(); let _ = resp.send(SubscribeResponse::Signature(rx)); } SubscribeRequestAction::Unsubscribe => { - if self.signature.remove(&(sign_id, presignature_id)).is_none() { + if let Some(sub) = self.signature.remove(&(sign_id, presignature_id)) { + sub.clear_queue_len_metric(); + } else { tracing::warn!( ?sign_id, ?presignature_id, @@ -301,37 +368,45 @@ impl MessageInbox { SubscribeId::Ready => match sub.action { SubscribeRequestAction::Subscribe(resp) => { let rx = self.ready.subscribe(); + self.ready.report_queue_len(); let _ = resp.send(SubscribeResponse::Ready(rx)); } SubscribeRequestAction::Unsubscribe => { self.ready.unsubscribe(); + self.ready.report_queue_len(); } }, SubscribeId::Triples => match sub.action { SubscribeRequestAction::Subscribe(resp) => { let rx = self.triple_init.subscribe(); + self.triple_init.report_queue_len(); let _ = resp.send(SubscribeResponse::TriplePosit(rx)); } SubscribeRequestAction::Unsubscribe => { self.triple_init.unsubscribe(); + self.triple_init.report_queue_len(); } }, SubscribeId::Presignatures => match sub.action { SubscribeRequestAction::Subscribe(resp) => { let rx = self.presignature_init.subscribe(); + self.presignature_init.report_queue_len(); let _ = resp.send(SubscribeResponse::PresignaturePosit(rx)); } SubscribeRequestAction::Unsubscribe => { self.presignature_init.unsubscribe(); + self.presignature_init.report_queue_len(); } }, SubscribeId::Signatures => match sub.action { SubscribeRequestAction::Subscribe(resp) => { let rx = self.signature_init.subscribe(); + self.signature_init.report_queue_len(); let _ = resp.send(SubscribeResponse::SignaturePosit(rx)); } SubscribeRequestAction::Unsubscribe => { self.signature_init.unsubscribe(); + self.signature_init.report_queue_len(); } }, } @@ -342,9 +417,11 @@ impl MessageInbox { tokio::select! { _ = self.filter.update() => {} Some(sub) = self.subscribe_rx.recv() => { + report_singleton_channel_queue_len("subscribe", &self.subscribe_tx); self.process_subscribe(sub); } Some(encrypted) = self.inbox_rx.recv() => { + report_singleton_channel_queue_len("incoming", &self.inbox_tx); let config = config.borrow().clone(); let expiration = Duration::from_millis(config.protocol.message_timeout); let participants = contract.participant_map().await; @@ -383,8 +460,15 @@ impl MessageChannel { let (outbox_tx, outbox_rx) = mpsc::channel(MAX_MESSAGE_OUTGOING); let (filter_tx, filter_rx) = mpsc::channel(MAX_FILTER_SIZE.into()); let (subscribe_tx, subscribe_rx) = mpsc::channel(16384); - let inbox = MessageInbox::new(inbox_rx, filter_rx, subscribe_rx); - let outbox = MessageOutbox::new(outbox_rx); + let inbox = MessageInbox::new( + inbox_tx.clone(), + inbox_rx, + filter_tx.clone(), + filter_rx, + subscribe_tx.clone(), + subscribe_rx, + ); + let outbox = MessageOutbox::new(outbox_tx.clone(), outbox_rx); let channel = Self { inbox: inbox_tx, @@ -393,6 +477,11 @@ impl MessageChannel { filter: filter_tx, }; + report_singleton_channel_queue_len("incoming", &channel.inbox); + report_singleton_channel_queue_len("outgoing", &channel.outgoing); + report_singleton_channel_queue_len("filter", &channel.filter); + report_singleton_channel_queue_len("subscribe", &channel.subscribe); + (inbox, outbox, channel) } @@ -416,6 +505,16 @@ impl MessageChannel { .await { tracing::error!(?err, "outbox: failed to send message to participants"); + } else { + report_singleton_channel_queue_len("outgoing", &self.outgoing); + } + } + + pub async fn receive(&self, encrypted: Ciphered) { + if let Err(err) = self.inbox.send(encrypted).await { + tracing::error!(?err, "failed to forward an encrypted protocol message"); + } else { + report_singleton_channel_queue_len("incoming", &self.inbox); } } @@ -424,18 +523,24 @@ impl MessageChannel { pub async fn filter(&self, msg: &M) { if let Err(err) = self.filter.send((M::PROTOCOL, msg.id())).await { tracing::warn!(?err, "failed to send filter message"); + } else { + report_singleton_channel_queue_len("filter", &self.filter); } } pub async fn filter_triple(&self, id: TripleId) { if let Err(err) = self.filter.send((Protocols::Triple, id)).await { tracing::warn!(?err, "failed to send filter message"); + } else { + report_singleton_channel_queue_len("filter", &self.filter); } } pub async fn filter_presignature(&self, id: PresignatureId) { if let Err(err) = self.filter.send((Protocols::Presignature, id)).await { tracing::warn!(?err, "failed to send filter message"); + } else { + report_singleton_channel_queue_len("filter", &self.filter); } } @@ -448,6 +553,7 @@ impl MessageChannel { if self.subscribe.send(req).await.is_err() { return None; }; + report_singleton_channel_queue_len("subscribe", &self.subscribe); let Ok(subscription) = resp.await else { return None; }; @@ -476,6 +582,8 @@ impl MessageChannel { .is_err() { tracing::warn!(id, "unable to send unsubscribe request for triple message"); + } else { + report_singleton_channel_queue_len("subscribe", &self.subscribe); }; } @@ -506,6 +614,8 @@ impl MessageChannel { .is_err() { tracing::warn!("unable to send unsubscribe request for triple posits"); + } else { + report_singleton_channel_queue_len("subscribe", &self.subscribe); }; } @@ -537,6 +647,8 @@ impl MessageChannel { .is_err() { tracing::warn!("unable to send unsubscribe request for presignature"); + } else { + report_singleton_channel_queue_len("subscribe", &self.subscribe); }; } @@ -564,6 +676,8 @@ impl MessageChannel { .is_err() { tracing::warn!("unable to send unsubscribe request for presignature posits"); + } else { + report_singleton_channel_queue_len("subscribe", &self.subscribe); }; } @@ -611,6 +725,8 @@ impl MessageChannel { ?presignature_id, "unable to send unsubscribe request for signature" ); + } else { + report_singleton_channel_queue_len("subscribe", &self.subscribe); }; } @@ -639,6 +755,8 @@ impl MessageChannel { .is_err() { tracing::warn!("unable to send unsubscribe request for signature posit"); + } else { + report_singleton_channel_queue_len("subscribe", &self.subscribe); }; } @@ -770,6 +888,7 @@ pub struct Partition { /// These messages will be signed and encrypted before being sent out. pub struct MessageOutbox { /// The messages that are pending to be sent to other nodes. + outbox_tx: mpsc::Sender, outbox_rx: mpsc::Receiver, // NOTE: we have FromParticipant here to circumvent the chance that we change Participant @@ -781,8 +900,9 @@ pub struct MessageOutbox { } impl MessageOutbox { - pub fn new(outbox_rx: mpsc::Receiver) -> Self { + pub fn new(outbox_tx: mpsc::Sender, outbox_rx: mpsc::Receiver) -> Self { Self { + outbox_tx, outbox_rx, messages: HashMap::new(), } @@ -931,6 +1051,7 @@ impl MessageOutbox { loop { tokio::select! { Some((msg, (from, to, timestamp))) = self.outbox_rx.recv() => { + report_singleton_channel_queue_len("outgoing", &self.outbox_tx); // add it to the outbox and sort it by from and to participant let entry = self.messages.entry((from, to)).or_default(); entry.push((msg, timestamp)); @@ -1021,6 +1142,8 @@ const fn cbor_name(value: &ciborium::Value) -> &'static str { #[cfg(test)] mod tests { + use super::SINGLETON_CHANNEL_ID; + use std::time::Duration; use cait_sith::protocol::Participant; @@ -1372,7 +1495,7 @@ mod tests { }), ]; let encrypted = SignedMessage::encrypt(&batch, from, &sign_sk, &cipher_pk).unwrap(); - channel.inbox.send(encrypted).await.unwrap(); + channel.receive(encrypted).await; let mut recv1 = channel.subscribe_triple(1).await; let mut recv2 = channel.subscribe_triple(2).await; @@ -1426,7 +1549,7 @@ mod tests { let mut recv3 = channel.subscribe_triple(3).await; channel.filter_triple(filter_id).await; - channel.inbox.send(encrypted).await.unwrap(); + channel.receive(encrypted).await; let (m1, m3) = match tokio::join!(recv1.recv(), recv3.recv()) { (Some(m1), Some(m3)) => (m1, m3), @@ -1451,7 +1574,7 @@ mod tests { // should be received by the subscribers. { let encrypted = SignedMessage::encrypt(&batch, from, &sign_sk, &cipher_pk).unwrap(); - channel.inbox.send(encrypted).await.unwrap(); + channel.receive(encrypted).await; let mut recv1 = tokio::time::timeout(Duration::from_millis(300), channel.subscribe_triple(1)) .await @@ -1479,12 +1602,16 @@ mod tests { #[tokio::test] async fn test_signature_posit_backpressure_does_not_block_ready_messages() { - let (_inbox_tx, inbox_rx) = mpsc::channel(1); - let (_filter_tx, filter_rx) = mpsc::channel(1); - let (_subscribe_tx, subscribe_rx) = mpsc::channel(1); - let mut inbox = MessageInbox::new(inbox_rx, filter_rx, subscribe_rx); - - inbox.signature_init = sub::Subscriber::unsubscribed_with_capacity(1); + let (inbox_tx, inbox_rx) = mpsc::channel(1); + let (filter_tx, filter_rx) = mpsc::channel(1); + let (subscribe_tx, subscribe_rx) = mpsc::channel(1); + let mut inbox = MessageInbox::new(inbox_tx, inbox_rx, filter_tx, filter_rx, subscribe_tx, subscribe_rx); + + inbox.signature_init = sub::Subscriber::unsubscribed_with_capacity( + "signature_init", + SINGLETON_CHANNEL_ID, + 1, + ); let (signature_req, signature_resp) = sub::SubscribeRequest::subscribe(sub::SubscribeId::Signatures); diff --git a/chain-signatures/node/src/protocol/message/sub.rs b/chain-signatures/node/src/protocol/message/sub.rs index bd0b5d581..d4b6b4a99 100644 --- a/chain-signatures/node/src/protocol/message/sub.rs +++ b/chain-signatures/node/src/protocol/message/sub.rs @@ -72,39 +72,74 @@ impl SubscribeRequest { pub enum Subscriber { /// Temporary/replaceable value, and will never be used. Only here so we can have a /// way to convert from an Unsubscribed to a Subscribed subscription. - Unknown, + Unknown(SubscriberMetrics), /// A subscribed channel where the subscriber has a handle to the receiver. - Subscribed(mpsc::Sender, usize), + Subscribed(mpsc::Sender, SubscriberMetrics), /// An unsubscribed channel where there's potentially messages that have yet to be sent. - Unsubscribed(mpsc::Sender, mpsc::Receiver, usize), + Unsubscribed(mpsc::Sender, mpsc::Receiver, SubscriberMetrics), +} + +#[derive(Clone)] +pub struct SubscriberMetrics { + name: &'static str, + channel_id: String, + capacity: usize, } impl Subscriber { - pub fn subscribed() -> (Self, mpsc::Receiver) { - Self::subscribed_with_capacity(MAX_MESSAGE_SUB_CHANNEL_SIZE) + pub fn subscribed(name: &'static str, channel_id: impl Into) -> (Self, mpsc::Receiver) { + Self::subscribed_with_capacity(name, channel_id, MAX_MESSAGE_SUB_CHANNEL_SIZE) + } + + pub fn subscribed_with_capacity( + name: &'static str, + channel_id: impl Into, + capacity: usize, + ) -> (Self, mpsc::Receiver) { + let metrics = SubscriberMetrics { + name, + channel_id: channel_id.into(), + capacity, + }; + Self::subscribed_with_metrics(metrics) + } + + fn subscribed_with_metrics(metrics: SubscriberMetrics) -> (Self, mpsc::Receiver) { + let (tx, rx) = mpsc::channel(metrics.capacity); + (Self::Subscribed(tx, metrics), rx) } - pub fn subscribed_with_capacity(capacity: usize) -> (Self, mpsc::Receiver) { - let (tx, rx) = mpsc::channel(capacity); - (Self::Subscribed(tx, capacity), rx) + pub fn unsubscribed(name: &'static str, channel_id: impl Into) -> Self { + Self::unsubscribed_with_capacity(name, channel_id, MAX_MESSAGE_SUB_CHANNEL_SIZE) } - pub fn unsubscribed() -> Self { - Self::unsubscribed_with_capacity(MAX_MESSAGE_SUB_CHANNEL_SIZE) + pub fn unsubscribed_with_capacity( + name: &'static str, + channel_id: impl Into, + capacity: usize, + ) -> Self { + let metrics = SubscriberMetrics { + name, + channel_id: channel_id.into(), + capacity, + }; + Self::unsubscribed_with_metrics(metrics) } - pub fn unsubscribed_with_capacity(capacity: usize) -> Self { - let (tx, rx) = mpsc::channel(capacity); - Self::Unsubscribed(tx, rx, capacity) + fn unsubscribed_with_metrics(metrics: SubscriberMetrics) -> Self { + let (tx, rx) = mpsc::channel(metrics.capacity); + Self::Unsubscribed(tx, rx, metrics) } /// Convert this subscriber into a subscribed one, returning the receiver. /// If the subscriber is already subscribed, it overrides the existing subscription. pub fn subscribe(&mut self) -> mpsc::Receiver { - let sub = std::mem::replace(self, Self::Unknown); + let sub = std::mem::replace(self, Self::Unknown(self.metrics().clone())); let (sub, rx) = match sub { - Self::Subscribed(_, _) | Self::Unknown => Self::subscribed(), - Self::Unsubscribed(tx, rx, capacity) => (Self::Subscribed(tx, capacity), rx), + Self::Subscribed(_, metrics) | Self::Unknown(metrics) => { + Self::subscribed_with_metrics(metrics) + } + Self::Unsubscribed(tx, rx, metrics) => (Self::Subscribed(tx, metrics), rx), }; *self = sub; rx @@ -112,43 +147,81 @@ impl Subscriber { /// Unsubscribe from the subscriber, converting it into an unsubscribed one. pub fn unsubscribe(&mut self) { - if matches!(self, Self::Unknown) { - *self = Self::unsubscribed(); + if matches!(self, Self::Unknown(_)) { + *self = Self::unsubscribed_with_metrics(self.metrics().clone()); return; } - let capacity = self.capacity(); if matches!(self, Self::Subscribed(_, _)) { - *self = Self::unsubscribed_with_capacity(capacity); + *self = Self::unsubscribed_with_metrics(self.metrics().clone()); } } - fn capacity(&self) -> usize { + pub fn estimated_queue_len(&self) -> usize { match self { - Self::Subscribed(_, capacity) | Self::Unsubscribed(_, _, capacity) => *capacity, - Self::Unknown => MAX_MESSAGE_SUB_CHANNEL_SIZE, + Self::Subscribed(tx, _) | Self::Unsubscribed(tx, _, _) => tx.max_capacity() - tx.capacity(), + Self::Unknown(_) => 0, } } - pub async fn send(&self, msg: T) -> Result<(), mpsc::error::SendError> { + pub fn report_queue_len(&self) { + let metrics = self.metrics(); + crate::metrics::messaging::set_channel_queue_size( + metrics.name, + &metrics.channel_id, + self.estimated_queue_len(), + ); + } + + fn metrics(&self) -> &SubscriberMetrics { match self { - Self::Subscribed(tx, _) => tx.send(msg).await, - Self::Unsubscribed(tx, _, _) => tx.send(msg).await, - Self::Unknown => Ok(()), + Self::Subscribed(_, metrics) + | Self::Unsubscribed(_, _, metrics) + | Self::Unknown(metrics) => metrics, } } - pub fn try_send_lossy( - &self, - msg: T, - name: &'static str, - ) -> Result<(), mpsc::error::SendError> { + fn capacity(&self) -> usize { + self.metrics().capacity + } + + pub fn channel_id(&self) -> &str { + &self.metrics().channel_id + } + + pub fn clear_queue_len_metric(&self) { + let metrics = self.metrics(); + crate::metrics::messaging::remove_channel_queue_size(metrics.name, &metrics.channel_id); + } + + pub fn subscriber_name(&self) -> &'static str { + self.metrics().name + } + + fn report_after_enqueue(&self) { + self.report_queue_len(); + } + + pub fn is_unknown(&self) -> bool { + matches!(self, Self::Unknown(_)) + } + + pub async fn send(&self, msg: T) -> Result<(), mpsc::error::SendError> { match self { + Self::Subscribed(tx, _) | Self::Unsubscribed(tx, _, _) => tx.send(msg).await, + Self::Unknown(_) => Ok(()), + } + } + + pub fn try_send_lossy(&self, msg: T) -> Result<(), mpsc::error::SendError> { + let result = match self { Self::Subscribed(tx, _) | Self::Unsubscribed(tx, _, _) => match tx.try_send(msg) { Ok(()) => Ok(()), Err(mpsc::error::TrySendError::Full(_)) => { + let metrics = self.metrics(); tracing::warn!( - subscriber = name, + subscriber = metrics.name, + subscriber_id = metrics.channel_id, capacity = self.capacity(), "dropping message because subscriber channel is full" ); @@ -156,13 +229,27 @@ impl Subscriber { } Err(mpsc::error::TrySendError::Closed(msg)) => Err(mpsc::error::SendError(msg)), }, - Self::Unknown => Ok(()), - } + Self::Unknown(_) => Ok(()), + }; + + self.report_after_enqueue(); + result } } -impl Default for Subscriber { - fn default() -> Self { - Self::unsubscribed() +#[cfg(test)] +mod tests { + use super::Subscriber; + + #[test] + fn estimated_queue_len_tracks_buffered_messages() { + let sub = Subscriber::unsubscribed_with_capacity("test", "singleton", 4); + + assert_eq!(sub.estimated_queue_len(), 0); + + sub.try_send_lossy(1u8).unwrap(); + sub.try_send_lossy(2u8).unwrap(); + + assert_eq!(sub.estimated_queue_len(), 2); } } diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index 804dc4498..f4ed8f7ce 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -1229,7 +1229,13 @@ impl SignatureSpawner { } // Subscribe to (or create) the posit inbox for this sign request - let rx = self.inboxes.entry(sign_id).or_default().subscribe(); + let rx = self + .inboxes + .entry(sign_id) + .or_insert_with(|| { + Subscriber::unsubscribed("sign_task_posit", hex::encode(sign_id.request_id)) + }) + .subscribe(); let task = SignTask { me: self.me, participants, @@ -1264,21 +1270,28 @@ impl SignatureSpawner { if from == self.me { return; } - if let Err(err) = self.inboxes.entry(sign_id).or_default().try_send_lossy( + if let Err(err) = self + .inboxes + .entry(sign_id) + .or_insert_with(|| { + Subscriber::unsubscribed("sign_task_posit", hex::encode(sign_id.request_id)) + }) + .try_send_lossy( SignTaskMessage::PositMessage { presignature_id, round, from, action, }, - "sign_task_posit", ) { tracing::error!(?err, ?sign_id, "failed to send posit message"); } } fn handle_completion(&mut self, sign_id: SignId) { - self.inboxes.remove(&sign_id); + if let Some(inbox) = self.inboxes.remove(&sign_id) { + inbox.clear_queue_len_metric(); + } self.abort_delayed_watcher(sign_id, "completion"); if self.tasks.abort(sign_id) { tracing::info!(?sign_id, "aborting signature task due to completion event"); @@ -1367,13 +1380,17 @@ impl SignatureSpawner { Ok(outcome) => outcome, Err(sign_id) => { tracing::warn!(?sign_id, "signature task interrupted"); - self.inboxes.remove(&sign_id); + if let Some(inbox) = self.inboxes.remove(&sign_id) { + inbox.clear_queue_len_metric(); + } self.abort_delayed_watcher(sign_id, "interruption"); continue; } }; - self.inboxes.remove(&sign_id); + if let Some(inbox) = self.inboxes.remove(&sign_id) { + inbox.clear_queue_len_metric(); + } self.abort_delayed_watcher(sign_id, "task completion"); match result { Ok(()) => { diff --git a/chain-signatures/node/src/web/mod.rs b/chain-signatures/node/src/web/mod.rs index 5d84a09a6..127f4158c 100644 --- a/chain-signatures/node/src/web/mod.rs +++ b/chain-signatures/node/src/web/mod.rs @@ -133,9 +133,7 @@ async fn msg( for encrypted in encrypted.into_iter() { let msg_channel = state.msg_channel.clone(); tokio::spawn(async move { - if let Err(err) = msg_channel.inbox.send(encrypted).await { - tracing::error!(?err, "failed to forward an encrypted protocol message"); - } + msg_channel.receive(encrypted).await; }); } WEB_ENDPOINT_LATENCY From a40e0abae98567a795126a38c80591af52ab2693 Mon Sep 17 00:00:00 2001 From: Phuong Date: Mon, 30 Mar 2026 22:41:35 +0900 Subject: [PATCH 4/5] clippy --- .../node/src/protocol/message/filter.rs | 2 +- .../node/src/protocol/message/mod.rs | 86 +++++++++++-------- chain-signatures/node/src/web/mod.rs | 2 +- 3 files changed, 52 insertions(+), 38 deletions(-) diff --git a/chain-signatures/node/src/protocol/message/filter.rs b/chain-signatures/node/src/protocol/message/filter.rs index e6605d0ac..fb6af60b8 100644 --- a/chain-signatures/node/src/protocol/message/filter.rs +++ b/chain-signatures/node/src/protocol/message/filter.rs @@ -1,6 +1,6 @@ use std::num::NonZeroUsize; -use tokio::sync::mpsc::{self, error::TryRecvError}; +use tokio::sync::mpsc; use super::types::{MessageFilterId, Protocols}; diff --git a/chain-signatures/node/src/protocol/message/mod.rs b/chain-signatures/node/src/protocol/message/mod.rs index a83506412..a3a6ddb48 100644 --- a/chain-signatures/node/src/protocol/message/mod.rs +++ b/chain-signatures/node/src/protocol/message/mod.rs @@ -148,23 +148,23 @@ impl MessageInbox { match message { Message::Posit(message) => match message.id { PositProtocolId::Triple(id) => { - let _ = self.triple_init.try_send_lossy((id, message.from, message.action)); - } - PositProtocolId::Presignature(id) => { let _ = self - .presignature_init + .triple_init .try_send_lossy((id, message.from, message.action)); } + PositProtocolId::Presignature(id) => { + let _ = + self.presignature_init + .try_send_lossy((id, message.from, message.action)); + } PositProtocolId::Signature(sign_id, presignature_id, round) => { - let _ = self.signature_init.try_send_lossy( - ( - sign_id, - presignature_id, - round, - message.from, - message.action, - ), - ); + let _ = self.signature_init.try_send_lossy(( + sign_id, + presignature_id, + round, + message.from, + message.action, + )); } }, Message::Generating(message) => { @@ -182,7 +182,9 @@ impl MessageInbox { let _ = self .triple .entry(message.id) - .or_insert_with(|| Subscriber::unsubscribed("triple", triple_channel_id(message.id))) + .or_insert_with(|| { + Subscriber::unsubscribed("triple", triple_channel_id(message.id)) + }) .try_send_lossy(message); } Message::Presignature(message) => { @@ -190,7 +192,10 @@ impl MessageInbox { .presignature .entry(message.id) .or_insert_with(|| { - Subscriber::unsubscribed("presignature", presignature_channel_id(message.id)) + Subscriber::unsubscribed( + "presignature", + presignature_channel_id(message.id), + ) }) .try_send_lossy(message); } @@ -305,10 +310,9 @@ impl MessageInbox { }, SubscribeId::Triple(id) => match sub.action { SubscribeRequestAction::Subscribe(resp) => { - let sub = self - .triple - .entry(id) - .or_insert_with(|| Subscriber::unsubscribed("triple", triple_channel_id(id))); + let sub = self.triple.entry(id).or_insert_with(|| { + Subscriber::unsubscribed("triple", triple_channel_id(id)) + }); let rx = sub.subscribe(); sub.report_queue_len(); let _ = resp.send(SubscribeResponse::Triple(rx)); @@ -343,12 +347,15 @@ impl MessageInbox { }, SubscribeId::Signature(sign_id, presignature_id) => match sub.action { SubscribeRequestAction::Subscribe(resp) => { - let sub = self.signature.entry((sign_id, presignature_id)).or_insert_with(|| { - Subscriber::unsubscribed( - "signature", - signature_channel_id(sign_id, presignature_id), - ) - }); + let sub = self + .signature + .entry((sign_id, presignature_id)) + .or_insert_with(|| { + Subscriber::unsubscribed( + "signature", + signature_channel_id(sign_id, presignature_id), + ) + }); let rx = sub.subscribe(); sub.report_queue_len(); let _ = resp.send(SubscribeResponse::Signature(rx)); @@ -510,7 +517,7 @@ impl MessageChannel { } } - pub async fn receive(&self, encrypted: Ciphered) { + pub async fn send_inbox(&self, encrypted: Ciphered) { if let Err(err) = self.inbox.send(encrypted).await { tracing::error!(?err, "failed to forward an encrypted protocol message"); } else { @@ -900,7 +907,10 @@ pub struct MessageOutbox { } impl MessageOutbox { - pub fn new(outbox_tx: mpsc::Sender, outbox_rx: mpsc::Receiver) -> Self { + pub fn new( + outbox_tx: mpsc::Sender, + outbox_rx: mpsc::Receiver, + ) -> Self { Self { outbox_tx, outbox_rx, @@ -1495,7 +1505,7 @@ mod tests { }), ]; let encrypted = SignedMessage::encrypt(&batch, from, &sign_sk, &cipher_pk).unwrap(); - channel.receive(encrypted).await; + channel.send_inbox(encrypted).await; let mut recv1 = channel.subscribe_triple(1).await; let mut recv2 = channel.subscribe_triple(2).await; @@ -1549,7 +1559,7 @@ mod tests { let mut recv3 = channel.subscribe_triple(3).await; channel.filter_triple(filter_id).await; - channel.receive(encrypted).await; + channel.send_inbox(encrypted).await; let (m1, m3) = match tokio::join!(recv1.recv(), recv3.recv()) { (Some(m1), Some(m3)) => (m1, m3), @@ -1574,7 +1584,7 @@ mod tests { // should be received by the subscribers. { let encrypted = SignedMessage::encrypt(&batch, from, &sign_sk, &cipher_pk).unwrap(); - channel.receive(encrypted).await; + channel.send_inbox(encrypted).await; let mut recv1 = tokio::time::timeout(Duration::from_millis(300), channel.subscribe_triple(1)) .await @@ -1605,14 +1615,18 @@ mod tests { let (inbox_tx, inbox_rx) = mpsc::channel(1); let (filter_tx, filter_rx) = mpsc::channel(1); let (subscribe_tx, subscribe_rx) = mpsc::channel(1); - let mut inbox = MessageInbox::new(inbox_tx, inbox_rx, filter_tx, filter_rx, subscribe_tx, subscribe_rx); - - inbox.signature_init = sub::Subscriber::unsubscribed_with_capacity( - "signature_init", - SINGLETON_CHANNEL_ID, - 1, + let mut inbox = MessageInbox::new( + inbox_tx, + inbox_rx, + filter_tx, + filter_rx, + subscribe_tx, + subscribe_rx, ); + inbox.signature_init = + sub::Subscriber::unsubscribed_with_capacity("signature_init", SINGLETON_CHANNEL_ID, 1); + let (signature_req, signature_resp) = sub::SubscribeRequest::subscribe(sub::SubscribeId::Signatures); inbox.process_subscribe(signature_req); diff --git a/chain-signatures/node/src/web/mod.rs b/chain-signatures/node/src/web/mod.rs index 127f4158c..2999ccc0b 100644 --- a/chain-signatures/node/src/web/mod.rs +++ b/chain-signatures/node/src/web/mod.rs @@ -133,7 +133,7 @@ async fn msg( for encrypted in encrypted.into_iter() { let msg_channel = state.msg_channel.clone(); tokio::spawn(async move { - msg_channel.receive(encrypted).await; + msg_channel.send_inbox(encrypted).await; }); } WEB_ENDPOINT_LATENCY From 5ad8dc8b1a87f3fe9f84e1312ca23aa8941405cd Mon Sep 17 00:00:00 2001 From: Phuong Date: Mon, 30 Mar 2026 23:28:08 +0900 Subject: [PATCH 5/5] Add SubscriberKind --- .../node/src/protocol/message/sub.rs | 128 ++++++++++-------- .../node/src/protocol/signature.rs | 7 +- 2 files changed, 73 insertions(+), 62 deletions(-) diff --git a/chain-signatures/node/src/protocol/message/sub.rs b/chain-signatures/node/src/protocol/message/sub.rs index d4b6b4a99..0b9af4df2 100644 --- a/chain-signatures/node/src/protocol/message/sub.rs +++ b/chain-signatures/node/src/protocol/message/sub.rs @@ -69,14 +69,19 @@ impl SubscribeRequest { } } -pub enum Subscriber { +pub struct Subscriber { + metrics: SubscriberMetrics, + kind: SubscriberKind, +} + +pub enum SubscriberKind { /// Temporary/replaceable value, and will never be used. Only here so we can have a /// way to convert from an Unsubscribed to a Subscribed subscription. - Unknown(SubscriberMetrics), + Unknown, /// A subscribed channel where the subscriber has a handle to the receiver. - Subscribed(mpsc::Sender, SubscriberMetrics), + Subscribed(mpsc::Sender), /// An unsubscribed channel where there's potentially messages that have yet to be sent. - Unsubscribed(mpsc::Sender, mpsc::Receiver, SubscriberMetrics), + Unsubscribed(mpsc::Sender, mpsc::Receiver), } #[derive(Clone)] @@ -87,7 +92,10 @@ pub struct SubscriberMetrics { } impl Subscriber { - pub fn subscribed(name: &'static str, channel_id: impl Into) -> (Self, mpsc::Receiver) { + pub fn subscribed( + name: &'static str, + channel_id: impl Into, + ) -> (Self, mpsc::Receiver) { Self::subscribed_with_capacity(name, channel_id, MAX_MESSAGE_SUB_CHANNEL_SIZE) } @@ -106,7 +114,13 @@ impl Subscriber { fn subscribed_with_metrics(metrics: SubscriberMetrics) -> (Self, mpsc::Receiver) { let (tx, rx) = mpsc::channel(metrics.capacity); - (Self::Subscribed(tx, metrics), rx) + ( + Self { + metrics, + kind: SubscriberKind::Subscribed(tx), + }, + rx, + ) } pub fn unsubscribed(name: &'static str, channel_id: impl Into) -> Self { @@ -128,74 +142,69 @@ impl Subscriber { fn unsubscribed_with_metrics(metrics: SubscriberMetrics) -> Self { let (tx, rx) = mpsc::channel(metrics.capacity); - Self::Unsubscribed(tx, rx, metrics) + Self { + metrics, + kind: SubscriberKind::Unsubscribed(tx, rx), + } } /// Convert this subscriber into a subscribed one, returning the receiver. /// If the subscriber is already subscribed, it overrides the existing subscription. pub fn subscribe(&mut self) -> mpsc::Receiver { - let sub = std::mem::replace(self, Self::Unknown(self.metrics().clone())); - let (sub, rx) = match sub { - Self::Subscribed(_, metrics) | Self::Unknown(metrics) => { - Self::subscribed_with_metrics(metrics) + let kind = std::mem::replace(&mut self.kind, SubscriberKind::Unknown); + let (next_kind, rx) = match kind { + SubscriberKind::Subscribed(_) | SubscriberKind::Unknown => { + let (tx, rx) = mpsc::channel(self.metrics.capacity); + (SubscriberKind::Subscribed(tx), rx) } - Self::Unsubscribed(tx, rx, metrics) => (Self::Subscribed(tx, metrics), rx), + SubscriberKind::Unsubscribed(tx, rx) => (SubscriberKind::Subscribed(tx), rx), }; - *self = sub; + self.kind = next_kind; rx } /// Unsubscribe from the subscriber, converting it into an unsubscribed one. pub fn unsubscribe(&mut self) { - if matches!(self, Self::Unknown(_)) { - *self = Self::unsubscribed_with_metrics(self.metrics().clone()); - return; - } - - if matches!(self, Self::Subscribed(_, _)) { - *self = Self::unsubscribed_with_metrics(self.metrics().clone()); + if let SubscriberKind::Subscribed(_) = self.kind { + let (tx, rx) = mpsc::channel(self.metrics.capacity); + self.kind = SubscriberKind::Unsubscribed(tx, rx); } } pub fn estimated_queue_len(&self) -> usize { - match self { - Self::Subscribed(tx, _) | Self::Unsubscribed(tx, _, _) => tx.max_capacity() - tx.capacity(), - Self::Unknown(_) => 0, + match &self.kind { + SubscriberKind::Subscribed(tx) | SubscriberKind::Unsubscribed(tx, _) => { + tx.max_capacity() - tx.capacity() + } + SubscriberKind::Unknown => 0, } } pub fn report_queue_len(&self) { - let metrics = self.metrics(); crate::metrics::messaging::set_channel_queue_size( - metrics.name, - &metrics.channel_id, + self.metrics.name, + &self.metrics.channel_id, self.estimated_queue_len(), ); } - fn metrics(&self) -> &SubscriberMetrics { - match self { - Self::Subscribed(_, metrics) - | Self::Unsubscribed(_, _, metrics) - | Self::Unknown(metrics) => metrics, - } - } - fn capacity(&self) -> usize { - self.metrics().capacity + self.metrics.capacity } pub fn channel_id(&self) -> &str { - &self.metrics().channel_id + &self.metrics.channel_id } pub fn clear_queue_len_metric(&self) { - let metrics = self.metrics(); - crate::metrics::messaging::remove_channel_queue_size(metrics.name, &metrics.channel_id); + crate::metrics::messaging::remove_channel_queue_size( + self.metrics.name, + &self.metrics.channel_id, + ); } pub fn subscriber_name(&self) -> &'static str { - self.metrics().name + self.metrics.name } fn report_after_enqueue(&self) { @@ -203,33 +212,36 @@ impl Subscriber { } pub fn is_unknown(&self) -> bool { - matches!(self, Self::Unknown(_)) + matches!(self.kind, SubscriberKind::Unknown) } pub async fn send(&self, msg: T) -> Result<(), mpsc::error::SendError> { - match self { - Self::Subscribed(tx, _) | Self::Unsubscribed(tx, _, _) => tx.send(msg).await, - Self::Unknown(_) => Ok(()), + match &self.kind { + SubscriberKind::Subscribed(tx) | SubscriberKind::Unsubscribed(tx, _) => { + tx.send(msg).await + } + SubscriberKind::Unknown => Ok(()), } } pub fn try_send_lossy(&self, msg: T) -> Result<(), mpsc::error::SendError> { - let result = match self { - Self::Subscribed(tx, _) | Self::Unsubscribed(tx, _, _) => match tx.try_send(msg) { - Ok(()) => Ok(()), - Err(mpsc::error::TrySendError::Full(_)) => { - let metrics = self.metrics(); - tracing::warn!( - subscriber = metrics.name, - subscriber_id = metrics.channel_id, - capacity = self.capacity(), - "dropping message because subscriber channel is full" - ); - Ok(()) + let result = match &self.kind { + SubscriberKind::Subscribed(tx) | SubscriberKind::Unsubscribed(tx, _) => { + match tx.try_send(msg) { + Ok(()) => Ok(()), + Err(mpsc::error::TrySendError::Full(_)) => { + tracing::warn!( + subscriber = self.metrics.name, + subscriber_id = self.metrics.channel_id, + capacity = self.capacity(), + "dropping message because subscriber channel is full" + ); + Ok(()) + } + Err(mpsc::error::TrySendError::Closed(msg)) => Err(mpsc::error::SendError(msg)), } - Err(mpsc::error::TrySendError::Closed(msg)) => Err(mpsc::error::SendError(msg)), - }, - Self::Unknown(_) => Ok(()), + } + SubscriberKind::Unknown => Ok(()), }; self.report_after_enqueue(); diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index f4ed8f7ce..e004eb7f8 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -1276,14 +1276,13 @@ impl SignatureSpawner { .or_insert_with(|| { Subscriber::unsubscribed("sign_task_posit", hex::encode(sign_id.request_id)) }) - .try_send_lossy( - SignTaskMessage::PositMessage { + .try_send_lossy(SignTaskMessage::PositMessage { presignature_id, round, from, action, - }, - ) { + }) + { tracing::error!(?err, ?sign_id, "failed to send posit message"); } }