diff --git a/client/client.go b/client/client.go index ea656266..f1a255db 100644 --- a/client/client.go +++ b/client/client.go @@ -85,9 +85,6 @@ func NewClient(ctx context.Context, nc *nats.Conn, namespace string, opts ...Cli func (n *nexClient) GetNexusPTags() (map[string]string, error) { msgs, err := natsext.RequestMany(n.ctx, n.nc, models.PingPTagRequestSubject(n.namespace), []byte{}, natsext.RequestManyStall(n.requestManyStall)) - if errors.Is(err, nats.ErrNoResponders) || errors.Is(err, nats.ErrTimeout) { - return map[string]string{}, nil - } if err != nil { return nil, err } @@ -96,6 +93,10 @@ func (n *nexClient) GetNexusPTags() (map[string]string, error) { resp := make(map[string]string) msgs(func(m *nats.Msg, err error) bool { if err == nil && m.Data != nil && string(m.Data) != "null" { + if nexErr := nexErrorFromMsg(m); nexErr != nil { + errs = errors.Join(errs, nexErr) + return true + } t := map[string]string{} err = json.Unmarshal(m.Data, &t) if err == nil { @@ -106,18 +107,20 @@ func (n *nexClient) GetNexusPTags() (map[string]string, error) { } } } - errs = errors.Join(errs, err) + if err != nil && !errors.Is(err, nats.ErrNoResponders) { + errs = errors.Join(errs, err) + } return true }) - return resp, nil + return resp, errs } func (n *nexClient) GetNodeInfo(nodeId string) (*models.NodeInfoResponse, error) { req := &models.NodeInfoRequest{} reqB, err := json.Marshal(req) if err != nil { - return nil, n.nexBadRequestError(err, "failed to marshal node info request") + return nil, n.nexInternalError(err, "failed to marshal node info request") } resp, err := n.nc.Request(models.NodeInfoRequestSubject(n.namespace, nodeId), reqB, n.defaultTimeout) @@ -150,7 +153,7 @@ func (n *nexClient) SetLameduck(nodeId string, delay time.Duration, tag map[stri reqB, err := json.Marshal(req) if err != nil { - return nil, n.nexBadRequestError(err, "failed to marshal lameduck request") + return nil, n.nexInternalError(err, "failed to marshal lameduck request") } respMsg, err := n.nc.Request(models.LameduckRequestSubject(n.namespace, nodeId), reqB, n.defaultTimeout) @@ -181,13 +184,10 @@ func (n *nexClient) ListNodes(filter map[string]string) ([]*models.NodePingRespo reqB, err := json.Marshal(req) if err != nil { - return nil, n.nexBadRequestError(err, "failed to marshal list nodes request") + return nil, n.nexInternalError(err, "failed to marshal list nodes request") } msgs, err := natsext.RequestMany(n.ctx, n.nc, models.PingRequestSubject(n.namespace), reqB, natsext.RequestManyStall(n.requestManyStall)) - if errors.Is(err, nats.ErrNoResponders) || errors.Is(err, nats.ErrTimeout) { - return []*models.NodePingResponse{}, nil - } if err != nil { return nil, n.nexInternalError(err, "failed to request list nodes") } @@ -196,7 +196,8 @@ func (n *nexClient) ListNodes(filter map[string]string) ([]*models.NodePingRespo resp := []*models.NodePingResponse{} msgs(func(m *nats.Msg, err error) bool { if err == nil && m.Data != nil && string(m.Data) != "null" { - if m.Header.Get(micro.ErrorCodeHeader) != "" { + if nexErr := nexErrorFromMsg(m); nexErr != nil { + errs = errors.Join(errs, nexErr) return true } t := new(models.NodePingResponse) @@ -205,11 +206,13 @@ func (n *nexClient) ListNodes(filter map[string]string) ([]*models.NodePingRespo resp = append(resp, t) } } - errs = errors.Join(errs, err) + if err != nil && !errors.Is(err, nats.ErrNoResponders) { + errs = errors.Join(errs, err) + } return true }) - return resp, nil + return resp, errs } func (n *nexClient) Auction(typ string, tags map[string]string) ([]*models.AuctionResponse, error) { @@ -221,13 +224,10 @@ func (n *nexClient) Auction(typ string, tags map[string]string) ([]*models.Aucti auctionRequestB, err := json.Marshal(auctionRequest) if err != nil { - return nil, n.nexBadRequestError(err, "failed to marshal auction request") + return nil, n.nexInternalError(err, "failed to marshal auction request") } msgs, err := natsext.RequestMany(n.ctx, n.nc, models.AuctionRequestSubject(n.namespace), auctionRequestB, natsext.RequestManyStall(n.auctionRequestManyStall)) - if errors.Is(err, nats.ErrNoResponders) { - return []*models.AuctionResponse{}, nil - } if err != nil { return nil, n.nexInternalError(err, "failed to request auction") } @@ -236,7 +236,8 @@ func (n *nexClient) Auction(typ string, tags map[string]string) ([]*models.Aucti resp := []*models.AuctionResponse{} msgs(func(m *nats.Msg, err error) bool { if err == nil { - if m.Header.Get(micro.ErrorCodeHeader) != "" { + if nexErr := nexErrorFromMsg(m); nexErr != nil { + errs = errors.Join(errs, nexErr) return true } t := new(models.AuctionResponse) @@ -245,11 +246,13 @@ func (n *nexClient) Auction(typ string, tags map[string]string) ([]*models.Aucti resp = append(resp, t) } } - errs = errors.Join(errs, err) + if err != nil && !errors.Is(err, nats.ErrNoResponders) { + errs = errors.Join(errs, err) + } return true }) - return resp, nil + return resp, errs } func (n *nexClient) StartWorkload(deployId, name, desc, runRequest, typ string, lifecycle models.WorkloadLifecycle, pTags models.NodeTags) (*models.StartWorkloadResponse, error) { @@ -269,7 +272,7 @@ func (n *nexClient) StartWorkload(deployId, name, desc, runRequest, typ string, reqB, err := json.Marshal(req) if err != nil { - return nil, n.nexBadRequestError(err, "failed to marshal start workload request") + return nil, n.nexInternalError(err, "failed to marshal start workload request") } startResponseMsg, err := n.nc.Request(models.AuctionDeployRequestSubject(n.namespace, deployId), reqB, n.startWorkloadTimeout) @@ -297,17 +300,12 @@ func (n *nexClient) StopWorkload(workloadId string) (*models.StopWorkloadRespons reqB, err := json.Marshal(req) if err != nil { - return nil, n.nexBadRequestError(err, "failed to marshal stop workload request") + return nil, n.nexInternalError(err, "failed to marshal stop workload request") } msgs, err := natsext.RequestMany(n.ctx, n.nc, models.UndeployRequestSubject(n.namespace, workloadId), reqB, natsext.RequestManyStall(n.requestManyStall)) if err != nil { - return &models.StopWorkloadResponse{ - Id: workloadId, - Message: err.Error(), - Stopped: false, - WorkloadType: "", - }, nil + return nil, n.nexInternalError(err, "failed to request stop workload") } ret := &models.StopWorkloadResponse{ @@ -317,9 +315,11 @@ func (n *nexClient) StopWorkload(workloadId string) (*models.StopWorkloadRespons WorkloadType: "", } + var errs error msgs(func(m *nats.Msg, e error) bool { if e == nil && m.Data != nil && string(m.Data) != "null" { - if m.Header.Get(micro.ErrorCodeHeader) != "" { + if nexErr := nexErrorFromMsg(m); nexErr != nil { + errs = errors.Join(errs, nexErr) return true } var swresp models.StopWorkloadResponse @@ -331,10 +331,13 @@ func (n *nexClient) StopWorkload(workloadId string) (*models.StopWorkloadRespons } } } + if e != nil && !errors.Is(e, nats.ErrNoResponders) { + errs = errors.Join(errs, e) + } return true }) - return ret, nil + return ret, errs } func (n *nexClient) ListWorkloads(filter []string) ([]*models.AgentListWorkloadsResponse, error) { @@ -345,13 +348,10 @@ func (n *nexClient) ListWorkloads(filter []string) ([]*models.AgentListWorkloads reqB, err := json.Marshal(req) if err != nil { - return nil, n.nexBadRequestError(err, "failed to marshal list workloads request") + return nil, n.nexInternalError(err, "failed to marshal list workloads request") } msgs, err := natsext.RequestMany(n.ctx, n.nc, models.NamespacePingRequestSubject(n.namespace), reqB, natsext.RequestManyStall(n.requestManyStall)) - if errors.Is(err, nats.ErrNoResponders) { - return []*models.AgentListWorkloadsResponse{}, nil - } if err != nil { return nil, n.nexInternalError(err, "failed to request list workloads") } @@ -360,7 +360,8 @@ func (n *nexClient) ListWorkloads(filter []string) ([]*models.AgentListWorkloads resp := []*models.AgentListWorkloadsResponse{} msgs(func(m *nats.Msg, err error) bool { if err == nil && m.Data != nil && string(m.Data) != "null" { - if m.Header.Get(micro.ErrorCodeHeader) != "" { + if nexErr := nexErrorFromMsg(m); nexErr != nil { + errs = errors.Join(errs, nexErr) return true } t := new(models.AgentListWorkloadsResponse) @@ -369,11 +370,13 @@ func (n *nexClient) ListWorkloads(filter []string) ([]*models.AgentListWorkloads resp = append(resp, t) } } - errs = errors.Join(errs, err) + if err != nil && !errors.Is(err, nats.ErrNoResponders) { + errs = errors.Join(errs, err) + } return true }) - return resp, nil + return resp, errs } func (n *nexClient) CloneWorkload(id string, tags map[string]string) (*models.StartWorkloadResponse, error) { @@ -394,22 +397,20 @@ func (n *nexClient) CloneWorkload(id string, tags map[string]string) (*models.St cloneReqB, err := json.Marshal(cloneReq) if err != nil { - return nil, n.nexBadRequestError(err, "failed to marshal clone request") + return nil, n.nexInternalError(err, "failed to marshal clone request") } - genericNotFoundError := errors.New(string(models.GenericErrorsWorkloadNotFound)) msgs, err := natsext.RequestMany(n.ctx, n.nc, models.CloneWorkloadRequestSubject(n.namespace, id), cloneReqB, natsext.RequestManyStall(n.requestManyStall)) - if errors.Is(err, nats.ErrNoResponders) { - return nil, n.nexNotFoundError(genericNotFoundError, "workload not found") - } if err != nil { return nil, n.nexInternalError(err, "workload not found") } var cloneResp *models.StartWorkloadRequest + var cloneErrs error msgs(func(m *nats.Msg, err error) bool { if err == nil && m.Data != nil && string(m.Data) != "null" { - if m.Header.Get(micro.ErrorCodeHeader) != "" { + if nexErr := nexErrorFromMsg(m); nexErr != nil { + cloneErrs = errors.Join(cloneErrs, nexErr) return true } err = json.Unmarshal(m.Data, &cloneResp) @@ -417,19 +418,25 @@ func (n *nexClient) CloneWorkload(id string, tags map[string]string) (*models.St return false } } + if err != nil && !errors.Is(err, nats.ErrNoResponders) { + cloneErrs = errors.Join(cloneErrs, err) + } return true }) + if cloneResp == nil && cloneErrs != nil { + return nil, cloneErrs + } + if cloneResp == nil { return nil, n.nexNotFoundError(errors.New(string(models.GenericErrorsWorkloadNotFound)), "workload not found") } aucResp, err := n.Auction(cloneResp.WorkloadType, tags) - if err != nil { - return nil, err - } - if len(aucResp) == 0 { + if err != nil { + return nil, err + } return nil, n.nexNotFoundError(errors.New("no nodes available for placement"), "no nodes available for placement") } diff --git a/client/client_test.go b/client/client_test.go index aea70121..03b7436e 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -2,11 +2,14 @@ package client import ( "context" + "encoding/json" + "strconv" "testing" "time" "github.com/carlmjohnson/be" "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/micro" "github.com/synadia-io/nex/_test" "github.com/synadia-io/nex/models" ) @@ -208,11 +211,11 @@ func TestNexClient_ListWorkloads(t *testing.T) { server := _test.StartNatsServer(t, workDir) defer server.Shutdown() - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() - nexNodes := _test.StartNexus(t, ctx, server.ClientURL(), 1, false) - be.Equal(t, 1, len(nexNodes)) + nexNodes := _test.StartNexus(t, ctx, server.ClientURL(), tt.size, false) + be.Equal(t, tt.size, len(nexNodes)) nc, err := nats.Connect(server.ClientURL()) be.NilErr(t, err) @@ -225,33 +228,36 @@ func TestNexClient_ListWorkloads(t *testing.T) { var ar []*models.AuctionResponse _test.WaitFor(t, 10*time.Second, func() bool { ar, err = client.Auction("inmem", map[string]string{}) - return err == nil && len(ar) == 1 - }, "waiting for auction to return 1 result") + return err == nil && len(ar) == tt.size + }, "waiting for auction to return expected results") _, err = client.StartWorkload(ar[0].BidderId, "tester1", "My test workload", "{}", "inmem", models.WorkloadLifecycleService, nil) be.NilErr(t, err) _test.WaitFor(t, 10*time.Second, func() bool { ar, err = client.Auction("inmem", map[string]string{}) - return err == nil && len(ar) == 1 - }, "waiting for auction to return 1 result") + return err == nil && len(ar) == tt.size + }, "waiting for auction to return expected results") - _, err = client.StartWorkload(ar[0].BidderId, "tester2", "My test workload", "{}", "inmem", models.WorkloadLifecycleService, nil) + _, err = client.StartWorkload(ar[1%tt.size].BidderId, "tester2", "My test workload", "{}", "inmem", models.WorkloadLifecycleService, nil) be.NilErr(t, err) _test.WaitFor(t, 10*time.Second, func() bool { ar, err = client.Auction("inmem", map[string]string{}) - return err == nil && len(ar) == 1 - }, "waiting for auction to return 1 result") + return err == nil && len(ar) == tt.size + }, "waiting for auction to return expected results") - _, err = client.StartWorkload(ar[0].BidderId, "tester3", "My test workload", "{}", "inmem", models.WorkloadLifecycleService, nil) + _, err = client.StartWorkload(ar[2%tt.size].BidderId, "tester3", "My test workload", "{}", "inmem", models.WorkloadLifecycleService, nil) be.NilErr(t, err) wl, err := client.ListWorkloads([]string{}) be.NilErr(t, err) - be.Equal(t, 1, len(wl)) - be.Equal(t, 3, len(*wl[0])) + totalCount := 0 + for _, w := range wl { + totalCount += len(*w) + } + be.Equal(t, 3, totalCount) for _, node := range nexNodes { be.NilErr(t, node.Shutdown()) @@ -545,3 +551,180 @@ func TestNexClient_StopWorkload(t *testing.T) { }) } } + +// fakeErrorResponder subscribes to the given subject and simulates a nex node that returns an error. +func fakeErrorResponder(t *testing.T, nc *nats.Conn, subject string, code int, msg string) *nats.Subscription { + t.Helper() + + errBody, _ := json.Marshal(struct { + ErrorID string `json:"error_id"` + Error string `json:"error"` + }{ + ErrorID: "fake-error-node", + Error: msg, + }) + + sub, err := nc.Subscribe(subject, func(m *nats.Msg) { + resp := nats.NewMsg(m.Reply) + resp.Header.Set(micro.ErrorCodeHeader, strconv.Itoa(code)) + resp.Header.Set(micro.ErrorHeader, msg) + resp.Data = errBody + _ = nc.PublishMsg(resp) + }) + be.NilErr(t, err) + return sub +} + +func TestNexClient_ListNodes_PartialError(t *testing.T) { + workDir := t.TempDir() + server := _test.StartNatsServer(t, workDir) + defer server.Shutdown() + + nc, err := nats.Connect(server.ClientURL()) + be.NilErr(t, err) + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + nexNodes := _test.StartNexus(t, ctx, server.ClientURL(), 3, false) + be.Equal(t, 3, len(nexNodes)) + + errNC, err := nats.Connect(server.ClientURL()) + be.NilErr(t, err) + defer errNC.Close() + + sub := fakeErrorResponder(t, errNC, models.PingRequestSubject(models.SystemNamespace), 500, "internal node error") + defer func() { _ = sub.Unsubscribe() }() + + nc, err = nats.Connect(server.ClientURL()) + be.NilErr(t, err) + defer nc.Close() + + client, err := NewClient(context.Background(), nc, models.SystemNamespace) + be.NilErr(t, err) + be.Nonzero(t, client) + + var nodes []*models.NodePingResponse + _test.WaitFor(t, 10*time.Second, func() bool { + nodes, err = client.ListNodes(nil) + return len(nodes) == 3 + }, "waiting for list nodes to return 3 healthy results") + + be.Nonzero(t, err) + be.Equal(t, 3, len(nodes)) + + for _, node := range nexNodes { + be.NilErr(t, node.Shutdown()) + } +} + +func TestNexClient_ListWorkloads_PartialError(t *testing.T) { + workDir := t.TempDir() + server := _test.StartNatsServer(t, workDir) + defer server.Shutdown() + + nc, err := nats.Connect(server.ClientURL()) + be.NilErr(t, err) + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + nexNodes := _test.StartNexus(t, ctx, server.ClientURL(), 3, false) + be.Equal(t, 3, len(nexNodes)) + + nc, err = nats.Connect(server.ClientURL()) + be.NilErr(t, err) + defer nc.Close() + + client, err := NewClient(context.Background(), nc, "user") + be.NilErr(t, err) + be.Nonzero(t, client) + + var ar []*models.AuctionResponse + _test.WaitFor(t, 10*time.Second, func() bool { + ar, err = client.Auction("inmem", map[string]string{}) + return err == nil && len(ar) == 3 + }, "waiting for auction to return 3 results") + + _, err = client.StartWorkload(ar[0].BidderId, "tester1", "My test workload", "{}", "inmem", models.WorkloadLifecycleService, nil) + be.NilErr(t, err) + + _test.WaitFor(t, 10*time.Second, func() bool { + wl, wlErr := client.ListWorkloads(nil) + if wlErr != nil { + return false + } + totalCount := 0 + for _, w := range wl { + totalCount += len(*w) + } + return totalCount == 1 + }, "waiting for workload to be running") + + errNC, err := nats.Connect(server.ClientURL()) + be.NilErr(t, err) + defer errNC.Close() + + sub := fakeErrorResponder(t, errNC, models.NamespacePingRequestSubject("user"), 500, "node unavailable") + defer func() { _ = sub.Unsubscribe() }() + be.NilErr(t, errNC.Flush()) + + wl, err := client.ListWorkloads([]string{}) + be.Nonzero(t, err) + + totalCount := 0 + for _, w := range wl { + totalCount += len(*w) + } + be.Equal(t, 1, totalCount) + + for _, node := range nexNodes { + be.NilErr(t, node.Shutdown()) + } +} + +func TestNexClient_Auction_PartialError(t *testing.T) { + workDir := t.TempDir() + server := _test.StartNatsServer(t, workDir) + defer server.Shutdown() + + nc, err := nats.Connect(server.ClientURL()) + be.NilErr(t, err) + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + nexNodes := _test.StartNexus(t, ctx, server.ClientURL(), 3, false) + be.Equal(t, 3, len(nexNodes)) + + errNC, err := nats.Connect(server.ClientURL()) + be.NilErr(t, err) + defer errNC.Close() + + sub := fakeErrorResponder(t, errNC, models.AuctionRequestSubject("user"), 500, "auction failed") + defer func() { _ = sub.Unsubscribe() }() + + nc, err = nats.Connect(server.ClientURL()) + be.NilErr(t, err) + defer nc.Close() + + client, err := NewClient(context.Background(), nc, "user") + be.NilErr(t, err) + be.Nonzero(t, client) + + var ar []*models.AuctionResponse + _test.WaitFor(t, 10*time.Second, func() bool { + ar, err = client.Auction("inmem", map[string]string{}) + return len(ar) == 3 + }, "waiting for auction to return 3 healthy results") + + be.Nonzero(t, err) + be.Equal(t, 3, len(ar)) + + for _, node := range nexNodes { + be.NilErr(t, node.Shutdown()) + } +}