From f6a054d1071ccc6602fac22a373275cb90ea4441 Mon Sep 17 00:00:00 2001 From: Amirmohammad Ghasemi Date: Thu, 13 Nov 2025 11:15:24 -0500 Subject: [PATCH 1/4] ECN support --- candidate_base.go | 83 ++++++++++++++++++++++++++++++++++++++++++++--- transport.go | 14 ++++++++ 2 files changed, 93 insertions(+), 4 deletions(-) diff --git a/candidate_base.go b/candidate_base.go index 0e38d4cc..5322bb31 100644 --- a/candidate_base.go +++ b/candidate_base.go @@ -14,9 +14,11 @@ import ( "strings" "sync" "sync/atomic" + "syscall" "time" "github.com/pion/stun/v3" + "github.com/pion/transport/v3" ) type candidateBase struct { @@ -233,6 +235,7 @@ func (c *candidateBase) recvLoop(initializedCh <-chan struct{}) { return } + // todo amir: it seems like bufferPool is not needed as this method is the only accessor bufferPoolBuffer := bufferPool.Get() defer bufferPool.Put(bufferPoolBuffer) buf, ok := bufferPoolBuffer.([]byte) @@ -240,8 +243,10 @@ func (c *candidateBase) recvLoop(initializedCh <-chan struct{}) { return } + c.enableSocketOptions() + oob := make([]byte, 128) // buffer for out of band packet attributes for { - n, srcAddr, err := c.conn.ReadFrom(buf) + n, attr, srcAddr, err := c.readPacketWithAttributes(buf, oob) if err != nil { if !errors.Is(err, io.EOF) && !errors.Is(err, net.ErrClosed) { agent.log.Warnf("Failed to read from candidate %s: %v", c, err) @@ -250,10 +255,80 @@ func (c *candidateBase) recvLoop(initializedCh <-chan struct{}) { return } - c.handleInboundPacket(buf[:n], srcAddr) + c.handleInboundPacket(buf[:n], attr, srcAddr) } } +func (c *candidateBase) enableSocketOptions() { + if uc, ok := c.conn.(*net.UDPConn); ok { + raw, _ := uc.SyscallConn() + _ = raw.Control(func(fd uintptr) { + syscall.SetsockoptInt(int(fd), syscall.IPPROTO_IP, syscall.IP_RECVTOS, 1) // TOS/ECN + }) + } +} + +// Reads a packet including its out of band attributes like ECN +// if the underlying conn supports it. +func (c *candidateBase) readPacketWithAttributes( + buf []byte, oob []byte) (n int, attr *transport.PacketAttributes, srcAddr net.Addr, err error) { + attr = nil + var uc *net.UDPConn + var ok bool + + // in case the underlying socket is not udp socket (not a net.UDPConn) + if uc, ok = c.conn.(*net.UDPConn); !ok { + n, srcAddr, err = c.conn.ReadFrom(buf) + return + } + + return c.doReadPacketWithAttributes(buf, oob, uc) +} + +// Reads a packet including its out of band attributes like ECN if possible. +func (c *candidateBase) doReadPacketWithAttributes( + buf []byte, oob []byte, uc *net.UDPConn) (n int, attr *transport.PacketAttributes, srcAddr net.Addr, err error) { + var oobn int + var flags int + var udpAddr *net.UDPAddr + n, oobn, flags, udpAddr, err = uc.ReadMsgUDP(buf, oob) + srcAddr = udpAddr + + attr = transport.NewPacketAttributes() + if oobn <= 0 { + return + } + + _ = flags + + // Parse control messages for ECN/TOS + cms, err := syscall.ParseSocketControlMessage(oob[:oobn]) + if err != nil { + return + } + + for _, cm := range cms { + // IPv4 TOS + if cm.Header.Level == syscall.IPPROTO_IP && cm.Header.Type == syscall.IP_TOS { + if len(cm.Data) > 0 { + tos := cm.Data[0] + ecn := tos & 0x03 // ECN is the two least significant bits + attr.WithECN(transport.ECN(ecn)) + } + } + // IPv6 Traffic Class + if cm.Header.Level == syscall.IPPROTO_IPV6 && cm.Header.Type == syscall.IPV6_TCLASS { + if len(cm.Data) > 0 { + tos := cm.Data[0] + ecn := tos & 0x03 // ECN is the two least significant bits + attr.WithECN(transport.ECN(ecn)) + } + } + } + + return +} + func (c *candidateBase) validateSTUNTrafficCache(addr net.Addr) bool { if candidate, ok := c.remoteCandidateCaches[toAddrPort(addr)]; ok { candidate.seen(false) @@ -271,7 +346,7 @@ func (c *candidateBase) addRemoteCandidateCache(candidate Candidate, srcAddr net c.remoteCandidateCaches[toAddrPort(srcAddr)] = candidate } -func (c *candidateBase) handleInboundPacket(buf []byte, srcAddr net.Addr) { +func (c *candidateBase) handleInboundPacket(buf []byte, attr *transport.PacketAttributes, srcAddr net.Addr) { agent := c.agent() if stun.IsMessage(buf) { @@ -309,7 +384,7 @@ func (c *candidateBase) handleInboundPacket(buf []byte, srcAddr net.Addr) { } // Note: This will return packetio.ErrFull if the buffer ever manages to fill up. - n, err := agent.buf.Write(buf) + n, err := agent.buf.WriteWithAttributes(buf, attr) if err != nil { agent.log.Warnf("Failed to write packet: %s", err) diff --git a/transport.go b/transport.go index 31e12866..2047dd9e 100644 --- a/transport.go +++ b/transport.go @@ -10,6 +10,7 @@ import ( "time" "github.com/pion/stun/v3" + "github.com/pion/transport/v3" ) // Dial connects to the remote agent, acting as the controlling ice agent. @@ -79,6 +80,19 @@ func (c *Conn) Read(p []byte) (int, error) { return n, err } +// ReadWithAttributes implements the Conn ReadWithAttributes method. +func (c *Conn) ReadWithAttributes(b []byte, attr *transport.PacketAttributes) (n int, err error) { + err = c.agent.loop.Err() + if err != nil { + return 0, err + } + + n, err = c.agent.buf.ReadWithAttributes(b, attr) + c.bytesReceived.Add(uint64(n)) //nolint:gosec // G115 + + return n, err +} + // Write implements the Conn Write method. func (c *Conn) Write(packet []byte) (int, error) { err := c.agent.loop.Err() From 234f338cdfd2140f20db16fbc63785715a0fafe9 Mon Sep 17 00:00:00 2001 From: Amirmohammad Ghasemi Date: Fri, 14 Nov 2025 14:26:02 -0500 Subject: [PATCH 2/4] Adapt with transport changes --- candidate_base.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/candidate_base.go b/candidate_base.go index 5322bb31..f7defb32 100644 --- a/candidate_base.go +++ b/candidate_base.go @@ -245,8 +245,9 @@ func (c *candidateBase) recvLoop(initializedCh <-chan struct{}) { c.enableSocketOptions() oob := make([]byte, 128) // buffer for out of band packet attributes + attr := transport.NewPacketAttributesWithLen(transport.MaxAttributesLen) for { - n, attr, srcAddr, err := c.readPacketWithAttributes(buf, oob) + n, srcAddr, err := c.readPacketWithAttributes(buf, oob, attr) if err != nil { if !errors.Is(err, io.EOF) && !errors.Is(err, net.ErrClosed) { agent.log.Warnf("Failed to read from candidate %s: %v", c, err) @@ -271,8 +272,7 @@ func (c *candidateBase) enableSocketOptions() { // Reads a packet including its out of band attributes like ECN // if the underlying conn supports it. func (c *candidateBase) readPacketWithAttributes( - buf []byte, oob []byte) (n int, attr *transport.PacketAttributes, srcAddr net.Addr, err error) { - attr = nil + buf []byte, oob []byte, attr *transport.PacketAttributes) (n int, srcAddr net.Addr, err error) { var uc *net.UDPConn var ok bool @@ -282,19 +282,18 @@ func (c *candidateBase) readPacketWithAttributes( return } - return c.doReadPacketWithAttributes(buf, oob, uc) + return c.doReadPacketWithAttributes(buf, oob, attr, uc) } // Reads a packet including its out of band attributes like ECN if possible. func (c *candidateBase) doReadPacketWithAttributes( - buf []byte, oob []byte, uc *net.UDPConn) (n int, attr *transport.PacketAttributes, srcAddr net.Addr, err error) { + buf []byte, oob []byte, attr *transport.PacketAttributes, uc *net.UDPConn) (n int, srcAddr net.Addr, err error) { var oobn int var flags int var udpAddr *net.UDPAddr n, oobn, flags, udpAddr, err = uc.ReadMsgUDP(buf, oob) srcAddr = udpAddr - attr = transport.NewPacketAttributes() if oobn <= 0 { return } @@ -313,7 +312,7 @@ func (c *candidateBase) doReadPacketWithAttributes( if len(cm.Data) > 0 { tos := cm.Data[0] ecn := tos & 0x03 // ECN is the two least significant bits - attr.WithECN(transport.ECN(ecn)) + attr.Buffer[0] = ecn } } // IPv6 Traffic Class @@ -321,7 +320,7 @@ func (c *candidateBase) doReadPacketWithAttributes( if len(cm.Data) > 0 { tos := cm.Data[0] ecn := tos & 0x03 // ECN is the two least significant bits - attr.WithECN(transport.ECN(ecn)) + attr.Buffer[0] = ecn } } } From 284a658ff358299d42fd5cca4814e31b298795e7 Mon Sep 17 00:00:00 2001 From: Amirmohammad Ghasemi Date: Fri, 14 Nov 2025 19:39:58 -0500 Subject: [PATCH 3/4] Buffer of length 1 is sufficient for ECN for now --- candidate_base.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/candidate_base.go b/candidate_base.go index f7defb32..a9035741 100644 --- a/candidate_base.go +++ b/candidate_base.go @@ -235,7 +235,6 @@ func (c *candidateBase) recvLoop(initializedCh <-chan struct{}) { return } - // todo amir: it seems like bufferPool is not needed as this method is the only accessor bufferPoolBuffer := bufferPool.Get() defer bufferPool.Put(bufferPoolBuffer) buf, ok := bufferPoolBuffer.([]byte) @@ -245,7 +244,7 @@ func (c *candidateBase) recvLoop(initializedCh <-chan struct{}) { c.enableSocketOptions() oob := make([]byte, 128) // buffer for out of band packet attributes - attr := transport.NewPacketAttributesWithLen(transport.MaxAttributesLen) + attr := transport.NewPacketAttributesWithLen(1) for { n, srcAddr, err := c.readPacketWithAttributes(buf, oob, attr) if err != nil { From ffd34447c4fed2a7870c0906190678a647d5e4b4 Mon Sep 17 00:00:00 2001 From: Amirmohammad Ghasemi Date: Fri, 14 Nov 2025 19:51:21 -0500 Subject: [PATCH 4/4] Reset attributes buffer --- candidate_base.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/candidate_base.go b/candidate_base.go index a9035741..af5df114 100644 --- a/candidate_base.go +++ b/candidate_base.go @@ -246,6 +246,7 @@ func (c *candidateBase) recvLoop(initializedCh <-chan struct{}) { oob := make([]byte, 128) // buffer for out of band packet attributes attr := transport.NewPacketAttributesWithLen(1) for { + attr.Reset() n, srcAddr, err := c.readPacketWithAttributes(buf, oob, attr) if err != nil { if !errors.Is(err, io.EOF) && !errors.Is(err, net.ErrClosed) { @@ -255,7 +256,7 @@ func (c *candidateBase) recvLoop(initializedCh <-chan struct{}) { return } - c.handleInboundPacket(buf[:n], attr, srcAddr) + c.handleInboundPacket(buf[:n], attr.GetReadPacketAttributes(), srcAddr) } } @@ -312,6 +313,7 @@ func (c *candidateBase) doReadPacketWithAttributes( tos := cm.Data[0] ecn := tos & 0x03 // ECN is the two least significant bits attr.Buffer[0] = ecn + attr.BytesCopied = 1 } } // IPv6 Traffic Class @@ -320,6 +322,7 @@ func (c *candidateBase) doReadPacketWithAttributes( tos := cm.Data[0] ecn := tos & 0x03 // ECN is the two least significant bits attr.Buffer[0] = ecn + attr.BytesCopied = 1 } } }