From b197c59570c151dd4bf2ad6740a2615485e49976 Mon Sep 17 00:00:00 2001 From: Calin Teodor Date: Mon, 9 Feb 2026 14:18:11 +0200 Subject: [PATCH 01/19] Refactor/maintainability (#4) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add install section to agent skills documentation Agents and users reading SKILLS.md had no install instructions. Adding the section makes the doc self-contained for onboarding. * Extract named constants and eliminate magic numbers Hardcoded numeric literals scattered across the daemon package made tuning values difficult to find and reason about. This centralizes them into named constants with clear documentation: - Beacon message types (protocol/header.go): single source of truth used by beacon server, tunnel manager, and tests - Dial/retransmission constants: retry counts, RTO bounds, intervals - RTO parameters (RFC 6298): clock granularity, min/max clamp values - Zero-window probe bounds, accept queue, send buffer capacities - Handshake timing: replay reaper interval, recv timeout, close delay - ConnState.String() method replaces inline switch in ConnectionList - Heartbeat loop uses config keepalive interval instead of hardcoded 30s * Improve maintainability with sentinel errors, deduplication, and cleanup Define shared sentinel errors (ErrNodeNotFound, ErrNetworkNotFound, ErrConnClosed, ErrConnRefused, ErrDialTimeout, ErrChecksumMismatch) in protocol/header.go and replace ~25 ad-hoc fmt.Errorf strings across registry, daemon, driver, and beacon — callers can now use errors.Is(). Extract startRecvPusher() in daemon/ipc.go to deduplicate the recv-push goroutine that was copy-pasted between CmdDial and CmdAccept handlers. Add jsonRPC() helper and named sub-command constants in driver/driver.go, reducing 10 methods from 8-12 lines each to 1-5 lines. Fix silent error ignoring in dashboard (w.Write), gateway (io.Copy), and nameserver (conn.Write) with explicit discards or error logging. Convert 7 short mutex patterns to use defer across daemon, driver, and replication packages. Remove redundant replication count() method. Add overflow guard comment in packet.go and document InsecureSkipVerify cert-pinning pattern in registry/client.go. --------- Co-authored-by: Teodor Calin --- docs/SKILLS.md | 22 +++++ pkg/beacon/server.go | 24 ++--- pkg/daemon/daemon.go | 61 ++++++++----- pkg/daemon/handshake.go | 15 +-- pkg/daemon/ipc.go | 69 ++++++-------- pkg/daemon/ports.go | 71 +++++++++------ pkg/daemon/tunnel.go | 21 +++-- pkg/driver/conn.go | 3 +- pkg/driver/driver.go | 176 ++++++++++-------------------------- pkg/driver/ipc.go | 6 +- pkg/gateway/gateway.go | 8 +- pkg/nameserver/server.go | 4 +- pkg/protocol/header.go | 22 +++++ pkg/protocol/packet.go | 6 +- pkg/registry/client.go | 6 +- pkg/registry/dashboard.go | 4 +- pkg/registry/replication.go | 12 +-- pkg/registry/server.go | 40 ++++---- tests/nat_traversal_test.go | 20 ++-- 19 files changed, 284 insertions(+), 306 deletions(-) diff --git a/docs/SKILLS.md b/docs/SKILLS.md index 7dd6160c..cc43cc8b 100644 --- a/docs/SKILLS.md +++ b/docs/SKILLS.md @@ -48,6 +48,28 @@ The `hint` field is included in most errors and tells you what to do next. - **Nothing is interactive**: every command runs non-interactively and exits. Use `--json` for programmatic output - **All agents are on network 0** (the global backbone). Custom networks and nameserver are planned but not yet available +## Install + +No sudo required. Binaries are installed to `~/.pilot/bin/`. + +```bash +curl -fsSL https://raw.githubusercontent.com/TeoSlayer/pilotprotocol/main/install.sh | sh +``` + +Sets a hostname during install: + +```bash +curl -fsSL https://raw.githubusercontent.com/TeoSlayer/pilotprotocol/main/install.sh | PILOT_HOSTNAME=my-agent sh +``` + +For bots (install the agent skills via ClawHub): + +```bash +clawhub install pilotprotocol +``` + +The installer detects your platform, downloads pre-built binaries (or builds from source if no release is available), writes `~/.pilot/config.json`, adds `~/.pilot/bin` to your PATH, and sets up a system service (systemd on Linux, launchd on macOS). Only the gateway requires sudo — and only for ports below 1024. + ## Self-discovery ```bash diff --git a/pkg/beacon/server.go b/pkg/beacon/server.go index 1696c83d..2e64462d 100644 --- a/pkg/beacon/server.go +++ b/pkg/beacon/server.go @@ -6,16 +6,8 @@ import ( "log/slog" "net" "sync" -) -// Message types -const ( - MsgDiscover byte = 0x01 - MsgDiscoverReply byte = 0x02 - MsgPunchRequest byte = 0x03 - MsgPunchCommand byte = 0x04 - MsgRelay byte = 0x05 - MsgRelayDeliver byte = 0x06 + "web4/pkg/protocol" ) type Server struct { @@ -88,11 +80,11 @@ func (s *Server) handlePacket(data []byte, remote *net.UDPAddr) { msgType := data[0] switch msgType { - case MsgDiscover: + case protocol.BeaconMsgDiscover: s.handleDiscover(data[1:], remote) - case MsgPunchRequest: + case protocol.BeaconMsgPunchRequest: s.handlePunchRequest(data[1:], remote) - case MsgRelay: + case protocol.BeaconMsgRelay: s.handleRelay(data[1:], remote) default: slog.Warn("unknown beacon message type", "type", fmt.Sprintf("0x%02X", msgType), "from", remote) @@ -125,7 +117,7 @@ func (s *Server) handleDiscover(data []byte, remote *net.UDPAddr) { // Format: [type(1)][iplen(1)][IP(4 or 16)][port(2)] reply := make([]byte, 1+1+len(ip)+2) - reply[0] = MsgDiscoverReply + reply[0] = protocol.BeaconMsgDiscoverReply reply[1] = byte(len(ip)) copy(reply[2:2+len(ip)], ip) binary.BigEndian.PutUint16(reply[2+len(ip):], uint16(remote.Port)) @@ -197,7 +189,7 @@ func (s *Server) handleRelay(data []byte, remote *net.UDPAddr) { // Build relay deliver message msg := make([]byte, 1+4+len(payload)) - msg[0] = MsgRelayDeliver + msg[0] = protocol.BeaconMsgRelayDeliver binary.BigEndian.PutUint32(msg[1:5], senderNodeID) copy(msg[5:], payload) @@ -213,7 +205,7 @@ func (s *Server) SendPunchCommand(nodeID uint32, targetIP net.IP, targetPort uin s.mu.RUnlock() if !ok { - return fmt.Errorf("node %d not found", nodeID) + return fmt.Errorf("node %d: %w", nodeID, protocol.ErrNodeNotFound) } ip := targetIP.To4() @@ -226,7 +218,7 @@ func (s *Server) SendPunchCommand(nodeID uint32, targetIP net.IP, targetPort uin // Format: [type(1)][iplen(1)][IP(4 or 16)][port(2)] msg := make([]byte, 1+1+len(ip)+2) - msg[0] = MsgPunchCommand + msg[0] = protocol.BeaconMsgPunchCommand msg[1] = byte(len(ip)) copy(msg[2:2+len(ip)], ip) binary.BigEndian.PutUint16(msg[2+len(ip):], targetPort) diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index b78264df..feb3a808 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -60,6 +60,25 @@ const ( DefaultTimeWaitDuration = 10 * time.Second ) +// Dial and retransmission constants. +const ( + DialDirectRetries = 3 // direct connection attempts before relay + DialMaxRetries = 6 // total attempts (direct + relay) + DialInitialRTO = 1 * time.Second // initial SYN retransmission timeout + DialMaxRTO = 8 * time.Second // max backoff for SYN retransmission + DialCheckInterval = 10 * time.Millisecond // poll interval for state changes during dial + RetxCheckInterval = 100 * time.Millisecond // retransmission check ticker + MaxRetxAttempts = 8 // abandon connection after this many retransmissions + HeartbeatReregThresh = 3 // heartbeat failures before re-registration + SYNBucketAge = 10 * time.Second // stale per-source SYN bucket reap threshold +) + +// Zero-window probe constants. +const ( + ZeroWinProbeInitial = 500 * time.Millisecond // initial zero-window probe interval + ZeroWinProbeMax = 30 * time.Second // max zero-window probe backoff +) + type Daemon struct { config Config addrMu sync.RWMutex // protects nodeID and addr (H6 fix) @@ -203,7 +222,7 @@ func (d *Daemon) allowSYNFromSource(srcNode uint32) bool { func (d *Daemon) reapPerSrcSYN() { d.perSrcSYNMu.Lock() defer d.perSrcSYNMu.Unlock() - threshold := time.Now().Add(-10 * time.Second) + threshold := time.Now().Add(-SYNBucketAge) for id, b := range d.perSrcSYN { if b.lastFill.Before(threshold) { delete(d.perSrcSYN, id) @@ -977,17 +996,17 @@ func (d *Daemon) DialConnection(dstAddr protocol.Addr, dstPort uint16) (*Connect // Phase 1: Direct connection (3 retries). // Phase 2: Relay through beacon if direct fails (3 more retries). retries := 0 - directRetries := 3 - maxRetries := 6 + directRetries := DialDirectRetries + maxRetries := DialMaxRetries relayActive := d.tunnels.IsRelayPeer(dstAddr.Node) // may already be relay from prior attempt if relayActive { directRetries = 0 // skip direct phase, go straight to relay } - rto := 1 * time.Second + rto := DialInitialRTO timer := time.NewTimer(rto) defer timer.Stop() - check := time.NewTicker(10 * time.Millisecond) + check := time.NewTicker(DialCheckInterval) defer check.Stop() for { @@ -1001,7 +1020,7 @@ func (d *Daemon) DialConnection(dstAddr protocol.Addr, dstPort uint16) (*Connect return conn, nil } if st == StateClosed { - return nil, fmt.Errorf("connection refused") + return nil, protocol.ErrConnRefused } case <-timer.C: retries++ @@ -1011,12 +1030,12 @@ func (d *Daemon) DialConnection(dstAddr protocol.Addr, dstPort uint16) (*Connect slog.Info("direct dial timed out, switching to relay", "node_id", dstAddr.Node) d.tunnels.SetRelayPeer(dstAddr.Node, true) relayActive = true - rto = 1 * time.Second // reset backoff for relay phase + rto = DialInitialRTO // reset backoff for relay phase } if retries > maxRetries { d.ports.RemoveConnection(conn.ID) - return nil, fmt.Errorf("dial timeout") + return nil, protocol.ErrDialTimeout } // Resend SYN (uses relay if relayActive) conn.Mu.Lock() @@ -1024,8 +1043,8 @@ func (d *Daemon) DialConnection(dstAddr protocol.Addr, dstPort uint16) (*Connect conn.Mu.Unlock() d.tunnels.Send(dstAddr.Node, syn) rto = rto * 2 // exponential backoff - if rto > 8*time.Second { - rto = 8 * time.Second + if rto > DialMaxRTO { + rto = DialMaxRTO } timer.Reset(rto) } @@ -1111,7 +1130,7 @@ func (d *Daemon) nagleFlush(conn *Connection) error { case <-time.After(NagleTimeout): // Timeout — flush regardless case <-conn.RetxStop: - return fmt.Errorf("connection closed") + return protocol.ErrConnClosed } // Re-check under lock after waking @@ -1156,7 +1175,7 @@ func (d *Daemon) sendDataImmediate(conn *Connection, data []byte) error { // sendSegment sends a single segment, waiting for the congestion window. // Implements zero-window probing when the peer's receive window is 0. func (d *Daemon) sendSegment(conn *Connection, data []byte) error { - probeInterval := 500 * time.Millisecond + probeInterval := ZeroWinProbeInitial // Wait for effective window to have space for { @@ -1170,9 +1189,9 @@ func (d *Daemon) sendSegment(conn *Connection, data []byte) error { // Window full — wait for ACK to open it, with zero-window probing select { case <-conn.WindowCh: - probeInterval = 500 * time.Millisecond + probeInterval = ZeroWinProbeInitial case <-conn.RetxStop: - return fmt.Errorf("connection closed") + return protocol.ErrConnClosed case <-time.After(probeInterval): // Send zero-window probe (empty ACK) to trigger window update conn.Mu.Lock() @@ -1194,8 +1213,8 @@ func (d *Daemon) sendSegment(conn *Connection, data []byte) error { d.tunnels.Send(conn.RemoteAddr.Node, probe) // Exponential backoff up to 30s probeInterval = probeInterval * 2 - if probeInterval > 30*time.Second { - probeInterval = 30 * time.Second + if probeInterval > ZeroWinProbeMax { + probeInterval = ZeroWinProbeMax } } } @@ -1243,7 +1262,7 @@ func (d *Daemon) sendSegment(conn *Connection, data []byte) error { // startRetxLoop starts the retransmission goroutine for a connection. func (d *Daemon) startRetxLoop(conn *Connection) { - conn.RTO = 1 * time.Second + conn.RTO = InitialRTO conn.RetxStop = make(chan struct{}) conn.RetxSend = func(pkt *protocol.Packet) { d.tunnels.Send(conn.RemoteAddr.Node, pkt) @@ -1252,7 +1271,7 @@ func (d *Daemon) startRetxLoop(conn *Connection) { } func (d *Daemon) retxLoop(conn *Connection) { - ticker := time.NewTicker(100 * time.Millisecond) + ticker := time.NewTicker(RetxCheckInterval) defer ticker.Stop() for { @@ -1300,7 +1319,7 @@ func (d *Daemon) retransmitUnacked(conn *Connection) { continue } if now.Sub(e.sentAt) > conn.RTO { - if e.attempts >= 8 { + if e.attempts >= MaxRetxAttempts { // Too many retransmissions — abandon connection slog.Error("max retransmits exceeded, sending RST", "conn_id", conn.ID) // Send RST to notify the remote peer @@ -1510,7 +1529,7 @@ func (d *Daemon) ensureTunnel(nodeID uint32) error { } func (d *Daemon) heartbeatLoop() { - ticker := time.NewTicker(30 * time.Second) + ticker := time.NewTicker(d.config.keepaliveInterval()) defer ticker.Stop() consecutiveFailures := 0 for { @@ -1527,7 +1546,7 @@ func (d *Daemon) heartbeatLoop() { // After 3 failures, try to re-register (the auto-reconnect in // the registry client will re-establish the TCP connection, but // after a registry restart we need to re-register our node) - if consecutiveFailures >= 3 { + if consecutiveFailures >= HeartbeatReregThresh { slog.Info("attempting re-registration") d.reRegister() consecutiveFailures = 0 diff --git a/pkg/daemon/handshake.go b/pkg/daemon/handshake.go index 220e5277..a0fe9f1f 100644 --- a/pkg/daemon/handshake.go +++ b/pkg/daemon/handshake.go @@ -53,10 +53,13 @@ type PendingHandshake struct { ReceivedAt time.Time } -// Handshake replay protection constants +// Handshake timing constants const ( - handshakeMaxAge = 5 * time.Minute - handshakeMaxFuture = 30 * time.Second + handshakeMaxAge = 5 * time.Minute // replay protection: max message age + handshakeMaxFuture = 30 * time.Second // replay protection: max clock skew + handshakeReapInterval = 5 * time.Minute // how often to reap stale replay entries + handshakeRecvTimeout = 10 * time.Second // time to wait for handshake message + handshakeCloseDelay = 500 * time.Millisecond // delay before closing after send to let data flush ) // HandshakeManager handles the trust handshake protocol on port 444. @@ -233,7 +236,7 @@ func (hm *HandshakeManager) Start() error { // Start periodic replay set reaper hm.reapStop = make(chan struct{}) go func() { - ticker := time.NewTicker(5 * time.Minute) + ticker := time.NewTicker(handshakeReapInterval) defer ticker.Stop() for { select { @@ -263,7 +266,7 @@ func (hm *HandshakeManager) handleConnection(conn *Connection) { return } hm.processMessage(conn, &msg) - case <-time.After(10 * time.Second): + case <-time.After(handshakeRecvTimeout): slog.Warn("handshake timeout waiting for message", "remote_addr", conn.RemoteAddr) } } @@ -838,7 +841,7 @@ func (hm *HandshakeManager) sendMessage(peerNodeID uint32, msg *HandshakeMsg) er // Close after brief delay to let the data flush hm.goRPC(func() { - time.Sleep(500 * time.Millisecond) + time.Sleep(handshakeCloseDelay) hm.daemon.CloseConnection(conn) }) diff --git a/pkg/daemon/ipc.go b/pkg/daemon/ipc.go index cb82c564..fd95b816 100644 --- a/pkg/daemon/ipc.go +++ b/pkg/daemon/ipc.go @@ -60,14 +60,14 @@ func (c *ipcConn) ipcWrite(data []byte) error { func (c *ipcConn) trackPort(port uint16) { c.rmu.Lock() + defer c.rmu.Unlock() c.ports = append(c.ports, port) - c.rmu.Unlock() } func (c *ipcConn) trackConn(connID uint32) { c.rmu.Lock() + defer c.rmu.Unlock() c.conns = append(c.conns, connID) - c.rmu.Unlock() } // IPCServer handles connections from local drivers over Unix socket. @@ -242,26 +242,7 @@ func (s *IPCServer) handleBind(conn *ipcConn, payload []byte) { return } - // Start pushing received data - go func(c *Connection) { - for data := range c.RecvBuf { - msg := make([]byte, 1+4+len(data)) - msg[0] = CmdRecv - binary.BigEndian.PutUint32(msg[1:5], c.ID) - copy(msg[5:], data) - if err := conn.ipcWrite(msg); err != nil { - slog.Debug("IPC recv push failed", "conn_id", c.ID, "err", err) - return - } - } - // RecvBuf closed — notify driver the connection is done - closeMsg := make([]byte, 5) - closeMsg[0] = CmdCloseOK - binary.BigEndian.PutUint32(closeMsg[1:5], c.ID) - if err := conn.ipcWrite(closeMsg); err != nil { - slog.Debug("IPC close notify failed", "conn_id", c.ID, "err", err) - } - }(c) + s.startRecvPusher(conn, c) } }() } @@ -292,26 +273,7 @@ func (s *IPCServer) handleDial(conn *ipcConn, payload []byte) { return } - // Start pushing received data - go func() { - for data := range c.RecvBuf { - msg := make([]byte, 1+4+len(data)) - msg[0] = CmdRecv - binary.BigEndian.PutUint32(msg[1:5], c.ID) - copy(msg[5:], data) - if err := conn.ipcWrite(msg); err != nil { - slog.Debug("IPC recv push failed", "conn_id", c.ID, "err", err) - return - } - } - // RecvBuf closed — notify driver the connection is done - closeMsg := make([]byte, 5) - closeMsg[0] = CmdCloseOK - binary.BigEndian.PutUint32(closeMsg[1:5], c.ID) - if err := conn.ipcWrite(closeMsg); err != nil { - slog.Debug("IPC close notify failed", "conn_id", c.ID, "err", err) - } - }() + s.startRecvPusher(conn, c) } func (s *IPCServer) handleSend(conn *ipcConn, payload []byte) { @@ -682,6 +644,29 @@ func (s *IPCServer) ipcWriteHandshakeOK(conn *ipcConn, data []byte) { } } +// startRecvPusher drains c.RecvBuf and pushes data to the IPC client. +// When RecvBuf closes (remote FIN), it sends CmdCloseOK to the driver. +func (s *IPCServer) startRecvPusher(conn *ipcConn, c *Connection) { + go func() { + for data := range c.RecvBuf { + msg := make([]byte, 1+4+len(data)) + msg[0] = CmdRecv + binary.BigEndian.PutUint32(msg[1:5], c.ID) + copy(msg[5:], data) + if err := conn.ipcWrite(msg); err != nil { + slog.Debug("IPC recv push failed", "conn_id", c.ID, "err", err) + return + } + } + closeMsg := make([]byte, 5) + closeMsg[0] = CmdCloseOK + binary.BigEndian.PutUint32(closeMsg[1:5], c.ID) + if err := conn.ipcWrite(closeMsg); err != nil { + slog.Debug("IPC close notify failed", "conn_id", c.ID, "err", err) + } + }() +} + func (s *IPCServer) sendError(conn *ipcConn, msg string) { resp := make([]byte, 1+2+len(msg)) resp[0] = CmdError diff --git a/pkg/daemon/ports.go b/pkg/daemon/ports.go index fd85183c..2905edd7 100644 --- a/pkg/daemon/ports.go +++ b/pkg/daemon/ports.go @@ -103,6 +103,16 @@ const ( RecvBufSize = 512 // receive buffer channel capacity (segments) MaxRecvWin = RecvBufSize * MaxSegmentSize // 2 MB max receive window MaxOOOBuf = 128 // max out-of-order segments buffered per connection + AcceptQueueLen = 64 // listener accept channel capacity + SendBufLen = 256 // send buffer channel capacity (segments) +) + +// RTO parameters (RFC 6298) +const ( + ClockGranularity = 10 * time.Millisecond // minimum RTTVAR for RTO calculation + RTOMin = 200 * time.Millisecond // minimum retransmission timeout + RTOMax = 10 * time.Second // maximum retransmission timeout + InitialRTO = 1 * time.Second // initial retransmission timeout ) type Connection struct { @@ -183,6 +193,29 @@ const ( StateTimeWait ) +func (s ConnState) String() string { + switch s { + case StateClosed: + return "CLOSED" + case StateListen: + return "LISTEN" + case StateSynSent: + return "SYN_SENT" + case StateSynReceived: + return "SYN_RECV" + case StateEstablished: + return "ESTABLISHED" + case StateFinWait: + return "FIN_WAIT" + case StateCloseWait: + return "CLOSE_WAIT" + case StateTimeWait: + return "TIME_WAIT" + default: + return "unknown" + } +} + func NewPortManager() *PortManager { return &PortManager{ listeners: make(map[uint16]*Listener), @@ -202,7 +235,7 @@ func (pm *PortManager) Bind(port uint16) (*Listener, error) { ln := &Listener{ Port: port, - AcceptCh: make(chan *Connection, 64), + AcceptCh: make(chan *Connection, AcceptQueueLen), } pm.listeners[port] = ln return ln, nil @@ -301,7 +334,7 @@ func (pm *PortManager) NewConnection(localPort uint16, remoteAddr protocol.Addr, RemotePort: remotePort, State: StateClosed, LastActivity: time.Now(), - SendBuf: make(chan []byte, 256), + SendBuf: make(chan []byte, SendBufLen), RecvBuf: make(chan []byte, RecvBufSize), CongWin: InitialCongWin, SSThresh: MaxCongWin / 2, @@ -382,32 +415,12 @@ func (pm *PortManager) ConnectionList() []ConnectionInfo { stats := c.Stats c.Mu.Unlock() - stateStr := "unknown" - switch st { - case StateClosed: - stateStr = "CLOSED" - case StateListen: - stateStr = "LISTEN" - case StateSynSent: - stateStr = "SYN_SENT" - case StateSynReceived: - stateStr = "SYN_RECV" - case StateEstablished: - stateStr = "ESTABLISHED" - case StateFinWait: - stateStr = "FIN_WAIT" - case StateCloseWait: - stateStr = "CLOSE_WAIT" - case StateTimeWait: - stateStr = "TIME_WAIT" - } - list = append(list, ConnectionInfo{ ID: c.ID, LocalPort: c.LocalPort, RemoteAddr: c.RemoteAddr.String(), RemotePort: c.RemotePort, - State: stateStr, + State: st.String(), SendSeq: sendSeq, RecvAck: recvAck, CongWin: congWin, @@ -701,16 +714,16 @@ func (c *Connection) updateRTT(rtt time.Duration) { } // RTO = SRTT + max(G, K·RTTVAR) where K=4, G=clock granularity kvar := c.RTTVAR * 4 - if kvar < 10*time.Millisecond { - kvar = 10 * time.Millisecond // clock granularity floor + if kvar < ClockGranularity { + kvar = ClockGranularity } c.RTO = c.SRTT + kvar // Clamp RTO - if c.RTO < 200*time.Millisecond { - c.RTO = 200 * time.Millisecond + if c.RTO < RTOMin { + c.RTO = RTOMin } - if c.RTO > 10*time.Second { - c.RTO = 10 * time.Second + if c.RTO > RTOMax { + c.RTO = RTOMax } } diff --git a/pkg/daemon/tunnel.go b/pkg/daemon/tunnel.go index 2b564ae9..38d4a430 100644 --- a/pkg/daemon/tunnel.go +++ b/pkg/daemon/tunnel.go @@ -136,6 +136,9 @@ const maxPendingPerPeer = 64 // maxPendingPeers limits the total number of peers with pending key exchanges. const maxPendingPeers = 256 +// RecvChSize is the capacity of the incoming packet channel. +const RecvChSize = 1024 + func NewTunnelManager() *TunnelManager { return &TunnelManager{ peers: make(map[uint32]*net.UDPAddr), @@ -143,7 +146,7 @@ func NewTunnelManager() *TunnelManager { peerPubKeys: make(map[uint32]ed25519.PublicKey), pending: make(map[uint32][][]byte), relayPeers: make(map[uint32]bool), - recvCh: make(chan *IncomingPacket, 1024), + recvCh: make(chan *IncomingPacket, RecvChSize), done: make(chan struct{}), } } @@ -232,7 +235,7 @@ func (tm *TunnelManager) RegisterWithBeacon() { return } msg := make([]byte, 5) - msg[0] = 0x01 // MsgDiscover + msg[0] = protocol.BeaconMsgDiscover binary.BigEndian.PutUint32(msg[1:5], tm.loadNodeID()) if _, err := tm.conn.WriteToUDP(msg, bAddr); err != nil { slog.Warn("beacon registration failed", "error", err) @@ -251,7 +254,7 @@ func (tm *TunnelManager) RequestHolePunch(targetNodeID uint32) { } // Format: [MsgPunchRequest(1)][ourNodeID(4)][targetNodeID(4)] msg := make([]byte, 9) - msg[0] = 0x03 // MsgPunchRequest + msg[0] = protocol.BeaconMsgPunchRequest binary.BigEndian.PutUint32(msg[1:5], tm.loadNodeID()) binary.BigEndian.PutUint32(msg[5:9], targetNodeID) if _, err := tm.conn.WriteToUDP(msg, bAddr); err != nil { @@ -271,7 +274,7 @@ func (tm *TunnelManager) writeFrame(nodeID uint32, addr *net.UDPAddr, frame []by if relay && bAddr != nil { // MsgRelay: [0x05][senderNodeID(4)][destNodeID(4)][frame...] msg := make([]byte, 1+4+4+len(frame)) - msg[0] = 0x05 // MsgRelay + msg[0] = protocol.BeaconMsgRelay binary.BigEndian.PutUint32(msg[1:5], tm.loadNodeID()) binary.BigEndian.PutUint32(msg[5:9], nodeID) copy(msg[9:], frame) @@ -974,11 +977,11 @@ func (tm *TunnelManager) handleBeaconMessage(data []byte, from *net.UDPAddr) { return } switch data[0] { - case 0x02: // MsgDiscoverReply + case protocol.BeaconMsgDiscoverReply: slog.Debug("beacon discover reply on tunnel socket", "from", from) - case 0x04: // MsgPunchCommand + case protocol.BeaconMsgPunchCommand: tm.handlePunchCommand(data[1:]) - case 0x06: // MsgRelayDeliver + case protocol.BeaconMsgRelayDeliver: tm.handleRelayDeliver(data[1:]) default: slog.Debug("unknown beacon message on tunnel socket", "type", data[0], "from", from) @@ -1086,7 +1089,7 @@ func DiscoverEndpoint(beaconAddr string, nodeID uint32, conn *net.UDPConn) (*net // Send discover message msg := make([]byte, 5) - msg[0] = 0x01 // MsgDiscover + msg[0] = protocol.BeaconMsgDiscover binary.BigEndian.PutUint32(msg[1:5], nodeID) if _, err := conn.WriteToUDP(msg, bAddr); err != nil { @@ -1103,7 +1106,7 @@ func DiscoverEndpoint(beaconAddr string, nodeID uint32, conn *net.UDPConn) (*net } // Format: [type(1)][iplen(1)][IP(4 or 16)][port(2)] - if n < 4 || buf[0] != 0x02 { + if n < 4 || buf[0] != protocol.BeaconMsgDiscoverReply { return nil, fmt.Errorf("invalid discover reply") } ipLen := int(buf[1]) diff --git a/pkg/driver/conn.go b/pkg/driver/conn.go index 5250cb15..60250f99 100644 --- a/pkg/driver/conn.go +++ b/pkg/driver/conn.go @@ -2,7 +2,6 @@ package driver import ( "encoding/binary" - "fmt" "io" "net" "os" @@ -75,7 +74,7 @@ func (c *Conn) Write(b []byte) (int, error) { c.mu.Lock() if c.closed { c.mu.Unlock() - return 0, fmt.Errorf("connection closed") + return 0, protocol.ErrConnClosed } c.mu.Unlock() diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go index 65524e9b..5c4e3104 100644 --- a/pkg/driver/driver.go +++ b/pkg/driver/driver.go @@ -10,6 +10,30 @@ import ( const DefaultSocketPath = "/tmp/pilot.sock" +// Handshake sub-commands (must match daemon SubHandshake* constants) +const ( + subHandshakeSend byte = 0x01 + subHandshakeApprove byte = 0x02 + subHandshakeReject byte = 0x03 + subHandshakePending byte = 0x04 + subHandshakeTrusted byte = 0x05 + subHandshakeRevoke byte = 0x06 +) + +// jsonRPC sends an IPC message, waits for the expected response, and +// unmarshals the JSON payload. Most driver methods follow this pattern. +func (d *Driver) jsonRPC(msg []byte, expectCmd byte, label string) (map[string]interface{}, error) { + resp, err := d.ipc.sendAndWait(msg, expectCmd) + if err != nil { + return nil, fmt.Errorf("%s: %w", label, err) + } + var result map[string]interface{} + if err := json.Unmarshal(resp, &result); err != nil { + return nil, fmt.Errorf("%s unmarshal: %w", label, err) + } + return result, nil +} + // Driver is the main entry point for the Pilot Protocol SDK. type Driver struct { ipc *ipcClient @@ -115,126 +139,55 @@ func (d *Driver) RecvFrom() (*Datagram, error) { // Info returns the daemon's status information. func (d *Driver) Info() (map[string]interface{}, error) { - msg := []byte{cmdInfo} - resp, err := d.ipc.sendAndWait(msg, cmdInfoOK) - if err != nil { - return nil, fmt.Errorf("info: %w", err) - } - var info map[string]interface{} - if err := json.Unmarshal(resp, &info); err != nil { - return nil, fmt.Errorf("info unmarshal: %w", err) - } - return info, nil + return d.jsonRPC([]byte{cmdInfo}, cmdInfoOK, "info") } // Handshake sends a trust handshake request to a remote node. func (d *Driver) Handshake(nodeID uint32, justification string) (map[string]interface{}, error) { - payload := make([]byte, 1+4+len(justification)) - payload[0] = 0x01 // SendRequest sub-command - binary.BigEndian.PutUint32(payload[1:5], nodeID) - copy(payload[5:], justification) - - msg := make([]byte, 1+len(payload)) + msg := make([]byte, 1+1+4+len(justification)) msg[0] = cmdHandshake - copy(msg[1:], payload) - - resp, err := d.ipc.sendAndWait(msg, cmdHandshakeOK) - if err != nil { - return nil, fmt.Errorf("handshake: %w", err) - } - var result map[string]interface{} - if err := json.Unmarshal(resp, &result); err != nil { - return nil, fmt.Errorf("handshake unmarshal: %w", err) - } - return result, nil + msg[1] = subHandshakeSend + binary.BigEndian.PutUint32(msg[2:6], nodeID) + copy(msg[6:], justification) + return d.jsonRPC(msg, cmdHandshakeOK, "handshake") } // ApproveHandshake approves a pending trust handshake request. func (d *Driver) ApproveHandshake(nodeID uint32) (map[string]interface{}, error) { - msg := make([]byte, 1+1+4) + msg := make([]byte, 6) msg[0] = cmdHandshake - msg[1] = 0x02 // Approve sub-command + msg[1] = subHandshakeApprove binary.BigEndian.PutUint32(msg[2:6], nodeID) - - resp, err := d.ipc.sendAndWait(msg, cmdHandshakeOK) - if err != nil { - return nil, fmt.Errorf("approve: %w", err) - } - var result map[string]interface{} - if err := json.Unmarshal(resp, &result); err != nil { - return nil, fmt.Errorf("approve unmarshal: %w", err) - } - return result, nil + return d.jsonRPC(msg, cmdHandshakeOK, "approve") } // RejectHandshake rejects a pending trust handshake request. func (d *Driver) RejectHandshake(nodeID uint32, reason string) (map[string]interface{}, error) { - payload := make([]byte, 1+4+len(reason)) - payload[0] = 0x03 // Reject sub-command - binary.BigEndian.PutUint32(payload[1:5], nodeID) - copy(payload[5:], reason) - - msg := make([]byte, 1+len(payload)) + msg := make([]byte, 1+1+4+len(reason)) msg[0] = cmdHandshake - copy(msg[1:], payload) - - resp, err := d.ipc.sendAndWait(msg, cmdHandshakeOK) - if err != nil { - return nil, fmt.Errorf("reject: %w", err) - } - var result map[string]interface{} - if err := json.Unmarshal(resp, &result); err != nil { - return nil, fmt.Errorf("reject unmarshal: %w", err) - } - return result, nil + msg[1] = subHandshakeReject + binary.BigEndian.PutUint32(msg[2:6], nodeID) + copy(msg[6:], reason) + return d.jsonRPC(msg, cmdHandshakeOK, "reject") } // PendingHandshakes returns pending trust handshake requests. func (d *Driver) PendingHandshakes() (map[string]interface{}, error) { - msg := []byte{cmdHandshake, 0x04} - - resp, err := d.ipc.sendAndWait(msg, cmdHandshakeOK) - if err != nil { - return nil, fmt.Errorf("pending: %w", err) - } - var result map[string]interface{} - if err := json.Unmarshal(resp, &result); err != nil { - return nil, fmt.Errorf("pending unmarshal: %w", err) - } - return result, nil + return d.jsonRPC([]byte{cmdHandshake, subHandshakePending}, cmdHandshakeOK, "pending") } // TrustedPeers returns all trusted peers from the handshake protocol. func (d *Driver) TrustedPeers() (map[string]interface{}, error) { - msg := []byte{cmdHandshake, 0x05} - - resp, err := d.ipc.sendAndWait(msg, cmdHandshakeOK) - if err != nil { - return nil, fmt.Errorf("trusted: %w", err) - } - var result map[string]interface{} - if err := json.Unmarshal(resp, &result); err != nil { - return nil, fmt.Errorf("trusted unmarshal: %w", err) - } - return result, nil + return d.jsonRPC([]byte{cmdHandshake, subHandshakeTrusted}, cmdHandshakeOK, "trusted") } // RevokeTrust removes a peer from the trusted set and notifies the registry. func (d *Driver) RevokeTrust(nodeID uint32) (map[string]interface{}, error) { msg := make([]byte, 6) msg[0] = cmdHandshake - msg[1] = 0x06 // SubHandshakeRevoke + msg[1] = subHandshakeRevoke binary.BigEndian.PutUint32(msg[2:6], nodeID) - - resp, err := d.ipc.sendAndWait(msg, cmdHandshakeOK) - if err != nil { - return nil, fmt.Errorf("revoke: %w", err) - } - var result map[string]interface{} - if err := json.Unmarshal(resp, &result); err != nil { - return nil, fmt.Errorf("revoke unmarshal: %w", err) - } - return result, nil + return d.jsonRPC(msg, cmdHandshakeOK, "revoke") } // ResolveHostname resolves a hostname to node info via the daemon. @@ -242,16 +195,7 @@ func (d *Driver) ResolveHostname(hostname string) (map[string]interface{}, error msg := make([]byte, 1+len(hostname)) msg[0] = cmdResolveHostname copy(msg[1:], hostname) - - resp, err := d.ipc.sendAndWait(msg, cmdResolveHostnameOK) - if err != nil { - return nil, fmt.Errorf("resolve_hostname: %w", err) - } - var result map[string]interface{} - if err := json.Unmarshal(resp, &result); err != nil { - return nil, fmt.Errorf("resolve_hostname unmarshal: %w", err) - } - return result, nil + return d.jsonRPC(msg, cmdResolveHostnameOK, "resolve_hostname") } // SetHostname sets or clears the daemon's hostname via the registry. @@ -259,16 +203,7 @@ func (d *Driver) SetHostname(hostname string) (map[string]interface{}, error) { msg := make([]byte, 1+len(hostname)) msg[0] = cmdSetHostname copy(msg[1:], hostname) - - resp, err := d.ipc.sendAndWait(msg, cmdSetHostnameOK) - if err != nil { - return nil, fmt.Errorf("set_hostname: %w", err) - } - var result map[string]interface{} - if err := json.Unmarshal(resp, &result); err != nil { - return nil, fmt.Errorf("set_hostname unmarshal: %w", err) - } - return result, nil + return d.jsonRPC(msg, cmdSetHostnameOK, "set_hostname") } // SetVisibility sets the daemon's visibility on the registry. @@ -278,31 +213,12 @@ func (d *Driver) SetVisibility(public bool) (map[string]interface{}, error) { if public { msg[1] = 1 } - - resp, err := d.ipc.sendAndWait(msg, cmdSetVisibilityOK) - if err != nil { - return nil, fmt.Errorf("set_visibility: %w", err) - } - var result map[string]interface{} - if err := json.Unmarshal(resp, &result); err != nil { - return nil, fmt.Errorf("set_visibility unmarshal: %w", err) - } - return result, nil + return d.jsonRPC(msg, cmdSetVisibilityOK, "set_visibility") } // Deregister removes the daemon from the registry. func (d *Driver) Deregister() (map[string]interface{}, error) { - msg := []byte{cmdDeregister} - - resp, err := d.ipc.sendAndWait(msg, cmdDeregisterOK) - if err != nil { - return nil, fmt.Errorf("deregister: %w", err) - } - var result map[string]interface{} - if err := json.Unmarshal(resp, &result); err != nil { - return nil, fmt.Errorf("deregister unmarshal: %w", err) - } - return result, nil + return d.jsonRPC([]byte{cmdDeregister}, cmdDeregisterOK, "deregister") } // Disconnect closes a connection by ID. Used by administrative tools. diff --git a/pkg/driver/ipc.go b/pkg/driver/ipc.go index b804dd96..d53adad2 100644 --- a/pkg/driver/ipc.go +++ b/pkg/driver/ipc.go @@ -254,18 +254,18 @@ func (c *ipcClient) removeHandler(cmd byte, ch chan []byte) { func (c *ipcClient) registerAcceptCh(port uint16) chan []byte { ch := make(chan []byte, 64) c.acceptMu.Lock() + defer c.acceptMu.Unlock() c.acceptChs[port] = ch - c.acceptMu.Unlock() return ch } func (c *ipcClient) unregisterAcceptCh(port uint16) { c.acceptMu.Lock() + defer c.acceptMu.Unlock() if ch, ok := c.acceptChs[port]; ok { close(ch) delete(c.acceptChs, port) } - c.acceptMu.Unlock() } func (c *ipcClient) registerRecvCh(connID uint32) chan []byte { @@ -284,7 +284,7 @@ func (c *ipcClient) registerRecvCh(connID uint32) chan []byte { func (c *ipcClient) unregisterRecvCh(connID uint32) { c.recvMu.Lock() + defer c.recvMu.Unlock() delete(c.recvChs, connID) - c.recvMu.Unlock() } diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index 6bd03976..e68ca792 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -257,12 +257,16 @@ func (gw *Gateway) bridgeConnection(tcpConn net.Conn, pilotAddr protocol.Addr, p // to unblock the other goroutine and prevent leaks done := make(chan struct{}, 2) go func() { - io.Copy(pilotConn, tcpConn) + if _, err := io.Copy(pilotConn, tcpConn); err != nil { + slog.Debug("gateway copy tcp→pilot ended", "error", err) + } pilotConn.Close() done <- struct{}{} }() go func() { - io.Copy(tcpConn, pilotConn) + if _, err := io.Copy(tcpConn, pilotConn); err != nil { + slog.Debug("gateway copy pilot→tcp ended", "error", err) + } tcpConn.Close() done <- struct{}{} }() diff --git a/pkg/nameserver/server.go b/pkg/nameserver/server.go index bc2ad666..4684a07b 100644 --- a/pkg/nameserver/server.go +++ b/pkg/nameserver/server.go @@ -81,12 +81,12 @@ func (s *Server) handleConn(conn net.Conn) { line := string(buf[:n]) req, err := ParseRequest(line) if err != nil { - conn.Write([]byte(FormatResponseErr(err.Error()))) + _, _ = conn.Write([]byte(FormatResponseErr(err.Error()))) return } resp := s.handleRequest(req, conn.RemoteAddr()) - conn.Write([]byte(resp)) + _, _ = conn.Write([]byte(resp)) } func (s *Server) handleRequest(req Request, remoteAddr net.Addr) string { diff --git a/pkg/protocol/header.go b/pkg/protocol/header.go index 0a8d0fa7..ae066af4 100644 --- a/pkg/protocol/header.go +++ b/pkg/protocol/header.go @@ -1,8 +1,20 @@ package protocol +import "errors" + // Protocol version const Version uint8 = 1 +// Sentinel errors shared across packages. +var ( + ErrNodeNotFound = errors.New("node not found") + ErrNetworkNotFound = errors.New("network not found") + ErrConnClosed = errors.New("connection closed") + ErrConnRefused = errors.New("connection refused") + ErrDialTimeout = errors.New("dial timeout") + ErrChecksumMismatch = errors.New("checksum mismatch") +) + // Flags (4 bits, stored in lower nibble of first byte alongside version) const ( FlagSYN uint8 = 0x1 @@ -56,3 +68,13 @@ var TunnelMagicPunch = [4]byte{0x50, 0x49, 0x4C, 0x50} // Well-known port for handshake requests const PortHandshake uint16 = 444 + +// Beacon message types (single-byte codes, all < 0x10 to avoid collision with tunnel magic) +const ( + BeaconMsgDiscover byte = 0x01 + BeaconMsgDiscoverReply byte = 0x02 + BeaconMsgPunchRequest byte = 0x03 + BeaconMsgPunchCommand byte = 0x04 + BeaconMsgRelay byte = 0x05 + BeaconMsgRelayDeliver byte = 0x06 +) diff --git a/pkg/protocol/packet.go b/pkg/protocol/packet.go index b7fa5542..acc52269 100644 --- a/pkg/protocol/packet.go +++ b/pkg/protocol/packet.go @@ -2,7 +2,6 @@ package protocol import ( "encoding/binary" - "errors" "fmt" ) @@ -49,7 +48,8 @@ func (p *Packet) Marshal() ([]byte, error) { return nil, fmt.Errorf("payload too large: %d bytes (max 65535)", payloadLen) } - buf := make([]byte, packetHeaderSize+payloadLen) + totalLen := packetHeaderSize + payloadLen // safe: payloadLen ≤ 0xFFFF (checked above) + buf := make([]byte, totalLen) buf[0] = (p.Version << 4) | (p.Flags & 0x0F) buf[1] = p.Protocol @@ -92,7 +92,7 @@ func Unmarshal(data []byte) (*Packet, error) { binary.BigEndian.PutUint32(data[30:34], wireChecksum) // restore if computed != wireChecksum { - return nil, errors.New("checksum mismatch") + return nil, ErrChecksumMismatch } p := &Packet{ diff --git a/pkg/registry/client.go b/pkg/registry/client.go index 04bd681c..c9df364a 100644 --- a/pkg/registry/client.go +++ b/pkg/registry/client.go @@ -63,7 +63,11 @@ func DialTLS(addr string, tlsConfig *tls.Config) (*Client, error) { // The fingerprint is a hex-encoded SHA-256 hash of the server's DER-encoded certificate. func DialTLSPinned(addr, fingerprint string) (*Client, error) { tlsConfig := &tls.Config{ - InsecureSkipVerify: true, + // InsecureSkipVerify disables the default CA chain check so we can + // use VerifyPeerCertificate for certificate pinning (SHA-256 fingerprint). + // This is the standard Go pattern — the custom callback below provides + // strictly stronger verification than CA-based trust. + InsecureSkipVerify: true, //nolint:gosec // cert pinning via VerifyPeerCertificate VerifyPeerCertificate: func(rawCerts [][]byte, _ [][]*x509.Certificate) error { if len(rawCerts) == 0 { return fmt.Errorf("no certificate presented") diff --git a/pkg/registry/dashboard.go b/pkg/registry/dashboard.go index ecd1324d..ba26cda6 100644 --- a/pkg/registry/dashboard.go +++ b/pkg/registry/dashboard.go @@ -16,14 +16,14 @@ func (s *Server) ServeDashboard(addr string) error { return } w.Header().Set("Content-Type", "text/html; charset=utf-8") - w.Write([]byte(dashboardHTML)) + _, _ = w.Write([]byte(dashboardHTML)) }) mux.HandleFunc("/api/stats", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") w.Header().Set("Access-Control-Allow-Origin", "*") stats := s.GetDashboardStats() - json.NewEncoder(w).Encode(stats) + _ = json.NewEncoder(w).Encode(stats) }) slog.Info("dashboard listening", "addr", addr) diff --git a/pkg/registry/replication.go b/pkg/registry/replication.go index 5c186ab8..6a971a94 100644 --- a/pkg/registry/replication.go +++ b/pkg/registry/replication.go @@ -36,22 +36,18 @@ func newReplicationManager() *replicationManager { func (rm *replicationManager) addSub(conn net.Conn) { rm.mu.Lock() rm.subs[conn] = &connWriter{conn: conn} + total := len(rm.subs) rm.mu.Unlock() - slog.Info("replication subscriber added", "remote", conn.RemoteAddr(), "total", rm.count()) + slog.Info("replication subscriber added", "remote", conn.RemoteAddr(), "total", total) } // removeSub removes a disconnected subscriber. func (rm *replicationManager) removeSub(conn net.Conn) { rm.mu.Lock() delete(rm.subs, conn) + total := len(rm.subs) rm.mu.Unlock() - slog.Info("replication subscriber removed", "remote", conn.RemoteAddr(), "total", rm.count()) -} - -func (rm *replicationManager) count() int { - rm.mu.Lock() - defer rm.mu.Unlock() - return len(rm.subs) + slog.Info("replication subscriber removed", "remote", conn.RemoteAddr(), "total", total) } // push sends a snapshot to all subscribers. Failed subscribers are removed. diff --git a/pkg/registry/server.go b/pkg/registry/server.go index ebdca6eb..e3e44650 100644 --- a/pkg/registry/server.go +++ b/pkg/registry/server.go @@ -759,7 +759,7 @@ func (s *Server) handleRotateKey(msg map[string]interface{}) (map[string]interfa node, ok := s.nodes[nodeID] if !ok { - return nil, fmt.Errorf("node %d not found", nodeID) + return nil, fmt.Errorf("node %d: %w", nodeID, protocol.ErrNodeNotFound) } // Verify signature: message = "rotate:" @@ -1045,7 +1045,7 @@ func (s *Server) handleJoinNetwork(msg map[string]interface{}) (map[string]inter network, ok := s.networks[netID] if !ok { - return nil, fmt.Errorf("network %d not found", netID) + return nil, fmt.Errorf("network %d: %w", netID, protocol.ErrNetworkNotFound) } // Check join rules @@ -1119,7 +1119,7 @@ func (s *Server) handleLeaveNetwork(msg map[string]interface{}) (map[string]inte network, ok := s.networks[netID] if !ok { - return nil, fmt.Errorf("network %d not found", netID) + return nil, fmt.Errorf("network %d: %w", netID, protocol.ErrNetworkNotFound) } // Remove network from node's list @@ -1160,7 +1160,7 @@ func (s *Server) handleLookup(msg map[string]interface{}) (map[string]interface{ node, ok := s.nodes[nodeID] if !ok { - return nil, fmt.Errorf("node %d not found", nodeID) + return nil, fmt.Errorf("node %d: %w", nodeID, protocol.ErrNodeNotFound) } resp := map[string]interface{}{ @@ -1226,7 +1226,7 @@ func (s *Server) handleResolve(msg map[string]interface{}) (map[string]interface node, ok := s.nodes[nodeID] if !ok { - return nil, fmt.Errorf("node %d not found", nodeID) + return nil, fmt.Errorf("node %d: %w", nodeID, protocol.ErrNodeNotFound) } // Public nodes: endpoint always available @@ -1280,10 +1280,10 @@ func (s *Server) handleReportTrust(msg map[string]interface{}) (map[string]inter // Both nodes must exist nodeAInfo, ok := s.nodes[nodeA] if !ok { - return nil, fmt.Errorf("node %d not found", nodeA) + return nil, fmt.Errorf("node %d: %w", nodeA, protocol.ErrNodeNotFound) } if _, ok := s.nodes[nodeB]; !ok { - return nil, fmt.Errorf("node %d not found", nodeB) + return nil, fmt.Errorf("node %d: %w", nodeB, protocol.ErrNodeNotFound) } // H3 fix: verify signature @@ -1312,7 +1312,7 @@ func (s *Server) handleRevokeTrust(msg map[string]interface{}) (map[string]inter // H3 fix: verify signature — node must exist (prevents auth bypass on missing node) nodeAInfo, ok := s.nodes[nodeA] if !ok { - return nil, fmt.Errorf("node %d not found", nodeA) + return nil, fmt.Errorf("node %d: %w", nodeA, protocol.ErrNodeNotFound) } if err := s.verifyNodeSignature(nodeAInfo, msg, fmt.Sprintf("revoke_trust:%d:%d", nodeA, nodeB)); err != nil { return nil, err @@ -1342,7 +1342,7 @@ func (s *Server) handleSetVisibility(msg map[string]interface{}) (map[string]int node, ok := s.nodes[nodeID] if !ok { - return nil, fmt.Errorf("node %d not found", nodeID) + return nil, fmt.Errorf("node %d: %w", nodeID, protocol.ErrNodeNotFound) } // H3 fix: verify signature @@ -1380,10 +1380,10 @@ func (s *Server) handleRequestHandshake(msg map[string]interface{}) (map[string] // Both nodes must exist fromNode, ok := s.nodes[fromNodeID] if !ok { - return nil, fmt.Errorf("node %d not found", fromNodeID) + return nil, fmt.Errorf("node %d: %w", fromNodeID, protocol.ErrNodeNotFound) } if _, ok := s.nodes[toNodeID]; !ok { - return nil, fmt.Errorf("node %d not found", toNodeID) + return nil, fmt.Errorf("node %d: %w", toNodeID, protocol.ErrNodeNotFound) } // M12 fix: verify sender signature if node has a public key @@ -1437,7 +1437,7 @@ func (s *Server) handlePollHandshakes(msg map[string]interface{}) (map[string]in node, ok := s.nodes[nodeID] if !ok { - return nil, fmt.Errorf("node %d not found", nodeID) + return nil, fmt.Errorf("node %d: %w", nodeID, protocol.ErrNodeNotFound) } // H3 fix: verify signature to prevent unauthorized inbox access @@ -1489,10 +1489,10 @@ func (s *Server) handleRespondHandshake(msg map[string]interface{}) (map[string] respNode, ok := s.nodes[nodeID] if !ok { - return nil, fmt.Errorf("node %d not found", nodeID) + return nil, fmt.Errorf("node %d: %w", nodeID, protocol.ErrNodeNotFound) } if _, ok := s.nodes[peerID]; !ok { - return nil, fmt.Errorf("node %d not found", peerID) + return nil, fmt.Errorf("node %d: %w", peerID, protocol.ErrNodeNotFound) } // M12 fix: verify responder signature if node has a public key @@ -1547,7 +1547,7 @@ func (s *Server) handleSetHostname(msg map[string]interface{}) (map[string]inter node, ok := s.nodes[nodeID] if !ok { - return nil, fmt.Errorf("node %d not found", nodeID) + return nil, fmt.Errorf("node %d: %w", nodeID, protocol.ErrNodeNotFound) } // H3 fix: verify signature @@ -1644,7 +1644,7 @@ func (s *Server) handleListNodes(msg map[string]interface{}) (map[string]interfa network, ok := s.networks[netID] if !ok { - return nil, fmt.Errorf("network %d not found", netID) + return nil, fmt.Errorf("network %d: %w", netID, protocol.ErrNetworkNotFound) } nodes := make([]map[string]interface{}, 0) @@ -1720,7 +1720,7 @@ func (s *Server) handleHeartbeat(msg map[string]interface{}) (map[string]interfa node, ok := s.nodes[nodeID] if !ok { - return nil, fmt.Errorf("node %d not found", nodeID) + return nil, fmt.Errorf("node %d: %w", nodeID, protocol.ErrNodeNotFound) } // H3 fix: verify signature @@ -1746,7 +1746,7 @@ func (s *Server) handlePunch(msg map[string]interface{}) (map[string]interface{} requester, ok := s.nodes[requesterID] if !ok { - return nil, fmt.Errorf("node %d not found", requesterID) + return nil, fmt.Errorf("node %d: %w", requesterID, protocol.ErrNodeNotFound) } // H3 fix: verify requester signature and ensure requester is a participant @@ -1760,10 +1760,10 @@ func (s *Server) handlePunch(msg map[string]interface{}) (map[string]interface{} a, okA := s.nodes[nodeA] b, okB := s.nodes[nodeB] if !okA { - return nil, fmt.Errorf("node %d not found", nodeA) + return nil, fmt.Errorf("node %d: %w", nodeA, protocol.ErrNodeNotFound) } if !okB { - return nil, fmt.Errorf("node %d not found", nodeB) + return nil, fmt.Errorf("node %d: %w", nodeB, protocol.ErrNodeNotFound) } // Return both endpoints so the caller (daemon) can attempt direct connection diff --git a/tests/nat_traversal_test.go b/tests/nat_traversal_test.go index a990636e..c04843e5 100644 --- a/tests/nat_traversal_test.go +++ b/tests/nat_traversal_test.go @@ -52,7 +52,7 @@ func TestBeaconPunchRequest(t *testing.T) { // Node A discovers discoverA := make([]byte, 5) - discoverA[0] = beacon.MsgDiscover + discoverA[0] = protocol.BeaconMsgDiscover binary.BigEndian.PutUint32(discoverA[1:], nodeA) connA.WriteToUDP(discoverA, beaconAddr) @@ -63,14 +63,14 @@ func TestBeaconPunchRequest(t *testing.T) { if err != nil { t.Fatalf("node A discover reply: %v", err) } - if n < 4 || buf[0] != beacon.MsgDiscoverReply { + if n < 4 || buf[0] != protocol.BeaconMsgDiscoverReply { t.Fatalf("unexpected reply type: 0x%02x", buf[0]) } t.Logf("node A registered with beacon") // Node B discovers discoverB := make([]byte, 5) - discoverB[0] = beacon.MsgDiscover + discoverB[0] = protocol.BeaconMsgDiscover binary.BigEndian.PutUint32(discoverB[1:], nodeB) connB.WriteToUDP(discoverB, beaconAddr) @@ -79,14 +79,14 @@ func TestBeaconPunchRequest(t *testing.T) { if err != nil { t.Fatalf("node B discover reply: %v", err) } - if n < 4 || buf[0] != beacon.MsgDiscoverReply { + if n < 4 || buf[0] != protocol.BeaconMsgDiscoverReply { t.Fatalf("unexpected reply type: 0x%02x", buf[0]) } t.Logf("node B registered with beacon") // Node A sends MsgPunchRequest for node B punch := make([]byte, 9) - punch[0] = beacon.MsgPunchRequest + punch[0] = protocol.BeaconMsgPunchRequest binary.BigEndian.PutUint32(punch[1:], nodeA) binary.BigEndian.PutUint32(punch[5:], nodeB) connA.WriteToUDP(punch, beaconAddr) @@ -97,7 +97,7 @@ func TestBeaconPunchRequest(t *testing.T) { if err != nil { t.Fatalf("node A punch command: %v", err) } - if buf[0] != beacon.MsgPunchCommand { + if buf[0] != protocol.BeaconMsgPunchCommand { t.Fatalf("expected MsgPunchCommand (0x04), got 0x%02x", buf[0]) } // Parse punch target — should be node B's address @@ -110,7 +110,7 @@ func TestBeaconPunchRequest(t *testing.T) { if err != nil { t.Fatalf("node B punch command: %v", err) } - if buf[0] != beacon.MsgPunchCommand { + if buf[0] != protocol.BeaconMsgPunchCommand { t.Fatalf("expected MsgPunchCommand (0x04), got 0x%02x", buf[0]) } ipLen = int(buf[1]) @@ -394,7 +394,7 @@ func TestBeaconRelayDeliver(t *testing.T) { nodeID uint32 }{{connA, nodeA}, {connB, nodeB}} { msg := make([]byte, 5) - msg[0] = beacon.MsgDiscover + msg[0] = protocol.BeaconMsgDiscover binary.BigEndian.PutUint32(msg[1:], pair.nodeID) pair.conn.WriteToUDP(msg, beaconAddr) @@ -410,7 +410,7 @@ func TestBeaconRelayDeliver(t *testing.T) { // MsgRelay format: [0x05][senderNodeID(4)][destNodeID(4)][payload...] payload := []byte("test relay payload") relay := make([]byte, 1+4+4+len(payload)) - relay[0] = beacon.MsgRelay + relay[0] = protocol.BeaconMsgRelay binary.BigEndian.PutUint32(relay[1:5], nodeA) binary.BigEndian.PutUint32(relay[5:9], nodeB) copy(relay[9:], payload) @@ -424,7 +424,7 @@ func TestBeaconRelayDeliver(t *testing.T) { t.Fatalf("relay deliver: %v", err) } - if buf[0] != beacon.MsgRelayDeliver { + if buf[0] != protocol.BeaconMsgRelayDeliver { t.Fatalf("expected MsgRelayDeliver (0x06), got 0x%02x", buf[0]) } From e63eb8006519165ce76c99eb64d6939251cd9c70 Mon Sep 17 00:00:00 2001 From: Teodor Calin Date: Mon, 9 Feb 2026 18:06:43 +0200 Subject: [PATCH 02/19] Preserve trust pairs and handshake state through node lifecycle Trust pairs and handshake inboxes were being destroyed when nodes disconnected (reap) or deregistered, causing permanent loss of identity-to-identity trust relationships. This fixes four issues: - cleanupNode no longer deletes trust pairs or handshake inboxes - reapStaleNodes preserves ownerIdx for re-registration reclaim - snapshotJSON includes trust pairs and handshake data for replication - reRegister re-syncs local trust pairs to registry after reconnect --- pkg/daemon/daemon.go | 14 ++++++++++++++ pkg/registry/replication.go | 19 +++++++++++++++++++ pkg/registry/server.go | 25 ++++++------------------- 3 files changed, 39 insertions(+), 19 deletions(-) diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index feb3a808..b253164b 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -1630,6 +1630,20 @@ func (d *Daemon) reRegister() { } } + // Re-sync local trust pairs to registry (trust survives disconnection locally + // but the registry may have lost and re-loaded state) + if d.handshakes != nil { + peers := d.handshakes.TrustedPeers() + for _, rec := range peers { + if _, err := d.regConn.ReportTrust(nodeID, rec.NodeID); err != nil { + slog.Debug("re-registration: failed to re-sync trust pair", "peer", rec.NodeID, "error", err) + } + } + if len(peers) > 0 { + slog.Info("re-synced trust pairs", "count", len(peers)) + } + } + // Re-register with beacon for NAT traversal if d.config.BeaconAddr != "" { d.tunnels.RegisterWithBeacon() diff --git a/pkg/registry/replication.go b/pkg/registry/replication.go index 6a971a94..ffb250ab 100644 --- a/pkg/registry/replication.go +++ b/pkg/registry/replication.go @@ -211,6 +211,25 @@ func (s *Server) snapshotJSON() []byte { } } + // Include trust pairs + for key := range s.trustPairs { + snap.TrustPairs = append(snap.TrustPairs, key) + } + + // Include handshake inboxes + if len(s.handshakeInbox) > 0 { + snap.HandshakeInbox = make(map[string][]*HandshakeRelayMsg, len(s.handshakeInbox)) + for nodeID, msgs := range s.handshakeInbox { + snap.HandshakeInbox[fmt.Sprintf("%d", nodeID)] = msgs + } + } + if len(s.handshakeResponses) > 0 { + snap.HandshakeResponses = make(map[string][]*HandshakeResponseMsg, len(s.handshakeResponses)) + for nodeID, msgs := range s.handshakeResponses { + snap.HandshakeResponses[fmt.Sprintf("%d", nodeID)] = msgs + } + } + data, err := json.Marshal(snap) if err != nil { slog.Error("snapshot marshal error", "err", err) diff --git a/pkg/registry/server.go b/pkg/registry/server.go index e3e44650..56ec0a25 100644 --- a/pkg/registry/server.go +++ b/pkg/registry/server.go @@ -507,10 +507,7 @@ func (s *Server) reapStaleNodes() { } } } - // Keep pubKeyIdx entry so re-registration can reclaim the node_id - if node.Owner != "" { - delete(s.ownerIdx, node.Owner) - } + // Keep pubKeyIdx and ownerIdx entries so re-registration can reclaim the node_id if node.Hostname != "" { delete(s.hostnameIdx, node.Hostname) } @@ -1188,22 +1185,12 @@ func trustPairKey(a, b uint32) string { return fmt.Sprintf("%d:%d", a, b) } -// cleanupNode removes all trust pairs, handshake inboxes, and response -// queues associated with a departed node. Caller must hold s.mu. +// cleanupNode removes transient state for a departed node. Caller must hold s.mu. +// Trust pairs and handshake inboxes are preserved — trust is identity-to-identity +// and must survive disconnections. Only explicit revoke_trust removes trust pairs. func (s *Server) cleanupNode(nodeID uint32) { - // Remove all trust pairs involving this node - for key := range s.trustPairs { - // Trust pair key format is "min:max" - var a, b uint32 - if _, err := fmt.Sscanf(key, "%d:%d", &a, &b); err == nil { - if a == nodeID || b == nodeID { - delete(s.trustPairs, key) - } - } - } - // Remove handshake inboxes - delete(s.handshakeInbox, nodeID) - delete(s.handshakeResponses, nodeID) + // Trust pairs: intentionally preserved (identity-level, survive disconnect) + // Handshake inboxes/responses: intentionally preserved (node may reconnect) } func (s *Server) handleResolve(msg map[string]interface{}) (map[string]interface{}, error) { From e1f8a5b5fdf2864cdd72a8b0a06c7022615e20fb Mon Sep 17 00:00:00 2001 From: Teodor Calin Date: Mon, 9 Feb 2026 22:03:35 +0200 Subject: [PATCH 03/19] Update install script to detect existing installs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When pilotctl is already present in ~/.pilot/bin, the script now skips config, service, and PATH setup — only replaces binaries and exits with a restart hint. Also removes broken platform-suffix renames from the download path. --- install.sh | 31 +++++++++++++++++++++++++++---- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/install.sh b/install.sh index 725f0de6..246977d4 100755 --- a/install.sh +++ b/install.sh @@ -90,6 +90,17 @@ echo " Registry: ${REGISTRY}" echo " Beacon: ${BEACON}" echo "" +# --- Detect existing installation --- + +UPDATING=false +if [ -x "$BIN_DIR/pilotctl" ]; then + UPDATING=true + CURRENT=$("$BIN_DIR/pilotctl" version 2>/dev/null || echo "unknown") + echo " Existing install detected (${CURRENT})" + echo " Updating binaries..." + echo "" +fi + # --- Download or build --- TMPDIR=$(mktemp -d) @@ -104,9 +115,6 @@ if [ -n "$TAG" ]; then echo "Downloading ${TAG}..." if curl -fsSL "$URL" -o "$TMPDIR/$ARCHIVE" 2>/dev/null; then tar -xzf "$TMPDIR/$ARCHIVE" -C "$TMPDIR" - mv "$TMPDIR/pilot-daemon-${OS}-${ARCH}" "$TMPDIR/pilot-daemon" - mv "$TMPDIR/pilot-pilotctl-${OS}-${ARCH}" "$TMPDIR/pilotctl" - mv "$TMPDIR/pilot-gateway-${OS}-${ARCH}" "$TMPDIR/pilot-gateway" else TAG="" fi @@ -153,7 +161,22 @@ if [ -d "$LINK_DIR" ] && [ -w "$LINK_DIR" ]; then echo " Symlinked to ${LINK_DIR}" fi -# --- Write config --- +# --- Update: stop here, skip config/service/PATH setup --- + +if [ "$UPDATING" = true ]; then + echo "" + echo "Updated to ${TAG:-source}:" + echo " pilot-daemon ${BIN_DIR}/pilot-daemon" + echo " pilotctl ${BIN_DIR}/pilotctl" + echo " pilot-gateway ${BIN_DIR}/pilot-gateway" + echo "" + echo "Restart the daemon to use the new version:" + echo " pilotctl daemon stop && pilotctl daemon start" + echo "" + exit 0 +fi + +# --- Fresh install: write config --- cat > "$PILOT_DIR/config.json" < Date: Mon, 9 Feb 2026 22:11:40 +0200 Subject: [PATCH 04/19] Support bare node IDs in all CLI commands parseAddrOrHostname now tries interpreting the argument as a numeric node ID (mapping to backbone address 0:0000.XXXX.XXXX) before falling back to hostname resolution. This fixes send-message, ping, connect, and other commands that previously only accepted full addresses or hostnames. --- cmd/pilotctl/main.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/cmd/pilotctl/main.go b/cmd/pilotctl/main.go index 17b5530e..19e9fc9d 100644 --- a/cmd/pilotctl/main.go +++ b/cmd/pilotctl/main.go @@ -304,10 +304,16 @@ func resolveHostnameToAddr(d *driver.Driver, hostname string) (protocol.Addr, ui } func parseAddrOrHostname(d *driver.Driver, arg string) (protocol.Addr, error) { + // Try full address (e.g. "0:0000.0000.000B") addr, err := protocol.ParseAddr(arg) if err == nil { return addr, nil } + // Try bare node ID (e.g. "11" → backbone address 0:0000.0000.000B) + if id, numErr := strconv.ParseUint(arg, 10, 32); numErr == nil { + return protocol.Addr{Network: 0, Node: uint32(id)}, nil + } + // Try hostname resolution resolved, _, resolveErr := resolveHostnameToAddr(d, arg) if resolveErr != nil { return protocol.Addr{}, fmt.Errorf("cannot resolve %q — is the hostname correct and is there mutual trust? (see: pilotctl handshake)", arg) From 404655331655a372cc55e36614fdfca4db949e97 Mon Sep 17 00:00:00 2001 From: Teodor Calin Date: Tue, 10 Feb 2026 23:18:45 +0200 Subject: [PATCH 05/19] v1.2.0: Webhooks, tags, beacon performance, registry persistence hardening Webhooks & event system: - Daemon emits real-time events via HTTP POST to configurable endpoint - Events: conn.syn_received, conn.established, conn.fin, conn.rst, message.received, handshake.pending, handshake.approved, handshake.auto_approved, trust.revoked_by_peer, tunnel.peer_added, node.registered, node.deregistered, security.syn_rate_limited - Async delivery with buffered channel, non-blocking, graceful shutdown - Runtime hot-swap via IPC (set-webhook / clear-webhook) - CLI: pilotctl daemon start --webhook , pilotctl set-webhook/clear-webhook Node tags: - Capability tags for node discovery (e.g. "webserver", "analytics") - Validated format (lowercase alphanumeric + hyphens, 1-32 chars, max 8) - CLI: pilotctl set-tags [tag2] ..., pilotctl clear-tags - Tags visible in dashboard node list with filtering - Persisted in registry snapshots Beacon relay performance: - Worker pool architecture: single reader goroutine + N workers (NumCPU) - Buffered relay dispatch channel (4096 capacity) for backpressure - sync.Pool for payload buffers, per-worker pre-allocated send buffers - Read-only relay path (RLock only, no write lock contention) - 4MB UDP receive buffer for burst absorption Registry persistence hardening: - pubKeyIdx persisted in snapshots (survives node reap + restart cycles) - Debounced save: async signal + background flush at most once per second - Compact JSON serialization (no indent) reduces write amplification - Known-key re-registrations bypass rate limiter for fast reconnection - Close() guarantees final flush before returning Dashboard improvements: - pprof endpoints (/debug/pprof/) for live CPU/memory profiling - Tag display and filtering in node table - Pagination for large node lists - Total requests stat card, responsive grid layout Logging: - Demoted noisy registration/key-rotation logs from INFO to DEBUG - Demoted beacon relay-not-found from WARN to DEBUG CI: - CodeQL static analysis (Go) on push/PR to main + weekly schedule --- .github/codeql/codeql-config.yml | 8 + .github/workflows/codeql.yml | 29 + .gitignore | 3 + cmd/daemon/main.go | 2 + cmd/pilotctl/main.go | 191 ++++- pkg/beacon/server.go | 124 +++- pkg/daemon/daemon.go | 67 ++ pkg/daemon/handshake.go | 32 + pkg/daemon/ipc.go | 49 ++ pkg/daemon/services.go | 29 +- pkg/daemon/tunnel.go | 27 + pkg/daemon/webhook.go | 108 +++ pkg/driver/driver.go | 18 + pkg/driver/ipc.go | 4 + pkg/registry/client.go | 13 + pkg/registry/dashboard.go | 105 ++- pkg/registry/server.go | 252 +++++-- tests/dashboard_test.go | 10 +- tests/reregistration_test.go | 10 +- tests/tags_test.go | 446 ++++++++++++ tests/webhook_test.go | 1125 ++++++++++++++++++++++++++++++ 21 files changed, 2530 insertions(+), 122 deletions(-) create mode 100644 .github/codeql/codeql-config.yml create mode 100644 .github/workflows/codeql.yml create mode 100644 pkg/daemon/webhook.go create mode 100644 tests/tags_test.go create mode 100644 tests/webhook_test.go diff --git a/.github/codeql/codeql-config.yml b/.github/codeql/codeql-config.yml new file mode 100644 index 00000000..23b93c6b --- /dev/null +++ b/.github/codeql/codeql-config.yml @@ -0,0 +1,8 @@ +name: "Pilot Protocol CodeQL config" + +query-filters: + # False positive: DialTLSPinned uses InsecureSkipVerify with a + # VerifyPeerCertificate callback that enforces SHA-256 cert pinning, + # which is strictly stronger than CA-based trust. + - exclude: + id: go/disabled-certificate-check diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml new file mode 100644 index 00000000..f29dc26f --- /dev/null +++ b/.github/workflows/codeql.yml @@ -0,0 +1,29 @@ +name: "CodeQL" + +on: + push: + branches: [main] + pull_request: + branches: [main] + schedule: + - cron: "0 6 * * 1" # weekly, Monday 6 AM UTC + +jobs: + analyze: + name: Analyze Go + runs-on: ubuntu-latest + permissions: + security-events: write + contents: read + + steps: + - uses: actions/checkout@v4 + + - uses: github/codeql-action/init@v3 + with: + languages: go + config-file: ./.github/codeql/codeql-config.yml + + - uses: github/codeql-action/autobuild@v3 + + - uses: github/codeql-action/analyze@v3 diff --git a/.gitignore b/.gitignore index 17ce6c85..631bc4ce 100644 --- a/.gitignore +++ b/.gitignore @@ -51,3 +51,6 @@ docs/* tmp/ web/node_modules/ + +# Internal tooling +cmd/spoof/ diff --git a/cmd/daemon/main.go b/cmd/daemon/main.go index b0615bea..8d4c2609 100644 --- a/cmd/daemon/main.go +++ b/cmd/daemon/main.go @@ -36,6 +36,7 @@ func main() { noEcho := flag.Bool("no-echo", false, "disable built-in echo service (port 7)") noDataExchange := flag.Bool("no-dataexchange", false, "disable built-in data exchange service (port 1001)") noEventStream := flag.Bool("no-eventstream", false, "disable built-in event stream service (port 1002)") + webhookURL := flag.String("webhook", "", "HTTP(S) endpoint for event notifications (empty = disabled)") logLevel := flag.String("log-level", "info", "log level (debug, info, warn, error)") logFormat := flag.String("log-format", "text", "log format (text, json)") flag.Parse() @@ -72,6 +73,7 @@ func main() { DisableEcho: *noEcho, DisableDataExchange: *noDataExchange, DisableEventStream: *noEventStream, + WebhookURL: *webhookURL, }) if err := d.Start(); err != nil { diff --git a/cmd/pilotctl/main.go b/cmd/pilotctl/main.go index 19e9fc9d..f23f113b 100644 --- a/cmd/pilotctl/main.go +++ b/cmd/pilotctl/main.go @@ -337,7 +337,7 @@ Bootstrap: pilotctl config [--set key=value] Daemon lifecycle: - pilotctl daemon start [--config ] [--registry ] [--beacon ] + pilotctl daemon start [--config ] [--registry ] [--beacon ] [--webhook ] pilotctl daemon stop pilotctl daemon status @@ -353,6 +353,8 @@ Discovery commands: pilotctl find pilotctl set-hostname pilotctl clear-hostname + pilotctl set-tags [tag2] ... + pilotctl clear-tags Communication commands: pilotctl connect [port] [--message ] [--timeout ] @@ -501,6 +503,14 @@ func main() { cmdSetHostname(cmdArgs) case "clear-hostname": cmdClearHostname() + case "set-tags": + cmdSetTags(cmdArgs) + case "clear-tags": + cmdClearTags() + case "set-webhook": + cmdSetWebhook(cmdArgs) + case "clear-webhook": + cmdClearWebhook() // Communication case "connect": @@ -658,7 +668,7 @@ func cmdContext() { "returns": "current configuration as JSON", }, "daemon start": map[string]interface{}{ - "args": []string{"[--config ]", "[--registry ]", "[--beacon ]", "[--listen ]", "[--identity ]", "[--owner ]", "[--hostname ]", "[--log-level ]", "[--log-format ]", "[--public]", "[--foreground]", "[--no-encrypt]", "[--socket ]"}, + "args": []string{"[--config ]", "[--registry ]", "[--beacon ]", "[--listen ]", "[--identity ]", "[--owner ]", "[--hostname ]", "[--log-level ]", "[--log-format ]", "[--public]", "[--foreground]", "[--no-encrypt]", "[--socket ]", "[--webhook ]"}, "description": "Start the daemon as a background process. Blocks until registered, then prints status and exits", "returns": "node_id, address, pid, socket, hostname, log_file", }, @@ -697,6 +707,26 @@ func cmdContext() { "description": "Clear hostname for this daemon's node", "returns": "hostname, node_id", }, + "set-tags": map[string]interface{}{ + "args": []string{"", "[tag2]", "..."}, + "description": "Set capability tags for this daemon's node (replaces existing tags)", + "returns": "node_id, tags", + }, + "clear-tags": map[string]interface{}{ + "args": []string{}, + "description": "Clear all tags for this daemon's node", + "returns": "node_id, tags", + }, + "set-webhook": map[string]interface{}{ + "args": []string{""}, + "description": "Set the webhook URL for event notifications (applies immediately if daemon is running)", + "returns": "webhook, applied", + }, + "clear-webhook": map[string]interface{}{ + "args": []string{}, + "description": "Clear the webhook URL (applies immediately if daemon is running)", + "returns": "webhook, applied", + }, "info": map[string]interface{}{ "args": []string{}, "description": "Show daemon status: node_id, address, hostname, uptime, peers, connections, encryption, identity", @@ -901,7 +931,10 @@ func cmdDaemonStart(args []string) { } // Clean up stale socket - socketPath := getSocket() + socketPath := flagString(flags, "socket", "") + if socketPath == "" { + socketPath = getSocket() + } if _, err := os.Stat(socketPath); err == nil { // Try to connect — if it works, daemon is running d, err := driver.Connect(socketPath) @@ -950,11 +983,17 @@ func cmdDaemonStart(args []string) { logLevel := flagString(flags, "log-level", "info") logFormat := flagString(flags, "log-format", "text") public := flagBool(flags, "public") + webhookURL := flagString(flags, "webhook", "") + if webhookURL == "" { + if w, ok := cfg["webhook"].(string); ok { + webhookURL = w + } + } // If --foreground, run in-process if flagBool(flags, "foreground") { runDaemonForeground(configFile, registryAddr, beaconAddr, listenAddr, - socketPath, encrypt, identityPath, owner, hostname, logLevel, logFormat, public) + socketPath, encrypt, identityPath, owner, hostname, logLevel, logFormat, public, webhookURL) return } @@ -995,6 +1034,9 @@ func cmdDaemonStart(args []string) { if public { daemonArgs = append(daemonArgs, "--public") } + if webhookURL != "" { + daemonArgs = append(daemonArgs, "--webhook", webhookURL) + } proc := exec.Command(selfPath, daemonArgs...) proc.Stdout = logFile @@ -1228,14 +1270,15 @@ func runDaemonInternal(args []string) { configFile := flagString(flags, "config", "") encrypt := !flagBool(flags, "no-encrypt") public := flagBool(flags, "public") + webhookURL := flagString(flags, "webhook", "") runDaemonForeground(configFile, registryAddr, beaconAddr, listenAddr, - socketPath, encrypt, identityPath, owner, hostname, logLevel, logFormat, public) + socketPath, encrypt, identityPath, owner, hostname, logLevel, logFormat, public, webhookURL) } func runDaemonForeground(configFile, registryAddr, beaconAddr, listenAddr, socketPath string, encrypt bool, identityPath, owner, hostname, - logLevel, logFormat string, public bool) { + logLevel, logFormat string, public bool, webhookURL string) { if configFile != "" { cfg, err := config.Load(configFile) @@ -1268,6 +1311,7 @@ func runDaemonForeground(configFile, registryAddr, beaconAddr, listenAddr, Owner: owner, Public: public, Hostname: hostname, + WebhookURL: webhookURL, }) if err := d.Start(); err != nil { @@ -1707,6 +1751,132 @@ func cmdClearHostname() { } } +func cmdSetWebhook(args []string) { + if len(args) < 1 { + fatalCode("invalid_argument", "usage: pilotctl set-webhook ") + } + url := args[0] + if !strings.HasPrefix(url, "http://") && !strings.HasPrefix(url, "https://") { + fatalCode("invalid_argument", "webhook URL must start with http:// or https://") + } + + // Persist to config so it survives daemon restart + cfg := loadConfig() + cfg["webhook"] = url + if err := saveConfig(cfg); err != nil { + fatalCode("internal", "save config: %v", err) + } + + // Apply to running daemon (best-effort — daemon may not be running) + applied := false + d, err := driver.Connect(getSocket()) + if err == nil { + _, err = d.SetWebhook(url) + d.Close() + if err == nil { + applied = true + } + } + + if jsonOutput { + outputOK(map[string]interface{}{ + "webhook": url, + "applied": applied, + }) + } else { + fmt.Printf("webhook set: %s\n", url) + if applied { + fmt.Printf("applied to running daemon\n") + } else { + fmt.Printf("will take effect on next daemon start\n") + } + } +} + +func cmdClearWebhook() { + cfg := loadConfig() + delete(cfg, "webhook") + if err := saveConfig(cfg); err != nil { + fatalCode("internal", "save config: %v", err) + } + + // Apply to running daemon (best-effort) + applied := false + d, err := driver.Connect(getSocket()) + if err == nil { + _, err = d.SetWebhook("") + d.Close() + if err == nil { + applied = true + } + } + + if jsonOutput { + outputOK(map[string]interface{}{ + "webhook": "", + "applied": applied, + }) + } else { + fmt.Printf("webhook cleared\n") + if applied { + fmt.Printf("applied to running daemon\n") + } else { + fmt.Printf("will take effect on next daemon start\n") + } + } +} + +func cmdSetTags(args []string) { + if len(args) < 1 { + fatalCode("invalid_argument", "usage: pilotctl set-tags [tag2] ...") + } + if len(args) > 3 { + fatalCode("invalid_argument", "set-tags: maximum 3 tags allowed, got %d", len(args)) + } + d := connectDriver() + defer d.Close() + + result, err := d.SetTags(args) + if err != nil { + fatalCode("connection_failed", "set-tags: %v", err) + } + + if jsonOutput { + outputOK(map[string]interface{}{ + "node_id": result["node_id"], + "tags": result["tags"], + }) + } else { + tags := "none" + if t, ok := result["tags"].([]interface{}); ok && len(t) > 0 { + parts := make([]string, len(t)) + for i, v := range t { + parts[i] = fmt.Sprintf("#%s", v) + } + tags = strings.Join(parts, " ") + } + fmt.Printf("tags set: %s\n", tags) + } +} + +func cmdClearTags() { + d := connectDriver() + defer d.Close() + + _, err := d.SetTags([]string{}) + if err != nil { + fatalCode("connection_failed", "clear-tags: %v", err) + } + + if jsonOutput { + outputOK(map[string]interface{}{ + "tags": []string{}, + }) + } else { + fmt.Printf("tags cleared\n") + } +} + // ===================== COMMUNICATION ===================== func cmdConnect(args []string) { @@ -2286,7 +2456,7 @@ func cmdPublish(args []string) { func cmdHandshake(args []string) { if len(args) < 1 { - fatalCode("invalid_argument", "usage: pilotctl handshake [justification]") + fatalCode("invalid_argument", "usage: pilotctl handshake [justification]") } d := connectDriver() defer d.Close() @@ -2295,10 +2465,15 @@ func cmdHandshake(args []string) { target := args[0] if id, err := strconv.ParseUint(target, 10, 32); err == nil { nodeID = uint32(id) + } else if addr, err := protocol.ParseAddr(target); err == nil { + nodeID = addr.Node + if !jsonOutput { + fmt.Fprintf(os.Stderr, "parsed address %s → node %d\n", target, nodeID) + } } else { _, resolved, err := resolveHostnameToAddr(d, target) if err != nil { - fatalCode("not_found", "resolve hostname %q: %v", target, err) + fatalCode("not_found", "resolve %q: %v", target, err) } nodeID = resolved if !jsonOutput { diff --git a/pkg/beacon/server.go b/pkg/beacon/server.go index 2e64462d..76478bac 100644 --- a/pkg/beacon/server.go +++ b/pkg/beacon/server.go @@ -5,23 +5,41 @@ import ( "fmt" "log/slog" "net" + "runtime" "sync" "web4/pkg/protocol" ) +// relayJob is a pre-parsed relay packet dispatched to a worker. +type relayJob struct { + senderID uint32 + destID uint32 + payload []byte // owned by the job, returned to pool after send +} + type Server struct { mu sync.RWMutex conn *net.UDPConn nodes map[uint32]*net.UDPAddr // node_id → observed public endpoint readyCh chan struct{} + relayCh chan relayJob // buffered channel for relay workers + pool sync.Pool // reusable payload buffers } +const relayQueueSize = 4096 // buffered relay jobs before backpressure + func New() *Server { - return &Server{ + s := &Server{ nodes: make(map[uint32]*net.UDPAddr), readyCh: make(chan struct{}), + relayCh: make(chan relayJob, relayQueueSize), + } + s.pool.New = func() interface{} { + b := make([]byte, 1500) + return &b } + return s } func (s *Server) ListenAndServe(addr string) error { @@ -35,9 +53,23 @@ func (s *Server) ListenAndServe(addr string) error { return fmt.Errorf("listen: %w", err) } s.conn = conn + + // Increase UDP receive buffer to handle bursts + _ = conn.SetReadBuffer(4 * 1024 * 1024) // 4MB + slog.Info("beacon listening", "addr", conn.LocalAddr()) close(s.readyCh) + // Start relay workers — one per CPU core, each processes relay + // jobs independently: lookup dest + WriteToUDP in parallel. + workers := runtime.NumCPU() + if workers < 2 { + workers = 2 + } + for i := 0; i < workers; i++ { + go s.relayWorker() + } + buf := make([]byte, 65535) for { n, remote, err := conn.ReadFromUDP(buf) @@ -85,9 +117,9 @@ func (s *Server) handlePacket(data []byte, remote *net.UDPAddr) { case protocol.BeaconMsgPunchRequest: s.handlePunchRequest(data[1:], remote) case protocol.BeaconMsgRelay: - s.handleRelay(data[1:], remote) + s.dispatchRelay(data[1:]) default: - slog.Warn("unknown beacon message type", "type", fmt.Sprintf("0x%02X", msgType), "from", remote) + slog.Debug("unknown beacon message type", "type", fmt.Sprintf("0x%02X", msgType), "from", remote) } } @@ -103,7 +135,7 @@ func (s *Server) handleDiscover(data []byte, remote *net.UDPAddr) { s.nodes[nodeID] = remote s.mu.Unlock() - slog.Info("beacon discover", "node_id", nodeID, "addr", remote) + slog.Debug("beacon discover", "node_id", nodeID, "addr", remote) // Reply with observed IP:port using variable-length IP encoding ip := remote.IP.To4() @@ -157,47 +189,81 @@ func (s *Server) handlePunchRequest(data []byte, remote *net.UDPAddr) { if err := s.SendPunchCommand(targetID, requesterAddr.IP, uint16(requesterAddr.Port)); err != nil { slog.Debug("punch command to target failed", "node_id", targetID, "err", err) } - slog.Info("punch coordinated", "requester", requesterID, "target", targetID, + slog.Debug("punch coordinated", "requester", requesterID, "target", targetID, "requester_addr", requesterAddr, "target_addr", targetAddr) } -func (s *Server) handleRelay(data []byte, remote *net.UDPAddr) { - // Format: [senderNodeID(4)][destNodeID(4)][payload...] +// dispatchRelay parses the relay header and dispatches to a worker goroutine. +// The read loop stays fast — no locks, no syscalls, no allocations on the hot path. +func (s *Server) dispatchRelay(data []byte) { if len(data) < 8 { return } - senderNodeID := binary.BigEndian.Uint32(data[0:4]) - destNodeID := binary.BigEndian.Uint32(data[4:8]) - payload := data[8:] + senderID := binary.BigEndian.Uint32(data[0:4]) + destID := binary.BigEndian.Uint32(data[4:8]) - // Update sender's endpoint (handles symmetric NAT port changes) - s.mu.Lock() - s.nodes[senderNodeID] = remote - s.mu.Unlock() - - s.mu.RLock() - destAddr, ok := s.nodes[destNodeID] - s.mu.RUnlock() + // Copy payload into a pooled buffer so we don't hold the read buffer + payload := data[8:] + bp := s.pool.Get().(*[]byte) + buf := *bp + if cap(buf) < len(payload) { + buf = make([]byte, len(payload)) + } else { + buf = buf[:len(payload)] + } + copy(buf, payload) - if !ok { - slog.Warn("relay dest not found", "dest_node_id", destNodeID, "sender_node_id", senderNodeID) - return + select { + case s.relayCh <- relayJob{senderID: senderID, destID: destID, payload: buf}: + default: + // Queue full — drop packet (UDP is best-effort) + *bp = buf[:cap(buf)] + s.pool.Put(bp) } +} - slog.Info("relaying", "from", senderNodeID, "to", destNodeID, "dest_addr", destAddr, "payload_len", len(payload)) +// relayWorker processes relay jobs: dest lookup and UDP send. +// Multiple workers run in parallel to distribute the WriteToUDP syscalls. +// Sender endpoint is NOT updated here — discover/punch already handle it. +// This keeps the relay path entirely read-only (no write lock contention). +func (s *Server) relayWorker() { + sendBuf := make([]byte, 1500) // per-worker send buffer, no allocations + for job := range s.relayCh { + // Lookup dest (read lock — all workers can do this concurrently) + s.mu.RLock() + destAddr, ok := s.nodes[job.destID] + s.mu.RUnlock() + + if !ok { + slog.Debug("relay dest not found", "dest_node_id", job.destID, "sender_node_id", job.senderID) + s.returnPayload(job.payload) + continue + } - // Build relay deliver message - msg := make([]byte, 1+4+len(payload)) - msg[0] = protocol.BeaconMsgRelayDeliver - binary.BigEndian.PutUint32(msg[1:5], senderNodeID) - copy(msg[5:], payload) + // Build relay deliver message in pre-allocated send buffer + msgLen := 1 + 4 + len(job.payload) + if cap(sendBuf) < msgLen { + sendBuf = make([]byte, msgLen) + } + msg := sendBuf[:msgLen] + msg[0] = protocol.BeaconMsgRelayDeliver + binary.BigEndian.PutUint32(msg[1:5], job.senderID) + copy(msg[5:], job.payload) - if _, err := s.conn.WriteToUDP(msg, destAddr); err != nil { - slog.Warn("beacon relay send failed", "dest_node_id", destNodeID, "err", err) + if _, err := s.conn.WriteToUDP(msg, destAddr); err != nil { + slog.Debug("beacon relay send failed", "dest_node_id", job.destID, "err", err) + } + + s.returnPayload(job.payload) } } +func (s *Server) returnPayload(buf []byte) { + buf = buf[:cap(buf)] + s.pool.Put(&buf) +} + // SendPunchCommand tells a node to send UDP to a target endpoint. func (s *Server) SendPunchCommand(nodeID uint32, targetIP net.IP, targetPort uint16) error { s.mu.RLock() diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index b253164b..9db37e21 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -40,6 +40,9 @@ type Config struct { DisableDataExchange bool // disable built-in data exchange service (port 1001) DisableEventStream bool // disable built-in event stream service (port 1002) + // Webhook + WebhookURL string // HTTP(S) endpoint for event notifications (empty = disabled) + // Tuning (zero = use defaults) KeepaliveInterval time.Duration // default 30s IdleTimeout time.Duration // default 120s @@ -90,6 +93,7 @@ type Daemon struct { ports *PortManager ipc *IPCServer handshakes *HandshakeManager + webhook *WebhookClient startTime time.Time stopCh chan struct{} // closed on Stop() to signal goroutines @@ -357,6 +361,15 @@ func (d *Daemon) Start() error { slog.Info("daemon registered", "node_id", d.nodeID, "addr", d.addr, "endpoint", registrationAddr) + // Initialize webhook client (no-op if URL is empty) + d.webhook = NewWebhookClient(d.config.WebhookURL, d.NodeID) + d.tunnels.SetWebhook(d.webhook) + d.handshakes.SetWebhook(d.webhook) + d.webhook.Emit("node.registered", map[string]interface{}{ + "address": d.addr.String(), + "endpoint": registrationAddr, + }) + // Register with beacon using real nodeID for NAT traversal (punch/relay) if d.config.BeaconAddr != "" { if err := d.tunnels.SetBeaconAddr(d.config.BeaconAddr); err != nil { @@ -481,12 +494,14 @@ func (d *Daemon) Stop() error { // Deregister from registry if d.regConn != nil { + d.webhook.Emit("node.deregistered", nil) d.regConn.Deregister(d.NodeID()) d.regConn.Close() } d.ipc.Close() d.tunnels.Close() + d.webhook.Close() return nil } @@ -495,6 +510,22 @@ func (d *Daemon) NodeID() uint32 { defer d.addrMu.RUnlock() return d.nodeID } + +// SetWebhookURL hot-swaps the webhook client at runtime. +// An empty URL disables the webhook (all Emit calls become no-ops). +func (d *Daemon) SetWebhookURL(url string) { + old := d.webhook + d.webhook = NewWebhookClient(url, d.NodeID) + d.tunnels.SetWebhook(d.webhook) + d.handshakes.SetWebhook(d.webhook) + old.Close() + if url != "" { + slog.Info("webhook updated", "url", url) + } else { + slog.Info("webhook cleared") + } +} + // Identity returns the daemon's Ed25519 identity (may be nil if unset). func (d *Daemon) Identity() *crypto.Identity { return d.identity } @@ -603,6 +634,9 @@ func (d *Daemon) handlePacket(pkt *protocol.Packet, from *net.UDPAddr) { if !d.tunnels.HasPeer(pkt.Src.Node) { if !d.config.Encrypt || d.tunnels.HasCrypto(pkt.Src.Node) { d.tunnels.AddPeer(pkt.Src.Node, from) + d.webhook.Emit("tunnel.peer_added", map[string]interface{}{ + "peer_node_id": pkt.Src.Node, "endpoint": from.String(), + }) } } @@ -652,6 +686,9 @@ func (d *Daemon) handleStreamPacket(pkt *protocol.Packet) { // SYN rate limiting if !d.allowSYN() { slog.Warn("SYN rate limit exceeded", "src_addr", pkt.Src, "src_port", pkt.SrcPort) + d.webhook.Emit("security.syn_rate_limited", map[string]interface{}{ + "src_addr": pkt.Src.String(), "src_port": pkt.SrcPort, + }) return // silently drop — don't even RST (avoid amplification) } if !d.allowSYNFromSource(pkt.Src.Node) { @@ -680,6 +717,10 @@ func (d *Daemon) handleStreamPacket(pkt *protocol.Packet) { conn.RecvAck = pkt.Seq + 1 conn.ExpectedSeq = pkt.Seq + 1 // first data segment after SYN conn.Mu.Unlock() + d.webhook.Emit("conn.syn_received", map[string]interface{}{ + "src_addr": pkt.Src.String(), "src_port": pkt.SrcPort, + "dst_port": pkt.DstPort, "conn_id": conn.ID, + }) // Process peer's receive window from SYN (H9 fix: always update, including Window==0) conn.RetxMu.Lock() @@ -704,6 +745,10 @@ func (d *Daemon) handleStreamPacket(pkt *protocol.Packet) { conn.SendSeq++ conn.State = StateEstablished conn.Mu.Unlock() + d.webhook.Emit("conn.established", map[string]interface{}{ + "src_addr": pkt.Src.String(), "src_port": pkt.SrcPort, + "dst_port": pkt.DstPort, "conn_id": conn.ID, + }) d.startRetxLoop(conn) // Non-blocking push to accept queue — if full, clean up and RST @@ -769,10 +814,17 @@ func (d *Daemon) handleStreamPacket(pkt *protocol.Packet) { if conn != nil { conn.CloseRecvBuf() conn.Mu.Lock() + wasTimeWait := conn.State == StateTimeWait conn.State = StateTimeWait conn.LastActivity = time.Now() sendSeq := conn.SendSeq conn.Mu.Unlock() + if !wasTimeWait { + d.webhook.Emit("conn.fin", map[string]interface{}{ + "remote_addr": pkt.Src.String(), "remote_port": pkt.SrcPort, + "local_port": pkt.DstPort, "conn_id": conn.ID, + }) + } // Connection will be reaped by idleSweepLoop after TimeWaitDuration // Send FIN-ACK @@ -801,6 +853,10 @@ func (d *Daemon) handleStreamPacket(pkt *protocol.Packet) { conn.Mu.Unlock() conn.CloseRecvBuf() d.ports.RemoveConnection(conn.ID) + d.webhook.Emit("conn.rst", map[string]interface{}{ + "remote_addr": pkt.Src.String(), "remote_port": pkt.SrcPort, + "local_port": pkt.DstPort, "conn_id": conn.ID, + }) } return } @@ -923,6 +979,10 @@ func (d *Daemon) sendDelayedACK(conn *Connection) { func (d *Daemon) handleDatagramPacket(pkt *protocol.Packet) { if len(pkt.Payload) > 0 { + d.webhook.Emit("data.datagram", map[string]interface{}{ + "src_addr": pkt.Src.String(), "src_port": pkt.SrcPort, + "dst_port": pkt.DstPort, "size": len(pkt.Payload), + }) d.ipc.DeliverDatagram(pkt.Src, pkt.SrcPort, pkt.DstPort, pkt.Payload) } } @@ -1617,6 +1677,9 @@ func (d *Daemon) reRegister() { nodeID := d.nodeID slog.Info("re-registered", "node_id", nodeID, "addr", d.addr) d.addrMu.Unlock() + d.webhook.Emit("node.reregistered", map[string]interface{}{ + "address": d.addr.String(), + }) // Restore visibility and hostname after re-registration if d.config.Public { @@ -1673,6 +1736,10 @@ func (d *Daemon) idleSweepLoop() { dead := d.ports.IdleConnections(idleTimeout) for _, conn := range dead { slog.Debug("closing dead connection", "conn_id", conn.ID, "idle_timeout", idleTimeout, "remote_addr", conn.RemoteAddr, "remote_port", conn.RemotePort) + d.webhook.Emit("conn.idle_timeout", map[string]interface{}{ + "remote_addr": conn.RemoteAddr.String(), "remote_port": conn.RemotePort, + "local_port": conn.LocalPort, "conn_id": conn.ID, + }) d.CloseConnection(conn) } diff --git a/pkg/daemon/handshake.go b/pkg/daemon/handshake.go index a0fe9f1f..547aea2f 100644 --- a/pkg/daemon/handshake.go +++ b/pkg/daemon/handshake.go @@ -74,6 +74,9 @@ type HandshakeManager struct { reapStop chan struct{} // signals replay reaper to stop stopOnce sync.Once // ensures reapStop is closed only once + // Webhook + webhook *WebhookClient + // Replay protection replayMu sync.Mutex replaySet map[[32]byte]time.Time // message hash → first seen @@ -98,6 +101,11 @@ func NewHandshakeManager(d *Daemon) *HandshakeManager { return hm } +// SetWebhook configures the webhook client for event notifications. +func (hm *HandshakeManager) SetWebhook(wc *WebhookClient) { + hm.webhook = wc +} + // Stop waits for all background RPCs to finish and stops the replay reaper. func (hm *HandshakeManager) Stop() { hm.stopOnce.Do(func() { @@ -348,6 +356,9 @@ func (hm *HandshakeManager) reapReplay() { func (hm *HandshakeManager) handleRequest(conn *Connection, msg *HandshakeMsg) { peerNodeID := msg.NodeID slog.Info("handshake request received", "peer_node_id", peerNodeID, "justification", msg.Justification) + hm.webhook.Emit("handshake.received", map[string]interface{}{ + "peer_node_id": peerNodeID, "justification": msg.Justification, + }) hm.mu.Lock() defer hm.mu.Unlock() @@ -370,6 +381,9 @@ func (hm *HandshakeManager) handleRequest(conn *Connection, msg *HandshakeMsg) { Mutual: true, } slog.Info("mutual handshake auto-approved", "peer_node_id", peerNodeID) + hm.webhook.Emit("handshake.auto_approved", map[string]interface{}{ + "peer_node_id": peerNodeID, "reason": "mutual", + }) hm.saveTrust() hm.sendAcceptLocked(peerNodeID) // Report trust to registry @@ -388,6 +402,9 @@ func (hm *HandshakeManager) handleRequest(conn *Connection, msg *HandshakeMsg) { Network: hm.sharedNetwork(peerNodeID), } slog.Info("same network handshake auto-approved", "peer_node_id", peerNodeID) + hm.webhook.Emit("handshake.auto_approved", map[string]interface{}{ + "peer_node_id": peerNodeID, "reason": "same_network", + }) hm.saveTrust() hm.sendAcceptLocked(peerNodeID) // Report trust to registry @@ -406,6 +423,9 @@ func (hm *HandshakeManager) handleRequest(conn *Connection, msg *HandshakeMsg) { } hm.saveTrust() slog.Info("handshake request pending approval", "peer_node_id", peerNodeID) + hm.webhook.Emit("handshake.pending", map[string]interface{}{ + "peer_node_id": peerNodeID, "justification": msg.Justification, + }) } // handleAccept processes a handshake acceptance from a peer. @@ -615,6 +635,9 @@ func (hm *HandshakeManager) ApproveHandshake(peerNodeID uint32) error { hm.mu.Unlock() slog.Info("handshake approved", "peer_node_id", peerNodeID) + hm.webhook.Emit("handshake.approved", map[string]interface{}{ + "peer_node_id": peerNodeID, + }) // Report trust to registry (creates the trust pair for resolve authorization) if hm.daemon.regConn != nil { @@ -640,6 +663,9 @@ func (hm *HandshakeManager) RejectHandshake(peerNodeID uint32, reason string) er hm.mu.Unlock() slog.Info("handshake rejected", "peer_node_id", peerNodeID, "reason", reason) + hm.webhook.Emit("handshake.rejected", map[string]interface{}{ + "peer_node_id": peerNodeID, "reason": reason, + }) // Relay rejection via registry so the requester learns about it even behind NAT if hm.daemon.regConn != nil { @@ -687,6 +713,9 @@ func (hm *HandshakeManager) RevokeTrust(peerNodeID uint32) error { } slog.Info("trust revoked", "peer_node_id", peerNodeID) + hm.webhook.Emit("trust.revoked", map[string]interface{}{ + "peer_node_id": peerNodeID, + }) // Tear down the tunnel to the revoked peer immediately hm.daemon.tunnels.RemovePeer(peerNodeID) @@ -719,6 +748,9 @@ func (hm *HandshakeManager) RevokeTrust(peerNodeID uint32) error { func (hm *HandshakeManager) handleRevokeMsg(msg *HandshakeMsg) { peerNodeID := msg.NodeID slog.Info("trust revoked by peer", "peer_node_id", peerNodeID) + hm.webhook.Emit("trust.revoked_by_peer", map[string]interface{}{ + "peer_node_id": peerNodeID, + }) hm.mu.Lock() _, wasTrusted := hm.trusted[peerNodeID] diff --git a/pkg/daemon/ipc.go b/pkg/daemon/ipc.go index fd95b816..f8182430 100644 --- a/pkg/daemon/ipc.go +++ b/pkg/daemon/ipc.go @@ -40,6 +40,10 @@ const ( CmdSetVisibilityOK byte = 0x16 CmdDeregister byte = 0x17 CmdDeregisterOK byte = 0x18 + CmdSetTags byte = 0x19 + CmdSetTagsOK byte = 0x1A + CmdSetWebhook byte = 0x1B + CmdSetWebhookOK byte = 0x1C ) // ipcConn wraps a net.Conn with a write mutex for goroutine safety. @@ -196,6 +200,10 @@ func (s *IPCServer) handleClient(conn *ipcConn) { s.handleSetVisibility(conn, payload) case CmdDeregister: s.handleDeregister(conn) + case CmdSetTags: + s.handleSetTags(conn, payload) + case CmdSetWebhook: + s.handleSetWebhook(conn, payload) default: s.sendError(conn, fmt.Sprintf("unknown command: 0x%02X", cmd)) } @@ -506,6 +514,47 @@ func (s *IPCServer) handleDeregister(conn *ipcConn) { } } +func (s *IPCServer) handleSetTags(conn *ipcConn, payload []byte) { + var tags []string + if err := json.Unmarshal(payload, &tags); err != nil { + s.sendError(conn, fmt.Sprintf("set_tags: invalid JSON: %v", err)) + return + } + if len(tags) > 3 { + s.sendError(conn, "set_tags: maximum 3 tags allowed") + return + } + result, err := s.daemon.regConn.SetTags(s.daemon.NodeID(), tags) + if err != nil { + s.sendError(conn, fmt.Sprintf("set_tags: %v", err)) + return + } + data, err := json.Marshal(result) + if err != nil { + s.sendError(conn, fmt.Sprintf("set_tags marshal: %v", err)) + return + } + resp := make([]byte, 1+len(data)) + resp[0] = CmdSetTagsOK + copy(resp[1:], data) + if err := conn.ipcWrite(resp); err != nil { + slog.Debug("IPC set_tags reply failed", "err", err) + } +} + +func (s *IPCServer) handleSetWebhook(conn *ipcConn, payload []byte) { + url := string(payload) // empty string = clear webhook + s.daemon.SetWebhookURL(url) + result := map[string]interface{}{"webhook": url} + data, _ := json.Marshal(result) + resp := make([]byte, 1+len(data)) + resp[0] = CmdSetWebhookOK + copy(resp[1:], data) + if err := conn.ipcWrite(resp); err != nil { + slog.Debug("IPC set_webhook reply failed", "err", err) + } +} + // Handshake IPC sub-commands const ( SubHandshakeSend byte = 0x01 diff --git a/pkg/daemon/services.go b/pkg/daemon/services.go index 04721baa..be870d67 100644 --- a/pkg/daemon/services.go +++ b/pkg/daemon/services.go @@ -223,6 +223,9 @@ func (d *Daemon) saveReceivedFile(frame *dataexchange.Frame) error { return fmt.Errorf("write: %w", err) } slog.Info("file saved", "path", destPath, "bytes", len(frame.Payload)) + d.webhook.Emit("file.received", map[string]interface{}{ + "filename": safeName, "size": len(frame.Payload), "path": destPath, + }) return nil } @@ -258,6 +261,10 @@ func (d *Daemon) saveInboxMessage(frame *dataexchange.Frame, from protocol.Addr) return fmt.Errorf("write: %w", err) } slog.Info("inbox message saved", "path", destPath, "type", dataexchange.TypeName(frame.Type), "bytes", len(frame.Payload)) + d.webhook.Emit("message.received", map[string]interface{}{ + "type": dataexchange.TypeName(frame.Type), "from": from.String(), + "size": len(frame.Payload), + }) return nil } @@ -268,7 +275,8 @@ func (d *Daemon) startEventStreamService() error { return err } broker := &eventBroker{ - subs: make(map[string][]*connAdapter), + subs: make(map[string][]*connAdapter), + webhook: d.webhook, } go func() { for { @@ -290,14 +298,21 @@ func (d *Daemon) startEventStreamService() error { // eventBroker is an in-process pub/sub broker for the event stream service. type eventBroker struct { - mu sync.RWMutex - subs map[string][]*connAdapter // topic → subscribers + mu sync.RWMutex + subs map[string][]*connAdapter // topic → subscribers + webhook *WebhookClient } func (b *eventBroker) handleConn(adapter *connAdapter) { + var topic string defer func() { b.removeSub(adapter) adapter.Close() + if topic != "" { + b.webhook.Emit("pubsub.unsubscribed", map[string]interface{}{ + "topic": topic, "remote": adapter.RemoteAddr().String(), + }) + } }() // First event = subscription @@ -305,9 +320,12 @@ func (b *eventBroker) handleConn(adapter *connAdapter) { if err != nil { return } - topic := subEvt.Topic + topic = subEvt.Topic b.addSub(topic, adapter) slog.Debug("eventstream subscription", "remote", adapter.RemoteAddr(), "topic", topic) + b.webhook.Emit("pubsub.subscribed", map[string]interface{}{ + "topic": topic, "remote": adapter.RemoteAddr().String(), + }) // Remaining events = publish for { @@ -369,4 +387,7 @@ func (b *eventBroker) publish(evt *eventstream.Event, sender *connAdapter) { b.removeSub(conn) } slog.Debug("eventstream published", "topic", evt.Topic, "bytes", len(evt.Payload), "from", sender.RemoteAddr()) + b.webhook.Emit("pubsub.published", map[string]interface{}{ + "topic": evt.Topic, "size": len(evt.Payload), "from": sender.RemoteAddr().String(), + }) } diff --git a/pkg/daemon/tunnel.go b/pkg/daemon/tunnel.go index 38d4a430..8bf1d202 100644 --- a/pkg/daemon/tunnel.go +++ b/pkg/daemon/tunnel.go @@ -116,6 +116,9 @@ type TunnelManager struct { beaconAddr *net.UDPAddr // beacon address for punch/relay relayPeers map[uint32]bool // peers that need relay (symmetric NAT) + // Webhook + webhook *WebhookClient + // Metrics BytesSent uint64 BytesRecv uint64 @@ -151,6 +154,13 @@ func NewTunnelManager() *TunnelManager { } } +// SetWebhook configures the webhook client for event notifications. +func (tm *TunnelManager) SetWebhook(wc *WebhookClient) { + tm.mu.Lock() + tm.webhook = wc + tm.mu.Unlock() +} + // EnableEncryption generates an X25519 keypair and enables tunnel encryption. func (tm *TunnelManager) EnableEncryption() error { curve := ecdh.X25519() @@ -514,6 +524,10 @@ func (tm *TunnelManager) handleAuthKeyExchange(data []byte, from *net.UDPAddr, f } else { slog.Info("encrypted tunnel established", "auth", authenticated, "peer_node_id", peerNodeID, "endpoint", from, "relay", fromRelay) } + tm.webhook.Emit("tunnel.established", map[string]interface{}{ + "peer_node_id": peerNodeID, "authenticated": authenticated, + "relay": fromRelay, "rekeyed": keyChanged, + }) if !hadCrypto || keyChanged { tm.sendKeyExchangeToNode(peerNodeID) @@ -577,6 +591,10 @@ func (tm *TunnelManager) handleKeyExchange(data []byte, from *net.UDPAddr, fromR } else { slog.Info("encrypted tunnel established", "peer_node_id", peerNodeID, "endpoint", from, "relay", fromRelay) } + tm.webhook.Emit("tunnel.established", map[string]interface{}{ + "peer_node_id": peerNodeID, "authenticated": false, + "relay": fromRelay, "rekeyed": keyChanged, + }) // Respond with our key if this is a new peer or the peer rekeyed if !hadCrypto || keyChanged { @@ -613,6 +631,9 @@ func (tm *TunnelManager) handleEncrypted(data []byte, from *net.UDPAddr) { if !pc.checkAndRecordNonce(recvCounter) { pc.replayMu.Unlock() slog.Warn("tunnel nonce replay detected", "peer_node_id", peerNodeID, "counter", recvCounter, "max", pc.maxRecvNonce) + tm.webhook.Emit("security.nonce_replay", map[string]interface{}{ + "peer_node_id": peerNodeID, "counter": recvCounter, + }) return } pc.replayMu.Unlock() @@ -1027,12 +1048,18 @@ func (tm *TunnelManager) handleRelayDeliver(data []byte) { // Mark this peer as relay-capable (they sent through relay, so they're behind NAT) tm.mu.Lock() + wasRelay := tm.relayPeers[srcNodeID] tm.relayPeers[srcNodeID] = true // Ensure we have a peer entry (use beacon addr as placeholder for relay peers) if _, ok := tm.peers[srcNodeID]; !ok && tm.beaconAddr != nil { tm.peers[srcNodeID] = tm.beaconAddr } tm.mu.Unlock() + if !wasRelay { + tm.webhook.Emit("tunnel.relay_activated", map[string]interface{}{ + "peer_node_id": srcNodeID, + }) + } if len(payload) < 4 { return diff --git a/pkg/daemon/webhook.go b/pkg/daemon/webhook.go new file mode 100644 index 00000000..8c3bfb9d --- /dev/null +++ b/pkg/daemon/webhook.go @@ -0,0 +1,108 @@ +package daemon + +import ( + "bytes" + "encoding/json" + "log/slog" + "net/http" + "sync" + "time" +) + +// WebhookEvent is the JSON payload POSTed to the webhook endpoint. +type WebhookEvent struct { + Event string `json:"event"` + NodeID uint32 `json:"node_id"` + Timestamp time.Time `json:"timestamp"` + Data interface{} `json:"data,omitempty"` +} + +// WebhookClient dispatches events asynchronously to an HTTP(S) endpoint. +// If URL is empty, all methods are no-ops (zero overhead when disabled). +type WebhookClient struct { + url string + ch chan *WebhookEvent + client *http.Client + done chan struct{} + nodeID func() uint32 + closeOnce sync.Once + closed chan struct{} // closed when Close is called, guards Emit +} + +// NewWebhookClient creates a webhook dispatcher. If url is empty, returns nil. +func NewWebhookClient(url string, nodeIDFunc func() uint32) *WebhookClient { + if url == "" { + return nil + } + wc := &WebhookClient{ + url: url, + ch: make(chan *WebhookEvent, 1024), + client: &http.Client{Timeout: 5 * time.Second}, + done: make(chan struct{}), + nodeID: nodeIDFunc, + closed: make(chan struct{}), + } + go wc.run() + return wc +} + +// Emit queues an event for async delivery. Non-blocking; drops if buffer full. +// Safe to call after Close (becomes a no-op). +func (wc *WebhookClient) Emit(event string, data interface{}) { + if wc == nil { + return + } + select { + case <-wc.closed: + return // already closed + default: + } + ev := &WebhookEvent{ + Event: event, + NodeID: wc.nodeID(), + Timestamp: time.Now().UTC(), + Data: data, + } + select { + case wc.ch <- ev: + case <-wc.closed: + default: + slog.Warn("webhook queue full, dropping event", "event", event) + } +} + +// Close drains the queue and stops the background goroutine. Idempotent. +func (wc *WebhookClient) Close() { + if wc == nil { + return + } + wc.closeOnce.Do(func() { + close(wc.closed) + close(wc.ch) + }) + <-wc.done +} + +func (wc *WebhookClient) run() { + defer close(wc.done) + for ev := range wc.ch { + wc.post(ev) + } +} + +func (wc *WebhookClient) post(ev *WebhookEvent) { + body, err := json.Marshal(ev) + if err != nil { + slog.Warn("webhook marshal error", "event", ev.Event, "error", err) + return + } + resp, err := wc.client.Post(wc.url, "application/json", bytes.NewReader(body)) + if err != nil { + slog.Warn("webhook POST failed", "event", ev.Event, "error", err) + return + } + resp.Body.Close() + if resp.StatusCode >= 400 { + slog.Warn("webhook POST error status", "event", ev.Event, "status", resp.StatusCode) + } +} diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go index 5c4e3104..92f4d1cd 100644 --- a/pkg/driver/driver.go +++ b/pkg/driver/driver.go @@ -221,6 +221,24 @@ func (d *Driver) Deregister() (map[string]interface{}, error) { return d.jsonRPC([]byte{cmdDeregister}, cmdDeregisterOK, "deregister") } +// SetTags sets the capability tags for this daemon's node. +func (d *Driver) SetTags(tags []string) (map[string]interface{}, error) { + data, _ := json.Marshal(tags) + msg := make([]byte, 1+len(data)) + msg[0] = cmdSetTags + copy(msg[1:], data) + return d.jsonRPC(msg, cmdSetTagsOK, "set_tags") +} + +// SetWebhook sets or clears the daemon's webhook URL at runtime. +// An empty URL disables the webhook. +func (d *Driver) SetWebhook(url string) (map[string]interface{}, error) { + msg := make([]byte, 1+len(url)) + msg[0] = cmdSetWebhook + copy(msg[1:], url) + return d.jsonRPC(msg, cmdSetWebhookOK, "set_webhook") +} + // Disconnect closes a connection by ID. Used by administrative tools. func (d *Driver) Disconnect(connID uint32) error { msg := make([]byte, 5) diff --git a/pkg/driver/ipc.go b/pkg/driver/ipc.go index d53adad2..f59a485f 100644 --- a/pkg/driver/ipc.go +++ b/pkg/driver/ipc.go @@ -36,6 +36,10 @@ const ( cmdSetVisibilityOK byte = 0x16 cmdDeregister byte = 0x17 cmdDeregisterOK byte = 0x18 + cmdSetTags byte = 0x19 + cmdSetTagsOK byte = 0x1A + cmdSetWebhook byte = 0x1B + cmdSetWebhookOK byte = 0x1C ) // Datagram represents a received unreliable datagram. diff --git a/pkg/registry/client.go b/pkg/registry/client.go index c9df364a..044ad035 100644 --- a/pkg/registry/client.go +++ b/pkg/registry/client.go @@ -417,6 +417,19 @@ func (c *Client) SetHostname(nodeID uint32, hostname string) (map[string]interfa return c.Send(msg) } +// SetTags sets the capability tags for a node. +func (c *Client) SetTags(nodeID uint32, tags []string) (map[string]interface{}, error) { + msg := map[string]interface{}{ + "type": "set_tags", + "node_id": nodeID, + "tags": tags, + } + if sig := c.sign(fmt.Sprintf("set_tags:%d", nodeID)); sig != "" { + msg["signature"] = sig + } + return c.Send(msg) +} + // ResolveHostname resolves a hostname to node info (node_id, address, public flag). func (c *Client) ResolveHostname(hostname string) (map[string]interface{}, error) { return c.Send(map[string]interface{}{ diff --git a/pkg/registry/dashboard.go b/pkg/registry/dashboard.go index ba26cda6..a18f63ce 100644 --- a/pkg/registry/dashboard.go +++ b/pkg/registry/dashboard.go @@ -4,6 +4,7 @@ import ( "encoding/json" "log/slog" "net/http" + "net/http/pprof" ) // ServeDashboard starts an HTTP server serving the dashboard UI and stats API. @@ -26,6 +27,13 @@ func (s *Server) ServeDashboard(addr string) error { _ = json.NewEncoder(w).Encode(stats) }) + // pprof endpoints for live profiling + mux.HandleFunc("/debug/pprof/", pprof.Index) + mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) + mux.HandleFunc("/debug/pprof/profile", pprof.Profile) + mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + mux.HandleFunc("/debug/pprof/trace", pprof.Trace) + slog.Info("dashboard listening", "addr", addr) return http.ListenAndServe(addr, mux) } @@ -49,7 +57,7 @@ header h1{font-size:20px;font-weight:600;color:#e6edf3} header .links{display:flex;gap:16px;font-size:13px} .uptime{font-size:12px;color:#8b949e;margin-top:4px} -.stats-row{display:grid;grid-template-columns:repeat(3,1fr);gap:16px;margin-bottom:32px} +.stats-row{display:grid;grid-template-columns:repeat(4,1fr);gap:16px;margin-bottom:32px} .stat-card{background:#161b22;border:1px solid #21262d;border-radius:8px;padding:20px;text-align:center} .stat-card .value{font-size:32px;font-weight:700;color:#e6edf3;display:block} .stat-card .label{font-size:12px;color:#8b949e;text-transform:uppercase;letter-spacing:0.5px;margin-top:4px} @@ -62,22 +70,28 @@ th{text-align:left;font-size:11px;font-weight:600;color:#8b949e;text-transform:u td{padding:10px 16px;border-bottom:1px solid #21262d;font-size:13px} tr:last-child td{border-bottom:none} -.status-dot{display:inline-block;width:8px;height:8px;border-radius:50%;margin-right:6px;vertical-align:middle} -.status-online{background:#3fb950} -.status-offline{background:#484f58} - .diagrams{display:grid;grid-template-columns:1fr 1fr;gap:16px;margin-bottom:32px} .diagram-card{background:#161b22;border:1px solid #21262d;border-radius:8px;padding:20px;text-align:center} .diagram-card h3{font-size:13px;font-weight:600;color:#8b949e;margin-bottom:12px;text-transform:uppercase;letter-spacing:0.5px} +.tag{display:inline-block;background:#1f2937;border:1px solid #30363d;border-radius:12px;padding:2px 10px;font-size:11px;color:#58a6ff;margin:2px 4px 2px 0;white-space:nowrap} +.tag-filter{background:#0d1117;border:1px solid #30363d;border-radius:6px;padding:8px 12px;color:#c9d1d9;font-family:inherit;font-size:13px;width:100%;margin-bottom:12px;outline:none} +.tag-filter:focus{border-color:#58a6ff} +.tag-filter::placeholder{color:#484f58} .empty{color:#484f58;font-style:italic;padding:20px;text-align:center} +.pagination{display:flex;align-items:center;justify-content:center;gap:8px;margin-top:12px;font-size:13px} +.pagination button{background:#161b22;border:1px solid #30363d;border-radius:6px;padding:6px 12px;color:#c9d1d9;font-family:inherit;font-size:13px;cursor:pointer} +.pagination button:hover{border-color:#58a6ff;color:#58a6ff} +.pagination button:disabled{opacity:0.3;cursor:default;border-color:#30363d;color:#c9d1d9} +.pagination .page-info{color:#8b949e} + footer{text-align:center;padding:24px 0;border-top:1px solid #21262d;margin-top:32px;font-size:12px;color:#484f58} footer a{color:#484f58} footer a:hover{color:#58a6ff} @media(max-width:640px){ - .stats-row{grid-template-columns:1fr} + .stats-row{grid-template-columns:repeat(2,1fr)} .diagrams{grid-template-columns:1fr} } @@ -98,16 +112,20 @@ footer a:hover{color:#58a6ff}
- - Total Nodes + + Total Requests
- Active Nodes + Online Nodes
- - Requests Served + + Trust Links +
+
+ + Unique Tags
@@ -186,12 +204,14 @@ footer a:hover{color:#58a6ff}

Nodes

+ - + - +
AddressHostnameStatus
AddressStatusTrustTags
Loading...
Loading...
+