Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions manager/api/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func listPropletsEndpoint(svc manager.Service) endpoint.Endpoint {
}

return listpropletResponse{
PropletPage: proplets,
PropletPageView: proplets.View(),
}, nil
}
}
Expand All @@ -47,11 +47,30 @@ func getPropletEndpoint(svc manager.Service) endpoint.Endpoint {
}

return propletResponse{
Proplet: node,
PropletView: node.View(),
}, nil
}
}

func getPropletAliveHistoryEndpoint(svc manager.Service) endpoint.Endpoint {
return func(ctx context.Context, request any) (any, error) {
req, ok := request.(metricsReq)
if !ok {
return propletAliveHistoryResponse{}, errors.Join(apiutil.ErrValidation, pkgerrors.ErrInvalidData)
}
if err := req.validate(); err != nil {
return propletAliveHistoryResponse{}, errors.Join(apiutil.ErrValidation, err)
}

page, err := svc.GetPropletAliveHistory(ctx, req.id, req.offset, req.limit)
if err != nil {
return propletAliveHistoryResponse{}, err
}

return propletAliveHistoryResponse{PropletAliveHistoryPage: page}, nil
}
}

func deletePropletEndpoint(svc manager.Service) endpoint.Endpoint {
return func(ctx context.Context, request any) (any, error) {
req, ok := request.(entityReq)
Expand Down
21 changes: 19 additions & 2 deletions manager/api/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
var (
_ supermq.Response = (*propletResponse)(nil)
_ supermq.Response = (*listpropletResponse)(nil)
_ supermq.Response = (*propletAliveHistoryResponse)(nil)
_ supermq.Response = (*taskResponse)(nil)
_ supermq.Response = (*listTaskResponse)(nil)
_ supermq.Response = (*messageResponse)(nil)
Expand All @@ -24,7 +25,7 @@ var (
)

type propletResponse struct {
proplet.Proplet
proplet.PropletView

created bool
deleted bool
Expand Down Expand Up @@ -56,7 +57,7 @@ func (w propletResponse) Empty() bool {
}

type listpropletResponse struct {
proplet.PropletPage
proplet.PropletPageView
}

func (l listpropletResponse) Code() int {
Expand All @@ -71,6 +72,22 @@ func (l listpropletResponse) Empty() bool {
return false
}

type propletAliveHistoryResponse struct {
proplet.PropletAliveHistoryPage
}

func (r propletAliveHistoryResponse) Code() int {
return http.StatusOK
}

func (r propletAliveHistoryResponse) Headers() map[string]string {
return map[string]string{}
}

func (r propletAliveHistoryResponse) Empty() bool {
return false
}

type taskResponse struct {
task.Task

Expand Down
6 changes: 6 additions & 0 deletions manager/api/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ func MakeHandler(svc manager.Service, logger *slog.Logger, instanceID string) ht
api.EncodeResponse,
opts...,
), "get-proplet-metrics").ServeHTTP)
r.Get("/alive-history", otelhttp.NewHandler(kithttp.NewServer(
getPropletAliveHistoryEndpoint(svc),
decodeMetricsReq("propletID"),
api.EncodeResponse,
opts...,
), "get-proplet-alive-history").ServeHTTP)
})
})

