diff --git a/client/internal/engine.go b/client/internal/engine.go index 8d7e02bd552..700dcba3df4 100644 --- a/client/internal/engine.go +++ b/client/internal/engine.go @@ -1625,7 +1625,7 @@ func (e *Engine) receiveSignalEvents() { return err } - go conn.OnRemoteCandidate(candidate, e.routeManager.GetClientRoutes()) + conn.OnRemoteCandidate(candidate, e.routeManager.GetClientRoutes()) case sProto.Body_MODE: case sProto.Body_GO_IDLE: e.connMgr.DeactivatePeer(conn) diff --git a/client/internal/peer/worker_ice.go b/client/internal/peer/worker_ice.go index 29bf5aaaa74..400e810800b 100644 --- a/client/internal/peer/worker_ice.go +++ b/client/internal/peer/worker_ice.go @@ -21,6 +21,15 @@ import ( "github.com/netbirdio/netbird/route" ) +// agentLoopProtectionWindow is the maximum age of a freshly created ICE +// agent during which OnNewOffer will refuse to replace it with a different +// session ID. Must be short enough not to stall crash recovery (the ICE +// FailedTimeout is 6s) and long enough to cover the millisecond-range race +// between two guards stomping on each other when both peers are behind the +// same NAT. 3s matches the guard's 3s initial tick but is still longer +// than typical STUN round-trips. +const agentLoopProtectionWindow = 3 * time.Second + type ICEConnInfo struct { RemoteConn net.Conn RosenpassPubKey []byte @@ -46,6 +55,7 @@ type WorkerICE struct { agent *icemaker.ThreadSafeAgent agentDialerCancel context.CancelFunc agentConnecting bool // while it is true, drop all incoming offers + agentStartTime time.Time // when the current agent was created; used to bound the loop-prevention window lastSuccess time.Time // with this avoid the too frequent ICE agent recreation // remoteSessionID represents the peer's session identifier from the latest remote offer. 
remoteSessionID ICESessionID @@ -95,36 +105,77 @@ func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, conn * return w, nil } +// handleExistingAgent checks whether an incoming offer should be skipped +// (because the current agent is still valid) or accepted (tearing down the +// existing agent first). It returns true when the offer must be skipped. +// The caller must hold w.muxAgent. +func (w *WorkerICE) handleExistingAgent(remoteOfferAnswer *OfferAnswer) (skip bool) { + if w.agent == nil && !w.agentConnecting { + return false + } + + // backward compatibility with old clients that do not send session ID + if remoteOfferAnswer.SessionID == nil { + w.log.Debugf("agent already exists, skipping the offer") + return true + } + + if w.remoteSessionID == *remoteOfferAnswer.SessionID { + w.log.Debugf("agent already exists and session ID matches, skipping the offer: %s", remoteOfferAnswer.SessionIDString()) + return true + } + + // Reconnection-loop protection. + // + // When two peers are behind the same NAT (same public IP) both of + // their guards can fire almost simultaneously, each sending an offer + // with a new session ID that tears down the other side's freshly + // created ICE agent before it has a chance to finish. We therefore + // ignore a replacement offer if the current agent was created only + // moments ago AND is still in the connecting phase. + // + // The window is intentionally small (agentLoopProtectionWindow): + // * the guard's initial tick is 3s, so two guards can only stomp + // on each other during the first few hundred ms after recreate; + // * once the window has elapsed we MUST accept offers with a new + // session ID, otherwise crash-recovery of the remote peer would + // be stalled for the full ICE failed-timeout (6s) instead of + // happening immediately. See pappz's review comment on PR #5805. 
+ agentAge := time.Since(w.agentStartTime) + if w.agentConnecting && agentAge < agentLoopProtectionWindow { + w.log.Infof("ICE agent started %s ago, skipping new offer with different session to break reconnection loop: %s", + agentAge, remoteOfferAnswer.SessionIDString()) + return true + } + + w.log.Debugf("agent already exists, recreate the connection") + w.remoteSessionChanged = true + w.agentDialerCancel() + if w.agent != nil { + if err := w.agent.Close(); err != nil { + w.log.Warnf("failed to close ICE agent: %s", err) + } + } + + sessionID, err := NewICESessionID() + if err != nil { + w.log.Errorf("failed to create new session ID: %s", err) + w.sessionID = "" + } else { + w.sessionID = sessionID + } + w.agent = nil + + return false +} + func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) { w.log.Debugf("OnNewOffer for ICE, serial: %s", remoteOfferAnswer.SessionIDString()) w.muxAgent.Lock() defer w.muxAgent.Unlock() - if w.agent != nil || w.agentConnecting { - // backward compatibility with old clients that do not send session ID - if remoteOfferAnswer.SessionID == nil { - w.log.Debugf("agent already exists, skipping the offer") - return - } - if w.remoteSessionID == *remoteOfferAnswer.SessionID { - w.log.Debugf("agent already exists and session ID matches, skipping the offer: %s", remoteOfferAnswer.SessionIDString()) - return - } - w.log.Debugf("agent already exists, recreate the connection") - w.remoteSessionChanged = true - w.agentDialerCancel() - if w.agent != nil { - if err := w.agent.Close(); err != nil { - w.log.Warnf("failed to close ICE agent: %s", err) - } - } - - sessionID, err := NewICESessionID() - if err != nil { - w.log.Errorf("failed to create new session ID: %s", err) - } - w.sessionID = sessionID - w.agent = nil + if w.handleExistingAgent(remoteOfferAnswer) { + return } var preferredCandidateTypes []ice.CandidateType @@ -146,6 +197,7 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) { w.agent = agent 
w.agentDialerCancel = dialerCancel w.agentConnecting = true + w.agentStartTime = time.Now() if remoteOfferAnswer.SessionID != nil { w.remoteSessionID = *remoteOfferAnswer.SessionID } else { @@ -160,12 +212,15 @@ func (w *WorkerICE) OnRemoteCandidate(candidate ice.Candidate, haRoutes route.HA w.muxAgent.Lock() defer w.muxAgent.Unlock() w.log.Debugf("OnRemoteCandidate from peer %s -> %s", w.config.Key, candidate.String()) - if w.agent == nil { - w.log.Warnf("ICE Agent is not initialized yet") + + // Filter routed candidates before buffering or adding, so both paths + // behave identically regardless of arrival timing. + if candidateViaRoutes(candidate, haRoutes) { return } - if candidateViaRoutes(candidate, haRoutes) { + if w.agent == nil { + w.log.Warnf("ICE Agent is not initialized yet") return } @@ -175,8 +230,6 @@ func (w *WorkerICE) OnRemoteCandidate(candidate ice.Candidate, haRoutes route.HA } if shouldAddExtraCandidate(candidate) { - // sends an extra server reflexive candidate to the remote peer with our related port (usually the wireguard port) - // this is useful when network has an existing port forwarding rule for the wireguard port and this peer extraSrflx, err := extraSrflxCandidate(candidate) if err != nil { w.log.Errorf("failed creating extra server reflexive candidate %s", err) @@ -331,8 +384,10 @@ func (w *WorkerICE) closeAgent(agent *icemaker.ThreadSafeAgent, cancel context.C sessionID, err := NewICESessionID() if err != nil { w.log.Errorf("failed to create new session ID: %s", err) + w.sessionID = "" + } else { + w.sessionID = sessionID } - w.sessionID = sessionID w.agent = nil w.agentConnecting = false w.remoteSessionID = ""