From 9961554880a1a3a4c70b2bd5568f89266a15a7bb Mon Sep 17 00:00:00 2001 From: ParsaKSH Date: Tue, 24 Mar 2026 02:54:47 +0330 Subject: [PATCH 1/2] Remove read timeout from the tunnel read loop to prevent stream corruption, relying on stale connection detection to force-close and reconnect. --- internal/tunnel/pool.go | 48 ++++++++++++++++------------------------- 1 file changed, 18 insertions(+), 30 deletions(-) diff --git a/internal/tunnel/pool.go b/internal/tunnel/pool.go index 34b831c..6fcf660 100644 --- a/internal/tunnel/pool.go +++ b/internal/tunnel/pool.go @@ -16,12 +16,9 @@ import ( // writeTimeout prevents writes from blocking forever on stalled connections. const writeTimeout = 10 * time.Second -// readTimeout is applied per ReadFrame call. If no frame arrives within this -// duration, we check if the tunnel is stale. -const readTimeout = 30 * time.Second - -// staleThreshold: if we've sent data recently but haven't received anything -// in this long, the connection is considered half-dead. +// staleThreshold: if we've sent data but haven't received anything +// in this long, the connection is considered half-dead and will be +// force-closed by refreshConnections (which triggers reconnect). const staleThreshold = 20 * time.Second // TunnelConn wraps a persistent TCP connection to a single instance. @@ -112,7 +109,7 @@ func (p *TunnelPool) SendFrame(instID int, f *Frame) error { return tc.writeFrame(f) } -// refreshConnections reconnects dead/stale tunnels and adds new healthy instances. +// refreshConnections reconnects dead/stale tunnels and adds new ones. func (p *TunnelPool) refreshConnections() { healthy := p.mgr.HealthyInstances() nowMs := time.Now().UnixMilli() @@ -132,11 +129,13 @@ func (p *TunnelPool) refreshConnections() { shouldRemove = true } else { // Detect half-dead connections: - // If we wrote recently but haven't read in staleThreshold, connection is dead. + // If we wrote recently but haven't read in staleThreshold, + // the QUIC tunnel is likely dead but local TCP is still open. + // Force-close it so readLoop exits and we can reconnect. lastW := tc.lastWrite.Load() lastR := tc.lastRead.Load() if lastW > 0 && (nowMs-lastR) > staleThreshold.Milliseconds() { - log.Printf("[tunnel-pool] instance %d: stale connection detected (last_read=%dms ago, last_write=%dms ago), recycling", + log.Printf("[tunnel-pool] instance %d: stale (last_read=%dms ago, last_write=%dms ago), force-closing", id, nowMs-lastR, nowMs-lastW) shouldRemove = true } @@ -183,7 +182,7 @@ func (p *TunnelPool) connectInstance(inst *engine.Instance) (*TunnelConn, error) conn: conn, } tunnel.lastRead.Store(now) - tunnel.lastWrite.Store(0) // no writes yet + tunnel.lastWrite.Store(0) p.wg.Add(1) go func() { @@ -194,6 +193,11 @@ func (p *TunnelPool) connectInstance(inst *engine.Instance) (*TunnelConn, error) return tunnel, nil } +// readLoop reads frames WITHOUT any read deadline. +// CRITICAL: ReadFrame must NEVER be interrupted mid-read, because partial +// header reads corrupt the entire frame stream (all subsequent reads get +// misaligned). The only way to stop readLoop is to close the connection +// (via tc.close()), which makes ReadFrame return an error. func (p *TunnelPool) readLoop(tc *TunnelConn) { for { select { @@ -202,17 +206,11 @@ func (p *TunnelPool) readLoop(tc *TunnelConn) { default: } - // Set read deadline so we don't block forever on dead connections. - // If timeout fires, we loop back and try again — refreshConnections - // will detect staleness and force close if needed. - tc.conn.SetReadDeadline(time.Now().Add(readTimeout)) + // NO read deadline here! ReadFrame blocks until a complete frame + // arrives or the connection closes. read deadline would cause + // partial header reads → stream corruption. frame, err := ReadFrame(tc.conn) if err != nil { - if isTimeoutErr(err) { - // Read timed out — not necessarily dead, just no data. - // refreshConnections will check staleness. - continue - } if err != io.EOF && !isClosedErr(err) { log.Printf("[tunnel-pool] instance %d read error: %v", tc.inst.ID(), err) } @@ -227,7 +225,7 @@ func (p *TunnelPool) readLoop(tc *TunnelConn) { select { case ch <- frame: default: - // Buffer full — drop frame silently + // Buffer full — drop silently } } } @@ -267,13 +265,3 @@ func isClosedErr(err error) bool { return strings.Contains(err.Error(), "use of closed network connection") || strings.Contains(err.Error(), "connection reset by peer") } - -func isTimeoutErr(err error) bool { - if err == nil { - return false - } - if ne, ok := err.(net.Error); ok { - return ne.Timeout() - } - return false -} From c8917b1f5d5e26c1f29a7106d5fa720a900bd183 Mon Sep 17 00:00:00 2001 From: ParsaKSH Date: Tue, 24 Mar 2026 03:24:45 +0330 Subject: [PATCH 2/2] feat: Add diagnostic logging for packet-split connection events and incoming tunnel frames. --- internal/proxy/socks5.go | 2 ++ internal/tunnel/pool.go | 6 ++++++ 2 files changed, 8 insertions(+) diff --git a/internal/proxy/socks5.go b/internal/proxy/socks5.go index 285b3d7..f715233 100644 --- a/internal/proxy/socks5.go +++ b/internal/proxy/socks5.go @@ -272,6 +272,7 @@ func (s *Server) handlePacketSplit(clientConn net.Conn, connID uint64, atyp byte // Tell client: connection successful clientConn.Write([]byte{0x05, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 0}) + log.Printf("[proxy] conn#%d: packet-split SYN sent, connID=%d", connID, tunnelConnID) port := binary.BigEndian.Uint16(portBytes) log.Printf("[proxy] conn#%d: packet-split mode, %d instances, port %d", @@ -308,6 +309,7 @@ func (s *Server) handlePacketSplit(clientConn net.Conn, connID uint64, atyp byte }() wg.Wait() + log.Printf("[proxy] conn#%d: packet-split done, tx=%d rx=%d", connID, txN, rxN) // Track TX/RX on instances (distributed proportionally) nInstances := int64(len(socksHealthy)) diff --git a/internal/tunnel/pool.go b/internal/tunnel/pool.go index 6fcf660..5db735d 100644 --- a/internal/tunnel/pool.go +++ b/internal/tunnel/pool.go @@ -220,6 +220,12 @@ func (p *TunnelPool) readLoop(tc *TunnelConn) { tc.lastRead.Store(time.Now().UnixMilli()) + // Diagnostic: log reverse frames (data coming back from instances) + if frame.IsReverse() || frame.IsFIN() || frame.IsRST() { + log.Printf("[tunnel-pool] instance %d: recv frame conn=%d seq=%d flags=0x%02x len=%d", + tc.inst.ID(), frame.ConnID, frame.SeqNum, frame.Flags, len(frame.Payload)) + } + if v, ok := p.handlers.Load(frame.ConnID); ok { ch := v.(chan *Frame) select {