diff --git a/_test/nexlet_inmem/inmem_test.go b/_test/nexlet_inmem/inmem_test.go index ab44239d..e03008a0 100644 --- a/_test/nexlet_inmem/inmem_test.go +++ b/_test/nexlet_inmem/inmem_test.go @@ -157,7 +157,7 @@ func TestFunctionStartWorkloadAndTrigger(t *testing.T) { var aR []*models.AuctionResponse deadline := time.Now().Add(10 * time.Second) for time.Now().Before(deadline) { - aR, err = nClient.Auction("inmem", nil) + aR, err = nClient.Auction(models.SystemNamespace, "inmem", nil) if err == nil && len(aR) == 1 { break } @@ -166,7 +166,13 @@ func TestFunctionStartWorkloadAndTrigger(t *testing.T) { be.NilErr(t, err) be.Equal(t, 1, len(aR)) - sR, err := nClient.StartWorkload(aR[0].BidderId, "inmemfunc", "", "{}", "inmem", "function", nil) + sR, err := nClient.StartWorkload(aR[0].BidderId, &models.StartWorkloadRequest{ + Namespace: models.SystemNamespace, + Name: "inmemfunc", + RunRequest: "{}", + WorkloadType: "inmem", + WorkloadLifecycle: "function", + }) be.NilErr(t, err) be.Nonzero(t, sR) diff --git a/client/client.go b/client/client.go index f1a255db..1fb19333 100644 --- a/client/client.go +++ b/client/client.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "math/rand" "net/http" "strconv" @@ -28,8 +29,8 @@ type NexClient interface { GetNodeInfo(nodeId string) (*models.NodeInfoResponse, error) SetLameduck(nodeId string, delay time.Duration, tag map[string]string) (*models.LameduckResponse, error) ListNodes(filter map[string]string) ([]*models.NodePingResponse, error) - Auction(typ string, tags map[string]string) ([]*models.AuctionResponse, error) - StartWorkload(deployId, name, desc, runRequest, typ string, lifecycle models.WorkloadLifecycle, pTags models.NodeTags) (*models.StartWorkloadResponse, error) + Auction(namespace, typ string, tags map[string]string) ([]*models.AuctionResponse, error) + StartWorkload(deployId string, req *models.StartWorkloadRequest) (*models.StartWorkloadResponse, error) StopWorkload(workloadId string) 
(*models.StopWorkloadResponse, error) ListWorkloads(filter []string) ([]*models.AgentListWorkloadsResponse, error) CloneWorkload(id string, tags map[string]string) (*models.StartWorkloadResponse, error) @@ -215,7 +216,7 @@ func (n *nexClient) ListNodes(filter map[string]string) ([]*models.NodePingRespo return resp, errs } -func (n *nexClient) Auction(typ string, tags map[string]string) ([]*models.AuctionResponse, error) { +func (n *nexClient) Auction(namespace, typ string, tags map[string]string) ([]*models.AuctionResponse, error) { auctionRequest := &models.AuctionRequest{ AgentType: typ, AuctionId: nuid.New().Next(), @@ -227,7 +228,7 @@ func (n *nexClient) Auction(typ string, tags map[string]string) ([]*models.Aucti return nil, n.nexInternalError(err, "failed to marshal auction request") } - msgs, err := natsext.RequestMany(n.ctx, n.nc, models.AuctionRequestSubject(n.namespace), auctionRequestB, natsext.RequestManyStall(n.auctionRequestManyStall)) + msgs, err := natsext.RequestMany(n.ctx, n.nc, models.AuctionRequestSubject(namespace), auctionRequestB, natsext.RequestManyStall(n.auctionRequestManyStall)) if err != nil { return nil, n.nexInternalError(err, "failed to request auction") } @@ -255,27 +256,26 @@ func (n *nexClient) Auction(typ string, tags map[string]string) ([]*models.Aucti return resp, errs } -func (n *nexClient) StartWorkload(deployId, name, desc, runRequest, typ string, lifecycle models.WorkloadLifecycle, pTags models.NodeTags) (*models.StartWorkloadResponse, error) { - if pTags == nil { - pTags = make(models.NodeTags) +func (n *nexClient) StartWorkload(deployId string, req *models.StartWorkloadRequest) (*models.StartWorkloadResponse, error) { + if req == nil { + return nil, n.nexBadRequestError(errors.New("nil request"), "start workload request must not be nil") } - req := &models.StartWorkloadRequest{ - Namespace: n.namespace, - Name: name, - Description: desc, - RunRequest: runRequest, - WorkloadLifecycle: lifecycle, - WorkloadType: typ, - Tags: 
pTags, + // Shallow-copy so defaulting Tags does not mutate the caller's struct. + // This matters for call sites like CloneWorkload that pass the agent's + // StartWorkloadRequest through verbatim — we must not silently poke at + // fields on a value the caller still holds a reference to. + local := *req + if local.Tags == nil { + local.Tags = make(models.NodeTags) } - reqB, err := json.Marshal(req) + reqB, err := json.Marshal(&local) if err != nil { return nil, n.nexInternalError(err, "failed to marshal start workload request") } - startResponseMsg, err := n.nc.Request(models.AuctionDeployRequestSubject(n.namespace, deployId), reqB, n.startWorkloadTimeout) + startResponseMsg, err := n.nc.Request(models.AuctionDeployRequestSubject(local.Namespace, deployId), reqB, n.startWorkloadTimeout) if err != nil { return nil, n.nexInternalError(err, "failed to request start workload") } @@ -294,8 +294,61 @@ func (n *nexClient) StartWorkload(deployId, name, desc, runRequest, typ string, } func (n *nexClient) StopWorkload(workloadId string) (*models.StopWorkloadResponse, error) { + // targetNS is the namespace the workload actually lives in. For user + // callers it is the caller's own namespace. For system callers we must + // discover it because system is administrative and never hosts + // workloads itself (except in rare explicit system-namespace starts). 
+ targetNS := n.namespace + + if n.namespace == models.SystemNamespace { + summaries, err := n.ListWorkloads([]string{workloadId}) + if err != nil { + return nil, n.nexInternalError(err, "failed to discover workload namespace") + } + + var resolved []string + for _, agentResp := range summaries { + for _, summary := range *agentResp { + if summary.Id != workloadId { + continue + } + if summary.Namespace == nil { + return nil, n.nexInternalError( + errors.New("owning namespace not reported"), + fmt.Sprintf("cannot stop workload %s as system user: owning namespace could not be determined (nexlet does not report namespace in list response); retry with --namespace set to the owning namespace", workloadId)) + } + already := false + for _, existing := range resolved { + if existing == *summary.Namespace { + already = true + break + } + } + if !already { + resolved = append(resolved, *summary.Namespace) + } + } + } + + switch len(resolved) { + case 0: + return nil, n.nexNotFoundError(errors.New(string(models.GenericErrorsWorkloadNotFound)), "workload not found") + case 1: + targetNS = resolved[0] + default: + // Don't echo the resolved namespace names in the error + // message — even though only system callers reach this branch + // today, the error propagates through NATS micro response + // headers and logs where the audience is broader. The + // operator can list workloads themselves to investigate. 
+ return nil, n.nexInternalError( + errors.New("ambiguous workload id"), + fmt.Sprintf("workload id %s is ambiguous: it matches workloads in multiple namespaces; use --namespace to specify which one to stop", workloadId)) + } + } + req := models.StopWorkloadRequest{ - Namespace: n.namespace, + Namespace: targetNS, } reqB, err := json.Marshal(req) @@ -303,7 +356,7 @@ func (n *nexClient) StopWorkload(workloadId string) (*models.StopWorkloadRespons return nil, n.nexInternalError(err, "failed to marshal stop workload request") } - msgs, err := natsext.RequestMany(n.ctx, n.nc, models.UndeployRequestSubject(n.namespace, workloadId), reqB, natsext.RequestManyStall(n.requestManyStall)) + msgs, err := natsext.RequestMany(n.ctx, n.nc, models.UndeployRequestSubject(targetNS, workloadId), reqB, natsext.RequestManyStall(n.requestManyStall)) if err != nil { return nil, n.nexInternalError(err, "failed to request stop workload") } @@ -390,6 +443,13 @@ func (n *nexClient) CloneWorkload(id string, tags map[string]string) (*models.St return nil, n.nexInternalError(err, "failed to get clone public key") } + // The clone fetch uses n.namespace because the caller has not yet + // learned the target namespace — the whole point of this round-trip is + // to discover the workload's owning namespace along with its full + // definition. The node's handleCloneWorkload bypass for + // models.SystemNamespace is load-bearing here for cross-namespace + // clones by a system operator; a user caller can only fetch workloads + // in their own namespace (handlers.go enforces this). 
cloneReq := models.CloneWorkloadRequest{ Namespace: n.namespace, NewTargetXkey: tKpPub, @@ -432,7 +492,28 @@ func (n *nexClient) CloneWorkload(id string, tags map[string]string) (*models.St return nil, n.nexNotFoundError(errors.New(string(models.GenericErrorsWorkloadNotFound)), "workload not found") } - aucResp, err := n.Auction(cloneResp.WorkloadType, tags) + // An empty namespace on the clone response means the agent did not + // report the workload's owning namespace — typically an older nexlet + // that pre-dates the WorkloadSummary.Namespace contract. Without a + // target namespace we would publish the auction on a malformed subject + // and the failure would surface as an unhelpful "no nodes available" + // error. Fail fast with a clear message instead. + if cloneResp.Namespace == "" { + return nil, n.nexInternalError( + errors.New("clone response missing namespace"), + "cannot clone workload: owning namespace not reported by the agent") + } + + // The clone must be deployed into the workload's actual owning + // namespace, not the caller's namespace. For a system operator cloning + // a default-namespace workload, this means the auction and deploy + // target `default`. The system credential masquerades at the subject + // layer — publishing on default.* control subjects while authenticated + // as system — which the node accepts because subject-body namespaces + // match. cloneResp already carries the authoritative Namespace from + // the agent's state, so we reuse it verbatim and only override the + // placement tags the caller passed to CloneWorkload. 
+ aucResp, err := n.Auction(cloneResp.Namespace, cloneResp.WorkloadType, tags) if len(aucResp) == 0 { if err != nil { return nil, err @@ -441,7 +522,8 @@ func (n *nexClient) CloneWorkload(id string, tags map[string]string) (*models.St } randomNode := aucResp[rand.Intn(len(aucResp))] - swr, err := n.StartWorkload(randomNode.BidderId, cloneResp.Name, cloneResp.Description, cloneResp.RunRequest, cloneResp.WorkloadType, cloneResp.WorkloadLifecycle, tags) + cloneResp.Tags = tags + swr, err := n.StartWorkload(randomNode.BidderId, cloneResp) if err != nil { return nil, err } diff --git a/client/client_benchmark_test.go b/client/client_benchmark_test.go index 6e7a9fd4..808fc996 100644 --- a/client/client_benchmark_test.go +++ b/client/client_benchmark_test.go @@ -91,7 +91,7 @@ func BenchmarkClientAuction(b *testing.B) { b.ResetTimer() for b.Loop() { - auctionResponses, err := client.Auction(workloadType, map[string]string{}) + auctionResponses, err := client.Auction(namespace, workloadType, map[string]string{}) be.NilErr(b, err) b.StopTimer() for _, ar := range auctionResponses { diff --git a/client/client_test.go b/client/client_test.go index 03b7436e..fbfaf62f 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -78,11 +78,18 @@ func TestNexClient_User(t *testing.T) { var ar []*models.AuctionResponse _test.WaitFor(t, 10*time.Second, func() bool { - ar, err = client.Auction("inmem", map[string]string{}) + ar, err = client.Auction("user", "inmem", map[string]string{}) return err == nil && len(ar) == 1 }, "waiting for auction to return 1 result") - sr, err := client.StartWorkload(ar[0].BidderId, "tester", "My test workload", "{}", "inmem", models.WorkloadLifecycleService, nil) + sr, err := client.StartWorkload(ar[0].BidderId, &models.StartWorkloadRequest{ + Namespace: "user", + Name: "tester", + Description: "My test workload", + RunRequest: "{}", + WorkloadType: "inmem", + WorkloadLifecycle: models.WorkloadLifecycleService, + }) be.NilErr(t, err) be.Equal(t, 
"tester", sr.Name) @@ -227,27 +234,48 @@ func TestNexClient_ListWorkloads(t *testing.T) { var ar []*models.AuctionResponse _test.WaitFor(t, 10*time.Second, func() bool { - ar, err = client.Auction("inmem", map[string]string{}) + ar, err = client.Auction("user", "inmem", map[string]string{}) return err == nil && len(ar) == tt.size }, "waiting for auction to return expected results") - _, err = client.StartWorkload(ar[0].BidderId, "tester1", "My test workload", "{}", "inmem", models.WorkloadLifecycleService, nil) + _, err = client.StartWorkload(ar[0].BidderId, &models.StartWorkloadRequest{ + Namespace: "user", + Name: "tester1", + Description: "My test workload", + RunRequest: "{}", + WorkloadType: "inmem", + WorkloadLifecycle: models.WorkloadLifecycleService, + }) be.NilErr(t, err) _test.WaitFor(t, 10*time.Second, func() bool { - ar, err = client.Auction("inmem", map[string]string{}) + ar, err = client.Auction("user", "inmem", map[string]string{}) return err == nil && len(ar) == tt.size }, "waiting for auction to return expected results") - _, err = client.StartWorkload(ar[1%tt.size].BidderId, "tester2", "My test workload", "{}", "inmem", models.WorkloadLifecycleService, nil) + _, err = client.StartWorkload(ar[1%tt.size].BidderId, &models.StartWorkloadRequest{ + Namespace: "user", + Name: "tester2", + Description: "My test workload", + RunRequest: "{}", + WorkloadType: "inmem", + WorkloadLifecycle: models.WorkloadLifecycleService, + }) be.NilErr(t, err) _test.WaitFor(t, 10*time.Second, func() bool { - ar, err = client.Auction("inmem", map[string]string{}) + ar, err = client.Auction("user", "inmem", map[string]string{}) return err == nil && len(ar) == tt.size }, "waiting for auction to return expected results") - _, err = client.StartWorkload(ar[2%tt.size].BidderId, "tester3", "My test workload", "{}", "inmem", models.WorkloadLifecycleService, nil) + _, err = client.StartWorkload(ar[2%tt.size].BidderId, &models.StartWorkloadRequest{ + Namespace: "user", + Name: 
"tester3", + Description: "My test workload", + RunRequest: "{}", + WorkloadType: "inmem", + WorkloadLifecycle: models.WorkloadLifecycleService, + }) be.NilErr(t, err) wl, err := client.ListWorkloads([]string{}) @@ -337,11 +365,18 @@ func TestNexClient_CloneWorkload(t *testing.T) { var ar []*models.AuctionResponse _test.WaitFor(t, 10*time.Second, func() bool { - ar, err = client.Auction("inmem", map[string]string{}) + ar, err = client.Auction("user", "inmem", map[string]string{}) return err == nil && len(ar) == tt.size }, "waiting for auction to return expected results") - swr, err := client.StartWorkload(ar[0].BidderId, "tester1", "My test workload", "{}", "inmem", models.WorkloadLifecycleService, nil) + swr, err := client.StartWorkload(ar[0].BidderId, &models.StartWorkloadRequest{ + Namespace: "user", + Name: "tester1", + Description: "My test workload", + RunRequest: "{}", + WorkloadType: "inmem", + WorkloadLifecycle: models.WorkloadLifecycleService, + }) be.NilErr(t, err) _, err = client.CloneWorkload(swr.Id, nil) @@ -518,11 +553,18 @@ func TestNexClient_StopWorkload(t *testing.T) { var ar []*models.AuctionResponse _test.WaitFor(t, 10*time.Second, func() bool { - ar, err = client.Auction("inmem", map[string]string{}) + ar, err = client.Auction("user", "inmem", map[string]string{}) return err == nil && len(ar) == tt.size }, "waiting for auction to return expected results") - swr, err := client.StartWorkload(ar[0].BidderId, "tester1", "My test workload", "{}", "inmem", models.WorkloadLifecycleService, nil) + swr, err := client.StartWorkload(ar[0].BidderId, &models.StartWorkloadRequest{ + Namespace: "user", + Name: "tester1", + Description: "My test workload", + RunRequest: "{}", + WorkloadType: "inmem", + WorkloadLifecycle: models.WorkloadLifecycleService, + }) be.NilErr(t, err) _test.WaitFor(t, 10*time.Second, func() bool { @@ -644,11 +686,18 @@ func TestNexClient_ListWorkloads_PartialError(t *testing.T) { var ar []*models.AuctionResponse _test.WaitFor(t, 
10*time.Second, func() bool { - ar, err = client.Auction("inmem", map[string]string{}) + ar, err = client.Auction("user", "inmem", map[string]string{}) return err == nil && len(ar) == 3 }, "waiting for auction to return 3 results") - _, err = client.StartWorkload(ar[0].BidderId, "tester1", "My test workload", "{}", "inmem", models.WorkloadLifecycleService, nil) + _, err = client.StartWorkload(ar[0].BidderId, &models.StartWorkloadRequest{ + Namespace: "user", + Name: "tester1", + Description: "My test workload", + RunRequest: "{}", + WorkloadType: "inmem", + WorkloadLifecycle: models.WorkloadLifecycleService, + }) be.NilErr(t, err) _test.WaitFor(t, 10*time.Second, func() bool { @@ -717,7 +766,7 @@ func TestNexClient_Auction_PartialError(t *testing.T) { var ar []*models.AuctionResponse _test.WaitFor(t, 10*time.Second, func() bool { - ar, err = client.Auction("inmem", map[string]string{}) + ar, err = client.Auction("user", "inmem", map[string]string{}) return len(ar) == 3 }, "waiting for auction to return 3 healthy results") @@ -728,3 +777,273 @@ func TestNexClient_Auction_PartialError(t *testing.T) { be.NilErr(t, node.Shutdown()) } } + +// countWorkloads collapses the scatter-gather list response to a total count. +func countWorkloads(t *testing.T, c NexClient) int { + t.Helper() + wl, err := c.ListWorkloads(nil) + be.NilErr(t, err) + total := 0 + for _, w := range wl { + total += len(*w) + } + return total +} + +// TestNexClient_CloneWorkload_CrossNamespace verifies that when a system-user +// client clones a workload owned by a user namespace, the clone lands in the +// owning namespace — not in `system`. This was the primary bug motivating the +// cross-namespace fix: before the fix, client.CloneWorkload would pass through +// to StartWorkload with n.namespace="system" and the clone would be stored +// under n.workloads["system"] on the agent. 
+func TestNexClient_CloneWorkload_CrossNamespace(t *testing.T) { + workDir := t.TempDir() + server := _test.StartNatsServer(t, workDir) + defer server.Shutdown() + + nc, err := nats.Connect(server.ClientURL()) + be.NilErr(t, err) + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + defer cancel() + + nexNodes := _test.StartNexus(t, ctx, server.ClientURL(), 1, false) + be.Equal(t, 1, len(nexNodes)) + + // Build two clients sharing the same nexus: one scoped to the "user" + // namespace that will deploy the original workload, and one scoped to + // system that will perform the cross-namespace clone. + userClient, err := NewClient(context.Background(), nc, "user", WithAuctionStall(5*time.Second)) + be.NilErr(t, err) + + systemClient, err := NewClient(context.Background(), nc, models.SystemNamespace, WithAuctionStall(5*time.Second)) + be.NilErr(t, err) + + var ar []*models.AuctionResponse + _test.WaitFor(t, 30*time.Second, func() bool { + ar, err = userClient.Auction("user", "inmem", map[string]string{}) + return err == nil && len(ar) == 1 + }, "waiting for user auction to return 1 result") + + origSwr, err := userClient.StartWorkload(ar[0].BidderId, &models.StartWorkloadRequest{ + Namespace: "user", + Name: "orig", + Description: "original workload owned by user", + RunRequest: "{}", + WorkloadType: "inmem", + WorkloadLifecycle: models.WorkloadLifecycleService, + }) + be.NilErr(t, err) + + // Sanity: both clients see exactly one workload. We use a single + // WaitFor on the user client (the authoritative namespace) and then + // read the system client once — keeping ListWorkloads calls to a + // minimum since each carries a RequestMany stall. + _test.WaitFor(t, 30*time.Second, func() bool { + return countWorkloads(t, userClient) == 1 + }, "waiting for original to register in user namespace") + be.Equal(t, 1, countWorkloads(t, systemClient)) + + // System user clones the user-owned workload. 
+ cloneResp, err := systemClient.CloneWorkload(origSwr.Id, nil) + be.NilErr(t, err) + be.Nonzero(t, cloneResp) + be.Unequal(t, origSwr.Id, cloneResp.Id) + + // Both the original and the clone must appear in the user namespace. + // A single WaitFor confirms the clone landed; we then do one + // system-scoped list to verify every workload's Namespace field + // reports "user", confirming nothing re-homed into system. + _test.WaitFor(t, 30*time.Second, func() bool { + return countWorkloads(t, userClient) == 2 + }, "waiting for clone to appear in user namespace") + + sysList, err := systemClient.ListWorkloads(nil) + be.NilErr(t, err) + seenUser := 0 + for _, agentResp := range sysList { + for _, summary := range *agentResp { + be.Nonzero(t, summary.Namespace) + if summary.Namespace != nil && *summary.Namespace == "user" { + seenUser++ + } + } + } + be.Equal(t, 2, seenUser) + + for _, node := range nexNodes { + be.NilErr(t, node.Shutdown()) + } +} + +// TestNexClient_StopWorkload_System_Discovery verifies that a system-user +// client can stop a workload owned by a different namespace. The client must +// discover the owning namespace via ListWorkloads and publish the stop on +// that namespace's subject, not `system`. 
+func TestNexClient_StopWorkload_System_Discovery(t *testing.T) { + workDir := t.TempDir() + server := _test.StartNatsServer(t, workDir) + defer server.Shutdown() + + nc, err := nats.Connect(server.ClientURL()) + be.NilErr(t, err) + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + defer cancel() + + nexNodes := _test.StartNexus(t, ctx, server.ClientURL(), 1, false) + be.Equal(t, 1, len(nexNodes)) + + userClient, err := NewClient(context.Background(), nc, "user", WithAuctionStall(5*time.Second)) + be.NilErr(t, err) + + systemClient, err := NewClient(context.Background(), nc, models.SystemNamespace, WithAuctionStall(5*time.Second)) + be.NilErr(t, err) + + var ar []*models.AuctionResponse + _test.WaitFor(t, 30*time.Second, func() bool { + ar, err = userClient.Auction("user", "inmem", map[string]string{}) + return err == nil && len(ar) == 1 + }, "waiting for user auction to return 1 result") + + swr, err := userClient.StartWorkload(ar[0].BidderId, &models.StartWorkloadRequest{ + Namespace: "user", + Name: "stoppable", + Description: "workload to be stopped by system", + RunRequest: "{}", + WorkloadType: "inmem", + WorkloadLifecycle: models.WorkloadLifecycleService, + }) + be.NilErr(t, err) + + _test.WaitFor(t, 30*time.Second, func() bool { + return countWorkloads(t, userClient) == 1 + }, "waiting for workload to register") + + // System user stops the user-owned workload. The client must discover + // namespace=user from the list response and target the user subject. + stopResp, err := systemClient.StopWorkload(swr.Id) + be.NilErr(t, err) + be.True(t, stopResp.Stopped) + be.Equal(t, swr.Id, stopResp.Id) + + // Confirm the workload is gone from the user namespace. 
+ _test.WaitFor(t, 30*time.Second, func() bool { + return countWorkloads(t, userClient) == 0 + }, "waiting for workload to stop") + + for _, node := range nexNodes { + be.NilErr(t, node.Shutdown()) + } +} + +// TestNexClient_StopWorkload_System_NotFound verifies that a system-user stop +// for a nonexistent workload surfaces a clean not-found error rather than +// silently returning Stopped=false. +func TestNexClient_StopWorkload_System_NotFound(t *testing.T) { + workDir := t.TempDir() + server := _test.StartNatsServer(t, workDir) + defer server.Shutdown() + + nc, err := nats.Connect(server.ClientURL()) + be.NilErr(t, err) + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + defer cancel() + + nexNodes := _test.StartNexus(t, ctx, server.ClientURL(), 1, false) + be.Equal(t, 1, len(nexNodes)) + + systemClient, err := NewClient(context.Background(), nc, models.SystemNamespace, WithAuctionStall(5*time.Second)) + be.NilErr(t, err) + + _, err = systemClient.StopWorkload("does-not-exist") + be.Nonzero(t, err) + + for _, node := range nexNodes { + be.NilErr(t, node.Shutdown()) + } +} + +// TestNexClient_CloneWorkload_StopOrig_CrossNamespace exercises the CLI's +// clone-then-stop flow as a system user against a user-owned workload. The +// clone must land in the user namespace and the original must be stopped. 
+func TestNexClient_CloneWorkload_StopOrig_CrossNamespace(t *testing.T) { + workDir := t.TempDir() + server := _test.StartNatsServer(t, workDir) + defer server.Shutdown() + + nc, err := nats.Connect(server.ClientURL()) + be.NilErr(t, err) + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + defer cancel() + + nexNodes := _test.StartNexus(t, ctx, server.ClientURL(), 1, false) + be.Equal(t, 1, len(nexNodes)) + + userClient, err := NewClient(context.Background(), nc, "user", WithAuctionStall(5*time.Second)) + be.NilErr(t, err) + + systemClient, err := NewClient(context.Background(), nc, models.SystemNamespace, WithAuctionStall(5*time.Second)) + be.NilErr(t, err) + + var ar []*models.AuctionResponse + _test.WaitFor(t, 30*time.Second, func() bool { + ar, err = userClient.Auction("user", "inmem", map[string]string{}) + return err == nil && len(ar) == 1 + }, "waiting for user auction to return 1 result") + + origSwr, err := userClient.StartWorkload(ar[0].BidderId, &models.StartWorkloadRequest{ + Namespace: "user", + Name: "orig-stop", + Description: "original, will be stopped after clone", + RunRequest: "{}", + WorkloadType: "inmem", + WorkloadLifecycle: models.WorkloadLifecycleService, + }) + be.NilErr(t, err) + + _test.WaitFor(t, 30*time.Second, func() bool { + return countWorkloads(t, userClient) == 1 + }, "waiting for original to register") + + // System-user clone then stop, mirroring `nex workload copy --stop`. + cloneResp, err := systemClient.CloneWorkload(origSwr.Id, nil) + be.NilErr(t, err) + + stopResp, err := systemClient.StopWorkload(origSwr.Id) + be.NilErr(t, err) + be.True(t, stopResp.Stopped) + + // Only the clone should remain, and it should be in the user namespace. + // A single authoritative ListWorkloads after the stop covers both the + // count and the namespace-field checks without redundant round-trips. 
+ _test.WaitFor(t, 30*time.Second, func() bool { + return countWorkloads(t, userClient) == 1 + }, "waiting for original to stop after clone") + + wl, err := userClient.ListWorkloads(nil) + be.NilErr(t, err) + foundClone := false + for _, agentResp := range wl { + for _, summary := range *agentResp { + if summary.Id == cloneResp.Id { + foundClone = true + be.Nonzero(t, summary.Namespace) + if summary.Namespace != nil { + be.Equal(t, "user", *summary.Namespace) + } + } + } + } + be.True(t, foundClone) + + for _, node := range nexNodes { + be.NilErr(t, node.Shutdown()) + } +} diff --git a/cmd/nex/workloads.go b/cmd/nex/workloads.go index 0f17ae54..0c2dbb62 100644 --- a/cmd/nex/workloads.go +++ b/cmd/nex/workloads.go @@ -119,7 +119,7 @@ func (r *StartWorkload) Run(ctx context.Context, globals *Globals) error { r.WorkloadStartRequest = json.RawMessage(srB) } - aucResp, err := client.Auction(r.AgentType, r.AuctionTags) + aucResp, err := client.Auction(globals.Namespace, r.AgentType, r.AuctionTags) if err != nil { return err } @@ -177,7 +177,15 @@ func (r *StartWorkload) Run(ctx context.Context, globals *Globals) error { return err } - startResponse, err := client.StartWorkload(randomNode.BidderId, r.WorkloadName, r.WorkloadDescription, string(wsrB), r.AgentType, models.WorkloadLifecycle(r.WorkloadLifecycle), r.AuctionTags) + startResponse, err := client.StartWorkload(randomNode.BidderId, &models.StartWorkloadRequest{ + Namespace: globals.Namespace, + Name: r.WorkloadName, + Description: r.WorkloadDescription, + RunRequest: string(wsrB), + WorkloadType: r.AgentType, + WorkloadLifecycle: models.WorkloadLifecycle(r.WorkloadLifecycle), + Tags: r.AuctionTags, + }) if err != nil { return err } @@ -344,21 +352,32 @@ func (r *CloneWorkload) Run(ctx context.Context, globals *Globals) error { if err != nil { return err } + + // Clone succeeded. 
If --stop was requested and the subsequent stop + // fails, report both outcomes distinctly instead of swallowing the + // clone success so the operator knows the clone is live and only the + // original is still running. var stopped bool + var stopErr error if r.StopOrig { - stopResp, err := nexClient.StopWorkload(r.WorkloadId) - if err != nil { - return err - } else if stopResp.Stopped { + stopResp, sErr := nexClient.StopWorkload(r.WorkloadId) + switch { + case sErr != nil: + stopErr = sErr + case stopResp.Stopped: stopped = true } } + if globals.JSON { respB, err := json.Marshal(resp) if err != nil { return err } fmt.Println(string(respB)) + if stopErr != nil { + return stopErr + } return nil } @@ -366,6 +385,10 @@ func (r *CloneWorkload) Run(ctx context.Context, globals *Globals) error { if stopped { fmt.Printf("Original workload [%s] stopped\n", r.WorkloadId) } + if stopErr != nil { + fmt.Fprintf(os.Stderr, "failed to stop original workload [%s]: %v\n", r.WorkloadId, stopErr) + return stopErr + } return nil } diff --git a/handlers.go b/handlers.go index e451b73c..99f922d0 100644 --- a/handlers.go +++ b/handlers.go @@ -300,7 +300,13 @@ func (n *NexNode) handleAuctionDeployWorkload() func(micro.Request) { } workloadID := n.idgen.Generate(req) - wlNatsConn, err := n.minter.Mint(models.WorkloadCred, namespace, workloadID) + // Mint against the workload's owning namespace (from the request + // body), not the subject namespace. The subject namespace is only + // used for the authorization bypass check above; the workload's + // NATS permissions — specifically the log sub scope + // $NEX.FEED.<namespace>.logs.> — must line up with where the workload + // actually lives, which is req.Namespace.
+ wlNatsConn, err := n.minter.Mint(models.WorkloadCred, req.Namespace, workloadID) if err != nil { n.handlerError(r, err, models.ErrCodeInternalServerError, "failed to mint workload nats connection") return @@ -440,6 +446,23 @@ func (n *NexNode) handleCloneWorkload() func(micro.Request) { return } + // The agent's state.Exists iterates every namespace when looking + // up a workload by id, so a user caller could otherwise fetch the + // full StartWorkloadRequest definition (including RunRequest) of a + // workload owned by another namespace. Verify the returned + // workload's namespace matches the caller's, unless the caller is + // system (which is permitted to clone across namespaces). Silent + // drop on mismatch matches the existing "not found" silence + // pattern above so existence is not leaked. + tmp := new(models.StartWorkloadRequest) + if err := json.Unmarshal(getWorkload.Data, tmp); err != nil { + n.handlerError(r, err, models.ErrCodeInternalServerError, "failed to unmarshal workload definition from agent") + return + } + if tmp.Namespace != namespace && namespace != models.SystemNamespace { + return + } + err = r.Respond(getWorkload.Data) if err != nil { n.logger.Error("failed to respond to clone workload request", slog.String("err", err.Error())) diff --git a/internal/credentials/vendor_test.go b/internal/credentials/vendor_test.go new file mode 100644 index 00000000..269da985 --- /dev/null +++ b/internal/credentials/vendor_test.go @@ -0,0 +1,29 @@ +package credentials + +import ( + "slices" + "testing" + + "github.com/carlmjohnson/be" + "github.com/synadia-io/nex/models" +) + +// TestWorkloadClaims_LogScopedToNamespace locks in the invariant that a +// workload's NATS sub permissions are scoped to its owning namespace. This +// is the counterpart to the handlers.go handleAuctionDeploy fix that mints +// credentials against req.Namespace (the workload's real namespace) rather +// than the subject namespace. 
Without this invariant, a cloned workload +// would receive log permissions for the caller's namespace and be unable to +// stream logs into its own. +func TestWorkloadClaims_LogScopedToNamespace(t *testing.T) { + perms := WorkloadClaims("default", "wid123") + + expectedLogSub := models.LogAPIPrefix("default") + ".>" + be.True(t, slices.Contains(perms.Sub.Allow, expectedLogSub)) + + // Guard against the specific regression we just fixed: the system + // namespace must not appear in a default-namespace workload's sub + // allow list. + systemLogSub := models.LogAPIPrefix(models.SystemNamespace) + ".>" + be.False(t, slices.Contains(perms.Sub.Allow, systemLogSub)) +} diff --git a/node.go b/node.go index 66ce3707..815030c0 100644 --- a/node.go +++ b/node.go @@ -320,6 +320,11 @@ func (n *NexNode) Start() error { n.logger.Warn("nex node started without any agents") } + // Ensure subscriptions are live at the server before declaring ready. + if err := n.nc.Flush(); err != nil { + return fmt.Errorf("failed to flush nats connection after subscribing: %w", err) + } + n.logger.Info("nex node ready") n.nodeState = models.NodeStateRunning return nil