Skip to content
Merged

Dev #12

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/slipstreamplus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ func main() {

// Health checker
checker := health.NewChecker(mgr, &cfg.HealthCheck)
if cfg.Strategy == "packet_split" {
checker.SetPacketSplit(true)
}
checker.Start()

// Load balancer
Expand Down
225 changes: 213 additions & 12 deletions internal/health/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package health

import (
"context"
"encoding/binary"
"fmt"
"io"
"log"
Expand All @@ -11,6 +12,7 @@ import (

"github.com/ParsaKSH/SlipStream-Plus/internal/config"
"github.com/ParsaKSH/SlipStream-Plus/internal/engine"
"github.com/ParsaKSH/SlipStream-Plus/internal/tunnel"
)

// An instance is HEALTHY only after a successful tunnel probe (SOCKS5/SSH).
Expand All @@ -22,11 +24,13 @@ import (
const maxConsecutiveFailures = 3

type Checker struct {
manager *engine.Manager
interval time.Duration
timeout time.Duration
ctx context.Context
cancel context.CancelFunc
manager *engine.Manager
interval time.Duration
timeout time.Duration
target string // health_check.target (e.g. "google.com")
packetSplit bool // true when strategy=packet_split
ctx context.Context
cancel context.CancelFunc

mu sync.Mutex
failures map[int]int
Expand All @@ -44,6 +48,7 @@ func NewChecker(mgr *engine.Manager, cfg *config.HealthCheckConfig) *Checker {
manager: mgr,
interval: cfg.IntervalDuration(),
timeout: timeout,
target: cfg.Target,
ctx: ctx,
cancel: cancel,
failures: make(map[int]int),
Expand All @@ -52,8 +57,18 @@ func NewChecker(mgr *engine.Manager, cfg *config.HealthCheckConfig) *Checker {

func (c *Checker) Start() {
go c.run()
log.Printf("[health] checker started (interval=%s, tunnel_timeout=%s, unhealthy_after=%d failures)",
c.interval, c.timeout, maxConsecutiveFailures)
mode := "connection"
if c.packetSplit {
mode = "packet-split"
}
log.Printf("[health] checker started (interval=%s, tunnel_timeout=%s, target=%s, mode=%s, unhealthy_after=%d failures)",
c.interval, c.timeout, c.target, mode, maxConsecutiveFailures)
}

// SetPacketSplit enables framing protocol health checks.
// When enabled, checkOne probes instances with the tunnel framing
// protocol (probeFramingProtocol) instead of a SOCKS5 end-to-end probe.
// Must be called before Start(): packetSplit is read by the checker
// goroutine without synchronization, so setting it afterwards is a data race.
func (c *Checker) SetPacketSplit(enabled bool) {
	c.packetSplit = enabled
}

func (c *Checker) Stop() {
Expand Down Expand Up @@ -110,7 +125,6 @@ func (c *Checker) checkOne(inst *engine.Instance) {
// Step 1: Quick TCP connect — is the process even running?
conn, err := net.DialTimeout("tcp", inst.Addr(), 3*time.Second)
if err != nil {
// Process is not listening → immediately unhealthy
failCount := c.recordFailure(inst.ID())
if inst.State() != engine.StateUnhealthy {
log.Printf("[health] instance %d (%s:%d) UNHEALTHY: process not listening: %v",
Expand All @@ -128,7 +142,6 @@ func (c *Checker) checkOne(inst *engine.Instance) {
conn.Close()

// Step 2: Tunnel probe — does the tunnel actually work?
// This sends data through the DNS tunnel and measures real RTT.
var rtt time.Duration
switch inst.Config.Mode {
case "ssh":
Expand All @@ -138,7 +151,6 @@ func (c *Checker) checkOne(inst *engine.Instance) {
}

if err != nil {
// Tunnel probe failed
failCount := c.recordFailure(inst.ID())
if failCount >= maxConsecutiveFailures {
if inst.State() != engine.StateUnhealthy {
Expand All @@ -159,7 +171,41 @@ func (c *Checker) checkOne(inst *engine.Instance) {
return
}

// Tunnel probe succeeded → HEALTHY with real latency
// Step 3: End-to-end probe.
// In packet_split mode: test if instance's upstream speaks our framing protocol.
// In normal mode: full SOCKS5 CONNECT + HTTP through the tunnel.
if c.target != "" && inst.Config.Mode != "ssh" {
var e2eRtt time.Duration
var e2eErr error

if c.packetSplit {
e2eRtt, e2eErr = c.probeFramingProtocol(inst)
} else {
e2eRtt, e2eErr = c.probeEndToEnd(inst)
}

if e2eErr != nil {
failCount := c.recordFailure(inst.ID())
if failCount >= maxConsecutiveFailures {
if inst.State() != engine.StateUnhealthy {
log.Printf("[health] instance %d (%s:%d) UNHEALTHY after %d e2e failures: %v",
inst.ID(), inst.Config.Domain, inst.Config.Port, failCount, e2eErr)
inst.SetState(engine.StateUnhealthy)
inst.SetLastPingMs(-1)
}
} else {
log.Printf("[health] instance %d (%s:%d) e2e probe failed (%d/%d): %v",
inst.ID(), inst.Config.Domain, inst.Config.Port,
failCount, maxConsecutiveFailures, e2eErr)
}
return
}
if e2eRtt > rtt {
rtt = e2eRtt
}
}

// All probes succeeded → HEALTHY
c.recordSuccess(inst.ID())

pingMs := rtt.Milliseconds()
Expand All @@ -169,7 +215,7 @@ func (c *Checker) checkOne(inst *engine.Instance) {
inst.SetLastPingMs(pingMs)

if inst.State() != engine.StateHealthy {
log.Printf("[health] instance %d (%s:%d) now HEALTHY (tunnel_rtt=%dms)",
log.Printf("[health] instance %d (%s:%d) now HEALTHY (rtt=%dms)",
inst.ID(), inst.Config.Domain, inst.Config.Port, pingMs)
inst.SetState(engine.StateHealthy)
}
Expand Down Expand Up @@ -218,3 +264,158 @@ func (c *Checker) probeSSH(inst *engine.Instance) (time.Duration, error) {
}
return time.Since(start), nil
}

// probeEndToEnd does a full SOCKS5 CONNECT through the tunnel to the health
// check target (port 80), sends an HTTP HEAD request, and verifies a response.
// This tests the entire path: instance → DNS tunnel → centralserver → SOCKS upstream → internet.
// It returns the total round-trip time of the whole exchange, or an error
// describing the first step that failed.
func (c *Checker) probeEndToEnd(inst *engine.Instance) (time.Duration, error) {
	start := time.Now()

	domain := c.target
	if len(domain) == 0 || len(domain) > 255 {
		// ATYP=domain encodes the name length in a single byte; a longer
		// target would be silently truncated on the wire by byte(len(domain)).
		return 0, fmt.Errorf("e2e target %q: length must be 1-255", domain)
	}

	conn, err := net.DialTimeout("tcp", inst.Addr(), c.timeout)
	if err != nil {
		return 0, fmt.Errorf("e2e connect: %w", err)
	}
	defer conn.Close()
	conn.SetDeadline(time.Now().Add(c.timeout))

	// SOCKS5 greeting offering exactly one method: 0x00 (no auth).
	if _, err := conn.Write([]byte{0x05, 0x01, 0x00}); err != nil {
		return 0, fmt.Errorf("e2e socks greeting: %w", err)
	}
	greeting := make([]byte, 2)
	if _, err := io.ReadFull(conn, greeting); err != nil {
		return 0, fmt.Errorf("e2e socks greeting resp: %w", err)
	}
	if greeting[0] != 0x05 {
		return 0, fmt.Errorf("e2e bad socks version: %d", greeting[0])
	}
	if greeting[1] != 0x00 {
		// 0xFF means "no acceptable method"; anything other than 0x00
		// rejects our no-auth offer, so the CONNECT below could not succeed.
		return 0, fmt.Errorf("e2e socks method rejected: 0x%02x", greeting[1])
	}

	// SOCKS5 CONNECT to target:80.
	connectReq := make([]byte, 0, 4+1+len(domain)+2)
	connectReq = append(connectReq, 0x05, 0x01, 0x00, 0x03) // VER CMD RSV ATYP(domain)
	connectReq = append(connectReq, byte(len(domain)))      // domain length
	connectReq = append(connectReq, []byte(domain)...)      // domain
	portBuf := make([]byte, 2)
	binary.BigEndian.PutUint16(portBuf, 80)
	connectReq = append(connectReq, portBuf...)

	if _, err := conn.Write(connectReq); err != nil {
		return 0, fmt.Errorf("e2e socks connect: %w", err)
	}

	// Read CONNECT response header (VER REP RSV ATYP).
	connectResp := make([]byte, 4)
	if _, err := io.ReadFull(conn, connectResp); err != nil {
		return 0, fmt.Errorf("e2e socks connect resp: %w", err)
	}
	if connectResp[1] != 0x00 {
		return 0, fmt.Errorf("e2e socks connect rejected: 0x%02x", connectResp[1])
	}

	// Drain the bound address so the stream is positioned at the tunneled
	// payload. A short or failed read here would desynchronize the HTTP
	// exchange below, so errors are no longer ignored.
	if err := drainSocksBindAddr(conn, connectResp[3]); err != nil {
		return 0, fmt.Errorf("e2e socks bind addr: %w", err)
	}

	// Send HTTP HEAD request through the established tunnel.
	httpReq := fmt.Sprintf("HEAD / HTTP/1.1\r\nHost: %s\r\nConnection: close\r\n\r\n", domain)
	if _, err := conn.Write([]byte(httpReq)); err != nil {
		return 0, fmt.Errorf("e2e http write: %w", err)
	}

	// Read the HTTP response; a partial read (err != nil but n > 0) is fine
	// as long as we got at least the start of a status line.
	respBuf := make([]byte, 128)
	n, err := conn.Read(respBuf)
	if err != nil && n == 0 {
		return 0, fmt.Errorf("e2e http read: %w", err)
	}
	if n < 12 || string(respBuf[:4]) != "HTTP" {
		return 0, fmt.Errorf("e2e bad http response: %q", string(respBuf[:n]))
	}

	return time.Since(start), nil
}

// drainSocksBindAddr consumes the BND.ADDR + BND.PORT trailing a SOCKS5
// reply header, sized according to the ATYP byte (RFC 1928 §6).
func drainSocksBindAddr(conn net.Conn, atyp byte) error {
	switch atyp {
	case 0x01: // IPv4 (4 bytes) + port (2 bytes)
		_, err := io.ReadFull(conn, make([]byte, 4+2))
		return err
	case 0x03: // domain: 1-byte length, then name + port
		lb := make([]byte, 1)
		if _, err := io.ReadFull(conn, lb); err != nil {
			return err
		}
		_, err := io.ReadFull(conn, make([]byte, int(lb[0])+2))
		return err
	case 0x04: // IPv6 (16 bytes) + port (2 bytes)
		_, err := io.ReadFull(conn, make([]byte, 16+2))
		return err
	default:
		// Unknown ATYP: we cannot know how many bytes follow, so the
		// stream position is unrecoverable — fail instead of guessing.
		return fmt.Errorf("unknown ATYP 0x%02x", atyp)
	}
}

// probeFramingProtocol tests if the instance's upstream speaks our framing protocol
// (i.e., is connected to centralserver). It sends a SYN frame targeting
// health_check.target:80, pushes an HTTP HEAD through a DATA frame, and expects
// a valid frame back with our probe ConnID. Instances whose upstream is a plain
// SOCKS5 proxy cannot parse the frames and fail with a timeout/read error.
func (c *Checker) probeFramingProtocol(inst *engine.Instance) (time.Duration, error) {
	start := time.Now()

	domain := c.target
	if len(domain) == 0 || len(domain) > 255 {
		// The SYN payload encodes the name length in one byte; a longer
		// target would be silently truncated by byte(len(domain)).
		return 0, fmt.Errorf("frame probe target %q: length must be 1-255", domain)
	}

	conn, err := net.DialTimeout("tcp", inst.Addr(), c.timeout)
	if err != nil {
		return 0, fmt.Errorf("frame probe connect: %w", err)
	}
	defer conn.Close()

	// Disable Nagle so the tiny SYN/DATA frames go out immediately —
	// this probe measures RTT, not throughput.
	if tc, ok := conn.(*net.TCPConn); ok {
		tc.SetNoDelay(true)
	}
	conn.SetDeadline(time.Now().Add(c.timeout))

	// Build SYN payload addressing health_check.target:80 (SOCKS-style ATYP encoding).
	synPayload := make([]byte, 0, 1+1+len(domain)+2)
	synPayload = append(synPayload, 0x03)               // ATYP = domain
	synPayload = append(synPayload, byte(len(domain)))  // domain length
	synPayload = append(synPayload, []byte(domain)...)  // domain
	synPayload = append(synPayload, 0x00, 0x50)         // port 80, big-endian

	// Unique probe ConnID per instance in a high range to avoid colliding
	// with real tunneled connections.
	probeConnID := uint32(0xFFFF0000) + uint32(inst.ID())

	synFrame := &tunnel.Frame{
		ConnID:  probeConnID,
		SeqNum:  0,
		Flags:   tunnel.FlagSYN,
		Payload: synPayload,
	}

	if err := tunnel.WriteFrame(conn, synFrame); err != nil {
		return 0, fmt.Errorf("frame probe write SYN: %w", err)
	}

	// Send DATA with HTTP HEAD so the target actually responds; without it
	// there is nothing to echo back and the read below would just time out.
	httpReq := fmt.Sprintf("HEAD / HTTP/1.1\r\nHost: %s\r\nConnection: close\r\n\r\n", domain)
	dataFrame := &tunnel.Frame{
		ConnID:  probeConnID,
		SeqNum:  1,
		Flags:   tunnel.FlagData,
		Payload: []byte(httpReq),
	}
	if err := tunnel.WriteFrame(conn, dataFrame); err != nil {
		return 0, fmt.Errorf("frame probe write DATA: %w", err)
	}

	// Read response frame from centralserver.
	// centralserver → connects target, forwards HTTP, target responds → reverse frame.
	// plain SOCKS5 → can't parse frame → timeout/error.
	respFrame, err := tunnel.ReadFrame(conn)
	if err != nil {
		return 0, fmt.Errorf("frame probe read: %w", err)
	}

	if respFrame.ConnID != probeConnID {
		return 0, fmt.Errorf("frame probe wrong connID: got %d, want %d",
			respFrame.ConnID, probeConnID)
	}

	// Valid frame = centralserver is there. Send FIN to clean up; a write
	// failure here doesn't invalidate the successful probe, so it is ignored.
	tunnel.WriteFrame(conn, &tunnel.Frame{
		ConnID: probeConnID,
		SeqNum: 2,
		Flags:  tunnel.FlagFIN,
	})

	return time.Since(start), nil
}
Loading