From 59d92ade4621c97eedd6885bbe9e63cbcc5294f0 Mon Sep 17 00:00:00 2001 From: Moses Narrow <36607567+0pcom@users.noreply.github.com> Date: Mon, 30 Mar 2026 08:02:55 -0500 Subject: [PATCH 01/11] Implement server-to-server mesh for cross-server client connectivity Enable clients connected to different dmsg servers to communicate by having servers peer with each other. This removes the scaling limitation where clients must be on the same server to reach each other. Design: - Servers peer as clients to each other using existing session mechanism (TCP + noise XK handshake + yamux), requiring no new transport code - Peers configured via static config (no discovery dependency) - When a server can't find destination client locally, it tries forwarding through peer server sessions - 1-hop maximum: peer servers only check local sessions, no further forwarding (prevents loops without TTL) - Original SignedObject forwarded as-is (client signature preserved) - Backward compatible: no wire protocol changes, existing clients work unchanged Key changes: - ServerConfig.Peers: static peer server list (PK + address) - Server.peerSessions: outbound connections to peer servers - Server.peerPKs: identifies incoming sessions as peer servers - SessionCommon.isPeer: relaxes SrcAddr.PK check for forwarded requests - ServerSession.forwardViaPeer: iterates peers on local lookup failure - maintainPeerConnection: persistent connection with reconnect backoff Config example: "peers": [{"public_key": "02abc...", "address": "1.2.3.4:8081"}] --- cmd/dmsg-server/commands/start/root.go | 7 ++ pkg/dmsg/entity_common.go | 17 +++ pkg/dmsg/server.go | 138 ++++++++++++++++++++++++- pkg/dmsg/server_session.go | 63 ++++++++--- pkg/dmsg/session_common.go | 1 + pkg/dmsgserver/config.go | 7 ++ 6 files changed, 220 insertions(+), 13 deletions(-) diff --git a/cmd/dmsg-server/commands/start/root.go b/cmd/dmsg-server/commands/start/root.go index 605ddd80b..627af2eb6 100644 --- a/cmd/dmsg-server/commands/start/root.go +++ b/cmd/dmsg-server/commands/start/root.go @@ -97,10 +97,17 @@ var RootCmd = &cobra.Command{ srvAPI := dmsgserver.NewServerAPI(r, log, m) + // Convert peer config to dmsg.PeerEntry. + var peers []dmsg.PeerEntry + for _, p := range conf.Peers { + peers = append(peers, dmsg.PeerEntry{PK: p.PubKey, Addr: p.Address}) + } + srvConf := dmsg.ServerConfig{ MaxSessions: conf.MaxSessions, UpdateInterval: conf.UpdateInterval, AuthPassphrase: authPassphrase, + Peers: peers, } srv := dmsg.NewServer(conf.PubKey, conf.SecKey, disc.NewHTTP(conf.Discovery, &http.Client{}, log), &srvConf, m) srv.SetLogger(log) diff --git a/pkg/dmsg/entity_common.go b/pkg/dmsg/entity_common.go index 1b700d3e2..a55f3ae8f 100644 --- a/pkg/dmsg/entity_common.go +++ b/pkg/dmsg/entity_common.go @@ -36,6 +36,10 @@ type EntityCommon struct { setSessionCallback func(ctx context.Context) error delSessionCallback func(ctx context.Context) error + + // peerSessionsFunc returns peer server sessions for mesh forwarding. + // Only set on Server entities; nil for clients. + peerSessionsFunc func() []*SessionCommon } func (c *EntityCommon) init(pk cipher.PubKey, sk cipher.SecKey, dc disc.APIClient, log logrus.FieldLogger, updateInterval time.Duration) { @@ -84,6 +88,19 @@ func (c *EntityCommon) serverSession(pk cipher.PubKey) (ServerSession, bool) { return ServerSession{SessionCommon: ses}, ok } +// peerServerSessions returns all peer server sessions for mesh forwarding. +func (c *EntityCommon) peerServerSessions() []ServerSession { + if c.peerSessionsFunc == nil { + return nil + } + raw := c.peerSessionsFunc() + sessions := make([]ServerSession, len(raw)) + for i, ses := range raw { + sessions[i] = ServerSession{SessionCommon: ses} + } + return sessions +} + // clientSession obtains a session as a client. func (c *EntityCommon) clientSession(porter *netutil.Porter, pk cipher.PubKey) (ClientSession, bool) { ses, ok := c.session(pk) diff --git a/pkg/dmsg/server.go b/pkg/dmsg/server.go index 8ed37950d..8981afb50 100644 --- a/pkg/dmsg/server.go +++ b/pkg/dmsg/server.go @@ -17,11 +17,18 @@ import ( "github.com/skycoin/dmsg/pkg/dmsg/metrics" ) +// PeerEntry represents a peer dmsg server to connect to. +type PeerEntry struct { + PK cipher.PubKey + Addr string +} + // ServerConfig configues the Server type ServerConfig struct { MaxSessions int UpdateInterval time.Duration AuthPassphrase string + Peers []PeerEntry } // DefaultServerConfig returns the default server config. @@ -53,6 +60,12 @@ type Server struct { maxSessions int authPassphrase string + + // Peer server mesh support. + peers []PeerEntry + peerPKs map[cipher.PubKey]struct{} // set of known peer server PKs + peerSessions map[cipher.PubKey]*SessionCommon + peerSessionsMx sync.Mutex } // NewServer creates a new dmsg server entity. @@ -83,6 +96,24 @@ func NewServer(pk cipher.PubKey, sk cipher.SecKey, dc disc.APIClient, conf *Serv return s.updateServerEntry(ctx, s.AdvertisedAddr(), s.maxSessions, conf.AuthPassphrase) } s.authPassphrase = conf.AuthPassphrase + + // Initialize peer mesh. + s.peers = conf.Peers + s.peerPKs = make(map[cipher.PubKey]struct{}, len(conf.Peers)) + for _, p := range conf.Peers { + s.peerPKs[p.PK] = struct{}{} + } + s.peerSessions = make(map[cipher.PubKey]*SessionCommon) + s.peerSessionsFunc = func() []*SessionCommon { + s.peerSessionsMx.Lock() + defer s.peerSessionsMx.Unlock() + sessions := make([]*SessionCommon, 0, len(s.peerSessions)) + for _, ses := range s.peerSessions { + sessions = append(sessions, ses) + } + return sessions + } + return s } @@ -142,6 +173,8 @@ func (s *Server) Serve(lis net.Listener, addr string) error { return err } + s.connectToPeers(ctx) + log.Info("Accepting sessions...") s.readyOnce.Do(func() { close(s.ready) }) for { @@ -211,6 +244,102 @@ func (s *Server) Ready() <-chan struct{} { return s.ready } +func (s *Server) connectToPeers(ctx context.Context) { + for _, peer := range s.peers { + s.wg.Add(1) + go func(peer PeerEntry) { + defer s.wg.Done() + s.maintainPeerConnection(ctx, peer) + }(peer) + } +} + +func (s *Server) maintainPeerConnection(ctx context.Context, peer PeerEntry) { + log := s.log.WithField("peer_pk", peer.PK).WithField("peer_addr", peer.Addr) + bo := 5 * time.Second + + for { + select { + case <-ctx.Done(): + return + case <-s.done: + return + default: + } + + log.Info("Dialing peer server...") + conn, err := net.DialTimeout("tcp", peer.Addr, 10*time.Second) + if err != nil { + log.WithError(err).Warn("Failed to dial peer server.") + select { + case <-time.After(bo): + case <-ctx.Done(): + return + } + if bo < time.Minute { + bo = time.Duration(float64(bo) * 1.5) + } + continue + } + + ses := new(SessionCommon) + if err := ses.initClient(&s.EntityCommon, conn, peer.PK); err != nil { + log.WithError(err).Warn("Peer noise handshake failed.") + conn.Close() //nolint:errcheck,gosec + select { + case <-time.After(bo): + case <-ctx.Done(): + return + } + continue + } + + ses.sm.mutx.Lock() + ses.sm.yamux, err = yamux.Client(conn, yamux.DefaultConfig()) + if err != nil { + ses.sm.mutx.Unlock() + log.WithError(err).Warn("Peer yamux setup failed.") + conn.Close() //nolint:errcheck,gosec + select { + case <-time.After(bo): + case <-ctx.Done(): + return + } + continue + } + ses.sm.addr = ses.sm.yamux.RemoteAddr() + ses.isPeer = true + ses.sm.mutx.Unlock() + + s.peerSessionsMx.Lock() + s.peerSessions[peer.PK] = ses + s.peerSessionsMx.Unlock() + + log.Info("Connected to peer server.") + bo = 5 * time.Second // reset backoff on success + + // Block until the yamux session closes or context is done. + select { + case <-ctx.Done(): + case <-s.done: + } + + // Clean up. + s.peerSessionsMx.Lock() + delete(s.peerSessions, peer.PK) + s.peerSessionsMx.Unlock() + ses.Close() //nolint:errcheck,gosec + + log.Info("Peer session closed, will reconnect.") + } +} + +// isPeerPK returns true if the given PK is a known peer server. +func (s *Server) isPeerPK(pk cipher.PubKey) bool { + _, ok := s.peerPKs[pk] + return ok +} + func (s *Server) handleSession(conn net.Conn) { defer func() { if r := recover(); r != nil { @@ -234,7 +363,14 @@ func (s *Server) handleSession(conn net.Conn) { return } log = log.WithField("remote_pk", dSes.RemotePK()) - log.Info("Started session.") + + // Mark session as peer if remote PK is a known peer server. + if s.isPeerPK(dSes.RemotePK()) { + dSes.isPeer = true + log.Info("Started peer server session.") + } else { + log.Info("Started session.") + } ctx, cancel := context.WithCancel(context.Background()) go func() { diff --git a/pkg/dmsg/server_session.go b/pkg/dmsg/server_session.go index 800e8cdfe..285cc10c8 100644 --- a/pkg/dmsg/server_session.go +++ b/pkg/dmsg/server_session.go @@ -159,7 +159,9 @@ func (ss *ServerSession) serveStream(log logrus.FieldLogger, yStr io.ReadWriteCl if err := req.Verify(0); err != nil { return StreamRequest{}, err } - if req.SrcAddr.PK != ss.rPK { + // For peer sessions, the SrcAddr.PK is the original client, not the + // peer server. The request signature is still verified above. + if !ss.isPeer && req.SrcAddr.PK != ss.rPK { return StreamRequest{}, ErrReqInvalidSrcPK } return req, nil @@ -205,25 +207,34 @@ func (ss *ServerSession) serveStream(log logrus.FieldLogger, yStr io.ReadWriteCl return nil } - // Obtain next session. + // Obtain next session (local client). ss2, ok := ss.entity.serverSession(req.DstAddr.PK) if !ok { - ss.m.RecordStream(metrics.DeltaFailed) // record failed stream - return ErrReqNoNextSession + // Destination not connected locally. If this request came from a + // client (not a peer), try forwarding through peer servers (1-hop). + // If this request already came from a peer, do NOT forward further. + if ss.isPeer { + ss.m.RecordStream(metrics.DeltaFailed) + return ErrReqNoNextSession + } + return ss.forwardViaPeer(log, yStr, req) } log.Debug("Obtained next session.") - // Forward request and obtain/check response. - yStr2, resp, err := ss2.forwardRequest(req) + return ss.bridgeStream(log, yStr, ss2, req) +} + +// bridgeStream forwards a request to a destination session and bridges the two streams. +func (ss *ServerSession) bridgeStream(log logrus.FieldLogger, yStr io.ReadWriteCloser, dst ServerSession, req StreamRequest) error { + yStr2, resp, err := dst.forwardRequest(req) if err != nil { - ss.m.RecordStream(metrics.DeltaFailed) // record failed stream + ss.m.RecordStream(metrics.DeltaFailed) return err } log.Debug("Forwarded stream request.") - // Forward response. if err := ss.writeObject(yStr, resp); err != nil { - ss.m.RecordStream(metrics.DeltaFailed) // record failed stream + ss.m.RecordStream(metrics.DeltaFailed) return err } log.Debug("Forwarded stream response.") @@ -233,13 +244,41 @@ func (ss *ServerSession) serveStream(log logrus.FieldLogger, yStr io.ReadWriteCl conn.SetReadDeadline(time.Time{}) //nolint:errcheck,gosec } - // Serve stream. log.Info("Serving stream.") - ss.m.RecordStream(metrics.DeltaConnect) // record successful stream - defer ss.m.RecordStream(metrics.DeltaDisconnect) // record disconnection + ss.m.RecordStream(metrics.DeltaConnect) + defer ss.m.RecordStream(metrics.DeltaDisconnect) return netutil.CopyReadWriteCloser(yStr, yStr2) } +// forwardViaPeer tries to forward a stream request through peer server sessions. +// This is only called for client-originated requests (not peer-originated, enforcing 1-hop max). +func (ss *ServerSession) forwardViaPeer(log logrus.FieldLogger, yStr io.ReadWriteCloser, req StreamRequest) error { + peers := ss.entity.peerServerSessions() + if len(peers) == 0 { + ss.m.RecordStream(metrics.DeltaFailed) + return ErrReqNoNextSession + } + + for _, peer := range peers { + // Don't forward back to the session the request came from. + if peer.RemotePK() == ss.rPK { + continue + } + + log := log.WithField("peer", peer.RemotePK()) + log.Debug("Trying peer server for forwarding.") + + err := ss.bridgeStream(log, yStr, peer, req) + if err == nil { + return nil + } + log.WithError(err).Debug("Peer forward failed, trying next.") + } + + ss.m.RecordStream(metrics.DeltaFailed) + return ErrReqNoNextSession +} + func addrToIP(addr net.Addr) (net.IP, error) { switch a := addr.(type) { case *net.TCPAddr: diff --git a/pkg/dmsg/session_common.go b/pkg/dmsg/session_common.go index fd2c59ade..708a54fc3 100644 --- a/pkg/dmsg/session_common.go +++ b/pkg/dmsg/session_common.go @@ -23,6 +23,7 @@ import ( type SessionCommon struct { entity *EntityCommon // back reference rPK cipher.PubKey // remote pk + isPeer bool // true if this session is with a peer server netConn net.Conn // underlying net.Conn (TCP connection to the dmsg server) // ys *yamux.Session diff --git a/pkg/dmsgserver/config.go b/pkg/dmsgserver/config.go index e6daff4bf..7c72e8456 100644 --- a/pkg/dmsgserver/config.go +++ b/pkg/dmsgserver/config.go @@ -26,6 +26,12 @@ var defaultDiscoveryURL = dmsg.DiscAddr(false) // DefaultDiscoverURLTest default URL for discovery in test env var DefaultDiscoverURLTest = dmsg.DiscAddr(true) +// PeerConfig represents a peer dmsg server to connect to for server-to-server mesh. +type PeerConfig struct { + PubKey cipher.PubKey `json:"public_key"` + Address string `json:"address"` +} + // Config is structure of config file type Config struct { Path string `json:"-"` @@ -39,6 +45,7 @@ type Config struct { LogLevel string `json:"log_level"` UpdateInterval time.Duration `json:"update_interval"` MaxSessions int `json:"max_sessions"` + Peers []PeerConfig `json:"peers,omitempty"` } // GenerateDefaultConfig generate default config for dmsg-server From aaad73cef0d20ab2508d2dd2a75722381694fd1c Mon Sep 17 00:00:00 2001 From: Moses Narrow <36607567+0pcom@users.noreply.github.com> Date: Mon, 30 Mar 2026 08:09:10 -0500 Subject: [PATCH 02/11] Auto-discover peer servers from discovery MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Servers now automatically discover and peer with all other servers registered in dmsg discovery, in addition to statically configured peers. A background loop queries AllServers periodically and establishes peer connections to any new servers found. Static config peers take priority and are always connected. Discovery- based peers are additive — they're discovered and connected without requiring any config changes. This means in the current deployment, all dmsg servers will automatically mesh with each other as long as they share the same dmsg discovery. --- pkg/dmsg/server.go | 73 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/pkg/dmsg/server.go b/pkg/dmsg/server.go index 8981afb50..4abd1ea28 100644 --- a/pkg/dmsg/server.go +++ b/pkg/dmsg/server.go @@ -245,6 +245,7 @@ func (s *Server) Ready() <-chan struct{} { } func (s *Server) connectToPeers(ctx context.Context) { + // Connect to statically configured peers. for _, peer := range s.peers { s.wg.Add(1) go func(peer PeerEntry) { @@ -252,6 +253,78 @@ func (s *Server) connectToPeers(ctx context.Context) { s.maintainPeerConnection(ctx, peer) }(peer) } + + // Periodically discover other servers from discovery and peer with them. + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.discoverAndConnectPeers(ctx) + }() +} + +// discoverAndConnectPeers periodically queries discovery for all servers +// and establishes peer connections to any that aren't already connected. +func (s *Server) discoverAndConnectPeers(ctx context.Context) { + // activePeers tracks goroutines managing discovered peer connections. + activePeers := make(map[cipher.PubKey]context.CancelFunc) + + // Initial delay to let the server register itself first. + select { + case <-time.After(10 * time.Second): + case <-ctx.Done(): + return + } + + ticker := time.NewTicker(s.updateInterval) + defer ticker.Stop() + + for { + entries, err := s.dc.AllServers(ctx) + if err != nil { + s.log.WithError(err).Debug("Failed to discover peer servers.") + } else { + for _, entry := range entries { + pk := entry.Static + // Skip self and already-connected peers. + if pk == s.pk { + continue + } + if _, ok := activePeers[pk]; ok { + continue + } + // Skip if already a static peer (handled by connectToPeers). + if _, ok := s.peerPKs[pk]; ok { + continue + } + if entry.Server == nil || entry.Server.Address == "" { + continue + } + + // Register as known peer so incoming sessions are marked isPeer. + s.peerPKs[pk] = struct{}{} + + peerCtx, peerCancel := context.WithCancel(ctx) //nolint:gosec + activePeers[pk] = peerCancel + + peer := PeerEntry{PK: pk, Addr: entry.Server.Address} + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.maintainPeerConnection(peerCtx, peer) + }() + } + } + + select { + case <-ticker.C: + case <-ctx.Done(): + // Cancel all discovered peer connections. + for _, cancel := range activePeers { + cancel() + } + return + } + } } func (s *Server) maintainPeerConnection(ctx context.Context, peer PeerEntry) { From 96ea68c92f0b13e5c7fd4fae99d2de661570313a Mon Sep 17 00:00:00 2001 From: Moses Narrow <36607567+0pcom@users.noreply.github.com> Date: Mon, 30 Mar 2026 08:13:37 -0500 Subject: [PATCH 03/11] Add mesh fallback in DialStream and cross-server e2e test DialStream now falls back to trying all existing sessions when the target's delegated servers are unreachable. If the client's server is meshed with the target's server, the request is forwarded through the peer connection transparently. The e2e test verifies: two servers peered via static config, each with one isolated client (separate filtered discovery), cross-server dial succeeds with bidirectional 1KB data transfer through the mesh. --- pkg/dmsg/client_dial.go | 16 ++++ pkg/dmsgtest/mesh_test.go | 190 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 206 insertions(+) create mode 100644 pkg/dmsgtest/mesh_test.go diff --git a/pkg/dmsg/client_dial.go b/pkg/dmsg/client_dial.go index 947b5c1e2..23a7ae4ba 100644 --- a/pkg/dmsg/client_dial.go +++ b/pkg/dmsg/client_dial.go @@ -63,6 +63,22 @@ func (ce *Client) DialStream(ctx context.Context, addr Addr) (*Stream, error) { return stream, nil } + // Fallback: try all existing sessions. If servers are meshed, our server + // can forward the request to the destination's server via peer connections. + for _, ses := range ce.allClientSessions(ce.porter) { + // Skip servers we already tried above. + if hasPK(entry.Client.DelegatedServers, ses.RemotePK()) { + continue + } + stream, err := ses.DialStream(addr) + if err != nil { + ce.log.WithError(err).WithField("server", ses.RemotePK()). + Debug("DialStream failed via mesh fallback, trying next server") + continue + } + return stream, nil + } + return nil, ErrCannotConnectToDelegated } diff --git a/pkg/dmsgtest/mesh_test.go b/pkg/dmsgtest/mesh_test.go new file mode 100644 index 000000000..95a1b5c42 --- /dev/null +++ b/pkg/dmsgtest/mesh_test.go @@ -0,0 +1,190 @@ +// Package dmsgtest pkg/dmsgtest/mesh_test.go +// +//nolint:errcheck,gosec +package dmsgtest + +import ( + "bytes" + "context" + "io" + "sync" + "testing" + "time" + + "github.com/skycoin/skywire/pkg/skywire-utilities/pkg/cipher" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/net/nettest" + + "github.com/skycoin/dmsg/pkg/disc" + dmsg "github.com/skycoin/dmsg/pkg/dmsg" +) + +// TestServerMesh_CrossServerDial verifies that a client connected to server A +// can dial a client connected to server B when the two servers are peered. +// Each client uses a separate filtered discovery so it only sees one server. +func TestServerMesh_CrossServerDial(t *testing.T) { + const timeout = 30 * time.Second + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + // Shared discovery for servers to register in. + sharedDC := disc.NewMock(0) + + // --- Server A --- + pkA, skA := cipher.GenerateKeyPair() + lisA, err := nettest.NewLocalListener("tcp") + require.NoError(t, err) + + // --- Server B --- + pkB, skB := cipher.GenerateKeyPair() + lisB, err := nettest.NewLocalListener("tcp") + require.NoError(t, err) + + // Configure servers as static peers of each other. + confA := &dmsg.ServerConfig{ + MaxSessions: 10, + UpdateInterval: dmsg.DefaultUpdateInterval, + Peers: []dmsg.PeerEntry{ + {PK: pkB, Addr: lisB.Addr().String()}, + }, + } + confB := &dmsg.ServerConfig{ + MaxSessions: 10, + UpdateInterval: dmsg.DefaultUpdateInterval, + Peers: []dmsg.PeerEntry{ + {PK: pkA, Addr: lisA.Addr().String()}, + }, + } + + srvA := dmsg.NewServer(pkA, skA, sharedDC, confA, nil) + srvB := dmsg.NewServer(pkB, skB, sharedDC, confB, nil) + + // Start servers. + var srvWg sync.WaitGroup + srvWg.Add(2) + go func() { + defer srvWg.Done() + if err := srvA.Serve(lisA, ""); err != nil { + t.Logf("Server A stopped: %v", err) + } + }() + go func() { + defer srvWg.Done() + if err := srvB.Serve(lisB, ""); err != nil { + t.Logf("Server B stopped: %v", err) + } + }() + + select { + case <-srvA.Ready(): + case <-ctx.Done(): + t.Fatal("Server A not ready") + } + select { + case <-srvB.Ready(): + case <-ctx.Done(): + t.Fatal("Server B not ready") + } + + // Give peer connections time to establish. + time.Sleep(2 * time.Second) + + // --- Client 1: only sees Server A --- + dcA := disc.NewMock(0) + entryA, err := sharedDC.Entry(ctx, pkA) + require.NoError(t, err) + require.NoError(t, dcA.PostEntry(ctx, entryA)) + + pk1, sk1 := cipher.GenerateKeyPair() + client1 := dmsg.NewClient(pk1, sk1, dcA, &dmsg.Config{MinSessions: 1}) + go client1.Serve(ctx) + select { + case <-client1.Ready(): + case <-ctx.Done(): + t.Fatal("Client 1 not ready") + } + + // Register client1 in shared discovery so server B can look it up if needed. + entry1, _ := dcA.Entry(ctx, pk1) + sharedDC.PostEntry(ctx, entry1) + + // --- Client 2: only sees Server B --- + dcB := disc.NewMock(0) + entryB, err := sharedDC.Entry(ctx, pkB) + require.NoError(t, err) + require.NoError(t, dcB.PostEntry(ctx, entryB)) + + pk2, sk2 := cipher.GenerateKeyPair() + client2 := dmsg.NewClient(pk2, sk2, dcB, &dmsg.Config{MinSessions: 1}) + go client2.Serve(ctx) + select { + case <-client2.Ready(): + case <-ctx.Done(): + t.Fatal("Client 2 not ready") + } + + // Register client2 in shared discovery so server A can route to it. + entry2, _ := dcB.Entry(ctx, pk2) + sharedDC.PostEntry(ctx, entry2) + + // Also register client2's entry in client1's discovery so client1 can + // look up client2's delegated servers for dialing. + dcA.PostEntry(ctx, entry2) + + // --- Cross-server dial: Client 1 (on A) dials Client 2 (on B) --- + const port = uint16(200) + + lis2, err := client2.Listen(port) + require.NoError(t, err) + defer lis2.Close() + + stream1, err := client1.DialStream(ctx, dmsg.Addr{PK: pk2, Port: port}) + require.NoError(t, err, "Cross-server dial should succeed via peer mesh") + defer stream1.Close() + + conn2, err := lis2.AcceptStream() + require.NoError(t, err) + defer conn2.Close() + + // --- Verify bidirectional data transfer --- + payload := cipher.RandByte(1024) + + // Client 1 -> Client 2 + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + _, wErr := stream1.Write(payload) + assert.NoError(t, wErr) + }() + + recv := make([]byte, len(payload)) + _, err = io.ReadFull(conn2, recv) + require.NoError(t, err) + wg.Wait() + require.True(t, bytes.Equal(payload, recv), "data mismatch: client1 -> client2") + + // Client 2 -> Client 1 + wg.Add(1) + go func() { + defer wg.Done() + _, wErr := conn2.Write(payload) + assert.NoError(t, wErr) + }() + + recv2 := make([]byte, len(payload)) + _, err = io.ReadFull(stream1, recv2) + require.NoError(t, err) + wg.Wait() + require.True(t, bytes.Equal(payload, recv2), "data mismatch: client2 -> client1") + + t.Log("Cross-server bidirectional stream test passed.") + + // Cleanup. + client1.Close() + client2.Close() + srvA.Close() + srvB.Close() + srvWg.Wait() +} From 167dc28fa4b1b4f939da9172792780d2451eead5 Mon Sep 17 00:00:00 2001 From: Moses Narrow <36607567+0pcom@users.noreply.github.com> Date: Mon, 30 Mar 2026 08:22:38 -0500 Subject: [PATCH 04/11] Prefer existing sessions over new connections in DialStream Reorder DialStream to try mesh forwarding through existing sessions before attempting to establish new server connections. The new order: 1. Existing sessions matching target's delegated servers (direct, free) 2. All other existing sessions via mesh (free, already connected) 3. New sessions to delegated servers (expensive, last resort) This avoids unnecessary TCP+noise+yamux handshakes when the client is already connected to meshed servers that can forward the request. --- pkg/dmsg/client_dial.go | 35 ++++++++++++++++------------------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/pkg/dmsg/client_dial.go b/pkg/dmsg/client_dial.go index 23a7ae4ba..cf8fa6082 100644 --- a/pkg/dmsg/client_dial.go +++ b/pkg/dmsg/client_dial.go @@ -33,8 +33,7 @@ func (ce *Client) DialStream(ctx context.Context, addr Addr) (*Stream, error) { return nil, err } - // Range client's delegated servers. - // Try existing sessions first, falling back to next server on failure. + // 1. Try existing sessions to the target's delegated servers (direct path, cheapest). for _, srvPK := range entry.Client.DelegatedServers { if dSes, ok := ce.clientSession(ce.porter, srvPK); ok { stream, err := dSes.DialStream(addr) @@ -47,33 +46,31 @@ func (ce *Client) DialStream(ctx context.Context, addr Addr) (*Stream, error) { } } - // Range client's delegated servers. - // Attempt to connect to a delegated server. - for _, srvPK := range entry.Client.DelegatedServers { - dSes, err := ce.EnsureAndObtainSession(ctx, srvPK) - if err != nil { - continue + // 2. Try all other existing sessions (mesh path — already connected, no new handshake). + // If servers are meshed, our server forwards the request to the target's server. + for _, ses := range ce.allClientSessions(ce.porter) { + if hasPK(entry.Client.DelegatedServers, ses.RemotePK()) { + continue // already tried above } - stream, err := dSes.DialStream(addr) + stream, err := ses.DialStream(addr) if err != nil { - ce.log.WithError(err).WithField("server", srvPK). - Debug("DialStream failed via new session, trying next server") + ce.log.WithError(err).WithField("server", ses.RemotePK()). + Debug("DialStream failed via mesh, trying next server") continue } return stream, nil } - // Fallback: try all existing sessions. If servers are meshed, our server - // can forward the request to the destination's server via peer connections. - for _, ses := range ce.allClientSessions(ce.porter) { - // Skip servers we already tried above. - if hasPK(entry.Client.DelegatedServers, ses.RemotePK()) { + // 3. Last resort: establish new sessions to the target's delegated servers. + for _, srvPK := range entry.Client.DelegatedServers { + dSes, err := ce.EnsureAndObtainSession(ctx, srvPK) + if err != nil { continue } - stream, err := ses.DialStream(addr) + stream, err := dSes.DialStream(addr) if err != nil { - ce.log.WithError(err).WithField("server", ses.RemotePK()). - Debug("DialStream failed via mesh fallback, trying next server") + ce.log.WithError(err).WithField("server", srvPK). + Debug("DialStream failed via new session, trying next server") continue } return stream, nil From 3bc6033718101deb8d0766f6e3c1088a14b60de8 Mon Sep 17 00:00:00 2001 From: Moses Narrow <36607567+0pcom@users.noreply.github.com> Date: Mon, 30 Mar 2026 08:30:21 -0500 Subject: [PATCH 05/11] Fix session handshake timeout and DefaultMaxSessions inconsistency - Replace hardcoded 5s timeout in initClient/initServer with the HandshakeTimeout constant (20s). The 5s was too aggressive and inconsistent with the exported constant used elsewhere. - Change DefaultMaxSessions from 100 to 2048 to match the actual production default in dmsgserver config. - Use dmsg.DefaultMaxSessions in dmsgserver GenerateDefaultConfig instead of a hardcoded 2048, ensuring a single source of truth. --- pkg/dmsg/const.go | 2 +- pkg/dmsg/session_common.go | 4 ++-- pkg/dmsgserver/config.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/dmsg/const.go b/pkg/dmsg/const.go index 331818eb6..03362b355 100644 --- a/pkg/dmsg/const.go +++ b/pkg/dmsg/const.go @@ -21,7 +21,7 @@ const ( DefaultUpdateInterval = time.Minute - DefaultMaxSessions = 100 + DefaultMaxSessions = 2048 DefaultDmsgHTTPPort = uint16(80) diff --git a/pkg/dmsg/session_common.go b/pkg/dmsg/session_common.go index 708a54fc3..1c47a5069 100644 --- a/pkg/dmsg/session_common.go +++ b/pkg/dmsg/session_common.go @@ -76,7 +76,7 @@ func (sc *SessionCommon) initClient(entity *EntityCommon, conn net.Conn, rPK cip } rw := noise.NewReadWriter(conn, ns) - if err := rw.Handshake(time.Second * 5); err != nil { + if err := rw.Handshake(HandshakeTimeout); err != nil { return err } if rw.Buffered() > 0 { @@ -102,7 +102,7 @@ func (sc *SessionCommon) initServer(entity *EntityCommon, conn net.Conn) error { } rw := noise.NewReadWriter(conn, ns) - if err := rw.Handshake(time.Second * 5); err != nil { + if err := rw.Handshake(HandshakeTimeout); err != nil { return err } if rw.Buffered() > 0 { diff --git a/pkg/dmsgserver/config.go b/pkg/dmsgserver/config.go index 7c72e8456..6869f8776 100644 --- a/pkg/dmsgserver/config.go +++ b/pkg/dmsgserver/config.go @@ -60,7 +60,7 @@ func GenerateDefaultConfig(c *Config) { c.LocalAddress = defaultLocalAddress c.HTTPAddress = defaultHTTPAddress c.LogLevel = "info" - c.MaxSessions = 2048 + c.MaxSessions = dmsg.DefaultMaxSessions } // Flush trying to save config file From 90f3fbe9de3e994c02e1ed3c4fee97161587b883 Mon Sep 17 00:00:00 2001 From: Moses Narrow <36607567+0pcom@users.noreply.github.com> Date: Mon, 30 Mar 2026 08:36:48 -0500 Subject: [PATCH 06/11] Update vendor dependencies bytedance/sonic/loader v0.5.0 -> v0.5.1 gin-contrib/sse v1.1.0 -> v1.1.1 --- go.mod | 4 +- go.sum | 8 +- .../bytedance/sonic/loader/loader_latest.go | 197 +++++++++++------- vendor/github.com/gin-contrib/sse/README.md | 1 + vendor/modules.txt | 4 +- 5 files changed, 134 insertions(+), 80 deletions(-) diff --git a/go.mod b/go.mod index e0a71614f..a81da9a60 100644 --- a/go.mod +++ b/go.mod @@ -39,7 +39,7 @@ require ( github.com/Microsoft/go-winio v0.6.2 // indirect github.com/bytedance/gopkg v0.1.4 // indirect github.com/bytedance/sonic v1.15.0 // indirect - github.com/bytedance/sonic/loader v0.5.0 // indirect + github.com/bytedance/sonic/loader v0.5.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cloudwego/base64x v0.1.6 // indirect github.com/containerd/errdefs v1.0.0 // indirect @@ -53,7 +53,7 @@ require ( github.com/fatih/color v1.19.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/gabriel-vasile/mimetype v1.4.13 // indirect - github.com/gin-contrib/sse v1.1.0 // indirect + github.com/gin-contrib/sse v1.1.1 // indirect github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-playground/locales v0.14.1 // indirect diff --git a/go.sum b/go.sum index b8b28e230..b6c7ad951 100644 --- a/go.sum +++ b/go.sum @@ -13,8 +13,8 @@ github.com/bytedance/gopkg v0.1.4 h1:oZnQwnX82KAIWb7033bEwtxvTqXcYMxDBaQxo5JJHWM github.com/bytedance/gopkg v0.1.4/go.mod h1:v1zWfPm21Fb+OsyXN2VAHdL6TBb2L88anLQgdyje6R4= github.com/bytedance/sonic v1.15.0 h1:/PXeWFaR5ElNcVE84U0dOHjiMHQOwNIx3K4ymzh/uSE= github.com/bytedance/sonic v1.15.0/go.mod h1:tFkWrPz0/CUCLEF4ri4UkHekCIcdnkqXw9VduqpJh0k= -github.com/bytedance/sonic/loader v0.5.0 h1:gXH3KVnatgY7loH5/TkeVyXPfESoqSBSBEiDd5VjlgE= -github.com/bytedance/sonic/loader v0.5.0/go.mod h1:AR4NYCk5DdzZizZ5djGqQ92eEhCCcdf5x77udYiSJRo= +github.com/bytedance/sonic/loader v0.5.1 h1:Ygpfa9zwRCCKSlrp5bBP/b/Xzc3VxsAW+5NIYXrOOpI= +github.com/bytedance/sonic/loader v0.5.1/go.mod h1:AR4NYCk5DdzZizZ5djGqQ92eEhCCcdf5x77udYiSJRo= github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= @@ -61,8 +61,8 @@ github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= github.com/gabriel-vasile/mimetype v1.4.13 h1:46nXokslUBsAJE/wMsp5gtO500a4F3Nkz9Ufpk2AcUM= github.com/gabriel-vasile/mimetype v1.4.13/go.mod h1:d+9Oxyo1wTzWdyVUPMmXFvp4F9tea18J8ufA774AB3s= -github.com/gin-contrib/sse v1.1.0 h1:n0w2GMuUpWDVp7qSpvze6fAu9iRxJY4Hmj6AmBOU05w= -github.com/gin-contrib/sse v1.1.0/go.mod h1:hxRZ5gVpWMT7Z0B0gSNYqqsSCNIJMjzvm6fqCz9vjwM= +github.com/gin-contrib/sse v1.1.1 h1:uGYpNwTacv5R68bSGMapo62iLTRa9l5zxGCps4hK6ko= +github.com/gin-contrib/sse v1.1.1/go.mod h1:QXzuVkA0YO7o/gun03UI1Q+FTI8ZV/n5t03kIQAI89s= github.com/gin-gonic/gin v1.12.0 h1:b3YAbrZtnf8N//yjKeU2+MQsh2mY5htkZidOM7O0wG8= github.com/gin-gonic/gin v1.12.0/go.mod h1:VxccKfsSllpKshkBWgVgRniFFAzFb9csfngsqANjnLc= github.com/go-chi/chi/v5 v5.2.5 h1:Eg4myHZBjyvJmAFjFvWgrqDTXFyOzjj7YIm3L3mu6Ug= diff --git a/vendor/github.com/bytedance/sonic/loader/loader_latest.go b/vendor/github.com/bytedance/sonic/loader/loader_latest.go index 22de274b5..4b55a0dc6 100644 --- a/vendor/github.com/bytedance/sonic/loader/loader_latest.go +++ b/vendor/github.com/bytedance/sonic/loader/loader_latest.go @@ -1,4 +1,3 @@ - /* * Copyright 2021 ByteDance Inc. * @@ -18,92 +17,146 @@ package loader import ( - `github.com/bytedance/sonic/loader/internal/rt` + "strconv" + "sync/atomic" + + "github.com/bytedance/sonic/loader/internal/rt" ) +var loadBatchSeq uint64 + +type LoadOneItem struct { + Text []byte + FuncName string + FrameSize int + ArgSize int + ArgPtrs []bool + LocalPtrs []bool + Pcdata Pcdata +} + +func buildLoadFunc(noPreempt bool, item LoadOneItem, textSize uint32, entryOff uint32) Func { + fn := Func{ + Name: item.FuncName, + TextSize: textSize, + ArgsSize: int32(item.ArgSize), + EntryOff: entryOff, + } + + fn.Pcsp = &item.Pcdata + + if noPreempt { + fn.PcUnsafePoint = &Pcdata{ + {PC: textSize, Val: PCDATA_UnsafePointUnsafe}, + } + } else { + fn.PcUnsafePoint = &Pcdata{ + {PC: textSize, Val: PCDATA_UnsafePointSafe}, + } + } + + // NOTICE: suppose the function has only one stack map at index 0 + fn.PcStackMapIndex = &Pcdata{ + {PC: textSize, Val: 0}, + } + + if item.ArgPtrs != nil { + args := rt.StackMapBuilder{} + for _, b := range item.ArgPtrs { + args.AddField(b) + } + fn.ArgsPointerMaps = args.Build() + } + + if item.LocalPtrs != nil { + locals := rt.StackMapBuilder{} + for _, b := range item.LocalPtrs { + locals.AddField(b) + } + fn.LocalsPointerMaps = locals.Build() + } + + return fn +} + // LoadFuncs loads only one function as module, and returns the function pointer // - text: machine code // - funcName: function name -// - frameSize: stack frame size. +// - frameSize: stack frame size. // - argSize: argument total size (in bytes) // - argPtrs: indicates if a slot (8 Bytes) of arguments memory stores pointer, from low to high // - localPtrs: indicates if a slot (8 Bytes) of local variants memory stores pointer, from low to high -// -// WARN: +// +// WARN: // - the function MUST has fixed SP offset equaling to this, otherwise it go.gentraceback will fail // - the function MUST has only one stack map for all arguments and local variants func (self Loader) LoadOne(text []byte, funcName string, frameSize int, argSize int, argPtrs []bool, localPtrs []bool, pcdata Pcdata) Function { - size := uint32(len(text)) - - fn := Func{ - Name: funcName, - TextSize: size, - ArgsSize: int32(argSize), - } - - - fn.Pcsp = &pcdata - - if self.NoPreempt { - fn.PcUnsafePoint = &Pcdata{ - {PC: size, Val: PCDATA_UnsafePointUnsafe}, - } - } else { - fn.PcUnsafePoint = &Pcdata{ - {PC: size, Val: PCDATA_UnsafePointSafe}, - } - } - - // NOTICE: suppose the function has only one stack map at index 0 - fn.PcStackMapIndex = &Pcdata{ - {PC: size, Val: 0}, - } - - if argPtrs != nil { - args := rt.StackMapBuilder{} - for _, b := range argPtrs { - args.AddField(b) - } - fn.ArgsPointerMaps = args.Build() - } - - if localPtrs != nil { - locals := rt.StackMapBuilder{} - for _, b := range localPtrs { - locals.AddField(b) - } - fn.LocalsPointerMaps = locals.Build() - } - - out := Load(text, []Func{fn}, self.Name + funcName, []string{self.File}) - return out[0] + _ = frameSize + + fn := buildLoadFunc(self.NoPreempt, LoadOneItem{ + Text: text, + FuncName: funcName, + FrameSize: frameSize, + ArgSize: argSize, + ArgPtrs: argPtrs, + LocalPtrs: localPtrs, + Pcdata: pcdata, + }, uint32(len(text)), 0) + + out := Load(text, []Func{fn}, self.Name+funcName, []string{self.File}) + return out[0] +} + +func (self Loader) LoadMany(items []LoadOneItem) (out []Function) { + if len(items) == 0 { + return nil + } + + total := 0 + funcs := make([]Func, 0, len(items)) + text := make([]byte, 0) + + for _, item := range items { + _ = item.FrameSize + size := uint32(len(item.Text)) + funcs = append(funcs, buildLoadFunc(self.NoPreempt, item, size, uint32(total))) + total += len(item.Text) + } + + text = make([]byte, 0, total) + for _, item := range items { + text = append(text, item.Text...) + } + + moduleName := self.Name + "batch." + strconv.FormatUint(atomic.AddUint64(&loadBatchSeq, 1), 10) + return Load(text, funcs, moduleName, []string{self.File}) } // Load loads given machine codes and corresponding function information into go moduledata // and returns runnable function pointer // WARN: this API is experimental, use it carefully func Load(text []byte, funcs []Func, modulename string, filenames []string) (out []Function) { - ids := make([]string, len(funcs)) - for i, f := range funcs { - ids[i] = f.Name - } - // generate module data and allocate memory address - mod := makeModuledata(modulename, filenames, &funcs, text) - - // verify and register the new module - moduledataverify1(mod) - registerModule(mod) - - // - // encapsulate function address - out = make([]Function, len(funcs)) - for i, s := range ids { - for _, f := range funcs { - if f.Name == s { - m := uintptr(mod.text + uintptr(f.EntryOff)) - out[i] = Function(&m) - } - } - } - return + ids := make([]string, len(funcs)) + for i, f := range funcs { + ids[i] = f.Name + } + // generate module data and allocate memory address + mod := makeModuledata(modulename, filenames, &funcs, text) + + // verify and register the new module + moduledataverify1(mod) + registerModule(mod) + + // + // encapsulate function address + out = make([]Function, len(funcs)) + for i, s := range ids { + for _, f := range funcs { + if f.Name == s { + m := uintptr(mod.text + uintptr(f.EntryOff)) + out[i] = Function(&m) + } + } + } + return } diff --git a/vendor/github.com/gin-contrib/sse/README.md b/vendor/github.com/gin-contrib/sse/README.md index cfe2c820b..a63ac2037 100644 --- a/vendor/github.com/gin-contrib/sse/README.md +++ b/vendor/github.com/gin-contrib/sse/README.md @@ -2,6 +2,7 @@ [![Go Reference](https://pkg.go.dev/badge/github.com/gin-contrib/sse.svg)](https://pkg.go.dev/github.com/gin-contrib/sse) [![Run Tests](https://github.com/gin-contrib/sse/actions/workflows/go.yml/badge.svg)](https://github.com/gin-contrib/sse/actions/workflows/go.yml) +[![Trivy Security Scan](https://github.com/gin-contrib/sse/actions/workflows/trivy-scan.yml/badge.svg)](https://github.com/gin-contrib/sse/actions/workflows/trivy-scan.yml) [![codecov](https://codecov.io/gh/gin-contrib/sse/branch/master/graph/badge.svg)](https://codecov.io/gh/gin-contrib/sse) [![Go Report Card](https://goreportcard.com/badge/github.com/gin-contrib/sse)](https://goreportcard.com/report/github.com/gin-contrib/sse) diff --git a/vendor/modules.txt b/vendor/modules.txt index 99577ad97..22092302d 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -56,7 +56,7 @@ github.com/bytedance/sonic/internal/utils github.com/bytedance/sonic/option github.com/bytedance/sonic/unquote github.com/bytedance/sonic/utf8 -# github.com/bytedance/sonic/loader v0.5.0 +# github.com/bytedance/sonic/loader v0.5.1 ## explicit; go 1.16 github.com/bytedance/sonic/loader github.com/bytedance/sonic/loader/internal/abi @@ -155,7 +155,7 @@ github.com/gabriel-vasile/mimetype/internal/json github.com/gabriel-vasile/mimetype/internal/magic github.com/gabriel-vasile/mimetype/internal/markup github.com/gabriel-vasile/mimetype/internal/scan -# github.com/gin-contrib/sse v1.1.0 +# github.com/gin-contrib/sse v1.1.1 ## explicit; go 1.23 github.com/gin-contrib/sse # github.com/gin-gonic/gin v1.12.0 From fcc93751a1a5343b80f5ece9a15bc0c742e516fc Mon Sep 17 00:00:00 2001 From: Moses Narrow <36607567+0pcom@users.noreply.github.com> Date: Mon, 30 Mar 2026 08:40:30 -0500 Subject: [PATCH 07/11] Add useful Makefile targets from skywire Add targets ported from skywire's Makefile: - update-dep: go get -u, tidy, vendor, auto-commit - update-skywire: update skywire dep to latest develop - update-skycoin: update skycoin dep to latest develop - push-deps: commit and push vendor changes - sync-upstream-develop: sync fork's develop with upstream - tidy: standalone go mod tidy - format now depends on tidy (like skywire) - dep now depends on tidy --- Makefile | 76 +++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 73 insertions(+), 3 deletions(-) diff --git a/Makefile b/Makefile index 91c9ca9cc..cb56a0893 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,8 @@ else SHELL := /bin/bash endif -.PHONY : check lint install-linters dep test test-e2e test-e2e-build test-e2e-run test-e2e-test test-e2e-stop test-e2e-clean build +.PHONY : check lint install-linters dep tidy test test-e2e test-e2e-build test-e2e-run test-e2e-test test-e2e-stop test-e2e-clean build +.PHONY : update-dep update-skywire update-skycoin push-deps sync-upstream-develop VERSION := $(shell git describe --always) @@ -101,7 +102,7 @@ install-linters-windows: ## Install linters on windows ${OPTS} go install golang.org/x/tools/cmd/goimports@latest ${OPTS} go install github.com/incu6us/goimports-reviser@latest -format: ## Formats the code. Must have goimports and goimports-reviser installed (use make install-linters). +format: tidy ## Formats the code. Must have goimports and goimports-reviser installed (use make install-linters). ${OPTS} goimports -w -local ${DMSG_REPO} ./pkg ./cmd ./internal ./examples find . -type f -name '*.go' -not -path "./.git/*" -not -path "./vendor/*" -exec goimports-reviser -project-name ${DMSG_REPO} {} \; @@ -109,9 +110,78 @@ format: ## Formats the code. Must have goimports and goimports-reviser installed format-windows: ## Formats the code. Must have goimports and goimports-reviser installed (use make install-linters-windows). powershell -Command .\scripts\format-windows.ps1 -dep: ## Sorts dependencies +tidy: ## Tidies dependencies + ${OPTS} go mod tidy -v + +dep: tidy ## Sorts and vendors dependencies + ${OPTS} go mod vendor -v + +update-dep: ## Update all dependencies to latest versions, vendor, and commit + ${OPTS} go get -v -u ./... + ${OPTS} go mod tidy -v ${OPTS} go mod vendor -v + git add go.mod go.sum vendor + git diff --cached --quiet || git commit -m "update deps" + +update-skywire: ## Update skywire to latest develop branch + @echo "Updating skywire to latest develop..." + ${OPTS} go get -v github.com/skycoin/skywire@develop ${OPTS} go mod tidy -v + ${OPTS} go mod vendor -v + @echo "skywire updated successfully" + +update-skycoin: ## Update skycoin to latest develop branch + @echo "Updating skycoin to latest develop..." + ${OPTS} go get -v github.com/skycoin/skycoin@develop + ${OPTS} go mod tidy -v + ${OPTS} go mod vendor -v + @echo "skycoin updated successfully" + +push-deps: ## Commit and push dependency updates + @echo "Committing dependency updates..." + git add go.mod go.sum vendor + git diff --cached --quiet || git commit -m "update deps" + git push + @echo "Dependencies pushed successfully" + +sync-upstream-develop: ## Sync local develop branch with upstream skycoin/dmsg develop + @normalize() { \ + echo "$$1" | sed \ + -e 's|git@github.com:|https://github.com/|' \ + -e 's|ssh://github.com/|https://github.com/|' \ + -e 's|\.git$$||' \ + -e 's|https://github.com/||' \ + | tr '[:upper:]' '[:lower:]'; \ + }; \ + UPSTREAM_URL=$$(git remote get-url upstream 2>/dev/null); \ + if [ -z "$$UPSTREAM_URL" ]; then \ + echo "[error] no 'upstream' remote found. Add it with:"; \ + echo " git remote add upstream https://github.com/skycoin/dmsg.git"; \ + exit 1; \ + fi; \ + UPSTREAM_NORM=$$(normalize "$$UPSTREAM_URL"); \ + if [ "$$UPSTREAM_NORM" != "skycoin/dmsg" ]; then \ + echo "[error] upstream remote does not point to skycoin/dmsg."; \ + echo " Found: $$UPSTREAM_URL"; \ + exit 1; \ + fi; \ + ORIGIN_URL=$$(git remote get-url origin 2>/dev/null); \ + if [ -z "$$ORIGIN_URL" ]; then \ + echo "[error] no 'origin' remote found."; \ + exit 1; \ + fi; \ + ORIGIN_NORM=$$(normalize "$$ORIGIN_URL"); \ + if [ "$$ORIGIN_NORM" = "skycoin/dmsg" ]; then \ + echo "[error] origin points to skycoin/dmsg directly."; \ + echo " This target must be run from a fork, not the canonical repo."; \ + exit 1; \ + fi; \ + echo "[ok] origin is a fork ($$ORIGIN_NORM), upstream is skycoin/dmsg — syncing develop..."; \ + git checkout develop && \ + git pull && \ + git fetch upstream && \ + git merge upstream/develop && \ + git push install: ## Install `dmsg-discovery`, `dmsg-server`, `dmsgcurl`,`dmsgpty-cli`, `dmsgpty-host`, `dmsgpty-ui` ${OPTS} go install ${BUILD_OPTS} ./cmd/* From b142592a24dedb37171a577ffda4f7ebb9d9bc0c Mon Sep 17 00:00:00 2001 From: Moses Narrow <36607567+0pcom@users.noreply.github.com> Date: Mon, 30 Mar 2026 08:54:50 -0500 Subject: [PATCH 08/11] Fix TODO audit: whitelist, waitgroup, kill workaround, stale comments - Implement SOCKS5 whitelist enforcement: connections from PKs not in the --wl list are now rejected (was a no-op despite accepting the flag) - Add waitgroup to Client for clean goroutine shutdown on Close() - Remove kill.go force-exit workaround: all commands now use cmdutil.SignalContext for proper signal handling - Document why timestamp tracking passes 0: concurrent streams from the same client can arrive out of order, and noise nonce tracking already prevents replay at the session level - Remove resolved TODO on pty_client.go error choice --- cmd/dmsg-socks5/commands/dmsg-socks5.go | 24 +++++++++++++++++++++- cmd/dmsg/commands/kill.go | 27 ------------------------- pkg/dmsg/client.go | 5 +++-- pkg/dmsg/client_sessions.go | 2 ++ pkg/dmsg/server_session.go | 6 +++++- pkg/dmsgpty/pty_client.go | 2 +- 6 files changed, 34 insertions(+), 32 deletions(-) delete mode 100644 cmd/dmsg/commands/kill.go diff --git a/cmd/dmsg-socks5/commands/dmsg-socks5.go b/cmd/dmsg-socks5/commands/dmsg-socks5.go index 0ced2af61..b8ee17629 100644 --- a/cmd/dmsg-socks5/commands/dmsg-socks5.go +++ b/cmd/dmsg-socks5/commands/dmsg-socks5.go @@ -4,6 +4,7 @@ package commands import ( "context" "fmt" + "net" "net/http" "os" "strings" @@ -138,7 +139,6 @@ var serveCmd = &cobra.Command{ ctx, cancel := cmdutil.SignalContext(context.Background(), dlog) defer cancel() - //TODO: implement whitelist logic dmsgC, closeDmsg, err := dmsgclient.InitDmsgWithFlags(ctx, dlog, pk, sk, httpClient, pk.String()) @@ -182,6 +182,28 @@ var serveCmd = &cobra.Command{ } dlog.Infof("Accepted connection from: %s", respConn.RemoteAddr()) + // Enforce whitelist: extract remote PK from the dmsg address. + if len(wlkeys) > 0 { + remotePK, _, splitErr := net.SplitHostPort(respConn.RemoteAddr().String()) + if splitErr != nil { + dlog.WithError(splitErr).Warn("Failed to parse remote address, rejecting connection.") + respConn.Close() //nolint:errcheck,gosec + continue + } + allowed := false + for _, key := range wlkeys { + if remotePK == key.String() { + allowed = true + break + } + } + if !allowed { + dlog.WithField("remote_pk", remotePK).Warn("Connection rejected: not in whitelist.") + respConn.Close() //nolint:errcheck,gosec + continue + } + } + conf := &socks5.Config{} server, err := socks5.New(conf) if err != nil { diff --git a/cmd/dmsg/commands/kill.go b/cmd/dmsg/commands/kill.go deleted file mode 100644 index 7ee118cb0..000000000 --- a/cmd/dmsg/commands/kill.go +++ /dev/null @@ -1,27 +0,0 @@ -// Package commands cmd/dmsg/commands/kill.go -package commands - -import ( - "os" - "os/signal" - "syscall" -) - -func init() { - // TEMPORARY WORKAROUND: Force exit on Ctrl+C after 3 attempts - // This can be removed once the proper signal handling fixes are verified: - // - dmsgC.Serve() now uses signal-aware context (not context.Background()) - // - Accept loops now check for context cancellation - // - HTTP servers now shutdown gracefully - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, syscall.SIGTERM) - go func() { - sigCount := 0 - for range c { - sigCount++ - if sigCount >= 3 { - os.Exit(1) - } - } - }() -} diff --git a/pkg/dmsg/client.go b/pkg/dmsg/client.go index ec4e53471..5e5a0f44d 100644 --- a/pkg/dmsg/client.go +++ b/pkg/dmsg/client.go @@ -82,6 +82,7 @@ type Client struct { errCh chan error done chan struct{} once sync.Once + wg sync.WaitGroup // tracks background goroutines for clean shutdown sesMx sync.Mutex } @@ -330,8 +331,7 @@ func (ce *Client) discoverServers(ctx context.Context, all bool) (entries []*dis return entries, err } -// Close closes the dmsg client entity. -// TODO(evanlinjin): Have waitgroup. +// Close closes the dmsg client entity and waits for background goroutines to finish. func (ce *Client) Close() error { if ce == nil { return nil @@ -354,6 +354,7 @@ func (ce *Client) Close() error { ce.log.Debug("All sessions closed.") ce.sessionsMx.Unlock() ce.porter.CloseAll(ce.log) + ce.wg.Wait() err = ce.EntityCommon.delEntry(context.Background()) }) return err diff --git a/pkg/dmsg/client_sessions.go b/pkg/dmsg/client_sessions.go index de5534385..1c8ae3587 100644 --- a/pkg/dmsg/client_sessions.go +++ b/pkg/dmsg/client_sessions.go @@ -113,7 +113,9 @@ func (ce *Client) dialSession(ctx context.Context, entry *disc.Entry) (cs Client return ClientSession{}, errors.New("session already exists") } + ce.wg.Add(1) go func() { + defer ce.wg.Done() defer func() { if r := recover(); r != nil { ce.log.Warnf("recovered panic in session serve goroutine: %v", r) diff --git a/pkg/dmsg/server_session.go b/pkg/dmsg/server_session.go index 285cc10c8..b8ff15864 100644 --- a/pkg/dmsg/server_session.go +++ b/pkg/dmsg/server_session.go @@ -155,7 +155,11 @@ func (ss *ServerSession) serveStream(log logrus.FieldLogger, yStr io.ReadWriteCl if err != nil { return StreamRequest{}, err } - // TODO(evanlinjin): Implement timestamp tracker. + // Timestamp validation: we pass 0 because concurrent streams from + // the same client can have timestamps that arrive out of order at + // the server. Strict monotonic enforcement would reject valid + // concurrent requests. The noise encryption layer already prevents + // replay at the session level via nonce tracking. if err := req.Verify(0); err != nil { return StreamRequest{}, err } diff --git a/pkg/dmsgpty/pty_client.go b/pkg/dmsgpty/pty_client.go index 64da62197..4706c9d3d 100644 --- a/pkg/dmsgpty/pty_client.go +++ b/pkg/dmsgpty/pty_client.go @@ -98,7 +98,7 @@ func (sc *PtyClient) call(method string, args, reply interface{}) error { call := sc.rpcC.Go(sc.rpcMethod(method), args, reply, nil) select { case <-sc.done: - return io.ErrClosedPipe // TODO(evanlinjin): Is there a better error to use? + return io.ErrClosedPipe case <-call.Done: return call.Error } From 2e3cf35ed964a058dcaac160e9afce021e162f60 Mon Sep 17 00:00:00 2001 From: Moses Narrow <36607567+0pcom@users.noreply.github.com> Date: Mon, 30 Mar 2026 09:02:26 -0500 Subject: [PATCH 09/11] Trigger CI re-run From de5b556aceb0164c4f8959b794599b195b3d40e1 Mon Sep 17 00:00:00 2001 From: Moses Narrow <36607567+0pcom@users.noreply.github.com> Date: Mon, 30 Mar 2026 09:25:37 -0500 Subject: [PATCH 10/11] Improve test reliability for CI flaky tests - TestControl_Ping: use require.NoError for fail-fast, close controls in correct order (responder first) to avoid EOF race on pipe cleanup - TestHTTPTransport_RoundTrip: use graceful srv.Shutdown() instead of raw lis.Close() to let in-flight HTTP requests finish before closing, preventing race between handler goroutines and listener teardown --- pkg/dmsgctrl/control_test.go | 25 ++++++++++++++----------- pkg/dmsghttp/http_test.go | 26 +++++++++++++++----------- 2 files changed, 29 insertions(+), 22 deletions(-) diff --git a/pkg/dmsgctrl/control_test.go b/pkg/dmsgctrl/control_test.go index d86e4ffa6..016a9ff2d 100644 --- a/pkg/dmsgctrl/control_test.go +++ b/pkg/dmsgctrl/control_test.go @@ -19,21 +19,24 @@ func TestControl_Ping(t *testing.T) { ctrlA := ControlStream(connA) ctrlB := ControlStream(connB) - t.Cleanup(func() { - assert.NoError(t, ctrlA.Close()) - assert.NoError(t, ctrlB.Close()) - }) + defer func() { + // Close in order: B first (the responder side), then A. + // This avoids EOF races where A's close kills the pipe + // while B's serve goroutine is mid-read. + _ = ctrlB.Close() //nolint:errcheck + _ = ctrlA.Close() //nolint:errcheck + }() for i := 0; i < times; i++ { - // act + // Ping A → B (ctrlA sends ping, ctrlB's serve goroutine responds with pong). durA, errA := ctrlA.Ping(context.TODO()) - durB, errB := ctrlB.Ping(context.TODO()) - t.Log(durA) - t.Log(durB) + require.NoError(t, errA, "ping A failed on iteration %d", i) + t.Logf("A: %v", durA) - // assert - assert.NoError(t, errA) - assert.NoError(t, errB) + // Ping B → A. + durB, errB := ctrlB.Ping(context.TODO()) + require.NoError(t, errB, "ping B failed on iteration %d", i) + t.Logf("B: %v", durB) } } diff --git a/pkg/dmsghttp/http_test.go b/pkg/dmsghttp/http_test.go index c07e2a7ce..03795709b 100644 --- a/pkg/dmsghttp/http_test.go +++ b/pkg/dmsghttp/http_test.go @@ -3,6 +3,7 @@ package dmsghttp import ( "bytes" + "context" "io" "net" "net/http" @@ -12,8 +13,6 @@ import ( "github.com/go-chi/chi/v5" "github.com/skycoin/skywire/pkg/skywire-utilities/pkg/cipher" "github.com/stretchr/testify/assert" - - dmsg "github.com/skycoin/dmsg/pkg/dmsg" ) const ( @@ -126,22 +125,27 @@ func startHTTPServer(t *testing.T, results chan httpServerResult, lis net.Listen results <- result }) + srv := &http.Server{ + ReadTimeout: 3 * time.Second, + WriteTimeout: 3 * time.Second, + IdleTimeout: 30 * time.Second, + ReadHeaderTimeout: 3 * time.Second, + Handler: r, + } + errCh := make(chan error, 1) go func() { - srv := &http.Server{ - ReadTimeout: 3 * time.Second, - WriteTimeout: 3 * time.Second, - IdleTimeout: 30 * time.Second, - ReadHeaderTimeout: 3 * time.Second, - Handler: r, - } errCh <- srv.Serve(lis) close(errCh) }() t.Cleanup(func() { - assert.NoError(t, lis.Close()) - assert.EqualError(t, <-errCh, dmsg.ErrEntityClosed.Error()) + // Graceful shutdown: let in-flight requests finish before closing. + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + _ = srv.Shutdown(ctx) //nolint:errcheck + _ = lis.Close() //nolint:errcheck + <-errCh }) } From 8e2112e51dd59e488acd0979ed161ed630e1a686 Mon Sep 17 00:00:00 2001 From: Moses Narrow <36607567+0pcom@users.noreply.github.com> Date: Mon, 30 Mar 2026 09:42:50 -0500 Subject: [PATCH 11/11] Fix data race on peerPKs map access peerPKs was read in isPeerPK (from handleSession goroutines) and written in discoverAndConnectPeers without synchronization. Protect both accesses with peerSessionsMx. --- pkg/dmsg/server.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/pkg/dmsg/server.go b/pkg/dmsg/server.go index 4abd1ea28..642b0713e 100644 --- a/pkg/dmsg/server.go +++ b/pkg/dmsg/server.go @@ -293,16 +293,19 @@ func (s *Server) discoverAndConnectPeers(ctx context.Context) { continue } // Skip if already a static peer (handled by connectToPeers). - if _, ok := s.peerPKs[pk]; ok { + s.peerSessionsMx.Lock() + _, alreadyPeer := s.peerPKs[pk] + if !alreadyPeer && entry.Server != nil && entry.Server.Address != "" { + s.peerPKs[pk] = struct{}{} + } + s.peerSessionsMx.Unlock() + if alreadyPeer { continue } if entry.Server == nil || entry.Server.Address == "" { continue } - // Register as known peer so incoming sessions are marked isPeer. - s.peerPKs[pk] = struct{}{} - peerCtx, peerCancel := context.WithCancel(ctx) //nolint:gosec activePeers[pk] = peerCancel @@ -409,7 +412,9 @@ func (s *Server) maintainPeerConnection(ctx context.Context, peer PeerEntry) { // isPeerPK returns true if the given PK is a known peer server. func (s *Server) isPeerPK(pk cipher.PubKey) bool { + s.peerSessionsMx.Lock() _, ok := s.peerPKs[pk] + s.peerSessionsMx.Unlock() return ok }