Expand Down
1 change: 1 addition & 0 deletions manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Service interface {

GetTaskMetrics(ctx context.Context, taskID string, offset, limit uint64) (TaskMetricsPage, error)
GetPropletMetrics(ctx context.Context, propletID string, offset, limit uint64) (PropletMetricsPage, error)
GetPropletAliveHistory(ctx context.Context, propletID string, offset, limit uint64) (proplet.PropletAliveHistoryPage, error)

// Orchestrator/Experiment Config API (Manager acts as Orchestrator per diagram)
// Step 1: Configure experiment with FL Coordinator
Expand Down
22 changes: 22 additions & 0 deletions manager/middleware/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,28 @@ func (lm *loggingMiddleware) SelectProplet(ctx context.Context, t task.Task) (w
return lm.svc.SelectProplet(ctx, t)
}

func (lm *loggingMiddleware) GetPropletAliveHistory(ctx context.Context, propletID string, offset, limit uint64) (resp proplet.PropletAliveHistoryPage, err error) {
defer func(begin time.Time) {
args := []any{
slog.String("duration", time.Since(begin).String()),
slog.Group("proplet",
slog.String("id", propletID),
),
slog.Uint64("offset", offset),
slog.Uint64("limit", limit),
}
if err != nil {
args = append(args, slog.Any("error", err))
lm.logger.Warn("Get proplet alive history failed", args...)

return
}
lm.logger.Info("Get proplet alive history completed successfully", args...)
}(time.Now())

return lm.svc.GetPropletAliveHistory(ctx, propletID, offset, limit)
}

func (lm *loggingMiddleware) DeleteProplet(ctx context.Context, id string) (err error) {
defer func(begin time.Time) {
args := []any{
Expand Down
9 changes: 9 additions & 0 deletions manager/middleware/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ func (mm *metricsMiddleware) SelectProplet(ctx context.Context, t task.Task) (pr
return mm.svc.SelectProplet(ctx, t)
}

func (mm *metricsMiddleware) GetPropletAliveHistory(ctx context.Context, propletID string, offset, limit uint64) (proplet.PropletAliveHistoryPage, error) {
defer func(begin time.Time) {
mm.counter.With("method", "get-proplet-alive-history").Add(1)
mm.latency.With("method", "get-proplet-alive-history").Observe(time.Since(begin).Seconds())
}(time.Now())

return mm.svc.GetPropletAliveHistory(ctx, propletID, offset, limit)
}

func (mm *metricsMiddleware) DeleteProplet(ctx context.Context, id string) error {
defer func(begin time.Time) {
mm.counter.With("method", "delete-proplet").Add(1)
Expand Down
11 changes: 11 additions & 0 deletions manager/middleware/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,17 @@ func (tm *tracing) SelectProplet(ctx context.Context, t task.Task) (resp proplet
return tm.svc.SelectProplet(ctx, t)
}

func (tm *tracing) GetPropletAliveHistory(ctx context.Context, propletID string, offset, limit uint64) (proplet.PropletAliveHistoryPage, error) {
ctx, span := tm.tracer.Start(ctx, "get-proplet-alive-history", trace.WithAttributes(
attribute.String("id", propletID),
attribute.Int64("offset", int64(offset)),
attribute.Int64("limit", int64(limit)),
))
defer span.End()

return tm.svc.GetPropletAliveHistory(ctx, propletID, offset, limit)
}

func (tm *tracing) DeleteProplet(ctx context.Context, id string) (err error) {
ctx, span := tm.tracer.Start(ctx, "delete-proplet", trace.WithAttributes(
attribute.String("id", id),
Expand Down
66 changes: 66 additions & 0 deletions manager/mocks/service.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions manager/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,20 @@ func (svc *service) GetPropletMetrics(ctx context.Context, propletID string, off
}, nil
}

func (svc *service) GetPropletAliveHistory(ctx context.Context, propletID string, offset, limit uint64) (proplet.PropletAliveHistoryPage, error) {
history, total, err := svc.propletRepo.GetAliveHistory(ctx, propletID, offset, limit)
if err != nil {
return proplet.PropletAliveHistoryPage{}, err
}

return proplet.PropletAliveHistoryPage{
Offset: offset,
Limit: limit,
Total: total,
History: history,
}, nil
}

func (svc *service) GetTaskResults(ctx context.Context, taskID string) (any, error) {
t, err := svc.GetTask(ctx, taskID)
if err != nil {
Expand Down
53 changes: 53 additions & 0 deletions pkg/proplet/proplet.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,59 @@ type PropletPage struct {
Proplets []Proplet `json:"proplets"`
}

type PropletView struct {
ID string `json:"id"`
Name string `json:"name"`
TaskCount uint64 `json:"task_count"`
Alive bool `json:"alive"`
LastAliveAt *time.Time `json:"last_alive_at,omitempty"`
Metadata PropletMetadata `json:"metadata"`
}

func (p *Proplet) View() PropletView {
v := PropletView{
ID: p.ID,
Name: p.Name,
TaskCount: p.TaskCount,
Alive: p.Alive,
Metadata: p.Metadata,
}
if n := len(p.AliveHistory); n > 0 {
t := p.AliveHistory[n-1]
v.LastAliveAt = &t
}

return v
}

type PropletPageView struct {
Offset uint64 `json:"offset"`
Limit uint64 `json:"limit"`
Total uint64 `json:"total"`
Proplets []PropletView `json:"proplets"`
}

func (pp PropletPage) View() PropletPageView {
views := make([]PropletView, len(pp.Proplets))
for i := range pp.Proplets {
views[i] = pp.Proplets[i].View()
}

return PropletPageView{
Offset: pp.Offset,
Limit: pp.Limit,
Total: pp.Total,
Proplets: views,
}
}

type PropletAliveHistoryPage struct {
Offset uint64 `json:"offset"`
Limit uint64 `json:"limit"`
Total uint64 `json:"total"`
History []time.Time `json:"history"`
}

type ChunkPayload struct {
AppName string `json:"app_name"`
ChunkIdx int `json:"chunk_idx"`
Expand Down
24 changes: 23 additions & 1 deletion pkg/sdk/proplet.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,31 @@
package sdk

import "net/http"
import (
"encoding/json"
"fmt"
"net/http"

"github.com/absmach/propeller/pkg/proplet"
)

const propletsEndpoint = "/proplets"

func (sdk *propSDK) GetPropletAliveHistory(id string, offset, limit uint64) (proplet.PropletAliveHistoryPage, error) {
url := fmt.Sprintf("%s%s/%s/alive-history?offset=%d&limit=%d", sdk.managerURL, propletsEndpoint, id, offset, limit)

body, err := sdk.processRequest(http.MethodGet, url, nil, http.StatusOK)
if err != nil {
return proplet.PropletAliveHistoryPage{}, err
}

var page proplet.PropletAliveHistoryPage
if err := json.Unmarshal(body, &page); err != nil {
return proplet.PropletAliveHistoryPage{}, err
}

return page, nil
}

func (sdk *propSDK) DeleteProplet(id string) error {
url := sdk.managerURL + propletsEndpoint + "/" + id

Expand Down
9 changes: 9 additions & 0 deletions pkg/sdk/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"io"
"net/http"

"github.com/absmach/propeller/pkg/proplet"
)

const CTJSON string = "application/json"
Expand Down Expand Up @@ -106,6 +108,13 @@ type SDK interface {
// _ := sdk.StopJob("b1d10738-c5d7-4ff1-8f4d-b9328ce6f040")
StopJob(jobID string) error

// GetPropletAliveHistory returns the paginated heartbeat history for a proplet.
//
// example:
// page, _ := sdk.GetPropletAliveHistory("b1d10738-c5d7-4ff1-8f4d-b9328ce6f040", 0, 10)
// fmt.Println(page)
GetPropletAliveHistory(id string, offset, limit uint64) (proplet.PropletAliveHistoryPage, error)

// DeleteProplet deletes a proplet by id.
//
// example:
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/badger/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type PropletRepository interface {
Update(ctx context.Context, p proplet.Proplet) error
List(ctx context.Context, offset, limit uint64) ([]proplet.Proplet, uint64, error)
Delete(ctx context.Context, id string) error
GetAliveHistory(ctx context.Context, id string, offset, limit uint64) ([]time.Time, uint64, error)
}

type TaskPropletRepository interface {
Expand Down
Loading