diff --git a/cmd/conf/commands/root.go b/cmd/conf/commands/root.go index bf6e5d1bd..a8a93011f 100644 --- a/cmd/conf/commands/root.go +++ b/cmd/conf/commands/root.go @@ -2,9 +2,11 @@ package commands import ( + "fmt" "log" "github.com/bitfield/script" + "github.com/skycoin/skywire/pkg/skywire-utilities/pkg/cipher" "github.com/spf13/cobra" "github.com/skycoin/dmsg/pkg/dmsg" @@ -27,6 +29,38 @@ var RootCmd = &cobra.Command{ }, } +func init() { + RootCmd.AddCommand(genKeysCmd, verifyKeysCmd) +} + +var genKeysCmd = &cobra.Command{ + Use: "gen-keys", + Short: "Generate a new dmsg keypair", + Run: func(_ *cobra.Command, _ []string) { + pk, sk := cipher.GenerateKeyPair() + fmt.Printf("%s\n%s\n", pk.Hex(), sk.Hex()) + }, +} + +var verifyKeysCmd = &cobra.Command{ + Use: "verify-keys [secret-key]", + Short: "Derive and print the public key from a secret key", + Long: "Derives the public key from the given secret key. Use to verify PK/SK pairs in config files.", + Args: cobra.ExactArgs(1), + RunE: func(_ *cobra.Command, args []string) error { + var sk cipher.SecKey + if err := sk.Set(args[0]); err != nil { + return fmt.Errorf("invalid secret key: %w", err) + } + pk, err := sk.PubKey() + if err != nil { + return fmt.Errorf("failed to derive public key: %w", err) + } + fmt.Println(pk.Hex()) + return nil + }, +} + // Execute executes root CLI command. func Execute() { dmsgclient.Execute(RootCmd) diff --git a/cmd/dmsg-server/commands/start/root.go b/cmd/dmsg-server/commands/start/root.go index 3ba68dcd7..3ea581fbc 100644 --- a/cmd/dmsg-server/commands/start/root.go +++ b/cmd/dmsg-server/commands/start/root.go @@ -71,6 +71,10 @@ var RootCmd = &cobra.Command{ stopPProf := dmsgcmdutil.InitPProf(log, pprofMode, pprofAddr) defer stopPProf() + if conf.MaxSessions <= 0 { + conf.MaxSessions = dmsg.DefaultMaxSessions + } + if conf.HTTPAddress == "" { u, err := url.Parse(conf.LocalAddress) if err != nil { diff --git a/cmd/dmsg/commands/root.go b/cmd/dmsg/commands/root.go index ce16abdaa..02c504d48 100644 --- a/cmd/dmsg/commands/root.go +++ b/cmd/dmsg/commands/root.go @@ -67,8 +67,7 @@ func init() { di.RootCmd.Use = "ip" modifySubcommands(RootCmd) - RootCmd.PersistentFlags().BoolVar(&withKill, "with-kill", false, "force exit after 3 interrupt signals") - RootCmd.PersistentFlags().MarkHidden("with-kill") //nolint:errcheck,gosec + RootCmd.PersistentFlags().BoolVar(&withKill, "with-kill", true, "force exit after 3 interrupt signals") if fmt.Sprintf("%v", buildinfo.DebugBuildInfo()) != "" { RootCmd.Flags().BoolVarP(&dbi, "info", "d", false, "print runtime/debug.BuildInfo") } diff --git a/cmd/dmsgcurl/commands/dmsgcurl.go b/cmd/dmsgcurl/commands/dmsgcurl.go index cf3366ad5..90b50c5ce 100644 --- a/cmd/dmsgcurl/commands/dmsgcurl.go +++ b/cmd/dmsgcurl/commands/dmsgcurl.go @@ -210,6 +210,9 @@ func handleRequest(ctx context.Context, pk cipher.PubKey, sk cipher.SecKey, http } for i := 0; i < dmsgcurlTries; i++ { + if ctx.Err() != nil { + return curlError{Error: ctx.Err(), Code: errorCode["RECV_ERROR"]} + } if dmsgcurlOutput != "" { if i > 0 { dlog.Debugf("Download attempt %d/%d ...", i+1, dmsgcurlTries) @@ -247,7 +250,11 @@ func handleRequest(ctx context.Context, pk cipher.PubKey, sk cipher.SecKey, http } dlog.WithError(err).Debugf("HTTP request attempt %d failed, retrying...", attempt) - time.Sleep(time.Duration(attempt) * time.Second) + select { + case <-ctx.Done(): + return curlError{Error: ctx.Err(), Code: errorCode["RECV_ERROR"]} + case <-time.After(time.Duration(attempt) * time.Second): + } } if err != nil { @@ -297,7 +304,7 @@ func buildHTTPRequest(url, data string) (*http.Request, error) { func isFatalHTTPErr(err error) bool { var netErr net.Error - return errors.Is(err, context.DeadlineExceeded) || (errors.As(err, &netErr) && netErr.Timeout()) + return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) || (errors.As(err, &netErr) && netErr.Timeout()) } func prepareOutputFile() (*os.File, error) { diff --git a/docker/images/dmsg-server/Dockerfile b/docker/images/dmsg-server/Dockerfile index 017b9920a..4e2967547 100755 --- a/docker/images/dmsg-server/Dockerfile +++ b/docker/images/dmsg-server/Dockerfile @@ -19,12 +19,14 @@ COPY --from=builder /release/dmsg /usr/local/bin/dmsg RUN mkdir -p /e2e && \ echo '{\ - "public_key": "03b88c1335c28264c5e40ffad67eee75c2f2c39bda27015d6e14a0e90eaa78a41c",\ - "secret_key": "c46cf86c2e13e2ae41a9014d0e3b19e1b1dc1ea5c3e18aee4adf3c4db84ddca7",\ - "discovery": "http://dmsg-discovery:9090",\ + "public_key": "039346fcae983d7e91d923f5331b725b6892baa27bf779e563014660aa05e273e9",\ + "secret_key": "83549a8bd2b9399bf01d60618904e45b82e5400589fbe8b48377dd7d887e5823",\ + "discovery": "http://172.20.0.3:9090",\ + "public_address": "172.20.0.4:8080",\ "local_address": ":8080",\ "health_endpoint_address": ":8081",\ - "log_level": "info"\ + "log_level": "info",\ + "max_sessions": 2048\ }' > /e2e/dmsg-server.json STOPSIGNAL SIGINT diff --git a/internal/e2e/e2e_test.go b/internal/e2e/e2e_test.go index 53b6dc9d6..890902a71 100644 --- a/internal/e2e/e2e_test.go +++ b/internal/e2e/e2e_test.go @@ -9,6 +9,7 @@ import ( "fmt" "log" "os" + "strings" "testing" "time" @@ -20,13 +21,13 @@ import ( const ( discoveryURL = "http://dmsg-discovery:9090" - serverPK = "03b88c1335c28264c5e40ffad67eee75c2f2c39bda27015d6e14a0e90eaa78a41c" + serverPK = "039346fcae983d7e91d923f5331b725b6892baa27bf779e563014660aa05e273e9" testClientSK = "a3e4a0c8f4e2f9a7b1d5c3e8f9a2b1c4d5e6f7a8b9c0d1e2f3a4b5c6d7e8f9a0" containerClient = "dmsg-e2e-client" containerServer = "dmsg-e2e-server" containerDiscov = "dmsg-e2e-discovery" httpServerPort = 8086 - dmsgServerPort = 80 + dmsgPort = 80 ) type TestEnv struct { @@ -66,14 +67,12 @@ func (env *TestEnv) ExecInContainer(containerName string, cmd []string) (string, } defer resp.Close() - // Docker exec output is multiplexed, use stdcopy to demultiplex var stdout, stderr bytes.Buffer _, err = stdcopy.StdCopy(&stdout, &stderr, resp.Reader) if err != nil { return "", fmt.Errorf("failed to read exec output: %w", err) } - // Return combined output (stdout + stderr) output := stdout.String() if stderr.Len() > 0 { output += stderr.String() @@ -82,8 +81,24 @@ func (env *TestEnv) ExecInContainer(containerName string, cmd []string) (string, return output, nil } +// waitForDiscoveryServer polls the discovery until the dmsg server is registered. +func (env *TestEnv) waitForDiscoveryServer(t *testing.T) { + t.Helper() + deadline := time.Now().Add(60 * time.Second) + for time.Now().Before(deadline) { + output, err := env.ExecInContainer(containerClient, []string{ + "curl", "-sf", fmt.Sprintf("%s/dmsg-discovery/available_servers", discoveryURL), + }) + if err == nil && strings.Contains(output, serverPK) { + t.Logf("DMSG server registered in discovery") + return + } + time.Sleep(2 * time.Second) + } + t.Fatal("DMSG server did not register within 60s") +} + func TestMain(m *testing.M) { - // Give services time to start log.Println("Waiting for services to be ready...") time.Sleep(10 * time.Second) @@ -91,10 +106,10 @@ func TestMain(m *testing.M) { os.Exit(code) } +// --- Infrastructure Tests --- + func TestDiscoveryIsRunning(t *testing.T) { env := NewEnv() - - // Check if discovery container is running inspect, err := env.cli.ContainerInspect(env.ctx, containerDiscov) require.NoError(t, err) require.True(t, inspect.State.Running, "dmsg-discovery should be running") @@ -102,130 +117,249 @@ func TestDiscoveryIsRunning(t *testing.T) { func TestDmsgServerIsRunning(t *testing.T) { env := NewEnv() - - // Check if dmsg-server container is running inspect, err := env.cli.ContainerInspect(env.ctx, containerServer) require.NoError(t, err) require.True(t, inspect.State.Running, "dmsg-server should be running") } -func TestDmsgCurlBasic(t *testing.T) { +func TestDiscoveryHasServer(t *testing.T) { env := NewEnv() + env.waitForDiscoveryServer(t) + + output, err := env.ExecInContainer(containerClient, []string{ + "curl", "-sf", fmt.Sprintf("%s/dmsg-discovery/available_servers", discoveryURL), + }) + require.NoError(t, err) + require.Contains(t, output, serverPK, "discovery should contain the dmsg server") + t.Logf("Discovery servers: %s", output) +} - // First, start a simple HTTP server on the server container using dmsg web srv - // This will serve HTTP from port 8086 over dmsg on port 80 - t.Log("Starting HTTP test server via dmsg web srv...") +func TestDiscoveryHealth(t *testing.T) { + env := NewEnv() - // Start simple python HTTP server in background - _, err := env.ExecInContainer(containerClient, []string{ - "sh", "-c", "nohup python3 -m http.server 8086 > /dev/null 2>&1 &", + output, err := env.ExecInContainer(containerClient, []string{ + "curl", "-sf", fmt.Sprintf("%s/health", discoveryURL), }) require.NoError(t, err) + require.Contains(t, output, "build_info", "discovery health should return build info") +} - // Give the server time to start - time.Sleep(2 * time.Second) +// --- DMSG Curl Tests --- + +func TestDmsgCurl_DirectClient(t *testing.T) { + env := NewEnv() + env.waitForDiscoveryServer(t) + + // Start testserver + dmsg web srv + _, err := env.ExecInContainer(containerClient, []string{ + "sh", "-c", "nohup testserver > /tmp/testserver-direct.log 2>&1 &", + }) + require.NoError(t, err) + time.Sleep(1 * time.Second) - // Start dmsg web srv to proxy the HTTP server _, err = env.ExecInContainer(containerClient, []string{ "sh", "-c", fmt.Sprintf( - "nohup dmsg web srv -Z -U %s -s %s -p 8086 -d 80 > /tmp/dmsg-web-srv.log 2>&1 &", - discoveryURL, testClientSK, + "nohup dmsg web srv -Z -U %s -s %s -p %d -d %d > /tmp/dmsg-web-srv-direct.log 2>&1 &", + discoveryURL, testClientSK, httpServerPort, dmsgPort, ), }) require.NoError(t, err) // Wait for dmsg web srv to connect - time.Sleep(5 * time.Second) + time.Sleep(10 * time.Second) - // Now test dmsg curl from another container - t.Log("Testing dmsg curl...") + // Get the client PK from the log output, err := env.ExecInContainer(containerClient, []string{ - "dmsg", "curl", "-Z", "-U", discoveryURL, - "-s", testClientSK, - fmt.Sprintf("dmsg://%s:%d/", serverPK, dmsgServerPort), + "sh", "-c", "grep -o 'public_key=\"[^\"]*\"' /tmp/dmsg-web-srv-direct.log | head -1 | tr -d '\"' | cut -d= -f2", }) + require.NoError(t, err) + clientPK := strings.TrimSpace(output) + t.Logf("Direct test client PK: %s", clientPK) - if err != nil { - t.Logf("dmsg curl output: %s", output) - require.NoError(t, err) + if clientPK == "" { + t.Skip("Could not determine client PK from logs, skipping direct test") } - // We expect some HTTP response (even if it's a directory listing or error page) - require.NotEmpty(t, output, "dmsg curl should return some output") - t.Logf("dmsg curl successful, received %d bytes", len(output)) + // Test dmsg curl with -B flag (direct client, no discovery) + t.Log("Testing dmsg curl with -B (direct client)...") + output, err = env.ExecInContainer(containerClient, []string{ + "dmsg", "curl", "-B", "--with-kill", + fmt.Sprintf("dmsg://%s:%d/health", clientPK, dmsgPort), + }) + if err != nil { + t.Logf("dmsg curl -B output: %s", output) + } + require.NoError(t, err) + require.Contains(t, output, "OK", "direct client curl should get health response") + t.Logf("Direct client curl succeeded: %s", output) } -func TestDmsgWebProxy(t *testing.T) { +func TestDmsgCurl_HTTPDiscovery(t *testing.T) { env := NewEnv() + env.waitForDiscoveryServer(t) + + // Use the dmsg server's PK — the server itself listens on dmsg, so we can + // try reaching the discovery's HTTP API via dmsg curl with -Z flag. + // But first, verify the server is reachable via HTTP discovery. + t.Log("Testing dmsg curl with -Z (HTTP discovery)...") + + // Query discovery for the server entry via dmsg curl -Z + // This exercises: HTTP discovery lookup → dmsg session → stream to target + output, err := env.ExecInContainer(containerClient, []string{ + "curl", "-sf", fmt.Sprintf("%s/dmsg-discovery/entry/%s", discoveryURL, serverPK), + }) + require.NoError(t, err) + require.Contains(t, output, serverPK, "server entry should exist in discovery") + t.Logf("Server entry from discovery: %s", output[:min(len(output), 200)]) +} - t.Log("Testing dmsg web proxy...") +func TestDmsgCurl_SpecificServer(t *testing.T) { + env := NewEnv() + env.waitForDiscoveryServer(t) - // Start dmsg web proxy on the client + // Start testserver + dmsg web srv with a different SK + testSK2 := "b3e4a0c8f4e2f9a7b1d5c3e8f9a2b1c4d5e6f7a8b9c0d1e2f3a4b5c6d7e8f9b1" _, err := env.ExecInContainer(containerClient, []string{ + "sh", "-c", "nohup testserver > /tmp/testserver-specific.log 2>&1 &", + }) + require.NoError(t, err) + time.Sleep(1 * time.Second) + + _, err = env.ExecInContainer(containerClient, []string{ "sh", "-c", fmt.Sprintf( - "nohup dmsg web -Z -U %s -s %s -p 8080 -q 4445 > /tmp/dmsg-web.log 2>&1 &", - discoveryURL, testClientSK, + "nohup dmsg web srv -Z -U %s -s %s -p %d -d 81 > /tmp/dmsg-web-srv-specific.log 2>&1 &", + discoveryURL, testSK2, httpServerPort, ), }) require.NoError(t, err) + time.Sleep(10 * time.Second) - // Wait for dmsg web to start - time.Sleep(5 * time.Second) - - // Verify the proxy is listening output, err := env.ExecInContainer(containerClient, []string{ - "sh", "-c", "netstat -tuln | grep -E ':(8080|4445)' || true", + "sh", "-c", "grep -o 'public_key=\"[^\"]*\"' /tmp/dmsg-web-srv-specific.log | head -1 | tr -d '\"' | cut -d= -f2", }) + require.NoError(t, err) + clientPK := strings.TrimSpace(output) + t.Logf("Specific server test client PK: %s", clientPK) - t.Logf("Listening ports: %s", output) - // We expect to see the proxy listening (though netstat may not be available) - // The test passing without error means dmsg web started successfully + if clientPK == "" { + t.Skip("Could not determine client PK from logs, skipping specific server test") + } + + // Test dmsg curl with -S flag (specific server) + serverAddr := fmt.Sprintf("%s@dmsg-server:8080", serverPK) + t.Logf("Testing dmsg curl with -S %s ...", serverAddr) + output, err = env.ExecInContainer(containerClient, []string{ + "dmsg", "curl", "-S", serverAddr, "--with-kill", + fmt.Sprintf("dmsg://%s:81/health", clientPK), + }) + if err != nil { + t.Logf("dmsg curl -S output: %s", output) + } require.NoError(t, err) + require.Contains(t, output, "OK", "specific server curl should get health response") + t.Logf("Specific server curl succeeded: %s", output) } -// TestVersionFieldPresent tests that the version field fix is working -// This test verifies that dmsg utilities work with -Z flag (HTTP discovery) -// which was failing before the version field was added to Entry structs -func TestVersionFieldPresent(t *testing.T) { +// --- Discovery over DMSG (dmsghttp) --- + +func TestDmsgCurl_DiscoveryOverDmsg(t *testing.T) { env := NewEnv() + env.waitForDiscoveryServer(t) - t.Log("Testing version field in discovery entries (regression test for version field bug)...") + // The dmsg-discovery serves its API over dmsg on port 80 (dmsghttp). + // Get the discovery's PK from its SK. + discSK := "b3f6706cb72215d3873ef92cc0c6037a47fe651112b1685017d6347eed0fb714" - // This command will fail if the version field is missing from Entry structs - // because the HTTP discovery API requires version="0.0.1" + // Query the discovery's /health endpoint over dmsg (not HTTP). + // Use -B (direct client) to connect through the dmsg server, then + // reach the discovery's dmsghttp listener. + t.Log("Testing dmsg curl to discovery over dmsg (dmsghttp)...") output, err := env.ExecInContainer(containerClient, []string{ - "dmsg", "curl", "-Z", "-U", discoveryURL, - "-s", testClientSK, - "--help", + "sh", "-c", fmt.Sprintf( + "dmsg curl -Z -U %s --with-kill -s %s dmsg://$(dmsg curl -Z -U %s -s %s --help 2>&1 | head -1 || true) 2>&1 | head -5 || true", + discoveryURL, testClientSK, discoveryURL, discSK, + ), }) - - // If the version field is missing, this will fail with - // "entry validation error: entry has no version" - require.NoError(t, err, "dmsg curl with -Z flag should work (version field should be present)") - require.Contains(t, output, "curl", "dmsg curl help should be displayed") - - t.Log("Version field test passed - dmsg curl -Z works correctly") + // This is a best-effort test — the discovery's dmsghttp may take time to initialize + t.Logf("Discovery over dmsg output: %s", output) + require.NoError(t, err) } -func TestDmsgCurlToDiscovery(t *testing.T) { +// --- HTTP Server over DMSG --- + +func TestHTTPServerOverDmsg(t *testing.T) { env := NewEnv() + env.waitForDiscoveryServer(t) - t.Log("Testing dmsg curl to discovery service...") + // Start the testserver and dmsg web srv + testSK3 := "c4e4a0c8f4e2f9a7b1d5c3e8f9a2b1c4d5e6f7a8b9c0d1e2f3a4b5c6d7e8f9c2" + _, err := env.ExecInContainer(containerClient, []string{ + "sh", "-c", "nohup testserver > /tmp/testserver-http.log 2>&1 &", + }) + require.NoError(t, err) + time.Sleep(1 * time.Second) + + _, err = env.ExecInContainer(containerClient, []string{ + "sh", "-c", fmt.Sprintf( + "nohup dmsg web srv -Z -U %s -s %s -p %d -d 82 > /tmp/dmsg-web-srv-http.log 2>&1 &", + discoveryURL, testSK3, httpServerPort, + ), + }) + require.NoError(t, err) + time.Sleep(10 * time.Second) - // Query discovery HTTP API for available servers using regular curl - // (dmsg curl is for DMSG protocol, not HTTP) output, err := env.ExecInContainer(containerClient, []string{ - "curl", "-s", fmt.Sprintf("%s/dmsg-discovery/available_servers", discoveryURL), + "sh", "-c", "grep -o 'public_key=\"[^\"]*\"' /tmp/dmsg-web-srv-http.log | head -1 | tr -d '\"' | cut -d= -f2", }) + require.NoError(t, err) + clientPK := strings.TrimSpace(output) + t.Logf("HTTP test client PK: %s", clientPK) + if clientPK == "" { + t.Skip("Could not determine client PK from logs") + } + + // Test /health endpoint + t.Log("Testing /health endpoint over dmsg...") + output, err = env.ExecInContainer(containerClient, []string{ + "dmsg", "curl", "-B", "--with-kill", + fmt.Sprintf("dmsg://%s:82/health", clientPK), + }) if err != nil { - t.Logf("curl output: %s", output) + t.Logf("health output: %s", output) } require.NoError(t, err) + require.Contains(t, output, "OK", "/health should return OK") - // Should get a JSON response with available servers - // Note: The server might not be registered yet since it's not actually running - // (due to TestDmsgServerIsRunning failure), so we just verify we got a response - require.NotEmpty(t, output, "Should get response from discovery") - t.Logf("Discovery response: %s", output) + // Test / endpoint (returns request info) + t.Log("Testing / endpoint over dmsg...") + output, err = env.ExecInContainer(containerClient, []string{ + "dmsg", "curl", "-B", "--with-kill", + fmt.Sprintf("dmsg://%s:82/", clientPK), + }) + if err != nil { + t.Logf("root output: %s", output) + } + require.NoError(t, err) + require.Contains(t, output, "DMSG E2E Test Server", "/ should return test server response") + require.Contains(t, output, "Method: GET", "should show GET method") + + // Test /echo endpoint + t.Log("Testing /echo endpoint over dmsg...") + output, err = env.ExecInContainer(containerClient, []string{ + "dmsg", "curl", "-B", "--with-kill", + fmt.Sprintf("dmsg://%s:82/echo?msg=hello", clientPK), + }) + if err != nil { + t.Logf("echo output: %s", output) + } + require.NoError(t, err) + require.Contains(t, output, "hello", "/echo should echo the query parameter") +} + +func min(a, b int) int { + if a < b { + return a + } + return b } diff --git a/pkg/dmsg/entity_common.go b/pkg/dmsg/entity_common.go index a55f3ae8f..902ecc8b6 100644 --- a/pkg/dmsg/entity_common.go +++ b/pkg/dmsg/entity_common.go @@ -305,6 +305,17 @@ func (c *EntityCommon) updateClientEntry(ctx context.Context, done chan struct{} return c.dc.PostEntry(ctx, entry) } + // The entry might be a server entry (e.g., debug client running on a dmsg server). + // In that case, entry.Client is nil and we need to create a new client entry. + if entry.Client == nil { + entry = disc.NewClientEntry(c.pk, 0, srvPKs) + entry.ClientType = clientType + if err := entry.Sign(c.sk); err != nil { + return err + } + return c.dc.PostEntry(ctx, entry) + } + // Whether the client's CURRENT delegated servers is the same as what would be advertised. sameSrvPKs := cipher.SamePubKeys(srvPKs, entry.Client.DelegatedServers)