From e969335179dfba855e8f7ecc680af90c5c7e6dcd Mon Sep 17 00:00:00 2001 From: Jordan Rash Date: Thu, 9 Apr 2026 10:34:51 -0600 Subject: [PATCH 1/5] fix: honor owning namespace on clone, stop, and credential mint Operations initiated by the system namespace must act on behalf of the workload's owning namespace. The client now publishes on the owning namespace's NATS control subjects (masquerading at the subject layer while authenticated as system) and discovers the target namespace via ListWorkloads when the system user stops a workload by id. The node mints workload credentials against the request body's namespace so log permissions line up with where the workload actually lives. handleCloneWorkload also gains a guard that prevents non-system callers from reading workload definitions belonging to other namespaces. - client.StartWorkload now takes a *models.StartWorkloadRequest so the caller controls the target namespace directly; Auction takes an explicit namespace argument - client.StopWorkload detects system callers and resolves the owning namespace via ListWorkloads before publishing the stop; errors out clearly when the namespace cannot be determined, suggesting the --namespace flag as the operator escape hatch - client.CloneWorkload uses cloneResp.Namespace for both the auction and the deploy, so clones land in the owning namespace instead of being re-homed into system - handlers.go handleAuctionDeploy mints WorkloadCred with req.Namespace so \$NEX.FEED..logs.> permission lines up with the workload's actual namespace - handlers.go handleCloneWorkload silently drops responses whose workload namespace does not match the caller (unless caller is system), closing a pre-existing cross-namespace info disclosure - nex workload copy --stop prints clone success and stop failure as distinct lines so partial failures are not silent - Error messages on the system-user stop discovery path do not echo the resolved namespace names, to avoid leaking namespace inventory through NATS micro response headers and logs - Tests: cross-namespace clone, cross-namespace clone+stop, system-user stop discovery, not-found, and a WorkloadClaims unit test locking in the log-subject scope invariant Signed-off-by: Jordan Rash --- _test/nexlet_inmem/inmem_test.go | 10 +- client/client.go | 106 +++++++-- client/client_benchmark_test.go | 2 +- client/client_test.go | 352 ++++++++++++++++++++++++++-- cmd/nex/workloads.go | 35 ++- handlers.go | 25 +- internal/credentials/vendor_test.go | 29 +++ 7 files changed, 513 insertions(+), 46 deletions(-) create mode 100644 internal/credentials/vendor_test.go diff --git a/_test/nexlet_inmem/inmem_test.go b/_test/nexlet_inmem/inmem_test.go index ab44239d..e03008a0 100644 --- a/_test/nexlet_inmem/inmem_test.go +++ b/_test/nexlet_inmem/inmem_test.go @@ -157,7 +157,7 @@ func TestFunctionStartWorkloadAndTrigger(t *testing.T) { var aR []*models.AuctionResponse deadline := time.Now().Add(10 * time.Second) for time.Now().Before(deadline) { - aR, err = nClient.Auction("inmem", nil) + aR, err = nClient.Auction(models.SystemNamespace, "inmem", nil) if err == nil && len(aR) == 1 { break } @@ -166,7 +166,13 @@ func TestFunctionStartWorkloadAndTrigger(t *testing.T) { be.NilErr(t, err) be.Equal(t, 1, len(aR)) - sR, err := nClient.StartWorkload(aR[0].BidderId, "inmemfunc", "", "{}", "inmem", "function", nil) + sR, err := nClient.StartWorkload(aR[0].BidderId, &models.StartWorkloadRequest{ + Namespace: models.SystemNamespace, + Name: "inmemfunc", + RunRequest: "{}", + WorkloadType: "inmem", + WorkloadLifecycle: "function", + }) be.NilErr(t, err) be.Nonzero(t, sR) diff --git a/client/client.go b/client/client.go index f1a255db..43a9a623 100644 --- a/client/client.go +++ b/client/client.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "math/rand" "net/http" "strconv" @@ -28,8 +29,8 @@ type NexClient interface { GetNodeInfo(nodeId string) (*models.NodeInfoResponse, error) SetLameduck(nodeId string, delay time.Duration, tag map[string]string) (*models.LameduckResponse, error) ListNodes(filter map[string]string) ([]*models.NodePingResponse, error) - Auction(typ string, tags map[string]string) ([]*models.AuctionResponse, error) - StartWorkload(deployId, name, desc, runRequest, typ string, lifecycle models.WorkloadLifecycle, pTags models.NodeTags) (*models.StartWorkloadResponse, error) + Auction(namespace, typ string, tags map[string]string) ([]*models.AuctionResponse, error) + StartWorkload(deployId string, req *models.StartWorkloadRequest) (*models.StartWorkloadResponse, error) StopWorkload(workloadId string) (*models.StopWorkloadResponse, error) ListWorkloads(filter []string) ([]*models.AgentListWorkloadsResponse, error) CloneWorkload(id string, tags map[string]string) (*models.StartWorkloadResponse, error) @@ -215,7 +216,7 @@ func (n *nexClient) ListNodes(filter map[string]string) ([]*models.NodePingRespo return resp, errs } -func (n *nexClient) Auction(typ string, tags map[string]string) ([]*models.AuctionResponse, error) { +func (n *nexClient) Auction(namespace, typ string, tags map[string]string) ([]*models.AuctionResponse, error) { auctionRequest := &models.AuctionRequest{ AgentType: typ, AuctionId: nuid.New().Next(), @@ -227,7 +228,7 @@ func (n *nexClient) Auction(typ string, tags map[string]string) ([]*models.Aucti return nil, n.nexInternalError(err, "failed to marshal auction request") } - msgs, err := natsext.RequestMany(n.ctx, n.nc, models.AuctionRequestSubject(n.namespace), auctionRequestB, natsext.RequestManyStall(n.auctionRequestManyStall)) + msgs, err := natsext.RequestMany(n.ctx, n.nc, models.AuctionRequestSubject(namespace), auctionRequestB, natsext.RequestManyStall(n.auctionRequestManyStall)) if err != nil { return nil, n.nexInternalError(err, "failed to request auction") } @@ -255,19 +256,12 @@ func (n *nexClient) Auction(typ string, tags map[string]string) ([]*models.Aucti return resp, errs } -func (n *nexClient) StartWorkload(deployId, name, desc, runRequest, typ string, lifecycle models.WorkloadLifecycle, pTags models.NodeTags) (*models.StartWorkloadResponse, error) { - if pTags == nil { - pTags = make(models.NodeTags) +func (n *nexClient) StartWorkload(deployId string, req *models.StartWorkloadRequest) (*models.StartWorkloadResponse, error) { + if req == nil { + return nil, n.nexBadRequestError(errors.New("nil request"), "start workload request must not be nil") } - - req := &models.StartWorkloadRequest{ - Namespace: n.namespace, - Name: name, - Description: desc, - RunRequest: runRequest, - WorkloadLifecycle: lifecycle, - WorkloadType: typ, - Tags: pTags, + if req.Tags == nil { + req.Tags = make(models.NodeTags) } reqB, err := json.Marshal(req) @@ -275,7 +269,7 @@ func (n *nexClient) StartWorkload(deployId, name, desc, runRequest, typ string, return nil, n.nexInternalError(err, "failed to marshal start workload request") } - startResponseMsg, err := n.nc.Request(models.AuctionDeployRequestSubject(n.namespace, deployId), reqB, n.startWorkloadTimeout) + startResponseMsg, err := n.nc.Request(models.AuctionDeployRequestSubject(req.Namespace, deployId), reqB, n.startWorkloadTimeout) if err != nil { return nil, n.nexInternalError(err, "failed to request start workload") } @@ -294,8 +288,61 @@ func (n *nexClient) StartWorkload(deployId, name, desc, runRequest, typ string, } func (n *nexClient) StopWorkload(workloadId string) (*models.StopWorkloadResponse, error) { + // targetNS is the namespace the workload actually lives in. For user + // callers it is the caller's own namespace. For system callers we must + // discover it because system is administrative and never hosts + // workloads itself (except in rare explicit system-namespace starts). + targetNS := n.namespace + + if n.namespace == models.SystemNamespace { + summaries, err := n.ListWorkloads([]string{workloadId}) + if err != nil { + return nil, n.nexInternalError(err, "failed to discover workload namespace") + } + + var resolved []string + for _, agentResp := range summaries { + for _, summary := range *agentResp { + if summary.Id != workloadId { + continue + } + if summary.Namespace == nil { + return nil, n.nexInternalError( + errors.New("owning namespace not reported"), + fmt.Sprintf("cannot stop workload %s as system user: owning namespace could not be determined (nexlet does not report namespace in list response); retry with --namespace set to the owning namespace", workloadId)) + } + already := false + for _, existing := range resolved { + if existing == *summary.Namespace { + already = true + break + } + } + if !already { + resolved = append(resolved, *summary.Namespace) + } + } + } + + switch len(resolved) { + case 0: + return nil, n.nexNotFoundError(errors.New(string(models.GenericErrorsWorkloadNotFound)), "workload not found") + case 1: + targetNS = resolved[0] + default: + // Don't echo the resolved namespace names in the error + // message — even though only system callers reach this branch + // today, the error propagates through NATS micro response + // headers and logs where the audience is broader. The + // operator can list workloads themselves to investigate. + return nil, n.nexInternalError( + errors.New("ambiguous workload id"), + fmt.Sprintf("workload id %s is ambiguous: it matches workloads in multiple namespaces; use --namespace to specify which one to stop", workloadId)) + } + } + req := models.StopWorkloadRequest{ - Namespace: n.namespace, + Namespace: targetNS, } reqB, err := json.Marshal(req) @@ -303,7 +350,7 @@ func (n *nexClient) StopWorkload(workloadId string) (*models.StopWorkloadRespons return nil, n.nexInternalError(err, "failed to marshal stop workload request") } - msgs, err := natsext.RequestMany(n.ctx, n.nc, models.UndeployRequestSubject(n.namespace, workloadId), reqB, natsext.RequestManyStall(n.requestManyStall)) + msgs, err := natsext.RequestMany(n.ctx, n.nc, models.UndeployRequestSubject(targetNS, workloadId), reqB, natsext.RequestManyStall(n.requestManyStall)) if err != nil { return nil, n.nexInternalError(err, "failed to request stop workload") } @@ -390,6 +437,13 @@ func (n *nexClient) CloneWorkload(id string, tags map[string]string) (*models.St return nil, n.nexInternalError(err, "failed to get clone public key") } + // The clone fetch uses n.namespace because the caller has not yet + // learned the target namespace — the whole point of this round-trip is + // to discover the workload's owning namespace along with its full + // definition. The node's handleCloneWorkload bypass for + // models.SystemNamespace is load-bearing here for cross-namespace + // clones by a system operator; a user caller can only fetch workloads + // in their own namespace (handlers.go enforces this). cloneReq := models.CloneWorkloadRequest{ Namespace: n.namespace, NewTargetXkey: tKpPub, @@ -432,7 +486,16 @@ func (n *nexClient) CloneWorkload(id string, tags map[string]string) (*models.St return nil, n.nexNotFoundError(errors.New(string(models.GenericErrorsWorkloadNotFound)), "workload not found") } - aucResp, err := n.Auction(cloneResp.WorkloadType, tags) + // The clone must be deployed into the workload's actual owning + // namespace, not the caller's namespace. For a system operator cloning + // a default-namespace workload, this means the auction and deploy + // target `default`. The system credential masquerades at the subject + // layer — publishing on default.* control subjects while authenticated + // as system — which the node accepts because subject-body namespaces + // match. cloneResp already carries the authoritative Namespace from + // the agent's state, so we reuse it verbatim and only override the + // placement tags the caller passed to CloneWorkload. + aucResp, err := n.Auction(cloneResp.Namespace, cloneResp.WorkloadType, tags) if len(aucResp) == 0 { if err != nil { return nil, err @@ -441,7 +504,8 @@ func (n *nexClient) CloneWorkload(id string, tags map[string]string) (*models.St } randomNode := aucResp[rand.Intn(len(aucResp))] - swr, err := n.StartWorkload(randomNode.BidderId, cloneResp.Name, cloneResp.Description, cloneResp.RunRequest, cloneResp.WorkloadType, cloneResp.WorkloadLifecycle, tags) + cloneResp.Tags = tags + swr, err := n.StartWorkload(randomNode.BidderId, cloneResp) if err != nil { return nil, err } diff --git a/client/client_benchmark_test.go b/client/client_benchmark_test.go index 6e7a9fd4..808fc996 100644 --- a/client/client_benchmark_test.go +++ b/client/client_benchmark_test.go @@ -91,7 +91,7 @@ func BenchmarkClientAuction(b *testing.B) { b.ResetTimer() for b.Loop() { - auctionResponses, err := client.Auction(workloadType, map[string]string{}) + auctionResponses, err := client.Auction(namespace, workloadType, map[string]string{}) be.NilErr(b, err) b.StopTimer() for _, ar := range auctionResponses { diff --git a/client/client_test.go b/client/client_test.go index 03b7436e..aefbae85 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -78,11 +78,18 @@ func TestNexClient_User(t *testing.T) { var ar []*models.AuctionResponse _test.WaitFor(t, 10*time.Second, func() bool { - ar, err = client.Auction("inmem", map[string]string{}) + ar, err = client.Auction("user", "inmem", map[string]string{}) return err == nil && len(ar) == 1 }, "waiting for auction to return 1 result") - sr, err := client.StartWorkload(ar[0].BidderId, "tester", "My test workload", "{}", "inmem", models.WorkloadLifecycleService, nil) + sr, err := client.StartWorkload(ar[0].BidderId, &models.StartWorkloadRequest{ + Namespace: "user", + Name: "tester", + Description: "My test workload", + RunRequest: "{}", + WorkloadType: "inmem", + WorkloadLifecycle: models.WorkloadLifecycleService, + }) be.NilErr(t, err) be.Equal(t, "tester", sr.Name) @@ -227,27 +234,48 @@ func TestNexClient_ListWorkloads(t *testing.T) { var ar []*models.AuctionResponse _test.WaitFor(t, 10*time.Second, func() bool { - ar, err = client.Auction("inmem", map[string]string{}) + ar, err = client.Auction("user", "inmem", map[string]string{}) return err == nil && len(ar) == tt.size }, "waiting for auction to return expected results") - _, err = client.StartWorkload(ar[0].BidderId, "tester1", "My test workload", "{}", "inmem", models.WorkloadLifecycleService, nil) + _, err = client.StartWorkload(ar[0].BidderId, &models.StartWorkloadRequest{ + Namespace: "user", + Name: "tester1", + Description: "My test workload", + RunRequest: "{}", + WorkloadType: "inmem", + WorkloadLifecycle: models.WorkloadLifecycleService, + }) be.NilErr(t, err) _test.WaitFor(t, 10*time.Second, func() bool { - ar, err = client.Auction("inmem", map[string]string{}) + ar, err = client.Auction("user", "inmem", map[string]string{}) return err == nil && len(ar) == tt.size }, "waiting for auction to return expected results") - _, err = client.StartWorkload(ar[1%tt.size].BidderId, "tester2", "My test workload", "{}", "inmem", models.WorkloadLifecycleService, nil) + _, err = client.StartWorkload(ar[1%tt.size].BidderId, &models.StartWorkloadRequest{ + Namespace: "user", + Name: "tester2", + Description: "My test workload", + RunRequest: "{}", + WorkloadType: "inmem", + WorkloadLifecycle: models.WorkloadLifecycleService, + }) be.NilErr(t, err) _test.WaitFor(t, 10*time.Second, func() bool { - ar, err = client.Auction("inmem", map[string]string{}) + ar, err = client.Auction("user", "inmem", map[string]string{}) return err == nil && len(ar) == tt.size }, "waiting for auction to return expected results") - _, err = client.StartWorkload(ar[2%tt.size].BidderId, "tester3", "My test workload", "{}", "inmem", models.WorkloadLifecycleService, nil) + _, err = client.StartWorkload(ar[2%tt.size].BidderId, &models.StartWorkloadRequest{ + Namespace: "user", + Name: "tester3", + Description: "My test workload", + RunRequest: "{}", + WorkloadType: "inmem", + WorkloadLifecycle: models.WorkloadLifecycleService, + }) be.NilErr(t, err) wl, err := client.ListWorkloads([]string{}) @@ -337,11 +365,18 @@ func TestNexClient_CloneWorkload(t *testing.T) { var ar []*models.AuctionResponse _test.WaitFor(t, 10*time.Second, func() bool { - ar, err = client.Auction("inmem", map[string]string{}) + ar, err = client.Auction("user", "inmem", map[string]string{}) return err == nil && len(ar) == tt.size }, "waiting for auction to return expected results") - swr, err := client.StartWorkload(ar[0].BidderId, "tester1", "My test workload", "{}", "inmem", models.WorkloadLifecycleService, nil) + swr, err := client.StartWorkload(ar[0].BidderId, &models.StartWorkloadRequest{ + Namespace: "user", + Name: "tester1", + Description: "My test workload", + RunRequest: "{}", + WorkloadType: "inmem", + WorkloadLifecycle: models.WorkloadLifecycleService, + }) be.NilErr(t, err) _, err = client.CloneWorkload(swr.Id, nil) @@ -518,11 +553,18 @@ func TestNexClient_StopWorkload(t *testing.T) { var ar []*models.AuctionResponse _test.WaitFor(t, 10*time.Second, func() bool { - ar, err = client.Auction("inmem", map[string]string{}) + ar, err = client.Auction("user", "inmem", map[string]string{}) return err == nil && len(ar) == tt.size }, "waiting for auction to return expected results") - swr, err := client.StartWorkload(ar[0].BidderId, "tester1", "My test workload", "{}", "inmem", models.WorkloadLifecycleService, nil) + swr, err := client.StartWorkload(ar[0].BidderId, &models.StartWorkloadRequest{ + Namespace: "user", + Name: "tester1", + Description: "My test workload", + RunRequest: "{}", + WorkloadType: "inmem", + WorkloadLifecycle: models.WorkloadLifecycleService, + }) be.NilErr(t, err) _test.WaitFor(t, 10*time.Second, func() bool { @@ -644,11 +686,18 @@ func TestNexClient_ListWorkloads_PartialError(t *testing.T) { var ar []*models.AuctionResponse _test.WaitFor(t, 10*time.Second, func() bool { - ar, err = client.Auction("inmem", map[string]string{}) + ar, err = client.Auction("user", "inmem", map[string]string{}) return err == nil && len(ar) == 3 }, "waiting for auction to return 3 results") - _, err = client.StartWorkload(ar[0].BidderId, "tester1", "My test workload", "{}", "inmem", models.WorkloadLifecycleService, nil) + _, err = client.StartWorkload(ar[0].BidderId, &models.StartWorkloadRequest{ + Namespace: "user", + Name: "tester1", + Description: "My test workload", + RunRequest: "{}", + WorkloadType: "inmem", + WorkloadLifecycle: models.WorkloadLifecycleService, + }) be.NilErr(t, err) _test.WaitFor(t, 10*time.Second, func() bool { @@ -717,7 +766,7 @@ func TestNexClient_Auction_PartialError(t *testing.T) { var ar []*models.AuctionResponse _test.WaitFor(t, 10*time.Second, func() bool { - ar, err = client.Auction("inmem", map[string]string{}) + ar, err = client.Auction("user", "inmem", map[string]string{}) return len(ar) == 3 }, "waiting for auction to return 3 healthy results") @@ -728,3 +777,276 @@ func TestNexClient_Auction_PartialError(t *testing.T) { be.NilErr(t, node.Shutdown()) } } + +// countWorkloads collapses the scatter-gather list response to a total count. +func countWorkloads(t *testing.T, c NexClient) int { + t.Helper() + wl, err := c.ListWorkloads(nil) + be.NilErr(t, err) + total := 0 + for _, w := range wl { + total += len(*w) + } + return total +} + +// TestNexClient_CloneWorkload_CrossNamespace verifies that when a system-user +// client clones a workload owned by a user namespace, the clone lands in the +// owning namespace — not in `system`. This was the primary bug motivating the +// cross-namespace fix: before the fix, client.CloneWorkload would pass through +// to StartWorkload with n.namespace="system" and the clone would be stored +// under n.workloads["system"] on the agent. +func TestNexClient_CloneWorkload_CrossNamespace(t *testing.T) { + t.Parallel() + workDir := t.TempDir() + server := _test.StartNatsServer(t, workDir) + defer server.Shutdown() + + nc, err := nats.Connect(server.ClientURL()) + be.NilErr(t, err) + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + nexNodes := _test.StartNexus(t, ctx, server.ClientURL(), 1, false) + be.Equal(t, 1, len(nexNodes)) + + // Build two clients sharing the same nexus: one scoped to the "user" + // namespace that will deploy the original workload, and one scoped to + // system that will perform the cross-namespace clone. + userClient, err := NewClient(context.Background(), nc, "user", WithAuctionStall(5*time.Second)) + be.NilErr(t, err) + + systemClient, err := NewClient(context.Background(), nc, models.SystemNamespace, WithAuctionStall(5*time.Second)) + be.NilErr(t, err) + + var ar []*models.AuctionResponse + _test.WaitFor(t, 10*time.Second, func() bool { + ar, err = userClient.Auction("user", "inmem", map[string]string{}) + return err == nil && len(ar) == 1 + }, "waiting for user auction to return 1 result") + + origSwr, err := userClient.StartWorkload(ar[0].BidderId, &models.StartWorkloadRequest{ + Namespace: "user", + Name: "orig", + Description: "original workload owned by user", + RunRequest: "{}", + WorkloadType: "inmem", + WorkloadLifecycle: models.WorkloadLifecycleService, + }) + be.NilErr(t, err) + + // Sanity: the user namespace now has exactly one workload, the system + // namespace has none. + _test.WaitFor(t, 10*time.Second, func() bool { + return countWorkloads(t, userClient) == 1 + }, "waiting for original to register in user namespace") + be.Equal(t, 0, countWorkloads(t, systemClient)-countWorkloads(t, userClient)) + + // System user clones the user-owned workload. + cloneResp, err := systemClient.CloneWorkload(origSwr.Id, nil) + be.NilErr(t, err) + be.Nonzero(t, cloneResp) + be.Unequal(t, origSwr.Id, cloneResp.Id) + + // Both the original and the clone must appear in the user namespace. + // The system-namespace view aggregates every namespace, so it should + // match the user view exactly (there is no namespace other than user). + _test.WaitFor(t, 10*time.Second, func() bool { + return countWorkloads(t, userClient) == 2 + }, "waiting for clone to appear in user namespace") + be.Equal(t, 2, countWorkloads(t, userClient)) + + // Verify each workload's Namespace field on the system list reports + // "user", confirming the clone did not re-home into system. + sysList, err := systemClient.ListWorkloads(nil) + be.NilErr(t, err) + seenUser := 0 + for _, agentResp := range sysList { + for _, summary := range *agentResp { + be.Nonzero(t, summary.Namespace) + if summary.Namespace != nil && *summary.Namespace == "user" { + seenUser++ + } + } + } + be.Equal(t, 2, seenUser) + + for _, node := range nexNodes { + be.NilErr(t, node.Shutdown()) + } +} + +// TestNexClient_StopWorkload_System_Discovery verifies that a system-user +// client can stop a workload owned by a different namespace. The client must +// discover the owning namespace via ListWorkloads and publish the stop on +// that namespace's subject, not `system`. +func TestNexClient_StopWorkload_System_Discovery(t *testing.T) { + t.Parallel() + workDir := t.TempDir() + server := _test.StartNatsServer(t, workDir) + defer server.Shutdown() + + nc, err := nats.Connect(server.ClientURL()) + be.NilErr(t, err) + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + nexNodes := _test.StartNexus(t, ctx, server.ClientURL(), 1, false) + be.Equal(t, 1, len(nexNodes)) + + userClient, err := NewClient(context.Background(), nc, "user", WithAuctionStall(5*time.Second)) + be.NilErr(t, err) + + systemClient, err := NewClient(context.Background(), nc, models.SystemNamespace, WithAuctionStall(5*time.Second)) + be.NilErr(t, err) + + var ar []*models.AuctionResponse + _test.WaitFor(t, 10*time.Second, func() bool { + ar, err = userClient.Auction("user", "inmem", map[string]string{}) + return err == nil && len(ar) == 1 + }, "waiting for user auction to return 1 result") + + swr, err := userClient.StartWorkload(ar[0].BidderId, &models.StartWorkloadRequest{ + Namespace: "user", + Name: "stoppable", + Description: "workload to be stopped by system", + RunRequest: "{}", + WorkloadType: "inmem", + WorkloadLifecycle: models.WorkloadLifecycleService, + }) + be.NilErr(t, err) + + _test.WaitFor(t, 10*time.Second, func() bool { + return countWorkloads(t, userClient) == 1 + }, "waiting for workload to register") + + // System user stops the user-owned workload. The client must discover + // namespace=user from the list response and target the user subject. + stopResp, err := systemClient.StopWorkload(swr.Id) + be.NilErr(t, err) + be.True(t, stopResp.Stopped) + be.Equal(t, swr.Id, stopResp.Id) + + // Confirm the workload is gone from the user namespace. + _test.WaitFor(t, 10*time.Second, func() bool { + return countWorkloads(t, userClient) == 0 + }, "waiting for workload to stop") + + for _, node := range nexNodes { + be.NilErr(t, node.Shutdown()) + } +} + +// TestNexClient_StopWorkload_System_NotFound verifies that a system-user stop +// for a nonexistent workload surfaces a clean not-found error rather than +// silently returning Stopped=false. +func TestNexClient_StopWorkload_System_NotFound(t *testing.T) { + t.Parallel() + workDir := t.TempDir() + server := _test.StartNatsServer(t, workDir) + defer server.Shutdown() + + nc, err := nats.Connect(server.ClientURL()) + be.NilErr(t, err) + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + nexNodes := _test.StartNexus(t, ctx, server.ClientURL(), 1, false) + be.Equal(t, 1, len(nexNodes)) + + systemClient, err := NewClient(context.Background(), nc, models.SystemNamespace, WithAuctionStall(5*time.Second)) + be.NilErr(t, err) + + _, err = systemClient.StopWorkload("does-not-exist") + be.Nonzero(t, err) + + for _, node := range nexNodes { + be.NilErr(t, node.Shutdown()) + } +} + +// TestNexClient_CloneWorkload_StopOrig_CrossNamespace exercises the CLI's +// clone-then-stop flow as a system user against a user-owned workload. The +// clone must land in the user namespace and the original must be stopped. +func TestNexClient_CloneWorkload_StopOrig_CrossNamespace(t *testing.T) { + t.Parallel() + workDir := t.TempDir() + server := _test.StartNatsServer(t, workDir) + defer server.Shutdown() + + nc, err := nats.Connect(server.ClientURL()) + be.NilErr(t, err) + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + nexNodes := _test.StartNexus(t, ctx, server.ClientURL(), 1, false) + be.Equal(t, 1, len(nexNodes)) + + userClient, err := NewClient(context.Background(), nc, "user", WithAuctionStall(5*time.Second)) + be.NilErr(t, err) + + systemClient, err := NewClient(context.Background(), nc, models.SystemNamespace, WithAuctionStall(5*time.Second)) + be.NilErr(t, err) + + var ar []*models.AuctionResponse + _test.WaitFor(t, 10*time.Second, func() bool { + ar, err = userClient.Auction("user", "inmem", map[string]string{}) + return err == nil && len(ar) == 1 + }, "waiting for user auction to return 1 result") + + origSwr, err := userClient.StartWorkload(ar[0].BidderId, &models.StartWorkloadRequest{ + Namespace: "user", + Name: "orig-stop", + Description: "original, will be stopped after clone", + RunRequest: "{}", + WorkloadType: "inmem", + WorkloadLifecycle: models.WorkloadLifecycleService, + }) + be.NilErr(t, err) + + _test.WaitFor(t, 10*time.Second, func() bool { + return countWorkloads(t, userClient) == 1 + }, "waiting for original to register") + + // System-user clone then stop, mirroring `nex workload copy --stop`. + cloneResp, err := systemClient.CloneWorkload(origSwr.Id, nil) + be.NilErr(t, err) + + stopResp, err := systemClient.StopWorkload(origSwr.Id) + be.NilErr(t, err) + be.True(t, stopResp.Stopped) + + // Only the clone should remain, and it should be in the user namespace. + _test.WaitFor(t, 10*time.Second, func() bool { + return countWorkloads(t, userClient) == 1 + }, "waiting for original to stop after clone") + be.Equal(t, 1, countWorkloads(t, userClient)) + + wl, err := userClient.ListWorkloads(nil) + be.NilErr(t, err) + foundClone := false + for _, agentResp := range wl { + for _, summary := range *agentResp { + if summary.Id == cloneResp.Id { + foundClone = true + be.Nonzero(t, summary.Namespace) + if summary.Namespace != nil { + be.Equal(t, "user", *summary.Namespace) + } + } + } + } + be.True(t, foundClone) + + for _, node := range nexNodes { + be.NilErr(t, node.Shutdown()) + } +} diff --git a/cmd/nex/workloads.go b/cmd/nex/workloads.go index 0f17ae54..0c2dbb62 100644 --- a/cmd/nex/workloads.go +++ b/cmd/nex/workloads.go @@ -119,7 +119,7 @@ func (r *StartWorkload) Run(ctx context.Context, globals *Globals) error { r.WorkloadStartRequest = json.RawMessage(srB) } - aucResp, err := client.Auction(r.AgentType, r.AuctionTags) + aucResp, err := client.Auction(globals.Namespace, r.AgentType, r.AuctionTags) if err != nil { return err } @@ -177,7 +177,15 @@ func (r *StartWorkload) Run(ctx context.Context, globals *Globals) error { return err } - startResponse, err := client.StartWorkload(randomNode.BidderId, r.WorkloadName, r.WorkloadDescription, string(wsrB), r.AgentType, models.WorkloadLifecycle(r.WorkloadLifecycle), r.AuctionTags) + startResponse, err := client.StartWorkload(randomNode.BidderId, &models.StartWorkloadRequest{ + Namespace: globals.Namespace, + Name: r.WorkloadName, + Description: r.WorkloadDescription, + RunRequest: string(wsrB), + WorkloadType: r.AgentType, + WorkloadLifecycle: models.WorkloadLifecycle(r.WorkloadLifecycle), + Tags: r.AuctionTags, + }) if err != nil { return err } @@ -344,21 +352,32 @@ func (r *CloneWorkload) Run(ctx context.Context, globals *Globals) error { if err != nil { return err } + + // Clone succeeded. If --stop was requested and the subsequent stop + // fails, report both outcomes distinctly instead of swallowing the + // clone success so the operator knows the clone is live and only the + // original is still running. var stopped bool + var stopErr error if r.StopOrig { - stopResp, err := nexClient.StopWorkload(r.WorkloadId) - if err != nil { - return err - } else if stopResp.Stopped { + stopResp, sErr := nexClient.StopWorkload(r.WorkloadId) + switch { + case sErr != nil: + stopErr = sErr + case stopResp.Stopped: stopped = true } } + if globals.JSON { respB, err := json.Marshal(resp) if err != nil { return err } fmt.Println(string(respB)) + if stopErr != nil { + return stopErr + } return nil } @@ -366,6 +385,10 @@ func (r *CloneWorkload) Run(ctx context.Context, globals *Globals) error { if stopped { fmt.Printf("Original workload [%s] stopped\n", r.WorkloadId) } + if stopErr != nil { + fmt.Fprintf(os.Stderr, "failed to stop original workload [%s]: %v\n", r.WorkloadId, stopErr) + return stopErr + } return nil } diff --git a/handlers.go b/handlers.go index e451b73c..99f922d0 100644 --- a/handlers.go +++ b/handlers.go @@ -300,7 +300,13 @@ func (n *NexNode) handleAuctionDeployWorkload() func(micro.Request) { } workloadID := n.idgen.Generate(req) - wlNatsConn, err := n.minter.Mint(models.WorkloadCred, namespace, workloadID) + // Mint against the workload's owning namespace (from the request + // body), not the subject namespace. The subject namespace is only + // used for the authorization bypass check above; the workload's + // NATS permissions — specifically the log sub scope + // $NEX.FEED..logs.> — must line up with where the workload + // actually lives, which is req.Namespace. + wlNatsConn, err := n.minter.Mint(models.WorkloadCred, req.Namespace, workloadID) if err != nil { n.handlerError(r, err, models.ErrCodeInternalServerError, "failed to mint workload nats connection") return @@ -440,6 +446,23 @@ func (n *NexNode) handleCloneWorkload() func(micro.Request) { return } + // The agent's state.Exists iterates every namespace when looking + // up a workload by id, so a user caller could otherwise fetch the + // full StartWorkloadRequest definition (including RunRequest) of a + // workload owned by another namespace. Verify the returned + // workload's namespace matches the caller's, unless the caller is + // system (which is permitted to clone across namespaces). Silent + // drop on mismatch matches the existing "not found" silence + // pattern above so existence is not leaked. + tmp := new(models.StartWorkloadRequest) + if err := json.Unmarshal(getWorkload.Data, tmp); err != nil { + n.handlerError(r, err, models.ErrCodeInternalServerError, "failed to unmarshal workload definition from agent") + return + } + if tmp.Namespace != namespace && namespace != models.SystemNamespace { + return + } + err = r.Respond(getWorkload.Data) if err != nil { n.logger.Error("failed to respond to clone workload request", slog.String("err", err.Error())) diff --git a/internal/credentials/vendor_test.go b/internal/credentials/vendor_test.go new file mode 100644 index 00000000..269da985 --- /dev/null +++ b/internal/credentials/vendor_test.go @@ -0,0 +1,29 @@ +package credentials + +import ( + "slices" + "testing" + + "github.com/carlmjohnson/be" + "github.com/synadia-io/nex/models" +) + +// TestWorkloadClaims_LogScopedToNamespace locks in the invariant that a +// workload's NATS sub permissions are scoped to its owning namespace. This +// is the counterpart to the handlers.go handleAuctionDeploy fix that mints +// credentials against req.Namespace (the workload's real namespace) rather +// than the subject namespace. Without this invariant, a cloned workload +// would receive log permissions for the caller's namespace and be unable to +// stream logs into its own. +func TestWorkloadClaims_LogScopedToNamespace(t *testing.T) { + perms := WorkloadClaims("default", "wid123") + + expectedLogSub := models.LogAPIPrefix("default") + ".>" + be.True(t, slices.Contains(perms.Sub.Allow, expectedLogSub)) + + // Guard against the specific regression we just fixed: the system + // namespace must not appear in a default-namespace workload's sub + // allow list. + systemLogSub := models.LogAPIPrefix(models.SystemNamespace) + ".>" + be.False(t, slices.Contains(perms.Sub.Allow, systemLogSub)) +} From ed51210b0fedd08f1a3eaa457c61e9f8131e757f Mon Sep 17 00:00:00 2001 From: Jordan Rash Date: Thu, 9 Apr 2026 10:59:08 -0600 Subject: [PATCH 2/5] fix(client): address review feedback on clone/stop cross-namespace - StartWorkload shallow-copies the incoming request before defaulting Tags so call sites that pass a struct they still hold (notably CloneWorkload passing cloneResp verbatim) do not see silent mutation - CloneWorkload fails fast with a clear error when cloneResp.Namespace is empty, instead of publishing on a malformed auction subject and surfacing as a misleading "no nodes available for placement" - TestNexClient_CloneWorkload_CrossNamespace replaces a trivially-true subtraction assertion with explicit count checks against both the user and system clients Signed-off-by: Jordan Rash --- client/client.go | 26 ++++++++++++++++++++++---- client/client_test.go | 8 +++++--- 2 files changed, 27 insertions(+), 7 deletions(-) diff --git a/client/client.go b/client/client.go index 43a9a623..1fb19333 100644 --- a/client/client.go +++ b/client/client.go @@ -260,16 +260,22 @@ func (n *nexClient) StartWorkload(deployId string, req *models.StartWorkloadRequ if req == nil { return nil, n.nexBadRequestError(errors.New("nil request"), "start workload request must not be nil") } - if req.Tags == nil { - req.Tags = make(models.NodeTags) + + // Shallow-copy so defaulting Tags does not mutate the caller's struct. + // This matters for call sites like CloneWorkload that pass the agent's + // StartWorkloadRequest through verbatim — we must not silently poke at + // fields on a value the caller still holds a reference to. + local := *req + if local.Tags == nil { + local.Tags = make(models.NodeTags) } - reqB, err := json.Marshal(req) + reqB, err := json.Marshal(&local) if err != nil { return nil, n.nexInternalError(err, "failed to marshal start workload request") } - startResponseMsg, err := n.nc.Request(models.AuctionDeployRequestSubject(req.Namespace, deployId), reqB, n.startWorkloadTimeout) + startResponseMsg, err := n.nc.Request(models.AuctionDeployRequestSubject(local.Namespace, deployId), reqB, n.startWorkloadTimeout) if err != nil { return nil, n.nexInternalError(err, "failed to request start workload") } @@ -486,6 +492,18 @@ func (n *nexClient) CloneWorkload(id string, tags map[string]string) (*models.St return nil, n.nexNotFoundError(errors.New(string(models.GenericErrorsWorkloadNotFound)), "workload not found") } + // An empty namespace on the clone response means the agent did not + // report the workload's owning namespace — typically an older nexlet + // that pre-dates the WorkloadSummary.Namespace contract. Without a + // target namespace we would publish the auction on a malformed subject + // and the failure would surface as an unhelpful "no nodes available" + // error. Fail fast with a clear message instead. + if cloneResp.Namespace == "" { + return nil, n.nexInternalError( + errors.New("clone response missing namespace"), + "cannot clone workload: owning namespace not reported by the agent") + } + // The clone must be deployed into the workload's actual owning // namespace, not the caller's namespace. For a system operator cloning // a default-namespace workload, this means the auction and deploy diff --git a/client/client_test.go b/client/client_test.go index aefbae85..8647e605 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -837,12 +837,14 @@ func TestNexClient_CloneWorkload_CrossNamespace(t *testing.T) { }) be.NilErr(t, err) - // Sanity: the user namespace now has exactly one workload, the system - // namespace has none. + // Sanity: the user namespace now has exactly one workload, and a + // system-scoped list sees the same single workload (system aggregates + // across every namespace, and user is the only namespace with state). _test.WaitFor(t, 10*time.Second, func() bool { return countWorkloads(t, userClient) == 1 }, "waiting for original to register in user namespace") - be.Equal(t, 0, countWorkloads(t, systemClient)-countWorkloads(t, userClient)) + be.Equal(t, 1, countWorkloads(t, userClient)) + be.Equal(t, 1, countWorkloads(t, systemClient)) // System user clones the user-owned workload. cloneResp, err := systemClient.CloneWorkload(origSwr.Id, nil) From e9c6f8c338eba41f5331e261e5d4c169b1cfbdea Mon Sep 17 00:00:00 2001 From: Jordan Rash Date: Thu, 9 Apr 2026 12:33:38 -0600 Subject: [PATCH 3/5] test(client): stabilize cross-namespace tests for slow CI runners Signed-off-by: Jordan Rash --- client/client_test.go | 45 +++++++++++++++++++++---------------------- 1 file changed, 22 insertions(+), 23 deletions(-) diff --git a/client/client_test.go b/client/client_test.go index 8647e605..49c671db 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -806,7 +806,7 @@ func TestNexClient_CloneWorkload_CrossNamespace(t *testing.T) { be.NilErr(t, err) defer nc.Close() - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) defer cancel() nexNodes := _test.StartNexus(t, ctx, server.ClientURL(), 1, false) @@ -822,7 +822,7 @@ func TestNexClient_CloneWorkload_CrossNamespace(t *testing.T) { be.NilErr(t, err) var ar []*models.AuctionResponse - _test.WaitFor(t, 10*time.Second, func() bool { + _test.WaitFor(t, 30*time.Second, func() bool { ar, err = userClient.Auction("user", "inmem", map[string]string{}) return err == nil && len(ar) == 1 }, "waiting for user auction to return 1 result") @@ -837,13 +837,13 @@ func TestNexClient_CloneWorkload_CrossNamespace(t *testing.T) { }) be.NilErr(t, err) - // Sanity: the user namespace now has exactly one workload, and a - // system-scoped list sees the same single workload (system aggregates - // across every namespace, and user is the only namespace with state). - _test.WaitFor(t, 10*time.Second, func() bool { + // Sanity: both clients see exactly one workload. We use a single + // WaitFor on the user client (the authoritative namespace) and then + // read the system client once — keeping ListWorkloads calls to a + // minimum since each carries a RequestMany stall. + _test.WaitFor(t, 30*time.Second, func() bool { return countWorkloads(t, userClient) == 1 }, "waiting for original to register in user namespace") - be.Equal(t, 1, countWorkloads(t, userClient)) be.Equal(t, 1, countWorkloads(t, systemClient)) // System user clones the user-owned workload. @@ -853,15 +853,13 @@ func TestNexClient_CloneWorkload_CrossNamespace(t *testing.T) { be.Unequal(t, origSwr.Id, cloneResp.Id) // Both the original and the clone must appear in the user namespace. - // The system-namespace view aggregates every namespace, so it should - // match the user view exactly (there is no namespace other than user). - _test.WaitFor(t, 10*time.Second, func() bool { + // A single WaitFor confirms the clone landed; we then do one + // system-scoped list to verify every workload's Namespace field + // reports "user", confirming nothing re-homed into system. + _test.WaitFor(t, 30*time.Second, func() bool { return countWorkloads(t, userClient) == 2 }, "waiting for clone to appear in user namespace") - be.Equal(t, 2, countWorkloads(t, userClient)) - // Verify each workload's Namespace field on the system list reports - // "user", confirming the clone did not re-home into system. sysList, err := systemClient.ListWorkloads(nil) be.NilErr(t, err) seenUser := 0 @@ -894,7 +892,7 @@ func TestNexClient_StopWorkload_System_Discovery(t *testing.T) { be.NilErr(t, err) defer nc.Close() - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) defer cancel() nexNodes := _test.StartNexus(t, ctx, server.ClientURL(), 1, false) @@ -907,7 +905,7 @@ func TestNexClient_StopWorkload_System_Discovery(t *testing.T) { be.NilErr(t, err) var ar []*models.AuctionResponse - _test.WaitFor(t, 10*time.Second, func() bool { + _test.WaitFor(t, 30*time.Second, func() bool { ar, err = userClient.Auction("user", "inmem", map[string]string{}) return err == nil && len(ar) == 1 }, "waiting for user auction to return 1 result") @@ -922,7 +920,7 @@ func TestNexClient_StopWorkload_System_Discovery(t *testing.T) { }) be.NilErr(t, err) - _test.WaitFor(t, 10*time.Second, func() bool { + _test.WaitFor(t, 30*time.Second, func() bool { return countWorkloads(t, userClient) == 1 }, "waiting for workload to register") @@ -934,7 +932,7 @@ func TestNexClient_StopWorkload_System_Discovery(t *testing.T) { be.Equal(t, swr.Id, stopResp.Id) // Confirm the workload is gone from the user namespace. - _test.WaitFor(t, 10*time.Second, func() bool { + _test.WaitFor(t, 30*time.Second, func() bool { return countWorkloads(t, userClient) == 0 }, "waiting for workload to stop") @@ -956,7 +954,7 @@ func TestNexClient_StopWorkload_System_NotFound(t *testing.T) { be.NilErr(t, err) defer nc.Close() - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) defer cancel() nexNodes := _test.StartNexus(t, ctx, server.ClientURL(), 1, false) @@ -986,7 +984,7 @@ func TestNexClient_CloneWorkload_StopOrig_CrossNamespace(t *testing.T) { be.NilErr(t, err) defer nc.Close() - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) defer cancel() nexNodes := _test.StartNexus(t, ctx, server.ClientURL(), 1, false) @@ -999,7 +997,7 @@ func TestNexClient_CloneWorkload_StopOrig_CrossNamespace(t *testing.T) { be.NilErr(t, err) var ar []*models.AuctionResponse - _test.WaitFor(t, 10*time.Second, func() bool { + _test.WaitFor(t, 30*time.Second, func() bool { ar, err = userClient.Auction("user", "inmem", map[string]string{}) return err == nil && len(ar) == 1 }, "waiting for user auction to return 1 result") @@ -1014,7 +1012,7 @@ func TestNexClient_CloneWorkload_StopOrig_CrossNamespace(t *testing.T) { }) be.NilErr(t, err) - _test.WaitFor(t, 10*time.Second, func() bool { + _test.WaitFor(t, 30*time.Second, func() bool { return countWorkloads(t, userClient) == 1 }, "waiting for original to register") @@ -1027,10 +1025,11 @@ func TestNexClient_CloneWorkload_StopOrig_CrossNamespace(t *testing.T) { be.True(t, stopResp.Stopped) // Only the clone should remain, and it should be in the user namespace. - _test.WaitFor(t, 10*time.Second, func() bool { + // A single authoritative ListWorkloads after the stop covers both the + // count and the namespace-field checks without redundant round-trips. + _test.WaitFor(t, 30*time.Second, func() bool { return countWorkloads(t, userClient) == 1 }, "waiting for original to stop after clone") - be.Equal(t, 1, countWorkloads(t, userClient)) wl, err := userClient.ListWorkloads(nil) be.NilErr(t, err) From e77eed1beb18bc774f8a4bdf511f34bd97b3ac55 Mon Sep 17 00:00:00 2001 From: Jordan Rash Date: Thu, 9 Apr 2026 15:53:43 -0600 Subject: [PATCH 4/5] remove parallel for test Signed-off-by: Jordan Rash --- client/client_test.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/client/client_test.go b/client/client_test.go index 49c671db..fbfaf62f 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -797,7 +797,6 @@ func countWorkloads(t *testing.T, c NexClient) int { // to StartWorkload with n.namespace="system" and the clone would be stored // under n.workloads["system"] on the agent. func TestNexClient_CloneWorkload_CrossNamespace(t *testing.T) { - t.Parallel() workDir := t.TempDir() server := _test.StartNatsServer(t, workDir) defer server.Shutdown() @@ -883,7 +882,6 @@ func TestNexClient_CloneWorkload_CrossNamespace(t *testing.T) { // discover the owning namespace via ListWorkloads and publish the stop on // that namespace's subject, not `system`. func TestNexClient_StopWorkload_System_Discovery(t *testing.T) { - t.Parallel() workDir := t.TempDir() server := _test.StartNatsServer(t, workDir) defer server.Shutdown() @@ -945,7 +943,6 @@ func TestNexClient_StopWorkload_System_Discovery(t *testing.T) { // for a nonexistent workload surfaces a clean not-found error rather than // silently returning Stopped=false. func TestNexClient_StopWorkload_System_NotFound(t *testing.T) { - t.Parallel() workDir := t.TempDir() server := _test.StartNatsServer(t, workDir) defer server.Shutdown() @@ -975,7 +972,6 @@ func TestNexClient_StopWorkload_System_NotFound(t *testing.T) { // clone-then-stop flow as a system user against a user-owned workload. The // clone must land in the user namespace and the original must be stopped. func TestNexClient_CloneWorkload_StopOrig_CrossNamespace(t *testing.T) { - t.Parallel() workDir := t.TempDir() server := _test.StartNatsServer(t, workDir) defer server.Shutdown() From 88035a7747935b4f93cd191c53f36cd8f0847e87 Mon Sep 17 00:00:00 2001 From: Jordan Rash Date: Thu, 9 Apr 2026 16:20:41 -0600 Subject: [PATCH 5/5] waits for micro to be ready Signed-off-by: Jordan Rash --- node.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/node.go b/node.go index 66ce3707..815030c0 100644 --- a/node.go +++ b/node.go @@ -320,6 +320,11 @@ func (n *NexNode) Start() error { n.logger.Warn("nex node started without any agents") } + // Ensure subscriptions are live at the server before declaring ready. + if err := n.nc.Flush(); err != nil { + return fmt.Errorf("failed to flush nats connection after subscribing: %w", err) + } + n.logger.Info("nex node ready") n.nodeState = models.NodeStateRunning return nil