From 0c49e4a74a43d04ace587ed63a513dbfb0389c0f Mon Sep 17 00:00:00 2001 From: Moses Narrow <36607567+0pcom@users.noreply.github.com> Date: Fri, 3 Apr 2026 15:55:27 -0500 Subject: [PATCH 1/2] Serve pprof debug interface over dmsg for all services Add reusable ServeDebug(), DebugMux(), and WhitelistMiddleware() in pkg/dmsghttp/debug.go. Access controlled by survey_whitelist PKs. Wire into dmsg-discovery and dmsg-server (port 81). --- cmd/dmsg-discovery/commands/dmsg-discovery.go | 12 ++ cmd/dmsg-server/commands/start/root.go | 35 ++++ pkg/dmsghttp/debug.go | 97 +++++++++++ pkg/dmsghttp/debug_test.go | 158 ++++++++++++++++++ 4 files changed, 302 insertions(+) create mode 100644 pkg/dmsghttp/debug.go create mode 100644 pkg/dmsghttp/debug_test.go diff --git a/cmd/dmsg-discovery/commands/dmsg-discovery.go b/cmd/dmsg-discovery/commands/dmsg-discovery.go index 3dd22e9a..2a866556 100644 --- a/cmd/dmsg-discovery/commands/dmsg-discovery.go +++ b/cmd/dmsg-discovery/commands/dmsg-discovery.go @@ -14,6 +14,7 @@ import ( proxyproto "github.com/pires/go-proxyproto" "github.com/sirupsen/logrus" + "github.com/skycoin/skywire/deployment" "github.com/skycoin/skywire/pkg/skywire-utilities/pkg/buildinfo" "github.com/skycoin/skywire/pkg/skywire-utilities/pkg/cipher" "github.com/skycoin/skywire/pkg/skywire-utilities/pkg/cmdutil" @@ -206,6 +207,17 @@ Example: cancel() } }() + + // Serve pprof debug interface over dmsg + wl := deployment.Prod.SurveyWhitelist + if testEnvironment { + wl = deployment.Test.SurveyWhitelist + } + go func() { + if debugErr := dmsghttp.ServeDebug(ctx, dmsgDC, log, wl); debugErr != nil { + log.Errorf("dmsghttp.ServeDebug: %v", debugErr) + } + }() } <-ctx.Done() diff --git a/cmd/dmsg-server/commands/start/root.go b/cmd/dmsg-server/commands/start/root.go index 627af2eb..3ba68dcd 100644 --- a/cmd/dmsg-server/commands/start/root.go +++ b/cmd/dmsg-server/commands/start/root.go @@ -13,17 +13,21 @@ import ( chi "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" + "github.com/skycoin/skywire/deployment" "github.com/skycoin/skywire/pkg/skywire-utilities/pkg/buildinfo" + "github.com/skycoin/skywire/pkg/skywire-utilities/pkg/cipher" "github.com/skycoin/skywire/pkg/skywire-utilities/pkg/cmdutil" "github.com/skycoin/skywire/pkg/skywire-utilities/pkg/logging" "github.com/skycoin/skywire/pkg/skywire-utilities/pkg/metricsutil" "github.com/spf13/cobra" dmsgcmdutil "github.com/skycoin/dmsg/pkg/cmdutil" + "github.com/skycoin/dmsg/pkg/direct" "github.com/skycoin/dmsg/pkg/disc" dmsg "github.com/skycoin/dmsg/pkg/dmsg" "github.com/skycoin/dmsg/pkg/dmsg/metrics" "github.com/skycoin/dmsg/pkg/dmsgclient" + "github.com/skycoin/dmsg/pkg/dmsghttp" "github.com/skycoin/dmsg/pkg/dmsgserver" ) @@ -127,6 +131,37 @@ var RootCmd = &cobra.Command{ } }() + // Serve pprof debug interface over dmsg using a direct client through ourselves + go func() { + // Wait for the dmsg server to be ready before connecting the debug client + <-srv.Ready() + + serverEntry := &disc.Entry{ + Version: "0.0.1", + Static: conf.PubKey, + Server: &disc.Server{ + Address: conf.PublicAddress, + AvailableSessions: conf.MaxSessions, + }, + } + entries := direct.GetAllEntries(cipher.PubKeys{conf.PubKey}, []*disc.Entry{serverEntry}) + dClient := direct.NewClient(entries, log) + + debugConfig := &dmsg.Config{ + MinSessions: 0, + } + dmsgC, closeDebug, err := direct.StartDmsg(ctx, log, conf.PubKey, conf.SecKey, dClient, debugConfig) + if err != nil { + log.WithError(err).Error("failed to start debug dmsg client") + return + } + defer closeDebug() + + if debugErr := dmsghttp.ServeDebug(ctx, dmsgC, log, deployment.Prod.SurveyWhitelist); debugErr != nil { + log.Errorf("dmsghttp.ServeDebug: %v", debugErr) + } + }() + <-ctx.Done() }, } diff --git a/pkg/dmsghttp/debug.go b/pkg/dmsghttp/debug.go new file mode 100644 index 00000000..7fc752b7 --- /dev/null +++ b/pkg/dmsghttp/debug.go @@ -0,0 +1,97 @@ +// Package dmsghttp pkg/dmsghttp/debug.go +package dmsghttp + +import ( + "context" + "fmt" + "net" + "net/http" + "net/http/pprof" + "time" + + "github.com/skycoin/skywire/pkg/skywire-utilities/pkg/cipher" + "github.com/skycoin/skywire/pkg/skywire-utilities/pkg/logging" + + dmsg "github.com/skycoin/dmsg/pkg/dmsg" +) + +// DefaultDebugPort is the dmsg port used for serving debug/pprof endpoints. +const DefaultDebugPort = uint16(81) + +// DebugMux returns an http.ServeMux with standard pprof endpoints registered. +func DebugMux() *http.ServeMux { + mux := http.NewServeMux() + mux.HandleFunc("/debug/pprof/", pprof.Index) + mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) + mux.HandleFunc("/debug/pprof/profile", pprof.Profile) + mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + mux.HandleFunc("/debug/pprof/trace", pprof.Trace) + for _, p := range []string{"heap", "goroutine", "threadcreate", "block", "mutex", "allocs"} { + mux.Handle("/debug/pprof/"+p, pprof.Handler(p)) + } + return mux +} + +// WhitelistMiddleware wraps an http.Handler with public-key-based access control. +// When serving over dmsg, RemoteAddr is in the format ":". +// If whitelistedPKs is empty, all requests are allowed. +func WhitelistMiddleware(whitelistedPKs []cipher.PubKey, next http.Handler) http.Handler { + if len(whitelistedPKs) == 0 { + return next + } + allowed := make(map[string]struct{}, len(whitelistedPKs)) + for _, pk := range whitelistedPKs { + allowed[pk.String()] = struct{}{} + } + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + remotePK, _, err := net.SplitHostPort(r.RemoteAddr) + if err != nil { + http.Error(w, "500 Internal Server Error", http.StatusInternalServerError) + return + } + if _, ok := allowed[remotePK]; !ok { + http.Error(w, "401 Unauthorized", http.StatusUnauthorized) + return + } + next.ServeHTTP(w, r) + }) +} + +// ServeDebug serves pprof endpoints over dmsg on DefaultDebugPort, gated by the +// provided whitelist public keys. It blocks until the context is cancelled or +// an error occurs. +func ServeDebug(ctx context.Context, dmsgC *dmsg.Client, log *logging.Logger, whitelistPKs []cipher.PubKey) error { + handler := WhitelistMiddleware(whitelistPKs, DebugMux()) + + lis, err := dmsgC.Listen(DefaultDebugPort) + if err != nil { + return fmt.Errorf("debug dmsg listen on port %d: %w", DefaultDebugPort, err) + } + + log.WithField("dmsg_addr", fmt.Sprintf("dmsg://%v", lis.Addr().String())). + Info("Serving debug/pprof over dmsg") + + srv := &http.Server{ + ReadTimeout: 5 * time.Second, + WriteTimeout: 60 * time.Second, // pprof profile collection takes 30s + IdleTimeout: 30 * time.Second, + ReadHeaderTimeout: 5 * time.Second, + MaxHeaderBytes: 1 << 14, // 16KB + Handler: handler, + } + + done := make(chan struct{}) + go func() { + select { + case <-ctx.Done(): + if shutdownErr := srv.Shutdown(context.Background()); shutdownErr != nil { + log.WithError(shutdownErr).Error("debug server shutdown error") + } + case <-done: + } + }() + + err = srv.Serve(lis) + close(done) + return err +} diff --git a/pkg/dmsghttp/debug_test.go b/pkg/dmsghttp/debug_test.go new file mode 100644 index 00000000..42660469 --- /dev/null +++ b/pkg/dmsghttp/debug_test.go @@ -0,0 +1,158 @@ +// Package dmsghttp_test pkg/dmsghttp/debug_test.go +package dmsghttp_test + +import ( + "context" + "fmt" + "io" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/skycoin/skywire/pkg/skywire-utilities/pkg/cipher" + "github.com/skycoin/skywire/pkg/skywire-utilities/pkg/logging" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/net/nettest" + + "github.com/skycoin/dmsg/pkg/disc" + "github.com/skycoin/dmsg/pkg/dmsg" + "github.com/skycoin/dmsg/pkg/dmsghttp" +) + +func TestDebugMux_PprofEndpoints(t *testing.T) { + mux := dmsghttp.DebugMux() + + endpoints := []string{ + "/debug/pprof/", + "/debug/pprof/cmdline", + "/debug/pprof/symbol", + "/debug/pprof/heap", + "/debug/pprof/goroutine", + "/debug/pprof/allocs", + } + + for _, ep := range endpoints { + t.Run(ep, func(t *testing.T) { + req := httptest.NewRequest(http.MethodGet, ep, nil) + w := httptest.NewRecorder() + mux.ServeHTTP(w, req) + assert.NotEqual(t, http.StatusNotFound, w.Code, "endpoint %s should be registered", ep) + }) + } +} + +func TestWhitelistMiddleware_EmptyWhitelistAllowsAll(t *testing.T) { + inner := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + }) + + handler := dmsghttp.WhitelistMiddleware(nil, inner) + + req := httptest.NewRequest(http.MethodGet, "/", nil) + req.RemoteAddr = "somekey:1234" + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + assert.Equal(t, http.StatusOK, w.Code) +} + +func TestWhitelistMiddleware_AllowsWhitelistedPK(t *testing.T) { + pk, _ := cipher.GenerateKeyPair() + + inner := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + }) + + handler := dmsghttp.WhitelistMiddleware([]cipher.PubKey{pk}, inner) + + req := httptest.NewRequest(http.MethodGet, "/", nil) + req.RemoteAddr = pk.String() + ":1234" + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + assert.Equal(t, http.StatusOK, w.Code) +} + +func TestWhitelistMiddleware_RejectsNonWhitelistedPK(t *testing.T) { + whitelisted, _ := cipher.GenerateKeyPair() + nonWhitelisted, _ := cipher.GenerateKeyPair() + + inner := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + }) + + handler := dmsghttp.WhitelistMiddleware([]cipher.PubKey{whitelisted}, inner) + + req := httptest.NewRequest(http.MethodGet, "/", nil) + req.RemoteAddr = nonWhitelisted.String() + ":1234" + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + assert.Equal(t, http.StatusUnauthorized, w.Code) +} + +func TestWhitelistMiddleware_InvalidRemoteAddr(t *testing.T) { + pk, _ := cipher.GenerateKeyPair() + + inner := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + }) + + handler := dmsghttp.WhitelistMiddleware([]cipher.PubKey{pk}, inner) + + req := httptest.NewRequest(http.MethodGet, "/", nil) + req.RemoteAddr = "invalid-no-port" + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + assert.Equal(t, http.StatusInternalServerError, w.Code) +} + +func TestServeDebug_Integration(t *testing.T) { + // Integration test: start a dmsg server + client, serve debug, and fetch pprof index + dc := disc.NewMock(0) + + // Create and start server + srvPK, srvSK := cipher.GenerateKeyPair() + srvConf := dmsg.ServerConfig{MaxSessions: 100} + srv := dmsg.NewServer(srvPK, srvSK, dc, &srvConf, nil) + + lis, err := nettest.NewLocalListener("tcp") + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go srv.Serve(lis, "") //nolint:errcheck + <-srv.Ready() + defer srv.Close() //nolint:errcheck + + // Create debug client that connects through the server + clientPK, clientSK := cipher.GenerateKeyPair() + dmsgC := dmsg.NewClient(clientPK, clientSK, dc, &dmsg.Config{MinSessions: 1}) + go dmsgC.Serve(ctx) //nolint:errcheck + <-dmsgC.Ready() + defer dmsgC.Close() //nolint:errcheck + + // Serve debug on the client + log := logging.MustGetLogger("test-debug") + go dmsghttp.ServeDebug(ctx, dmsgC, log, nil) //nolint:errcheck + + // Allow listener to start + time.Sleep(100 * time.Millisecond) + + // Create a second client to access the debug interface + fetchPK, fetchSK := cipher.GenerateKeyPair() + fetchC := dmsg.NewClient(fetchPK, fetchSK, dc, &dmsg.Config{MinSessions: 1}) + go fetchC.Serve(ctx) //nolint:errcheck + <-fetchC.Ready() + defer fetchC.Close() //nolint:errcheck + + httpC := http.Client{Transport: dmsghttp.MakeHTTPTransport(ctx, fetchC)} + resp, err := httpC.Get(fmt.Sprintf("dmsg://%s:%d/debug/pprof/", clientPK.Hex(), dmsghttp.DefaultDebugPort)) + require.NoError(t, err) + defer resp.Body.Close() //nolint:errcheck + + assert.Equal(t, http.StatusOK, resp.StatusCode) + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + assert.Contains(t, string(body), "pprof") +} From 05c72ae0fb0d8199356fcf8f61ba6d4d4bc63231 Mon Sep 17 00:00:00 2001 From: Moses Narrow <36607567+0pcom@users.noreply.github.com> Date: Fri, 3 Apr 2026 16:02:09 -0500 Subject: [PATCH 2/2] Fix lint: misspell and gosec G118 in debug.go --- pkg/dmsghttp/debug.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pkg/dmsghttp/debug.go b/pkg/dmsghttp/debug.go index 7fc752b7..81150758 100644 --- a/pkg/dmsghttp/debug.go +++ b/pkg/dmsghttp/debug.go @@ -58,7 +58,7 @@ func WhitelistMiddleware(whitelistedPKs []cipher.PubKey, next http.Handler) http } // ServeDebug serves pprof endpoints over dmsg on DefaultDebugPort, gated by the -// provided whitelist public keys. It blocks until the context is cancelled or +// provided whitelist public keys. It blocks until the context is canceled or // an error occurs. func ServeDebug(ctx context.Context, dmsgC *dmsg.Client, log *logging.Logger, whitelistPKs []cipher.PubKey) error { handler := WhitelistMiddleware(whitelistPKs, DebugMux()) @@ -81,10 +81,12 @@ func ServeDebug(ctx context.Context, dmsgC *dmsg.Client, log *logging.Logger, wh } done := make(chan struct{}) - go func() { + go func() { //nolint:gosec select { case <-ctx.Done(): - if shutdownErr := srv.Shutdown(context.Background()); shutdownErr != nil { + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) //nolint:gosec + defer cancel() + if shutdownErr := srv.Shutdown(shutdownCtx); shutdownErr != nil { log.WithError(shutdownErr).Error("debug server shutdown error") } case <-done: