diff --git a/internal/arq/arq.go b/internal/arq/arq.go index afa6ff5..07625ca 100644 --- a/internal/arq/arq.go +++ b/internal/arq/arq.go @@ -48,6 +48,7 @@ type queuedDataRemover interface { type Logger interface { Debugf(format string, args ...any) Infof(format string, args ...any) + Warnf(format string, args ...any) Errorf(format string, args ...any) } @@ -55,6 +56,7 @@ type dummyLogger struct{} func (d *dummyLogger) Debugf(f string, a ...any) {} func (d *dummyLogger) Infof(f string, a ...any) {} +func (d *dummyLogger) Warnf(f string, a ...any) {} func (d *dummyLogger) Errorf(f string, a ...any) {} type arqDataItem struct { @@ -1377,6 +1379,7 @@ func (a *ARQ) checkRetransmits() { } } else if now.Sub(info.CreatedAt) >= a.dataPacketTTL && info.Retries >= a.maxDataRetries { a.mu.Unlock() + a.logger.Warnf("โš ๏ธ ARQ max retransmissions | Stream: %d | Session: %d | SN: %d", a.streamID, a.sessionID, sn) a.Close("Max retransmissions exceeded", CloseOptions{SendRST: true}) return } @@ -1409,6 +1412,7 @@ func (a *ARQ) checkRetransmits() { packetType = uint8(Enums.PACKET_STREAM_RESEND) } + a.logger.Debugf("๐Ÿ”„ ARQ retransmit | Stream: %d | SN: %d", a.streamID, j.sn) a.enqueuer.PushTXPacket( priority, packetType, @@ -1542,7 +1546,9 @@ func (a *ARQ) handleTerminalRetransmitState(now time.Time) bool { return false } + idleDur := now.Sub(a.lastActivity) a.mu.Unlock() + a.logger.Infof("โฐ ARQ inactivity timeout | Stream: %d | Session: %d | Idle: %s", a.streamID, a.sessionID, idleDur.Truncate(time.Second)) a.Close("Stream Inactivity Timeout (Dead)", CloseOptions{SendRST: true}) return true } @@ -1585,6 +1591,7 @@ func (a *ARQ) checkControlRetransmits(now time.Time) { if exceededRetries { reason = "Control packet max retransmissions exceeded" } + a.logger.Debugf("โš ๏ธ ARQ control expired | Stream: %d | Type: 0x%02x | Reason: %s", a.streamID, info.PacketType, reason) a.mu.Unlock() a.handleTrackedPacketTTLExpiry(info.PacketType, reason) a.mu.Lock() @@ -1606,6 +1613,7 @@ func (a *ARQ) checkControlRetransmits(now time.Time) { continue } + a.logger.Debugf("๐Ÿ”„ ARQ control retransmit | Stream: %d | Type: 0x%02x | Retry: %d", a.streamID, info.PacketType, info.Retries+1) info.LastSentAt = now info.Retries++ growth := 1.2 diff --git a/internal/arq/arq_test.go b/internal/arq/arq_test.go index dc5126d..d262514 100644 --- a/internal/arq/arq_test.go +++ b/internal/arq/arq_test.go @@ -64,6 +64,7 @@ type testLogger struct { func (l *testLogger) Debugf(format string, args ...any) { l.t.Logf("[DEBUG] "+format, args...) } func (l *testLogger) Infof(format string, args ...any) { l.t.Logf("[INFO] "+format, args...) } +func (l *testLogger) Warnf(format string, args ...any) { l.t.Logf("[WARN] "+format, args...) } func (l *testLogger) Errorf(format string, args ...any) { l.t.Logf("[ERROR] "+format, args...) } type eofAfterDataConn struct { diff --git a/internal/client/async_runtime.go b/internal/client/async_runtime.go index 02c63f8..8a2da70 100644 --- a/internal/client/async_runtime.go +++ b/internal/client/async_runtime.go @@ -19,6 +19,7 @@ import ( "masterdnsvpn-go/internal/client/handlers" DnsParser "masterdnsvpn-go/internal/dnsparser" fragmentStore "masterdnsvpn-go/internal/fragmentstore" + "masterdnsvpn-go/internal/logger" ) type asyncPacket struct { @@ -225,7 +226,11 @@ func (c *Client) StartAsyncRuntime(parentCtx context.Context) error { }() } - // 10. Lifecycle cleanup. + // 10. Periodic stats logger. + c.asyncWG.Add(1) + go c.asyncStatsLogger(runtimeCtx) + + // 11. Lifecycle cleanup. c.asyncWG.Add(1) go func() { defer c.asyncWG.Done() @@ -306,6 +311,72 @@ func (c *Client) asyncStreamCleanupWorker(ctx context.Context) { } } +// asyncStatsLogger periodically logs connection and stream statistics. +func (c *Client) asyncStatsLogger(ctx context.Context) { + defer c.asyncWG.Done() + + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + c.logPeriodicStats() + } + } +} + +func (c *Client) logPeriodicStats() { + if c.log == nil { + return + } + + // Stream stats + c.streamsMu.RLock() + total := len(c.active_streams) + active, draining, pending := 0, 0, 0 + for _, s := range c.active_streams { + if s == nil { + continue + } + switch s.StatusValue() { + case streamStatusActive: + active++ + case streamStatusDraining, streamStatusClosing, streamStatusTimeWait: + draining++ + case streamStatusPending, streamStatusSocksConnecting: + pending++ + } + } + c.streamsMu.RUnlock() + + // Balancer stats + bs := c.balancer.Stats() + + c.log.Infof( + "๐Ÿ“Š Stats | Streams: %d active, %d pending, %d draining (%d total) | Resolvers: %d/%d valid", + active, pending, draining, total, bs.Valid, bs.Total, + ) + + if c.log.Enabled(logger.LevelDebug) { + for _, e := range bs.Entries { + var lossStr string + if e.Sent > 0 { + lossPct := float64(e.Sent-e.Acked) * 100 / float64(e.Sent) + lossStr = fmt.Sprintf("%.1f%%", lossPct) + } else { + lossStr = "N/A" + } + c.log.Debugf( + "๐Ÿ“Š Resolver %s | Valid: %t | Sent: %d | Acked: %d | Loss: %s | AvgRTT: %dms", + e.Key, e.Valid, e.Sent, e.Acked, lossStr, e.AvgRTTMicro/1000, + ) + } + } +} + // drainQueues removes any stale packets from TX and RX channels. // Buffers from the RX channel are returned to the pool to prevent leaks. func (c *Client) drainQueues() { diff --git a/internal/client/balancer.go b/internal/client/balancer.go index 0e1af30..69930d6 100644 --- a/internal/client/balancer.go +++ b/internal/client/balancer.go @@ -579,3 +579,54 @@ func xorshift64(v uint64) uint64 { v ^= v << 17 return v } + +// ConnectionStatEntry holds per-resolver stats for periodic reporting. +type ConnectionStatEntry struct { + Key string + Valid bool + Sent uint64 + Acked uint64 + AvgRTTMicro uint64 +} + +// BalancerStats holds aggregated snapshot stats. +type BalancerStats struct { + Total int + Valid int + Entries []ConnectionStatEntry +} + +// Stats returns a snapshot of per-connection statistics. +func (b *Balancer) Stats() BalancerStats { + snap := b.snapshot.Load() + if snap == nil { + return BalancerStats{} + } + + entries := make([]ConnectionStatEntry, 0, len(snap.connections)) + for idx, conn := range snap.connections { + if conn == nil { + continue + } + entry := ConnectionStatEntry{ + Key: conn.Key, + Valid: conn.IsValid, + } + if idx < len(snap.stats) && snap.stats[idx] != nil { + s := snap.stats[idx] + entry.Sent = s.sent.Load() + entry.Acked = s.acked.Load() + cnt := s.rttCount.Load() + if cnt > 0 { + entry.AvgRTTMicro = s.rttMicrosSum.Load() / cnt + } + } + entries = append(entries, entry) + } + + return BalancerStats{ + Total: len(snap.connections), + Valid: len(snap.valid), + Entries: entries, + } +} diff --git a/internal/client/dispatcher.go b/internal/client/dispatcher.go index fc69e1d..4228b77 100644 --- a/internal/client/dispatcher.go +++ b/internal/client/dispatcher.go @@ -306,6 +306,9 @@ dispatchLoop: conns := c.selectTargetConnections(finalPacket.packetType, selectedStreamID) if len(conns) == 0 { + if c.log != nil { + c.log.Warnf("โš ๏ธ No valid connections for dispatch | PacketType: 0x%02x | Stream: %d", finalPacket.packetType, selectedStreamID) + } if !wasPacked { if selected != nil { key := Enums.PacketIdentityKey(selected.StreamID, item.PacketType, item.SequenceNum, item.FragmentID) @@ -347,11 +350,17 @@ dispatchLoop: encoded, err := c.buildEncodedAutoWithCompressionTrace(opts) if err != nil { + if c.log != nil { + c.log.Debugf("โš ๏ธ Dispatch encode failed: %v", err) + } continue } dnsPacket, err := buildTunnelTXTQuestion(domain, encoded) if err != nil { + if c.log != nil { + c.log.Debugf("โš ๏ธ Dispatch DNS build failed: %v", err) + } continue } diff --git a/internal/client/dns_listener.go b/internal/client/dns_listener.go index ec33a2f..9318210 100644 --- a/internal/client/dns_listener.go +++ b/internal/client/dns_listener.go @@ -128,7 +128,11 @@ func (l *DNSListener) handleQuery(ctx context.Context, data []byte, addr *net.UD l.client.ProcessDNSQuery(data, addr, func(resp []byte) { if l.conn != nil { - _, _ = l.conn.WriteToUDP(resp, addr) + if _, err := l.conn.WriteToUDP(resp, addr); err != nil { + if l.client.log != nil { + l.client.log.Warnf("โš ๏ธ DNS response write failed: %v", err) + } + } } }) } diff --git a/internal/client/session.go b/internal/client/session.go index edb6c4c..f255678 100644 --- a/internal/client/session.go +++ b/internal/client/session.go @@ -51,6 +51,9 @@ func (c *Client) InitializeSession(maxAttempts int) error { } } + if c.log != nil { + c.log.Warnf("โš ๏ธ Session init failed after %d attempts", maxAttempts) + } return ErrSessionInitFailed } @@ -62,11 +65,17 @@ func (c *Client) initializeSessionOnce() error { query, err := c.buildSessionQuery(conn.Domain, Enums.PACKET_SESSION_INIT, initPayload) if err != nil { + if c.log != nil { + c.log.Warnf("โš ๏ธ Session init query build failed: %v", err) + } return ErrSessionInitFailed } packet, err := c.exchangeDNSOverConnection(conn, query, c.mtuTestTimeout) if err != nil { + if c.log != nil { + c.log.Debugf("๐Ÿ”„ Session init DNS exchange failed: %v", err) + } return ErrSessionInitFailed } @@ -75,7 +84,11 @@ func (c *Client) initializeSessionOnce() error { if len(packet.Payload) < sessionBusyPayloadSize || !bytes.Equal(packet.Payload[:sessionBusyPayloadSize], verifyCode[:]) { return ErrSessionInitFailed } - c.setSessionInitBusyUntil(time.Now().Add(c.cfg.SessionInitBusyRetryInterval())) + busyRetry := c.cfg.SessionInitBusyRetryInterval() + c.setSessionInitBusyUntil(time.Now().Add(busyRetry)) + if c.log != nil { + c.log.Warnf("โš ๏ธ Server busy, retry after %s", busyRetry) + } return ErrSessionInitBusy case Enums.PACKET_SESSION_ACCEPT: if len(packet.Payload) < sessionAcceptPayloadSize || !bytes.Equal(packet.Payload[3:7], verifyCode[:]) { @@ -91,6 +104,14 @@ func (c *Client) initializeSessionOnce() error { c.clearSessionInitBusyUntil() c.resetSessionInitState() c.clearSessionResetPending() + if c.log != nil { + c.log.Infof( + "โœ… Session Accepted | ID: %d | Cookie: %d | Upload: %s | Download: %s", + c.sessionID, c.sessionCookie, + compression.TypeName(c.uploadCompression), + compression.TypeName(c.downloadCompression), + ) + } return nil default: return ErrSessionInitFailed @@ -319,6 +340,9 @@ func (c *Client) sendSessionCloseRound(targets []Connection, deadline time.Time) PacketType: Enums.PACKET_SESSION_CLOSE, }) if err != nil { + if c.log != nil { + c.log.Debugf("โš ๏ธ Session close query build failed: %v", err) + } return } c.sendOneWayDNSQuery(conn, query, deadline) diff --git a/internal/client/socks_manager.go b/internal/client/socks_manager.go index 9421f13..0553dcd 100644 --- a/internal/client/socks_manager.go +++ b/internal/client/socks_manager.go @@ -69,6 +69,9 @@ func (c *Client) supportsSOCKS4() bool { func (c *Client) HandleSOCKS5(ctx context.Context, conn net.Conn) { version := make([]byte, 1) if _, err := io.ReadFull(conn, version); err != nil { + if c.log != nil { + c.log.Debugf("๐Ÿ”Œ SOCKS handshake read failed: %v", err) + } _ = conn.Close() return } @@ -83,6 +86,9 @@ func (c *Client) HandleSOCKS5(ctx context.Context, conn net.Conn) { } c.handleSOCKS4Request(ctx, conn) default: + if c.log != nil { + c.log.Debugf("๐Ÿ”Œ SOCKS unknown version: 0x%02x", version[0]) + } _ = conn.Close() } } @@ -670,7 +676,7 @@ func (c *Client) HandleSocksConnected(packet VpnProto.Packet) error { arqObj.SetIOReady(true) } - c.log.Debugf("๐Ÿ”Œ Socks successfully connected for stream %d", packet.StreamID) + c.log.Infof("๐Ÿ”Œ SOCKS connected | Stream: %d", packet.StreamID) return nil } @@ -714,6 +720,7 @@ func (c *Client) HandleSocksFailure(packet VpnProto.Packet) error { return nil } + c.log.Warnf("๐Ÿ”Œ SOCKS failure | Stream: %d | PacketType: 0x%02x", packet.StreamID, packet.PacketType) arqObj.Close("SOCKS failure received", arq.CloseOptions{Force: true}) return nil } diff --git a/internal/client/stream_client.go b/internal/client/stream_client.go index a9a9b43..4fcd183 100644 --- a/internal/client/stream_client.go +++ b/internal/client/stream_client.go @@ -174,6 +174,13 @@ func (c *Client) new_stream(streamID uint16, conn net.Conn, targetPayload []byte c.ensureStreamPreferredConnection(s) } + if c.log != nil && streamID != 0 { + c.log.Infof( + "๐Ÿงฆ Stream Created | Stream: %d | Session: %d | Status: %s", + streamID, c.sessionID, s.StatusValue(), + ) + } + return s } @@ -286,6 +293,13 @@ func (s *Stream_client) RemoveQueuedData(sequenceNum uint16) bool { } func (s *Stream_client) cleanupResources() { + if s.client != nil && s.client.log != nil && s.StreamID != 0 { + s.client.log.Debugf( + "๐Ÿงน Stream Cleanup | Stream: %d | Session: %d", + s.StreamID, s.client.sessionID, + ) + } + if s.NetConn != nil { _ = s.NetConn.Close() } @@ -464,6 +478,10 @@ func (c *Client) CloseAllStreams() { c.active_streams = make(map[uint16]*Stream_client) c.streamsMu.Unlock() + if c.log != nil { + c.log.Infof("๐Ÿงฆ Closing all streams | Count: %d", len(streams)) + } + for _, s := range streams { if s != nil { s.Close() diff --git a/internal/logger/logger.go b/internal/logger/logger.go index a24d43c..c68190e 100644 --- a/internal/logger/logger.go +++ b/internal/logger/logger.go @@ -167,6 +167,40 @@ func (l *Logger) Enabled(level int) bool { return l != nil && level >= l.level } +// SubLogger wraps a Logger with a pre-rendered context prefix (e.g. "[Sess:3] [Str:42]"). +// It satisfies the arq.Logger interface and can be chained via With(). +type SubLogger struct { + parent *Logger + prefix string +} + +// With creates a SubLogger with a single [key:value] context field. +func (l *Logger) With(key, value string) *SubLogger { + return &SubLogger{parent: l, prefix: "[" + key + ":" + value + "]"} +} + +// With chains an additional [key:value] context field onto an existing SubLogger. +func (s *SubLogger) With(key, value string) *SubLogger { + return &SubLogger{parent: s.parent, prefix: s.prefix + " [" + key + ":" + value + "]"} +} + +func (s *SubLogger) Debugf(format string, args ...any) { + s.parent.logf(levelDebug, s.prefix+" "+format, args...) +} +func (s *SubLogger) Infof(format string, args ...any) { + s.parent.logf(levelInfo, s.prefix+" "+format, args...) +} +func (s *SubLogger) Warnf(format string, args ...any) { + s.parent.logf(levelWarn, s.prefix+" "+format, args...) +} +func (s *SubLogger) Errorf(format string, args ...any) { + s.parent.logf(levelError, s.prefix+" "+format, args...) +} + +func (s *SubLogger) Enabled(level int) bool { + return s.parent.Enabled(level) +} + func stripColorTags(text string) string { start := strings.IndexByte(text, '<') if start == -1 { diff --git a/internal/logger/logger_test.go b/internal/logger/logger_test.go index 3e72738..0cbba8c 100644 --- a/internal/logger/logger_test.go +++ b/internal/logger/logger_test.go @@ -93,3 +93,76 @@ func TestShouldUseColorHonorsNoColor(t *testing.T) { t.Fatal("NO_COLOR should disable colors even when FORCE_COLOR is set") } } + +func TestSubLoggerPrefix(t *testing.T) { + var buf bytes.Buffer + l := &Logger{ + name: "test", + level: levelDebug, + consoleWriter: &buf, + color: false, + appNameText: "[test]", + } + + sub := l.With("Sess", "3") + sub.Infof("hello %s", "world") + + output := buf.String() + if !strings.Contains(output, "[Sess:3] hello world") { + t.Fatalf("expected prefix in output, got: %s", output) + } +} + +func TestSubLoggerChaining(t *testing.T) { + var buf bytes.Buffer + l := &Logger{ + name: "test", + level: levelDebug, + consoleWriter: &buf, + color: false, + appNameText: "[test]", + } + + sub := l.With("Sess", "3").With("Str", "42") + sub.Debugf("stream opened") + + output := buf.String() + if !strings.Contains(output, "[Sess:3] [Str:42] stream opened") { + t.Fatalf("expected chained prefix in output, got: %s", output) + } +} + +func TestSubLoggerRespectsLevel(t *testing.T) { + var buf bytes.Buffer + l := &Logger{ + name: "test", + level: levelWarn, + consoleWriter: &buf, + color: false, + appNameText: "[test]", + } + + sub := l.With("Sess", "1") + sub.Debugf("should not appear") + sub.Infof("should not appear") + sub.Warnf("should appear") + + output := buf.String() + if strings.Contains(output, "should not appear") { + t.Fatal("sub-logger should suppress below parent level") + } + if !strings.Contains(output, "should appear") { + t.Fatal("sub-logger should log at parent level") + } +} + +func TestSubLoggerEnabled(t *testing.T) { + l := &Logger{level: levelWarn} + sub := l.With("X", "1") + if sub.Enabled(levelDebug) { + t.Fatal("Enabled(debug) should be false at warn level") + } + if !sub.Enabled(levelWarn) { + t.Fatal("Enabled(warn) should be true at warn level") + } +} diff --git a/internal/udpserver/server.go b/internal/udpserver/server.go index a9329e7..ff59a81 100644 --- a/internal/udpserver/server.go +++ b/internal/udpserver/server.go @@ -192,6 +192,8 @@ func (s *Server) Run(ctx context.Context) error { s.sessionCleanupLoop(runCtx) }() + go s.serverStatsLogger(runCtx) + s.deferredSession.Start(runCtx) s.startDNSWorkers(runCtx, conn, reqCh, &workerWG) @@ -221,3 +223,42 @@ func (s *Server) Run(ctx context.Context) error { return nil } } + +func (s *Server) serverStatsLogger(ctx context.Context) { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + s.logServerStats() + } + } +} + +func (s *Server) logServerStats() { + if s.log == nil { + return + } + + s.sessions.mu.Lock() + activeSessions := 0 + totalStreams := 0 + for _, record := range s.sessions.byID { + if record == nil { + continue + } + activeSessions++ + record.StreamsMu.RLock() + totalStreams += len(record.ActiveStreams) + record.StreamsMu.RUnlock() + } + s.sessions.mu.Unlock() + + s.log.Infof( + "๐Ÿ“Š Server Stats | Sessions: %d | Streams: %d", + activeSessions, totalStreams, + ) +} diff --git a/internal/udpserver/server_session.go b/internal/udpserver/server_session.go index d3307fa..89afc38 100644 --- a/internal/udpserver/server_session.go +++ b/internal/udpserver/server_session.go @@ -71,6 +71,12 @@ func (s *Server) handleSessionCloseNotice(vpnPacket VpnProto.Packet, now time.Ti lookup, known := s.sessions.Lookup(vpnPacket.SessionID) if !known || lookup.State != sessionLookupActive || lookup.Cookie != vpnPacket.SessionCookie { + if s.debugLoggingEnabled() { + s.log.Debugf( + "\U0001F6AA Session close rejected | Session: %d | Known: %t | Cookie match: %t", + vpnPacket.SessionID, known, known && lookup.Cookie == vpnPacket.SessionCookie, + ) + } return } diff --git a/internal/udpserver/server_utils.go b/internal/udpserver/server_utils.go index 4bb8ed3..a764a28 100644 --- a/internal/udpserver/server_utils.go +++ b/internal/udpserver/server_utils.go @@ -45,10 +45,16 @@ func buildNoDataResponseLite(packet []byte, parsed DnsParser.LitePacket) []byte } func (s *Server) buildNoDataResponseLogged(packet []byte, reason string) []byte { + if s.debugLoggingEnabled() { + s.log.Debugf("โš ๏ธ NoData response | Reason: %s", reason) + } return buildNoDataResponse(packet) } func (s *Server) buildNoDataResponseLiteLogged(packet []byte, parsed DnsParser.LitePacket, reason string) []byte { + if s.debugLoggingEnabled() { + s.log.Debugf("โš ๏ธ NoData response | Reason: %s", reason) + } return buildNoDataResponseLite(packet, parsed) } diff --git a/internal/udpserver/stream_server.go b/internal/udpserver/stream_server.go index c41b6a9..ed3e647 100644 --- a/internal/udpserver/stream_server.go +++ b/internal/udpserver/stream_server.go @@ -57,6 +57,14 @@ func NewStreamServer(streamID uint16, sessionID uint8, arqConfig arq.Config, loc s.ARQ = arq.NewARQ(streamID, sessionID, s, localConn, mtu, logger, arqConfig) s.ARQ.Start() + + if logger != nil { + logger.Debugf( + "๐Ÿงฆ Stream Server Created | Stream: %d | Session: %d", + streamID, sessionID, + ) + } + return s }