Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions candidate.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,5 @@ type Candidate interface {
seen(outbound bool)
start(a *Agent, conn net.PacketConn, initializedCh <-chan struct{})
writeTo(raw []byte, dst Candidate) (int, error)
writeBatchTo(rawPackets [][]byte, dst Candidate) (int, error)
}
63 changes: 63 additions & 0 deletions candidate_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"time"

"github.com/pion/stun/v3"
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
)

type candidateBase struct {
Expand All @@ -35,6 +37,8 @@ type candidateBase struct {
lastSent atomic.Int64
lastReceived atomic.Int64
conn net.PacketConn
ipv4Conn *ipv4.PacketConn
ipv6Conn *ipv6.PacketConn

currAgent *Agent
closeCh chan struct{}
Expand Down Expand Up @@ -227,6 +231,12 @@ func (c *candidateBase) start(a *Agent, conn net.PacketConn, initializedCh <-cha
c.closeCh = make(chan struct{})
c.closedCh = make(chan struct{})

if c.networkType.IsIPv6() {
c.ipv6Conn = ipv6.NewPacketConn(conn)
} else {
c.ipv4Conn = ipv4.NewPacketConn(conn)
}

go c.recvLoop(initializedCh)
}
Comment on lines 24 to 241
Copy link
Member

@JoTurk JoTurk Dec 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello, did you see my response to you in the discord about why the batching implemented in #608 wasn't merged yet [1]?

pion/ice isn't the correct layer to add this, and we already have an implementation of a batching packet conn in pion/transport that can be used today with a mux. Maybe we can add it to pion/webrtc and make it optional? Maybe we can make pion switch to batching when it detects high throughput?

you can rebase @cnderrauber change from #608 but please look at the conversation in that PR and why we got push back when we tried to add batching by default.

https://discord.com/channels/1352636971591274548/1352636972614680659/1450454270305636382

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @joeturki, I may have jumped the gun here after missing the discord message.

I took a minute to review the discussion in #608, and believe that my motivation may be a bit different. Full disclosure, I don't have the architectural depth of pion yet, so please don't hesitate to educate me.

My understanding of #608 is that batching can improve sending packets over multiple connections (udp mux) to multiple peers. I also heard people being concerned about the buffers and fixed time intervals causing extra latency.

In my use case I am mostly concerned about two peers and reducing latency to the max. Here, I'd like to give the user the choice to batch. In my wishful thinking, datachannels could accept multiple messages in the send method. For our application, we actually have a fixed time interval at the high level and call send many many times. So we know when we want to batch or not.

Very open to other ideas. I think #608 wouldn't solve our use case.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello.
UDPMux is still typically one underlying socket multiplexing many remote addresses/ufrags, not multiple connections. in #608 there is a WriteBatchInterval to configure intervals.
Either way, #608 does multiple things, not just batching, and it can be configured for many uses. But it centralizes batching policy inside the mux rather than at the call site. ICE shouldn't be super aware of whether we do batching or not. We should abstract it in pion/transport like #608 did.

I think once this is cleaned we can get it merged.


Expand Down Expand Up @@ -391,6 +401,59 @@ func (c *candidateBase) writeTo(raw []byte, dst Candidate) (int, error) {
return n, nil
}

// writeBatchTo sends multiple raw packets to dst using as few syscalls as
// possible. On Linux the underlying WriteBatch uses sendmmsg; on other
// platforms golang.org/x/net writes one message per call, so we loop until
// every message has been handed to the kernel.
//
// It returns the number of packets written. Mirroring writeTo, transient
// write errors are logged and swallowed (nil error) so a single failed
// batch does not tear down the connection; only a closed connection is
// reported back to the caller.
func (c *candidateBase) writeBatchTo(rawPackets [][]byte, dst Candidate) (int, error) {
	if len(rawPackets) == 0 {
		return 0, nil
	}

	// Fall back to sequential writes when no batch-capable conn was set up
	// (e.g. start() was never called for this candidate). This avoids a nil
	// pointer dereference on the ipv4/ipv6 PacketConn wrappers.
	if c.ipv4Conn == nil && c.ipv6Conn == nil {
		sent := 0
		for _, raw := range rawPackets {
			if _, err := c.writeTo(raw, dst); err != nil {
				return sent, err
			}
			sent++
		}

		return sent, nil
	}

	dstAddr := dst.addr()

	// Build one message per packet for the batch write.
	// ipv6.Message is an alias of ipv4.Message, so one slice serves both paths.
	messages := make([]ipv4.Message, len(rawPackets))
	for i, raw := range rawPackets {
		messages[i] = ipv4.Message{
			Buffers: [][]byte{raw},
			Addr:    dstAddr,
		}
	}

	// WriteBatch uses sendmmsg on Linux for improved performance.
	// On other platforms it writes one message at a time, so we loop.
	totalWritten := 0
	for totalWritten < len(messages) {
		var (
			n   int
			err error
		)
		if c.ipv6Conn != nil {
			n, err = c.ipv6Conn.WriteBatch(messages[totalWritten:], 0)
		} else {
			n, err = c.ipv4Conn.WriteBatch(messages[totalWritten:], 0)
		}

		if err != nil {
			// A closed connection is fatal for the caller; report it.
			if errors.Is(err, io.ErrClosedPipe) {
				return totalWritten, err
			}
			c.agent().log.Infof("Failed to send batch packets: %v", err)

			return totalWritten, nil
		}

		// Defensive: stop if WriteBatch reports zero progress without an
		// error, rather than spinning forever.
		if n == 0 {
			break
		}

		totalWritten += n
	}

	if totalWritten > 0 {
		c.seen(true)
	}

	return totalWritten, nil
}

// TypePreference returns the type preference for this candidate.
func (c *candidateBase) TypePreference() uint16 {
pref := c.Type().Preference()
Expand Down
95 changes: 95 additions & 0 deletions candidate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (

"github.com/pion/logging"
"github.com/stretchr/testify/require"
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
)

const localhostIPStr = "127.0.0.1"
Expand Down Expand Up @@ -620,6 +622,99 @@ func TestCandidateWriteTo(t *testing.T) {
require.Error(t, err, "writing to closed conn")
}

// TestCandidateWriteBatchTo exercises writeBatchTo over IPv4 and IPv6
// loopback: an empty batch is a no-op, and both single-packet and
// multi-packet batches are delivered intact to the remote socket.
func TestCandidateWriteBatchTo(t *testing.T) {
	cases := []struct {
		name        string
		network     string
		ip          net.IP
		networkType NetworkType
		setupConn   func(*candidateBase, *net.UDPConn)
	}{
		{
			name:        "UDP_IPv4",
			network:     "udp4",
			ip:          net.IP{127, 0, 0, 1},
			networkType: NetworkTypeUDP4,
			setupConn: func(base *candidateBase, udpConn *net.UDPConn) {
				base.ipv4Conn = ipv4.NewPacketConn(udpConn)
			},
		},
		{
			name:        "UDP_IPv6",
			network:     "udp6",
			ip:          net.IPv6loopback,
			networkType: NetworkTypeUDP6,
			setupConn: func(base *candidateBase, udpConn *net.UDPConn) {
				base.ipv6Conn = ipv6.NewPacketConn(udpConn)
			},
		},
	}

	for _, tt := range cases {
		t.Run(tt.name, func(t *testing.T) {
			receiver, err := net.ListenUDP(tt.network, &net.UDPAddr{IP: tt.ip, Port: 0})
			if err != nil {
				t.Skipf("%s not available on this system", tt.name)
			}
			defer func() { _ = receiver.Close() }()

			sender, err := net.ListenUDP(tt.network, &net.UDPAddr{IP: tt.ip, Port: 0})
			require.NoError(t, err, "error creating test UDP sender")
			defer func() { _ = sender.Close() }()

			logFactory := logging.NewDefaultLoggerFactory()

			local := &candidateBase{
				conn:        sender,
				networkType: tt.networkType,
				currAgent: &Agent{
					log: logFactory.NewLogger("agent"),
				},
			}
			tt.setupConn(local, sender)

			remote := &candidateBase{
				resolvedAddr: receiver.LocalAddr(),
			}

			// An empty batch must succeed without sending anything.
			written, err := local.writeBatchTo([][]byte{}, remote)
			require.NoError(t, err, "writing empty batch should not error")
			require.Equal(t, 0, written, "writing empty batch should return 0")

			// A single-packet batch must deliver exactly that packet.
			written, err = local.writeBatchTo([][]byte{[]byte("test1")}, remote)
			require.NoError(t, err, "writing single packet batch")
			require.Equal(t, 1, written, "should have written 1 message")

			readBuf := make([]byte, 1024)
			require.NoError(t, receiver.SetReadDeadline(time.Now().Add(time.Second)))
			readLen, _, err := receiver.ReadFromUDP(readBuf)
			require.NoError(t, err, "reading packet")
			require.Equal(t, "test1", string(readBuf[:readLen]))

			// A multi-packet batch must deliver every packet.
			batch := [][]byte{
				[]byte("packet1"),
				[]byte("packet2"),
				[]byte("packet3"),
			}
			written, err = local.writeBatchTo(batch, remote)
			require.NoError(t, err, "writing multiple packets batch")
			require.Equal(t, len(batch), written, "should have written all messages")

			for i := range batch {
				require.NoError(t, receiver.SetReadDeadline(time.Now().Add(time.Second)))
				readLen, _, err = receiver.ReadFromUDP(readBuf)
				require.NoError(t, err, "reading packet %d", i)
				require.Equal(t, string(batch[i]), string(readBuf[:readLen]))
			}
		})
	}
}

func TestMarshalUnmarshalCandidateWithZoneID(t *testing.T) {
candidateWithZoneID := mustCandidateHost(t, &CandidateHostConfig{
Network: NetworkTypeUDP6.String(),
Expand Down
9 changes: 9 additions & 0 deletions candidatepair.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,19 @@ func (p *CandidatePair) priority() uint64 {
return (1<<32-1)*localMin(g, d) + 2*localMax(g, d) + cmp(g, d)
}

// Write sends a single packet on the candidate pair, from the local
// candidate to the remote candidate's address.
// Returns the number of bytes written.
func (p *CandidatePair) Write(b []byte) (int, error) {
	local, remote := p.Local, p.Remote

	return local.writeTo(b, remote)
}

// WriteBatch sends multiple packets on the candidate pair, from the local
// candidate to the remote candidate's address.
// On Linux, this uses sendmmsg for improved performance.
// Returns the number of packets successfully written.
func (p *CandidatePair) WriteBatch(packets [][]byte) (int, error) {
	local, remote := p.Local, p.Remote

	return local.writeBatchTo(packets, remote)
}

func (a *Agent) sendSTUN(msg *stun.Message, local, remote Candidate) {
_, err := local.writeTo(msg.Raw, remote)
if err != nil {
Expand Down
Loading