diff --git a/pkg/dmsg/server_session.go b/pkg/dmsg/server_session.go index 32f0b5dc..92e6ba3c 100644 --- a/pkg/dmsg/server_session.go +++ b/pkg/dmsg/server_session.go @@ -265,10 +265,16 @@ func (ss *ServerSession) bridgeStream(log logrus.FieldLogger, yStr io.ReadWriteC } log.Debug("Forwarded stream response.") - // Clear the read deadline before the long-lived bidirectional copy. - if conn, ok := yStr.(net.Conn); ok { - conn.SetReadDeadline(time.Time{}) //nolint:errcheck,gosec - } + // Set an idle timeout on both sides of the bridge. If no data flows + // in either direction for this duration, both streams are closed. + // Without this, half-dead connections (client disconnected without + // sending FIN) cause goroutines to leak indefinitely — observed as + // 55K+ stuck goroutines in production dmsg servers. + const streamIdleTimeout = 5 * time.Minute + + // Wrap both streams with idle-timeout deadlines. + yStr = &idleTimeoutConn{rwc: yStr, timeout: streamIdleTimeout} + yStr2 = &idleTimeoutConn{rwc: yStr2, timeout: streamIdleTimeout} log.Info("Serving stream.") ss.m.RecordStream(metrics.DeltaConnect) @@ -276,6 +282,33 @@ func (ss *ServerSession) bridgeStream(log logrus.FieldLogger, yStr io.ReadWriteC return netutil.CopyReadWriteCloser(yStr, yStr2) } +// idleTimeoutConn wraps a ReadWriteCloser with per-operation deadlines. +// If the underlying connection supports SetReadDeadline/SetWriteDeadline, +// each Read/Write resets the deadline. If the connection goes idle (no data +// in either direction), the deadline fires and the blocked io.Copy returns. 
// idleTimeoutConn wraps an io.ReadWriteCloser and applies an idle timeout to it.
//
// Before every Read and Write the wrapper pushes BOTH the read and the write
// deadline of the underlying connection forward by timeout, so traffic in
// either direction counts as activity. A Read that is blocked waiting for the
// peer is therefore kept alive while Writes are still passing through the same
// wrapper (and vice versa); the deadline fires only once the wrapper has seen
// no Read or Write call for a full timeout. This relies on the net.Conn
// contract that a deadline change also applies to pending I/O.
//
// Deadlines are applied only when rwc implements net.Conn and timeout is
// positive. Otherwise Read and Write are plain pass-throughs and any deadline
// already set on rwc is left untouched.
type idleTimeoutConn struct {
	rwc     io.ReadWriteCloser // wrapped stream; deadlines are used only if it is a net.Conn
	timeout time.Duration      // maximum idle period; <= 0 disables deadline handling
}

// Compile-time check that the wrapper can stand in for the stream it wraps.
var _ io.ReadWriteCloser = (*idleTimeoutConn)(nil)

// extendDeadline moves the read and write deadlines of the wrapped connection
// to now+timeout. It is a no-op when timeout is not positive (a deadline of
// "now" would make every operation fail immediately) or when the wrapped
// stream is not a net.Conn.
func (c *idleTimeoutConn) extendDeadline() {
	if c.timeout <= 0 {
		return
	}
	if conn, ok := c.rwc.(net.Conn); ok {
		// Best effort: if the deadline cannot be set (e.g. the connection is
		// already closed) the Read/Write that follows reports the real error.
		conn.SetDeadline(time.Now().Add(c.timeout)) //nolint:errcheck,gosec
	}
}

// Read extends the idle deadline and then reads from the wrapped stream.
// It returns whatever the wrapped Read returns, including its timeout error
// once the deadline has passed.
func (c *idleTimeoutConn) Read(p []byte) (int, error) {
	c.extendDeadline()
	return c.rwc.Read(p)
}

// Write extends the idle deadline and then writes to the wrapped stream.
// It returns whatever the wrapped Write returns.
func (c *idleTimeoutConn) Write(p []byte) (int, error) {
	c.extendDeadline()
	return c.rwc.Write(p)
}

// Close closes the wrapped stream and returns its error.
func (c *idleTimeoutConn) Close() error {
	return c.rwc.Close()
}