diff --git a/cmd/centralserver/main.go b/cmd/centralserver/main.go index 7fdccb7..3c664c0 100644 --- a/cmd/centralserver/main.go +++ b/cmd/centralserver/main.go @@ -363,7 +363,6 @@ func (cs *centralServer) relayUpstreamToTunnel(ctx context.Context, connID uint3 n, err := upstream.Read(buf) if n > 0 { - log.Printf("[central] conn=%d: upstream read %d bytes, sending reverse", connID, n) state.mu.Lock() seq := state.txSeq state.txSeq++ @@ -406,9 +405,6 @@ func (cs *centralServer) sendFrame(connID uint32, frame *tunnel.Frame) { return } - log.Printf("[central] conn=%d: sendFrame seq=%d flags=0x%02x len=%d sources=%d", - connID, frame.SeqNum, frame.Flags, len(frame.Payload), len(state.sources)) - // Try each source once, starting from current index for tries := 0; tries < len(state.sources); tries++ { idx := state.sourceIdx % len(state.sources) @@ -416,7 +412,6 @@ func (cs *centralServer) sendFrame(connID uint32, frame *tunnel.Frame) { w := state.sources[idx] if err := tunnel.WriteFrame(w, frame); err != nil { - log.Printf("[central] conn=%d: sendFrame to source %d failed: %v", connID, idx, err) // Remove dead source state.sources = append(state.sources[:idx], state.sources[idx+1:]...) if state.sourceIdx > 0 { @@ -430,12 +425,10 @@ func (cs *centralServer) sendFrame(connID uint32, frame *tunnel.Frame) { } func (cs *centralServer) handleData(frame *tunnel.Frame) { - log.Printf("[central] conn=%d: DATA seq=%d len=%d", frame.ConnID, frame.SeqNum, len(frame.Payload)) cs.mu.RLock() state, ok := cs.conns[frame.ConnID] cs.mu.RUnlock() if !ok { - log.Printf("[central] conn=%d: DATA for unknown connID, dropping", frame.ConnID) return } diff --git a/internal/gui/dashboard.go b/internal/gui/dashboard.go index 3dbf767..a200d64 100644 --- a/internal/gui/dashboard.go +++ b/internal/gui/dashboard.go @@ -59,12 +59,16 @@ tr:hover td{background:rgba(124,108,255,0.03)} .ico{width:14px;height:14px;vertical-align:middle;fill:none;stroke:currentColor;stroke-width:2;stroke-linecap:round;stroke-linejoin:round} .btn .ico{width:12px;height:12px;margin-right:2px} .form-group{margin-bottom:14px} -.form-group label{display:block;font-size:11px;color:var(--text2);margin-bottom:4px;font-weight:500} +.form-group label{display:flex;align-items:center;gap:5px;font-size:11px;color:var(--text2);margin-bottom:4px;font-weight:500} .form-group input,.form-group select,.form-group textarea{width:100%;padding:8px 12px;background:var(--bg2);border:1px solid var(--border);border-radius:var(--rs);color:var(--text);font:13px inherit} .form-group input:focus,.form-group select:focus,.form-group textarea:focus{outline:none;border-color:var(--accent)} .form-group textarea{min-height:120px;font:11px 'SF Mono',Monaco,Consolas,monospace;resize:vertical} .form-row{display:grid;grid-template-columns:1fr 1fr;gap:12px} .form-row3{display:grid;grid-template-columns:1fr 1fr 1fr;gap:12px} +.tip-btn{display:inline-flex;align-items:center;justify-content:center;width:15px;height:15px;border-radius:50%;background:var(--bg4);border:1px solid var(--border);color:var(--text3);font-size:9px;font-weight:700;cursor:pointer;position:relative;flex-shrink:0} +.tip-btn:hover::after{content:attr(data-tip);position:absolute;bottom:calc(100% + 6px);left:50%;transform:translateX(-50%);background:var(--bg4);border:1px solid var(--border);color:var(--text);font-size:10px;font-weight:400;padding:6px 10px;border-radius:6px;white-space:pre-line;width:220px;z-index:100;pointer-events:none;box-shadow:0 4px 20px rgba(0,0,0,0.4)} +.cs-box{margin-top:10px;padding:14px;background:var(--bg2);border:1px solid rgba(124,108,255,0.25);border-radius:var(--rs)} +.cs-box .cs-title{font-size:11px;font-weight:700;color:var(--accent2);margin-bottom:10px;text-transform:uppercase;letter-spacing:.8px} canvas{width:100%;height:200px;border-radius:var(--rs);background:var(--bg2);border:1px solid var(--border)} .toast{position:fixed;bottom:20px;right:20px;padding:10px 18px;background:var(--green);color:#fff;border-radius:var(--rs);font:500 12px inherit;opacity:0;transform:translateY(8px);transition:.3s;pointer-events:none;z-index:99} .toast.show{opacity:1;transform:translateY(0)}.toast.error{background:var(--red)} @@ -205,8 +209,28 @@ canvas{width:100%;height:200px;border-radius:var(--rs);background:var(--bg2);bor
-
- +
+ + +
+
+
@@ -435,6 +459,10 @@ async function restartInst(id){ // ─── Config ─── async function loadCfg(){try{const r=await fetch('/api/config');CC=await r.json()}catch(e){}} +function onStratChange(){ + const ps=document.getElementById('cfg-strat').value==='packet_split'; + document.getElementById('cfg-cs-box').style.display=ps?'block':'none'; +} function fillForm(c){ document.getElementById('cfg-listen').value=c.socks?.listen||''; document.getElementById('cfg-strat').value=c.strategy||'round_robin'; @@ -442,6 +470,11 @@ function fillForm(c){ document.getElementById('cfg-max').value=c.socks?.max_connections||10000; document.getElementById('cfg-hi').value=c.health_check?.interval||'30s'; document.getElementById('cfg-ht').value=c.health_check?.timeout||'30s'; + // Central server + const cs=c.central_server||{}; + document.getElementById('cfg-cs-addr').value=cs.address||''; + document.getElementById('cfg-cs-chunk').value=cs.chunk_size||8192; + onStratChange(); } async function saveConfig(){ try{ @@ -452,6 +485,10 @@ async function saveConfig(){ CC.strategy=document.getElementById('cfg-strat').value; CC.health_check.interval=document.getElementById('cfg-hi').value; CC.health_check.timeout=document.getElementById('cfg-ht').value; + // Central server (only relevant for packet_split) + if(!CC.central_server)CC.central_server={}; + CC.central_server.address=document.getElementById('cfg-cs-addr').value; + CC.central_server.chunk_size=parseInt(document.getElementById('cfg-cs-chunk').value)||8192; const r=await fetch('/api/config',{method:'POST',headers:{'Content-Type':'application/json'},body:JSON.stringify(CC)}); if(!r.ok){toast(await r.text(),true);return}toast('Config saved!') }catch(e){toast('Save failed',true)} diff --git a/internal/proxy/socks5.go b/internal/proxy/socks5.go index 2dead8b..285b3d7 100644 --- a/internal/proxy/socks5.go +++ b/internal/proxy/socks5.go @@ -248,6 +248,16 @@ func (s *Server) handlePacketSplit(clientConn net.Conn, connID uint64, atyp byte return } + // Track connections on all used instances + for _, inst := range socksHealthy { + inst.IncrConns() + } + defer func() { + for _, inst := range socksHealthy { + inst.DecrConns() + } + }() + // Create a packet splitter for this connection tunnelConnID := s.connIDGen.Next() splitter := tunnel.NewPacketSplitter(tunnelConnID, s.tunnelPool, socksHealthy, s.chunkSize) @@ -267,11 +277,9 @@ func (s *Server) handlePacketSplit(clientConn net.Conn, connID uint64, atyp byte log.Printf("[proxy] conn#%d: packet-split mode, %d instances, port %d", connID, len(socksHealthy), port) - // Create a context that cancels when any instance in the pool dies ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // Bidirectional relay using packet splitter var txN, rxN int64 var wg sync.WaitGroup @@ -301,6 +309,17 @@ func (s *Server) handlePacketSplit(clientConn net.Conn, connID uint64, atyp byte wg.Wait() + // Track TX/RX on instances (distributed proportionally) + nInstances := int64(len(socksHealthy)) + if nInstances > 0 { + txPer := txN / nInstances + rxPer := rxN / nInstances + for _, inst := range socksHealthy { + inst.AddTx(txPer) + inst.AddRx(rxPer) + } + } + // Track bytes for user data quota if user != nil { user.AddUsedBytes(txN + rxN) diff --git a/internal/tunnel/pool.go b/internal/tunnel/pool.go index 7f17c2f..d05efd1 100644 --- a/internal/tunnel/pool.go +++ b/internal/tunnel/pool.go @@ -6,20 +6,32 @@ import ( "log" "net" "sync" + "sync/atomic" "time" "github.com/ParsaKSH/SlipStream-Plus/internal/engine" ) +// tunnelMaxAge is how long a tunnel connection lives before being recycled. +// Prevents QUIC stream degradation from long-lived connections. +const tunnelMaxAge = 2 * time.Minute + +// writeTimeout is the maximum time a single frame write can take. +// Prevents blocking all connections when QUIC flow control stalls. +const writeTimeout = 10 * time.Second + // TunnelConn wraps a persistent TCP connection to a single instance. // It multiplexes many logical connections (ConnIDs) over one TCP stream. type TunnelConn struct { - inst *engine.Instance - mu sync.Mutex - conn net.Conn - writeMu sync.Mutex - closed bool - incoming chan *Frame // frames coming back from the instance + inst *engine.Instance + mu sync.Mutex + conn net.Conn + writeMu sync.Mutex + closed bool + incoming chan *Frame + createdAt time.Time + lastRead atomic.Int64 // unix timestamp of last successful read + writeErrs atomic.Int32 // consecutive write errors } // TunnelPool manages persistent connections to all healthy instances. @@ -62,7 +74,7 @@ func (p *TunnelPool) Start() { } }() - log.Printf("[tunnel-pool] started") + log.Printf("[tunnel-pool] started (max_age=%s, write_timeout=%s)", tunnelMaxAge, writeTimeout) } // Stop closes all tunnel connections. @@ -89,7 +101,6 @@ func (p *TunnelPool) RegisterConn(connID uint32) chan *Frame { func (p *TunnelPool) UnregisterConn(connID uint32) { if v, ok := p.handlers.LoadAndDelete(connID); ok { ch := v.(chan *Frame) - // Drain and close close(ch) } } @@ -122,28 +133,46 @@ func (p *TunnelPool) HealthyTunnelIDs() []int { } // refreshConnections ensures we have a tunnel to every healthy instance. +// Also recycles connections that have exceeded their max age. func (p *TunnelPool) refreshConnections() { healthy := p.mgr.HealthyInstances() + now := time.Now() p.mu.Lock() defer p.mu.Unlock() - // Remove tunnels for instances that are no longer healthy + // Remove tunnels that are unhealthy, closed, too old, or have too many write errors activeIDs := make(map[int]bool) for _, inst := range healthy { activeIDs[inst.ID()] = true } for id, tc := range p.tunnels { + shouldRemove := false + if !activeIDs[id] || tc.closed { + shouldRemove = true + } else if now.Sub(tc.createdAt) > tunnelMaxAge { + // Connection too old → recycle to get a fresh QUIC stream + log.Printf("[tunnel-pool] recycling instance %d connection (age=%s)", + id, now.Sub(tc.createdAt).Round(time.Second)) + shouldRemove = true + } else if tc.writeErrs.Load() > 3 { + // Too many consecutive write errors + log.Printf("[tunnel-pool] recycling instance %d connection (write_errors=%d)", + id, tc.writeErrs.Load()) + shouldRemove = true + } + + if shouldRemove { tc.close() delete(p.tunnels, id) } } - // Connect to new healthy instances + // Connect to healthy instances that don't have a tunnel for _, inst := range healthy { if inst.Config.Mode == "ssh" { - continue // Only SOCKS mode for packet splitting + continue } if _, exists := p.tunnels[inst.ID()]; exists { continue @@ -174,10 +203,12 @@ func (p *TunnelPool) connectInstance(inst *engine.Instance) (*TunnelConn, error) } tunnel := &TunnelConn{ - inst: inst, - conn: conn, - incoming: make(chan *Frame, 256), + inst: inst, + conn: conn, + incoming: make(chan *Frame, 256), + createdAt: time.Now(), } + tunnel.lastRead.Store(time.Now().Unix()) // Start reader goroutine for this tunnel p.wg.Add(1) @@ -192,7 +223,6 @@ func (p *TunnelPool) connectInstance(inst *engine.Instance) (*TunnelConn, error) // readLoop continuously reads frames from a tunnel and dispatches them // to the appropriate ConnID handler. func (p *TunnelPool) readLoop(tc *TunnelConn) { - log.Printf("[tunnel-pool] readLoop started for instance %d", tc.inst.ID()) for { select { case <-p.stopCh: @@ -209,8 +239,7 @@ func (p *TunnelPool) readLoop(tc *TunnelConn) { return } - log.Printf("[tunnel-pool] instance %d: received frame conn=%d seq=%d flags=0x%02x len=%d", - tc.inst.ID(), frame.ConnID, frame.SeqNum, frame.Flags, len(frame.Payload)) + tc.lastRead.Store(time.Now().Unix()) // Route frame to the registered handler for this ConnID if v, ok := p.handlers.Load(frame.ConnID); ok { @@ -218,7 +247,6 @@ func (p *TunnelPool) readLoop(tc *TunnelConn) { select { case ch <- frame: default: - // Handler's buffer is full, drop frame log.Printf("[tunnel-pool] handler buffer full for conn %d, dropping frame seq=%d", frame.ConnID, frame.SeqNum) } @@ -227,6 +255,7 @@ func (p *TunnelPool) readLoop(tc *TunnelConn) { } // writeFrame thread-safely writes a frame to the tunnel connection. +// Includes a write deadline to prevent indefinite blocking. func (tc *TunnelConn) writeFrame(f *Frame) error { tc.writeMu.Lock() defer tc.writeMu.Unlock() @@ -234,7 +263,18 @@ func (tc *TunnelConn) writeFrame(f *Frame) error { if tc.closed { return fmt.Errorf("tunnel closed") } - return WriteFrame(tc.conn, f) + + // Set write deadline to prevent blocking forever on stalled QUIC streams + tc.conn.SetWriteDeadline(time.Now().Add(writeTimeout)) + err := WriteFrame(tc.conn, f) + tc.conn.SetWriteDeadline(time.Time{}) // clear deadline + + if err != nil { + tc.writeErrs.Add(1) + return err + } + tc.writeErrs.Store(0) // reset on success + return nil } // close marks the tunnel as closed and closes the underlying connection. diff --git a/internal/tunnel/splitter.go b/internal/tunnel/splitter.go index 9cd6e2c..eff1101 100644 --- a/internal/tunnel/splitter.go +++ b/internal/tunnel/splitter.go @@ -85,7 +85,6 @@ func (ps *PacketSplitter) SendSYN(atyp byte, addr []byte, port []byte) error { func (ps *PacketSplitter) RelayClientToUpstream(ctx context.Context, client io.Reader) int64 { buf := make([]byte, ps.chunkSize) var totalBytes int64 - log.Printf("[splitter] conn=%d: RelayClientToUpstream STARTED", ps.connID) for { select { @@ -95,7 +94,6 @@ func (ps *PacketSplitter) RelayClientToUpstream(ctx context.Context, client io.R } n, err := client.Read(buf) - log.Printf("[splitter] conn=%d: client.Read returned n=%d err=%v", ps.connID, n, err) if n > 0 { totalBytes += int64(n) @@ -113,8 +111,6 @@ func (ps *PacketSplitter) RelayClientToUpstream(ctx context.Context, client io.R } copy(frame.Payload, buf[:n]) - log.Printf("[splitter] conn=%d: sending DATA seq=%d len=%d via inst=%d", - ps.connID, frame.SeqNum, n, inst.ID()) if sendErr := ps.pool.SendFrame(inst.ID(), frame); sendErr != nil { log.Printf("[splitter] conn=%d: send to instance %d failed: %v", ps.connID, inst.ID(), sendErr)