From 9d1e54fa75390e74bbf7b3c8ff788b2398bebf85 Mon Sep 17 00:00:00 2001 From: Joe Riddle Date: Mon, 30 Mar 2026 10:24:56 -0600 Subject: [PATCH 1/6] refactor: return internal server errors when marshaling client requests fails Signed-off-by: Joe Riddle --- client/client.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/client/client.go b/client/client.go index ea656266..197fe0df 100644 --- a/client/client.go +++ b/client/client.go @@ -117,7 +117,7 @@ func (n *nexClient) GetNodeInfo(nodeId string) (*models.NodeInfoResponse, error) req := &models.NodeInfoRequest{} reqB, err := json.Marshal(req) if err != nil { - return nil, n.nexBadRequestError(err, "failed to marshal node info request") + return nil, n.nexInternalError(err, "failed to marshal node info request") } resp, err := n.nc.Request(models.NodeInfoRequestSubject(n.namespace, nodeId), reqB, n.defaultTimeout) @@ -150,7 +150,7 @@ func (n *nexClient) SetLameduck(nodeId string, delay time.Duration, tag map[stri reqB, err := json.Marshal(req) if err != nil { - return nil, n.nexBadRequestError(err, "failed to marshal lameduck request") + return nil, n.nexInternalError(err, "failed to marshal lameduck request") } respMsg, err := n.nc.Request(models.LameduckRequestSubject(n.namespace, nodeId), reqB, n.defaultTimeout) @@ -181,7 +181,7 @@ func (n *nexClient) ListNodes(filter map[string]string) ([]*models.NodePingRespo reqB, err := json.Marshal(req) if err != nil { - return nil, n.nexBadRequestError(err, "failed to marshal list nodes request") + return nil, n.nexInternalError(err, "failed to marshal list nodes request") } msgs, err := natsext.RequestMany(n.ctx, n.nc, models.PingRequestSubject(n.namespace), reqB, natsext.RequestManyStall(n.requestManyStall)) @@ -221,7 +221,7 @@ func (n *nexClient) Auction(typ string, tags map[string]string) ([]*models.Aucti auctionRequestB, err := json.Marshal(auctionRequest) if err != nil { - return nil, n.nexBadRequestError(err, "failed to marshal auction request") + return nil, n.nexInternalError(err, "failed to marshal auction request") } msgs, err := natsext.RequestMany(n.ctx, n.nc, models.AuctionRequestSubject(n.namespace), auctionRequestB, natsext.RequestManyStall(n.auctionRequestManyStall)) @@ -269,7 +269,7 @@ func (n *nexClient) StartWorkload(deployId, name, desc, runRequest, typ string, reqB, err := json.Marshal(req) if err != nil { - return nil, n.nexBadRequestError(err, "failed to marshal start workload request") + return nil, n.nexInternalError(err, "failed to marshal start workload request") } startResponseMsg, err := n.nc.Request(models.AuctionDeployRequestSubject(n.namespace, deployId), reqB, n.startWorkloadTimeout) @@ -297,7 +297,7 @@ func (n *nexClient) StopWorkload(workloadId string) (*models.StopWorkloadRespons reqB, err := json.Marshal(req) if err != nil { - return nil, n.nexBadRequestError(err, "failed to marshal stop workload request") + return nil, n.nexInternalError(err, "failed to marshal stop workload request") } msgs, err := natsext.RequestMany(n.ctx, n.nc, models.UndeployRequestSubject(n.namespace, workloadId), reqB, natsext.RequestManyStall(n.requestManyStall)) @@ -345,7 +345,7 @@ func (n *nexClient) ListWorkloads(filter []string) ([]*models.AgentListWorkloads reqB, err := json.Marshal(req) if err != nil { - return nil, n.nexBadRequestError(err, "failed to marshal list workloads request") + return nil, n.nexInternalError(err, "failed to marshal list workloads request") } msgs, err := natsext.RequestMany(n.ctx, n.nc, models.NamespacePingRequestSubject(n.namespace), reqB, natsext.RequestManyStall(n.requestManyStall)) @@ -394,7 +394,7 @@ func (n *nexClient) CloneWorkload(id string, tags map[string]string) (*models.St cloneReqB, err := json.Marshal(cloneReq) if err != nil { - return nil, n.nexBadRequestError(err, "failed to marshal clone request") + return nil, n.nexInternalError(err, "failed to marshal clone request") } genericNotFoundError := errors.New(string(models.GenericErrorsWorkloadNotFound)) From 16a50d23225bc94e7c7cdaf4d12f7e7067cbef16 Mon Sep 17 00:00:00 2001 From: Joe Riddle Date: Mon, 30 Mar 2026 10:49:47 -0600 Subject: [PATCH 2/6] feat: detect nex error type in scatter-gathers Signed-off-by: Joe Riddle --- client/client.go | 51 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 35 insertions(+), 16 deletions(-) diff --git a/client/client.go b/client/client.go index 197fe0df..06dacbc7 100644 --- a/client/client.go +++ b/client/client.go @@ -96,6 +96,10 @@ func (n *nexClient) GetNexusPTags() (map[string]string, error) { resp := make(map[string]string) msgs(func(m *nats.Msg, err error) bool { if err == nil && m.Data != nil && string(m.Data) != "null" { + if nexErr := nexErrorFromMsg(m); nexErr != nil { + errs = errors.Join(errs, nexErr) + return true + } t := map[string]string{} err = json.Unmarshal(m.Data, &t) if err == nil { @@ -110,7 +114,7 @@ func (n *nexClient) GetNexusPTags() (map[string]string, error) { return true }) - return resp, nil + return resp, errs } func (n *nexClient) GetNodeInfo(nodeId string) (*models.NodeInfoResponse, error) { @@ -196,7 +200,8 @@ func (n *nexClient) ListNodes(filter map[string]string) ([]*models.NodePingRespo resp := []*models.NodePingResponse{} msgs(func(m *nats.Msg, err error) bool { if err == nil && m.Data != nil && string(m.Data) != "null" { - if m.Header.Get(micro.ErrorCodeHeader) != "" { + if nexErr := nexErrorFromMsg(m); nexErr != nil { + errs = errors.Join(errs, nexErr) return true } t := new(models.NodePingResponse) @@ -209,7 +214,7 @@ func (n *nexClient) ListNodes(filter map[string]string) ([]*models.NodePingRespo return true }) - return resp, nil + return resp, errs } func (n *nexClient) Auction(typ string, tags map[string]string) ([]*models.AuctionResponse, error) { @@ -236,7 +241,8 @@ func (n *nexClient) Auction(typ string, tags map[string]string) ([]*models.Aucti resp := []*models.AuctionResponse{} msgs(func(m *nats.Msg, err error) bool { if err == nil { - if m.Header.Get(micro.ErrorCodeHeader) != "" { + if nexErr := nexErrorFromMsg(m); nexErr != nil { + errs = errors.Join(errs, nexErr) return true } t := new(models.AuctionResponse) @@ -249,7 +255,7 @@ func (n *nexClient) Auction(typ string, tags map[string]string) ([]*models.Aucti return true }) - return resp, nil + return resp, errs } func (n *nexClient) StartWorkload(deployId, name, desc, runRequest, typ string, lifecycle models.WorkloadLifecycle, pTags models.NodeTags) (*models.StartWorkloadResponse, error) { @@ -301,14 +307,17 @@ func (n *nexClient) StopWorkload(workloadId string) (*models.StopWorkloadRespons } msgs, err := natsext.RequestMany(n.ctx, n.nc, models.UndeployRequestSubject(n.namespace, workloadId), reqB, natsext.RequestManyStall(n.requestManyStall)) - if err != nil { + if errors.Is(err, nats.ErrNoResponders) { return &models.StopWorkloadResponse{ Id: workloadId, - Message: err.Error(), + Message: string(models.GenericErrorsWorkloadNotFound), Stopped: false, WorkloadType: "", }, nil } + if err != nil { + return nil, n.nexInternalError(err, "failed to request stop workload") + } ret := &models.StopWorkloadResponse{ Id: workloadId, @@ -317,9 +326,11 @@ func (n *nexClient) StopWorkload(workloadId string) (*models.StopWorkloadRespons WorkloadType: "", } + var errs error msgs(func(m *nats.Msg, e error) bool { if e == nil && m.Data != nil && string(m.Data) != "null" { - if m.Header.Get(micro.ErrorCodeHeader) != "" { + if nexErr := nexErrorFromMsg(m); nexErr != nil { + errs = errors.Join(errs, nexErr) return true } var swresp models.StopWorkloadResponse @@ -331,10 +342,11 @@ func (n *nexClient) StopWorkload(workloadId string) (*models.StopWorkloadRespons } } } + errs = errors.Join(errs, e) return true }) - return ret, nil + return ret, errs } func (n *nexClient) ListWorkloads(filter []string) ([]*models.AgentListWorkloadsResponse, error) { @@ -360,7 +372,8 @@ func (n *nexClient) ListWorkloads(filter []string) ([]*models.AgentListWorkloads resp := []*models.AgentListWorkloadsResponse{} msgs(func(m *nats.Msg, err error) bool { if err == nil && m.Data != nil && string(m.Data) != "null" { - if m.Header.Get(micro.ErrorCodeHeader) != "" { + if nexErr := nexErrorFromMsg(m); nexErr != nil { + errs = errors.Join(errs, nexErr) return true } t := new(models.AgentListWorkloadsResponse) @@ -373,7 +386,7 @@ func (n *nexClient) ListWorkloads(filter []string) ([]*models.AgentListWorkloads return true }) - return resp, nil + return resp, errs } func (n *nexClient) CloneWorkload(id string, tags map[string]string) (*models.StartWorkloadResponse, error) { @@ -407,9 +420,11 @@ func (n *nexClient) CloneWorkload(id string, tags map[string]string) (*models.St } var cloneResp *models.StartWorkloadRequest + var cloneErrs error msgs(func(m *nats.Msg, err error) bool { if err == nil && m.Data != nil && string(m.Data) != "null" { - if m.Header.Get(micro.ErrorCodeHeader) != "" { + if nexErr := nexErrorFromMsg(m); nexErr != nil { + cloneErrs = errors.Join(cloneErrs, nexErr) return true } err = json.Unmarshal(m.Data, &cloneResp) @@ -417,19 +432,23 @@ func (n *nexClient) CloneWorkload(id string, tags map[string]string) (*models.St return false } } + cloneErrs = errors.Join(cloneErrs, err) return true }) + if cloneResp == nil && cloneErrs != nil { + return nil, cloneErrs + } + if cloneResp == nil { return nil, n.nexNotFoundError(errors.New(string(models.GenericErrorsWorkloadNotFound)), "workload not found") } aucResp, err := n.Auction(cloneResp.WorkloadType, tags) - if err != nil { - return nil, err - } - if len(aucResp) == 0 { + if err != nil { + return nil, err + } return nil, n.nexNotFoundError(errors.New("no nodes available for placement"), "no nodes available for placement") } From 332bff396997352886af697aeadb2eb582418a1c Mon Sep 17 00:00:00 2001 From: Joe Riddle Date: Mon, 30 Mar 2026 10:54:30 -0600 Subject: [PATCH 3/6] fix: TestNexClient_ListWorkloads uses hard-coded node size of 1 Signed-off-by: Joe Riddle --- client/client_test.go | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/client/client_test.go b/client/client_test.go index aea70121..a1ebd62d 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -208,11 +208,11 @@ func TestNexClient_ListWorkloads(t *testing.T) { server := _test.StartNatsServer(t, workDir) defer server.Shutdown() - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() - nexNodes := _test.StartNexus(t, ctx, server.ClientURL(), 1, false) - be.Equal(t, 1, len(nexNodes)) + nexNodes := _test.StartNexus(t, ctx, server.ClientURL(), tt.size, false) + be.Equal(t, tt.size, len(nexNodes)) nc, err := nats.Connect(server.ClientURL()) be.NilErr(t, err) @@ -225,33 +225,36 @@ func TestNexClient_ListWorkloads(t *testing.T) { var ar []*models.AuctionResponse _test.WaitFor(t, 10*time.Second, func() bool { ar, err = client.Auction("inmem", map[string]string{}) - return err == nil && len(ar) == 1 - }, "waiting for auction to return 1 result") + return err == nil && len(ar) == tt.size + }, "waiting for auction to return expected results") _, err = client.StartWorkload(ar[0].BidderId, "tester1", "My test workload", "{}", "inmem", models.WorkloadLifecycleService, nil) be.NilErr(t, err) _test.WaitFor(t, 10*time.Second, func() bool { ar, err = client.Auction("inmem", map[string]string{}) - return err == nil && len(ar) == 1 - }, "waiting for auction to return 1 result") + return err == nil && len(ar) == tt.size + }, "waiting for auction to return expected results") - _, err = client.StartWorkload(ar[0].BidderId, "tester2", "My test workload", "{}", "inmem", models.WorkloadLifecycleService, nil) + _, err = client.StartWorkload(ar[1%tt.size].BidderId, "tester2", "My test workload", "{}", "inmem", models.WorkloadLifecycleService, nil) be.NilErr(t, err) _test.WaitFor(t, 10*time.Second, func() bool { ar, err = client.Auction("inmem", map[string]string{}) - return err == nil && len(ar) == 1 - }, "waiting for auction to return 1 result") + return err == nil && len(ar) == tt.size + }, "waiting for auction to return expected results") - _, err = client.StartWorkload(ar[0].BidderId, "tester3", "My test workload", "{}", "inmem", models.WorkloadLifecycleService, nil) + _, err = client.StartWorkload(ar[2%tt.size].BidderId, "tester3", "My test workload", "{}", "inmem", models.WorkloadLifecycleService, nil) be.NilErr(t, err) wl, err := client.ListWorkloads([]string{}) be.NilErr(t, err) - be.Equal(t, 1, len(wl)) - be.Equal(t, 3, len(*wl[0])) + totalCount := 0 + for _, w := range wl { + totalCount += len(*w) + } + be.Equal(t, 3, totalCount) for _, node := range nexNodes { be.NilErr(t, node.Shutdown()) From 70f42af75f81983273616455b33ef940da55c901 Mon Sep 17 00:00:00 2001 From: Joe Riddle Date: Mon, 30 Mar 2026 11:24:33 -0600 Subject: [PATCH 4/6] test: add tests for partial errors when multiple nex nodes Signed-off-by: Joe Riddle --- client/client_test.go | 179 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 179 insertions(+) diff --git a/client/client_test.go b/client/client_test.go index a1ebd62d..e78bd670 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -2,11 +2,14 @@ package client import ( "context" + "encoding/json" + "strconv" "testing" "time" "github.com/carlmjohnson/be" "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/micro" "github.com/synadia-io/nex/_test" "github.com/synadia-io/nex/models" ) @@ -548,3 +551,179 @@ func TestNexClient_StopWorkload(t *testing.T) { }) } } + +// fakeErrorResponder subscribes to the given subject and simulates a nex node that returns an error. +func fakeErrorResponder(t *testing.T, nc *nats.Conn, subject string, code int, msg string) *nats.Subscription { + t.Helper() + + errBody, _ := json.Marshal(struct { + ErrorID string `json:"error_id"` + Error string `json:"error"` + }{ + ErrorID: "fake-error-node", + Error: msg, + }) + + sub, err := nc.Subscribe(subject, func(m *nats.Msg) { + resp := nats.NewMsg(m.Reply) + resp.Header.Set(micro.ErrorCodeHeader, strconv.Itoa(code)) + resp.Header.Set(micro.ErrorHeader, msg) + resp.Data = errBody + _ = nc.PublishMsg(resp) + }) + be.NilErr(t, err) + return sub +} + +func TestNexClient_ListNodes_PartialError(t *testing.T) { + workDir := t.TempDir() + server := _test.StartNatsServer(t, workDir) + defer server.Shutdown() + + nc, err := nats.Connect(server.ClientURL()) + be.NilErr(t, err) + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + nexNodes := _test.StartNexus(t, ctx, server.ClientURL(), 3, false) + be.Equal(t, 3, len(nexNodes)) + + errNC, err := nats.Connect(server.ClientURL()) + be.NilErr(t, err) + defer errNC.Close() + + sub := fakeErrorResponder(t, errNC, models.PingRequestSubject(models.SystemNamespace), 500, "internal node error") + defer func() { _ = sub.Unsubscribe() }() + + nc, err = nats.Connect(server.ClientURL()) + be.NilErr(t, err) + defer nc.Close() + + client, err := NewClient(context.Background(), nc, models.SystemNamespace) + be.NilErr(t, err) + be.Nonzero(t, client) + + var nodes []*models.NodePingResponse + _test.WaitFor(t, 10*time.Second, func() bool { + nodes, err = client.ListNodes(nil) + return len(nodes) == 3 + }, "waiting for list nodes to return 3 healthy results") + + be.Nonzero(t, err) + be.Equal(t, 3, len(nodes)) + + for _, node := range nexNodes { + be.NilErr(t, node.Shutdown()) + } +} + +func TestNexClient_ListWorkloads_PartialError(t *testing.T) { + workDir := t.TempDir() + server := _test.StartNatsServer(t, workDir) + defer server.Shutdown() + + nc, err := nats.Connect(server.ClientURL()) + be.NilErr(t, err) + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + nexNodes := _test.StartNexus(t, ctx, server.ClientURL(), 3, false) + be.Equal(t, 3, len(nexNodes)) + + nc, err = nats.Connect(server.ClientURL()) + be.NilErr(t, err) + defer nc.Close() + + client, err := NewClient(context.Background(), nc, "user") + be.NilErr(t, err) + be.Nonzero(t, client) + + var ar []*models.AuctionResponse + _test.WaitFor(t, 10*time.Second, func() bool { + ar, err = client.Auction("inmem", map[string]string{}) + return err == nil && len(ar) == 3 + }, "waiting for auction to return 3 results") + + _, err = client.StartWorkload(ar[0].BidderId, "tester1", "My test workload", "{}", "inmem", models.WorkloadLifecycleService, nil) + be.NilErr(t, err) + + _test.WaitFor(t, 10*time.Second, func() bool { + wl, wlErr := client.ListWorkloads(nil) + if wlErr != nil { + return false + } + totalCount := 0 + for _, w := range wl { + totalCount += len(*w) + } + return totalCount == 1 + }, "waiting for workload to be running") + + errNC, err := nats.Connect(server.ClientURL()) + be.NilErr(t, err) + defer errNC.Close() + + sub := fakeErrorResponder(t, errNC, models.NamespacePingRequestSubject("user"), 500, "node unavailable") + defer func() { _ = sub.Unsubscribe() }() + + wl, err := client.ListWorkloads([]string{}) + be.Nonzero(t, err) + + totalCount := 0 + for _, w := range wl { + totalCount += len(*w) + } + be.Equal(t, 1, totalCount) + + for _, node := range nexNodes { + be.NilErr(t, node.Shutdown()) + } +} + +func TestNexClient_Auction_PartialError(t *testing.T) { + workDir := t.TempDir() + server := _test.StartNatsServer(t, workDir) + defer server.Shutdown() + + nc, err := nats.Connect(server.ClientURL()) + be.NilErr(t, err) + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + nexNodes := _test.StartNexus(t, ctx, server.ClientURL(), 3, false) + be.Equal(t, 3, len(nexNodes)) + + errNC, err := nats.Connect(server.ClientURL()) + be.NilErr(t, err) + defer errNC.Close() + + sub := fakeErrorResponder(t, errNC, models.AuctionRequestSubject("user"), 500, "auction failed") + defer func() { _ = sub.Unsubscribe() }() + + nc, err = nats.Connect(server.ClientURL()) + be.NilErr(t, err) + defer nc.Close() + + client, err := NewClient(context.Background(), nc, "user") + be.NilErr(t, err) + be.Nonzero(t, client) + + var ar []*models.AuctionResponse + _test.WaitFor(t, 10*time.Second, func() bool { + ar, err = client.Auction("inmem", map[string]string{}) + return len(ar) == 3 + }, "waiting for auction to return 3 healthy results") + + be.Nonzero(t, err) + be.Equal(t, 3, len(ar)) + + for _, node := range nexNodes { + be.NilErr(t, node.Shutdown()) + } +} From be8d559ec416589bc50cd0e9c99a46cf1b4cc351 Mon Sep 17 00:00:00 2001 From: Joe Riddle Date: Mon, 30 Mar 2026 13:28:59 -0600 Subject: [PATCH 5/6] refactor: remove nats.ErrNoResponders checks for requestmany Signed-off-by: Joe Riddle --- client/client.go | 48 ++++++++++++++++++------------------------------ 1 file changed, 18 insertions(+), 30 deletions(-) diff --git a/client/client.go b/client/client.go index 06dacbc7..f1a255db 100644 --- a/client/client.go +++ b/client/client.go @@ -85,9 +85,6 @@ func NewClient(ctx context.Context, nc *nats.Conn, namespace string, opts ...Cli func (n *nexClient) GetNexusPTags() (map[string]string, error) { msgs, err := natsext.RequestMany(n.ctx, n.nc, models.PingPTagRequestSubject(n.namespace), []byte{}, natsext.RequestManyStall(n.requestManyStall)) - if errors.Is(err, nats.ErrNoResponders) || errors.Is(err, nats.ErrTimeout) { - return map[string]string{}, nil - } if err != nil { return nil, err } @@ -110,7 +107,9 @@ func (n *nexClient) GetNexusPTags() (map[string]string, error) { } } } - errs = errors.Join(errs, err) + if err != nil && !errors.Is(err, nats.ErrNoResponders) { + errs = errors.Join(errs, err) + } return true }) @@ -189,9 +188,6 @@ func (n *nexClient) ListNodes(filter map[string]string) ([]*models.NodePingRespo } msgs, err := natsext.RequestMany(n.ctx, n.nc, models.PingRequestSubject(n.namespace), reqB, natsext.RequestManyStall(n.requestManyStall)) - if errors.Is(err, nats.ErrNoResponders) || errors.Is(err, nats.ErrTimeout) { - return []*models.NodePingResponse{}, nil - } if err != nil { return nil, n.nexInternalError(err, "failed to request list nodes") } @@ -210,7 +206,9 @@ func (n *nexClient) ListNodes(filter map[string]string) ([]*models.NodePingRespo resp = append(resp, t) } } - errs = errors.Join(errs, err) + if err != nil && !errors.Is(err, nats.ErrNoResponders) { + errs = errors.Join(errs, err) + } return true }) @@ -230,9 +228,6 @@ func (n *nexClient) Auction(typ string, tags map[string]string) ([]*models.Aucti } msgs, err := natsext.RequestMany(n.ctx, n.nc, models.AuctionRequestSubject(n.namespace), auctionRequestB, natsext.RequestManyStall(n.auctionRequestManyStall)) - if errors.Is(err, nats.ErrNoResponders) { - return []*models.AuctionResponse{}, nil - } if err != nil { return nil, n.nexInternalError(err, "failed to request auction") } @@ -251,7 +246,9 @@ func (n *nexClient) Auction(typ string, tags map[string]string) ([]*models.Aucti resp = append(resp, t) } } - errs = errors.Join(errs, err) + if err != nil && !errors.Is(err, nats.ErrNoResponders) { + errs = errors.Join(errs, err) + } return true }) @@ -307,14 +304,6 @@ func (n *nexClient) StopWorkload(workloadId string) (*models.StopWorkloadRespons } msgs, err := natsext.RequestMany(n.ctx, n.nc, models.UndeployRequestSubject(n.namespace, workloadId), reqB, natsext.RequestManyStall(n.requestManyStall)) - if errors.Is(err, nats.ErrNoResponders) { - return &models.StopWorkloadResponse{ - Id: workloadId, - Message: string(models.GenericErrorsWorkloadNotFound), - Stopped: false, - WorkloadType: "", - }, nil - } if err != nil { return nil, n.nexInternalError(err, "failed to request stop workload") } @@ -342,7 +331,9 @@ func (n *nexClient) StopWorkload(workloadId string) (*models.StopWorkloadRespons } } } - errs = errors.Join(errs, e) + if e != nil && !errors.Is(e, nats.ErrNoResponders) { + errs = errors.Join(errs, e) + } return true }) @@ -361,9 +352,6 @@ func (n *nexClient) ListWorkloads(filter []string) ([]*models.AgentListWorkloads } msgs, err := natsext.RequestMany(n.ctx, n.nc, models.NamespacePingRequestSubject(n.namespace), reqB, natsext.RequestManyStall(n.requestManyStall)) - if errors.Is(err, nats.ErrNoResponders) { - return []*models.AgentListWorkloadsResponse{}, nil - } if err != nil { return nil, n.nexInternalError(err, "failed to request list workloads") } @@ -382,7 +370,9 @@ func (n *nexClient) ListWorkloads(filter []string) ([]*models.AgentListWorkloads resp = append(resp, t) } } - errs = errors.Join(errs, err) + if err != nil && !errors.Is(err, nats.ErrNoResponders) { + errs = errors.Join(errs, err) + } return true }) @@ -410,11 +400,7 @@ func (n *nexClient) CloneWorkload(id string, tags map[string]string) (*models.St return nil, n.nexInternalError(err, "failed to marshal clone request") } - genericNotFoundError := errors.New(string(models.GenericErrorsWorkloadNotFound)) msgs, err := natsext.RequestMany(n.ctx, n.nc, models.CloneWorkloadRequestSubject(n.namespace, id), cloneReqB, natsext.RequestManyStall(n.requestManyStall)) - if errors.Is(err, nats.ErrNoResponders) { - return nil, n.nexNotFoundError(genericNotFoundError, "workload not found") - } if err != nil { return nil, n.nexInternalError(err, "workload not found") } @@ -432,7 +418,9 @@ func (n *nexClient) CloneWorkload(id string, tags map[string]string) (*models.St return false } } - cloneErrs = errors.Join(cloneErrs, err) + if err != nil && !errors.Is(err, nats.ErrNoResponders) { + cloneErrs = errors.Join(cloneErrs, err) + } return true }) From 78aba4df571d98073b9fa5093893426644ea6ddb Mon Sep 17 00:00:00 2001 From: Joe Riddle Date: Mon, 30 Mar 2026 13:54:00 -0600 Subject: [PATCH 6/6] test: fix flaky client test in CI Signed-off-by: Joe Riddle --- client/client_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/client/client_test.go b/client/client_test.go index e78bd670..03b7436e 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -669,6 +669,7 @@ func TestNexClient_ListWorkloads_PartialError(t *testing.T) { sub := fakeErrorResponder(t, errNC, models.NamespacePingRequestSubject("user"), 500, "node unavailable") defer func() { _ = sub.Unsubscribe() }() + be.NilErr(t, errNC.Flush()) wl, err := client.ListWorkloads([]string{}) be.Nonzero(t, err)