Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 55 additions & 48 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,6 @@ func NewClient(ctx context.Context, nc *nats.Conn, namespace string, opts ...Cli

func (n *nexClient) GetNexusPTags() (map[string]string, error) {
msgs, err := natsext.RequestMany(n.ctx, n.nc, models.PingPTagRequestSubject(n.namespace), []byte{}, natsext.RequestManyStall(n.requestManyStall))
if errors.Is(err, nats.ErrNoResponders) || errors.Is(err, nats.ErrTimeout) {
return map[string]string{}, nil
}
if err != nil {
return nil, err
}
Expand All @@ -96,6 +93,10 @@ func (n *nexClient) GetNexusPTags() (map[string]string, error) {
resp := make(map[string]string)
msgs(func(m *nats.Msg, err error) bool {
if err == nil && m.Data != nil && string(m.Data) != "null" {
if nexErr := nexErrorFromMsg(m); nexErr != nil {
errs = errors.Join(errs, nexErr)
return true
}
t := map[string]string{}
err = json.Unmarshal(m.Data, &t)
if err == nil {
Expand All @@ -106,18 +107,20 @@ func (n *nexClient) GetNexusPTags() (map[string]string, error) {
}
}
}
errs = errors.Join(errs, err)
if err != nil && !errors.Is(err, nats.ErrNoResponders) {
errs = errors.Join(errs, err)
}
return true
})

return resp, nil
return resp, errs
}

func (n *nexClient) GetNodeInfo(nodeId string) (*models.NodeInfoResponse, error) {
req := &models.NodeInfoRequest{}
reqB, err := json.Marshal(req)
if err != nil {
return nil, n.nexBadRequestError(err, "failed to marshal node info request")
return nil, n.nexInternalError(err, "failed to marshal node info request")
}

resp, err := n.nc.Request(models.NodeInfoRequestSubject(n.namespace, nodeId), reqB, n.defaultTimeout)
Expand Down Expand Up @@ -150,7 +153,7 @@ func (n *nexClient) SetLameduck(nodeId string, delay time.Duration, tag map[stri

reqB, err := json.Marshal(req)
if err != nil {
return nil, n.nexBadRequestError(err, "failed to marshal lameduck request")
return nil, n.nexInternalError(err, "failed to marshal lameduck request")
}

respMsg, err := n.nc.Request(models.LameduckRequestSubject(n.namespace, nodeId), reqB, n.defaultTimeout)
Expand Down Expand Up @@ -181,13 +184,10 @@ func (n *nexClient) ListNodes(filter map[string]string) ([]*models.NodePingRespo

reqB, err := json.Marshal(req)
if err != nil {
return nil, n.nexBadRequestError(err, "failed to marshal list nodes request")
return nil, n.nexInternalError(err, "failed to marshal list nodes request")
}

msgs, err := natsext.RequestMany(n.ctx, n.nc, models.PingRequestSubject(n.namespace), reqB, natsext.RequestManyStall(n.requestManyStall))
if errors.Is(err, nats.ErrNoResponders) || errors.Is(err, nats.ErrTimeout) {
return []*models.NodePingResponse{}, nil
}
if err != nil {
return nil, n.nexInternalError(err, "failed to request list nodes")
}
Expand All @@ -196,7 +196,8 @@ func (n *nexClient) ListNodes(filter map[string]string) ([]*models.NodePingRespo
resp := []*models.NodePingResponse{}
msgs(func(m *nats.Msg, err error) bool {
if err == nil && m.Data != nil && string(m.Data) != "null" {
if m.Header.Get(micro.ErrorCodeHeader) != "" {
if nexErr := nexErrorFromMsg(m); nexErr != nil {
errs = errors.Join(errs, nexErr)
return true
}
t := new(models.NodePingResponse)
Expand All @@ -205,11 +206,13 @@ func (n *nexClient) ListNodes(filter map[string]string) ([]*models.NodePingRespo
resp = append(resp, t)
}
}
errs = errors.Join(errs, err)
if err != nil && !errors.Is(err, nats.ErrNoResponders) {
errs = errors.Join(errs, err)
}
return true
})

return resp, nil
return resp, errs
}

func (n *nexClient) Auction(typ string, tags map[string]string) ([]*models.AuctionResponse, error) {
Expand All @@ -221,13 +224,10 @@ func (n *nexClient) Auction(typ string, tags map[string]string) ([]*models.Aucti

auctionRequestB, err := json.Marshal(auctionRequest)
if err != nil {
return nil, n.nexBadRequestError(err, "failed to marshal auction request")
return nil, n.nexInternalError(err, "failed to marshal auction request")
}

msgs, err := natsext.RequestMany(n.ctx, n.nc, models.AuctionRequestSubject(n.namespace), auctionRequestB, natsext.RequestManyStall(n.auctionRequestManyStall))
if errors.Is(err, nats.ErrNoResponders) {
return []*models.AuctionResponse{}, nil
}
if err != nil {
return nil, n.nexInternalError(err, "failed to request auction")
}
Expand All @@ -236,7 +236,8 @@ func (n *nexClient) Auction(typ string, tags map[string]string) ([]*models.Aucti
resp := []*models.AuctionResponse{}
msgs(func(m *nats.Msg, err error) bool {
if err == nil {
if m.Header.Get(micro.ErrorCodeHeader) != "" {
if nexErr := nexErrorFromMsg(m); nexErr != nil {
errs = errors.Join(errs, nexErr)
return true
}
t := new(models.AuctionResponse)
Expand All @@ -245,11 +246,13 @@ func (n *nexClient) Auction(typ string, tags map[string]string) ([]*models.Aucti
resp = append(resp, t)
}
}
errs = errors.Join(errs, err)
if err != nil && !errors.Is(err, nats.ErrNoResponders) {
errs = errors.Join(errs, err)
}
return true
})

return resp, nil
return resp, errs
}

func (n *nexClient) StartWorkload(deployId, name, desc, runRequest, typ string, lifecycle models.WorkloadLifecycle, pTags models.NodeTags) (*models.StartWorkloadResponse, error) {
Expand All @@ -269,7 +272,7 @@ func (n *nexClient) StartWorkload(deployId, name, desc, runRequest, typ string,

reqB, err := json.Marshal(req)
if err != nil {
return nil, n.nexBadRequestError(err, "failed to marshal start workload request")
return nil, n.nexInternalError(err, "failed to marshal start workload request")
}

startResponseMsg, err := n.nc.Request(models.AuctionDeployRequestSubject(n.namespace, deployId), reqB, n.startWorkloadTimeout)
Expand Down Expand Up @@ -297,17 +300,12 @@ func (n *nexClient) StopWorkload(workloadId string) (*models.StopWorkloadRespons

reqB, err := json.Marshal(req)
if err != nil {
return nil, n.nexBadRequestError(err, "failed to marshal stop workload request")
return nil, n.nexInternalError(err, "failed to marshal stop workload request")
}

msgs, err := natsext.RequestMany(n.ctx, n.nc, models.UndeployRequestSubject(n.namespace, workloadId), reqB, natsext.RequestManyStall(n.requestManyStall))
if err != nil {
return &models.StopWorkloadResponse{
Id: workloadId,
Message: err.Error(),
Stopped: false,
WorkloadType: "",
}, nil
return nil, n.nexInternalError(err, "failed to request stop workload")
}

ret := &models.StopWorkloadResponse{
Expand All @@ -317,9 +315,11 @@ func (n *nexClient) StopWorkload(workloadId string) (*models.StopWorkloadRespons
WorkloadType: "",
}

var errs error
msgs(func(m *nats.Msg, e error) bool {
if e == nil && m.Data != nil && string(m.Data) != "null" {
if m.Header.Get(micro.ErrorCodeHeader) != "" {
if nexErr := nexErrorFromMsg(m); nexErr != nil {
errs = errors.Join(errs, nexErr)
return true
}
var swresp models.StopWorkloadResponse
Expand All @@ -331,10 +331,13 @@ func (n *nexClient) StopWorkload(workloadId string) (*models.StopWorkloadRespons
}
}
}
if e != nil && !errors.Is(e, nats.ErrNoResponders) {
errs = errors.Join(errs, e)
}
return true
})

return ret, nil
return ret, errs
}

func (n *nexClient) ListWorkloads(filter []string) ([]*models.AgentListWorkloadsResponse, error) {
Expand All @@ -345,13 +348,10 @@ func (n *nexClient) ListWorkloads(filter []string) ([]*models.AgentListWorkloads

reqB, err := json.Marshal(req)
if err != nil {
return nil, n.nexBadRequestError(err, "failed to marshal list workloads request")
return nil, n.nexInternalError(err, "failed to marshal list workloads request")
}

msgs, err := natsext.RequestMany(n.ctx, n.nc, models.NamespacePingRequestSubject(n.namespace), reqB, natsext.RequestManyStall(n.requestManyStall))
if errors.Is(err, nats.ErrNoResponders) {
return []*models.AgentListWorkloadsResponse{}, nil
}
if err != nil {
return nil, n.nexInternalError(err, "failed to request list workloads")
}
Expand All @@ -360,7 +360,8 @@ func (n *nexClient) ListWorkloads(filter []string) ([]*models.AgentListWorkloads
resp := []*models.AgentListWorkloadsResponse{}
msgs(func(m *nats.Msg, err error) bool {
if err == nil && m.Data != nil && string(m.Data) != "null" {
if m.Header.Get(micro.ErrorCodeHeader) != "" {
if nexErr := nexErrorFromMsg(m); nexErr != nil {
errs = errors.Join(errs, nexErr)
return true
}
t := new(models.AgentListWorkloadsResponse)
Expand All @@ -369,11 +370,13 @@ func (n *nexClient) ListWorkloads(filter []string) ([]*models.AgentListWorkloads
resp = append(resp, t)
}
}
errs = errors.Join(errs, err)
if err != nil && !errors.Is(err, nats.ErrNoResponders) {
errs = errors.Join(errs, err)
}
return true
})

return resp, nil
return resp, errs
}

func (n *nexClient) CloneWorkload(id string, tags map[string]string) (*models.StartWorkloadResponse, error) {
Expand All @@ -394,42 +397,46 @@ func (n *nexClient) CloneWorkload(id string, tags map[string]string) (*models.St

cloneReqB, err := json.Marshal(cloneReq)
if err != nil {
return nil, n.nexBadRequestError(err, "failed to marshal clone request")
return nil, n.nexInternalError(err, "failed to marshal clone request")
}

genericNotFoundError := errors.New(string(models.GenericErrorsWorkloadNotFound))
msgs, err := natsext.RequestMany(n.ctx, n.nc, models.CloneWorkloadRequestSubject(n.namespace, id), cloneReqB, natsext.RequestManyStall(n.requestManyStall))
if errors.Is(err, nats.ErrNoResponders) {
return nil, n.nexNotFoundError(genericNotFoundError, "workload not found")
}
if err != nil {
return nil, n.nexInternalError(err, "workload not found")
}

var cloneResp *models.StartWorkloadRequest
var cloneErrs error
msgs(func(m *nats.Msg, err error) bool {
if err == nil && m.Data != nil && string(m.Data) != "null" {
if m.Header.Get(micro.ErrorCodeHeader) != "" {
if nexErr := nexErrorFromMsg(m); nexErr != nil {
cloneErrs = errors.Join(cloneErrs, nexErr)
return true
}
err = json.Unmarshal(m.Data, &cloneResp)
if err != nil {
return false
}
}
if err != nil && !errors.Is(err, nats.ErrNoResponders) {
cloneErrs = errors.Join(cloneErrs, err)
}
return true
})

if cloneResp == nil && cloneErrs != nil {
return nil, cloneErrs
}

if cloneResp == nil {
return nil, n.nexNotFoundError(errors.New(string(models.GenericErrorsWorkloadNotFound)), "workload not found")
}

aucResp, err := n.Auction(cloneResp.WorkloadType, tags)
if err != nil {
return nil, err
}

if len(aucResp) == 0 {
if err != nil {
return nil, err
}
return nil, n.nexNotFoundError(errors.New("no nodes available for placement"), "no nodes available for placement")
}

Expand Down
Loading
Loading