Skip to content
Merged

Dev #17

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions internal/proxy/socks5.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ func (s *Server) handlePacketSplit(clientConn net.Conn, connID uint64, atyp byte

// Tell client: connection successful
clientConn.Write([]byte{0x05, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 0})
log.Printf("[proxy] conn#%d: packet-split SYN sent, connID=%d", connID, tunnelConnID)

port := binary.BigEndian.Uint16(portBytes)
log.Printf("[proxy] conn#%d: packet-split mode, %d instances, port %d",
Expand Down Expand Up @@ -308,6 +309,7 @@ func (s *Server) handlePacketSplit(clientConn net.Conn, connID uint64, atyp byte
}()

wg.Wait()
log.Printf("[proxy] conn#%d: packet-split done, tx=%d rx=%d", connID, txN, rxN)

// Track TX/RX on instances (distributed proportionally)
nInstances := int64(len(socksHealthy))
Expand Down
54 changes: 24 additions & 30 deletions internal/tunnel/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,9 @@ import (
// writeTimeout prevents writes from blocking forever on stalled connections.
const writeTimeout = 10 * time.Second

// readTimeout is applied per ReadFrame call. If no frame arrives within this
// duration, we check if the tunnel is stale.
const readTimeout = 30 * time.Second

// staleThreshold: if we've sent data recently but haven't received anything
// in this long, the connection is considered half-dead.
// staleThreshold: if we've sent data but haven't received anything
// in this long, the connection is considered half-dead and will be
// force-closed by refreshConnections (which triggers reconnect).
const staleThreshold = 20 * time.Second

// TunnelConn wraps a persistent TCP connection to a single instance.
Expand Down Expand Up @@ -112,7 +109,7 @@ func (p *TunnelPool) SendFrame(instID int, f *Frame) error {
return tc.writeFrame(f)
}

// refreshConnections reconnects dead/stale tunnels and adds new healthy instances.
// refreshConnections reconnects dead/stale tunnels and adds new ones.
func (p *TunnelPool) refreshConnections() {
healthy := p.mgr.HealthyInstances()
nowMs := time.Now().UnixMilli()
Expand All @@ -132,11 +129,13 @@ func (p *TunnelPool) refreshConnections() {
shouldRemove = true
} else {
// Detect half-dead connections:
// If we wrote recently but haven't read in staleThreshold, connection is dead.
// If we wrote recently but haven't read in staleThreshold,
// the QUIC tunnel is likely dead but local TCP is still open.
// Force-close it so readLoop exits and we can reconnect.
lastW := tc.lastWrite.Load()
lastR := tc.lastRead.Load()
if lastW > 0 && (nowMs-lastR) > staleThreshold.Milliseconds() {
log.Printf("[tunnel-pool] instance %d: stale connection detected (last_read=%dms ago, last_write=%dms ago), recycling",
log.Printf("[tunnel-pool] instance %d: stale (last_read=%dms ago, last_write=%dms ago), force-closing",
id, nowMs-lastR, nowMs-lastW)
shouldRemove = true
}
Expand Down Expand Up @@ -183,7 +182,7 @@ func (p *TunnelPool) connectInstance(inst *engine.Instance) (*TunnelConn, error)
conn: conn,
}
tunnel.lastRead.Store(now)
tunnel.lastWrite.Store(0) // no writes yet
tunnel.lastWrite.Store(0)

p.wg.Add(1)
go func() {
Expand All @@ -194,6 +193,11 @@ func (p *TunnelPool) connectInstance(inst *engine.Instance) (*TunnelConn, error)
return tunnel, nil
}

// readLoop reads frames WITHOUT any read deadline.
// CRITICAL: ReadFrame must NEVER be interrupted mid-read, because partial
// header reads corrupt the entire frame stream (all subsequent reads get
// misaligned). The only way to stop readLoop is to close the connection
// (via tc.close()), which makes ReadFrame return an error.
func (p *TunnelPool) readLoop(tc *TunnelConn) {
for {
select {
Expand All @@ -202,17 +206,11 @@ func (p *TunnelPool) readLoop(tc *TunnelConn) {
default:
}

// Set read deadline so we don't block forever on dead connections.
// If timeout fires, we loop back and try again — refreshConnections
// will detect staleness and force close if needed.
tc.conn.SetReadDeadline(time.Now().Add(readTimeout))
// NO read deadline here! ReadFrame blocks until a complete frame
// arrives or the connection closes. A read deadline would cause
// partial header reads → stream corruption.
frame, err := ReadFrame(tc.conn)
if err != nil {
if isTimeoutErr(err) {
// Read timed out — not necessarily dead, just no data.
// refreshConnections will check staleness.
continue
}
if err != io.EOF && !isClosedErr(err) {
log.Printf("[tunnel-pool] instance %d read error: %v", tc.inst.ID(), err)
}
Expand All @@ -222,12 +220,18 @@ func (p *TunnelPool) readLoop(tc *TunnelConn) {

tc.lastRead.Store(time.Now().UnixMilli())

// Diagnostic: log reverse frames (data coming back from instances)
if frame.IsReverse() || frame.IsFIN() || frame.IsRST() {
log.Printf("[tunnel-pool] instance %d: recv frame conn=%d seq=%d flags=0x%02x len=%d",
tc.inst.ID(), frame.ConnID, frame.SeqNum, frame.Flags, len(frame.Payload))
}

if v, ok := p.handlers.Load(frame.ConnID); ok {
ch := v.(chan *Frame)
select {
case ch <- frame:
default:
// Buffer full — drop frame silently
// Buffer full — drop silently
}
}
}
Expand Down Expand Up @@ -267,13 +271,3 @@ func isClosedErr(err error) bool {
return strings.Contains(err.Error(), "use of closed network connection") ||
strings.Contains(err.Error(), "connection reset by peer")
}

// isTimeoutErr reports whether err represents a network timeout
// (e.g. a read/write deadline expiring on the connection).
func isTimeoutErr(err error) bool {
	ne, ok := err.(net.Error)
	return ok && ne.Timeout()
}
Loading