Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 55 additions & 1 deletion agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -1014,6 +1014,45 @@ func (a *Agent) addRemotePassiveTCPCandidate(remoteCandidate Candidate) {
}
}

// replaceRedundantPeerReflexiveCandidates removes any peer-reflexive candidates
// from the given set that have the same transport address as cand.
// It also updates any candidate pairs and local candidate caches that
// referenced the removed peer-reflexive candidates to reference cand instead.
// It is implemented according to RFC 8838 §11.4.
// It returns the updated set of candidates.
func (a *Agent) replaceRedundantPeerReflexiveCandidates(set []Candidate, cand Candidate) []Candidate {
if cand.Type() != CandidateTypePeerReflexive {
var replacedPrflx []Candidate

for i := 0; i < len(set); i++ {
existing := set[i]
if existing.Type() == CandidateTypePeerReflexive && existing.transportAddressEqual(cand) {
replacedPrflx = append(replacedPrflx, existing)
set = append(set[:i], set[i+1:]...)
i--
}
}

for _, oldRemote := range replacedPrflx {
for _, pair := range a.checklist {
if pair.Remote == oldRemote {
oldPriority := pair.priority()
pair.Remote = cand
pair.setPriorityOverride(oldPriority)
}
}

for _, locals := range a.localCandidates {
for _, local := range locals {
local.replaceRemoteCandidateCacheValues(oldRemote, cand)
}
}
}
}

return set
}

// addRemoteCandidate assumes you are holding the lock (must be execute using a.run).
func (a *Agent) addRemoteCandidate(cand Candidate) { //nolint:cyclop
set := a.remoteCandidates[cand.NetworkType()]
Expand All @@ -1024,6 +1063,11 @@ func (a *Agent) addRemoteCandidate(cand Candidate) { //nolint:cyclop
}
}

// RFC 8838 §11.4: If a trickled candidate is redundant with an existing
// peer-reflexive candidate (same transport address), prefer the signaled
// candidate and replace the peer-reflexive one.
set = a.replaceRedundantPeerReflexiveCandidates(set, cand)

acceptRemotePassiveTCPCandidate := false
// Assert that TCP4 or TCP6 is a enabled NetworkType locally
if !a.disableActiveTCP && cand.TCPType() == TCPTypePassive {
Expand All @@ -1044,7 +1088,9 @@ func (a *Agent) addRemoteCandidate(cand Candidate) { //nolint:cyclop
if cand.TCPType() != TCPTypePassive {
if localCandidates, ok := a.localCandidates[cand.NetworkType()]; ok {
for _, localCandidate := range localCandidates {
a.addPair(localCandidate, cand)
if a.findPair(localCandidate, cand) == nil {
a.addPair(localCandidate, cand)
}
}
}
}
Expand Down Expand Up @@ -1487,6 +1533,14 @@ func (a *Agent) handleInboundRequest(
RelPort: 0,
}

// A peer-reflexive candidate SHOULD take its priority from the PRIORITY
// attribute in the Binding Request that discovered it.
var prio PriorityAttr
err = prio.GetFrom(msg)
if err == nil {
prflxCandidateConfig.Priority = uint32(prio)
}

prflxCandidate, err := NewCandidatePeerReflexive(&prflxCandidateConfig)
if err != nil {
a.log.Errorf("Failed to create new remote prflx candidate (%s)", err)
Expand Down
24 changes: 13 additions & 11 deletions agent_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ func (a *Agent) onConnectionStateChange(s ConnectionState) {

type handlerNotifier struct {
sync.Mutex
running bool
notifiers sync.WaitGroup
runningConnectionStates bool
runningCandidates bool
runningCandidatePairs bool
notifiers sync.WaitGroup

connectionStates []ConnectionState
connectionStateFunc func(ConnectionState)
Expand Down Expand Up @@ -99,7 +101,7 @@ func (h *handlerNotifier) EnqueueConnectionState(state ConnectionState) {
for {
h.Lock()
if len(h.connectionStates) == 0 {
h.running = false
h.runningConnectionStates = false
h.Unlock()

return
Expand All @@ -112,8 +114,8 @@ func (h *handlerNotifier) EnqueueConnectionState(state ConnectionState) {
}

h.connectionStates = append(h.connectionStates, state)
if !h.running {
h.running = true
if !h.runningConnectionStates {
h.runningConnectionStates = true
h.notifiers.Add(1)
go notify()
}
Expand All @@ -134,7 +136,7 @@ func (h *handlerNotifier) EnqueueCandidate(cand Candidate) {
for {
h.Lock()
if len(h.candidates) == 0 {
h.running = false
h.runningCandidates = false
h.Unlock()

return
Expand All @@ -147,8 +149,8 @@ func (h *handlerNotifier) EnqueueCandidate(cand Candidate) {
}

h.candidates = append(h.candidates, cand)
if !h.running {
h.running = true
if !h.runningCandidates {
h.runningCandidates = true
h.notifiers.Add(1)
go notify()
}
Expand All @@ -169,7 +171,7 @@ func (h *handlerNotifier) EnqueueSelectedCandidatePair(pair *CandidatePair) {
for {
h.Lock()
if len(h.selectedCandidatePairs) == 0 {
h.running = false
h.runningCandidatePairs = false
h.Unlock()

return
Expand All @@ -182,8 +184,8 @@ func (h *handlerNotifier) EnqueueSelectedCandidatePair(pair *CandidatePair) {
}

h.selectedCandidatePairs = append(h.selectedCandidatePairs, pair)
if !h.running {
h.running = true
if !h.runningCandidatePairs {
h.runningCandidatePairs = true
h.notifiers.Add(1)
go notify()
}
Expand Down
4 changes: 3 additions & 1 deletion agent_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@ func TestHandlerNotifier_Close_AlreadyClosed(t *testing.T) {
assert.True(t, isClosed(notifier.done), "expected h.done to remain closed after second Close")

// sanity: no enqueues should start after close.
require.False(t, notifier.running)
require.False(t, notifier.runningConnectionStates)
require.False(t, notifier.runningCandidates)
require.False(t, notifier.runningCandidatePairs)
require.Zero(t, len(notifier.connectionStates))
require.Zero(t, len(notifier.candidates))
require.Zero(t, len(notifier.selectedCandidatePairs))
Expand Down
Loading
Loading