fix: backpressure & stalled protocol generation #701
ChaoticTempest wants to merge 6 commits into develop from
Conversation
Pull request overview
This PR addresses protocol stalls caused by backpressure in internal bounded queues by making inbox fanout non-blocking and increasing buffering for posit-related subscriptions.
Changes:
- Introduces a lossy, non-blocking `Subscriber::try_send_lossy()` and tracks channel capacity inside `Subscriber`.
- Increases the posit-init subscriber channel capacity (notably to `1 << 24`) and updates `MessageInbox::{send, publish}` to avoid `.await` on per-subscriber sends.
- Updates signature posit routing to use lossy non-blocking sends and adds a regression test ensuring ready messages are not blocked by a posit backlog.
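The lossy send described above can be sketched with the standard library's bounded channel. This is a hypothetical analogue only: the PR's actual method lives on `Subscriber`, uses a tokio mpsc channel, and logs via `tracing`; the `try_send_lossy` free function below merely mirrors the drop-instead-of-block semantics.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

/// Hypothetical sketch: drop the message instead of waiting when the bounded
/// channel is full, so the sender is never blocked by a slow subscriber.
fn try_send_lossy<T>(tx: &SyncSender<T>, msg: T, name: &str) -> bool {
    match tx.try_send(msg) {
        Ok(()) => true,
        Err(TrySendError::Full(_)) => {
            // The real implementation emits a structured `tracing::warn!` here.
            eprintln!("dropping message: subscriber channel `{name}` is full");
            false
        }
        Err(TrySendError::Disconnected(_)) => false,
    }
}

fn main() {
    let (tx, _rx) = sync_channel::<u32>(1);
    assert!(try_send_lossy(&tx, 1, "posit")); // fits within capacity 1
    assert!(!try_send_lossy(&tx, 2, "posit")); // full: dropped, sender not blocked
}
```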
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| `chain-signatures/node/src/protocol/signature.rs` | Routes signature-task posit messages via a non-blocking lossy send to avoid stalling the spawner. |
| `chain-signatures/node/src/protocol/message/sub.rs` | Extends `Subscriber` to store capacity and adds `try_send_lossy()` plus capacity-aware constructors. |
| `chain-signatures/node/src/protocol/message/mod.rs` | Uses lossy non-blocking delivery in `MessageInbox`, increases posit-init capacities, and adds a backpressure regression test. |
```diff
@@ -139,24 +143,21 @@ impl MessageInbox {
                     .triple
                     .entry(message.id)
                     .or_default()
-                    .send(message)
-                    .await;
+                    .try_send_lossy(message, "triple");
             }
             Message::Presignature(message) => {
                 let _ = self
                     .presignature
                     .entry(message.id)
                     .or_default()
-                    .send(message)
-                    .await;
+                    .try_send_lossy(message, "presignature");
             }
             Message::Signature(message) => {
                 let _ = self
                     .signature
                     .entry((message.id, message.presignature_id))
                     .or_default()
-                    .send(message)
-                    .await;
+                    .try_send_lossy(message, "signature");
             }
```
`MessageInbox::send()` now uses `try_send_lossy()` for all message variants (Generating/Resharing/Ready/Triple/Presignature/Signature as well as Posits). This changes the semantics from backpressure to potential message loss whenever any subscriber channel is full. If only posit fanout should be lossy/non-blocking, consider keeping the awaited `send()` (or a different strategy) for non-posit messages so protocol-critical messages aren't dropped under load.
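The reviewer's suggestion of a mixed strategy can be sketched with a std-library analogue. The `Delivery` enum and `deliver` function below are illustrative names, not the PR's API; they only show the two semantics side by side.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

/// Illustrative delivery modes (hypothetical names, not the PR's API).
enum Delivery {
    /// Protocol-critical: keep backpressure, block until there is room.
    Critical,
    /// Posit fanout: drop immediately when the channel is full.
    Lossy,
}

fn deliver<T>(tx: &SyncSender<T>, msg: T, mode: Delivery) -> bool {
    match mode {
        Delivery::Critical => tx.send(msg).is_ok(),
        Delivery::Lossy => match tx.try_send(msg) {
            Ok(()) => true,
            Err(TrySendError::Full(_) | TrySendError::Disconnected(_)) => false,
        },
    }
}

fn main() {
    let (tx, rx) = sync_channel::<u32>(1);
    assert!(deliver(&tx, 1, Delivery::Critical)); // blocks rather than drops
    assert_eq!(rx.recv().unwrap(), 1);
    assert!(deliver(&tx, 2, Delivery::Lossy));
    assert!(!deliver(&tx, 3, Delivery::Lossy)); // full: dropped
}
```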
```rust
/// This should be enough to hold a few messages in the inbox.
pub const MAX_MESSAGE_SUB_CHANNEL_SIZE: usize = 4 * 1024;
```
The doc comment above these constants says the channel size is "enough to hold a few messages in the inbox", but `MAX_MESSAGE_POSIT_SUB_CHANNEL_SIZE` is now `1 << 24`. This comment is now misleading; consider updating or adding per-constant documentation explaining why posit channels are so large (and what the units are: messages, not bytes).
```diff
-/// This should be enough to hold a few messages in the inbox.
-pub const MAX_MESSAGE_SUB_CHANNEL_SIZE: usize = 4 * 1024;
+/// Maximum size (in messages) of a regular subscription inbox; this only needs to hold
+/// a few protocol messages without immediately backpressuring senders.
+pub const MAX_MESSAGE_SUB_CHANNEL_SIZE: usize = 4 * 1024;
+/// Maximum size (in messages) of a posit subscription inbox.
+///
+/// Posit channels can accumulate a large number of small control-plane updates, so they
+/// use a much larger capacity (2^24 messages) to reduce the risk of backpressure or
+/// deadlock under high concurrency. Note: this value is a count of messages, not bytes.
```
```diff
         let (sub, rx) = match sub {
-            Self::Subscribed(_) | Self::Unknown => Self::subscribed(),
             Self::Unsubscribed(tx, rx) => (Self::Subscribed(tx), rx),
+            Self::Subscribed(_, _) | Self::Unknown => Self::subscribed(),
```
`Subscriber::subscribe()` does not preserve the previously configured capacity when the subscriber is already in the `Subscribed(..)` state: it falls back to `Self::subscribed()` (default capacity). This can silently shrink channels that were intentionally created with a larger capacity (e.g. the posit init channels) if a second subscription overrides the first. Consider matching `Self::Subscribed(_, capacity)` separately and calling `subscribed_with_capacity(capacity)` so the capacity remains stable across resubscriptions.
```diff
-            Self::Subscribed(_, _) | Self::Unknown => Self::subscribed(),
+            Self::Subscribed(_, capacity) => Self::subscribed_with_capacity(capacity),
+            Self::Unknown => Self::subscribed(),
```
```rust
Err(mpsc::error::TrySendError::Full(_)) => {
    tracing::warn!(
        subscriber = name,
        capacity = self.capacity(),
        "dropping message because subscriber channel is full"
    );
    Ok(())
```
`try_send_lossy()` logs a `warn!` for every dropped message when the channel is full. Under sustained overload this can easily become a log flood (and the logging itself can become a bottleneck). Consider rate-limiting/sampling these warnings and/or emitting a counter/metric for drops, while keeping occasional structured context (subscriber name, capacity).
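One simple way to sample the warnings while still counting every drop exactly is a shared atomic counter. The sketch below is a hypothetical illustration (`DROPPED`, `LOG_EVERY`, and `should_log_drop` are made-up names; the interval is arbitrary), not the PR's code.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Exact count of drops, suitable for exporting as a metric.
static DROPPED: AtomicU64 = AtomicU64::new(0);
// Illustrative sampling interval: emit one warning per 1024 drops.
const LOG_EVERY: u64 = 1024;

/// Returns true when this particular drop should emit a warning.
fn should_log_drop() -> bool {
    let n = DROPPED.fetch_add(1, Ordering::Relaxed);
    n % LOG_EVERY == 0
}

fn main() {
    assert!(should_log_drop()); // the first drop is always logged
    for _ in 0..1023 {
        assert!(!should_log_drop()); // drops 2..=1024 are counted but not logged
    }
    assert!(should_log_drop()); // drop 1025 (n == 1024) is logged again
}
```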
jakmeier
left a comment
Hm, I think this could be seen as a temporary solution, mostly to check if this is the actual problem. Probably we have some cyclic dependencies between channels that completely lock up the system, which should be largely resolved with your change.
A better solution would be to identify cyclic dependencies and decouple them. Then, the worst thing that can happen with a full buffer is that things become really slow. But it shouldn't lock up the system.
In general, adding really large buffers to avoid backpressure problems just moves the problem to a different place. If more messages arrive at a channel than can be handled by the consumer, the number of messages in the channel will keep increasing until something stops it. In this case, OOM or hitting the limit and messages being dropped.
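The cyclic lock-up described above can be illustrated with two bounded channels. This is a toy sketch, not the node's actual task topology: once both inboxes are full, blocking sends on both sides of the cycle would wait on each other forever, whereas the non-blocking variant surfaces the condition.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

fn main() {
    // Task A's inbox and task B's inbox, both bounded (toy capacities).
    let (a_tx, _a_rx) = sync_channel::<u8>(1);
    let (b_tx, _b_rx) = sync_channel::<u8>(1);

    // Fill both channels: neither consumer has drained anything yet.
    a_tx.try_send(0).unwrap();
    b_tx.try_send(0).unwrap();

    // If A now did a blocking `send` into B's inbox while B was blocked on a
    // `send` into A's, neither could make progress: cyclic backpressure deadlock.
    // The non-blocking variant reports `Full` instead of stalling.
    assert!(matches!(a_tx.try_send(1), Err(TrySendError::Full(1))));
    assert!(matches!(b_tx.try_send(1), Err(TrySendError::Full(1))));
}
```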
```rust
/// This should be enough to hold a few messages in the inbox.
pub const MAX_MESSAGE_SUB_CHANNEL_SIZE: usize = 4 * 1024;
pub const MAX_MESSAGE_POSIT_SUB_CHANNEL_SIZE: usize = 1 << 24;
```
So that would be ~16 M entries for every channel. I guess that is still better than unbounded. But we will run into memory issues if these grow large, and yes, we still don't have a strong guarantee against blocking.
To keep an eye on how much memory we need, can you add metrics for each channel capacity?
An estimate of the number of buffered channel messages should be readable like this:

```rust
impl Subscriber {
    pub fn estimated_queue_len(&self) -> usize {
        match self {
            Self::Subscribed(tx) | Self::Unsubscribed(tx, _) => tx.max_capacity() - tx.capacity(),
            Self::Unknown => 0,
        }
    }
}
```

Then we can have a gauge vector tracking how many messages are queued up per channel.
That way, if we run into memory problems, we will see which channel is to blame. And even if it doesn't run into a full OOM crash, we can observe channel queue lengths and find out which tasks exactly are being stalled.
yep, I'll add in metrics
One of the things this PR misses is why we start cascading into these full channels in the first place. I was talking with @volovyks the other day about this, but it seems like since sync now only utilizes … We could add back the …
It seems like it is time to add a limit on the number of concurrent requests we can process. Others should be added to the backlog and processed later. This PR should fix /sync (increased timeouts). We shouldn't generate anything when the node is out of sync; that is the point.
Added metrics now for channels. A channel name (i.e. triple_posit, triple) and a channel id (i.e. "singleton" or a triple id) are used to identify the specific channel. The metrics for non-singleton channels will be removed once they complete (that is not to say they stop appearing in Grafana, but that the entries in our node will no longer report them).
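The (channel name, channel id) labeling and the removal-on-completion behavior can be sketched with a plain map. This is a stdlib stand-in, not the PR's code: a real deployment would export these values as a Prometheus gauge vector, and the struct and method names here are illustrative.

```rust
use std::collections::HashMap;

/// Minimal sketch of a per-channel queue-length gauge registry,
/// keyed by (channel name, channel id), e.g. ("triple_posit", "42").
#[derive(Default)]
struct QueueGauges {
    /// (channel name, channel id) -> last observed queue length.
    by_channel: HashMap<(String, String), usize>,
}

impl QueueGauges {
    fn record(&mut self, name: &str, id: &str, queued: usize) {
        self.by_channel.insert((name.to_owned(), id.to_owned()), queued);
    }

    /// Drop the entry once a non-singleton channel (e.g. a finished triple) completes.
    fn remove(&mut self, name: &str, id: &str) {
        self.by_channel.remove(&(name.to_owned(), id.to_owned()));
    }
}

fn main() {
    let mut gauges = QueueGauges::default();
    gauges.record("triple_posit", "42", 7);
    gauges.record("triple", "singleton", 3);
    gauges.remove("triple_posit", "42"); // protocol for id 42 completed
    assert_eq!(gauges.by_channel.len(), 1);
}
```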
volovyks
left a comment
Can we move the metrics to a different PR? It would be great to see what the situation is now and how lossy fix or any other change affects it.
Also, I want to confirm we are not breaking the cardinality rule in metrics.
jakmeier
left a comment
LGTM
I would also prefer merging just the metrics in a first PR, as @volovyks asked. But if untangling it is too much work, personally I am okay with merging it together.
One other note: You report queue lengths when sending but not when removing an element. Metrics might look a bit strange, staying at a high value even when the queue becomes empty, only updating once the next message is sent. I don't have a solution. But because of this quirk I think in dashboards we should label it as max queue size to avoid confusion.
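One way to avoid the send-only quirk described above is to update the counter on both sides of the channel. The wrapper types below are a hypothetical stdlib sketch (not the PR's code); a real version would feed the counter into the per-channel gauge so the metric falls when the queue drains, not only when the next message is sent.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};
use std::sync::Arc;

struct CountedSender<T> {
    tx: SyncSender<T>,
    len: Arc<AtomicUsize>,
}

struct CountedReceiver<T> {
    rx: Receiver<T>,
    len: Arc<AtomicUsize>,
}

impl<T> CountedSender<T> {
    fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
        self.tx.try_send(msg).map(|()| {
            self.len.fetch_add(1, Ordering::Relaxed);
        })
    }
}

impl<T> CountedReceiver<T> {
    fn recv(&self) -> Option<T> {
        let msg = self.rx.recv().ok()?;
        self.len.fetch_sub(1, Ordering::Relaxed);
        Some(msg)
    }
}

fn counted_channel<T>(cap: usize) -> (CountedSender<T>, CountedReceiver<T>) {
    let (tx, rx) = sync_channel(cap);
    let len = Arc::new(AtomicUsize::new(0));
    (CountedSender { tx, len: Arc::clone(&len) }, CountedReceiver { rx, len })
}

fn main() {
    let (tx, rx) = counted_channel::<u8>(4);
    tx.try_send(1).unwrap();
    tx.try_send(2).unwrap();
    assert_eq!(tx.len.load(Ordering::Relaxed), 2); // gauge after sends
    assert_eq!(rx.recv(), Some(1));
    assert_eq!(rx.len.load(Ordering::Relaxed), 1); // gauge falls on receive too
}
```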
This should fix the issue with protocols getting stalled due to backpressure from queues.
This bumps the default size for posits to 2^24. This is a lot, so if we ever get to it, we would have other problems at that point.
`MessageInbox::{send, publish}` is no longer blocking, so this alleviates the issue where sending one message could block another from being sent. Internally this uses `try_send_lossy`, which logs when the channel is full and the message is being dropped.