Skip to content
Merged

Dev #13

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions cmd/centralserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,6 @@ func (cs *centralServer) relayUpstreamToTunnel(ctx context.Context, connID uint3

n, err := upstream.Read(buf)
if n > 0 {
log.Printf("[central] conn=%d: upstream read %d bytes, sending reverse", connID, n)
state.mu.Lock()
seq := state.txSeq
state.txSeq++
Expand Down Expand Up @@ -406,17 +405,13 @@ func (cs *centralServer) sendFrame(connID uint32, frame *tunnel.Frame) {
return
}

log.Printf("[central] conn=%d: sendFrame seq=%d flags=0x%02x len=%d sources=%d",
connID, frame.SeqNum, frame.Flags, len(frame.Payload), len(state.sources))

// Try each source once, starting from current index
for tries := 0; tries < len(state.sources); tries++ {
idx := state.sourceIdx % len(state.sources)
state.sourceIdx++
w := state.sources[idx]

if err := tunnel.WriteFrame(w, frame); err != nil {
log.Printf("[central] conn=%d: sendFrame to source %d failed: %v", connID, idx, err)
// Remove dead source
state.sources = append(state.sources[:idx], state.sources[idx+1:]...)
if state.sourceIdx > 0 {
Expand All @@ -430,12 +425,10 @@ func (cs *centralServer) sendFrame(connID uint32, frame *tunnel.Frame) {
}

func (cs *centralServer) handleData(frame *tunnel.Frame) {
log.Printf("[central] conn=%d: DATA seq=%d len=%d", frame.ConnID, frame.SeqNum, len(frame.Payload))
cs.mu.RLock()
state, ok := cs.conns[frame.ConnID]
cs.mu.RUnlock()
if !ok {
log.Printf("[central] conn=%d: DATA for unknown connID, dropping", frame.ConnID)
return
}

Expand Down
43 changes: 40 additions & 3 deletions internal/gui/dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,16 @@ tr:hover td{background:rgba(124,108,255,0.03)}
.ico{width:14px;height:14px;vertical-align:middle;fill:none;stroke:currentColor;stroke-width:2;stroke-linecap:round;stroke-linejoin:round}
.btn .ico{width:12px;height:12px;margin-right:2px}
.form-group{margin-bottom:14px}
.form-group label{display:block;font-size:11px;color:var(--text2);margin-bottom:4px;font-weight:500}
.form-group label{display:flex;align-items:center;gap:5px;font-size:11px;color:var(--text2);margin-bottom:4px;font-weight:500}
.form-group input,.form-group select,.form-group textarea{width:100%;padding:8px 12px;background:var(--bg2);border:1px solid var(--border);border-radius:var(--rs);color:var(--text);font:13px inherit}
.form-group input:focus,.form-group select:focus,.form-group textarea:focus{outline:none;border-color:var(--accent)}
.form-group textarea{min-height:120px;font:11px 'SF Mono',Monaco,Consolas,monospace;resize:vertical}
.form-row{display:grid;grid-template-columns:1fr 1fr;gap:12px}
.form-row3{display:grid;grid-template-columns:1fr 1fr 1fr;gap:12px}
.tip-btn{display:inline-flex;align-items:center;justify-content:center;width:15px;height:15px;border-radius:50%;background:var(--bg4);border:1px solid var(--border);color:var(--text3);font-size:9px;font-weight:700;cursor:pointer;position:relative;flex-shrink:0}
.tip-btn:hover::after{content:attr(data-tip);position:absolute;bottom:calc(100% + 6px);left:50%;transform:translateX(-50%);background:var(--bg4);border:1px solid var(--border);color:var(--text);font-size:10px;font-weight:400;padding:6px 10px;border-radius:6px;white-space:pre-line;width:220px;z-index:100;pointer-events:none;box-shadow:0 4px 20px rgba(0,0,0,0.4)}
.cs-box{margin-top:10px;padding:14px;background:var(--bg2);border:1px solid rgba(124,108,255,0.25);border-radius:var(--rs)}
.cs-box .cs-title{font-size:11px;font-weight:700;color:var(--accent2);margin-bottom:10px;text-transform:uppercase;letter-spacing:.8px}
canvas{width:100%;height:200px;border-radius:var(--rs);background:var(--bg2);border:1px solid var(--border)}
.toast{position:fixed;bottom:20px;right:20px;padding:10px 18px;background:var(--green);color:#fff;border-radius:var(--rs);font:500 12px inherit;opacity:0;transform:translateY(8px);transition:.3s;pointer-events:none;z-index:99}
.toast.show{opacity:1;transform:translateY(0)}.toast.error{background:var(--red)}
Expand Down Expand Up @@ -205,8 +209,28 @@ canvas{width:100%;height:200px;border-radius:var(--rs);background:var(--bg2);bor
</div>
<div class="form-row">
<div class="form-group"><label>SOCKS Listen</label><input id="cfg-listen" placeholder="0.0.0.0:1080"></div>
<div class="form-group"><label>Strategy</label>
<select id="cfg-strat"><option value="round_robin">Round Robin</option><option value="random">Random</option><option value="least_ping">Least Ping</option><option value="least_load">Least Load</option></select>
<div class="form-group">
<label>Strategy <span class="tip-btn" data-tip="round_robin: Rotate through instances evenly.&#10;random: Pick a random healthy instance.&#10;least_ping: Use instance with lowest latency.&#10;least_load: Use instance with fewest connections.&#10;packet_split: Split every connection across ALL instances at the packet level via CentralServer.">?</span></label>
<select id="cfg-strat" onchange="onStratChange()">
<option value="round_robin">Round Robin</option>
<option value="random">Random</option>
<option value="least_ping">Least Ping</option>
<option value="least_load">Least Load</option>
<option value="packet_split">Packet Split ★</option>
</select>
</div>
</div>
<div id="cfg-cs-box" class="cs-box" style="display:none">
<div class="cs-title">Central Server Settings</div>
<div class="form-row">
<div class="form-group">
<label>Address <span class="tip-btn" data-tip="Host:port of your centralserver binary&#10;(e.g. 45.89.223.100:9500).&#10;All slipstream-server instances must point&#10;their --target-address here.">?</span></label>
<input id="cfg-cs-addr" placeholder="45.89.223.100:9500">
</div>
<div class="form-group">
<label>Chunk Size <span class="tip-btn" data-tip="Max bytes per frame sent to centralserver.&#10;Default: 8192. Larger = fewer frames,&#10;smaller = lower latency. Range: 1024–65536.">?</span></label>
<input type="number" id="cfg-cs-chunk" placeholder="8192">
</div>
</div>
</div>
<div class="form-row">
Expand Down Expand Up @@ -435,13 +459,22 @@ async function restartInst(id){

// ─── Config ───
// Fetch the current server config into the global CC object.
// Best-effort: on any network/parse failure the previous CC is kept.
async function loadCfg(){
  try{
    const resp=await fetch('/api/config');
    CC=await resp.json();
  }catch(err){
    // Deliberately swallowed — the dashboard keeps its last known config.
  }
}
// Show the Central Server settings box only when the "packet_split"
// strategy is selected; hide it for every other strategy.
function onStratChange(){
  const strat=document.getElementById('cfg-strat').value;
  const csBox=document.getElementById('cfg-cs-box');
  csBox.style.display=(strat==='packet_split')?'block':'none';
}
// Populate the config form from a config object `c`.
// Missing sections fall back to sensible defaults so a partial
// (or empty) config never leaves fields undefined.
function fillForm(c){
  const $=id=>document.getElementById(id);
  const socks=c.socks||{};
  const hc=c.health_check||{};
  const cs=c.central_server||{};

  $('cfg-listen').value=socks.listen||'';
  $('cfg-strat').value=c.strategy||'round_robin';
  $('cfg-buf').value=socks.buffer_size||65536;
  $('cfg-max').value=socks.max_connections||10000;
  $('cfg-hi').value=hc.interval||'30s';
  $('cfg-ht').value=hc.timeout||'30s';

  // Central-server settings (only shown for packet_split strategy).
  $('cfg-cs-addr').value=cs.address||'';
  $('cfg-cs-chunk').value=cs.chunk_size||8192;

  // Sync visibility of the central-server box with the loaded strategy.
  onStratChange();
}
async function saveConfig(){
try{
Expand All @@ -452,6 +485,10 @@ async function saveConfig(){
CC.strategy=document.getElementById('cfg-strat').value;
CC.health_check.interval=document.getElementById('cfg-hi').value;
CC.health_check.timeout=document.getElementById('cfg-ht').value;
// Central server (only relevant for packet_split)
if(!CC.central_server)CC.central_server={};
CC.central_server.address=document.getElementById('cfg-cs-addr').value;
CC.central_server.chunk_size=parseInt(document.getElementById('cfg-cs-chunk').value)||8192;
const r=await fetch('/api/config',{method:'POST',headers:{'Content-Type':'application/json'},body:JSON.stringify(CC)});
if(!r.ok){toast(await r.text(),true);return}toast('Config saved!')
}catch(e){toast('Save failed',true)}
Expand Down
23 changes: 21 additions & 2 deletions internal/proxy/socks5.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,16 @@ func (s *Server) handlePacketSplit(clientConn net.Conn, connID uint64, atyp byte
return
}

// Track connections on all used instances
for _, inst := range socksHealthy {
inst.IncrConns()
}
defer func() {
for _, inst := range socksHealthy {
inst.DecrConns()
}
}()

// Create a packet splitter for this connection
tunnelConnID := s.connIDGen.Next()
splitter := tunnel.NewPacketSplitter(tunnelConnID, s.tunnelPool, socksHealthy, s.chunkSize)
Expand All @@ -267,11 +277,9 @@ func (s *Server) handlePacketSplit(clientConn net.Conn, connID uint64, atyp byte
log.Printf("[proxy] conn#%d: packet-split mode, %d instances, port %d",
connID, len(socksHealthy), port)

// Create a context that cancels when any instance in the pool dies
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Bidirectional relay using packet splitter
var txN, rxN int64
var wg sync.WaitGroup

Expand Down Expand Up @@ -301,6 +309,17 @@ func (s *Server) handlePacketSplit(clientConn net.Conn, connID uint64, atyp byte

wg.Wait()

// Track TX/RX on instances (distributed proportionally)
nInstances := int64(len(socksHealthy))
if nInstances > 0 {
txPer := txN / nInstances
rxPer := rxN / nInstances
for _, inst := range socksHealthy {
inst.AddTx(txPer)
inst.AddRx(rxPer)
}
}

// Track bytes for user data quota
if user != nil {
user.AddUsedBytes(txN + rxN)
Expand Down
78 changes: 59 additions & 19 deletions internal/tunnel/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,32 @@ import (
"log"
"net"
"sync"
"sync/atomic"
"time"

"github.com/ParsaKSH/SlipStream-Plus/internal/engine"
)

// tunnelMaxAge is how long a tunnel connection lives before being recycled.
// Prevents QUIC stream degradation from long-lived connections.
const tunnelMaxAge = 2 * time.Minute

// writeTimeout is the maximum time a single frame write can take.
// Prevents blocking all connections when QUIC flow control stalls.
const writeTimeout = 10 * time.Second

// TunnelConn wraps a persistent TCP connection to a single instance.
// It multiplexes many logical connections (ConnIDs) over one TCP stream.
type TunnelConn struct {
inst *engine.Instance
mu sync.Mutex
conn net.Conn
writeMu sync.Mutex
closed bool
incoming chan *Frame // frames coming back from the instance
inst *engine.Instance
mu sync.Mutex
conn net.Conn
writeMu sync.Mutex
closed bool
incoming chan *Frame
createdAt time.Time
lastRead atomic.Int64 // unix timestamp of last successful read
writeErrs atomic.Int32 // consecutive write errors
}

// TunnelPool manages persistent connections to all healthy instances.
Expand Down Expand Up @@ -62,7 +74,7 @@ func (p *TunnelPool) Start() {
}
}()

log.Printf("[tunnel-pool] started")
log.Printf("[tunnel-pool] started (max_age=%s, write_timeout=%s)", tunnelMaxAge, writeTimeout)
}

// Stop closes all tunnel connections.
Expand All @@ -89,7 +101,6 @@ func (p *TunnelPool) RegisterConn(connID uint32) chan *Frame {
// UnregisterConn removes the frame handler for a logical connection and
// closes its delivery channel so any receiver ranging over it terminates.
//
// NOTE(review): readLoop does handlers.Load followed by a non-blocking
// send on this channel. If a frame arrives between Load and the send here
// while close(ch) runs, that send would panic ("send on closed channel") —
// confirm readLoop's send cannot race with this close, or guard the send.
func (p *TunnelPool) UnregisterConn(connID uint32) {
	if v, ok := p.handlers.LoadAndDelete(connID); ok {
		// LoadAndDelete guarantees only one caller obtains the channel,
		// so close is executed at most once per connID.
		ch := v.(chan *Frame)
		close(ch)
	}
}
Expand Down Expand Up @@ -122,28 +133,46 @@ func (p *TunnelPool) HealthyTunnelIDs() []int {
}

// refreshConnections ensures we have a tunnel to every healthy instance.
// Also recycles connections that have exceeded their max age.
func (p *TunnelPool) refreshConnections() {
healthy := p.mgr.HealthyInstances()
now := time.Now()

p.mu.Lock()
defer p.mu.Unlock()

// Remove tunnels for instances that are no longer healthy
// Remove tunnels that are unhealthy, closed, too old, or have too many write errors
activeIDs := make(map[int]bool)
for _, inst := range healthy {
activeIDs[inst.ID()] = true
}
for id, tc := range p.tunnels {
shouldRemove := false

if !activeIDs[id] || tc.closed {
shouldRemove = true
} else if now.Sub(tc.createdAt) > tunnelMaxAge {
// Connection too old → recycle to get a fresh QUIC stream
log.Printf("[tunnel-pool] recycling instance %d connection (age=%s)",
id, now.Sub(tc.createdAt).Round(time.Second))
shouldRemove = true
} else if tc.writeErrs.Load() > 3 {
// Too many consecutive write errors
log.Printf("[tunnel-pool] recycling instance %d connection (write_errors=%d)",
id, tc.writeErrs.Load())
shouldRemove = true
}

if shouldRemove {
tc.close()
delete(p.tunnels, id)
}
}

// Connect to new healthy instances
// Connect to healthy instances that don't have a tunnel
for _, inst := range healthy {
if inst.Config.Mode == "ssh" {
continue // Only SOCKS mode for packet splitting
continue
}
if _, exists := p.tunnels[inst.ID()]; exists {
continue
Expand Down Expand Up @@ -174,10 +203,12 @@ func (p *TunnelPool) connectInstance(inst *engine.Instance) (*TunnelConn, error)
}

tunnel := &TunnelConn{
inst: inst,
conn: conn,
incoming: make(chan *Frame, 256),
inst: inst,
conn: conn,
incoming: make(chan *Frame, 256),
createdAt: time.Now(),
}
tunnel.lastRead.Store(time.Now().Unix())

// Start reader goroutine for this tunnel
p.wg.Add(1)
Expand All @@ -192,7 +223,6 @@ func (p *TunnelPool) connectInstance(inst *engine.Instance) (*TunnelConn, error)
// readLoop continuously reads frames from a tunnel and dispatches them
// to the appropriate ConnID handler.
func (p *TunnelPool) readLoop(tc *TunnelConn) {
log.Printf("[tunnel-pool] readLoop started for instance %d", tc.inst.ID())
for {
select {
case <-p.stopCh:
Expand All @@ -209,16 +239,14 @@ func (p *TunnelPool) readLoop(tc *TunnelConn) {
return
}

log.Printf("[tunnel-pool] instance %d: received frame conn=%d seq=%d flags=0x%02x len=%d",
tc.inst.ID(), frame.ConnID, frame.SeqNum, frame.Flags, len(frame.Payload))
tc.lastRead.Store(time.Now().Unix())

// Route frame to the registered handler for this ConnID
if v, ok := p.handlers.Load(frame.ConnID); ok {
ch := v.(chan *Frame)
select {
case ch <- frame:
default:
// Handler's buffer is full, drop frame
log.Printf("[tunnel-pool] handler buffer full for conn %d, dropping frame seq=%d",
frame.ConnID, frame.SeqNum)
}
Expand All @@ -227,14 +255,26 @@ func (p *TunnelPool) readLoop(tc *TunnelConn) {
}

// writeFrame thread-safely writes a frame to the tunnel connection.
// Includes a write deadline to prevent indefinite blocking.
//
// On failure the consecutive-error counter writeErrs is incremented;
// refreshConnections recycles the tunnel once it exceeds its threshold.
// Any successful write resets the counter to zero.
func (tc *TunnelConn) writeFrame(f *Frame) error {
	// writeMu serializes whole-frame writes so concurrent callers cannot
	// interleave partial frames on the shared stream.
	tc.writeMu.Lock()
	defer tc.writeMu.Unlock()

	// NOTE(review): `closed` is read here under writeMu; verify that
	// close() sets it under the same lock, otherwise this check races.
	if tc.closed {
		return fmt.Errorf("tunnel closed")
	}

	// Set write deadline to prevent blocking forever on stalled QUIC streams.
	// SetWriteDeadline errors are deliberately ignored (best-effort); the
	// subsequent WriteFrame surfaces any real connection failure.
	tc.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
	err := WriteFrame(tc.conn, f)
	tc.conn.SetWriteDeadline(time.Time{}) // clear deadline

	if err != nil {
		tc.writeErrs.Add(1)
		return err
	}
	tc.writeErrs.Store(0) // reset on success
	return nil
}

// close marks the tunnel as closed and closes the underlying connection.
Expand Down
4 changes: 0 additions & 4 deletions internal/tunnel/splitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ func (ps *PacketSplitter) SendSYN(atyp byte, addr []byte, port []byte) error {
func (ps *PacketSplitter) RelayClientToUpstream(ctx context.Context, client io.Reader) int64 {
buf := make([]byte, ps.chunkSize)
var totalBytes int64
log.Printf("[splitter] conn=%d: RelayClientToUpstream STARTED", ps.connID)

for {
select {
Expand All @@ -95,7 +94,6 @@ func (ps *PacketSplitter) RelayClientToUpstream(ctx context.Context, client io.R
}

n, err := client.Read(buf)
log.Printf("[splitter] conn=%d: client.Read returned n=%d err=%v", ps.connID, n, err)
if n > 0 {
totalBytes += int64(n)

Expand All @@ -113,8 +111,6 @@ func (ps *PacketSplitter) RelayClientToUpstream(ctx context.Context, client io.R
}
copy(frame.Payload, buf[:n])

log.Printf("[splitter] conn=%d: sending DATA seq=%d len=%d via inst=%d",
ps.connID, frame.SeqNum, n, inst.ID())
if sendErr := ps.pool.SendFrame(inst.ID(), frame); sendErr != nil {
log.Printf("[splitter] conn=%d: send to instance %d failed: %v",
ps.connID, inst.ID(), sendErr)
Expand Down
Loading