From e1795017d3a9712d25545a82a483b5c7e2156fc6 Mon Sep 17 00:00:00 2001
From: Michael Uray <25169478+MichaelUray@users.noreply.github.com>
Date: Thu, 9 Apr 2026 07:16:54 +0000
Subject: [PATCH 1/3] fix(client): prevent ICE reconnection loop when peers
 share the same public IP
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

When two peers are behind the same NAT (same public IP), ICE P2P
connections could never be established because of a reconnection loop:

1. Guard on peer A sees no P2P connection and sends a new offer
2. Peer B receives the offer with a new session ID
3. Peer B cancels its active ICE agent and creates a new one
4. Before the new ICE agent can complete connectivity checks, peer B's
   guard sends a new offer to peer A
5. Peer A cancels its ICE agent → cycle repeats indefinitely

The fix adds a short protection window (3 seconds) after agent creation
during which new offers with different session IDs are skipped. This is
long enough to prevent the guard-vs-guard race (guards fire every 3s)
but short enough not to delay crash recovery: the ICE FailedTimeout is
only 6 seconds, so at worst recovery is delayed by 3s.

Based on reviewer feedback (pappz), this replaces the earlier
unconditional skip approach: unconditional blocking during
agentConnecting would delay crash recovery by the full ICE timeout
(6s+). The time-windowed approach preserves the session-ID mechanism
for crash detection while eliminating the reconnection loop.

Candidate buffering (previously in this PR) has been removed per
reviewer feedback; the race condition it protected against is
practically impossible given the timing constraints.

Fixes: netbirdio/netbird#3669
Related: netbirdio/netbird#2703
Related: netbirdio/netbird#3339
---
 client/internal/peer/worker_ice.go | 100 +++++++++++++++++++++--------
 1 file changed, 75 insertions(+), 25 deletions(-)

diff --git a/client/internal/peer/worker_ice.go b/client/internal/peer/worker_ice.go
index 29bf5aaaa74..160c22c8407 100644
--- a/client/internal/peer/worker_ice.go
+++ b/client/internal/peer/worker_ice.go
@@ -21,6 +21,15 @@ import (
 	"github.com/netbirdio/netbird/route"
 )
 
+// agentLoopProtectionWindow is the maximum age of a freshly created ICE
+// agent during which OnNewOffer will refuse to replace it with a different
+// session ID. Must be short enough not to stall crash recovery (the ICE
+// FailedTimeout is 6s) and long enough to cover the millisecond-range race
+// between two guards stomping on each other when both peers are behind the
+// same NAT. 3s matches the guard's 3s initial tick while staying far longer
+// than typical STUN round-trips.
+const agentLoopProtectionWindow = 3 * time.Second
+
 type ICEConnInfo struct {
 	RemoteConn      net.Conn
 	RosenpassPubKey []byte
@@ -46,6 +55,7 @@ type WorkerICE struct {
 	agent             *icemaker.ThreadSafeAgent
 	agentDialerCancel context.CancelFunc
 	agentConnecting   bool      // while it is true, drop all incoming offers
+	agentStartTime    time.Time // when the current agent was created; used to bound the loop-prevention window
 	lastSuccess       time.Time // with this avoid the too frequent ICE agent recreation
 	// remoteSessionID represents the peer's session identifier from the latest remote offer.
 	remoteSessionID ICESessionID
@@ -95,36 +105,75 @@ func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, conn *
 	return w, nil
 }
 
+// handleExistingAgent checks whether an incoming offer should be skipped
+// (because the current agent is still valid) or accepted (tearing down the
+// existing agent first). It returns true when the offer must be skipped.
+// The caller must hold w.muxAgent.
+func (w *WorkerICE) handleExistingAgent(remoteOfferAnswer *OfferAnswer) (skip bool) {
+	if w.agent == nil && !w.agentConnecting {
+		return false
+	}
+
+	// backward compatibility with old clients that do not send session ID
+	if remoteOfferAnswer.SessionID == nil {
+		w.log.Debugf("agent already exists, skipping the offer")
+		return true
+	}
+
+	if w.remoteSessionID == *remoteOfferAnswer.SessionID {
+		w.log.Debugf("agent already exists and session ID matches, skipping the offer: %s", remoteOfferAnswer.SessionIDString())
+		return true
+	}
+
+	// Reconnection-loop protection.
+	//
+	// When two peers are behind the same NAT (same public IP) both of
+	// their guards can fire almost simultaneously, each sending an offer
+	// with a new session ID that tears down the other side's freshly
+	// created ICE agent before it has a chance to finish. We therefore
+	// ignore a replacement offer if the current agent was created only
+	// moments ago AND is still in the connecting phase.
+	//
+	// The window is intentionally small (agentLoopProtectionWindow):
+	//  * the guard's initial tick is 3s, so two guards can only stomp
+	//    on each other during the first few hundred ms after recreate;
+	//  * once the window has elapsed we MUST accept offers with a new
+	//    session ID, otherwise crash-recovery of the remote peer would
+	//    be stalled for the full ICE failed-timeout (6s) instead of
+	//    happening immediately. See pappz's review comment on PR #5805.
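+	//
+	// Illustrative timeline of the race this window breaks (timings are
+	// assumed for exposition, not taken from the PR):
+	//
+	//   t=0.0s  an offer with a new session ID is accepted, the agent is
+	//           recreated and agentStartTime is recorded
+	//   t=0.2s  the remote guard fires and another offer with a fresh
+	//           session ID arrives while we are still connecting
+	//           -> skipped below (age 0.2s < agentLoopProtectionWindow)
+	//   t>3s    offers with new session IDs are accepted again, so a
+	//           remote peer that really restarted recovers promptly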
+	agentAge := time.Since(w.agentStartTime)
+	if w.agentConnecting && agentAge < agentLoopProtectionWindow {
+		w.log.Infof("ICE agent started %s ago, skipping new offer with different session to break reconnection loop: %s",
+			agentAge, remoteOfferAnswer.SessionIDString())
+		return true
+	}
+
+	w.log.Debugf("agent already exists, recreate the connection")
+	w.remoteSessionChanged = true
+	w.agentDialerCancel()
+	if w.agent != nil {
+		if err := w.agent.Close(); err != nil {
+			w.log.Warnf("failed to close ICE agent: %s", err)
+		}
+	}
+
+	sessionID, err := NewICESessionID()
+	if err != nil {
+		w.log.Errorf("failed to create new session ID: %s", err)
+	}
+	w.sessionID = sessionID
+	w.agent = nil
+
+	return false
+}
+
 func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
 	w.log.Debugf("OnNewOffer for ICE, serial: %s", remoteOfferAnswer.SessionIDString())
 	w.muxAgent.Lock()
 	defer w.muxAgent.Unlock()
-	if w.agent != nil || w.agentConnecting {
-		// backward compatibility with old clients that do not send session ID
-		if remoteOfferAnswer.SessionID == nil {
-			w.log.Debugf("agent already exists, skipping the offer")
-			return
-		}
-		if w.remoteSessionID == *remoteOfferAnswer.SessionID {
-			w.log.Debugf("agent already exists and session ID matches, skipping the offer: %s", remoteOfferAnswer.SessionIDString())
-			return
-		}
-		w.log.Debugf("agent already exists, recreate the connection")
-		w.remoteSessionChanged = true
-		w.agentDialerCancel()
-		if w.agent != nil {
-			if err := w.agent.Close(); err != nil {
-				w.log.Warnf("failed to close ICE agent: %s", err)
-			}
-		}
-
-		sessionID, err := NewICESessionID()
-		if err != nil {
-			w.log.Errorf("failed to create new session ID: %s", err)
-		}
-		w.sessionID = sessionID
-		w.agent = nil
+	if w.handleExistingAgent(remoteOfferAnswer) {
+		return
 	}
 
 	var preferredCandidateTypes []ice.CandidateType
@@ -146,6 +195,7 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
 	w.agent = agent
 	w.agentDialerCancel = dialerCancel
 	w.agentConnecting = true
+	w.agentStartTime = time.Now()
 	if remoteOfferAnswer.SessionID != nil {
 		w.remoteSessionID = *remoteOfferAnswer.SessionID
 	} else {

From 3adbc13b793ec0d81f9b133f6552e9a0d8726f7d Mon Sep 17 00:00:00 2001
From: Michael Uray <25169478+MichaelUray@users.noreply.github.com>
Date: Thu, 16 Apr 2026 05:01:12 +0000
Subject: [PATCH 2/3] fix(client): address CodeRabbit review on ICE candidate
 buffer and session ID handling

- Add a pendingCandidates buffer for remote candidates arriving before
  ICE agent initialization (bounded to 50 entries)
- Run the candidateViaRoutes check before buffering so routed candidates
  are filtered identically regardless of arrival timing
- Extract an addRemoteCandidate helper used by both the live and the
  buffered path, ensuring the shouldAddExtraCandidate logic runs for
  all candidates
- Clear the stale session ID (set it to "") when NewICESessionID() fails
  in handleExistingAgent and closeAgent instead of keeping the previous
  value
- Clear pendingCandidates on agent close to prevent stale data

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 client/internal/peer/worker_ice.go | 58 ++++++++++++++++++++++++++----
 1 file changed, 51 insertions(+), 7 deletions(-)

diff --git a/client/internal/peer/worker_ice.go b/client/internal/peer/worker_ice.go
index 160c22c8407..2a0b8d3cdc7 100644
--- a/client/internal/peer/worker_ice.go
+++ b/client/internal/peer/worker_ice.go
@@ -30,6 +30,10 @@ import (
 // than typical STUN round-trips.
 const agentLoopProtectionWindow = 3 * time.Second
 
+// maxPendingCandidates is the maximum number of remote ICE candidates that
+// can be buffered before the local agent is initialized.
+const maxPendingCandidates = 50
+
 type ICEConnInfo struct {
 	RemoteConn      net.Conn
 	RosenpassPubKey []byte
@@ -73,6 +77,10 @@ type WorkerICE struct {
 	// we record the last known state of the ICE agent to avoid duplicate on disconnected events
 	lastKnownState ice.ConnectionState
 
+	// pendingCandidates buffers remote candidates that arrive before the ICE agent is initialized.
+	// They are flushed through the normal candidate pipeline once the agent is ready.
+	pendingCandidates []ice.Candidate
+
 	// portForwardAttempted tracks if we've already tried port forwarding this session
 	portForwardAttempted bool
 }
@@ -160,8 +168,10 @@ func (w *WorkerICE) handleExistingAgent(remoteOfferAnswer *OfferAnswer) (skip bo
 	sessionID, err := NewICESessionID()
 	if err != nil {
 		w.log.Errorf("failed to create new session ID: %s", err)
+		w.sessionID = ""
+	} else {
+		w.sessionID = sessionID
 	}
-	w.sessionID = sessionID
 	w.agent = nil
 
 	return false
@@ -202,6 +212,8 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
 		w.remoteSessionID = ""
 	}
 
+	w.flushPendingCandidates(agent)
+
 	go w.connect(dialerCtx, agent, remoteOfferAnswer)
 }
 
@@ -210,16 +222,31 @@ func (w *WorkerICE) OnRemoteCandidate(candidate ice.Candidate, haRoutes route.HA
 	w.muxAgent.Lock()
 	defer w.muxAgent.Unlock()
 	w.log.Debugf("OnRemoteCandidate from peer %s -> %s", w.config.Key, candidate.String())
-	if w.agent == nil {
-		w.log.Warnf("ICE Agent is not initialized yet")
+
+	// Filter routed candidates before buffering or adding, so both paths
+	// behave identically regardless of arrival timing.
+	if candidateViaRoutes(candidate, haRoutes) {
 		return
 	}
 
-	if candidateViaRoutes(candidate, haRoutes) {
+	if w.agent == nil {
+		if len(w.pendingCandidates) >= maxPendingCandidates {
+			w.log.Warnf("pending candidate buffer full (%d), dropping candidate: %s", maxPendingCandidates, candidate.Type())
+			return
+		}
+		w.log.Infof("ICE Agent not ready, buffering remote candidate: %s", candidate.Type())
+		w.pendingCandidates = append(w.pendingCandidates, candidate)
 		return
 	}
 
-	if err := w.agent.AddRemoteCandidate(candidate); err != nil {
+	w.addRemoteCandidate(w.agent, candidate)
+}
+
+// addRemoteCandidate adds a remote candidate to the agent and, when applicable,
+// also generates and adds the synthetic extra server-reflexive candidate.
+// This is the single code path for both live and buffered candidates.
+func (w *WorkerICE) addRemoteCandidate(agent *icemaker.ThreadSafeAgent, candidate ice.Candidate) {
+	if err := agent.AddRemoteCandidate(candidate); err != nil {
 		w.log.Errorf("error while handling remote candidate")
 		return
 	}
@@ -233,13 +260,27 @@ func (w *WorkerICE) OnRemoteCandidate(candidate ice.Candidate, haRoutes route.HA
 			return
 		}
 
-		if err := w.agent.AddRemoteCandidate(extraSrflx); err != nil {
+		if err := agent.AddRemoteCandidate(extraSrflx); err != nil {
 			w.log.Errorf("error while handling remote candidate")
 			return
 		}
 	}
 }
 
+// flushPendingCandidates drains the buffered candidates through the normal
+// candidate pipeline once the ICE agent is ready. The caller must hold w.muxAgent.
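+//
+// Illustrative call sequence (an assumed example, not taken from the PR):
+//
+//	OnRemoteCandidate(c1) // agent == nil -> buffered
+//	OnRemoteCandidate(c2) // agent == nil -> buffered
+//	OnNewOffer(offer)     // agent created -> flush drains c1, c2
+//	OnRemoteCandidate(c3) // agent != nil  -> added directly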
+func (w *WorkerICE) flushPendingCandidates(agent *icemaker.ThreadSafeAgent) {
+	if len(w.pendingCandidates) == 0 {
+		return
+	}
+
+	w.log.Infof("flushing %d buffered remote candidates", len(w.pendingCandidates))
+	for _, c := range w.pendingCandidates {
+		w.addRemoteCandidate(agent, c)
+	}
+	w.pendingCandidates = nil
+}
+
 func (w *WorkerICE) GetLocalUserCredentials() (frag string, pwd string) {
 	return w.localUfrag, w.localPwd
 }
@@ -381,11 +422,14 @@ func (w *WorkerICE) closeAgent(agent *icemaker.ThreadSafeAgent, cancel context.C
 	sessionID, err := NewICESessionID()
 	if err != nil {
 		w.log.Errorf("failed to create new session ID: %s", err)
+		w.sessionID = ""
+	} else {
+		w.sessionID = sessionID
 	}
-	w.sessionID = sessionID
 	w.agent = nil
 	w.agentConnecting = false
 	w.remoteSessionID = ""
+	w.pendingCandidates = nil
 	}
 	return sessionChanged
 }

From 90b59d944572ee26f597994912b988beb2760fbe Mon Sep 17 00:00:00 2001
From: Michael Uray <25169478+MichaelUray@users.noreply.github.com>
Date: Thu, 16 Apr 2026 06:15:35 +0000
Subject: [PATCH 3/3] fix(client): address maintainer review - drop candidate
 buffer, call OnRemoteCandidate synchronously

Per pappz's review on PR #5805:

1. Remove the candidate buffer entirely. The race between
   OnRemoteCandidate and OnNewOffer requires the remote peer to complete
   STUN gathering and signal a candidate back in microseconds (channel
   read time), which is far faster than the tens-to-hundreds of
   milliseconds of network I/O needed for candidate discovery. In
   practice this almost never happens.

2. Remove the 'go' prefix from conn.OnRemoteCandidate() in engine.go so
   candidates are processed synchronously in the signaling receive loop.
   This is the correct fix for the theoretical race: candidates now
   arrive in the same goroutine as offers, eliminating the ordering
   issue without needing a buffer.

The agentLoopProtectionWindow guard (3s) is kept as-is, since it
correctly handles both the reconnection-loop scenario (same-NAT peers)
and crash recovery (a new session ID is accepted after the window
expires).
---
 client/internal/engine.go          |  2 +-
 client/internal/peer/worker_ice.go | 45 ++----------------------------
 2 files changed, 4 insertions(+), 43 deletions(-)

diff --git a/client/internal/engine.go b/client/internal/engine.go
index 8d7e02bd552..700dcba3df4 100644
--- a/client/internal/engine.go
+++ b/client/internal/engine.go
@@ -1625,7 +1625,7 @@ func (e *Engine) receiveSignalEvents() {
 				return err
 			}
 
-			go conn.OnRemoteCandidate(candidate, e.routeManager.GetClientRoutes())
+			conn.OnRemoteCandidate(candidate, e.routeManager.GetClientRoutes())
 		case sProto.Body_MODE:
 		case sProto.Body_GO_IDLE:
 			e.connMgr.DeactivatePeer(conn)
diff --git a/client/internal/peer/worker_ice.go b/client/internal/peer/worker_ice.go
index 2a0b8d3cdc7..400e810800b 100644
--- a/client/internal/peer/worker_ice.go
+++ b/client/internal/peer/worker_ice.go
@@ -30,10 +30,6 @@ import (
 // than typical STUN round-trips.
 const agentLoopProtectionWindow = 3 * time.Second
 
-// maxPendingCandidates is the maximum number of remote ICE candidates that
-// can be buffered before the local agent is initialized.
-const maxPendingCandidates = 50
-
 type ICEConnInfo struct {
 	RemoteConn      net.Conn
 	RosenpassPubKey []byte
@@ -77,10 +73,6 @@ type WorkerICE struct {
 	// we record the last known state of the ICE agent to avoid duplicate on disconnected events
 	lastKnownState ice.ConnectionState
 
-	// pendingCandidates buffers remote candidates that arrive before the ICE agent is initialized.
-	// They are flushed through the normal candidate pipeline once the agent is ready.
-	pendingCandidates []ice.Candidate
-
 	// portForwardAttempted tracks if we've already tried port forwarding this session
 	portForwardAttempted bool
 }
@@ -212,8 +204,6 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
 		w.remoteSessionID = ""
 	}
 
-	w.flushPendingCandidates(agent)
-
 	go w.connect(dialerCtx, agent, remoteOfferAnswer)
 }
 
@@ -230,57 +220,29 @@ func (w *WorkerICE) OnRemoteCandidate(candidate ice.Candidate, haRoutes route.HA
 	}
 
 	if w.agent == nil {
-		if len(w.pendingCandidates) >= maxPendingCandidates {
-			w.log.Warnf("pending candidate buffer full (%d), dropping candidate: %s", maxPendingCandidates, candidate.Type())
-			return
-		}
-		w.log.Infof("ICE Agent not ready, buffering remote candidate: %s", candidate.Type())
-		w.pendingCandidates = append(w.pendingCandidates, candidate)
+		w.log.Warnf("ICE Agent is not initialized yet")
 		return
 	}
 
-	w.addRemoteCandidate(w.agent, candidate)
-}
-
-// addRemoteCandidate adds a remote candidate to the agent and, when applicable,
-// also generates and adds the synthetic extra server-reflexive candidate.
-// This is the single code path for both live and buffered candidates.
-func (w *WorkerICE) addRemoteCandidate(agent *icemaker.ThreadSafeAgent, candidate ice.Candidate) {
-	if err := agent.AddRemoteCandidate(candidate); err != nil {
+	if err := w.agent.AddRemoteCandidate(candidate); err != nil {
 		w.log.Errorf("error while handling remote candidate")
 		return
 	}
 
 	if shouldAddExtraCandidate(candidate) {
-		// sends an extra server reflexive candidate to the remote peer with our related port (usually the wireguard port)
-		// this is useful when network has an existing port forwarding rule for the wireguard port and this peer
 		extraSrflx, err := extraSrflxCandidate(candidate)
 		if err != nil {
 			w.log.Errorf("failed creating extra server reflexive candidate %s", err)
			return
 		}
 
-		if err := agent.AddRemoteCandidate(extraSrflx); err != nil {
+		if err := w.agent.AddRemoteCandidate(extraSrflx); err != nil {
 			w.log.Errorf("error while handling remote candidate")
 			return
 		}
 	}
 }
 
-// flushPendingCandidates drains the buffered candidates through the normal
-// candidate pipeline once the ICE agent is ready. The caller must hold w.muxAgent.
-func (w *WorkerICE) flushPendingCandidates(agent *icemaker.ThreadSafeAgent) {
-	if len(w.pendingCandidates) == 0 {
-		return
-	}
-
-	w.log.Infof("flushing %d buffered remote candidates", len(w.pendingCandidates))
-	for _, c := range w.pendingCandidates {
-		w.addRemoteCandidate(agent, c)
-	}
-	w.pendingCandidates = nil
-}
-
 func (w *WorkerICE) GetLocalUserCredentials() (frag string, pwd string) {
 	return w.localUfrag, w.localPwd
 }
@@ -429,7 +391,6 @@ func (w *WorkerICE) closeAgent(agent *icemaker.ThreadSafeAgent, cancel context.C
 	w.agent = nil
 	w.agentConnecting = false
 	w.remoteSessionID = ""
-	w.pendingCandidates = nil
 	}
 	return sessionChanged
 }
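
A regression-test sketch for the protection window (not part of this
series; it assumes package-internal access to the unexported WorkerICE
fields shown in the diffs above, and that NewICESessionID never returns
the empty ID):

package peer

import (
	"testing"
	"time"

	log "github.com/sirupsen/logrus"
)

func TestHandleExistingAgent_ProtectionWindow(t *testing.T) {
	sid, err := NewICESessionID()
	if err != nil {
		t.Fatal(err)
	}

	w := &WorkerICE{
		log:               log.WithField("peer", "test"),
		agentConnecting:   true,       // connectivity checks still running
		agentStartTime:    time.Now(), // freshly (re)created agent
		agentDialerCancel: func() {},  // no-op cancel for the teardown path
	}
	offer := &OfferAnswer{SessionID: &sid} // differs from w.remoteSessionID (zero value)

	// Inside the window: a replacement offer with a new session ID is skipped.
	if !w.handleExistingAgent(offer) {
		t.Fatal("offer should be skipped inside the protection window")
	}

	// After the window: the same offer must be accepted, so a remote peer
	// that really crashed recovers without waiting for the ICE FailedTimeout.
	w.agentStartTime = time.Now().Add(-agentLoopProtectionWindow - time.Second)
	if w.handleExistingAgent(offer) {
		t.Fatal("offer should be accepted once the window has elapsed")
	}
}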