Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions internal/arq/arq.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,15 @@ type queuedDataRemover interface {
type Logger interface {
Debugf(format string, args ...any)
Infof(format string, args ...any)
Warnf(format string, args ...any)
Errorf(format string, args ...any)
}

type dummyLogger struct{}

func (d *dummyLogger) Debugf(f string, a ...any) {}
func (d *dummyLogger) Infof(f string, a ...any) {}
func (d *dummyLogger) Warnf(f string, a ...any) {}
func (d *dummyLogger) Errorf(f string, a ...any) {}

type arqDataItem struct {
Expand Down Expand Up @@ -1377,6 +1379,7 @@ func (a *ARQ) checkRetransmits() {
}
} else if now.Sub(info.CreatedAt) >= a.dataPacketTTL && info.Retries >= a.maxDataRetries {
a.mu.Unlock()
a.logger.Warnf("⚠️ <yellow>ARQ max retransmissions <magenta>|</magenta> Stream: <cyan>%d</cyan> <magenta>|</magenta> Session: <cyan>%d</cyan> <magenta>|</magenta> SN: <cyan>%d</cyan></yellow>", a.streamID, a.sessionID, sn)
a.Close("Max retransmissions exceeded", CloseOptions{SendRST: true})
return
}
Expand Down Expand Up @@ -1409,6 +1412,7 @@ func (a *ARQ) checkRetransmits() {
packetType = uint8(Enums.PACKET_STREAM_RESEND)
}

a.logger.Debugf("🔄 <yellow>ARQ retransmit <magenta>|</magenta> Stream: <cyan>%d</cyan> <magenta>|</magenta> SN: <cyan>%d</cyan></yellow>", a.streamID, j.sn)
a.enqueuer.PushTXPacket(
priority,
packetType,
Expand Down Expand Up @@ -1542,7 +1546,9 @@ func (a *ARQ) handleTerminalRetransmitState(now time.Time) bool {
return false
}

idleDur := now.Sub(a.lastActivity)
a.mu.Unlock()
a.logger.Infof("⏰ <yellow>ARQ inactivity timeout <magenta>|</magenta> Stream: <cyan>%d</cyan> <magenta>|</magenta> Session: <cyan>%d</cyan> <magenta>|</magenta> Idle: <cyan>%s</cyan></yellow>", a.streamID, a.sessionID, idleDur.Truncate(time.Second))
a.Close("Stream Inactivity Timeout (Dead)", CloseOptions{SendRST: true})
return true
}
Expand Down Expand Up @@ -1585,6 +1591,7 @@ func (a *ARQ) checkControlRetransmits(now time.Time) {
if exceededRetries {
reason = "Control packet max retransmissions exceeded"
}
a.logger.Debugf("⚠️ <yellow>ARQ control expired <magenta>|</magenta> Stream: <cyan>%d</cyan> <magenta>|</magenta> Type: <cyan>0x%02x</cyan> <magenta>|</magenta> Reason: <cyan>%s</cyan></yellow>", a.streamID, info.PacketType, reason)
a.mu.Unlock()
a.handleTrackedPacketTTLExpiry(info.PacketType, reason)
a.mu.Lock()
Expand All @@ -1606,6 +1613,7 @@ func (a *ARQ) checkControlRetransmits(now time.Time) {
continue
}

a.logger.Debugf("🔄 <yellow>ARQ control retransmit <magenta>|</magenta> Stream: <cyan>%d</cyan> <magenta>|</magenta> Type: <cyan>0x%02x</cyan> <magenta>|</magenta> Retry: <cyan>%d</cyan></yellow>", a.streamID, info.PacketType, info.Retries+1)
info.LastSentAt = now
info.Retries++
growth := 1.2
Expand Down
1 change: 1 addition & 0 deletions internal/arq/arq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type testLogger struct {

func (l *testLogger) Debugf(format string, args ...any) { l.t.Logf("[DEBUG] "+format, args...) }
func (l *testLogger) Infof(format string, args ...any) { l.t.Logf("[INFO] "+format, args...) }
func (l *testLogger) Warnf(format string, args ...any) { l.t.Logf("[WARN] "+format, args...) }
func (l *testLogger) Errorf(format string, args ...any) { l.t.Logf("[ERROR] "+format, args...) }

type eofAfterDataConn struct {
Expand Down
73 changes: 72 additions & 1 deletion internal/client/async_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"masterdnsvpn-go/internal/client/handlers"
DnsParser "masterdnsvpn-go/internal/dnsparser"
fragmentStore "masterdnsvpn-go/internal/fragmentstore"
"masterdnsvpn-go/internal/logger"
)

type asyncPacket struct {
Expand Down Expand Up @@ -225,7 +226,11 @@ func (c *Client) StartAsyncRuntime(parentCtx context.Context) error {
}()
}

// 10. Lifecycle cleanup.
// 10. Periodic stats logger.
c.asyncWG.Add(1)
go c.asyncStatsLogger(runtimeCtx)

// 11. Lifecycle cleanup.
c.asyncWG.Add(1)
go func() {
defer c.asyncWG.Done()
Expand Down Expand Up @@ -306,6 +311,72 @@ func (c *Client) asyncStreamCleanupWorker(ctx context.Context) {
}
}

// asyncStatsLogger periodically logs connection and stream statistics.
func (c *Client) asyncStatsLogger(ctx context.Context) {
defer c.asyncWG.Done()

ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
c.logPeriodicStats()
}
}
}

func (c *Client) logPeriodicStats() {
if c.log == nil {
return
}

// Stream stats
c.streamsMu.RLock()
total := len(c.active_streams)
active, draining, pending := 0, 0, 0
for _, s := range c.active_streams {
if s == nil {
continue
}
switch s.StatusValue() {
case streamStatusActive:
active++
case streamStatusDraining, streamStatusClosing, streamStatusTimeWait:
draining++
case streamStatusPending, streamStatusSocksConnecting:
pending++
}
}
c.streamsMu.RUnlock()

// Balancer stats
bs := c.balancer.Stats()

c.log.Infof(
"📊 <green>Stats <magenta>|</magenta> Streams: <cyan>%d</cyan> active, <cyan>%d</cyan> pending, <cyan>%d</cyan> draining (<cyan>%d</cyan> total) <magenta>|</magenta> Resolvers: <cyan>%d</cyan>/<cyan>%d</cyan> valid</green>",
active, pending, draining, total, bs.Valid, bs.Total,
)

if c.log.Enabled(logger.LevelDebug) {
for _, e := range bs.Entries {
var lossStr string
if e.Sent > 0 {
lossPct := float64(e.Sent-e.Acked) * 100 / float64(e.Sent)
lossStr = fmt.Sprintf("%.1f%%", lossPct)
} else {
lossStr = "N/A"
}
c.log.Debugf(
"📊 <cyan>Resolver %s <magenta>|</magenta> Valid: <cyan>%t</cyan> <magenta>|</magenta> Sent: <cyan>%d</cyan> <magenta>|</magenta> Acked: <cyan>%d</cyan> <magenta>|</magenta> Loss: <cyan>%s</cyan> <magenta>|</magenta> AvgRTT: <cyan>%dms</cyan></cyan>",
e.Key, e.Valid, e.Sent, e.Acked, lossStr, e.AvgRTTMicro/1000,
)
}
}
}

// drainQueues removes any stale packets from TX and RX channels.
// Buffers from the RX channel are returned to the pool to prevent leaks.
func (c *Client) drainQueues() {
Expand Down
51 changes: 51 additions & 0 deletions internal/client/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,3 +579,54 @@ func xorshift64(v uint64) uint64 {
v ^= v << 17
return v
}

// ConnectionStatEntry holds per-resolver stats for periodic reporting.
type ConnectionStatEntry struct {
Key string
Valid bool
Sent uint64
Acked uint64
AvgRTTMicro uint64
}

// BalancerStats holds aggregated snapshot stats.
type BalancerStats struct {
Total int
Valid int
Entries []ConnectionStatEntry
}

// Stats returns a snapshot of per-connection statistics.
func (b *Balancer) Stats() BalancerStats {
snap := b.snapshot.Load()
if snap == nil {
return BalancerStats{}
}

entries := make([]ConnectionStatEntry, 0, len(snap.connections))
for idx, conn := range snap.connections {
if conn == nil {
continue
}
entry := ConnectionStatEntry{
Key: conn.Key,
Valid: conn.IsValid,
}
if idx < len(snap.stats) && snap.stats[idx] != nil {
s := snap.stats[idx]
entry.Sent = s.sent.Load()
entry.Acked = s.acked.Load()
cnt := s.rttCount.Load()
if cnt > 0 {
entry.AvgRTTMicro = s.rttMicrosSum.Load() / cnt
}
}
entries = append(entries, entry)
}

return BalancerStats{
Total: len(snap.connections),
Valid: len(snap.valid),
Entries: entries,
}
}
9 changes: 9 additions & 0 deletions internal/client/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,9 @@ dispatchLoop:

conns := c.selectTargetConnections(finalPacket.packetType, selectedStreamID)
if len(conns) == 0 {
if c.log != nil {
c.log.Warnf("⚠️ <yellow>No valid connections for dispatch <magenta>|</magenta> PacketType: <cyan>0x%02x</cyan> <magenta>|</magenta> Stream: <cyan>%d</cyan></yellow>", finalPacket.packetType, selectedStreamID)
}
if !wasPacked {
if selected != nil {
key := Enums.PacketIdentityKey(selected.StreamID, item.PacketType, item.SequenceNum, item.FragmentID)
Expand Down Expand Up @@ -347,11 +350,17 @@ dispatchLoop:

encoded, err := c.buildEncodedAutoWithCompressionTrace(opts)
if err != nil {
if c.log != nil {
c.log.Debugf("⚠️ <yellow>Dispatch encode failed: <cyan>%v</cyan></yellow>", err)
}
continue
}

dnsPacket, err := buildTunnelTXTQuestion(domain, encoded)
if err != nil {
if c.log != nil {
c.log.Debugf("⚠️ <yellow>Dispatch DNS build failed: <cyan>%v</cyan></yellow>", err)
}
continue
}

Expand Down
6 changes: 5 additions & 1 deletion internal/client/dns_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,11 @@ func (l *DNSListener) handleQuery(ctx context.Context, data []byte, addr *net.UD

l.client.ProcessDNSQuery(data, addr, func(resp []byte) {
if l.conn != nil {
_, _ = l.conn.WriteToUDP(resp, addr)
if _, err := l.conn.WriteToUDP(resp, addr); err != nil {
if l.client.log != nil {
l.client.log.Warnf("⚠️ <yellow>DNS response write failed: <cyan>%v</cyan></yellow>", err)
}
}
}
})
}
Expand Down
26 changes: 25 additions & 1 deletion internal/client/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ func (c *Client) InitializeSession(maxAttempts int) error {
}
}

if c.log != nil {
c.log.Warnf("⚠️ <yellow>Session init failed after <cyan>%d</cyan> attempts</yellow>", maxAttempts)
}
return ErrSessionInitFailed
}

Expand All @@ -62,11 +65,17 @@ func (c *Client) initializeSessionOnce() error {

query, err := c.buildSessionQuery(conn.Domain, Enums.PACKET_SESSION_INIT, initPayload)
if err != nil {
if c.log != nil {
c.log.Warnf("⚠️ <yellow>Session init query build failed: <cyan>%v</cyan></yellow>", err)
}
return ErrSessionInitFailed
}

packet, err := c.exchangeDNSOverConnection(conn, query, c.mtuTestTimeout)
if err != nil {
if c.log != nil {
c.log.Debugf("🔄 <yellow>Session init DNS exchange failed: <cyan>%v</cyan></yellow>", err)
}
return ErrSessionInitFailed
}

Expand All @@ -75,7 +84,11 @@ func (c *Client) initializeSessionOnce() error {
if len(packet.Payload) < sessionBusyPayloadSize || !bytes.Equal(packet.Payload[:sessionBusyPayloadSize], verifyCode[:]) {
return ErrSessionInitFailed
}
c.setSessionInitBusyUntil(time.Now().Add(c.cfg.SessionInitBusyRetryInterval()))
busyRetry := c.cfg.SessionInitBusyRetryInterval()
c.setSessionInitBusyUntil(time.Now().Add(busyRetry))
if c.log != nil {
c.log.Warnf("⚠️ <yellow>Server busy, retry after <cyan>%s</cyan></yellow>", busyRetry)
}
return ErrSessionInitBusy
case Enums.PACKET_SESSION_ACCEPT:
if len(packet.Payload) < sessionAcceptPayloadSize || !bytes.Equal(packet.Payload[3:7], verifyCode[:]) {
Expand All @@ -91,6 +104,14 @@ func (c *Client) initializeSessionOnce() error {
c.clearSessionInitBusyUntil()
c.resetSessionInitState()
c.clearSessionResetPending()
if c.log != nil {
c.log.Infof(
"✅ <green>Session Accepted <magenta>|</magenta> ID: <cyan>%d</cyan> <magenta>|</magenta> Cookie: <cyan>%d</cyan> <magenta>|</magenta> Upload: <cyan>%s</cyan> <magenta>|</magenta> Download: <cyan>%s</cyan></green>",
c.sessionID, c.sessionCookie,
compression.TypeName(c.uploadCompression),
compression.TypeName(c.downloadCompression),
)
}
return nil
default:
return ErrSessionInitFailed
Expand Down Expand Up @@ -319,6 +340,9 @@ func (c *Client) sendSessionCloseRound(targets []Connection, deadline time.Time)
PacketType: Enums.PACKET_SESSION_CLOSE,
})
if err != nil {
if c.log != nil {
c.log.Debugf("⚠️ <yellow>Session close query build failed: <cyan>%v</cyan></yellow>", err)
}
return
}
c.sendOneWayDNSQuery(conn, query, deadline)
Expand Down
9 changes: 8 additions & 1 deletion internal/client/socks_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ func (c *Client) supportsSOCKS4() bool {
func (c *Client) HandleSOCKS5(ctx context.Context, conn net.Conn) {
version := make([]byte, 1)
if _, err := io.ReadFull(conn, version); err != nil {
if c.log != nil {
c.log.Debugf("🔌 <yellow>SOCKS handshake read failed: <cyan>%v</cyan></yellow>", err)
}
_ = conn.Close()
return
}
Expand All @@ -83,6 +86,9 @@ func (c *Client) HandleSOCKS5(ctx context.Context, conn net.Conn) {
}
c.handleSOCKS4Request(ctx, conn)
default:
if c.log != nil {
c.log.Debugf("🔌 <yellow>SOCKS unknown version: <cyan>0x%02x</cyan></yellow>", version[0])
}
_ = conn.Close()
}
}
Expand Down Expand Up @@ -670,7 +676,7 @@ func (c *Client) HandleSocksConnected(packet VpnProto.Packet) error {
arqObj.SetIOReady(true)
}

c.log.Debugf("🔌 <green>Socks successfully connected for stream %d</green>", packet.StreamID)
c.log.Infof("🔌 <green>SOCKS connected <magenta>|</magenta> Stream: <cyan>%d</cyan></green>", packet.StreamID)
return nil
}

Expand Down Expand Up @@ -714,6 +720,7 @@ func (c *Client) HandleSocksFailure(packet VpnProto.Packet) error {
return nil
}

c.log.Warnf("🔌 <yellow>SOCKS failure <magenta>|</magenta> Stream: <cyan>%d</cyan> <magenta>|</magenta> PacketType: <cyan>0x%02x</cyan></yellow>", packet.StreamID, packet.PacketType)
arqObj.Close("SOCKS failure received", arq.CloseOptions{Force: true})
return nil
}
Expand Down
18 changes: 18 additions & 0 deletions internal/client/stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,13 @@ func (c *Client) new_stream(streamID uint16, conn net.Conn, targetPayload []byte
c.ensureStreamPreferredConnection(s)
}

if c.log != nil && streamID != 0 {
c.log.Infof(
"🧦 <green>Stream Created <magenta>|</magenta> Stream: <cyan>%d</cyan> <magenta>|</magenta> Session: <cyan>%d</cyan> <magenta>|</magenta> Status: <cyan>%s</cyan></green>",
streamID, c.sessionID, s.StatusValue(),
)
}

return s
}

Expand Down Expand Up @@ -286,6 +293,13 @@ func (s *Stream_client) RemoveQueuedData(sequenceNum uint16) bool {
}

func (s *Stream_client) cleanupResources() {
if s.client != nil && s.client.log != nil && s.StreamID != 0 {
s.client.log.Debugf(
"🧹 <yellow>Stream Cleanup <magenta>|</magenta> Stream: <cyan>%d</cyan> <magenta>|</magenta> Session: <cyan>%d</cyan></yellow>",
s.StreamID, s.client.sessionID,
)
}

if s.NetConn != nil {
_ = s.NetConn.Close()
}
Expand Down Expand Up @@ -464,6 +478,10 @@ func (c *Client) CloseAllStreams() {
c.active_streams = make(map[uint16]*Stream_client)
c.streamsMu.Unlock()

if c.log != nil {
c.log.Infof("🧦 <yellow>Closing all streams <magenta>|</magenta> Count: <cyan>%d</cyan></yellow>", len(streams))
}

for _, s := range streams {
if s != nil {
s.Close()
Expand Down
Loading