feat(tcp): auto-disconnect on backlog overflow#66
Conversation
| if let Some(max) = self.max_backlog { | ||
| let len = stream.backlog_len(); | ||
| if len > max { | ||
| warn!(?token, backlog = len, max, "backlog exceeded limit, disconnecting"); | ||
| self.disconnect_at_index(i); | ||
| } |
There was a problem hiding this comment.
🔴 Backlog-overflow disconnect preserves backlog, causing infinite reconnect loop for outbound connections
When max_backlog is set and a connection's backlog exceeds the limit, disconnect_at_index(i) is called. For outbound connections, this calls close() (stream.rs:593-596 — only deregisters/shuts down the socket, does NOT clear send_backlog) and then pushes the connection with its full backlog intact to to_be_reconnected (connector.rs:110). On reconnect, reset_with_new_stream (stream.rs:175-199) also does not clear the backlog — it arms WRITABLE to drain the old messages. So the backlog that triggered the disconnect is still present (and still > max) after reconnection, causing flush_backlogs to immediately disconnect again on the next poll_with cycle. This creates an infinite disconnect→reconnect loop for outbound connections, defeating the stated goal of giving "the connection a fresh start."
Reconnect loop trace
flush_backlogsfindsbacklog_len > max→ callsdisconnect_at_indexdisconnect_at_indexcloses socket, moves connection (with full backlog) toto_be_reconnectedmaybe_reconnectfires →try_reconnect→reset_with_new_streampreserves backlog- Next
poll_with→flush_backlogsfinds backlog still > max → disconnect again - Repeat forever (bounded only by reconnect interval)
Prompt for agents
In crates/flux-network/src/tcp/connector.rs, the flush_backlogs method at lines 196-201 disconnects a connection when its backlog exceeds max_backlog, but the backlog is never cleared, so reconnected outbound connections will be immediately disconnected again.
The fix needs to clear the send_backlog before (or at the time of) disconnecting. One approach:
1. In crates/flux-network/src/tcp/stream.rs, add a public method like `pub(crate) fn clear_backlog(&mut self) { self.send_backlog.clear(); self.send_cursor = 0; }` next to the existing `backlog_len()` method around line 374.
2. In crates/flux-network/src/tcp/connector.rs flush_backlogs (around line 198-201), before calling self.disconnect_at_index(i), get a mutable reference to the stream and call clear_backlog(). Since disconnect_at_index uses swap_remove, you need to clear the backlog before that call. You could restructure as:
- Extract the stream from variant
- Call stream.clear_backlog()
- Then call self.disconnect_at_index(i)
Alternatively, modify disconnect_at_index to accept a flag indicating whether to clear the backlog, or create a separate method for backlog-overflow disconnects that clears the backlog before moving to to_be_reconnected.
Was this helpful? React with 👍 or 👎 to provide feedback.
50e22d7 to
e789604
Compare
Add TcpConnector::with_max_backlog(max, timeout) which disconnects connections whose send backlog exceeds max framed messages continuously for longer than timeout. The exceeded-since timer resets when the backlog drops back below the threshold or on reconnect (via reset_with_new_stream). The backlog is preserved across reconnects so pending messages get a chance to drain on the fresh connection. Outbound connections are automatically scheduled for reconnection; inbound connections are simply closed (remote must reconnect).
e789604 to
ada4347
Compare
| if stream.has_backlog() { | ||
| if stream.drain_backlog(self.poll.registry()) == ConnState::Disconnected { | ||
| self.disconnect_at_index(i); | ||
| continue; | ||
| } | ||
| if let Some((max, timeout)) = self.max_backlog { | ||
| let len = stream.backlog_len(); | ||
| if len > max { | ||
| // Start or continue the exceeded-since timer. | ||
| let since = *stream.backlog_exceeded_since.get_or_insert(now); | ||
| let elapsed = now.saturating_sub(since); | ||
| if elapsed >= timeout { | ||
| warn!( | ||
| ?token, | ||
| backlog = len, | ||
| max, | ||
| ?elapsed, | ||
| "backlog exceeded limit for too long, disconnecting" | ||
| ); | ||
| self.disconnect_at_index(i); | ||
| } | ||
| } else { | ||
| // Back below threshold — reset the timer. | ||
| stream.backlog_exceeded_since = None; | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
🔴 Stale backlog_exceeded_since timer causes premature disconnection
backlog_exceeded_since is only reset inside flush_backlogs when backlog_len() <= max (line 213), or on reconnect via reset_with_new_stream (stream.rs:191). However, drain_backlog is also called from TcpStream::poll_with (stream.rs:241), poll_with_produce (stream.rs:287), and write_or_enqueue_with (stream.rs:312) — none of which reset the timer. If one of these paths fully drains the backlog, flush_backlogs sees has_backlog() == false and skips the entire check block, leaving a stale Some(T_old) in backlog_exceeded_since. When the backlog later exceeds max again (e.g. during a transient network stall), get_or_insert(now) at line 199 returns the old timestamp instead of the current time, making elapsed include the entire quiet period. This can immediately exceed timeout and disconnect a connection that only just crossed the threshold — defeating the grace-period the feature is designed to provide.
Prompt for agents
In crates/flux-network/src/tcp/connector.rs, the flush_backlogs method (lines 180-217) only resets backlog_exceeded_since when has_backlog() is true AND backlog_len() <= max. When the backlog is drained by a different code path (e.g. poll_with writable event at stream.rs:241), has_backlog() returns false and the stale timer persists.
Fix: After the if stream.has_backlog() block (around line 216), add an else branch that resets the timer:
if stream.has_backlog() {
// ... existing code ...
} else {
// Backlog was drained (possibly via a writable event in poll_with).
stream.backlog_exceeded_since = None;
}
Alternatively, you could reset backlog_exceeded_since inside drain_backlog itself (in stream.rs around line 410) when send_backlog becomes empty, which would centralize the reset regardless of call site.
Was this helpful? React with 👍 or 👎 to provide feedback.
Adds
TcpConnector::with_max_backlog(n)— when a connection's send backlog exceedsnframed messages after a drain attempt, it is automatically disconnected.The check runs inside
flush_backlogs()at the end of everypoll_with()/poll_with_produce()cycle.Changes:
TcpStream::backlog_len()— exposes the send backlog depthConnectionManager::max_backlog: Option<usize>— configurable thresholdTcpConnector::with_max_backlog(max)— builder methodflush_backlogs()— checks threshold after drain, disconnects if exceeded