Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/internal/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1625,7 +1625,7 @@ func (e *Engine) receiveSignalEvents() {
return err
}

go conn.OnRemoteCandidate(candidate, e.routeManager.GetClientRoutes())
conn.OnRemoteCandidate(candidate, e.routeManager.GetClientRoutes())
case sProto.Body_MODE:
case sProto.Body_GO_IDLE:
e.connMgr.DeactivatePeer(conn)
Expand Down
117 changes: 86 additions & 31 deletions client/internal/peer/worker_ice.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ import (
"github.com/netbirdio/netbird/route"
)

// agentLoopProtectionWindow is the maximum age of a freshly created ICE
// agent during which OnNewOffer will refuse to replace it with a different
// session ID. Must be short enough not to stall crash recovery (the ICE
// FailedTimeout is 6s) and long enough to cover the millisecond-range race
// between two guards stomping on each other when both peers are behind the
// same NAT. 3s is well below the guard's 3s initial tick but still longer
// than typical STUN round-trips.
const agentLoopProtectionWindow = 3 * time.Second

type ICEConnInfo struct {
RemoteConn net.Conn
RosenpassPubKey []byte
Expand All @@ -46,6 +55,7 @@ type WorkerICE struct {
agent *icemaker.ThreadSafeAgent
agentDialerCancel context.CancelFunc
agentConnecting bool // while it is true, drop all incoming offers
agentStartTime time.Time // when the current agent was created; used to bound the loop-prevention window
lastSuccess time.Time // with this avoid the too frequent ICE agent recreation
// remoteSessionID represents the peer's session identifier from the latest remote offer.
remoteSessionID ICESessionID
Expand Down Expand Up @@ -95,36 +105,77 @@ func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, conn *
return w, nil
}

// handleExistingAgent checks whether an incoming offer should be skipped
// (because the current agent is still valid) or accepted (tearing down the
// existing agent first). It returns true when the offer must be skipped.
// The caller must hold w.muxAgent.
func (w *WorkerICE) handleExistingAgent(remoteOfferAnswer *OfferAnswer) (skip bool) {
if w.agent == nil && !w.agentConnecting {
return false
}

// backward compatibility with old clients that do not send session ID
if remoteOfferAnswer.SessionID == nil {
w.log.Debugf("agent already exists, skipping the offer")
return true
}

if w.remoteSessionID == *remoteOfferAnswer.SessionID {
w.log.Debugf("agent already exists and session ID matches, skipping the offer: %s", remoteOfferAnswer.SessionIDString())
return true
}

// Reconnection-loop protection.
//
// When two peers are behind the same NAT (same public IP) both of
// their guards can fire almost simultaneously, each sending an offer
// with a new session ID that tears down the other side's freshly
// created ICE agent before it has a chance to finish. We therefore
// ignore a replacement offer if the current agent was created only
// moments ago AND is still in the connecting phase.
//
// The window is intentionally small (agentLoopProtectionWindow):
// * the guard's initial tick is 3s, so two guards can only stomp
// on each other during the first few hundred ms after recreate;
// * once the window has elapsed we MUST accept offers with a new
// session ID, otherwise crash-recovery of the remote peer would
// be stalled for the full ICE failed-timeout (6s) instead of
// happening immediately. See pappz's review comment on PR #5805.
agentAge := time.Since(w.agentStartTime)
if w.agentConnecting && agentAge < agentLoopProtectionWindow {
w.log.Infof("ICE agent started %s ago, skipping new offer with different session to break reconnection loop: %s",
agentAge, remoteOfferAnswer.SessionIDString())
return true
}

w.log.Debugf("agent already exists, recreate the connection")
w.remoteSessionChanged = true
w.agentDialerCancel()
if w.agent != nil {
if err := w.agent.Close(); err != nil {
w.log.Warnf("failed to close ICE agent: %s", err)
}
}

sessionID, err := NewICESessionID()
if err != nil {
w.log.Errorf("failed to create new session ID: %s", err)
w.sessionID = ""
} else {
w.sessionID = sessionID
}
w.agent = nil

return false
}

func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
w.log.Debugf("OnNewOffer for ICE, serial: %s", remoteOfferAnswer.SessionIDString())
w.muxAgent.Lock()
defer w.muxAgent.Unlock()

if w.agent != nil || w.agentConnecting {
// backward compatibility with old clients that do not send session ID
if remoteOfferAnswer.SessionID == nil {
w.log.Debugf("agent already exists, skipping the offer")
return
}
if w.remoteSessionID == *remoteOfferAnswer.SessionID {
w.log.Debugf("agent already exists and session ID matches, skipping the offer: %s", remoteOfferAnswer.SessionIDString())
return
}
w.log.Debugf("agent already exists, recreate the connection")
w.remoteSessionChanged = true
w.agentDialerCancel()
if w.agent != nil {
if err := w.agent.Close(); err != nil {
w.log.Warnf("failed to close ICE agent: %s", err)
}
}

sessionID, err := NewICESessionID()
if err != nil {
w.log.Errorf("failed to create new session ID: %s", err)
}
w.sessionID = sessionID
w.agent = nil
if w.handleExistingAgent(remoteOfferAnswer) {
return
}

var preferredCandidateTypes []ice.CandidateType
Expand All @@ -146,6 +197,7 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
w.agent = agent
w.agentDialerCancel = dialerCancel
w.agentConnecting = true
w.agentStartTime = time.Now()
if remoteOfferAnswer.SessionID != nil {
w.remoteSessionID = *remoteOfferAnswer.SessionID
} else {
Expand All @@ -160,12 +212,15 @@ func (w *WorkerICE) OnRemoteCandidate(candidate ice.Candidate, haRoutes route.HA
w.muxAgent.Lock()
defer w.muxAgent.Unlock()
w.log.Debugf("OnRemoteCandidate from peer %s -> %s", w.config.Key, candidate.String())
if w.agent == nil {
w.log.Warnf("ICE Agent is not initialized yet")

// Filter routed candidates before buffering or adding, so both paths
// behave identically regardless of arrival timing.
if candidateViaRoutes(candidate, haRoutes) {
return
}

if candidateViaRoutes(candidate, haRoutes) {
if w.agent == nil {
w.log.Warnf("ICE Agent is not initialized yet")
return
}

Expand All @@ -175,8 +230,6 @@ func (w *WorkerICE) OnRemoteCandidate(candidate ice.Candidate, haRoutes route.HA
}

if shouldAddExtraCandidate(candidate) {
// sends an extra server reflexive candidate to the remote peer with our related port (usually the wireguard port)
// this is useful when network has an existing port forwarding rule for the wireguard port and this peer
extraSrflx, err := extraSrflxCandidate(candidate)
if err != nil {
w.log.Errorf("failed creating extra server reflexive candidate %s", err)
Expand Down Expand Up @@ -331,8 +384,10 @@ func (w *WorkerICE) closeAgent(agent *icemaker.ThreadSafeAgent, cancel context.C
sessionID, err := NewICESessionID()
if err != nil {
w.log.Errorf("failed to create new session ID: %s", err)
w.sessionID = ""
} else {
w.sessionID = sessionID
}
w.sessionID = sessionID
w.agent = nil
w.agentConnecting = false
w.remoteSessionID = ""
Expand Down
Loading