diff --git a/cmd/centralserver/main.go b/cmd/centralserver/main.go index 3c664c0..e6a1242 100644 --- a/cmd/centralserver/main.go +++ b/cmd/centralserver/main.go @@ -206,7 +206,7 @@ func (cs *centralServer) dispatchFrame(frame *tunnel.Frame, source net.Conn) { cs.handleRST(frame) return } - cs.handleData(frame) + cs.handleData(frame, source) } func (cs *centralServer) handleSYN(frame *tunnel.Frame, source net.Conn) { @@ -424,7 +424,7 @@ func (cs *centralServer) sendFrame(connID uint32, frame *tunnel.Frame) { log.Printf("[central] conn=%d: all sources failed", connID) } -func (cs *centralServer) handleData(frame *tunnel.Frame) { +func (cs *centralServer) handleData(frame *tunnel.Frame, source net.Conn) { cs.mu.RLock() state, ok := cs.conns[frame.ConnID] cs.mu.RUnlock() @@ -435,6 +435,19 @@ func (cs *centralServer) handleData(frame *tunnel.Frame) { state.mu.Lock() defer state.mu.Unlock() + // If this source isn't known yet (e.g., after tunnel recycling), add it. + // This ensures responses can flow back through the new connection. + found := false + for _, s := range state.sources { + if s == source { + found = true + break + } + } + if !found { + state.sources = append(state.sources, source) + } + state.reorderer.Insert(frame.SeqNum, frame.Payload) if state.target == nil { return // buffered, flushed when upstream connects diff --git a/internal/tunnel/pool.go b/internal/tunnel/pool.go index d05efd1..6cf449c 100644 --- a/internal/tunnel/pool.go +++ b/internal/tunnel/pool.go @@ -4,6 +4,7 @@ import ( "fmt" "io" "log" + "math/rand" "net" "sync" "sync/atomic" @@ -30,8 +31,9 @@ type TunnelConn struct { closed bool incoming chan *Frame createdAt time.Time - lastRead atomic.Int64 // unix timestamp of last successful read - writeErrs atomic.Int32 // consecutive write errors + maxAge time.Duration // per-connection jittered max age + lastRead atomic.Int64 // unix timestamp of last successful read + writeErrs atomic.Int32 // consecutive write errors } // TunnelPool manages persistent connections to all healthy instances. @@ -151,8 +153,7 @@ func (p *TunnelPool) refreshConnections() { if !activeIDs[id] || tc.closed { shouldRemove = true - } else if now.Sub(tc.createdAt) > tunnelMaxAge { - // Connection too old → recycle to get a fresh QUIC stream + } else if now.Sub(tc.createdAt) > tc.maxAge { log.Printf("[tunnel-pool] recycling instance %d connection (age=%s)", id, now.Sub(tc.createdAt).Round(time.Second)) shouldRemove = true @@ -202,11 +203,15 @@ func (p *TunnelPool) connectInstance(inst *engine.Instance) (*TunnelConn, error) tc.SetNoDelay(true) } + // Jittered max age: tunnelMaxAge ± 50% + jitter := time.Duration(rand.Int63n(int64(tunnelMaxAge))) - tunnelMaxAge/2 + tunnel := &TunnelConn{ inst: inst, conn: conn, incoming: make(chan *Frame, 256), createdAt: time.Now(), + maxAge: tunnelMaxAge + jitter, } tunnel.lastRead.Store(time.Now().Unix())