diff --git a/.github/workflows/e2e.yaml b/.github/workflows/e2e.yaml new file mode 100644 index 00000000..d70e78e9 --- /dev/null +++ b/.github/workflows/e2e.yaml @@ -0,0 +1,24 @@ +# SPDX-FileCopyrightText: 2026 The Pion community +# SPDX-License-Identifier: MIT + +name: E2E +on: + pull_request: + branches: + - master + push: + branches: + - master + +jobs: + e2e-test: + name: Test + runs-on: ubuntu-latest + timeout-minutes: 10 + steps: + - name: checkout + uses: actions/checkout@v6 + - name: test + run: | + docker build -t pion-turn-e2e -f e2e/Dockerfile . + docker run -i --rm pion-turn-e2e diff --git a/e2e/Dockerfile b/e2e/Dockerfile new file mode 100644 index 00000000..39205302 --- /dev/null +++ b/e2e/Dockerfile @@ -0,0 +1,11 @@ +# SPDX-FileCopyrightText: 2026 The Pion community +# SPDX-License-Identifier: MIT + +FROM docker.io/library/golang:1.26-trixie + +RUN apt-get update && apt-get install -y coturn + +COPY . /go/src/github.com/pion/turn +WORKDIR /go/src/github.com/pion/turn/e2e + +CMD ["go", "test", "-tags=coturn", "-v", "."] diff --git a/e2e/e2e.go b/e2e/e2e.go new file mode 100644 index 00000000..ccf94616 --- /dev/null +++ b/e2e/e2e.go @@ -0,0 +1,5 @@ +// SPDX-FileCopyrightText: 2026 The Pion community +// SPDX-License-Identifier: MIT + +// Package e2e contains end to end tests for pion/turn +package e2e diff --git a/e2e/e2e_coturn_test.go b/e2e/e2e_coturn_test.go new file mode 100644 index 00000000..8c15ebcf --- /dev/null +++ b/e2e/e2e_coturn_test.go @@ -0,0 +1,341 @@ +// SPDX-FileCopyrightText: 2026 The Pion community +// SPDX-License-Identifier: MIT + +//go:build coturn && !js +// +build coturn,!js + +package e2e + +import ( + "bufio" + "fmt" + "io" + "net" + "os" + "os/exec" + "strings" + "testing" + "time" +) + +const ( + coturnServerPort = 3478 + coturnConfigFile = "/tmp/turnserver.conf" + coturnStartupDelay = 1 * time.Second +) + +// serverCoturn starts a coturn TURN server. +// +//nolint:varnamelen +func serverCoturn(m *testmgr) { + go func() { + m.serverMutex.Lock() + defer m.serverMutex.Unlock() + + host, portStr, err := net.SplitHostPort(m.serverAddr) + if err != nil { + m.errChan <- fmt.Errorf("failed to parse serverAddr: %w", err) + + return + } + + config := fmt.Sprintf(` +listening-port=%s +listening-ip=%s +relay-ip=%s +external-ip=%s +user=%s:%s +realm=%s +lt-cred-mech +fingerprint +no-tls +no-dtls +log-file=stdout +`, portStr, host, host, host, m.username, m.password, m.realm) + + configFile := fmt.Sprintf("/tmp/turnserver-%s.conf", m.serverAddr) + err = os.WriteFile(configFile, []byte(config), 0o600) + if err != nil { + m.errChan <- err + + return + } + defer func() { + _ = os.Remove(configFile) + }() + + // Start coturn server + //nolint:noctx,gosec // Not using CommandContext to avoid goroutine leaks, config file is test-created + cmd := exec.Command("turnserver", "-c", configFile) + cmd.Stdout = io.Discard + cmd.Stderr = io.Discard + + if err := cmd.Start(); err != nil { + m.errChan <- fmt.Errorf("failed to start coturn: %w (is coturn installed?)", err) + + return + } + + // Ensure server has time to start + time.Sleep(coturnStartupDelay) + + m.serverReady <- struct{}{} + + // Wait for context cancellation + <-m.ctx.Done() + + // Kill and wait for process to fully exit + if cmd.Process != nil { + _ = cmd.Process.Kill() + } + // Wait for process to clean up completely + _ = cmd.Wait() + + // Small delay to let goroutines fully clean up + time.Sleep(100 * time.Millisecond) + + m.serverDone <- nil + close(m.serverDone) + }() +} + +// clientCoturn uses turnutils_uclient to test against a Pion TURN server. +// +//nolint:varnamelen +func clientCoturn(m *testmgr) { + select { + case <-m.serverReady: + // OK + case <-time.After(time.Second * 5): + m.errChan <- errServerTimeout + + return + } + + m.clientMutex.Lock() + defer m.clientMutex.Unlock() + + // Use turnutils_uclient to connect to the Pion server + // -v: verbose + // -u: username + // -w: password + // -r: realm + // -e: peer address (echo server) + // -n: number of messages + // -m: message size + // -W: time to run (seconds) + args := []string{ + // "-v", + "-u", m.username, + "-w", m.password, + "-r", m.realm, + "-n", "1", + "-m", "1", + "-W", "5", + m.serverAddr, + } + + //nolint:noctx,gosec + cmd := exec.Command("turnutils_uclient", args...) + + stdout, err := cmd.StdoutPipe() + if err != nil { + m.errChan <- fmt.Errorf("failed to create stdout pipe: %w", err) + + return + } + stderr, err := cmd.StderrPipe() + if err != nil { + m.errChan <- fmt.Errorf("failed to create stderr pipe: %w", err) + + return + } + + if err := cmd.Start(); err != nil { + m.errChan <- fmt.Errorf("failed to start turnutils_uclient: %w (is coturn installed?)", err) + + return + } + + // Look for success indicators in coturn client output + go func() { + scanner := bufio.NewScanner(io.MultiReader(stdout, stderr)) + for scanner.Scan() { + line := scanner.Text() + if strings.Contains(line, "success") || strings.Contains(line, "allocate msg sent") { + m.messageReceived <- "success" + } + } + }() + + _ = cmd.Wait() + m.clientDone <- nil + close(m.clientDone) +} + +//nolint:varnamelen +func serverCoturnIPv6(m *testmgr) { + go func() { + m.serverMutex.Lock() + defer m.serverMutex.Unlock() + + host, portStr, err := net.SplitHostPort(m.serverAddr) + if err != nil { + m.errChan <- fmt.Errorf("failed to parse serverAddr: %w", err) + + return + } + + // Coturn requires explicit dual-stack configuration to properly handle IPv6. + // Both IPv4 (0.0.0.0) and IPv6 addresses must be specified separately. + // See: https://github.com/coturn/coturn/issues/1294 + config := fmt.Sprintf(` +listening-port=%s +listening-ip=0.0.0.0 +listening-ip=%s +relay-ip=%s +external-ip=%s +user=%s:%s +realm=%s +lt-cred-mech +fingerprint +no-tls +no-dtls +log-file=stdout +verbose +`, portStr, host, host, host, m.username, m.password, m.realm) + + configFile := fmt.Sprintf("/tmp/turnserver-%s.conf", m.serverAddr) + err = os.WriteFile(configFile, []byte(config), 0o600) + if err != nil { + m.errChan <- err + + return + } + defer func() { + _ = os.Remove(configFile) + }() + + //nolint:noctx,gosec + cmd := exec.Command("turnserver", "-c", configFile) + cmd.Stdout = io.Discard + cmd.Stderr = io.Discard + + if err := cmd.Start(); err != nil { + m.errChan <- fmt.Errorf("failed to start coturn: %w (is coturn installed?)", err) + + return + } + + time.Sleep(coturnStartupDelay) + + m.serverReady <- struct{}{} + + <-m.ctx.Done() + + if cmd.Process != nil { + _ = cmd.Process.Kill() + } + _ = cmd.Wait() + + time.Sleep(100 * time.Millisecond) + + m.serverDone <- nil + close(m.serverDone) + }() +} + +//nolint:varnamelen +func clientCoturnIPv6(m *testmgr) { + select { + case <-m.serverReady: + case <-time.After(time.Second * 5): + m.errChan <- errServerTimeout + + return + } + + m.clientMutex.Lock() + defer m.clientMutex.Unlock() + + host, portStr, err := net.SplitHostPort(m.serverAddr) + if err != nil { + m.errChan <- err + + return + } + + args := []string{ + // "-v", + "-y", + "-x", + "-u", m.username, + "-w", m.password, + "-r", m.realm, + "-p", portStr, + "-n", "1", + "-m", "1", + "-W", "5", + host, + } + + //nolint:noctx,gosec + cmd := exec.Command("turnutils_uclient", args...) + + stdout, err := cmd.StdoutPipe() + if err != nil { + m.errChan <- fmt.Errorf("failed to create stdout pipe: %w", err) + + return + } + stderr, err := cmd.StderrPipe() + if err != nil { + m.errChan <- fmt.Errorf("failed to create stderr pipe: %w", err) + + return + } + + if err := cmd.Start(); err != nil { + m.errChan <- fmt.Errorf("failed to start turnutils_uclient: %w (is coturn installed?)", err) + + return + } + + go func() { + scanner := bufio.NewScanner(io.MultiReader(stdout, stderr)) + for scanner.Scan() { + line := scanner.Text() + if strings.Contains(line, "success") || strings.Contains(line, "allocate msg sent") { + m.messageReceived <- "success" + } + } + }() + + _ = cmd.Wait() + m.clientDone <- nil + close(m.clientDone) +} + +func TestPionCoturnE2EClientServer(t *testing.T) { + t.Parallel() + t.Run("CoturnServer", func(t *testing.T) { + t.Parallel() + testPionE2ESimple(t, serverCoturn, clientPion) + }) + t.Run("CoturnClient", func(t *testing.T) { + t.Parallel() + testPionE2ESimple(t, serverPion, clientCoturn) + }) +} + +func TestPionCoturnE2EClientServerIPv6(t *testing.T) { + t.Parallel() + t.Run("CoturnServerIPv6", func(t *testing.T) { + t.Parallel() + testPionE2ESimpleIPv6(t, serverCoturnIPv6, clientPionIPv6) + }) + t.Run("CoturnClientIPv6", func(t *testing.T) { + t.Parallel() + testPionE2ESimpleIPv6(t, serverPionIPv6, clientCoturnIPv6) + }) +} diff --git a/e2e/e2e_test.go b/e2e/e2e_test.go new file mode 100644 index 00000000..ce28b8fe --- /dev/null +++ b/e2e/e2e_test.go @@ -0,0 +1,431 @@ +// SPDX-FileCopyrightText: 2026 The Pion community +// SPDX-License-Identifier: MIT + +//go:build !js +// +build !js + +package e2e + +import ( + "context" + "errors" + "fmt" + "net" + "sync" + "testing" + "time" + + "github.com/pion/logging" + "github.com/pion/transport/v4/test" + "github.com/pion/turn/v5" + "github.com/pion/turn/v5/internal/auth" + "github.com/stretchr/testify/assert" +) + +const ( + testUsername = "user" + testPassword = "pass" + testRealm = "pion.ly" + testMessage = "Hello TURN" + testTimeLimit = 5 * time.Second +) + +var ( + errServerTimeout = errors.New("waiting on serverReady err: timeout") + errNoRelayAddress = errors.New("allocation succeeded but no relay address") +) + +func randomPort(tb testing.TB) int { + tb.Helper() + conn, err := net.ListenPacket("udp4", "127.0.0.1:0") // nolint: noctx + assert.NoError(tb, err, "failed to pick port") + + defer func() { + _ = conn.Close() + }() + switch addr := conn.LocalAddr().(type) { + case *net.UDPAddr: + return addr.Port + default: + assert.Fail(tb, "failed to acquire port", "unknown addr type %T", addr) + + return 0 + } +} + +type testmgr struct { + ctx context.Context //nolint:containedctx + serverAddr string + username string + password string + realm string + clientMutex *sync.Mutex + clientConn net.PacketConn + clientDone chan error + serverMutex *sync.Mutex + serverConn net.PacketConn + server *turn.Server + serverReady chan struct{} + serverDone chan error + errChan chan error + messageReceived chan string + client func(*testmgr) + serverFunc func(*testmgr) +} + +func newTestMgr( + t *testing.T, + ctx context.Context, + serverAddr string, + username, password, realm string, + serverFunc, client func(*testmgr), +) *testmgr { + t.Helper() + + return &testmgr{ + ctx: ctx, + serverAddr: serverAddr, + username: username, + password: password, + realm: realm, + clientMutex: &sync.Mutex{}, + serverMutex: &sync.Mutex{}, + serverReady: make(chan struct{}), + serverDone: make(chan error), + clientDone: make(chan error), + errChan: make(chan error), + messageReceived: make(chan string), + serverFunc: serverFunc, + client: client, + } +} + +func (m *testmgr) assert(t *testing.T) { + t.Helper() + + go m.serverFunc(m) + go m.client(m) + + defer func() { + if m.clientConn != nil { + _ = m.clientConn.Close() + } + if m.serverConn != nil { + _ = m.serverConn.Close() + } + if m.server != nil { + _ = m.server.Close() + } + }() + + select { + case err := <-m.errChan: + assert.NoError(t, err) + case msg := <-m.messageReceived: + assert.Equal(t, testMessage, msg) + case <-m.clientDone: + case <-time.After(testTimeLimit): + assert.Fail(t, "Test timeout") + } +} + +//nolint:cyclop +func (m *testmgr) cleanup(t *testing.T) { + t.Helper() + + clientDone, serverDone := false, false + for { + select { + case err := <-m.clientDone: + if err != nil { + t.Logf("Client error: %v", err) + } + clientDone = true + if clientDone && serverDone { + return + } + + case err := <-m.serverDone: + if err != nil { + t.Logf("Server error: %v", err) + } + serverDone = true + if clientDone && serverDone { + return + } + + case <-time.After(testTimeLimit): + assert.Fail(t, "Test timeout waiting for cleanup") + + return + } + } +} + +//nolint:varnamelen +func clientPion(m *testmgr) { + select { + case <-m.serverReady: + // OK + case <-time.After(time.Second * 5): + m.errChan <- errServerTimeout + + return + } + + m.clientMutex.Lock() + defer m.clientMutex.Unlock() + + var err error + lc := net.ListenConfig{} + m.clientConn, err = lc.ListenPacket(m.ctx, "udp4", "127.0.0.1:0") + if err != nil { + m.errChan <- err + + return + } + + client, err := turn.NewClient(&turn.ClientConfig{ + TURNServerAddr: m.serverAddr, + Username: m.username, + Password: m.password, + Realm: m.realm, + Conn: m.clientConn, + LoggerFactory: logging.NewDefaultLoggerFactory(), + }) + if err != nil { + m.errChan <- err + + return + } + defer client.Close() + + if err = client.Listen(); err != nil { + m.errChan <- err + + return + } + + relayConn, err := client.Allocate() + if err != nil { + m.errChan <- err + + return + } + defer func() { + _ = relayConn.Close() + }() + + // Verify we got a relay address + if relayConn.LocalAddr() == nil { + m.errChan <- errNoRelayAddress + + return + } + + // Signal success + m.messageReceived <- testMessage + + m.clientDone <- nil + close(m.clientDone) +} + +//nolint:varnamelen +func serverPion(m *testmgr) { + m.serverMutex.Lock() + defer m.serverMutex.Unlock() + + var err error + lc := net.ListenConfig{} + m.serverConn, err = lc.ListenPacket(m.ctx, "udp4", m.serverAddr) + if err != nil { + m.errChan <- err + + return + } + + m.server, err = turn.NewServer(turn.ServerConfig{ + Realm: m.realm, + AuthHandler: func(ra *auth.RequestAttributes) (userID string, key []byte, ok bool) { + if ra.Username == m.username && ra.Realm == m.realm { + return ra.Username, turn.GenerateAuthKey(ra.Username, m.realm, m.password), true + } + + return "", nil, false + }, + PacketConnConfigs: []turn.PacketConnConfig{ + { + PacketConn: m.serverConn, + RelayAddressGenerator: &turn.RelayAddressGeneratorNone{Address: "127.0.0.1"}, + }, + }, + LoggerFactory: logging.NewDefaultLoggerFactory(), + }) + if err != nil { + m.errChan <- err + + return + } + + m.serverReady <- struct{}{} + + <-m.ctx.Done() + m.serverDone <- nil + close(m.serverDone) +} + +//nolint:varnamelen +func clientPionIPv6(m *testmgr) { + select { + case <-m.serverReady: + case <-time.After(time.Second * 5): + m.errChan <- errServerTimeout + + return + } + + m.clientMutex.Lock() + defer m.clientMutex.Unlock() + + var err error + lc := net.ListenConfig{} + m.clientConn, err = lc.ListenPacket(m.ctx, "udp6", "[::1]:0") + if err != nil { + m.errChan <- err + + return + } + + client, err := turn.NewClient(&turn.ClientConfig{ + STUNServerAddr: m.serverAddr, + TURNServerAddr: m.serverAddr, + Username: m.username, + Password: m.password, + Realm: m.realm, + Conn: m.clientConn, + RequestedAddressFamily: turn.RequestedAddressFamilyIPv6, + LoggerFactory: logging.NewDefaultLoggerFactory(), + }) + if err != nil { + m.errChan <- err + + return + } + defer client.Close() + + if err = client.Listen(); err != nil { + m.errChan <- err + + return + } + + relayConn, err := client.Allocate() + if err != nil { + m.errChan <- err + + return + } + defer func() { + _ = relayConn.Close() + }() + + if relayConn.LocalAddr() == nil { + m.errChan <- errNoRelayAddress + + return + } + + m.messageReceived <- testMessage + + m.clientDone <- nil + close(m.clientDone) +} + +//nolint:varnamelen +func serverPionIPv6(m *testmgr) { + m.serverMutex.Lock() + defer m.serverMutex.Unlock() + + var err error + lc := net.ListenConfig{} + m.serverConn, err = lc.ListenPacket(m.ctx, "udp6", m.serverAddr) + if err != nil { + m.errChan <- err + + return + } + + host, _, err := net.SplitHostPort(m.serverAddr) + if err != nil { + m.errChan <- err + + return + } + + m.server, err = turn.NewServer(turn.ServerConfig{ + Realm: m.realm, + AuthHandler: func(ra *auth.RequestAttributes) (userID string, key []byte, ok bool) { + if ra.Username == m.username && ra.Realm == m.realm { + return ra.Username, turn.GenerateAuthKey(ra.Username, m.realm, m.password), true + } + + return "", nil, false + }, + PacketConnConfigs: []turn.PacketConnConfig{ + { + PacketConn: m.serverConn, + RelayAddressGenerator: &turn.RelayAddressGeneratorNone{Address: host}, + }, + }, + LoggerFactory: logging.NewDefaultLoggerFactory(), + }) + if err != nil { + m.errChan <- err + + return + } + + m.serverReady <- struct{}{} + + <-m.ctx.Done() + m.serverDone <- nil + close(m.serverDone) +} + +func testPionE2ESimple(t *testing.T, server, client func(*testmgr)) { + t.Helper() + lim := test.TimeOut(time.Second * 30) + defer lim.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + serverAddr := fmt.Sprintf("127.0.0.1:%d", randomPort(t)) + mgr := newTestMgr(t, ctx, serverAddr, testUsername, testPassword, testRealm, server, client) + defer mgr.cleanup(t) + mgr.assert(t) +} + +func testPionE2ESimpleIPv6(t *testing.T, server, client func(*testmgr)) { + t.Helper() + lim := test.TimeOut(time.Second * 30) + defer lim.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + serverAddr := fmt.Sprintf("[::1]:%d", randomPort(t)) + mgr := newTestMgr(t, ctx, serverAddr, testUsername, testPassword, testRealm, server, client) + defer mgr.cleanup(t) + mgr.assert(t) +} + +func TestPionE2ESimple(t *testing.T) { + t.Parallel() + testPionE2ESimple(t, serverPion, clientPion) +} + +func TestPionE2ESimpleIPv6(t *testing.T) { + t.Parallel() + testPionE2ESimpleIPv6(t, serverPionIPv6, clientPionIPv6) +}