From 23ee0be20be92215791152ef7b2a2e1345d5df99 Mon Sep 17 00:00:00 2001 From: ParsaKSH Date: Mon, 23 Mar 2026 03:33:00 +0330 Subject: [PATCH 1/3] feat: Add an end-to-end health check using SOCKS5 and HTTP to a configurable target through the tunnel. --- internal/health/checker.go | 120 ++++++++++++++++++++++++++++++++++--- 1 file changed, 113 insertions(+), 7 deletions(-) diff --git a/internal/health/checker.go b/internal/health/checker.go index 5cb5ed5..5fd90ac 100644 --- a/internal/health/checker.go +++ b/internal/health/checker.go @@ -2,6 +2,7 @@ package health import ( "context" + "encoding/binary" "fmt" "io" "log" @@ -25,6 +26,7 @@ type Checker struct { manager *engine.Manager interval time.Duration timeout time.Duration + target string // health_check.target (e.g. "google.com") ctx context.Context cancel context.CancelFunc @@ -44,6 +46,7 @@ func NewChecker(mgr *engine.Manager, cfg *config.HealthCheckConfig) *Checker { manager: mgr, interval: cfg.IntervalDuration(), timeout: timeout, + target: cfg.Target, ctx: ctx, cancel: cancel, failures: make(map[int]int), @@ -52,8 +55,8 @@ func NewChecker(mgr *engine.Manager, cfg *config.HealthCheckConfig) *Checker { func (c *Checker) Start() { go c.run() - log.Printf("[health] checker started (interval=%s, tunnel_timeout=%s, unhealthy_after=%d failures)", - c.interval, c.timeout, maxConsecutiveFailures) + log.Printf("[health] checker started (interval=%s, tunnel_timeout=%s, target=%s, unhealthy_after=%d failures)", + c.interval, c.timeout, c.target, maxConsecutiveFailures) } func (c *Checker) Stop() { @@ -110,7 +113,6 @@ func (c *Checker) checkOne(inst *engine.Instance) { // Step 1: Quick TCP connect — is the process even running? conn, err := net.DialTimeout("tcp", inst.Addr(), 3*time.Second) if err != nil { - // Process is not listening → immediately unhealthy failCount := c.recordFailure(inst.ID()) if inst.State() != engine.StateUnhealthy { log.Printf("[health] instance %d (%s:%d) UNHEALTHY: process not listening: %v", @@ -128,7 +130,6 @@ func (c *Checker) checkOne(inst *engine.Instance) { conn.Close() // Step 2: Tunnel probe — does the tunnel actually work? - // This sends data through the DNS tunnel and measures real RTT. var rtt time.Duration switch inst.Config.Mode { case "ssh": @@ -138,7 +139,6 @@ func (c *Checker) checkOne(inst *engine.Instance) { } if err != nil { - // Tunnel probe failed failCount := c.recordFailure(inst.ID()) if failCount >= maxConsecutiveFailures { if inst.State() != engine.StateUnhealthy { @@ -159,7 +159,32 @@ func (c *Checker) checkOne(inst *engine.Instance) { return } - // Tunnel probe succeeded → HEALTHY with real latency + // Step 3: End-to-end probe — full SOCKS5 CONNECT + HTTP through tunnel. + // Tests entire path: instance → DNS tunnel → target server → SOCKS → internet. + if c.target != "" && inst.Config.Mode != "ssh" { + e2eRtt, e2eErr := c.probeEndToEnd(inst) + if e2eErr != nil { + failCount := c.recordFailure(inst.ID()) + if failCount >= maxConsecutiveFailures { + if inst.State() != engine.StateUnhealthy { + log.Printf("[health] instance %d (%s:%d) UNHEALTHY after %d e2e failures: %v", + inst.ID(), inst.Config.Domain, inst.Config.Port, failCount, e2eErr) + inst.SetState(engine.StateUnhealthy) + inst.SetLastPingMs(-1) + } + } else { + log.Printf("[health] instance %d (%s:%d) e2e probe failed (%d/%d): %v", + inst.ID(), inst.Config.Domain, inst.Config.Port, + failCount, maxConsecutiveFailures, e2eErr) + } + return + } + if e2eRtt > rtt { + rtt = e2eRtt + } + } + + // All probes succeeded → HEALTHY c.recordSuccess(inst.ID()) pingMs := rtt.Milliseconds() @@ -169,7 +194,7 @@ func (c *Checker) checkOne(inst *engine.Instance) { inst.SetLastPingMs(pingMs) if inst.State() != engine.StateHealthy { - log.Printf("[health] instance %d (%s:%d) now HEALTHY (tunnel_rtt=%dms)", + log.Printf("[health] instance %d (%s:%d) now HEALTHY (rtt=%dms)", inst.ID(), inst.Config.Domain, inst.Config.Port, pingMs) inst.SetState(engine.StateHealthy) } @@ -218,3 +243,84 @@ func (c *Checker) probeSSH(inst *engine.Instance) (time.Duration, error) { } return time.Since(start), nil } + +// probeEndToEnd does a full SOCKS5 CONNECT through the tunnel to the health +// check target (port 80), sends an HTTP HEAD request, and verifies a response. +// This tests the entire path: instance → DNS tunnel → centralserver → SOCKS upstream → internet. +func (c *Checker) probeEndToEnd(inst *engine.Instance) (time.Duration, error) { + start := time.Now() + + conn, err := net.DialTimeout("tcp", inst.Addr(), c.timeout) + if err != nil { + return 0, fmt.Errorf("e2e connect: %w", err) + } + defer conn.Close() + conn.SetDeadline(time.Now().Add(c.timeout)) + + // SOCKS5 greeting (no auth) + if _, err := conn.Write([]byte{0x05, 0x01, 0x00}); err != nil { + return 0, fmt.Errorf("e2e socks greeting: %w", err) + } + greeting := make([]byte, 2) + if _, err := io.ReadFull(conn, greeting); err != nil { + return 0, fmt.Errorf("e2e socks greeting resp: %w", err) + } + if greeting[0] != 0x05 { + return 0, fmt.Errorf("e2e bad socks version: %d", greeting[0]) + } + + // SOCKS5 CONNECT to target:80 + domain := c.target + connectReq := make([]byte, 0, 4+1+len(domain)+2) + connectReq = append(connectReq, 0x05, 0x01, 0x00, 0x03) // VER CMD RSV ATYP(domain) + connectReq = append(connectReq, byte(len(domain))) // domain length + connectReq = append(connectReq, []byte(domain)...) // domain + portBuf := make([]byte, 2) + binary.BigEndian.PutUint16(portBuf, 80) + connectReq = append(connectReq, portBuf...) + + if _, err := conn.Write(connectReq); err != nil { + return 0, fmt.Errorf("e2e socks connect: %w", err) + } + + // Read CONNECT response (VER REP RSV ATYP) + connectResp := make([]byte, 4) + if _, err := io.ReadFull(conn, connectResp); err != nil { + return 0, fmt.Errorf("e2e socks connect resp: %w", err) + } + if connectResp[1] != 0x00 { + return 0, fmt.Errorf("e2e socks connect rejected: 0x%02x", connectResp[1]) + } + + // Drain bind address + switch connectResp[3] { + case 0x01: + io.ReadFull(conn, make([]byte, 4+2)) + case 0x03: + lb := make([]byte, 1) + io.ReadFull(conn, lb) + io.ReadFull(conn, make([]byte, int(lb[0])+2)) + case 0x04: + io.ReadFull(conn, make([]byte, 16+2)) + default: + io.ReadFull(conn, make([]byte, 4+2)) + } + + // Send HTTP HEAD request + httpReq := fmt.Sprintf("HEAD / HTTP/1.1\r\nHost: %s\r\nConnection: close\r\n\r\n", domain) + if _, err := conn.Write([]byte(httpReq)); err != nil { + return 0, fmt.Errorf("e2e http write: %w", err) + } + + // Read HTTP response (at least status line) + respBuf := make([]byte, 128) + n, err := conn.Read(respBuf) + if err != nil && n == 0 { + return 0, fmt.Errorf("e2e http read: %w", err) + } + if n < 12 || string(respBuf[:4]) != "HTTP" { + return 0, fmt.Errorf("e2e bad http response: %q", string(respBuf[:n])) + } + + return time.Since(start), nil +} From 75ce081237f8076838279531e290addfb49f59d8 Mon Sep 17 00:00:00 2001 From: ParsaKSH Date: Mon, 23 Mar 2026 04:00:24 +0330 Subject: [PATCH 2/3] feat: Add packet-split framing protocol health check mode to the checker. --- cmd/slipstreamplus/main.go | 3 ++ internal/health/checker.go | 105 +++++++++++++++++++++++++++++++++---- 2 files changed, 97 insertions(+), 11 deletions(-) diff --git a/cmd/slipstreamplus/main.go b/cmd/slipstreamplus/main.go index dfcb18d..780fca9 100644 --- a/cmd/slipstreamplus/main.go +++ b/cmd/slipstreamplus/main.go @@ -78,6 +78,9 @@ func main() { // Health checker checker := health.NewChecker(mgr, &cfg.HealthCheck) + if cfg.Strategy == "packet_split" { + checker.SetPacketSplit(true) + } checker.Start() // Load balancer diff --git a/internal/health/checker.go b/internal/health/checker.go index 5fd90ac..67b7197 100644 --- a/internal/health/checker.go +++ b/internal/health/checker.go @@ -12,6 +12,7 @@ import ( "github.com/ParsaKSH/SlipStream-Plus/internal/config" "github.com/ParsaKSH/SlipStream-Plus/internal/engine" + "github.com/ParsaKSH/SlipStream-Plus/internal/tunnel" ) // An instance is HEALTHY only after a successful tunnel probe (SOCKS5/SSH). @@ -23,12 +24,13 @@ import ( const maxConsecutiveFailures = 3 type Checker struct { - manager *engine.Manager - interval time.Duration - timeout time.Duration - target string // health_check.target (e.g. "google.com") - ctx context.Context - cancel context.CancelFunc + manager *engine.Manager + interval time.Duration + timeout time.Duration + target string // health_check.target (e.g. "google.com") + packetSplit bool // true when strategy=packet_split + ctx context.Context + cancel context.CancelFunc mu sync.Mutex failures map[int]int @@ -55,8 +57,18 @@ func NewChecker(mgr *engine.Manager, cfg *config.HealthCheckConfig) *Checker { func (c *Checker) Start() { go c.run() - log.Printf("[health] checker started (interval=%s, tunnel_timeout=%s, target=%s, unhealthy_after=%d failures)", - c.interval, c.timeout, c.target, maxConsecutiveFailures) + mode := "connection" + if c.packetSplit { + mode = "packet-split" + } + log.Printf("[health] checker started (interval=%s, tunnel_timeout=%s, target=%s, mode=%s, unhealthy_after=%d failures)", + c.interval, c.timeout, c.target, mode, maxConsecutiveFailures) +} + +// SetPacketSplit enables framing protocol health checks. +// Must be called before Start(). +func (c *Checker) SetPacketSplit(enabled bool) { + c.packetSplit = enabled } func (c *Checker) Stop() { @@ -159,10 +171,19 @@ func (c *Checker) checkOne(inst *engine.Instance) { return } - // Step 3: End-to-end probe — full SOCKS5 CONNECT + HTTP through tunnel. - // Tests entire path: instance → DNS tunnel → target server → SOCKS → internet. + // Step 3: End-to-end probe. + // In packet_split mode: test if instance's upstream speaks our framing protocol. + // In normal mode: full SOCKS5 CONNECT + HTTP through the tunnel. if c.target != "" && inst.Config.Mode != "ssh" { - e2eRtt, e2eErr := c.probeEndToEnd(inst) + var e2eRtt time.Duration + var e2eErr error + + if c.packetSplit { + e2eRtt, e2eErr = c.probeFramingProtocol(inst) + } else { + e2eRtt, e2eErr = c.probeEndToEnd(inst) + } + if e2eErr != nil { failCount := c.recordFailure(inst.ID()) if failCount >= maxConsecutiveFailures { @@ -324,3 +345,65 @@ func (c *Checker) probeEndToEnd(inst *engine.Instance) (time.Duration, error) { return time.Since(start), nil } + +// probeFramingProtocol tests if the instance's upstream speaks our framing protocol +// (i.e., is connected to centralserver). It sends a SYN frame and expects a valid +// frame response. Instances whose upstream is a plain SOCKS5 proxy will fail. +func (c *Checker) probeFramingProtocol(inst *engine.Instance) (time.Duration, error) { + start := time.Now() + + conn, err := net.DialTimeout("tcp", inst.Addr(), c.timeout) + if err != nil { + return 0, fmt.Errorf("frame probe connect: %w", err) + } + defer conn.Close() + + if tc, ok := conn.(*net.TCPConn); ok { + tc.SetNoDelay(true) + } + conn.SetDeadline(time.Now().Add(c.timeout)) + + // Build SYN targeting health_check.target:80 + domain := c.target + synPayload := make([]byte, 0, 1+1+len(domain)+2) + synPayload = append(synPayload, 0x03) // ATYP = domain + synPayload = append(synPayload, byte(len(domain))) // domain length + synPayload = append(synPayload, []byte(domain)...) // domain + synPayload = append(synPayload, 0x00, 0x50) // port 80 + + // Use a unique probe ConnID (high range to avoid collision) + probeConnID := uint32(0xFFFF0000) + uint32(inst.ID()) + + synFrame := &tunnel.Frame{ + ConnID: probeConnID, + SeqNum: 0, + Flags: tunnel.FlagSYN, + Payload: synPayload, + } + + if err := tunnel.WriteFrame(conn, synFrame); err != nil { + return 0, fmt.Errorf("frame probe write SYN: %w", err) + } + + // Read response frame from centralserver. + // If upstream is centralserver → valid frame back. + // If upstream is plain SOCKS5 → timeout/garbage → fail. + respFrame, err := tunnel.ReadFrame(conn) + if err != nil { + return 0, fmt.Errorf("frame probe read: %w", err) + } + + if respFrame.ConnID != probeConnID { + return 0, fmt.Errorf("frame probe wrong connID: got %d, want %d", + respFrame.ConnID, probeConnID) + } + + // Valid frame = centralserver is there. Send FIN to clean up. + tunnel.WriteFrame(conn, &tunnel.Frame{ + ConnID: probeConnID, + SeqNum: 1, + Flags: tunnel.FlagFIN, + }) + + return time.Since(start), nil +} From fd3f7dc7eea5298b534ecd1d6330796bdd756a0c Mon Sep 17 00:00:00 2001 From: ParsaKSH Date: Mon, 23 Mar 2026 04:03:12 +0330 Subject: [PATCH 3/3] feat: Send HTTP HEAD request in a DATA frame during health probe to elicit target response and update FIN frame sequence number. --- internal/health/checker.go | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/internal/health/checker.go b/internal/health/checker.go index 67b7197..618cd4c 100644 --- a/internal/health/checker.go +++ b/internal/health/checker.go @@ -385,9 +385,21 @@ func (c *Checker) probeFramingProtocol(inst *engine.Instance) (time.Duration, er return 0, fmt.Errorf("frame probe write SYN: %w", err) } + // Send DATA with HTTP HEAD so the target actually responds + httpReq := fmt.Sprintf("HEAD / HTTP/1.1\r\nHost: %s\r\nConnection: close\r\n\r\n", domain) + dataFrame := &tunnel.Frame{ + ConnID: probeConnID, + SeqNum: 1, + Flags: tunnel.FlagData, + Payload: []byte(httpReq), + } + if err := tunnel.WriteFrame(conn, dataFrame); err != nil { + return 0, fmt.Errorf("frame probe write DATA: %w", err) + } + // Read response frame from centralserver. - // If upstream is centralserver → valid frame back. - // If upstream is plain SOCKS5 → timeout/garbage → fail. + // centralserver → connects target, forwards HTTP, target responds → reverse frame. + // plain SOCKS5 → can't parse frame → timeout/error. respFrame, err := tunnel.ReadFrame(conn) if err != nil { return 0, fmt.Errorf("frame probe read: %w", err) @@ -401,7 +413,7 @@ func (c *Checker) probeFramingProtocol(inst *engine.Instance) (time.Duration, er // Valid frame = centralserver is there. Send FIN to clean up. tunnel.WriteFrame(conn, &tunnel.Frame{ ConnID: probeConnID, - SeqNum: 1, + SeqNum: 2, Flags: tunnel.FlagFIN, })