diff --git a/chain-signatures/node/src/metrics/messaging.rs b/chain-signatures/node/src/metrics/messaging.rs index 6439fa2d0..ef6d52fbe 100644 --- a/chain-signatures/node/src/metrics/messaging.rs +++ b/chain-signatures/node/src/metrics/messaging.rs @@ -1,6 +1,6 @@ use std::sync::LazyLock; -use prometheus::{exponential_buckets, Counter, Histogram, HistogramVec}; +use prometheus::{exponential_buckets, Counter, Histogram, HistogramVec, IntGaugeVec}; use super::{ try_create_counter_vec_with_node_account_id, try_create_histogram_vec_with_node_account_id, @@ -78,3 +78,22 @@ pub(crate) static WEB_ENDPOINT_LATENCY: LazyLock = LazyLock::new(| ) .unwrap() }); + +pub(crate) static CHANNEL_QUEUE_SIZE: LazyLock = LazyLock::new(|| { + super::try_create_int_gauge_vec_with_node_account_id( + "multichain_message_channel_queue_size", + "Estimated number of buffered messages queued per message channel", + &["channel", "channel_id"], + ) + .unwrap() +}); + +pub(crate) fn set_channel_queue_size(channel: &str, channel_id: &str, len: usize) { + CHANNEL_QUEUE_SIZE + .with_label_values(&[channel, channel_id]) + .set(len as i64); +} + +pub(crate) fn remove_channel_queue_size(channel: &str, channel_id: &str) { + let _ = CHANNEL_QUEUE_SIZE.remove_label_values(&[channel, channel_id]); +} diff --git a/chain-signatures/node/src/protocol/message/filter.rs b/chain-signatures/node/src/protocol/message/filter.rs index b98aa6255..fb6af60b8 100644 --- a/chain-signatures/node/src/protocol/message/filter.rs +++ b/chain-signatures/node/src/protocol/message/filter.rs @@ -1,6 +1,6 @@ use std::num::NonZeroUsize; -use tokio::sync::mpsc::{self, error::TryRecvError}; +use tokio::sync::mpsc; use super::types::{MessageFilterId, Protocols}; @@ -13,13 +13,18 @@ pub const MAX_FILTER_SIZE: NonZeroUsize = NonZeroUsize::new(64 * 1024).unwrap(); #[derive(Debug)] pub(crate) struct MessageFilter { + filter_tx: mpsc::Sender<(Protocols, u64)>, filter_rx: mpsc::Receiver<(Protocols, u64)>, filter: lru::LruCache<(Protocols, u64), ()>, } impl MessageFilter { - pub fn new(filter_rx: mpsc::Receiver<(Protocols, u64)>) -> Self { + pub fn new( + filter_tx: mpsc::Sender<(Protocols, u64)>, + filter_rx: mpsc::Receiver<(Protocols, u64)>, + ) -> Self { Self { + filter_tx, filter_rx, filter: lru::LruCache::new(MAX_FILTER_SIZE), } @@ -37,15 +42,26 @@ impl MessageFilter { }; self.filter.put((msg_type, id), ()); + crate::metrics::messaging::set_channel_queue_size( + "filter", + "singleton", + self.filter_tx.max_capacity() - self.filter_tx.capacity(), + ); } pub fn try_update(&mut self) { - loop { - let (msg_type, id) = match self.filter_rx.try_recv() { - Ok(filter) => filter, - Err(TryRecvError::Empty | TryRecvError::Disconnected) => return, - }; + let mut updated = false; + while let Ok((msg_type, id)) = self.filter_rx.try_recv() { self.filter.put((msg_type, id), ()); + updated = true; + } + + if updated { + crate::metrics::messaging::set_channel_queue_size( + "filter", + "singleton", + self.filter_tx.max_capacity() - self.filter_tx.capacity(), + ); } } diff --git a/chain-signatures/node/src/protocol/message/mod.rs b/chain-signatures/node/src/protocol/message/mod.rs index e580b2931..a3a6ddb48 100644 --- a/chain-signatures/node/src/protocol/message/mod.rs +++ b/chain-signatures/node/src/protocol/message/mod.rs @@ -2,7 +2,7 @@ mod filter; mod sub; mod types; -pub use sub::Subscriber; +pub use sub::{Subscriber, MAX_MESSAGE_POSIT_SUB_CHANNEL_SIZE}; use crate::protocol::message::sub::{ SubscribeId, SubscribeRequest, SubscribeRequestAction, SubscribeResponse, @@ -40,6 +40,36 @@ pub const MAX_MESSAGE_INCOMING: usize = 1024 * 1024; pub const MAX_MESSAGE_OUTGOING: usize = 1024 * 1024; pub const MAX_OUTBOX_PAYLOAD_LIMIT: usize = 256 * 1024; +const SINGLETON_CHANNEL_ID: &str = "singleton"; + +fn estimated_channel_queue_len(tx: &mpsc::Sender) -> usize { + tx.max_capacity() - tx.capacity() +} + +fn report_channel_queue_len(name: &'static str, channel_id: &str, tx: &mpsc::Sender) { + crate::metrics::messaging::set_channel_queue_size( + name, + channel_id, + estimated_channel_queue_len(tx), + ); +} + +fn report_singleton_channel_queue_len(name: &'static str, tx: &mpsc::Sender) { + report_channel_queue_len(name, SINGLETON_CHANNEL_ID, tx); +} + +fn triple_channel_id(id: TripleId) -> String { + id.to_string() +} + +fn presignature_channel_id(id: PresignatureId) -> String { + id.to_string() +} + +fn signature_channel_id(sign_id: SignId, presignature_id: PresignatureId) -> String { + format!("{}:{}", presignature_id, hex::encode(sign_id.request_id)) +} + pub struct MessageInbox { /// encrypted messages that are pending to be decrypted. These are messages that we received /// from other nodes that weren't able to be processed yet due to missing info such as the @@ -55,9 +85,11 @@ pub struct MessageInbox { filter: MessageFilter, /// Incoming messages that are pending to be processed. These are encrypted and signed. + inbox_tx: mpsc::Sender, inbox_rx: mpsc::Receiver, /// Subscription requests from MessageChannel + subscribe_tx: mpsc::Sender, subscribe_rx: mpsc::Receiver, generating: Subscriber, @@ -73,64 +105,76 @@ pub struct MessageInbox { impl MessageInbox { pub fn new( + inbox_tx: mpsc::Sender, inbox_rx: mpsc::Receiver, + filter_tx: mpsc::Sender<(Protocols, u64)>, filter_rx: mpsc::Receiver<(Protocols, u64)>, + subscribe_tx: mpsc::Sender, subscribe_rx: mpsc::Receiver, ) -> Self { Self { try_decrypt: VecDeque::new(), idempotent: lru::LruCache::new(MAX_FILTER_SIZE), - filter: MessageFilter::new(filter_rx), + filter: MessageFilter::new(filter_tx, filter_rx), + inbox_tx, inbox_rx, + subscribe_tx, subscribe_rx, - generating: Subscriber::unsubscribed(), - resharing: Subscriber::unsubscribed(), - ready: Subscriber::unsubscribed(), + generating: Subscriber::unsubscribed("generating", SINGLETON_CHANNEL_ID), + resharing: Subscriber::unsubscribed("resharing", SINGLETON_CHANNEL_ID), + ready: Subscriber::unsubscribed("ready", SINGLETON_CHANNEL_ID), triple: HashMap::new(), - triple_init: Subscriber::unsubscribed(), + triple_init: Subscriber::unsubscribed_with_capacity( + "triple_init", + SINGLETON_CHANNEL_ID, + sub::MAX_MESSAGE_POSIT_SUB_CHANNEL_SIZE, + ), presignature: HashMap::new(), - presignature_init: Subscriber::unsubscribed(), + presignature_init: Subscriber::unsubscribed_with_capacity( + "presignature_init", + SINGLETON_CHANNEL_ID, + sub::MAX_MESSAGE_POSIT_SUB_CHANNEL_SIZE, + ), signature: HashMap::new(), - signature_init: Subscriber::unsubscribed(), + signature_init: Subscriber::unsubscribed_with_capacity( + "signature_init", + SINGLETON_CHANNEL_ID, + sub::MAX_MESSAGE_POSIT_SUB_CHANNEL_SIZE, + ), } } - async fn send(&mut self, message: Message) { + fn send(&mut self, message: Message) { match message { Message::Posit(message) => match message.id { PositProtocolId::Triple(id) => { let _ = self .triple_init - .send((id, message.from, message.action)) - .await; + .try_send_lossy((id, message.from, message.action)); } PositProtocolId::Presignature(id) => { - let _ = self - .presignature_init - .send((id, message.from, message.action)) - .await; + let _ = + self.presignature_init + .try_send_lossy((id, message.from, message.action)); } PositProtocolId::Signature(sign_id, presignature_id, round) => { - let _ = self - .signature_init - .send(( - sign_id, - presignature_id, - round, - message.from, - message.action, - )) - .await; + let _ = self.signature_init.try_send_lossy(( + sign_id, + presignature_id, + round, + message.from, + message.action, + )); } }, Message::Generating(message) => { - let _ = self.generating.send(message).await; + let _ = self.generating.try_send_lossy(message); } Message::Resharing(message) => { - let _ = self.resharing.send(message).await; + let _ = self.resharing.try_send_lossy(message); } Message::Ready(message) => { - let _ = self.ready.send(message).await; + let _ = self.ready.try_send_lossy(message); } Message::Triple(message) => { // NOTE: not logging the error because this is simply just channel closure. @@ -138,25 +182,34 @@ impl MessageInbox { let _ = self .triple .entry(message.id) - .or_default() - .send(message) - .await; + .or_insert_with(|| { + Subscriber::unsubscribed("triple", triple_channel_id(message.id)) + }) + .try_send_lossy(message); } Message::Presignature(message) => { let _ = self .presignature .entry(message.id) - .or_default() - .send(message) - .await; + .or_insert_with(|| { + Subscriber::unsubscribed( + "presignature", + presignature_channel_id(message.id), + ) + }) + .try_send_lossy(message); } Message::Signature(message) => { let _ = self .signature .entry((message.id, message.presignature_id)) - .or_default() - .send(message) - .await; + .or_insert_with(|| { + Subscriber::unsubscribed( + "signature", + signature_channel_id(message.id, message.presignature_id), + ) + }) + .try_send_lossy(message); } Message::Unknown(entries) => { tracing::warn!( @@ -219,9 +272,9 @@ impl MessageInbox { } /// Publish messages to subscribers - async fn publish(&mut self, messages: Vec) { + fn publish(&mut self, messages: Vec) { for message in messages { - self.send(message).await; + self.send(message); } } @@ -238,6 +291,7 @@ impl MessageInbox { SubscribeId::Generating => match sub.action { SubscribeRequestAction::Subscribe(resp) => { let rx = self.generating.subscribe(); + self.generating.report_queue_len(); let _ = resp.send(SubscribeResponse::Generating(rx)); } SubscribeRequestAction::Unsubscribe => { @@ -247,6 +301,7 @@ impl MessageInbox { SubscribeId::Resharing => match sub.action { SubscribeRequestAction::Subscribe(resp) => { let rx = self.resharing.subscribe(); + self.resharing.report_queue_len(); let _ = resp.send(SubscribeResponse::Resharing(rx)); } SubscribeRequestAction::Unsubscribe => { @@ -255,22 +310,34 @@ impl MessageInbox { }, SubscribeId::Triple(id) => match sub.action { SubscribeRequestAction::Subscribe(resp) => { - let rx = self.triple.entry(id).or_default().subscribe(); + let sub = self.triple.entry(id).or_insert_with(|| { + Subscriber::unsubscribed("triple", triple_channel_id(id)) + }); + let rx = sub.subscribe(); + sub.report_queue_len(); let _ = resp.send(SubscribeResponse::Triple(rx)); } SubscribeRequestAction::Unsubscribe => { - if self.triple.remove(&id).is_none() { + if let Some(sub) = self.triple.remove(&id) { + sub.clear_queue_len_metric(); + } else { tracing::warn!(id, "trying to unsub from an unknown triple subscription"); } } }, SubscribeId::Presignature(id) => match sub.action { SubscribeRequestAction::Subscribe(resp) => { - let rx = self.presignature.entry(id).or_default().subscribe(); + let sub = self.presignature.entry(id).or_insert_with(|| { + Subscriber::unsubscribed("presignature", presignature_channel_id(id)) + }); + let rx = sub.subscribe(); + sub.report_queue_len(); let _ = resp.send(SubscribeResponse::Presignature(rx)); } SubscribeRequestAction::Unsubscribe => { - if self.presignature.remove(&id).is_none() { + if let Some(sub) = self.presignature.remove(&id) { + sub.clear_queue_len_metric(); + } else { tracing::warn!( id, "trying to unsub from an unknown presignature subscription" @@ -280,15 +347,23 @@ impl MessageInbox { }, SubscribeId::Signature(sign_id, presignature_id) => match sub.action { SubscribeRequestAction::Subscribe(resp) => { - let rx = self + let sub = self .signature .entry((sign_id, presignature_id)) - .or_default() - .subscribe(); + .or_insert_with(|| { + Subscriber::unsubscribed( + "signature", + signature_channel_id(sign_id, presignature_id), + ) + }); + let rx = sub.subscribe(); + sub.report_queue_len(); let _ = resp.send(SubscribeResponse::Signature(rx)); } SubscribeRequestAction::Unsubscribe => { - if self.signature.remove(&(sign_id, presignature_id)).is_none() { + if let Some(sub) = self.signature.remove(&(sign_id, presignature_id)) { + sub.clear_queue_len_metric(); + } else { tracing::warn!( ?sign_id, ?presignature_id, @@ -300,37 +375,45 @@ impl MessageInbox { SubscribeId::Ready => match sub.action { SubscribeRequestAction::Subscribe(resp) => { let rx = self.ready.subscribe(); + self.ready.report_queue_len(); let _ = resp.send(SubscribeResponse::Ready(rx)); } SubscribeRequestAction::Unsubscribe => { self.ready.unsubscribe(); + self.ready.report_queue_len(); } }, SubscribeId::Triples => match sub.action { SubscribeRequestAction::Subscribe(resp) => { let rx = self.triple_init.subscribe(); + self.triple_init.report_queue_len(); let _ = resp.send(SubscribeResponse::TriplePosit(rx)); } SubscribeRequestAction::Unsubscribe => { self.triple_init.unsubscribe(); + self.triple_init.report_queue_len(); } }, SubscribeId::Presignatures => match sub.action { SubscribeRequestAction::Subscribe(resp) => { let rx = self.presignature_init.subscribe(); + self.presignature_init.report_queue_len(); let _ = resp.send(SubscribeResponse::PresignaturePosit(rx)); } SubscribeRequestAction::Unsubscribe => { self.presignature_init.unsubscribe(); + self.presignature_init.report_queue_len(); } }, SubscribeId::Signatures => match sub.action { SubscribeRequestAction::Subscribe(resp) => { let rx = self.signature_init.subscribe(); + self.signature_init.report_queue_len(); let _ = resp.send(SubscribeResponse::SignaturePosit(rx)); } SubscribeRequestAction::Unsubscribe => { self.signature_init.unsubscribe(); + self.signature_init.report_queue_len(); } }, } @@ -341,9 +424,11 @@ impl MessageInbox { tokio::select! { _ = self.filter.update() => {} Some(sub) = self.subscribe_rx.recv() => { + report_singleton_channel_queue_len("subscribe", &self.subscribe_tx); self.process_subscribe(sub); } Some(encrypted) = self.inbox_rx.recv() => { + report_singleton_channel_queue_len("incoming", &self.inbox_tx); let config = config.borrow().clone(); let expiration = Duration::from_millis(config.protocol.message_timeout); let participants = contract.participant_map().await; @@ -358,7 +443,7 @@ impl MessageInbox { let messages = self.filter(messages); let messages_len = messages.len(); - self.publish(messages).await; + self.publish(messages); crate::metrics::messaging::NUM_RECEIVED_ENCRYPTED_TOTAL .inc_by(messages_len as f64); @@ -382,8 +467,15 @@ impl MessageChannel { let (outbox_tx, outbox_rx) = mpsc::channel(MAX_MESSAGE_OUTGOING); let (filter_tx, filter_rx) = mpsc::channel(MAX_FILTER_SIZE.into()); let (subscribe_tx, subscribe_rx) = mpsc::channel(16384); - let inbox = MessageInbox::new(inbox_rx, filter_rx, subscribe_rx); - let outbox = MessageOutbox::new(outbox_rx); + let inbox = MessageInbox::new( + inbox_tx.clone(), + inbox_rx, + filter_tx.clone(), + filter_rx, + subscribe_tx.clone(), + subscribe_rx, + ); + let outbox = MessageOutbox::new(outbox_tx.clone(), outbox_rx); let channel = Self { inbox: inbox_tx, @@ -392,6 +484,11 @@ impl MessageChannel { filter: filter_tx, }; + report_singleton_channel_queue_len("incoming", &channel.inbox); + report_singleton_channel_queue_len("outgoing", &channel.outgoing); + report_singleton_channel_queue_len("filter", &channel.filter); + report_singleton_channel_queue_len("subscribe", &channel.subscribe); + (inbox, outbox, channel) } @@ -415,6 +512,16 @@ impl MessageChannel { .await { tracing::error!(?err, "outbox: failed to send message to participants"); + } else { + report_singleton_channel_queue_len("outgoing", &self.outgoing); + } + } + + pub async fn send_inbox(&self, encrypted: Ciphered) { + if let Err(err) = self.inbox.send(encrypted).await { + tracing::error!(?err, "failed to forward an encrypted protocol message"); + } else { + report_singleton_channel_queue_len("incoming", &self.inbox); } } @@ -423,18 +530,24 @@ impl MessageChannel { pub async fn filter(&self, msg: &M) { if let Err(err) = self.filter.send((M::PROTOCOL, msg.id())).await { tracing::warn!(?err, "failed to send filter message"); + } else { + report_singleton_channel_queue_len("filter", &self.filter); } } pub async fn filter_triple(&self, id: TripleId) { if let Err(err) = self.filter.send((Protocols::Triple, id)).await { tracing::warn!(?err, "failed to send filter message"); + } else { + report_singleton_channel_queue_len("filter", &self.filter); } } pub async fn filter_presignature(&self, id: PresignatureId) { if let Err(err) = self.filter.send((Protocols::Presignature, id)).await { tracing::warn!(?err, "failed to send filter message"); + } else { + report_singleton_channel_queue_len("filter", &self.filter); } } @@ -447,6 +560,7 @@ impl MessageChannel { if self.subscribe.send(req).await.is_err() { return None; }; + report_singleton_channel_queue_len("subscribe", &self.subscribe); let Ok(subscription) = resp.await else { return None; }; @@ -475,6 +589,8 @@ impl MessageChannel { .is_err() { tracing::warn!(id, "unable to send unsubscribe request for triple message"); + } else { + report_singleton_channel_queue_len("subscribe", &self.subscribe); }; } @@ -505,6 +621,8 @@ impl MessageChannel { .is_err() { tracing::warn!("unable to send unsubscribe request for triple posits"); + } else { + report_singleton_channel_queue_len("subscribe", &self.subscribe); }; } @@ -536,6 +654,8 @@ impl MessageChannel { .is_err() { tracing::warn!("unable to send unsubscribe request for presignature"); + } else { + report_singleton_channel_queue_len("subscribe", &self.subscribe); }; } @@ -563,6 +683,8 @@ impl MessageChannel { .is_err() { tracing::warn!("unable to send unsubscribe request for presignature posits"); + } else { + report_singleton_channel_queue_len("subscribe", &self.subscribe); }; } @@ -610,6 +732,8 @@ impl MessageChannel { ?presignature_id, "unable to send unsubscribe request for signature" ); + } else { + report_singleton_channel_queue_len("subscribe", &self.subscribe); }; } @@ -638,6 +762,8 @@ impl MessageChannel { .is_err() { tracing::warn!("unable to send unsubscribe request for signature posit"); + } else { + report_singleton_channel_queue_len("subscribe", &self.subscribe); }; } @@ -769,6 +895,7 @@ pub struct Partition { /// These messages will be signed and encrypted before being sent out. pub struct MessageOutbox { /// The messages that are pending to be sent to other nodes. + outbox_tx: mpsc::Sender, outbox_rx: mpsc::Receiver, // NOTE: we have FromParticipant here to circumvent the chance that we change Participant @@ -780,8 +907,12 @@ pub struct MessageOutbox { } impl MessageOutbox { - pub fn new(outbox_rx: mpsc::Receiver) -> Self { + pub fn new( + outbox_tx: mpsc::Sender, + outbox_rx: mpsc::Receiver, + ) -> Self { Self { + outbox_tx, outbox_rx, messages: HashMap::new(), } @@ -930,6 +1061,7 @@ impl MessageOutbox { loop { tokio::select! { Some((msg, (from, to, timestamp))) = self.outbox_rx.recv() => { + report_singleton_channel_queue_len("outgoing", &self.outbox_tx); // add it to the outbox and sort it by from and to participant let entry = self.messages.entry((from, to)).or_default(); entry.push((msg, timestamp)); @@ -1020,18 +1152,25 @@ const fn cbor_name(value: &ciborium::Value) -> &'static str { #[cfg(test)] mod tests { + use super::SINGLETON_CHANNEL_ID; + use std::time::Duration; use cait_sith::protocol::Participant; use mpc_keys::hpke::{self, Ciphered}; use mpc_primitives::SignId; use serde::{de::DeserializeOwned, Deserialize, Serialize}; + use tokio::sync::mpsc; use crate::{ config::{Config, LocalConfig, NetworkConfig, OverrideConfig}, protocol::{ contract::primitives::{ParticipantMap, Participants}, - message::{GeneratingMessage, Message, SignatureMessage, SignedMessage, TripleMessage}, + message::{ + sub, GeneratingMessage, Message, MessageInbox, PositMessage, PositProtocolId, + ReadyMessage, SignatureMessage, SignedMessage, TripleMessage, + }, + posit::PositAction, ParticipantInfo, }, rpc::ContractStateWatcher, @@ -1366,7 +1505,7 @@ mod tests { }), ]; let encrypted = SignedMessage::encrypt(&batch, from, &sign_sk, &cipher_pk).unwrap(); - channel.inbox.send(encrypted).await.unwrap(); + channel.send_inbox(encrypted).await; let mut recv1 = channel.subscribe_triple(1).await; let mut recv2 = channel.subscribe_triple(2).await; @@ -1420,7 +1559,7 @@ mod tests { let mut recv3 = channel.subscribe_triple(3).await; channel.filter_triple(filter_id).await; - channel.inbox.send(encrypted).await.unwrap(); + channel.send_inbox(encrypted).await; let (m1, m3) = match tokio::join!(recv1.recv(), recv3.recv()) { (Some(m1), Some(m3)) => (m1, m3), @@ -1445,7 +1584,7 @@ mod tests { // should be received by the subscribers. { let encrypted = SignedMessage::encrypt(&batch, from, &sign_sk, &cipher_pk).unwrap(); - channel.inbox.send(encrypted).await.unwrap(); + channel.send_inbox(encrypted).await; let mut recv1 = tokio::time::timeout(Duration::from_millis(300), channel.subscribe_triple(1)) .await @@ -1470,4 +1609,67 @@ mod tests { inbox.abort(); } + + #[tokio::test] + async fn test_signature_posit_backpressure_does_not_block_ready_messages() { + let (inbox_tx, inbox_rx) = mpsc::channel(1); + let (filter_tx, filter_rx) = mpsc::channel(1); + let (subscribe_tx, subscribe_rx) = mpsc::channel(1); + let mut inbox = MessageInbox::new( + inbox_tx, + inbox_rx, + filter_tx, + filter_rx, + subscribe_tx, + subscribe_rx, + ); + + inbox.signature_init = + sub::Subscriber::unsubscribed_with_capacity("signature_init", SINGLETON_CHANNEL_ID, 1); + + let (signature_req, signature_resp) = + sub::SubscribeRequest::subscribe(sub::SubscribeId::Signatures); + inbox.process_subscribe(signature_req); + let mut signature_posit_rx = match signature_resp.await.unwrap() { + sub::SubscribeResponse::SignaturePosit(rx) => rx, + _ => panic!("expected signature posit subscription"), + }; + + let (ready_req, ready_resp) = sub::SubscribeRequest::subscribe(sub::SubscribeId::Ready); + inbox.process_subscribe(ready_req); + let mut ready_rx = match ready_resp.await.unwrap() { + sub::SubscribeResponse::Ready(rx) => rx, + _ => panic!("expected ready subscription"), + }; + + let sign_id = SignId::new([9; 32]); + let from = Participant::from(0); + let mut messages = Vec::with_capacity(sub::MAX_MESSAGE_SUB_CHANNEL_SIZE + 2); + for round in 0..=sub::MAX_MESSAGE_SUB_CHANNEL_SIZE { + messages.push(Message::Posit(PositMessage { + id: PositProtocolId::Signature(sign_id, 77, round), + from, + action: PositAction::Accept, + })); + } + messages.push(Message::Ready(ReadyMessage { + epoch: 1, + from, + nonce: 1, + token: 1, + })); + + inbox.publish(messages); + let ready_message = tokio::time::timeout(Duration::from_millis(100), ready_rx.recv()) + .await + .expect("ready message should not be blocked by signature posit backlog") + .expect("ready subscription unexpectedly closed"); + assert_eq!(ready_message.epoch, 1); + + let first_signature_posit = signature_posit_rx + .recv() + .await + .expect("signature posit subscription unexpectedly closed"); + assert_eq!(first_signature_posit.0, sign_id); + } } diff --git a/chain-signatures/node/src/protocol/message/sub.rs b/chain-signatures/node/src/protocol/message/sub.rs index e947dea18..0b9af4df2 100644 --- a/chain-signatures/node/src/protocol/message/sub.rs +++ b/chain-signatures/node/src/protocol/message/sub.rs @@ -13,6 +13,7 @@ use crate::protocol::triple::TripleId; /// This should be enough to hold a few messages in the inbox. pub const MAX_MESSAGE_SUB_CHANNEL_SIZE: usize = 4 * 1024; +pub const MAX_MESSAGE_POSIT_SUB_CHANNEL_SIZE: usize = 1 << 24; pub enum SubscribeId { Generating, @@ -68,7 +69,12 @@ impl SubscribeRequest { } } -pub enum Subscriber { +pub struct Subscriber { + metrics: SubscriberMetrics, + kind: SubscriberKind, +} + +pub enum SubscriberKind { /// Temporary/replaceable value, and will never be used. Only here so we can have a /// way to convert from an Unsubscribed to a Subscribed subscription. Unknown, @@ -78,47 +84,184 @@ pub enum Subscriber { Unsubscribed(mpsc::Sender, mpsc::Receiver), } +#[derive(Clone)] +pub struct SubscriberMetrics { + name: &'static str, + channel_id: String, + capacity: usize, +} + impl Subscriber { - pub fn subscribed() -> (Self, mpsc::Receiver) { - let (tx, rx) = mpsc::channel(MAX_MESSAGE_SUB_CHANNEL_SIZE); - (Self::Subscribed(tx), rx) + pub fn subscribed( + name: &'static str, + channel_id: impl Into, + ) -> (Self, mpsc::Receiver) { + Self::subscribed_with_capacity(name, channel_id, MAX_MESSAGE_SUB_CHANNEL_SIZE) + } + + pub fn subscribed_with_capacity( + name: &'static str, + channel_id: impl Into, + capacity: usize, + ) -> (Self, mpsc::Receiver) { + let metrics = SubscriberMetrics { + name, + channel_id: channel_id.into(), + capacity, + }; + Self::subscribed_with_metrics(metrics) } - pub fn unsubscribed() -> Self { - let (tx, rx) = mpsc::channel(MAX_MESSAGE_SUB_CHANNEL_SIZE); - Self::Unsubscribed(tx, rx) + fn subscribed_with_metrics(metrics: SubscriberMetrics) -> (Self, mpsc::Receiver) { + let (tx, rx) = mpsc::channel(metrics.capacity); + ( + Self { + metrics, + kind: SubscriberKind::Subscribed(tx), + }, + rx, + ) + } + + pub fn unsubscribed(name: &'static str, channel_id: impl Into) -> Self { + Self::unsubscribed_with_capacity(name, channel_id, MAX_MESSAGE_SUB_CHANNEL_SIZE) + } + + pub fn unsubscribed_with_capacity( + name: &'static str, + channel_id: impl Into, + capacity: usize, + ) -> Self { + let metrics = SubscriberMetrics { + name, + channel_id: channel_id.into(), + capacity, + }; + Self::unsubscribed_with_metrics(metrics) + } + + fn unsubscribed_with_metrics(metrics: SubscriberMetrics) -> Self { + let (tx, rx) = mpsc::channel(metrics.capacity); + Self { + metrics, + kind: SubscriberKind::Unsubscribed(tx, rx), + } } /// Convert this subscriber into a subscribed one, returning the receiver. /// If the subscriber is already subscribed, it overrides the existing subscription. pub fn subscribe(&mut self) -> mpsc::Receiver { - let sub = std::mem::replace(self, Self::Unknown); - let (sub, rx) = match sub { - Self::Subscribed(_) | Self::Unknown => Self::subscribed(), - Self::Unsubscribed(tx, rx) => (Self::Subscribed(tx), rx), + let kind = std::mem::replace(&mut self.kind, SubscriberKind::Unknown); + let (next_kind, rx) = match kind { + SubscriberKind::Subscribed(_) | SubscriberKind::Unknown => { + let (tx, rx) = mpsc::channel(self.metrics.capacity); + (SubscriberKind::Subscribed(tx), rx) + } + SubscriberKind::Unsubscribed(tx, rx) => (SubscriberKind::Subscribed(tx), rx), }; - *self = sub; + self.kind = next_kind; rx } /// Unsubscribe from the subscriber, converting it into an unsubscribed one. pub fn unsubscribe(&mut self) { - if matches!(self, Self::Subscribed(_) | Self::Unknown) { - *self = Self::unsubscribed(); + if let SubscriberKind::Subscribed(_) = self.kind { + let (tx, rx) = mpsc::channel(self.metrics.capacity); + self.kind = SubscriberKind::Unsubscribed(tx, rx); + } + } + + pub fn estimated_queue_len(&self) -> usize { + match &self.kind { + SubscriberKind::Subscribed(tx) | SubscriberKind::Unsubscribed(tx, _) => { + tx.max_capacity() - tx.capacity() + } + SubscriberKind::Unknown => 0, } } + pub fn report_queue_len(&self) { + crate::metrics::messaging::set_channel_queue_size( + self.metrics.name, + &self.metrics.channel_id, + self.estimated_queue_len(), + ); + } + + fn capacity(&self) -> usize { + self.metrics.capacity + } + + pub fn channel_id(&self) -> &str { + &self.metrics.channel_id + } + + pub fn clear_queue_len_metric(&self) { + crate::metrics::messaging::remove_channel_queue_size( + self.metrics.name, + &self.metrics.channel_id, + ); + } + + pub fn subscriber_name(&self) -> &'static str { + self.metrics.name + } + + fn report_after_enqueue(&self) { + self.report_queue_len(); + } + + pub fn is_unknown(&self) -> bool { + matches!(self.kind, SubscriberKind::Unknown) + } + pub async fn send(&self, msg: T) -> Result<(), mpsc::error::SendError> { - match self { - Self::Subscribed(tx) => tx.send(msg).await, - Self::Unsubscribed(tx, _) => tx.send(msg).await, - Self::Unknown => Ok(()), + match &self.kind { + SubscriberKind::Subscribed(tx) | SubscriberKind::Unsubscribed(tx, _) => { + tx.send(msg).await + } + SubscriberKind::Unknown => Ok(()), } } + + pub fn try_send_lossy(&self, msg: T) -> Result<(), mpsc::error::SendError> { + let result = match &self.kind { + SubscriberKind::Subscribed(tx) | SubscriberKind::Unsubscribed(tx, _) => { + match tx.try_send(msg) { + Ok(()) => Ok(()), + Err(mpsc::error::TrySendError::Full(_)) => { + tracing::warn!( + subscriber = self.metrics.name, + subscriber_id = self.metrics.channel_id, + capacity = self.capacity(), + "dropping message because subscriber channel is full" + ); + Ok(()) + } + Err(mpsc::error::TrySendError::Closed(msg)) => Err(mpsc::error::SendError(msg)), + } + } + SubscriberKind::Unknown => Ok(()), + }; + + self.report_after_enqueue(); + result + } } -impl Default for Subscriber { - fn default() -> Self { - Self::unsubscribed() +#[cfg(test)] +mod tests { + use super::Subscriber; + + #[test] + fn estimated_queue_len_tracks_buffered_messages() { + let sub = Subscriber::unsubscribed_with_capacity("test", "singleton", 4); + + assert_eq!(sub.estimated_queue_len(), 0); + + sub.try_send_lossy(1u8).unwrap(); + sub.try_send_lossy(2u8).unwrap(); + + assert_eq!(sub.estimated_queue_len(), 2); } } diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index bde5855db..e004eb7f8 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -1229,7 +1229,13 @@ impl SignatureSpawner { } // Subscribe to (or create) the posit inbox for this sign request - let rx = self.inboxes.entry(sign_id).or_default().subscribe(); + let rx = self + .inboxes + .entry(sign_id) + .or_insert_with(|| { + Subscriber::unsubscribed("sign_task_posit", hex::encode(sign_id.request_id)) + }) + .subscribe(); let task = SignTask { me: self.me, participants, @@ -1264,21 +1270,27 @@ impl SignatureSpawner { if from == self.me { return; } - let _ = self + if let Err(err) = self .inboxes .entry(sign_id) - .or_default() - .send(SignTaskMessage::PositMessage { + .or_insert_with(|| { + Subscriber::unsubscribed("sign_task_posit", hex::encode(sign_id.request_id)) + }) + .try_send_lossy(SignTaskMessage::PositMessage { presignature_id, round, from, action, }) - .await; + { + tracing::error!(?err, ?sign_id, "failed to send posit message"); + } } fn handle_completion(&mut self, sign_id: SignId) { - self.inboxes.remove(&sign_id); + if let Some(inbox) = self.inboxes.remove(&sign_id) { + inbox.clear_queue_len_metric(); + } self.abort_delayed_watcher(sign_id, "completion"); if self.tasks.abort(sign_id) { tracing::info!(?sign_id, "aborting signature task due to completion event"); @@ -1367,13 +1379,17 @@ impl SignatureSpawner { Ok(outcome) => outcome, Err(sign_id) => { tracing::warn!(?sign_id, "signature task interrupted"); - self.inboxes.remove(&sign_id); + if let Some(inbox) = self.inboxes.remove(&sign_id) { + inbox.clear_queue_len_metric(); + } self.abort_delayed_watcher(sign_id, "interruption"); continue; } }; - self.inboxes.remove(&sign_id); + if let Some(inbox) = self.inboxes.remove(&sign_id) { + inbox.clear_queue_len_metric(); + } self.abort_delayed_watcher(sign_id, "task completion"); match result { Ok(()) => { diff --git a/chain-signatures/node/src/web/mod.rs b/chain-signatures/node/src/web/mod.rs index 5d84a09a6..2999ccc0b 100644 --- a/chain-signatures/node/src/web/mod.rs +++ b/chain-signatures/node/src/web/mod.rs @@ -133,9 +133,7 @@ async fn msg( for encrypted in encrypted.into_iter() { let msg_channel = state.msg_channel.clone(); tokio::spawn(async move { - if let Err(err) = msg_channel.inbox.send(encrypted).await { - tracing::error!(?err, "failed to forward an encrypted protocol message"); - } + msg_channel.send_inbox(encrypted).await; }); } WEB_ENDPOINT_LATENCY