From 9fa021eeebd14ac484412236b76f810fc60f69a9 Mon Sep 17 00:00:00 2001 From: Moses Narrow <36607567+0pcom@users.noreply.github.com> Date: Tue, 7 Apr 2026 10:58:24 -0500 Subject: [PATCH 1/6] Fix ephemeral port leak: pass context to ClientSession.DialStream ClientSession.DialStream didn't accept a context, so when the caller's deadline expired, the blocked readResponse kept the ephemeral port reserved until HandshakeTimeout fired (20-30s per server). With 6 servers tried sequentially, a single failed dial could hold ports for minutes. Now DialStream accepts context and spawns a goroutine that closes the stream when the context is cancelled, immediately interrupting any blocked read/write and freeing the ephemeral port. Also: Client.DialStream now passes ctx to all ClientSession.DialStream calls (phases 0-3) and checks ctx.Err() between phases. --- pkg/dmsg/client_dial.go | 8 ++++---- pkg/dmsg/client_session.go | 24 ++++++++++++++++++++++-- 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/pkg/dmsg/client_dial.go b/pkg/dmsg/client_dial.go index 88c437b3..5d54c57b 100644 --- a/pkg/dmsg/client_dial.go +++ b/pkg/dmsg/client_dial.go @@ -40,7 +40,7 @@ func (ce *Client) DialStream(ctx context.Context, addr Addr) (*Stream, error) { // Phase 0: Try cached route first (server that last successfully reached this destination). if cachedSrvPK, ok := ce.getCachedRoute(addr.PK); ok { if dSes, ok := ce.clientSession(ce.porter, cachedSrvPK); ok { - stream, err := dSes.DialStream(addr) + stream, err := dSes.DialStream(ctx, addr) if err != nil { ce.log.WithError(err).WithField("server", cachedSrvPK). Debug("DialStream failed via cached route, evicting") @@ -58,7 +58,7 @@ func (ce *Client) DialStream(ctx context.Context, addr Addr) (*Stream, error) { // Sort by latency so the lowest-latency server is tried first. delegatedSessions := ce.sortedDelegatedSessions(entry.Client.DelegatedServers) for _, dSes := range delegatedSessions { - stream, err := dSes.DialStream(addr) + stream, err := dSes.DialStream(ctx, addr) if err != nil { ce.log.WithError(err).WithField("server", dSes.RemotePK()). Debug("DialStream failed via existing session, trying next server") @@ -73,7 +73,7 @@ func (ce *Client) DialStream(ctx context.Context, addr Addr) (*Stream, error) { // Sorted by latency. meshSessions := ce.sortedMeshSessions(entry.Client.DelegatedServers) for _, ses := range meshSessions { - stream, err := ses.DialStream(addr) + stream, err := ses.DialStream(ctx, addr) if err != nil { ce.log.WithError(err).WithField("server", ses.RemotePK()). Debug("DialStream failed via mesh, trying next server") @@ -89,7 +89,7 @@ func (ce *Client) DialStream(ctx context.Context, addr Addr) (*Stream, error) { if err != nil { continue } - stream, err := dSes.DialStream(addr) + stream, err := dSes.DialStream(ctx, addr) if err != nil { ce.log.WithError(err).WithField("server", srvPK). Debug("DialStream failed via new session, trying next server") diff --git a/pkg/dmsg/client_session.go b/pkg/dmsg/client_session.go index bfe8f24d..9a290e9a 100644 --- a/pkg/dmsg/client_session.go +++ b/pkg/dmsg/client_session.go @@ -2,6 +2,7 @@ package dmsg import ( + "context" "errors" "net" "time" @@ -28,7 +29,9 @@ func makeClientSession(entity *EntityCommon, porter *netutil.Porter, conn net.Co } // DialStream attempts to dial a stream to a remote client via the dmsg server that this session is connected to. -func (cs *ClientSession) DialStream(dst Addr) (dStr *Stream, err error) { +// The context is used to cancel the dial if the caller's deadline expires — this prevents ephemeral port +// leaks when many dials are attempted and the caller gives up before the handshake completes. +func (cs *ClientSession) DialStream(ctx context.Context, dst Addr) (dStr *Stream, err error) { log := cs.log. WithField("func", "ClientSession.DialStream"). WithField("dst_addr", dst) @@ -37,7 +40,7 @@ func (cs *ClientSession) DialStream(dst Addr) (dStr *Stream, err error) { return nil, err } - // Close stream on failure. + // Close stream on failure — this frees the reserved ephemeral port. defer func() { if err != nil { log.WithError(err). @@ -46,6 +49,23 @@ func (cs *ClientSession) DialStream(dst Addr) (dStr *Stream, err error) { } }() + // If the caller's context is cancelled, close the stream to interrupt + // any blocked read/write and free the ephemeral port immediately. + ctxDone := make(chan struct{}) + go func() { + select { + case <-ctx.Done(): + dStr.Close() //nolint:errcheck + case <-ctxDone: + } + }() + defer close(ctxDone) + + // Check context before starting. + if ctx.Err() != nil { + return nil, ctx.Err() + } + // Prepare deadline. if err = dStr.SetDeadline(time.Now().Add(HandshakeTimeout)); err != nil { return nil, err From b4fde6ca683da8ccac464cabd9d1aa5deaf5f33d Mon Sep 17 00:00:00 2001 From: Moses Narrow <36607567+0pcom@users.noreply.github.com> Date: Tue, 7 Apr 2026 16:59:17 -0500 Subject: [PATCH 2/6] Fix lint: misspell (canceled) and gosec (unhandled Close) --- pkg/dmsg/client_session.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/dmsg/client_session.go b/pkg/dmsg/client_session.go index 9a290e9a..1efdae40 100644 --- a/pkg/dmsg/client_session.go +++ b/pkg/dmsg/client_session.go @@ -49,13 +49,13 @@ func (cs *ClientSession) DialStream(ctx context.Context, dst Addr) (dStr *Stream } }() - // If the caller's context is cancelled, close the stream to interrupt + // If the caller's context is canceled, close the stream to interrupt // any blocked read/write and free the ephemeral port immediately. ctxDone := make(chan struct{}) go func() { select { case <-ctx.Done(): - dStr.Close() //nolint:errcheck + _ = dStr.Close() case <-ctxDone: } }() From 693433854f44bab14378874ca83d4cc838df4596 Mon Sep 17 00:00:00 2001 From: Moses Narrow <36607567+0pcom@users.noreply.github.com> Date: Tue, 7 Apr 2026 17:03:39 -0500 Subject: [PATCH 3/6] Fix errcheck: use nolint comment instead of blank identifier --- pkg/dmsg/client_session.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/dmsg/client_session.go b/pkg/dmsg/client_session.go index 1efdae40..b29ee59b 100644 --- a/pkg/dmsg/client_session.go +++ b/pkg/dmsg/client_session.go @@ -55,7 +55,7 @@ func (cs *ClientSession) DialStream(ctx context.Context, dst Addr) (dStr *Stream go func() { select { case <-ctx.Done(): - _ = dStr.Close() + dStr.Close() //nolint:errcheck case <-ctxDone: } }() From bf9efff93cdf32f0efa66d91fd8d19a58c15df77 Mon Sep 17 00:00:00 2001 From: Moses Narrow <36607567+0pcom@users.noreply.github.com> Date: Tue, 7 Apr 2026 17:07:23 -0500 Subject: [PATCH 4/6] Fix gosec: add gosec to nolint directive --- pkg/dmsg/client_session.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/dmsg/client_session.go b/pkg/dmsg/client_session.go index b29ee59b..7e961f75 100644 --- a/pkg/dmsg/client_session.go +++ b/pkg/dmsg/client_session.go @@ -55,7 +55,7 @@ func (cs *ClientSession) DialStream(ctx context.Context, dst Addr) (dStr *Stream go func() { select { case <-ctx.Done(): - dStr.Close() //nolint:errcheck + dStr.Close() //nolint:errcheck,gosec case <-ctxDone: } }() From c875112459055ea3e56ba6664505521f6f80ccf7 Mon Sep 17 00:00:00 2001 From: Moses Narrow <36607567+0pcom@users.noreply.github.com> Date: Tue, 7 Apr 2026 17:18:08 -0500 Subject: [PATCH 5/6] Fix flaky TestHTTPTransport_RoundTrip: increase session stabilization wait --- pkg/dmsghttp/http_transport_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/dmsghttp/http_transport_test.go b/pkg/dmsghttp/http_transport_test.go index d0b40158..4f250a01 100644 --- a/pkg/dmsghttp/http_transport_test.go +++ b/pkg/dmsghttp/http_transport_test.go @@ -81,8 +81,10 @@ func TestHTTPTransport_RoundTrip(t *testing.T) { Timeout: 10 * time.Second, } - // Allow time for dmsg sessions to stabilize on macOS - time.Sleep(200 * time.Millisecond) + // Allow time for dmsg sessions to stabilize across all platforms. + // CI runners are slower; 200ms was insufficient for noise handshakes + // to complete across 5 servers × 4 clients. + time.Sleep(2 * time.Second) // Act: http clients send requests concurrently. // - client1 sends "/index.html" requests. From 01f1a2c2482346f67c11bc5a0f68106844b1bab5 Mon Sep 17 00:00:00 2001 From: Moses Narrow <36607567+0pcom@users.noreply.github.com> Date: Tue, 7 Apr 2026 17:22:59 -0500 Subject: [PATCH 6/6] Fix flaky test: increase HTTP client timeout from 10s to 30s for CI --- pkg/dmsghttp/http_transport_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/dmsghttp/http_transport_test.go b/pkg/dmsghttp/http_transport_test.go index 4f250a01..3401473b 100644 --- a/pkg/dmsghttp/http_transport_test.go +++ b/pkg/dmsghttp/http_transport_test.go @@ -70,15 +70,15 @@ func TestHTTPTransport_RoundTrip(t *testing.T) { // Configure timeouts to prevent hanging on errors. httpC1 := http.Client{ Transport: MakeHTTPTransport(ctx, newDmsgClient(t, dc, minSessions, "client1")), - Timeout: 10 * time.Second, + Timeout: 30 * time.Second, } httpC2 := http.Client{ Transport: MakeHTTPTransport(ctx, newDmsgClient(t, dc, minSessions, "client2")), - Timeout: 10 * time.Second, + Timeout: 30 * time.Second, } httpC3 := http.Client{ Transport: MakeHTTPTransport(ctx, newDmsgClient(t, dc, minSessions, "client3")), - Timeout: 10 * time.Second, + Timeout: 30 * time.Second, } // Allow time for dmsg sessions to stabilize across all platforms.