From d4c6d70f0cd4a23fb6f8aecbf902c6c616113700 Mon Sep 17 00:00:00 2001 From: xinze-zheng Date: Sat, 8 Nov 2025 17:17:13 -0500 Subject: [PATCH 01/18] Allow port mapping for all candidates --- agent.go | 9 +++++++ agent_config.go | 3 +++ agent_test.go | 36 +++++++++++++++++++++++++ candidate.go | 4 +++ candidate_base.go | 16 ++++++++++- candidate_test.go | 69 +++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 136 insertions(+), 1 deletion(-) diff --git a/agent.go b/agent.go index 391fde80..9f13a75f 100644 --- a/agent.go +++ b/agent.go @@ -165,6 +165,9 @@ type Agent struct { automaticRenomination bool renominationInterval time.Duration lastRenominationTime time.Time + + // Port mapping support for container + mapPort func(candidate Candidate) int } // NewAgent creates a new Agent. @@ -275,6 +278,8 @@ func newAgentWithConfig(config *AgentConfig, opts ...AgentOption) (*Agent, error automaticRenomination: false, renominationInterval: 3 * time.Second, // Default matching libwebrtc + + mapPort: config.MapPortHanlder, } agent.connectionStateNotifier = &handlerNotifier{ @@ -880,6 +885,10 @@ func (a *Agent) addCandidate(ctx context.Context, cand Candidate, candidateConn return } } + // Callback for mapPort before candidate starts + if a.mapPort != nil { + cand.setMappedPort(a.mapPort(cand)) + } a.setCandidateExtensions(cand) cand.start(a, candidateConn, a.startedCh) diff --git a/agent_config.go b/agent_config.go index ae250e35..c7163223 100644 --- a/agent_config.go +++ b/agent_config.go @@ -211,6 +211,9 @@ type AgentConfig struct { // switched to that irrespective of relative priority between current selected pair // and priority of the pair being switched to. EnableUseCandidateCheckPriority bool + + // MapPortHanlder is the handler used to compute mapped port for host candidate. + MapPortHanlder func(candidate Candidate) int } // initWithDefaults populates an agent and falls back to defaults if fields are unset. diff --git a/agent_test.go b/agent_test.go index c258a492..df723439 100644 --- a/agent_test.go +++ b/agent_test.go @@ -2270,3 +2270,39 @@ func TestAutomaticRenominationRelayToDirect(t *testing.T) { shouldRenominate := agent.shouldRenominate(relayPair, hostPair) require.True(t, shouldRenominate, "Should always renominate from relay to direct connection") } + +func TestMapPortHandler(t *testing.T) { + + agent, err := NewAgent(&AgentConfig{ + MapPortHanlder: func(cand Candidate) int { + return cand.Port() + 1000 + }, + }) + require.NoError(t, err) + defer func() { + require.NoError(t, agent.Close()) + }() + + dummyConn := &net.UDPConn{} + + for i := 0; i < 5; i++ { + cfg := CandidateHostConfig{ + Network: "udp", + Address: "192.168.0.2", + Port: 1000 + i, + Component: 1, + } + + cand, errCand := NewCandidateHost(&cfg) + require.NoError(t, errCand) + err = agent.addCandidate(context.Background(), cand, dummyConn) + require.NoError(t, err) + } + + actualCandidates, err := agent.GetLocalCandidates() + require.NoError(t, err) + + for _, candidate := range actualCandidates { + require.Equal(t, candidate.Port()+1000, candidate.MappedPort()) + } +} diff --git a/candidate.go b/candidate.go index 89082f98..6e7dca23 100644 --- a/candidate.go +++ b/candidate.go @@ -46,6 +46,10 @@ type Candidate interface { Address() string Port() int + // Port mapping support for containers + MappedPort() int + setMappedPort(port int) + Priority() uint32 // A transport address related to a diff --git a/candidate_base.go b/candidate_base.go index 0e38d4cc..81bf40b9 100644 --- a/candidate_base.go +++ b/candidate_base.go @@ -27,6 +27,7 @@ type candidateBase struct { component uint16 address string port int + mappedPort int relatedAddress *CandidateRelatedAddress tcpType TCPType @@ -96,6 +97,14 @@ func (c *candidateBase) Port() int { return c.port } +func (c *candidateBase) MappedPort() int { + return c.mappedPort +} + +func (c *candidateBase) setMappedPort(port int) { + c.mappedPort = port +} + // Type returns candidate type. func (c *candidateBase) Type() CandidateType { return c.candidateType @@ -430,6 +439,7 @@ func (c *candidateBase) Equal(other Candidate) bool { c.Type() == other.Type() && c.Address() == other.Address() && c.Port() == other.Port() && + c.MappedPort() == other.MappedPort() && c.TCPType() == other.TCPType() && c.RelatedAddress().Equal(other.RelatedAddress()) } @@ -521,6 +531,10 @@ func (c *candidateBase) Marshal() string { if val == " " { val = "" } + port := c.Port() + if c.mappedPort != 0 { + port = c.mappedPort + } val = fmt.Sprintf("%s %d %s %d %s %d typ %s", val, @@ -528,7 +542,7 @@ func (c *candidateBase) Marshal() string { c.NetworkType().NetworkShort(), c.Priority(), removeZoneIDFromAddress(c.Address()), - c.Port(), + port, c.Type()) if r := c.RelatedAddress(); r != nil && r.Address != "" && r.Port != 0 { diff --git a/candidate_test.go b/candidate_test.go index a89632fc..d16c7421 100644 --- a/candidate_test.go +++ b/candidate_test.go @@ -352,6 +352,15 @@ func mustCandidatePeerReflexiveWithExtensions( return cand } +func mustCandidateHostWithMappedPort(t *testing.T, + conf *CandidateHostConfig, + mapPort func(cand Candidate) int) Candidate { + cand, err := NewCandidateHost(conf) + require.NoError(t, err) + cand.setMappedPort(mapPort(cand)) + return cand +} + func TestCandidateMarshal(t *testing.T) { for idx, test := range []struct { candidate Candidate @@ -580,6 +589,66 @@ func TestCandidateMarshal(t *testing.T) { } } +func TestCandidateMarshalWithMappedPort(t *testing.T) { + for idx, test := range []struct { + candidate Candidate + marshaled string + expectError bool + }{ + { + mustCandidateHostWithMappedPort(t, &CandidateHostConfig{ + Network: NetworkTypeTCP4.String(), + Address: "172.28.142.173", + Port: 7686, + Priority: 1671430143, + Foundation: "+/3713fhi", + }, func(cand Candidate) int { return 7687 }), + "candidate:3359356140 1 tcp 1671430143 172.28.142.173 7687 typ host", + false, + }, + { + mustCandidateHostWithMappedPort(t, &CandidateHostConfig{ + Network: NetworkTypeTCP4.String(), + Address: "172.28.142.173", + Port: 7686, + Priority: 1671430143, + Foundation: "+/3713fhi", + }, func(cand Candidate) int { + if cand.Port() != 7686 { + return 7687 + } + return 7688 + }), + "candidate:3359356140 1 tcp 1671430143 172.28.142.173 7688 typ host", + false, + }, + { + mustCandidateHostWithMappedPort(t, &CandidateHostConfig{ + Network: NetworkTypeTCP4.String(), + Address: "172.28.142.173", + Port: 7686, + Priority: 1671430143, + Foundation: "+/3713fhi", + }, func(cand Candidate) int { + return 0 + }), + "candidate:3359356140 1 tcp 1671430143 172.28.142.173 7686 typ host", + false, + }, + } { + t.Run(strconv.Itoa(idx), func(t *testing.T) { + actualCandidate, err := UnmarshalCandidate(test.marshaled) + + require.NoError(t, err) + + if strings.HasPrefix(test.marshaled, "candidate:") { + require.Equal(t, test.marshaled[len("candidate:"):], actualCandidate.Marshal()) + } else { + require.Equal(t, test.marshaled, actualCandidate.Marshal()) + } + }) + } +} func TestCandidateWriteTo(t *testing.T) { listener, err := net.ListenTCP("tcp", &net.TCPAddr{ IP: net.IP{127, 0, 0, 1}, From 035192e45bc45411bbd2825c4662b85ef9643594 Mon Sep 17 00:00:00 2001 From: xinze-zheng Date: Sat, 8 Nov 2025 17:37:30 -0500 Subject: [PATCH 02/18] Fix lint --- agent_test.go | 1 - candidate_base.go | 2 +- candidate_test.go | 3 +++ 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/agent_test.go b/agent_test.go index df723439..b3ec2969 100644 --- a/agent_test.go +++ b/agent_test.go @@ -2272,7 +2272,6 @@ func TestAutomaticRenominationRelayToDirect(t *testing.T) { } func TestMapPortHandler(t *testing.T) { - agent, err := NewAgent(&AgentConfig{ MapPortHanlder: func(cand Candidate) int { return cand.Port() + 1000 diff --git a/candidate_base.go b/candidate_base.go index 81bf40b9..38949112 100644 --- a/candidate_base.go +++ b/candidate_base.go @@ -425,7 +425,7 @@ func (c *candidateBase) Priority() uint32 { } // Equal is used to compare two candidateBases. -func (c *candidateBase) Equal(other Candidate) bool { +func (c *candidateBase) Equal(other Candidate) bool { //nolint:cyclop if c.addr() != other.addr() { if c.addr() == nil || other.addr() == nil { return false diff --git a/candidate_test.go b/candidate_test.go index d16c7421..9f7c84f4 100644 --- a/candidate_test.go +++ b/candidate_test.go @@ -355,9 +355,11 @@ func mustCandidatePeerReflexiveWithExtensions( func mustCandidateHostWithMappedPort(t *testing.T, conf *CandidateHostConfig, mapPort func(cand Candidate) int) Candidate { + t.Helper() cand, err := NewCandidateHost(conf) require.NoError(t, err) cand.setMappedPort(mapPort(cand)) + return cand } @@ -617,6 +619,7 @@ func TestCandidateMarshalWithMappedPort(t *testing.T) { if cand.Port() != 7686 { return 7687 } + return 7688 }), "candidate:3359356140 1 tcp 1671430143 172.28.142.173 7688 typ host", From 48e39228e10ca49b29ea08197954cf5bcb8331ec Mon Sep 17 00:00:00 2001 From: xinze-zheng Date: Sat, 8 Nov 2025 20:09:43 -0500 Subject: [PATCH 03/18] Change Option API --- agent.go | 2 -- agent_config.go | 3 --- agent_options.go | 14 ++++++++++++++ agent_test.go | 43 ++++++++++++++++++++++++++++++++++++++----- candidate_test.go | 20 +++++++++----------- 5 files changed, 61 insertions(+), 21 deletions(-) diff --git a/agent.go b/agent.go index 9f13a75f..939e85be 100644 --- a/agent.go +++ b/agent.go @@ -278,8 +278,6 @@ func newAgentWithConfig(config *AgentConfig, opts ...AgentOption) (*Agent, error automaticRenomination: false, renominationInterval: 3 * time.Second, // Default matching libwebrtc - - mapPort: config.MapPortHanlder, } agent.connectionStateNotifier = &handlerNotifier{ diff --git a/agent_config.go b/agent_config.go index c7163223..ae250e35 100644 --- a/agent_config.go +++ b/agent_config.go @@ -211,9 +211,6 @@ type AgentConfig struct { // switched to that irrespective of relative priority between current selected pair // and priority of the pair being switched to. EnableUseCandidateCheckPriority bool - - // MapPortHanlder is the handler used to compute mapped port for host candidate. - MapPortHanlder func(candidate Candidate) int } // initWithDefaults populates an agent and falls back to defaults if fields are unset. diff --git a/agent_options.go b/agent_options.go index 37e4ec31..1d87c599 100644 --- a/agent_options.go +++ b/agent_options.go @@ -291,3 +291,17 @@ func WithLoggerFactory(loggerFactory logging.LoggerFactory) AgentOption { return nil } } + +func WithMapPortHandler(handler func(cand Candidate) int, candTyp CandidateType) AgentOption { + return func(a *Agent) error { + a.mapPort = func(candidate Candidate) int { + if candidate.Type() == candTyp { + return handler(candidate) + } + + return 0 + } + + return nil + } +} diff --git a/agent_test.go b/agent_test.go index b3ec2969..307c2ea5 100644 --- a/agent_test.go +++ b/agent_test.go @@ -2272,11 +2272,10 @@ func TestAutomaticRenominationRelayToDirect(t *testing.T) { } func TestMapPortHandler(t *testing.T) { - agent, err := NewAgent(&AgentConfig{ - MapPortHanlder: func(cand Candidate) int { - return cand.Port() + 1000 - }, - }) + handler := func(cand Candidate) int { + return cand.Port() + 1000 + } + agent, err := newAgentWithConfig(&AgentConfig{}, WithMapPortHandler(handler, CandidateTypeHost)) require.NoError(t, err) defer func() { require.NoError(t, agent.Close()) @@ -2305,3 +2304,37 @@ func TestMapPortHandler(t *testing.T) { require.Equal(t, candidate.Port()+1000, candidate.MappedPort()) } } + +func TestMapPortHandlerDifferentCandidateType(t *testing.T) { + handler := func(cand Candidate) int { + return cand.Port() + 1000 + } + agent, err := newAgentWithConfig(&AgentConfig{}, WithMapPortHandler(handler, CandidateTypePeerReflexive)) + require.NoError(t, err) + defer func() { + require.NoError(t, agent.Close()) + }() + + dummyConn := &net.UDPConn{} + + for i := 0; i < 5; i++ { + cfg := CandidateHostConfig{ + Network: "udp", + Address: "192.168.0.2", + Port: 1000 + i, + Component: 1, + } + + cand, errCand := NewCandidateHost(&cfg) + require.NoError(t, errCand) + err = agent.addCandidate(context.Background(), cand, dummyConn) + require.NoError(t, err) + } + + actualCandidates, err := agent.GetLocalCandidates() + require.NoError(t, err) + + for i, candidate := range actualCandidates { + require.Equal(t, 1000+i, candidate.Port()) + } +} diff --git a/candidate_test.go b/candidate_test.go index 9f7c84f4..a67a102e 100644 --- a/candidate_test.go +++ b/candidate_test.go @@ -603,9 +603,9 @@ func TestCandidateMarshalWithMappedPort(t *testing.T) { Address: "172.28.142.173", Port: 7686, Priority: 1671430143, - Foundation: "+/3713fhi", + Foundation: "3359356140", }, func(cand Candidate) int { return 7687 }), - "candidate:3359356140 1 tcp 1671430143 172.28.142.173 7687 typ host", + "candidate:3359356140 0 tcp 1671430143 172.28.142.173 7687 typ host", false, }, { @@ -614,7 +614,7 @@ func TestCandidateMarshalWithMappedPort(t *testing.T) { Address: "172.28.142.173", Port: 7686, Priority: 1671430143, - Foundation: "+/3713fhi", + Foundation: "3359356140", }, func(cand Candidate) int { if cand.Port() != 7686 { return 7687 @@ -622,7 +622,7 @@ func TestCandidateMarshalWithMappedPort(t *testing.T) { return 7688 }), - "candidate:3359356140 1 tcp 1671430143 172.28.142.173 7688 typ host", + "candidate:3359356140 0 tcp 1671430143 172.28.142.173 7688 typ host", false, }, { @@ -631,23 +631,21 @@ func TestCandidateMarshalWithMappedPort(t *testing.T) { Address: "172.28.142.173", Port: 7686, Priority: 1671430143, - Foundation: "+/3713fhi", + Foundation: "3359356140", }, func(cand Candidate) int { return 0 }), - "candidate:3359356140 1 tcp 1671430143 172.28.142.173 7686 typ host", + "candidate:3359356140 0 tcp 1671430143 172.28.142.173 7686 typ host", false, }, } { t.Run(strconv.Itoa(idx), func(t *testing.T) { - actualCandidate, err := UnmarshalCandidate(test.marshaled) - - require.NoError(t, err) + candidateMarshalOutput := test.candidate.Marshal() if strings.HasPrefix(test.marshaled, "candidate:") { - require.Equal(t, test.marshaled[len("candidate:"):], actualCandidate.Marshal()) + require.Equal(t, test.marshaled[len("candidate:"):], candidateMarshalOutput) } else { - require.Equal(t, test.marshaled, actualCandidate.Marshal()) + require.Equal(t, test.marshaled, candidateMarshalOutput) } }) } From 5b36d87d624a09ae92e4d3f6f3652994372bfc72 Mon Sep 17 00:00:00 2001 From: xinze-zheng Date: Sat, 8 Nov 2025 20:23:49 -0500 Subject: [PATCH 04/18] Lint --- candidate_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/candidate_test.go b/candidate_test.go index a67a102e..905fbe18 100644 --- a/candidate_test.go +++ b/candidate_test.go @@ -352,7 +352,8 @@ func mustCandidatePeerReflexiveWithExtensions( return cand } -func mustCandidateHostWithMappedPort(t *testing.T, +func mustCandidateHostWithMappedPort( + t *testing.T, conf *CandidateHostConfig, mapPort func(cand Candidate) int) Candidate { t.Helper() From ddfeb35d2d55ab4d93cc53a6d351977df7776073 Mon Sep 17 00:00:00 2001 From: xinze-zheng Date: Sat, 8 Nov 2025 20:43:21 -0500 Subject: [PATCH 05/18] Lint --- candidate_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/candidate_test.go b/candidate_test.go index 905fbe18..9c70fbfc 100644 --- a/candidate_test.go +++ b/candidate_test.go @@ -355,7 +355,8 @@ func mustCandidatePeerReflexiveWithExtensions( func mustCandidateHostWithMappedPort( t *testing.T, conf *CandidateHostConfig, - mapPort func(cand Candidate) int) Candidate { + mapPort func(cand Candidate) int +) Candidate { t.Helper() cand, err := NewCandidateHost(conf) require.NoError(t, err) From d6881a51c0f808a6fe444a8ef51a85c06743fc65 Mon Sep 17 00:00:00 2001 From: xinze-zheng Date: Sat, 8 Nov 2025 21:43:34 -0500 Subject: [PATCH 06/18] Fix test --- candidate_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/candidate_test.go b/candidate_test.go index 9c70fbfc..2cb9d64e 100644 --- a/candidate_test.go +++ b/candidate_test.go @@ -355,7 +355,7 @@ func mustCandidatePeerReflexiveWithExtensions( func mustCandidateHostWithMappedPort( t *testing.T, conf *CandidateHostConfig, - mapPort func(cand Candidate) int + mapPort func(cand Candidate) int, ) Candidate { t.Helper() cand, err := NewCandidateHost(conf) From f20516c86f328b2f6b2756c8584a4d6b77435bd7 Mon Sep 17 00:00:00 2001 From: xinze-zheng Date: Sun, 9 Nov 2025 16:25:01 -0500 Subject: [PATCH 07/18] Fix lint eventually --- candidate_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/candidate_test.go b/candidate_test.go index 2cb9d64e..f4c5e31d 100644 --- a/candidate_test.go +++ b/candidate_test.go @@ -652,6 +652,7 @@ func TestCandidateMarshalWithMappedPort(t *testing.T) { }) } } + func TestCandidateWriteTo(t *testing.T) { listener, err := net.ListenTCP("tcp", &net.TCPAddr{ IP: net.IP{127, 0, 0, 1}, From 88c64eef79d933c275dc3a0b4278db1f7470ee2f Mon Sep 17 00:00:00 2001 From: xinze-zheng Date: Sun, 9 Nov 2025 16:33:33 -0500 Subject: [PATCH 08/18] Update test for compatibility --- agent_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/agent_test.go b/agent_test.go index b9139bf2..d0895be5 100644 --- a/agent_test.go +++ b/agent_test.go @@ -2304,7 +2304,7 @@ func TestMapPortHandler(t *testing.T) { handler := func(cand Candidate) int { return cand.Port() + 1000 } - agent, err := newAgentWithConfig(&AgentConfig{}, WithMapPortHandler(handler, CandidateTypeHost)) + agent, err := newAgentFromConfig(&AgentConfig{}, WithMapPortHandler(handler, CandidateTypeHost)) require.NoError(t, err) defer func() { require.NoError(t, agent.Close()) @@ -2338,7 +2338,7 @@ func TestMapPortHandlerDifferentCandidateType(t *testing.T) { handler := func(cand Candidate) int { return cand.Port() + 1000 } - agent, err := newAgentWithConfig(&AgentConfig{}, WithMapPortHandler(handler, CandidateTypePeerReflexive)) + agent, err := newAgentFromConfig(&AgentConfig{}, WithMapPortHandler(handler, CandidateTypePeerReflexive)) require.NoError(t, err) defer func() { require.NoError(t, agent.Close()) From 6112bf7beade8c2e33bdc42ab480263c34f81c3d Mon Sep 17 00:00:00 2001 From: xinze-zheng Date: Sun, 9 Nov 2025 16:38:27 -0500 Subject: [PATCH 09/18] Use public API --- agent_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/agent_test.go b/agent_test.go index d0895be5..43d7fc40 100644 --- a/agent_test.go +++ b/agent_test.go @@ -2304,7 +2304,7 @@ func TestMapPortHandler(t *testing.T) { handler := func(cand Candidate) int { return cand.Port() + 1000 } - agent, err := newAgentFromConfig(&AgentConfig{}, WithMapPortHandler(handler, CandidateTypeHost)) + agent, err := NewAgentWithOptions(WithMapPortHandler(handler, CandidateTypeHost)) require.NoError(t, err) defer func() { require.NoError(t, agent.Close()) @@ -2338,7 +2338,7 @@ func TestMapPortHandlerDifferentCandidateType(t *testing.T) { handler := func(cand Candidate) int { return cand.Port() + 1000 } - agent, err := newAgentFromConfig(&AgentConfig{}, WithMapPortHandler(handler, CandidateTypePeerReflexive)) + agent, err := NewAgentWithOptions(WithMapPortHandler(handler, CandidateTypePeerReflexive)) require.NoError(t, err) defer func() { require.NoError(t, agent.Close()) From 460350f3e112965e5e1b066fcc9601fb413698bd Mon Sep 17 00:00:00 2001 From: xinze-zheng Date: Sun, 9 Nov 2025 16:43:00 -0500 Subject: [PATCH 10/18] Keep mappedPort private --- agent_test.go | 2 +- candidate.go | 2 +- candidate_base.go | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/agent_test.go b/agent_test.go index 43d7fc40..1005f30a 100644 --- a/agent_test.go +++ b/agent_test.go @@ -2330,7 +2330,7 @@ func TestMapPortHandler(t *testing.T) { require.NoError(t, err) for _, candidate := range actualCandidates { - require.Equal(t, candidate.Port()+1000, candidate.MappedPort()) + require.Equal(t, candidate.Port()+1000, candidate.getMappedPort()) } } diff --git a/candidate.go b/candidate.go index 6e7dca23..c5b2a9ea 100644 --- a/candidate.go +++ b/candidate.go @@ -47,7 +47,7 @@ type Candidate interface { Port() int // Port mapping support for containers - MappedPort() int + getMappedPort() int setMappedPort(port int) Priority() uint32 diff --git a/candidate_base.go b/candidate_base.go index 38949112..80fb725a 100644 --- a/candidate_base.go +++ b/candidate_base.go @@ -97,7 +97,7 @@ func (c *candidateBase) Port() int { return c.port } -func (c *candidateBase) MappedPort() int { +func (c *candidateBase) getMappedPort() int { return c.mappedPort } @@ -439,7 +439,7 @@ func (c *candidateBase) Equal(other Candidate) bool { //nolint:cyclop c.Type() == other.Type() && c.Address() == other.Address() && c.Port() == other.Port() && - c.MappedPort() == other.MappedPort() && + c.getMappedPort() == other.getMappedPort() && c.TCPType() == other.TCPType() && c.RelatedAddress().Equal(other.RelatedAddress()) } From 1e988ed7f894d2a4029ee5c488485c2fa214365b Mon Sep 17 00:00:00 2001 From: Xinze Zheng Date: Wed, 24 Dec 2025 22:25:28 +0800 Subject: [PATCH 11/18] Update CI --- agent.go | 1 + go.mod | 1 + go.sum | 4 +++- 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/agent.go b/agent.go index 41de1f2d..2a061332 100644 --- a/agent.go +++ b/agent.go @@ -170,6 +170,7 @@ type Agent struct { // Port mapping support for container mapPort func(candidate Candidate) int + turnClientFactory func(*turn.ClientConfig) (turnClient, error) } diff --git a/go.mod b/go.mod index 009cfcb8..7c34ed67 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/kr/pretty v0.1.0 // indirect + github.com/kr/text v0.2.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/wlynxg/anet v0.0.5 // indirect golang.org/x/crypto v0.33.0 // indirect diff --git a/go.sum b/go.sum index 64cfaace..c02e1e07 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,4 @@ +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= @@ -5,8 +6,9 @@ github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/pion/dtls/v3 v3.0.9 h1:4AijfFRm8mAjd1gfdlB1wzJF3fjjR/VPIpJgkEtvYmM= github.com/pion/dtls/v3 v3.0.9/go.mod h1:abApPjgadS/ra1wvUzHLc3o2HvoxppAh+NZkyApL4Os= github.com/pion/logging v0.2.4 h1:tTew+7cmQ+Mc1pTBLKH2puKsOvhm32dROumOZ655zB8= From 121cc452ca68fd31b455f83fd267447d6b2bc201 Mon Sep 17 00:00:00 2001 From: Xinze Zheng Date: Sat, 10 Jan 2026 23:22:38 +0800 Subject: [PATCH 12/18] MapPort now overwrite port --- agent.go | 4 --- agent_test.go | 68 ----------------------------------------- candidate.go | 4 --- candidate_base.go | 16 +--------- candidate_test.go | 73 -------------------------------------------- gather.go | 55 +++++++++++++++++++++++++-------- gather_test.go | 77 +++++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 121 insertions(+), 176 deletions(-) diff --git a/agent.go b/agent.go index 2ef0cde3..14c7cba2 100644 --- a/agent.go +++ b/agent.go @@ -1069,10 +1069,6 @@ func (a *Agent) addCandidate(ctx context.Context, cand Candidate, candidateConn return } } - // Callback for mapPort before candidate starts - if a.mapPort != nil { - cand.setMappedPort(a.mapPort(cand)) - } a.setCandidateExtensions(cand) cand.start(a, candidateConn, a.startedCh) diff --git a/agent_test.go b/agent_test.go index 6343e557..bd59871b 100644 --- a/agent_test.go +++ b/agent_test.go @@ -2513,71 +2513,3 @@ func TestAutomaticRenominationRelayToDirect(t *testing.T) { shouldRenominate := agent.shouldRenominate(relayPair, hostPair) require.True(t, shouldRenominate, "Should always renominate from relay to direct connection") } - -func TestMapPortHandler(t *testing.T) { - handler := func(cand Candidate) int { - return cand.Port() + 1000 - } - agent, err := NewAgentWithOptions(WithMapPortHandler(handler, CandidateTypeHost)) - require.NoError(t, err) - defer func() { - require.NoError(t, agent.Close()) - }() - - dummyConn := &net.UDPConn{} - - for i := 0; i < 5; i++ { - cfg := CandidateHostConfig{ - Network: "udp", - Address: "192.168.0.2", - Port: 1000 + i, - Component: 1, - } - - cand, errCand := NewCandidateHost(&cfg) - require.NoError(t, errCand) - err = agent.addCandidate(context.Background(), cand, dummyConn) - require.NoError(t, err) - } - - actualCandidates, err := agent.GetLocalCandidates() - require.NoError(t, err) - - for _, candidate := range actualCandidates { - require.Equal(t, candidate.Port()+1000, candidate.getMappedPort()) - } -} - -func TestMapPortHandlerDifferentCandidateType(t *testing.T) { - handler := func(cand Candidate) int { - return cand.Port() + 1000 - } - agent, err := NewAgentWithOptions(WithMapPortHandler(handler, CandidateTypePeerReflexive)) - require.NoError(t, err) - defer func() { - require.NoError(t, agent.Close()) - }() - - dummyConn := &net.UDPConn{} - - for i := 0; i < 5; i++ { - cfg := CandidateHostConfig{ - Network: "udp", - Address: "192.168.0.2", - Port: 1000 + i, - Component: 1, - } - - cand, errCand := NewCandidateHost(&cfg) - require.NoError(t, errCand) - err = agent.addCandidate(context.Background(), cand, dummyConn) - require.NoError(t, err) - } - - actualCandidates, err := agent.GetLocalCandidates() - require.NoError(t, err) - - for i, candidate := range actualCandidates { - require.Equal(t, 1000+i, candidate.Port()) - } -} diff --git a/candidate.go b/candidate.go index c5b2a9ea..89082f98 100644 --- a/candidate.go +++ b/candidate.go @@ -46,10 +46,6 @@ type Candidate interface { Address() string Port() int - // Port mapping support for containers - getMappedPort() int - setMappedPort(port int) - Priority() uint32 // A transport address related to a diff --git a/candidate_base.go b/candidate_base.go index 3e5b275e..f7626586 100644 --- a/candidate_base.go +++ b/candidate_base.go @@ -27,7 +27,6 @@ type candidateBase struct { component uint16 address string port int - mappedPort int relatedAddress *CandidateRelatedAddress tcpType TCPType @@ -121,14 +120,6 @@ func (c *candidateBase) Port() int { return c.port } -func (c *candidateBase) getMappedPort() int { - return c.mappedPort -} - -func (c *candidateBase) setMappedPort(port int) { - c.mappedPort = port -} - // Type returns candidate type. func (c *candidateBase) Type() CandidateType { return c.candidateType @@ -463,7 +454,6 @@ func (c *candidateBase) Equal(other Candidate) bool { //nolint:cyclop c.Type() == other.Type() && c.Address() == other.Address() && c.Port() == other.Port() && - c.getMappedPort() == other.getMappedPort() && c.TCPType() == other.TCPType() && c.RelatedAddress().Equal(other.RelatedAddress()) } @@ -555,10 +545,6 @@ func (c *candidateBase) Marshal() string { if val == " " { val = "" } - port := c.Port() - if c.mappedPort != 0 { - port = c.mappedPort - } val = fmt.Sprintf("%s %d %s %d %s %d typ %s", val, @@ -566,7 +552,7 @@ func (c *candidateBase) Marshal() string { c.NetworkType().NetworkShort(), c.Priority(), removeZoneIDFromAddress(c.Address()), - port, + c.Port(), c.Type()) if r := c.RelatedAddress(); r != nil && r.Address != "" && r.Port != 0 { diff --git a/candidate_test.go b/candidate_test.go index f53370b0..c76a3db4 100644 --- a/candidate_test.go +++ b/candidate_test.go @@ -352,19 +352,6 @@ func mustCandidatePeerReflexiveWithExtensions( return cand } -func mustCandidateHostWithMappedPort( - t *testing.T, - conf *CandidateHostConfig, - mapPort func(cand Candidate) int, -) Candidate { - t.Helper() - cand, err := NewCandidateHost(conf) - require.NoError(t, err) - cand.setMappedPort(mapPort(cand)) - - return cand -} - func TestCandidateMarshal(t *testing.T) { for idx, test := range []struct { candidate Candidate @@ -593,66 +580,6 @@ func TestCandidateMarshal(t *testing.T) { } } -func TestCandidateMarshalWithMappedPort(t *testing.T) { - for idx, test := range []struct { - candidate Candidate - marshaled string - expectError bool - }{ - { - mustCandidateHostWithMappedPort(t, &CandidateHostConfig{ - Network: NetworkTypeTCP4.String(), - Address: "172.28.142.173", - Port: 7686, - Priority: 1671430143, - Foundation: "3359356140", - }, func(cand Candidate) int { return 7687 }), - "candidate:3359356140 0 tcp 1671430143 172.28.142.173 7687 typ host", - false, - }, - { - mustCandidateHostWithMappedPort(t, &CandidateHostConfig{ - Network: NetworkTypeTCP4.String(), - Address: "172.28.142.173", - Port: 7686, - Priority: 1671430143, - Foundation: "3359356140", - }, func(cand Candidate) int { - if cand.Port() != 7686 { - return 7687 - } - - return 7688 - }), - "candidate:3359356140 0 tcp 1671430143 172.28.142.173 7688 typ host", - false, - }, - { - mustCandidateHostWithMappedPort(t, &CandidateHostConfig{ - Network: NetworkTypeTCP4.String(), - Address: "172.28.142.173", - Port: 7686, - Priority: 1671430143, - Foundation: "3359356140", - }, func(cand Candidate) int { - return 0 - }), - "candidate:3359356140 0 tcp 1671430143 172.28.142.173 7686 typ host", - false, - }, - } { - t.Run(strconv.Itoa(idx), func(t *testing.T) { - candidateMarshalOutput := test.candidate.Marshal() - - if strings.HasPrefix(test.marshaled, "candidate:") { - require.Equal(t, test.marshaled[len("candidate:"):], candidateMarshalOutput) - } else { - require.Equal(t, test.marshaled, candidateMarshalOutput) - } - }) - } -} - func TestCandidateWriteTo(t *testing.T) { listener, err := net.ListenTCP("tcp", &net.TCPAddr{ IP: net.IP{127, 0, 0, 1}, diff --git a/gather.go b/gather.go index d47b90af..2a0d4120 100644 --- a/gather.go +++ b/gather.go @@ -415,6 +415,11 @@ func (a *Agent) gatherCandidatesLocal(ctx context.Context, networkTypes []Networ continue } + mappedPort := a.mapPort(candidateHost) + if mappedPort != 0 { + candidateHost.port = mappedPort + } + if err := a.addCandidate(ctx, candidateHost, connAndPort.conn); err != nil { if closeErr := candidateHost.close(); closeErr != nil { a.log.Warnf("Failed to close candidate: %v", closeErr) @@ -512,15 +517,21 @@ func (a *Agent) gatherCandidatesLocalUDPMux(ctx context.Context) error { //nolin return err } - c, err := NewCandidateHost(&hostConfig) + cand, err := NewCandidateHost(&hostConfig) + + mappedPort := a.mapPort(cand) + if mappedPort != 0 { + cand.port = mappedPort + } + if err != nil { closeConnAndLog(conn, a.log, "failed to create host mux candidate: %s %d: %v", candidateIP, udpAddr.Port, err) continue } - if err := a.addCandidate(ctx, c, conn); err != nil { - if closeErr := c.close(); closeErr != nil { + if err := a.addCandidate(ctx, cand, conn); err != nil { + if closeErr := cand.close(); closeErr != nil { a.log.Warnf("Failed to close candidate: %v", closeErr) } @@ -623,7 +634,7 @@ func (a *Agent) gatherCandidatesSrflxMapped(ctx context.Context, networkTypes [] RelAddr: currentAddr.IP.String(), RelPort: currentAddr.Port, } - c, err := NewCandidateServerReflexive(&srflxConfig) + cand, err := NewCandidateServerReflexive(&srflxConfig) if err != nil { closeConnAndLog(currentConn, a.log, "failed to create server reflexive candidate: %s %s %d: %v", network, @@ -634,8 +645,13 @@ func (a *Agent) gatherCandidatesSrflxMapped(ctx context.Context, networkTypes [] continue } - if err := a.addCandidate(ctx, c, currentConn); err != nil { - if closeErr := c.close(); closeErr != nil { + mappedPort := a.mapPort(cand) + if mappedPort != 0 { + cand.port = mappedPort + } + + if err := a.addCandidate(ctx, cand, currentConn); err != nil { + if closeErr := cand.close(); closeErr != nil { a.log.Warnf("Failed to close candidate: %v", closeErr) } a.log.Warnf("Failed to append to localCandidates and run onCandidateHdlr: %v", err) @@ -712,15 +728,20 @@ func (a *Agent) gatherCandidatesSrflxUDPMux(ctx context.Context, urls []*stun.UR RelAddr: localAddr.IP.String(), RelPort: localAddr.Port, } - c, err := NewCandidateServerReflexive(&srflxConfig) + cand, err := NewCandidateServerReflexive(&srflxConfig) if err != nil { closeConnAndLog(conn, a.log, "failed to create server reflexive candidate: %s %s %d: %v", network, ip, port, err) return } - if err := a.addCandidate(ctx, c, conn); err != nil { - if closeErr := c.close(); closeErr != nil { + mappedPort := a.mapPort(cand) + if mappedPort != 0 { + cand.port = mappedPort + } + + if err := a.addCandidate(ctx, cand, conn); err != nil { + if closeErr := cand.close(); closeErr != nil { a.log.Warnf("Failed to close candidate: %v", closeErr) } a.log.Warnf("Failed to append to localCandidates and run onCandidateHdlr: %v", err) @@ -805,15 +826,20 @@ func (a *Agent) gatherCandidatesSrflx(ctx context.Context, urls []*stun.URI, net RelAddr: lAddr.IP.String(), RelPort: lAddr.Port, } - c, err := NewCandidateServerReflexive(&srflxConfig) + cand, err := NewCandidateServerReflexive(&srflxConfig) if err != nil { closeConnAndLog(conn, a.log, "failed to create server reflexive candidate: %s %s %d: %v", network, ip, port, err) return } - if err := a.addCandidate(ctx, c, conn); err != nil { - if closeErr := c.close(); closeErr != nil { + mappedPort := a.mapPort(cand) + if mappedPort != 0 { + cand.port = mappedPort + } + + if err := a.addCandidate(ctx, cand, conn); err != nil { + if closeErr := cand.close(); closeErr != nil { a.log.Warnf("Failed to close candidate: %v", closeErr) } a.log.Warnf("Failed to append to localCandidates and run onCandidateHdlr: %v", err) @@ -1159,6 +1185,11 @@ func (a *Agent) createRelayCandidate(ctx context.Context, ep relayEndpoint, ip n return err } + mappedPort := a.mapPort(candidate) + if mappedPort != 0 { + candidate.port = mappedPort + } + if err := a.addCandidate(ctx, candidate, ep.conn); err != nil { if closeErr := candidate.close(); closeErr != nil { a.log.Warnf("Failed to close candidate: %v", closeErr) diff --git a/gather_test.go b/gather_test.go index af075b66..dc75fedc 100644 --- a/gather_test.go +++ b/gather_test.go @@ -3618,3 +3618,80 @@ func (m *mockUniversalUDPMux) GetRelayedAddr(net.Addr, time.Duration) (*net.Addr func (m *mockUniversalUDPMux) GetConnForURL(ufrag string, url string, addr net.Addr) (net.PacketConn, error) { return m.mockUDPMux.GetConn(ufrag+url, addr) } + +func TestMapPort(t *testing.T) { + mux, err := NewMultiUDPMuxFromPort(12500) + require.NoError(t, err) + listener, err := net.ListenPacket("udp4", "127.0.0.1:0") // nolint: noctx + skipOnPermission(t, err, "listening for TURN server") + require.NoError(t, err) + defer func() { + _ = listener.Close() + }() + relayPort := uint16(40000) + server, err := turn.NewServer(turn.ServerConfig{ + Realm: "pion.ly", + AuthHandler: optimisticAuthHandler, + PacketConnConfigs: []turn.PacketConnConfig{ + { + PacketConn: listener, + RelayAddressGenerator: &turn.RelayAddressGeneratorPortRange{ + RelayAddress: net.ParseIP("127.0.0.1"), + MinPort: relayPort, + MaxPort: relayPort, + MaxRetries: 1, + Address: "127.0.0.1", + }, + }, + }, + }) + require.NoError(t, err) + defer func() { + require.NoError(t, server.Close()) + }() + + serverPort := listener.LocalAddr().(*net.UDPAddr).Port //nolint:forcetypeassert + turnURL := &stun.URI{ + Scheme: stun.SchemeTypeTURN, + Host: "127.0.0.1", + Port: serverPort, + Username: "username", + Password: "password", + Proto: stun.ProtoTypeUDP, + } + agent, err := NewAgentWithOptions( + WithCandidateTypes([]CandidateType{CandidateTypeHost, CandidateTypeRelay}), + WithNetworkTypes([]NetworkType{NetworkTypeUDP4, NetworkTypeUDP6}), + WithUDPMux(mux), + WithUrls([]*stun.URI{turnURL}), + WithMapPortHandler(func(cand Candidate) int { + return 50000 + }, CandidateTypeHost)) + require.NoError(t, err) + + gathered := make(chan (struct{})) + + var cands []Candidate + var mu sync.Mutex + require.NoError(t, agent.OnCandidate(func(c Candidate) { + if c == nil { + close(gathered) + + return + } + mu.Lock() + cands = append(cands, c) + mu.Unlock() + })) + + require.NoError(t, agent.GatherCandidates()) + + <-gathered + for _, cand := range cands { + if cand.Type() == CandidateTypeHost { + require.Equal(t, 50000, cand.Port()) + } else { + require.Equal(t, int(relayPort), cand.Port()) + } + } +} From 874b26c7618166968715910646df4a6a29aaac43 Mon Sep 17 00:00:00 2001 From: Xinze Zheng Date: Sat, 10 Jan 2026 23:26:55 +0800 Subject: [PATCH 13/18] Fix mod --- go.sum | 2 ++ 1 file changed, 2 insertions(+) diff --git a/go.sum b/go.sum index 925eb2aa..34e06f7d 100644 --- a/go.sum +++ b/go.sum @@ -7,6 +7,8 @@ github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/pion/dtls/v3 v3.0.10 h1:k9ekkq1kaZoxnNEbyLKI8DI37j/Nbk1HWmMuywpQJgg= github.com/pion/dtls/v3 v3.0.10/go.mod h1:YEmmBYIoBsY3jmG56dsziTv/Lca9y4Om83370CXfqJ8= github.com/pion/logging v0.2.4 h1:tTew+7cmQ+Mc1pTBLKH2puKsOvhm32dROumOZ655zB8= From 4b9f5d8a7caa2cfce055d7c0e9ec808bb8488323 Mon Sep 17 00:00:00 2001 From: Xinze Zheng Date: Sat, 10 Jan 2026 23:40:08 +0800 Subject: [PATCH 14/18] Fix nil exception --- gather.go | 54 ++++++++++++++++++++++++++++++------------------------ 1 file changed, 30 insertions(+), 24 deletions(-) diff --git a/gather.go b/gather.go index 3cdad73d..2d38b0b5 100644 --- a/gather.go +++ b/gather.go @@ -414,10 +414,11 @@ func (a *Agent) gatherCandidatesLocal(ctx context.Context, networkTypes []Networ continue } - - mappedPort := a.mapPort(candidateHost) - if mappedPort != 0 { - candidateHost.port = mappedPort + if a.mapPort != nil { + mappedPort := a.mapPort(candidateHost) + if mappedPort != 0 { + candidateHost.port = mappedPort + } } if err := a.addCandidate(ctx, candidateHost, connAndPort.conn); err != nil { @@ -518,10 +519,11 @@ func (a *Agent) gatherCandidatesLocalUDPMux(ctx context.Context) error { //nolin } cand, err := NewCandidateHost(&hostConfig) - - mappedPort := a.mapPort(cand) - if mappedPort != 0 { - cand.port = mappedPort + if a.mapPort != nil { + mappedPort := a.mapPort(cand) + if mappedPort != 0 { + cand.port = mappedPort + } } if err != nil { @@ -644,10 +646,11 @@ func (a *Agent) gatherCandidatesSrflxMapped(ctx context.Context, networkTypes [] continue } - - mappedPort := a.mapPort(cand) - if mappedPort != 0 { - cand.port = mappedPort + if a.mapPort != nil { + mappedPort := a.mapPort(cand) + if mappedPort != 0 { + cand.port = mappedPort + } } if err := a.addCandidate(ctx, cand, currentConn); err != nil { @@ -734,10 +737,11 @@ func (a *Agent) gatherCandidatesSrflxUDPMux(ctx context.Context, urls []*stun.UR return } - - mappedPort := a.mapPort(cand) - if mappedPort != 0 { - cand.port = mappedPort + if a.mapPort != nil { + mappedPort := a.mapPort(cand) + if mappedPort != 0 { + cand.port = mappedPort + } } if err := a.addCandidate(ctx, cand, conn); err != nil { @@ -832,10 +836,11 @@ func (a *Agent) gatherCandidatesSrflx(ctx context.Context, urls []*stun.URI, net return } - - mappedPort := a.mapPort(cand) - if mappedPort != 0 { - cand.port = mappedPort + if a.mapPort != nil { + mappedPort := a.mapPort(cand) + if mappedPort != 0 { + cand.port = mappedPort + } } if err := a.addCandidate(ctx, cand, conn); err != nil { @@ -1184,10 +1189,11 @@ func (a *Agent) createRelayCandidate(ctx context.Context, ep relayEndpoint, ip n return err } - - mappedPort := a.mapPort(candidate) - if mappedPort != 0 { - candidate.port = mappedPort + if a.mapPort != nil { + mappedPort := a.mapPort(candidate) + if mappedPort != 0 { + candidate.port = mappedPort + } } if err := a.addCandidate(ctx, candidate, ep.conn); err != nil { From 681b1c3ec390575c88b1f0363d107fd718df4e7a Mon Sep 17 00:00:00 2001 From: Xinze Zheng Date: Sun, 11 Jan 2026 00:02:56 +0800 Subject: [PATCH 15/18] Fix test's resource leak --- gather_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/gather_test.go b/gather_test.go index d0715f6c..ecbd2786 100644 --- a/gather_test.go +++ b/gather_test.go @@ -3630,8 +3630,6 @@ func (m *mockUniversalUDPMux) GetConnForURL(ufrag string, url string, addr net.A } func TestMapPort(t *testing.T) { - mux, err := NewMultiUDPMuxFromPort(12500) - require.NoError(t, err) listener, err := net.ListenPacket("udp4", "127.0.0.1:0") // nolint: noctx skipOnPermission(t, err, "listening for TURN server") require.NoError(t, err) @@ -3672,12 +3670,15 @@ func TestMapPort(t *testing.T) { agent, err := NewAgentWithOptions( WithCandidateTypes([]CandidateType{CandidateTypeHost, CandidateTypeRelay}), WithNetworkTypes([]NetworkType{NetworkTypeUDP4, NetworkTypeUDP6}), - WithUDPMux(mux), WithUrls([]*stun.URI{turnURL}), WithMapPortHandler(func(cand Candidate) int { return 50000 }, CandidateTypeHost)) require.NoError(t, err) + defer func() { + require.NoError(t, agent.Close()) + }() + require.NoError(t, err) gathered := make(chan (struct{})) From e6905e532062597e4286a4013ff462f66436d3e5 Mon Sep 17 00:00:00 2001 From: Xinze Zheng Date: Sun, 11 Jan 2026 17:19:10 +0800 Subject: [PATCH 16/18] More tests for MapPort --- gather_test.go | 186 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 183 insertions(+), 3 deletions(-) diff --git a/gather_test.go b/gather_test.go index ecbd2786..fd665098 100644 --- a/gather_test.go +++ b/gather_test.go @@ -3629,7 +3629,7 @@ func (m *mockUniversalUDPMux) GetConnForURL(ufrag string, url string, addr net.A return m.mockUDPMux.GetConn(ufrag+url, addr) } -func TestMapPort(t *testing.T) { +func TestMapPortRelay(t *testing.T) { listener, err := net.ListenPacket("udp4", "127.0.0.1:0") // nolint: noctx skipOnPermission(t, err, "listening for TURN server") require.NoError(t, err) @@ -3698,11 +3698,191 @@ func TestMapPort(t *testing.T) { require.NoError(t, agent.GatherCandidates()) <-gathered + var ( + sawHost bool + sawRelay bool + ) for _, cand := range cands { - if cand.Type() == CandidateTypeHost { + switch cand.Type() { + case CandidateTypeHost: + sawHost = true require.Equal(t, 50000, cand.Port()) - } else { + case CandidateTypeRelay: + sawRelay = true require.Equal(t, int(relayPort), cand.Port()) + default: + require.Failf(t, "Unexpected candidate type", "got %v", cand.Type()) } } + + require.True(t, sawHost) + require.True(t, sawRelay) +} + +func TestMapPortSflx(t *testing.T) { + stunURI := &stun.URI{ + Scheme: stun.SchemeTypeSTUN, + Host: "127.0.0.1", + Port: 3478, + } + relatedAddr := &net.UDPAddr{IP: net.IP{10, 0, 0, 1}, Port: 49000} + srflxAddr := &stun.XORMappedAddress{ + IP: net.IP{203, 0, 113, 5}, + Port: 50000, + } + + udpMuxSrflx := newMockUniversalUDPMux([]net.Addr{relatedAddr}, srflxAddr) + + agent, err := NewAgentWithOptions( + WithNetworkTypes([]NetworkType{NetworkTypeUDP4}), + WithCandidateTypes([]CandidateType{CandidateTypeServerReflexive}), + WithUDPMuxSrflx(udpMuxSrflx), + WithMapPortHandler(func(cand Candidate) int { + return 50001 + }, CandidateTypeServerReflexive), + ) + require.NoError(t, err) + defer func() { + require.NoError(t, agent.Close()) + }() + + require.NoError(t, agent.OnCandidate(func(Candidate) {})) + + agent.gatherCandidatesSrflxUDPMux(context.Background(), []*stun.URI{stunURI}, []NetworkType{NetworkTypeUDP4}) + + candidates, err := agent.GetLocalCandidates() + require.NoError(t, err) + require.Len(t, candidates, 1) + + srflx, ok := candidates[0].(*CandidateServerReflexive) + require.True(t, ok, "expected server reflexive candidate") + require.Equal(t, srflxAddr.IP.String(), srflx.Address()) + require.Equal(t, 50001, srflx.Port()) + require.NotNil(t, srflx.RelatedAddress()) + require.Equal(t, relatedAddr.IP.String(), srflx.RelatedAddress().Address) + require.Equal(t, relatedAddr.Port, srflx.RelatedAddress().Port) + require.Equal(t, 1, udpMuxSrflx.connCount(), "expected mux to be asked for one connection") +} + +func TestRewriteAndMapPort(t *testing.T) { //nolint:cyclop + t.Run("replace host via UDPMux", func(t *testing.T) { + mux := newMockUDPMux([]net.Addr{&net.UDPAddr{IP: net.IP{10, 0, 0, 1}, Port: 1234}}) + + agent, err := NewAgentWithOptions( + WithNet(newStubNet(t)), + WithCandidateTypes([]CandidateType{CandidateTypeHost}), + WithNetworkTypes([]NetworkType{NetworkTypeUDP4}), + WithUDPMux(mux), + WithMulticastDNSMode(MulticastDNSModeDisabled), + WithAddressRewriteRules(AddressRewriteRule{ + External: []string{"203.0.113.1"}, + Local: "10.0.0.1", + AsCandidateType: CandidateTypeHost, + Mode: AddressRewriteReplace, + }), + WithMapPortHandler(func(c Candidate) int { + if c.Port() == 1234 { + return 4321 + } else { + return 12345 + } + }, CandidateTypeHost), + ) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, agent.Close()) + }) + + var ( + mu sync.Mutex + addresses []Candidate + done = make(chan struct{}) + ) + require.NoError(t, agent.OnCandidate(func(c Candidate) { + if c == nil { + close(done) + + return + } + mu.Lock() + addresses = append(addresses, c) + mu.Unlock() + })) + + require.NoError(t, agent.GatherCandidates()) + select { + case <-done: + case <-time.After(2 * time.Second): + require.FailNow(t, "gather did not complete") + } + + mu.Lock() + defer mu.Unlock() + require.Len(t, addresses, 1) + assert.Equal(t, "203.0.113.1", addresses[0].Address()) + assert.Equal(t, 4321, addresses[0].Port()) + assert.Equal(t, CandidateTypeHost, addresses[0].Type()) + }) + + t.Run("append host via UDPMux", func(t *testing.T) { + mux := newMockUDPMux([]net.Addr{&net.UDPAddr{IP: net.IP{10, 0, 0, 1}, Port: 1234}}) + + agent, err := NewAgentWithOptions( + WithNet(newStubNet(t)), + WithCandidateTypes([]CandidateType{CandidateTypeHost}), + WithNetworkTypes([]NetworkType{NetworkTypeUDP4}), + WithUDPMux(mux), + WithMulticastDNSMode(MulticastDNSModeDisabled), + WithAddressRewriteRules(AddressRewriteRule{ + External: []string{"203.0.113.2"}, + Local: "10.0.0.1", + AsCandidateType: CandidateTypeHost, + Mode: AddressRewriteAppend, + }), + WithMapPortHandler(func(c Candidate) int { + if c.Port() == 1234 { + return 4321 + } else { + return 12345 + } + }, CandidateTypeHost), + ) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, agent.Close()) + }) + + var ( + mu sync.Mutex + addresses []Candidate + done = make(chan struct{}) + ) + require.NoError(t, agent.OnCandidate(func(c Candidate) { + if c == nil { + close(done) + + return + } + mu.Lock() + addresses = append(addresses, c) + mu.Unlock() + })) + + require.NoError(t, agent.GatherCandidates()) + select { + case <-done: + case <-time.After(2 * time.Second): + require.FailNow(t, "gather did not complete") + } + + mu.Lock() + defer mu.Unlock() + require.Len(t, addresses, 2) + seenAddrs := []string{addresses[0].Address(), addresses[1].Address()} + assert.ElementsMatch(t, []string{"10.0.0.1", "203.0.113.2"}, seenAddrs) + for _, cand := range addresses { + assert.Equal(t, CandidateTypeHost, cand.Type()) + assert.Equal(t, 4321, cand.Port()) + } + }) } From 804a7cec9f47cc80eed539adc25d147c93733a35 Mon Sep 17 00:00:00 2001 From: Xinze Zheng Date: Sun, 11 Jan 2026 17:59:35 +0800 Subject: [PATCH 17/18] More tests for MapPort --- gather_test.go | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/gather_test.go b/gather_test.go index fd665098..faef9427 100644 --- a/gather_test.go +++ b/gather_test.go @@ -3629,7 +3629,7 @@ func (m *mockUniversalUDPMux) GetConnForURL(ufrag string, url string, addr net.A return m.mockUDPMux.GetConn(ufrag+url, addr) } -func TestMapPortRelay(t *testing.T) { +func TestMapPort(t *testing.T) { listener, err := net.ListenPacket("udp4", "127.0.0.1:0") // nolint: noctx skipOnPermission(t, err, "listening for TURN server") require.NoError(t, err) @@ -3668,7 +3668,7 @@ func TestMapPortRelay(t *testing.T) { Proto: stun.ProtoTypeUDP, } agent, err := NewAgentWithOptions( - WithCandidateTypes([]CandidateType{CandidateTypeHost, CandidateTypeRelay}), + WithCandidateTypes([]CandidateType{CandidateTypeHost, CandidateTypeRelay, CandidateTypeServerReflexive}), WithNetworkTypes([]NetworkType{NetworkTypeUDP4, NetworkTypeUDP6}), WithUrls([]*stun.URI{turnURL}), WithMapPortHandler(func(cand Candidate) int { @@ -3701,6 +3701,7 @@ func TestMapPortRelay(t *testing.T) { var ( sawHost bool sawRelay bool + sawSrflx bool ) for _, cand := range cands { switch cand.Type() { @@ -3710,16 +3711,19 @@ func TestMapPortRelay(t *testing.T) { case CandidateTypeRelay: sawRelay = true require.Equal(t, int(relayPort), cand.Port()) + case CandidateTypeServerReflexive: + sawSrflx = true default: - require.Failf(t, "Unexpected candidate type", "got %v", cand.Type()) + require.Failf(t, "unexpected cand type", "got: %v", cand.Type()) } } require.True(t, sawHost) require.True(t, sawRelay) + require.True(t, sawSrflx) } -func TestMapPortSflx(t *testing.T) { +func TestMapPortSrflx(t *testing.T) { stunURI := &stun.URI{ Scheme: stun.SchemeTypeSTUN, Host: "127.0.0.1", @@ -3750,18 +3754,17 @@ func TestMapPortSflx(t *testing.T) { agent.gatherCandidatesSrflxUDPMux(context.Background(), []*stun.URI{stunURI}, []NetworkType{NetworkTypeUDP4}) + agent.gatherCandidatesSrflxMapped(context.Background(), []NetworkType{NetworkTypeUDP4}) + candidates, err := agent.GetLocalCandidates() require.NoError(t, err) - require.Len(t, candidates, 1) + require.Len(t, candidates, 2) - srflx, ok := candidates[0].(*CandidateServerReflexive) - require.True(t, ok, "expected server reflexive candidate") - require.Equal(t, srflxAddr.IP.String(), srflx.Address()) - require.Equal(t, 50001, srflx.Port()) - require.NotNil(t, srflx.RelatedAddress()) - require.Equal(t, relatedAddr.IP.String(), srflx.RelatedAddress().Address) - require.Equal(t, relatedAddr.Port, srflx.RelatedAddress().Port) - require.Equal(t, 1, udpMuxSrflx.connCount(), "expected mux to be asked for one connection") + for _, cand := range candidates { + srflx, ok := cand.(*CandidateServerReflexive) + require.True(t, ok) + require.Equal(t, 50001, srflx.Port()) + } } func TestRewriteAndMapPort(t *testing.T) { //nolint:cyclop From 3ded026e09650b7fde19c9868c8d7963d1d34acf Mon Sep 17 00:00:00 2001 From: Xinze Zheng Date: Fri, 23 Jan 2026 13:13:58 +0000 Subject: [PATCH 18/18] Remove candidate type filter --- agent_options.go | 8 ++------ gather_test.go | 23 +++++++++++++++++++---- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/agent_options.go b/agent_options.go index c527fe43..19002c59 100644 --- a/agent_options.go +++ b/agent_options.go @@ -961,14 +961,10 @@ func WithLoggerFactory(loggerFactory logging.LoggerFactory) AgentOption { } } -func WithMapPortHandler(handler func(cand Candidate) int, candTyp CandidateType) AgentOption { +func WithMapPortHandler(handler func(cand Candidate) int) AgentOption { return func(a *Agent) error { a.mapPort = func(candidate Candidate) int { - if candidate.Type() == candTyp { - return handler(candidate) - } - - return 0 + return handler(candidate) } return nil diff --git a/gather_test.go b/gather_test.go index faef9427..4dbf1a40 100644 --- a/gather_test.go +++ b/gather_test.go @@ -3672,8 +3672,13 @@ func TestMapPort(t *testing.T) { WithNetworkTypes([]NetworkType{NetworkTypeUDP4, NetworkTypeUDP6}), WithUrls([]*stun.URI{turnURL}), WithMapPortHandler(func(cand Candidate) int { + if cand.Type() != CandidateTypeHost { + return cand.Port() + } + return 50000 - }, CandidateTypeHost)) + }), + ) require.NoError(t, err) defer func() { require.NoError(t, agent.Close()) @@ -3742,8 +3747,12 @@ func TestMapPortSrflx(t *testing.T) { WithCandidateTypes([]CandidateType{CandidateTypeServerReflexive}), WithUDPMuxSrflx(udpMuxSrflx), WithMapPortHandler(func(cand Candidate) int { + if cand.Type() != CandidateTypeServerReflexive { + return cand.Port() + } + return 50001 - }, CandidateTypeServerReflexive), + }), ) require.NoError(t, err) defer func() { @@ -3784,12 +3793,15 @@ func TestRewriteAndMapPort(t *testing.T) { //nolint:cyclop Mode: AddressRewriteReplace, }), WithMapPortHandler(func(c Candidate) int { + if c.Type() != CandidateTypeHost { + return c.Port() + } if c.Port() == 1234 { return 4321 } else { return 12345 } - }, CandidateTypeHost), + }), ) require.NoError(t, err) t.Cleanup(func() { @@ -3843,12 +3855,15 @@ func TestRewriteAndMapPort(t *testing.T) { //nolint:cyclop Mode: AddressRewriteAppend, }), WithMapPortHandler(func(c Candidate) int { + if c.Type() != CandidateTypeHost { + return c.Port() + } if c.Port() == 1234 { return 4321 } else { return 12345 } - }, CandidateTypeHost), + }), ) require.NoError(t, err) t.Cleanup(func() {