Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions _test/nexlet_inmem/inmem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func TestFunctionStartWorkloadAndTrigger(t *testing.T) {
var aR []*models.AuctionResponse
deadline := time.Now().Add(10 * time.Second)
for time.Now().Before(deadline) {
aR, err = nClient.Auction("inmem", nil)
aR, err = nClient.Auction(models.SystemNamespace, "inmem", nil)
if err == nil && len(aR) == 1 {
break
}
Expand All @@ -166,7 +166,13 @@ func TestFunctionStartWorkloadAndTrigger(t *testing.T) {
be.NilErr(t, err)
be.Equal(t, 1, len(aR))

sR, err := nClient.StartWorkload(aR[0].BidderId, "inmemfunc", "", "{}", "inmem", "function", nil)
sR, err := nClient.StartWorkload(aR[0].BidderId, &models.StartWorkloadRequest{
Namespace: models.SystemNamespace,
Name: "inmemfunc",
RunRequest: "{}",
WorkloadType: "inmem",
WorkloadLifecycle: "function",
})
be.NilErr(t, err)
be.Nonzero(t, sR)

Expand Down
124 changes: 103 additions & 21 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
"net/http"
"strconv"
Expand All @@ -28,8 +29,8 @@ type NexClient interface {
GetNodeInfo(nodeId string) (*models.NodeInfoResponse, error)
SetLameduck(nodeId string, delay time.Duration, tag map[string]string) (*models.LameduckResponse, error)
ListNodes(filter map[string]string) ([]*models.NodePingResponse, error)
Auction(typ string, tags map[string]string) ([]*models.AuctionResponse, error)
StartWorkload(deployId, name, desc, runRequest, typ string, lifecycle models.WorkloadLifecycle, pTags models.NodeTags) (*models.StartWorkloadResponse, error)
Auction(namespace, typ string, tags map[string]string) ([]*models.AuctionResponse, error)
StartWorkload(deployId string, req *models.StartWorkloadRequest) (*models.StartWorkloadResponse, error)
StopWorkload(workloadId string) (*models.StopWorkloadResponse, error)
ListWorkloads(filter []string) ([]*models.AgentListWorkloadsResponse, error)
CloneWorkload(id string, tags map[string]string) (*models.StartWorkloadResponse, error)
Expand Down Expand Up @@ -215,7 +216,7 @@ func (n *nexClient) ListNodes(filter map[string]string) ([]*models.NodePingRespo
return resp, errs
}

func (n *nexClient) Auction(typ string, tags map[string]string) ([]*models.AuctionResponse, error) {
func (n *nexClient) Auction(namespace, typ string, tags map[string]string) ([]*models.AuctionResponse, error) {
auctionRequest := &models.AuctionRequest{
AgentType: typ,
AuctionId: nuid.New().Next(),
Expand All @@ -227,7 +228,7 @@ func (n *nexClient) Auction(typ string, tags map[string]string) ([]*models.Aucti
return nil, n.nexInternalError(err, "failed to marshal auction request")
}

msgs, err := natsext.RequestMany(n.ctx, n.nc, models.AuctionRequestSubject(n.namespace), auctionRequestB, natsext.RequestManyStall(n.auctionRequestManyStall))
msgs, err := natsext.RequestMany(n.ctx, n.nc, models.AuctionRequestSubject(namespace), auctionRequestB, natsext.RequestManyStall(n.auctionRequestManyStall))
if err != nil {
return nil, n.nexInternalError(err, "failed to request auction")
}
Expand Down Expand Up @@ -255,27 +256,26 @@ func (n *nexClient) Auction(typ string, tags map[string]string) ([]*models.Aucti
return resp, errs
}

func (n *nexClient) StartWorkload(deployId, name, desc, runRequest, typ string, lifecycle models.WorkloadLifecycle, pTags models.NodeTags) (*models.StartWorkloadResponse, error) {
if pTags == nil {
pTags = make(models.NodeTags)
func (n *nexClient) StartWorkload(deployId string, req *models.StartWorkloadRequest) (*models.StartWorkloadResponse, error) {
if req == nil {
return nil, n.nexBadRequestError(errors.New("nil request"), "start workload request must not be nil")
}

req := &models.StartWorkloadRequest{
Namespace: n.namespace,
Name: name,
Description: desc,
RunRequest: runRequest,
WorkloadLifecycle: lifecycle,
WorkloadType: typ,
Tags: pTags,
// Shallow-copy so defaulting Tags does not mutate the caller's struct.
// This matters for call sites like CloneWorkload that pass the agent's
// StartWorkloadRequest through verbatim — we must not silently poke at
// fields on a value the caller still holds a reference to.
local := *req
if local.Tags == nil {
local.Tags = make(models.NodeTags)
}

reqB, err := json.Marshal(req)
reqB, err := json.Marshal(&local)
if err != nil {
return nil, n.nexInternalError(err, "failed to marshal start workload request")
}

startResponseMsg, err := n.nc.Request(models.AuctionDeployRequestSubject(n.namespace, deployId), reqB, n.startWorkloadTimeout)
startResponseMsg, err := n.nc.Request(models.AuctionDeployRequestSubject(local.Namespace, deployId), reqB, n.startWorkloadTimeout)
if err != nil {
return nil, n.nexInternalError(err, "failed to request start workload")
}
Expand All @@ -294,16 +294,69 @@ func (n *nexClient) StartWorkload(deployId, name, desc, runRequest, typ string,
}

func (n *nexClient) StopWorkload(workloadId string) (*models.StopWorkloadResponse, error) {
// targetNS is the namespace the workload actually lives in. For user
// callers it is the caller's own namespace. For system callers we must
// discover it because system is administrative and never hosts
// workloads itself (except in rare explicit system-namespace starts).
targetNS := n.namespace

if n.namespace == models.SystemNamespace {
summaries, err := n.ListWorkloads([]string{workloadId})
if err != nil {
return nil, n.nexInternalError(err, "failed to discover workload namespace")
}

var resolved []string
for _, agentResp := range summaries {
for _, summary := range *agentResp {
if summary.Id != workloadId {
continue
}
if summary.Namespace == nil {
return nil, n.nexInternalError(
errors.New("owning namespace not reported"),
fmt.Sprintf("cannot stop workload %s as system user: owning namespace could not be determined (nexlet does not report namespace in list response); retry with --namespace set to the owning namespace", workloadId))
}
already := false
for _, existing := range resolved {
if existing == *summary.Namespace {
already = true
break
}
}
if !already {
resolved = append(resolved, *summary.Namespace)
}
}
}

switch len(resolved) {
case 0:
return nil, n.nexNotFoundError(errors.New(string(models.GenericErrorsWorkloadNotFound)), "workload not found")
case 1:
targetNS = resolved[0]
default:
// Don't echo the resolved namespace names in the error
// message — even though only system callers reach this branch
// today, the error propagates through NATS micro response
// headers and logs where the audience is broader. The
// operator can list workloads themselves to investigate.
return nil, n.nexInternalError(
errors.New("ambiguous workload id"),
fmt.Sprintf("workload id %s is ambiguous: it matches workloads in multiple namespaces; use --namespace to specify which one to stop", workloadId))
}
}

req := models.StopWorkloadRequest{
Namespace: n.namespace,
Namespace: targetNS,
}

reqB, err := json.Marshal(req)
if err != nil {
return nil, n.nexInternalError(err, "failed to marshal stop workload request")
}

msgs, err := natsext.RequestMany(n.ctx, n.nc, models.UndeployRequestSubject(n.namespace, workloadId), reqB, natsext.RequestManyStall(n.requestManyStall))
msgs, err := natsext.RequestMany(n.ctx, n.nc, models.UndeployRequestSubject(targetNS, workloadId), reqB, natsext.RequestManyStall(n.requestManyStall))
if err != nil {
return nil, n.nexInternalError(err, "failed to request stop workload")
}
Expand Down Expand Up @@ -390,6 +443,13 @@ func (n *nexClient) CloneWorkload(id string, tags map[string]string) (*models.St
return nil, n.nexInternalError(err, "failed to get clone public key")
}

// The clone fetch uses n.namespace because the caller has not yet
// learned the target namespace — the whole point of this round-trip is
// to discover the workload's owning namespace along with its full
// definition. The node's handleCloneWorkload bypass for
// models.SystemNamespace is load-bearing here for cross-namespace
// clones by a system operator; a user caller can only fetch workloads
// in their own namespace (handlers.go enforces this).
cloneReq := models.CloneWorkloadRequest{
Namespace: n.namespace,
NewTargetXkey: tKpPub,
Expand Down Expand Up @@ -432,7 +492,28 @@ func (n *nexClient) CloneWorkload(id string, tags map[string]string) (*models.St
return nil, n.nexNotFoundError(errors.New(string(models.GenericErrorsWorkloadNotFound)), "workload not found")
}

aucResp, err := n.Auction(cloneResp.WorkloadType, tags)
// An empty namespace on the clone response means the agent did not
// report the workload's owning namespace — typically an older nexlet
// that pre-dates the WorkloadSummary.Namespace contract. Without a
// target namespace we would publish the auction on a malformed subject
// and the failure would surface as an unhelpful "no nodes available"
// error. Fail fast with a clear message instead.
if cloneResp.Namespace == "" {
return nil, n.nexInternalError(
errors.New("clone response missing namespace"),
"cannot clone workload: owning namespace not reported by the agent")
}

// The clone must be deployed into the workload's actual owning
// namespace, not the caller's namespace. For a system operator cloning
// a default-namespace workload, this means the auction and deploy
// target `default`. The system credential masquerades at the subject
// layer — publishing on default.* control subjects while authenticated
// as system — which the node accepts because subject-body namespaces
// match. cloneResp already carries the authoritative Namespace from
// the agent's state, so we reuse it verbatim and only override the
// placement tags the caller passed to CloneWorkload.
aucResp, err := n.Auction(cloneResp.Namespace, cloneResp.WorkloadType, tags)
if len(aucResp) == 0 {
if err != nil {
return nil, err
Expand All @@ -441,7 +522,8 @@ func (n *nexClient) CloneWorkload(id string, tags map[string]string) (*models.St
}

randomNode := aucResp[rand.Intn(len(aucResp))]
swr, err := n.StartWorkload(randomNode.BidderId, cloneResp.Name, cloneResp.Description, cloneResp.RunRequest, cloneResp.WorkloadType, cloneResp.WorkloadLifecycle, tags)
cloneResp.Tags = tags
swr, err := n.StartWorkload(randomNode.BidderId, cloneResp)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion client/client_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func BenchmarkClientAuction(b *testing.B) {

b.ResetTimer()
for b.Loop() {
auctionResponses, err := client.Auction(workloadType, map[string]string{})
auctionResponses, err := client.Auction(namespace, workloadType, map[string]string{})
be.NilErr(b, err)
b.StopTimer()
for _, ar := range auctionResponses {
Expand Down
Loading
Loading