From dec9bedd4fe6c5e8dce9edbcd2fe622d8afdc854 Mon Sep 17 00:00:00 2001 From: Jilks Smith Date: Thu, 26 Mar 2026 11:43:45 +0300 Subject: [PATCH 01/14] Enable filtering of events and jobs --- manager/api/endpoint.go | 4 +- manager/api/requests.go | 1 + manager/api/transport.go | 1 + manager/manager.go | 4 +- manager/middleware/logging.go | 10 ++-- manager/middleware/metrics.go | 8 +-- manager/middleware/tracing.go | 10 ++-- manager/mocks/service.go | 60 ++++++++++++-------- manager/service.go | 103 +++++++++++++++++++++++++++++++--- manager/service_job_test.go | 76 +++++++++++++++++++++++-- pkg/sdk/sdk.go | 8 ++- pkg/sdk/task.go | 5 +- 12 files changed, 235 insertions(+), 55 deletions(-) diff --git a/manager/api/endpoint.go b/manager/api/endpoint.go index 3f68a577..c3025df8 100644 --- a/manager/api/endpoint.go +++ b/manager/api/endpoint.go @@ -20,7 +20,7 @@ func listPropletsEndpoint(svc manager.Service) endpoint.Endpoint { return listpropletResponse{}, errors.Join(apiutil.ErrValidation, err) } - proplets, err := svc.ListProplets(ctx, req.offset, req.limit) + proplets, err := svc.ListProplets(ctx, req.offset, req.limit, req.status) if err != nil { return listpropletResponse{}, err } @@ -169,7 +169,7 @@ func listJobsEndpoint(svc manager.Service) endpoint.Endpoint { return listJobResponse{}, errors.Join(apiutil.ErrValidation, err) } - jobs, err := svc.ListJobs(ctx, req.offset, req.limit) + jobs, err := svc.ListJobs(ctx, req.offset, req.limit, req.status) if err != nil { return listJobResponse{}, err } diff --git a/manager/api/requests.go b/manager/api/requests.go index 3be90bbe..9d3f7f2e 100644 --- a/manager/api/requests.go +++ b/manager/api/requests.go @@ -94,6 +94,7 @@ func (e *entityReq) validate() error { type listEntityReq struct { offset, limit uint64 + status string } func (e *listEntityReq) validate() error { diff --git a/manager/api/transport.go b/manager/api/transport.go index faf0e260..1e49cc0c 100644 --- a/manager/api/transport.go +++ b/manager/api/transport.go @@ -309,6 +309,7 @@ func decodeListEntityReq(_ context.Context, r *http.Request) (any, error) { return listEntityReq{ offset: o, limit: l, + status: r.URL.Query().Get("status"), }, nil } diff --git a/manager/manager.go b/manager/manager.go index e38fc7af..6fbd6639 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -9,7 +9,7 @@ import ( type Service interface { GetProplet(ctx context.Context, propletID string) (proplet.Proplet, error) - ListProplets(ctx context.Context, offset, limit uint64) (proplet.PropletPage, error) + ListProplets(ctx context.Context, offset, limit uint64, status string) (proplet.PropletPage, error) SelectProplet(ctx context.Context, task task.Task) (proplet.Proplet, error) DeleteProplet(ctx context.Context, propletID string) error @@ -18,7 +18,7 @@ type Service interface { CreateJob(ctx context.Context, name string, tasks []task.Task, executionMode string) (string, []task.Task, error) GetTask(ctx context.Context, taskID string) (task.Task, error) GetJob(ctx context.Context, jobID string) ([]task.Task, error) - ListJobs(ctx context.Context, offset, limit uint64) (JobPage, error) + ListJobs(ctx context.Context, offset, limit uint64, status string) (JobPage, error) StartJob(ctx context.Context, jobID string) error StopJob(ctx context.Context, jobID string) error ListTasks(ctx context.Context, offset, limit uint64) (task.TaskPage, error) diff --git a/manager/middleware/logging.go b/manager/middleware/logging.go index 9ef334a3..0cc5ccde 100644 --- a/manager/middleware/logging.go +++ b/manager/middleware/logging.go @@ -42,12 +42,13 @@ func (lm *loggingMiddleware) GetProplet(ctx context.Context, id string) (resp pr return lm.svc.GetProplet(ctx, id) } -func (lm *loggingMiddleware) ListProplets(ctx context.Context, offset, limit uint64) (resp proplet.PropletPage, err error) { +func (lm *loggingMiddleware) ListProplets(ctx context.Context, offset, limit uint64, status string) (resp proplet.PropletPage, err error) { defer func(begin time.Time) { args := []any{ slog.String("duration", time.Since(begin).String()), slog.Uint64("offset", offset), slog.Uint64("limit", limit), + slog.String("status", status), } if err != nil { args = append(args, slog.Any("error", err)) @@ -58,7 +59,7 @@ func (lm *loggingMiddleware) ListProplets(ctx context.Context, offset, limit uin lm.logger.Info("List proplets completed successfully", args...) }(time.Now()) - return lm.svc.ListProplets(ctx, offset, limit) + return lm.svc.ListProplets(ctx, offset, limit, status) } func (lm *loggingMiddleware) SelectProplet(ctx context.Context, t task.Task) (w proplet.Proplet, err error) { @@ -348,12 +349,13 @@ func (lm *loggingMiddleware) GetJob(ctx context.Context, jobID string) (resp []t return lm.svc.GetJob(ctx, jobID) } -func (lm *loggingMiddleware) ListJobs(ctx context.Context, offset, limit uint64) (resp manager.JobPage, err error) { +func (lm *loggingMiddleware) ListJobs(ctx context.Context, offset, limit uint64, status string) (resp manager.JobPage, err error) { defer func(begin time.Time) { args := []any{ slog.String("duration", time.Since(begin).String()), slog.Uint64("offset", offset), slog.Uint64("limit", limit), + slog.String("status", status), } if err != nil { args = append(args, slog.Any("error", err)) @@ -365,7 +367,7 @@ func (lm *loggingMiddleware) ListJobs(ctx context.Context, offset, limit uint64) lm.logger.Info("List jobs completed successfully", args...) }(time.Now()) - return lm.svc.ListJobs(ctx, offset, limit) + return lm.svc.ListJobs(ctx, offset, limit, status) } func (lm *loggingMiddleware) StartJob(ctx context.Context, jobID string) (err error) { diff --git a/manager/middleware/metrics.go b/manager/middleware/metrics.go index c4ae24db..6c8972e8 100644 --- a/manager/middleware/metrics.go +++ b/manager/middleware/metrics.go @@ -35,13 +35,13 @@ func (mm *metricsMiddleware) GetProplet(ctx context.Context, id string) (proplet return mm.svc.GetProplet(ctx, id) } -func (mm *metricsMiddleware) ListProplets(ctx context.Context, offset, limit uint64) (proplet.PropletPage, error) { +func (mm *metricsMiddleware) ListProplets(ctx context.Context, offset, limit uint64, status string) (proplet.PropletPage, error) { defer func(begin time.Time) { mm.counter.With("method", "list-proplets").Add(1) mm.latency.With("method", "list-proplets").Observe(time.Since(begin).Seconds()) }(time.Now()) - return mm.svc.ListProplets(ctx, offset, limit) + return mm.svc.ListProplets(ctx, offset, limit, status) } func (mm *metricsMiddleware) SelectProplet(ctx context.Context, t task.Task) (proplet.Proplet, error) { @@ -170,13 +170,13 @@ func (mm *metricsMiddleware) GetJob(ctx context.Context, jobID string) ([]task.T return mm.svc.GetJob(ctx, jobID) } -func (mm *metricsMiddleware) ListJobs(ctx context.Context, offset, limit uint64) (manager.JobPage, error) { +func (mm *metricsMiddleware) ListJobs(ctx context.Context, offset, limit uint64, status string) (manager.JobPage, error) { defer func(begin time.Time) { mm.counter.With("method", "list-jobs").Add(1) mm.latency.With("method", "list-jobs").Observe(time.Since(begin).Seconds()) }(time.Now()) - return mm.svc.ListJobs(ctx, offset, limit) + return mm.svc.ListJobs(ctx, offset, limit, status) } func (mm *metricsMiddleware) StartJob(ctx context.Context, jobID string) error { diff --git a/manager/middleware/tracing.go b/manager/middleware/tracing.go index 3000a4f0..daa754b8 100644 --- a/manager/middleware/tracing.go +++ b/manager/middleware/tracing.go @@ -30,14 +30,15 @@ func (tm *tracing) GetProplet(ctx context.Context, id string) (resp proplet.Prop return tm.svc.GetProplet(ctx, id) } -func (tm *tracing) ListProplets(ctx context.Context, offset, limit uint64) (resp proplet.PropletPage, err error) { +func (tm *tracing) ListProplets(ctx context.Context, offset, limit uint64, status string) (resp proplet.PropletPage, err error) { ctx, span := tm.tracer.Start(ctx, "list-proplets", trace.WithAttributes( attribute.Int64("offset", int64(offset)), attribute.Int64("limit", int64(limit)), + attribute.String("status", status), )) defer span.End() - return tm.svc.ListProplets(ctx, offset, limit) + return tm.svc.ListProplets(ctx, offset, limit, status) } func (tm *tracing) SelectProplet(ctx context.Context, t task.Task) (resp proplet.Proplet, err error) { @@ -178,14 +179,15 @@ func (tm *tracing) GetJob(ctx context.Context, jobID string) (resp []task.Task, return tm.svc.GetJob(ctx, jobID) } -func (tm *tracing) ListJobs(ctx context.Context, offset, limit uint64) (resp manager.JobPage, err error) { +func (tm *tracing) ListJobs(ctx context.Context, offset, limit uint64, status string) (resp manager.JobPage, err error) { ctx, span := tm.tracer.Start(ctx, "list-jobs", trace.WithAttributes( attribute.Int64("offset", int64(offset)), attribute.Int64("limit", int64(limit)), + attribute.String("status", status), )) defer span.End() - return tm.svc.ListJobs(ctx, offset, limit) + return tm.svc.ListJobs(ctx, offset, limit, status) } func (tm *tracing) StartJob(ctx context.Context, jobID string) (err error) { diff --git a/manager/mocks/service.go b/manager/mocks/service.go index 77f39d3f..2eba3883 100644 --- a/manager/mocks/service.go +++ b/manager/mocks/service.go @@ -1062,8 +1062,8 @@ func (_c *MockService_GetTaskResults_Call) RunAndReturn(run func(ctx context.Con } // ListJobs provides a mock function for the type MockService -func (_mock *MockService) ListJobs(ctx context.Context, offset uint64, limit uint64) (manager.JobPage, error) { - ret := _mock.Called(ctx, offset, limit) +func (_mock *MockService) ListJobs(ctx context.Context, offset uint64, limit uint64, status string) (manager.JobPage, error) { + ret := _mock.Called(ctx, offset, limit, status) if len(ret) == 0 { panic("no return value specified for ListJobs") @@ -1071,16 +1071,16 @@ func (_mock *MockService) ListJobs(ctx context.Context, offset uint64, limit uin var r0 manager.JobPage var r1 error - if returnFunc, ok := ret.Get(0).(func(context.Context, uint64, uint64) (manager.JobPage, error)); ok { - return returnFunc(ctx, offset, limit) + if returnFunc, ok := ret.Get(0).(func(context.Context, uint64, uint64, string) (manager.JobPage, error)); ok { + return returnFunc(ctx, offset, limit, status) } - if returnFunc, ok := ret.Get(0).(func(context.Context, uint64, uint64) manager.JobPage); ok { - r0 = returnFunc(ctx, offset, limit) + if returnFunc, ok := ret.Get(0).(func(context.Context, uint64, uint64, string) manager.JobPage); ok { + r0 = returnFunc(ctx, offset, limit, status) } else { r0 = ret.Get(0).(manager.JobPage) } - if returnFunc, ok := ret.Get(1).(func(context.Context, uint64, uint64) error); ok { - r1 = returnFunc(ctx, offset, limit) + if returnFunc, ok := ret.Get(1).(func(context.Context, uint64, uint64, string) error); ok { + r1 = returnFunc(ctx, offset, limit, status) } else { r1 = ret.Error(1) } @@ -1096,11 +1096,12 @@ type MockService_ListJobs_Call struct { // - ctx context.Context // - offset uint64 // - limit uint64 -func (_e *MockService_Expecter) ListJobs(ctx interface{}, offset interface{}, limit interface{}) *MockService_ListJobs_Call { - return &MockService_ListJobs_Call{Call: _e.mock.On("ListJobs", ctx, offset, limit)} +// - status string +func (_e *MockService_Expecter) ListJobs(ctx interface{}, offset interface{}, limit interface{}, status interface{}) *MockService_ListJobs_Call { + return &MockService_ListJobs_Call{Call: _e.mock.On("ListJobs", ctx, offset, limit, status)} } -func (_c *MockService_ListJobs_Call) Run(run func(ctx context.Context, offset uint64, limit uint64)) *MockService_ListJobs_Call { +func (_c *MockService_ListJobs_Call) Run(run func(ctx context.Context, offset uint64, limit uint64, status string)) *MockService_ListJobs_Call { _c.Call.Run(func(args mock.Arguments) { var arg0 context.Context if args[0] != nil { @@ -1114,10 +1115,15 @@ func (_c *MockService_ListJobs_Call) Run(run func(ctx context.Context, offset ui if args[2] != nil { arg2 = args[2].(uint64) } + var arg3 string + if args[3] != nil { + arg3 = args[3].(string) + } run( arg0, arg1, arg2, + arg3, ) }) return _c @@ -1128,14 +1134,14 @@ func (_c *MockService_ListJobs_Call) Return(jobPage manager.JobPage, err error) return _c } -func (_c *MockService_ListJobs_Call) RunAndReturn(run func(ctx context.Context, offset uint64, limit uint64) (manager.JobPage, error)) *MockService_ListJobs_Call { +func (_c *MockService_ListJobs_Call) RunAndReturn(run func(ctx context.Context, offset uint64, limit uint64, status string) (manager.JobPage, error)) *MockService_ListJobs_Call { _c.Call.Return(run) return _c } // ListProplets provides a mock function for the type MockService -func (_mock *MockService) ListProplets(ctx context.Context, offset uint64, limit uint64) (proplet.PropletPage, error) { - ret := _mock.Called(ctx, offset, limit) +func (_mock *MockService) ListProplets(ctx context.Context, offset uint64, limit uint64, status string) (proplet.PropletPage, error) { + ret := _mock.Called(ctx, offset, limit, status) if len(ret) == 0 { panic("no return value specified for ListProplets") @@ -1143,16 +1149,16 @@ func (_mock *MockService) ListProplets(ctx context.Context, offset uint64, limit var r0 proplet.PropletPage var r1 error - if returnFunc, ok := ret.Get(0).(func(context.Context, uint64, uint64) (proplet.PropletPage, error)); ok { - return returnFunc(ctx, offset, limit) + if returnFunc, ok := ret.Get(0).(func(context.Context, uint64, uint64, string) (proplet.PropletPage, error)); ok { + return returnFunc(ctx, offset, limit, status) } - if returnFunc, ok := ret.Get(0).(func(context.Context, uint64, uint64) proplet.PropletPage); ok { - r0 = returnFunc(ctx, offset, limit) + if returnFunc, ok := ret.Get(0).(func(context.Context, uint64, uint64, string) proplet.PropletPage); ok { + r0 = returnFunc(ctx, offset, limit, status) } else { r0 = ret.Get(0).(proplet.PropletPage) } - if returnFunc, ok := ret.Get(1).(func(context.Context, uint64, uint64) error); ok { - r1 = returnFunc(ctx, offset, limit) + if returnFunc, ok := ret.Get(1).(func(context.Context, uint64, uint64, string) error); ok { + r1 = returnFunc(ctx, offset, limit, status) } else { r1 = ret.Error(1) } @@ -1168,11 +1174,12 @@ type MockService_ListProplets_Call struct { // - ctx context.Context // - offset uint64 // - limit uint64 -func (_e *MockService_Expecter) ListProplets(ctx interface{}, offset interface{}, limit interface{}) *MockService_ListProplets_Call { - return &MockService_ListProplets_Call{Call: _e.mock.On("ListProplets", ctx, offset, limit)} +// - status string +func (_e *MockService_Expecter) ListProplets(ctx interface{}, offset interface{}, limit interface{}, status interface{}) *MockService_ListProplets_Call { + return &MockService_ListProplets_Call{Call: _e.mock.On("ListProplets", ctx, offset, limit, status)} } -func (_c *MockService_ListProplets_Call) Run(run func(ctx context.Context, offset uint64, limit uint64)) *MockService_ListProplets_Call { +func (_c *MockService_ListProplets_Call) Run(run func(ctx context.Context, offset uint64, limit uint64, status string)) *MockService_ListProplets_Call { _c.Call.Run(func(args mock.Arguments) { var arg0 context.Context if args[0] != nil { @@ -1186,10 +1193,15 @@ func (_c *MockService_ListProplets_Call) Run(run func(ctx context.Context, offse if args[2] != nil { arg2 = args[2].(uint64) } + var arg3 string + if args[3] != nil { + arg3 = args[3].(string) + } run( arg0, arg1, arg2, + arg3, ) }) return _c @@ -1200,7 +1212,7 @@ func (_c *MockService_ListProplets_Call) Return(propletPage proplet.PropletPage, return _c } -func (_c *MockService_ListProplets_Call) RunAndReturn(run func(ctx context.Context, offset uint64, limit uint64) (proplet.PropletPage, error)) *MockService_ListProplets_Call { +func (_c *MockService_ListProplets_Call) RunAndReturn(run func(ctx context.Context, offset uint64, limit uint64, status string) (proplet.PropletPage, error)) *MockService_ListProplets_Call { _c.Call.Return(run) return _c } diff --git a/manager/service.go b/manager/service.go index c06d40f1..8b3b60d8 100644 --- a/manager/service.go +++ b/manager/service.go @@ -39,6 +39,14 @@ const ( ExecutionModeConfigurable = "configurable" EnvJobExecutionMode = "JOB_EXECUTION_MODE" shutdownTaskStopWait = 200 * time.Millisecond + + PropletStatusActive = "active" + PropletStatusInactive = "inactive" + + JobStatusPending = "pending" + JobStatusRunning = "running" + JobStatusCompleted = "completed" + JobStatusFailed = "failed" ) var ( @@ -112,25 +120,65 @@ func (svc *service) GetProplet(ctx context.Context, propletID string) (proplet.P return w, nil } -func (svc *service) ListProplets(ctx context.Context, offset, limit uint64) (proplet.PropletPage, error) { - proplets, total, err := svc.propletRepo.List(ctx, offset, limit) +func (svc *service) ListProplets(ctx context.Context, offset, limit uint64, status string) (proplet.PropletPage, error) { + if status != "" && status != PropletStatusActive && status != PropletStatusInactive { + return proplet.PropletPage{}, fmt.Errorf("invalid proplet status filter %q: must be %q, %q, or empty", status, PropletStatusActive, PropletStatusInactive) + } + + if status == "" { + proplets, total, err := svc.propletRepo.List(ctx, offset, limit) + if err != nil { + return proplet.PropletPage{}, err + } + for i := range proplets { + proplets[i].SetAlive() + } + + return proplet.PropletPage{ + Offset: offset, + Limit: limit, + Total: total, + Proplets: proplets, + }, nil + } + + all, err := svc.listAllProplets(ctx) if err != nil { return proplet.PropletPage{}, err } - for i := range proplets { - proplets[i].SetAlive() + + filtered := make([]proplet.Proplet, 0, len(all)) + for i := range all { + all[i].SetAlive() + match := (status == PropletStatusActive && all[i].Alive) || + (status == PropletStatusInactive && !all[i].Alive) + if match { + filtered = append(filtered, all[i]) + } } + total := uint64(len(filtered)) + if offset >= total { + return proplet.PropletPage{ + Offset: offset, + Limit: limit, + Total: total, + Proplets: []proplet.Proplet{}, + }, nil + } + + end := min(offset+limit, total) + return proplet.PropletPage{ Offset: offset, Limit: limit, Total: total, - Proplets: proplets, + Proplets: filtered[offset:end], }, nil } func (svc *service) SelectProplet(ctx context.Context, t task.Task) (proplet.Proplet, error) { - proplets, err := svc.ListProplets(ctx, defOffset, defLimit) + proplets, err := svc.ListProplets(ctx, defOffset, defLimit, "") if err != nil { return proplet.Proplet{}, err } @@ -338,7 +386,11 @@ func (svc *service) GetJob(ctx context.Context, jobID string) ([]task.Task, erro return svc.getJobTasks(ctx, jobID) } -func (svc *service) ListJobs(ctx context.Context, offset, limit uint64) (JobPage, error) { +func (svc *service) ListJobs(ctx context.Context, offset, limit uint64, status string) (JobPage, error) { + if status != "" && status != JobStatusPending && status != JobStatusRunning && status != JobStatusCompleted && status != JobStatusFailed { + return JobPage{}, fmt.Errorf("invalid job status filter %q: must be %q, %q, %q, %q, or empty", status, JobStatusPending, JobStatusRunning, JobStatusCompleted, JobStatusFailed) + } + jobs := make([]JobSummary, 0) seen := make(map[string]struct{}) @@ -393,6 +445,23 @@ func (svc *service) ListJobs(ctx context.Context, offset, limit uint64) (JobPage } }) + if status != "" { + statusStateMap := map[string]task.State{ + JobStatusPending: task.Pending, + JobStatusRunning: task.Running, + JobStatusCompleted: task.Completed, + JobStatusFailed: task.Failed, + } + targetState := statusStateMap[status] + filtered := make([]JobSummary, 0, len(jobs)) + for i := range jobs { + if jobs[i].State == targetState { + filtered = append(filtered, jobs[i]) + } + } + jobs = filtered + } + total := uint64(len(jobs)) if offset >= total { return JobPage{ @@ -1663,6 +1732,26 @@ func (svc *service) listAllTasks(ctx context.Context) ([]task.Task, error) { return allTasks, nil } +func (svc *service) listAllProplets(ctx context.Context) ([]proplet.Proplet, error) { + const pageSize uint64 = 100 + var allProplets []proplet.Proplet + var offset uint64 + + for { + proplets, total, err := svc.propletRepo.List(ctx, offset, pageSize) + if err != nil { + return nil, err + } + allProplets = append(allProplets, proplets...) + offset += uint64(len(proplets)) + if offset >= total || len(proplets) == 0 { + break + } + } + + return allProplets, nil +} + func (svc *service) pinTaskToProplet(ctx context.Context, taskID, propletID string) error { return svc.taskPropletRepo.Create(ctx, taskID, propletID) } diff --git a/manager/service_job_test.go b/manager/service_job_test.go index 9b01ab55..6e74754f 100644 --- a/manager/service_job_test.go +++ b/manager/service_job_test.go @@ -66,7 +66,7 @@ func TestListJobs(t *testing.T) { }, "sequential") require.NoError(t, err) - page, err := svc.ListJobs(context.Background(), 0, 100) + page, err := svc.ListJobs(context.Background(), 0, 100, "") require.NoError(t, err) assert.Equal(t, uint64(2), page.Total) assert.Len(t, page.Jobs, 2) @@ -88,7 +88,7 @@ func TestListJobsIncludesLegacyTaskOnlyJob(t *testing.T) { }) require.NoError(t, err) - page, err := svc.ListJobs(context.Background(), 0, 100) + page, err := svc.ListJobs(context.Background(), 0, 100, "") require.NoError(t, err) assert.Equal(t, uint64(2), page.Total) @@ -198,12 +198,12 @@ func TestListJobsPagination(t *testing.T) { require.NoError(t, err) } - page, err := svc.ListJobs(context.Background(), 0, 3) + page, err := svc.ListJobs(context.Background(), 0, 3, "") require.NoError(t, err) assert.Equal(t, uint64(5), page.Total) assert.Len(t, page.Jobs, 3) - page2, err := svc.ListJobs(context.Background(), 3, 3) + page2, err := svc.ListJobs(context.Background(), 3, 3, "") require.NoError(t, err) assert.Len(t, page2.Jobs, 2) } @@ -265,3 +265,71 @@ func TestComputeJobState(t *testing.T) { }) } } + +func TestListJobsFilterByStatus(t *testing.T) { + t.Parallel() + svc := newService(t) + + // Create jobs whose tasks have different states. + // Job1 — tasks stay Pending → job state = Pending + _, _, err := svc.CreateJob(context.Background(), "pending-job", []task.Task{ + {Name: "p1", State: task.Pending}, + }, "parallel") + require.NoError(t, err) + + // Job2 — tasks are Running → job state = Running + _, _, err = svc.CreateJob(context.Background(), "running-job", []task.Task{ + {Name: "r1", State: task.Running}, + }, "parallel") + require.NoError(t, err) + + // Job3 — tasks are Completed → job state = Completed + _, _, err = svc.CreateJob(context.Background(), "completed-job", []task.Task{ + {Name: "c1", State: task.Completed}, + }, "parallel") + require.NoError(t, err) + + // Job4 — tasks are Failed → job state = Failed + _, _, err = svc.CreateJob(context.Background(), "failed-job", []task.Task{ + {Name: "f1", State: task.Failed}, + }, "parallel") + require.NoError(t, err) + + // No filter — all 4 jobs + all, err := svc.ListJobs(context.Background(), 0, 100, "") + require.NoError(t, err) + assert.Equal(t, uint64(4), all.Total) + + // Filter by pending + page, err := svc.ListJobs(context.Background(), 0, 100, "pending") + require.NoError(t, err) + assert.Equal(t, uint64(1), page.Total) + assert.Equal(t, task.Pending, page.Jobs[0].State) + + // Filter by running + page, err = svc.ListJobs(context.Background(), 0, 100, "running") + require.NoError(t, err) + assert.Equal(t, uint64(1), page.Total) + assert.Equal(t, task.Running, page.Jobs[0].State) + + // Filter by completed + page, err = svc.ListJobs(context.Background(), 0, 100, "completed") + require.NoError(t, err) + assert.Equal(t, uint64(1), page.Total) + assert.Equal(t, task.Completed, page.Jobs[0].State) + + // Filter by failed + page, err = svc.ListJobs(context.Background(), 0, 100, "failed") + require.NoError(t, err) + assert.Equal(t, uint64(1), page.Total) + assert.Equal(t, task.Failed, page.Jobs[0].State) +} + +func TestListJobsInvalidStatusFilter(t *testing.T) { + t.Parallel() + svc := newService(t) + + _, err := svc.ListJobs(context.Background(), 0, 100, "invalid") + require.Error(t, err) + assert.Contains(t, err.Error(), "invalid job status filter") +} diff --git a/pkg/sdk/sdk.go b/pkg/sdk/sdk.go index c3cb6b28..d870d26c 100644 --- a/pkg/sdk/sdk.go +++ b/pkg/sdk/sdk.go @@ -88,11 +88,13 @@ type SDK interface { // job, _ := sdk.GetJob("b1d10738-c5d7-4ff1-8f4d-b9328ce6f040") GetJob(jobID string) (JobResponse, error) - // ListJobs lists jobs. + // ListJobs lists jobs with optional status filter. + // Status can be "pending", "running", "completed", "failed", or "" (all). // // example: - // jobPage, _ := sdk.ListJobs(0, 10) - ListJobs(offset uint64, limit uint64) (JobPage, error) + // jobPage, _ := sdk.ListJobs(0, 10, "") + // jobPage, _ := sdk.ListJobs(0, 10, "running") + ListJobs(offset uint64, limit uint64, status string) (JobPage, error) // StartJob starts a job. // diff --git a/pkg/sdk/task.go b/pkg/sdk/task.go index 47ab7321..fd4b7c19 100644 --- a/pkg/sdk/task.go +++ b/pkg/sdk/task.go @@ -215,7 +215,7 @@ func (sdk *propSDK) GetJob(jobID string) (JobResponse, error) { return jr, nil } -func (sdk *propSDK) ListJobs(offset, limit uint64) (JobPage, error) { +func (sdk *propSDK) ListJobs(offset, limit uint64, status string) (JobPage, error) { queries := make([]string, 0) if offset > 0 { queries = append(queries, fmt.Sprintf("offset=%d", offset)) @@ -223,6 +223,9 @@ func (sdk *propSDK) ListJobs(offset, limit uint64) (JobPage, error) { if limit > 0 { queries = append(queries, fmt.Sprintf("limit=%d", limit)) } + if status != "" { + queries = append(queries, fmt.Sprintf("status=%s", status)) + } query := "" if len(queries) > 0 { query = "?" + strings.Join(queries, "&") From d40e7db671fff4d4af8648b9ada6e47faa3117de Mon Sep 17 00:00:00 2001 From: Jilks Smith Date: Fri, 27 Mar 2026 11:40:21 +0300 Subject: [PATCH 02/14] Lint --- manager/service_job_test.go | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/manager/service_job_test.go b/manager/service_job_test.go index 6e74754f..546ad0e0 100644 --- a/manager/service_job_test.go +++ b/manager/service_job_test.go @@ -269,56 +269,45 @@ func TestComputeJobState(t *testing.T) { func TestListJobsFilterByStatus(t *testing.T) { t.Parallel() svc := newService(t) - - // Create jobs whose tasks have different states. - // Job1 — tasks stay Pending → job state = Pending _, _, err := svc.CreateJob(context.Background(), "pending-job", []task.Task{ {Name: "p1", State: task.Pending}, }, "parallel") require.NoError(t, err) - // Job2 — tasks are Running → job state = Running _, _, err = svc.CreateJob(context.Background(), "running-job", []task.Task{ {Name: "r1", State: task.Running}, }, "parallel") require.NoError(t, err) - // Job3 — tasks are Completed → job state = Completed _, _, err = svc.CreateJob(context.Background(), "completed-job", []task.Task{ {Name: "c1", State: task.Completed}, }, "parallel") require.NoError(t, err) - // Job4 — tasks are Failed → job state = Failed _, _, err = svc.CreateJob(context.Background(), "failed-job", []task.Task{ {Name: "f1", State: task.Failed}, }, "parallel") require.NoError(t, err) - // No filter — all 4 jobs all, err := svc.ListJobs(context.Background(), 0, 100, "") require.NoError(t, err) assert.Equal(t, uint64(4), all.Total) - // Filter by pending page, err := svc.ListJobs(context.Background(), 0, 100, "pending") require.NoError(t, err) assert.Equal(t, uint64(1), page.Total) assert.Equal(t, task.Pending, page.Jobs[0].State) - // Filter by running page, err = svc.ListJobs(context.Background(), 0, 100, "running") require.NoError(t, err) assert.Equal(t, uint64(1), page.Total) assert.Equal(t, task.Running, page.Jobs[0].State) - // Filter by completed page, err = svc.ListJobs(context.Background(), 0, 100, "completed") require.NoError(t, err) assert.Equal(t, uint64(1), page.Total) assert.Equal(t, task.Completed, page.Jobs[0].State) - // Filter by failed page, err = svc.ListJobs(context.Background(), 0, 100, "failed") require.NoError(t, err) assert.Equal(t, uint64(1), page.Total) From 1ca156373e722ef4dfe52fc4d79e45c9bed0b5f3 Mon Sep 17 00:00:00 2001 From: Jilks Smith Date: Fri, 27 Mar 2026 12:11:38 +0300 Subject: [PATCH 03/14] Lint --- pkg/sdk/task.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sdk/task.go b/pkg/sdk/task.go index fd4b7c19..ced78498 100644 --- a/pkg/sdk/task.go +++ b/pkg/sdk/task.go @@ -224,7 +224,7 @@ func (sdk *propSDK) ListJobs(offset, limit uint64, status string) (JobPage, erro queries = append(queries, fmt.Sprintf("limit=%d", limit)) } if status != "" { - queries = append(queries, fmt.Sprintf("status=%s", status)) + queries = append(queries, "status="+status) } query := "" if len(queries) > 0 { From 9c8537e93b8ab9c6abb769c3ac805c5b7aa4f602 Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Mon, 30 Mar 2026 19:00:59 +0300 Subject: [PATCH 04/14] fix: return 400 for invalid status filter instead of 500 EncodeError now maps apiutil.ErrValidation and pkgerrors.ErrInvalidValue to HTTP 400. Invalid status values in ListProplets and ListJobs wrap ErrInvalidValue so they propagate through EncodeError as Bad Request rather than falling through to Internal Server Error. --- manager/service.go | 4 ++-- pkg/api/api.go | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/manager/service.go b/manager/service.go index 8b3b60d8..d8e8fd38 100644 --- a/manager/service.go +++ b/manager/service.go @@ -122,7 +122,7 @@ func (svc *service) GetProplet(ctx context.Context, propletID string) (proplet.P func (svc *service) ListProplets(ctx context.Context, offset, limit uint64, status string) (proplet.PropletPage, error) { if status != "" && status != PropletStatusActive && status != PropletStatusInactive { - return proplet.PropletPage{}, fmt.Errorf("invalid proplet status filter %q: must be %q, %q, or empty", status, PropletStatusActive, PropletStatusInactive) + return proplet.PropletPage{}, fmt.Errorf("%w: proplet status must be %q, %q, or empty, got %q", pkgerrors.ErrInvalidValue, PropletStatusActive, PropletStatusInactive, status) } if status == "" { @@ -388,7 +388,7 @@ func (svc *service) GetJob(ctx context.Context, jobID string) ([]task.Task, erro func (svc *service) ListJobs(ctx context.Context, offset, limit uint64, status string) (JobPage, error) { if status != "" && status != JobStatusPending && status != JobStatusRunning && status != JobStatusCompleted && status != JobStatusFailed { - return JobPage{}, fmt.Errorf("invalid job status filter %q: must be %q, %q, %q, %q, or empty", status, JobStatusPending, JobStatusRunning, JobStatusCompleted, JobStatusFailed) + return JobPage{}, fmt.Errorf("%w: job status must be %q, %q, %q, %q, or empty, got %q", pkgerrors.ErrInvalidValue, JobStatusPending, JobStatusRunning, JobStatusCompleted, JobStatusFailed, status) } jobs := make([]JobSummary, 0) diff --git a/pkg/api/api.go b/pkg/api/api.go index 1cfaee7a..f9bbac90 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -8,6 +8,7 @@ import ( pkgerrors "github.com/absmach/propeller/pkg/errors" "github.com/absmach/supermq" + apiutil "github.com/absmach/supermq/api/http/util" ) const ( @@ -40,7 +41,9 @@ func EncodeResponse(_ context.Context, w http.ResponseWriter, response any) erro func EncodeError(_ context.Context, err error, w http.ResponseWriter) { w.Header().Set("Content-Type", ContentType) switch { - case errors.Is(err, pkgerrors.ErrEmptyKey): + case errors.Is(err, apiutil.ErrValidation), + errors.Is(err, pkgerrors.ErrEmptyKey), + errors.Is(err, pkgerrors.ErrInvalidValue): w.WriteHeader(http.StatusBadRequest) case errors.Is(err, pkgerrors.ErrNotFound): w.WriteHeader(http.StatusNotFound) From 6e6fc3508a02c418b73cbe729fd3ee3677026400 Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Mon, 30 Mar 2026 19:01:05 +0300 Subject: [PATCH 05/14] fix: return 400 when ?status= is passed to GET /tasks ListTasks does not support status filtering. The endpoint now rejects any non-empty status query param with 400 Bad Request instead of silently ignoring it. --- manager/api/endpoint.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/manager/api/endpoint.go b/manager/api/endpoint.go index c3025df8..49018820 100644 --- a/manager/api/endpoint.go +++ b/manager/api/endpoint.go @@ -231,6 +231,9 @@ func listTasksEndpoint(svc manager.Service) endpoint.Endpoint { if err := req.validate(); err != nil { return listTaskResponse{}, errors.Join(apiutil.ErrValidation, err) } + if req.status != "" { + return listTaskResponse{}, errors.Join(apiutil.ErrValidation, pkgerrors.ErrInvalidData) + } tasks, err := svc.ListTasks(ctx, req.offset, req.limit) if err != nil { From 2a1a173c122a79ee1c8d3124b568f1c0fb80832d Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Mon, 30 Mar 2026 19:01:11 +0300 Subject: [PATCH 06/14] feat(sdk): add ListProplets with optional status filter Adds Proplet and PropletPage SDK types, and ListProplets(offset, limit, status) to the SDK interface and propSDK implementation. Status values "active" and "inactive" are URL-encoded before being appended to the query string. --- pkg/sdk/proplet.go | 58 +++++++++++++++++++++++++++++++++++++++++++--- pkg/sdk/sdk.go | 8 +++++++ 2 files changed, 63 insertions(+), 3 deletions(-) diff --git a/pkg/sdk/proplet.go b/pkg/sdk/proplet.go index 19c68361..436628c7 100644 --- a/pkg/sdk/proplet.go +++ b/pkg/sdk/proplet.go @@ -1,13 +1,65 @@ package sdk -import "net/http" +import ( + "encoding/json" + "fmt" + "net/http" + "net/url" + "strings" + "time" +) const propletsEndpoint = "/proplets" +type Proplet struct { + ID string `json:"id"` + Name string `json:"name"` + TaskCount uint64 `json:"task_count"` + Alive bool `json:"alive"` + CreatedAt time.Time `json:"created_at"` +} + +type PropletPage struct { + Offset uint64 `json:"offset"` + Limit uint64 `json:"limit"` + Total uint64 `json:"total"` + Proplets []Proplet `json:"proplets"` +} + +func (sdk *propSDK) ListProplets(offset, limit uint64, status string) (PropletPage, error) { + params := make([]string, 0) + if offset > 0 { + params = append(params, fmt.Sprintf("offset=%d", offset)) + } + if limit > 0 { + params = append(params, fmt.Sprintf("limit=%d", limit)) + } + if status != "" { + params = append(params, "status="+url.QueryEscape(status)) + } + query := "" + if len(params) > 0 { + query = "?" + strings.Join(params, "&") + } + reqURL := sdk.managerURL + propletsEndpoint + query + + body, err := sdk.processRequest(http.MethodGet, reqURL, nil, http.StatusOK) + if err != nil { + return PropletPage{}, err + } + + var pp PropletPage + if err := json.Unmarshal(body, &pp); err != nil { + return PropletPage{}, err + } + + return pp, nil +} + func (sdk *propSDK) DeleteProplet(id string) error { - url := sdk.managerURL + propletsEndpoint + "/" + id + reqURL := sdk.managerURL + propletsEndpoint + "/" + id - if _, err := sdk.processRequest(http.MethodDelete, url, nil, http.StatusNoContent); err != nil { + if _, err := sdk.processRequest(http.MethodDelete, reqURL, nil, http.StatusNoContent); err != nil { return err } diff --git a/pkg/sdk/sdk.go b/pkg/sdk/sdk.go index d870d26c..31557d4b 100644 --- a/pkg/sdk/sdk.go +++ b/pkg/sdk/sdk.go @@ -108,6 +108,14 @@ type SDK interface { // _ := sdk.StopJob("b1d10738-c5d7-4ff1-8f4d-b9328ce6f040") StopJob(jobID string) error + // ListProplets lists proplets with optional status filter. + // Status can be "active", "inactive", or "" (all). + // + // example: + // page, _ := sdk.ListProplets(0, 10, "") + // page, _ := sdk.ListProplets(0, 10, "active") + ListProplets(offset uint64, limit uint64, status string) (PropletPage, error) + // DeleteProplet deletes a proplet by id. // // example: From e9eaa5cc320eddf2588afaed28abe62fb8e4a67b Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Mon, 30 Mar 2026 19:01:18 +0300 Subject: [PATCH 07/14] fix(sdk): URL-encode status query param in ListJobs Importing net/url for url.QueryEscape also required renaming the local url variables in all other SDK task functions to avoid shadowing the imported package (caught by gocritic importShadow). --- pkg/sdk/task.go | 61 +++++++++++++++++++++++++------------------------ 1 file changed, 31 insertions(+), 30 deletions(-) diff --git a/pkg/sdk/task.go b/pkg/sdk/task.go index ced78498..2cdb0e77 100644 --- a/pkg/sdk/task.go +++ b/pkg/sdk/task.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "net/http" + "net/url" "strings" "time" ) @@ -40,9 +41,9 @@ func (sdk *propSDK) CreateTask(task Task) (Task, error) { return Task{}, err } - url := sdk.managerURL + tasksEndpoint + reqURL := sdk.managerURL + tasksEndpoint - body, err := sdk.processRequest(http.MethodPost, url, data, http.StatusCreated) + body, err := sdk.processRequest(http.MethodPost, reqURL, data, http.StatusCreated) if err != nil { return Task{}, err } @@ -56,9 +57,9 @@ func (sdk *propSDK) CreateTask(task Task) (Task, error) { } func (sdk *propSDK) GetTask(id string) (Task, error) { - url := sdk.managerURL + tasksEndpoint + "/" + id + reqURL := sdk.managerURL + tasksEndpoint + "/" + id - body, err := sdk.processRequest(http.MethodGet, url, nil, http.StatusOK) + body, err := sdk.processRequest(http.MethodGet, reqURL, nil, http.StatusOK) if err != nil { return Task{}, err } @@ -83,9 +84,9 @@ func (sdk *propSDK) ListTasks(offset, limit uint64) (TaskPage, error) { if len(queries) > 0 { query = "?" + strings.Join(queries, "&") } - url := sdk.managerURL + tasksEndpoint + query + reqURL := sdk.managerURL + tasksEndpoint + query - body, err := sdk.processRequest(http.MethodGet, url, nil, http.StatusOK) + body, err := sdk.processRequest(http.MethodGet, reqURL, nil, http.StatusOK) if err != nil { return TaskPage{}, err } @@ -103,9 +104,9 @@ func (sdk *propSDK) UpdateTask(task Task) (Task, error) { if err != nil { return Task{}, err } - url := sdk.managerURL + tasksEndpoint + "/" + task.ID + reqURL := sdk.managerURL + tasksEndpoint + "/" + task.ID - body, err := sdk.processRequest(http.MethodPut, url, data, http.StatusOK) + body, err := sdk.processRequest(http.MethodPut, reqURL, data, http.StatusOK) if err != nil { return Task{}, err } @@ -119,9 +120,9 @@ func (sdk *propSDK) UpdateTask(task Task) (Task, error) { } func (sdk *propSDK) DeleteTask(id string) error { - url := sdk.managerURL + tasksEndpoint + "/" + id + reqURL := sdk.managerURL + tasksEndpoint + "/" + id - if _, err := sdk.processRequest(http.MethodDelete, url, nil, http.StatusNoContent); err != nil { + if _, err := sdk.processRequest(http.MethodDelete, reqURL, nil, http.StatusNoContent); err != nil { return err } @@ -129,9 +130,9 @@ func (sdk *propSDK) DeleteTask(id string) error { } func (sdk *propSDK) StartTask(id string) error { - url := fmt.Sprintf("%s/tasks/%s/start", sdk.managerURL, id) + reqURL := fmt.Sprintf("%s/tasks/%s/start", sdk.managerURL, id) - if _, err := sdk.processRequest(http.MethodPost, url, nil, http.StatusOK); err != nil { + if _, err := sdk.processRequest(http.MethodPost, reqURL, nil, http.StatusOK); err != nil { return err } @@ -139,9 +140,9 @@ func (sdk *propSDK) StartTask(id string) error { } func (sdk *propSDK) StopTask(id string) error { - url := fmt.Sprintf("%s/tasks/%s/stop", sdk.managerURL, id) + reqURL := fmt.Sprintf("%s/tasks/%s/stop", sdk.managerURL, id) - if _, err := sdk.processRequest(http.MethodPost, url, nil, http.StatusOK); err != nil { + if _, err := sdk.processRequest(http.MethodPost, reqURL, nil, http.StatusOK); err != nil { return err } @@ -184,9 +185,9 @@ func (sdk *propSDK) CreateJob(req JobRequest) (JobResponse, error) { return JobResponse{}, err } - url := sdk.managerURL + jobsEndpoint + reqURL := sdk.managerURL + jobsEndpoint - body, err := sdk.processRequest(http.MethodPost, url, data, http.StatusCreated) + body, err := sdk.processRequest(http.MethodPost, reqURL, data, http.StatusCreated) if err != nil { return JobResponse{}, err } @@ -200,9 +201,9 @@ func (sdk *propSDK) CreateJob(req JobRequest) (JobResponse, error) { } func (sdk *propSDK) GetJob(jobID string) (JobResponse, error) { - url := sdk.managerURL + jobsEndpoint + "/" + jobID + reqURL := sdk.managerURL + jobsEndpoint + "/" + jobID - body, err := sdk.processRequest(http.MethodGet, url, nil, http.StatusOK) + body, err := sdk.processRequest(http.MethodGet, reqURL, nil, http.StatusOK) if err != nil { return JobResponse{}, err } @@ -216,23 +217,23 @@ func (sdk *propSDK) GetJob(jobID string) (JobResponse, error) { } func (sdk *propSDK) ListJobs(offset, limit uint64, status string) (JobPage, error) { - queries := make([]string, 0) + params := make([]string, 0) if offset > 0 { - queries = append(queries, fmt.Sprintf("offset=%d", offset)) + params = append(params, fmt.Sprintf("offset=%d", offset)) } if limit > 0 { - queries = append(queries, fmt.Sprintf("limit=%d", limit)) + params = append(params, fmt.Sprintf("limit=%d", limit)) } if status != "" { - queries = append(queries, "status="+status) + params = append(params, "status="+url.QueryEscape(status)) } query := "" - if len(queries) > 0 { - query = "?" + strings.Join(queries, "&") + if len(params) > 0 { + query = "?" + strings.Join(params, "&") } - url := sdk.managerURL + jobsEndpoint + query + reqURL := sdk.managerURL + jobsEndpoint + query - body, err := sdk.processRequest(http.MethodGet, url, nil, http.StatusOK) + body, err := sdk.processRequest(http.MethodGet, reqURL, nil, http.StatusOK) if err != nil { return JobPage{}, err } @@ -246,9 +247,9 @@ func (sdk *propSDK) ListJobs(offset, limit uint64, status string) (JobPage, erro } func (sdk *propSDK) StartJob(jobID string) error { - url := fmt.Sprintf("%s/jobs/%s/start", sdk.managerURL, jobID) + reqURL := fmt.Sprintf("%s/jobs/%s/start", sdk.managerURL, jobID) - if _, err := sdk.processRequest(http.MethodPost, url, nil, http.StatusOK); err != nil { + if _, err := sdk.processRequest(http.MethodPost, reqURL, nil, http.StatusOK); err != nil { return err } @@ -256,9 +257,9 @@ func (sdk *propSDK) StartJob(jobID string) error { } func (sdk *propSDK) StopJob(jobID string) error { - url := fmt.Sprintf("%s/jobs/%s/stop", sdk.managerURL, jobID) + reqURL := fmt.Sprintf("%s/jobs/%s/stop", sdk.managerURL, jobID) - if _, err := sdk.processRequest(http.MethodPost, url, nil, http.StatusOK); err != nil { + if _, err := sdk.processRequest(http.MethodPost, reqURL, nil, http.StatusOK); err != nil { return err } From 0886016e9b3010b87b234d9c4ac373da75ad8d09 Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Mon, 30 Mar 2026 19:01:23 +0300 Subject: [PATCH 08/14] test: add ListProplets status filter and edge case tests Covers: active/inactive filter, invalid status returns error, pagination with filter applied, Interrupted tasks mapping to the failed filter, and Scheduled tasks mapping to the running filter. --- manager/service_proplet_test.go | 132 ++++++++++++++++++++++++++++++++ 1 file changed, 132 insertions(+) create mode 100644 manager/service_proplet_test.go diff --git a/manager/service_proplet_test.go b/manager/service_proplet_test.go new file mode 100644 index 00000000..411b545a --- /dev/null +++ b/manager/service_proplet_test.go @@ -0,0 +1,132 @@ +package manager_test + +import ( + "context" + "log/slog" + "testing" + "time" + + "github.com/absmach/propeller/manager" + mqttmocks "github.com/absmach/propeller/pkg/mqtt/mocks" + "github.com/absmach/propeller/pkg/proplet" + "github.com/absmach/propeller/pkg/scheduler" + "github.com/absmach/propeller/pkg/storage" + "github.com/absmach/propeller/pkg/task" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func newServiceWithRepos(t *testing.T) (manager.Service, *storage.Repositories) { + t.Helper() + repos, err := storage.NewRepositories(storage.Config{Type: "memory"}) + require.NoError(t, err) + sched := scheduler.NewRoundRobin() + pubsub := mqttmocks.NewMockPubSub(t) + pubsub.On("Publish", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + pubsub.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + pubsub.On("Unsubscribe", mock.Anything, mock.Anything).Return(nil).Maybe() + pubsub.On("Disconnect", mock.Anything).Return(nil).Maybe() + logger := slog.Default() + svc, _ := manager.NewService(repos, sched, pubsub, "test-domain", "test-channel", logger) + + return svc, repos +} + +func TestListPropletsFilterByStatus(t *testing.T) { + t.Parallel() + svc, repos := newServiceWithRepos(t) + ctx := context.Background() + + activeProplet := proplet.Proplet{ + ID: uuid.NewString(), + Name: "active-proplet", + AliveHistory: []time.Time{time.Now()}, + } + inactiveProplet := proplet.Proplet{ + ID: uuid.NewString(), + Name: "inactive-proplet", + AliveHistory: []time.Time{time.Now().Add(-1 * time.Hour)}, + } + require.NoError(t, repos.Proplets.Create(ctx, activeProplet)) + require.NoError(t, repos.Proplets.Create(ctx, inactiveProplet)) + + all, err := svc.ListProplets(ctx, 0, 100, "") + require.NoError(t, err) + assert.Equal(t, uint64(2), all.Total) + + active, err := svc.ListProplets(ctx, 0, 100, manager.PropletStatusActive) + require.NoError(t, err) + assert.Equal(t, uint64(1), active.Total) + assert.True(t, active.Proplets[0].Alive) + + inactive, err := svc.ListProplets(ctx, 0, 100, manager.PropletStatusInactive) + require.NoError(t, err) + assert.Equal(t, uint64(1), inactive.Total) + assert.False(t, inactive.Proplets[0].Alive) +} + +func TestListPropletsInvalidStatus(t *testing.T) { + t.Parallel() + svc, _ := newServiceWithRepos(t) + + _, err := svc.ListProplets(context.Background(), 0, 100, "unknown") + require.Error(t, err) + assert.Contains(t, err.Error(), "invalid value") +} + +func TestListPropletsFilterPagination(t *testing.T) { + t.Parallel() + svc, repos := newServiceWithRepos(t) + ctx := context.Background() + + for range 5 { + p := proplet.Proplet{ + ID: uuid.NewString(), + Name: uuid.NewString(), + AliveHistory: []time.Time{time.Now()}, + } + require.NoError(t, repos.Proplets.Create(ctx, p)) + } + + page, err := svc.ListProplets(ctx, 0, 3, manager.PropletStatusActive) + require.NoError(t, err) + assert.Equal(t, uint64(5), page.Total) + assert.Len(t, page.Proplets, 3) + + page2, err := svc.ListProplets(ctx, 3, 3, manager.PropletStatusActive) + require.NoError(t, err) + assert.Equal(t, uint64(5), page2.Total) + assert.Len(t, page2.Proplets, 2) +} + +func TestListJobsInterruptedMapsToFailed(t *testing.T) { + t.Parallel() + svc := newService(t) + ctx := context.Background() + + _, _, err := svc.CreateJob(ctx, "interrupted-job", []task.Task{ + {Name: "t1", State: task.Interrupted}, + }, "parallel") + require.NoError(t, err) + + page, err := svc.ListJobs(ctx, 0, 100, manager.JobStatusFailed) + require.NoError(t, err) + assert.Equal(t, uint64(1), page.Total, "interrupted job must appear under 'failed' filter") +} + +func TestListJobsScheduledMapsToRunning(t *testing.T) { + t.Parallel() + svc := newService(t) + ctx := context.Background() + + _, _, err := svc.CreateJob(ctx, "scheduled-job", []task.Task{ + {Name: "t1", State: task.Scheduled}, + }, "parallel") + require.NoError(t, err) + + page, err := svc.ListJobs(ctx, 0, 100, manager.JobStatusRunning) + require.NoError(t, err) + assert.Equal(t, uint64(1), page.Total, "scheduled job must appear under 'running' filter") +} From 8523f365f0bf4b0c9b4f6e5b6cfaf143b277b277 Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Mon, 30 Mar 2026 19:01:29 +0300 Subject: [PATCH 09/14] fix(test): update invalid status error message assertion The error now wraps ErrInvalidValue so the message starts with "invalid value provided" rather than the old literal prefix. --- manager/service_job_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/manager/service_job_test.go b/manager/service_job_test.go index 546ad0e0..50424eae 100644 --- a/manager/service_job_test.go +++ b/manager/service_job_test.go @@ -320,5 +320,5 @@ func TestListJobsInvalidStatusFilter(t *testing.T) { _, err := svc.ListJobs(context.Background(), 0, 100, "invalid") require.Error(t, err) - assert.Contains(t, err.Error(), "invalid job status filter") + assert.Contains(t, err.Error(), "invalid value provided") } From 9c6adb35fd6151f5c92ffe99175268b590351e34 Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Tue, 7 Apr 2026 12:03:00 +0300 Subject: [PATCH 10/14] fix(api): use ErrInvalidValue for tasks status and document ComputeJobState guarantees --- manager/api/endpoint.go | 2 +- manager/service.go | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/manager/api/endpoint.go b/manager/api/endpoint.go index 49018820..42da116d 100644 --- a/manager/api/endpoint.go +++ b/manager/api/endpoint.go @@ -232,7 +232,7 @@ func listTasksEndpoint(svc manager.Service) endpoint.Endpoint { return listTaskResponse{}, errors.Join(apiutil.ErrValidation, err) } if req.status != "" { - return listTaskResponse{}, errors.Join(apiutil.ErrValidation, pkgerrors.ErrInvalidData) + return listTaskResponse{}, errors.Join(apiutil.ErrValidation, pkgerrors.ErrInvalidValue) } tasks, err := svc.ListTasks(ctx, req.offset, req.limit) diff --git a/manager/service.go b/manager/service.go index d8e8fd38..93160bd9 100644 --- a/manager/service.go +++ b/manager/service.go @@ -446,6 +446,10 @@ func (svc *service) ListJobs(ctx context.Context, offset, limit uint64, status s }) if status != "" { + // ComputeJobState only ever returns Pending, Running, Completed, or Failed. + // Skipped and Interrupted task states are collapsed: Interrupted → Failed, + // Scheduled → Running, all-Skipped → Completed. No job summary can carry + // any other state, so the map below is exhaustive. statusStateMap := map[string]task.State{ JobStatusPending: task.Pending, JobStatusRunning: task.Running, @@ -1732,6 +1736,10 @@ func (svc *service) listAllTasks(ctx context.Context) ([]task.Task, error) { return allTasks, nil } +// listAllProplets fetches every proplet from storage into memory so callers can +// apply in-process filters (e.g. liveness status). This is O(n) in proplet count. +// If proplet counts grow large, consider adding a storage-level ListByStatus query +// to push the filter down to the repository layer instead. func (svc *service) listAllProplets(ctx context.Context) ([]proplet.Proplet, error) { const pageSize uint64 = 100 var allProplets []proplet.Proplet From 55bb7f3b586139236118c3b6564727ed4ef278cc Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Tue, 7 Apr 2026 12:25:11 +0300 Subject: [PATCH 11/14] feat(storage): push proplet status filter to storage layer Add ListByAlive to PropletRepository across all four backends (Postgres, SQLite, Badger, Memory) so GET /proplets?status= filters at the repo layer instead of scanning all proplets into memory in the service. Export proplet.AliveTimeout so the service can compute the liveness cutoff without duplicating the constant. Remove listAllProplets helper. --- manager/service.go | 52 ++------------- pkg/proplet/proplet.go | 4 +- pkg/storage/badger/init.go | 1 + pkg/storage/badger/proplets.go | 37 +++++++++++ pkg/storage/factory.go | 13 ++++ pkg/storage/memory_adapter.go | 28 ++++++++ pkg/storage/mocks/proplet_repository.go | 87 +++++++++++++++++++++++++ pkg/storage/postgres/init.go | 1 + pkg/storage/postgres/proplets.go | 40 ++++++++++++ pkg/storage/repository.go | 2 + pkg/storage/sqlite/init.go | 1 + pkg/storage/sqlite/proplets.go | 34 ++++++++++ 12 files changed, 252 insertions(+), 48 deletions(-) diff --git a/manager/service.go b/manager/service.go index 93160bd9..bd50059f 100644 --- a/manager/service.go +++ b/manager/service.go @@ -142,38 +142,21 @@ func (svc *service) ListProplets(ctx context.Context, offset, limit uint64, stat }, nil } - all, err := svc.listAllProplets(ctx) + alive := status == PropletStatusActive + since := time.Now().Add(-proplet.AliveTimeout) + proplets, total, err := svc.propletRepo.ListByAlive(ctx, offset, limit, alive, since) if err != nil { return proplet.PropletPage{}, err } - - filtered := make([]proplet.Proplet, 0, len(all)) - for i := range all { - all[i].SetAlive() - match := (status == PropletStatusActive && all[i].Alive) || - (status == PropletStatusInactive && !all[i].Alive) - if match { - filtered = append(filtered, all[i]) - } - } - - total := uint64(len(filtered)) - if offset >= total { - return proplet.PropletPage{ - Offset: offset, - Limit: limit, - Total: total, - Proplets: []proplet.Proplet{}, - }, nil + for i := range proplets { + proplets[i].SetAlive() } - end := min(offset+limit, total) - return proplet.PropletPage{ Offset: offset, Limit: limit, Total: total, - Proplets: filtered[offset:end], + Proplets: proplets, }, nil } @@ -1736,29 +1719,6 @@ func (svc *service) listAllTasks(ctx context.Context) ([]task.Task, error) { return allTasks, nil } -// listAllProplets fetches every proplet from storage into memory so callers can -// apply in-process filters (e.g. liveness status). This is O(n) in proplet count. -// If proplet counts grow large, consider adding a storage-level ListByStatus query -// to push the filter down to the repository layer instead. -func (svc *service) listAllProplets(ctx context.Context) ([]proplet.Proplet, error) { - const pageSize uint64 = 100 - var allProplets []proplet.Proplet - var offset uint64 - - for { - proplets, total, err := svc.propletRepo.List(ctx, offset, pageSize) - if err != nil { - return nil, err - } - allProplets = append(allProplets, proplets...) - offset += uint64(len(proplets)) - if offset >= total || len(proplets) == 0 { - break - } - } - - return allProplets, nil -} func (svc *service) pinTaskToProplet(ctx context.Context, taskID, propletID string) error { return svc.taskPropletRepo.Create(ctx, taskID, propletID) diff --git a/pkg/proplet/proplet.go b/pkg/proplet/proplet.go index 1f76ed6e..9b7f2777 100644 --- a/pkg/proplet/proplet.go +++ b/pkg/proplet/proplet.go @@ -2,7 +2,7 @@ package proplet import "time" -const aliveTimeout = 10 * time.Second +const AliveTimeout = 10 * time.Second type PropletMetadata struct { Description string `json:"description,omitempty"` @@ -30,7 +30,7 @@ type Proplet struct { func (p *Proplet) SetAlive() { if len(p.AliveHistory) > 0 { lastAlive := p.AliveHistory[len(p.AliveHistory)-1] - if time.Since(lastAlive) <= aliveTimeout { + if time.Since(lastAlive) <= AliveTimeout { p.Alive = true return diff --git a/pkg/storage/badger/init.go b/pkg/storage/badger/init.go index 0155a1b4..9b5d593b 100644 --- a/pkg/storage/badger/init.go +++ b/pkg/storage/badger/init.go @@ -62,6 +62,7 @@ type PropletRepository interface { Get(ctx context.Context, id string) (proplet.Proplet, error) Update(ctx context.Context, p proplet.Proplet) error List(ctx context.Context, offset, limit uint64) ([]proplet.Proplet, uint64, error) + ListByAlive(ctx context.Context, offset, limit uint64, alive bool, since time.Time) ([]proplet.Proplet, uint64, error) Delete(ctx context.Context, id string) error } diff --git a/pkg/storage/badger/proplets.go b/pkg/storage/badger/proplets.go index f4ec7712..3e5e0ed3 100644 --- a/pkg/storage/badger/proplets.go +++ b/pkg/storage/badger/proplets.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "time" "github.com/absmach/propeller/pkg/proplet" ) @@ -78,6 +79,42 @@ func (r *propletRepo) List(ctx context.Context, offset, limit uint64) ([]proplet return proplets, total, nil } +func (r *propletRepo) ListByAlive(ctx context.Context, offset, limit uint64, alive bool, since time.Time) ([]proplet.Proplet, uint64, error) { + prefix := []byte("proplet:") + totalRaw, err := r.db.countWithPrefix(prefix) + if err != nil { + return nil, 0, err + } + + values, err := r.db.listWithPrefix(prefix, 0, totalRaw) + if err != nil { + return nil, 0, err + } + + var filtered []proplet.Proplet + for _, val := range values { + var p proplet.Proplet + if err := json.Unmarshal(val, &p); err != nil { + return nil, 0, fmt.Errorf("unmarshal error: %w", err) + } + isAlive := len(p.AliveHistory) > 0 && !p.AliveHistory[len(p.AliveHistory)-1].Before(since) + if isAlive == alive { + filtered = append(filtered, p) + } + } + + filteredTotal := uint64(len(filtered)) + if offset >= filteredTotal { + return []proplet.Proplet{}, filteredTotal, nil + } + end := offset + limit + if end > filteredTotal { + end = filteredTotal + } + + return filtered[offset:end], filteredTotal, nil +} + func (r *propletRepo) Delete(ctx context.Context, id string) error { key := []byte("proplet:" + id) diff --git a/pkg/storage/factory.go b/pkg/storage/factory.go index f4d37798..b2c55d4b 100644 --- a/pkg/storage/factory.go +++ b/pkg/storage/factory.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "time" "github.com/absmach/propeller/pkg/job" "github.com/absmach/propeller/pkg/proplet" @@ -189,6 +190,10 @@ func (a *postgresPropletAdapter) List(ctx context.Context, offset, limit uint64) return a.repo.List(ctx, offset, limit) } +func (a *postgresPropletAdapter) ListByAlive(ctx context.Context, offset, limit uint64, alive bool, since time.Time) ([]proplet.Proplet, uint64, error) { + return a.repo.ListByAlive(ctx, offset, limit, alive, since) +} + func (a *postgresPropletAdapter) Delete(ctx context.Context, id string) error { return a.repo.Delete(ctx, id) } @@ -332,6 +337,10 @@ func (a *sqlitePropletAdapter) List(ctx context.Context, offset, limit uint64) ( return a.repo.List(ctx, offset, limit) } +func (a *sqlitePropletAdapter) ListByAlive(ctx context.Context, offset, limit uint64, alive bool, since time.Time) ([]proplet.Proplet, uint64, error) { + return a.repo.ListByAlive(ctx, offset, limit, alive, since) +} + func (a *sqlitePropletAdapter) Delete(ctx context.Context, id string) error { return a.repo.Delete(ctx, id) } @@ -475,6 +484,10 @@ func (a *badgerPropletAdapter) List(ctx context.Context, offset, limit uint64) ( return a.repo.List(ctx, offset, limit) } +func (a *badgerPropletAdapter) ListByAlive(ctx context.Context, offset, limit uint64, alive bool, since time.Time) ([]proplet.Proplet, uint64, error) { + return a.repo.ListByAlive(ctx, offset, limit, alive, since) +} + func (a *badgerPropletAdapter) Delete(ctx context.Context, id string) error { return a.repo.Delete(ctx, id) } diff --git a/pkg/storage/memory_adapter.go b/pkg/storage/memory_adapter.go index 77399099..15f508a1 100644 --- a/pkg/storage/memory_adapter.go +++ b/pkg/storage/memory_adapter.go @@ -3,6 +3,7 @@ package storage import ( "context" "fmt" + "time" pkgerrors "github.com/absmach/propeller/pkg/errors" "github.com/absmach/propeller/pkg/job" @@ -152,6 +153,33 @@ func (r *memoryPropletRepo) List(ctx context.Context, offset, limit uint64) ([]p return proplets, total, nil } +func (r *memoryPropletRepo) ListByAlive(ctx context.Context, offset, limit uint64, alive bool, since time.Time) ([]proplet.Proplet, uint64, error) { + data, _, err := r.storage.List(ctx, 0, maxMemoryFetch) + if err != nil { + return nil, 0, err + } + + var filtered []proplet.Proplet + for _, d := range data { + p, ok := d.(proplet.Proplet) + if !ok { + continue + } + isAlive := len(p.AliveHistory) > 0 && !p.AliveHistory[len(p.AliveHistory)-1].Before(since) + if isAlive == alive { + filtered = append(filtered, p) + } + } + + filteredTotal := uint64(len(filtered)) + if offset >= filteredTotal { + return []proplet.Proplet{}, filteredTotal, nil + } + end := min(offset+limit, filteredTotal) + + return filtered[offset:end], filteredTotal, nil +} + func (r *memoryPropletRepo) Delete(ctx context.Context, id string) error { return r.storage.Delete(ctx, id) } diff --git a/pkg/storage/mocks/proplet_repository.go b/pkg/storage/mocks/proplet_repository.go index 6c4040ab..afc00706 100644 --- a/pkg/storage/mocks/proplet_repository.go +++ b/pkg/storage/mocks/proplet_repository.go @@ -6,6 +6,7 @@ package mocks import ( "context" + "time" "github.com/absmach/propeller/pkg/proplet" mock "github.com/stretchr/testify/mock" @@ -298,6 +299,92 @@ func (_c *MockPropletRepository_List_Call) RunAndReturn(run func(ctx context.Con return _c } +// ListByAlive provides a mock function for the type MockPropletRepository +func (_mock *MockPropletRepository) ListByAlive(ctx context.Context, offset uint64, limit uint64, alive bool, since time.Time) ([]proplet.Proplet, uint64, error) { + ret := _mock.Called(ctx, offset, limit, alive, since) + + if len(ret) == 0 { + panic("no return value specified for ListByAlive") + } + + var r0 []proplet.Proplet + var r1 uint64 + var r2 error + if returnFunc, ok := ret.Get(0).(func(context.Context, uint64, uint64, bool, time.Time) ([]proplet.Proplet, uint64, error)); ok { + return returnFunc(ctx, offset, limit, alive, since) + } + if returnFunc, ok := ret.Get(0).(func(context.Context, uint64, uint64, bool, time.Time) []proplet.Proplet); ok { + r0 = returnFunc(ctx, offset, limit, alive, since) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]proplet.Proplet) + } + } + if returnFunc, ok := ret.Get(1).(func(context.Context, uint64, uint64, bool, time.Time) uint64); ok { + r1 = returnFunc(ctx, offset, limit, alive, since) + } else { + r1 = ret.Get(1).(uint64) + } + if returnFunc, ok := ret.Get(2).(func(context.Context, uint64, uint64, bool, time.Time) error); ok { + r2 = returnFunc(ctx, offset, limit, alive, since) + } else { + r2 = ret.Error(2) + } + return r0, r1, r2 +} + +// MockPropletRepository_ListByAlive_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListByAlive' +type MockPropletRepository_ListByAlive_Call struct { + *mock.Call +} + +// ListByAlive is a helper method to define mock.On call +// - ctx context.Context +// - offset uint64 +// - limit uint64 +// - alive bool +// - since time.Time +func (_e *MockPropletRepository_Expecter) ListByAlive(ctx interface{}, offset interface{}, limit interface{}, alive interface{}, since interface{}) *MockPropletRepository_ListByAlive_Call { + return &MockPropletRepository_ListByAlive_Call{Call: _e.mock.On("ListByAlive", ctx, offset, limit, alive, since)} +} + +func (_c *MockPropletRepository_ListByAlive_Call) Run(run func(ctx context.Context, offset uint64, limit uint64, alive bool, since time.Time)) *MockPropletRepository_ListByAlive_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 uint64 + if args[1] != nil { + arg1 = args[1].(uint64) + } + var arg2 uint64 + if args[2] != nil { + arg2 = args[2].(uint64) + } + var arg3 bool + if args[3] != nil { + arg3 = args[3].(bool) + } + var arg4 time.Time + if args[4] != nil { + arg4 = args[4].(time.Time) + } + run(arg0, arg1, arg2, arg3, arg4) + }) + return _c +} + +func (_c *MockPropletRepository_ListByAlive_Call) Return(proplets []proplet.Proplet, v uint64, err error) *MockPropletRepository_ListByAlive_Call { + _c.Call.Return(proplets, v, err) + return _c +} + +func (_c *MockPropletRepository_ListByAlive_Call) RunAndReturn(run func(ctx context.Context, offset uint64, limit uint64, alive bool, since time.Time) ([]proplet.Proplet, uint64, error)) *MockPropletRepository_ListByAlive_Call { + _c.Call.Return(run) + return _c +} + // Update provides a mock function for the type MockPropletRepository func (_mock *MockPropletRepository) Update(ctx context.Context, p proplet.Proplet) error { ret := _mock.Called(ctx, p) diff --git a/pkg/storage/postgres/init.go b/pkg/storage/postgres/init.go index 6465d4b6..85870376 100644 --- a/pkg/storage/postgres/init.go +++ b/pkg/storage/postgres/init.go @@ -64,6 +64,7 @@ type PropletRepository interface { Get(ctx context.Context, id string) (proplet.Proplet, error) Update(ctx context.Context, p proplet.Proplet) error List(ctx context.Context, offset, limit uint64) ([]proplet.Proplet, uint64, error) + ListByAlive(ctx context.Context, offset, limit uint64, alive bool, since time.Time) ([]proplet.Proplet, uint64, error) Delete(ctx context.Context, id string) error } diff --git a/pkg/storage/postgres/proplets.go b/pkg/storage/postgres/proplets.go index 0b45700b..2c303de5 100644 --- a/pkg/storage/postgres/proplets.go +++ b/pkg/storage/postgres/proplets.go @@ -5,6 +5,7 @@ import ( "database/sql" "errors" "fmt" + "time" "github.com/absmach/propeller/pkg/proplet" ) @@ -119,6 +120,45 @@ func (r *propletRepo) List(ctx context.Context, offset, limit uint64) ([]proplet return proplets, total, nil } +func (r *propletRepo) ListByAlive(ctx context.Context, offset, limit uint64, alive bool, since time.Time) ([]proplet.Proplet, uint64, error) { + var whereClause string + if alive { + whereClause = `WHERE alive_history IS NOT NULL AND jsonb_array_length(alive_history) > 0 AND (alive_history ->> (jsonb_array_length(alive_history) - 1))::timestamptz >= $1` + } else { + whereClause = `WHERE alive_history IS NULL OR jsonb_array_length(alive_history) = 0 OR (alive_history ->> (jsonb_array_length(alive_history) - 1))::timestamptz < $1` + } + + var total uint64 + if err := r.db.GetContext(ctx, &total, fmt.Sprintf("SELECT COUNT(*) FROM proplets %s", whereClause), since); err != nil { + return nil, 0, fmt.Errorf("%w: %w", ErrDBQuery, err) + } + + query := fmt.Sprintf(`SELECT id, name, task_count, alive, alive_history, metadata FROM proplets %s LIMIT $2 OFFSET $3`, whereClause) + rows, err := r.db.QueryContext(ctx, query, since, limit, offset) + if err != nil { + return nil, 0, fmt.Errorf("%w: %w", ErrDBQuery, err) + } + defer rows.Close() + + proplets := make([]proplet.Proplet, 0) + for rows.Next() { + var dbp dbProplet + if err := rows.Scan(&dbp.ID, &dbp.Name, &dbp.TaskCount, &dbp.Alive, &dbp.AliveHistory, &dbp.Metadata); err != nil { + return nil, 0, fmt.Errorf("%w: %w", ErrDBScan, err) + } + p, err := r.toProplet(dbp) + if err != nil { + return nil, 0, fmt.Errorf("%w: %w", ErrDBScan, err) + } + proplets = append(proplets, p) + } + if err := rows.Err(); err != nil { + return nil, 0, fmt.Errorf("%w: %w", ErrDBQuery, err) + } + + return proplets, total, nil +} + func (r *propletRepo) Delete(ctx context.Context, id string) error { query := `DELETE FROM proplets WHERE id = $1` diff --git a/pkg/storage/repository.go b/pkg/storage/repository.go index 0379139c..174e5d0d 100644 --- a/pkg/storage/repository.go +++ b/pkg/storage/repository.go @@ -2,6 +2,7 @@ package storage import ( "context" + "time" "github.com/absmach/propeller/pkg/job" "github.com/absmach/propeller/pkg/proplet" @@ -23,6 +24,7 @@ type PropletRepository interface { Get(ctx context.Context, id string) (proplet.Proplet, error) Update(ctx context.Context, p proplet.Proplet) error List(ctx context.Context, offset, limit uint64) ([]proplet.Proplet, uint64, error) + ListByAlive(ctx context.Context, offset, limit uint64, alive bool, since time.Time) ([]proplet.Proplet, uint64, error) Delete(ctx context.Context, id string) error } diff --git a/pkg/storage/sqlite/init.go b/pkg/storage/sqlite/init.go index 2ad7da03..af29be2f 100644 --- a/pkg/storage/sqlite/init.go +++ b/pkg/storage/sqlite/init.go @@ -64,6 +64,7 @@ type PropletRepository interface { Get(ctx context.Context, id string) (proplet.Proplet, error) Update(ctx context.Context, p proplet.Proplet) error List(ctx context.Context, offset, limit uint64) ([]proplet.Proplet, uint64, error) + ListByAlive(ctx context.Context, offset, limit uint64, alive bool, since time.Time) ([]proplet.Proplet, uint64, error) Delete(ctx context.Context, id string) error } diff --git a/pkg/storage/sqlite/proplets.go b/pkg/storage/sqlite/proplets.go index 86c991f8..7f88566c 100644 --- a/pkg/storage/sqlite/proplets.go +++ b/pkg/storage/sqlite/proplets.go @@ -5,6 +5,7 @@ import ( "database/sql" "errors" "fmt" + "time" "github.com/absmach/propeller/pkg/proplet" ) @@ -120,6 +121,39 @@ func (r *propletRepo) List(ctx context.Context, offset, limit uint64) ([]proplet return proplets, total, nil } +func (r *propletRepo) ListByAlive(ctx context.Context, offset, limit uint64, alive bool, since time.Time) ([]proplet.Proplet, uint64, error) { + const pageSize uint64 = 1000 + var all []proplet.Proplet + var scanOffset uint64 + for { + batch, total, err := r.List(ctx, scanOffset, pageSize) + if err != nil { + return nil, 0, err + } + all = append(all, batch...) + scanOffset += uint64(len(batch)) + if scanOffset >= total || len(batch) == 0 { + break + } + } + + var filtered []proplet.Proplet + for _, p := range all { + isAlive := len(p.AliveHistory) > 0 && !p.AliveHistory[len(p.AliveHistory)-1].Before(since) + if isAlive == alive { + filtered = append(filtered, p) + } + } + + filteredTotal := uint64(len(filtered)) + if offset >= filteredTotal { + return []proplet.Proplet{}, filteredTotal, nil + } + end := min(offset+limit, filteredTotal) + + return filtered[offset:end], filteredTotal, nil +} + func (r *propletRepo) Delete(ctx context.Context, id string) error { query := `DELETE FROM proplets WHERE id = ?` From a065a23c4696ae82427d3cec414732aecb180d7c Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Tue, 7 Apr 2026 12:30:34 +0300 Subject: [PATCH 12/14] fix(storage): cap Badger scan and add transaction to ListByAlive Cap the Badger in-process filter scan at 100k entries to prevent unbounded memory growth. Wrap Postgres ListByAlive in a REPEATABLE READ transaction so COUNT and SELECT see the same snapshot. Replace the SQLite batch-loop with a single transactional full-table scan to eliminate the same COUNT/SELECT divergence risk. --- pkg/storage/badger/proplets.go | 9 +++----- pkg/storage/postgres/proplets.go | 10 ++++++-- pkg/storage/sqlite/proplets.go | 39 +++++++++++++++++++------------- 3 files changed, 34 insertions(+), 24 deletions(-) diff --git a/pkg/storage/badger/proplets.go b/pkg/storage/badger/proplets.go index 3e5e0ed3..089d385c 100644 --- a/pkg/storage/badger/proplets.go +++ b/pkg/storage/badger/proplets.go @@ -79,14 +79,11 @@ func (r *propletRepo) List(ctx context.Context, offset, limit uint64) ([]proplet return proplets, total, nil } +const maxBadgerScan uint64 = 100000 + func (r *propletRepo) ListByAlive(ctx context.Context, offset, limit uint64, alive bool, since time.Time) ([]proplet.Proplet, uint64, error) { prefix := []byte("proplet:") - totalRaw, err := r.db.countWithPrefix(prefix) - if err != nil { - return nil, 0, err - } - - values, err := r.db.listWithPrefix(prefix, 0, totalRaw) + values, err := r.db.listWithPrefix(prefix, 0, maxBadgerScan) if err != nil { return nil, 0, err } diff --git a/pkg/storage/postgres/proplets.go b/pkg/storage/postgres/proplets.go index 2c303de5..7fa854cc 100644 --- a/pkg/storage/postgres/proplets.go +++ b/pkg/storage/postgres/proplets.go @@ -128,13 +128,19 @@ func (r *propletRepo) ListByAlive(ctx context.Context, offset, limit uint64, ali whereClause = `WHERE alive_history IS NULL OR jsonb_array_length(alive_history) = 0 OR (alive_history ->> (jsonb_array_length(alive_history) - 1))::timestamptz < $1` } + tx, err := r.db.BeginTxx(ctx, &sql.TxOptions{Isolation: sql.LevelRepeatableRead, ReadOnly: true}) + if err != nil { + return nil, 0, fmt.Errorf("%w: %w", ErrDBQuery, err) + } + defer tx.Rollback() + var total uint64 - if err := r.db.GetContext(ctx, &total, fmt.Sprintf("SELECT COUNT(*) FROM proplets %s", whereClause), since); err != nil { + if err := tx.GetContext(ctx, &total, fmt.Sprintf("SELECT COUNT(*) FROM proplets %s", whereClause), since); err != nil { return nil, 0, fmt.Errorf("%w: %w", ErrDBQuery, err) } query := fmt.Sprintf(`SELECT id, name, task_count, alive, alive_history, metadata FROM proplets %s LIMIT $2 OFFSET $3`, whereClause) - rows, err := r.db.QueryContext(ctx, query, since, limit, offset) + rows, err := tx.QueryContext(ctx, query, since, limit, offset) if err != nil { return nil, 0, fmt.Errorf("%w: %w", ErrDBQuery, err) } diff --git a/pkg/storage/sqlite/proplets.go b/pkg/storage/sqlite/proplets.go index 7f88566c..51e68015 100644 --- a/pkg/storage/sqlite/proplets.go +++ b/pkg/storage/sqlite/proplets.go @@ -122,36 +122,43 @@ func (r *propletRepo) List(ctx context.Context, offset, limit uint64) ([]proplet } func (r *propletRepo) ListByAlive(ctx context.Context, offset, limit uint64, alive bool, since time.Time) ([]proplet.Proplet, uint64, error) { - const pageSize uint64 = 1000 - var all []proplet.Proplet - var scanOffset uint64 - for { - batch, total, err := r.List(ctx, scanOffset, pageSize) - if err != nil { - return nil, 0, err - } - all = append(all, batch...) - scanOffset += uint64(len(batch)) - if scanOffset >= total || len(batch) == 0 { - break - } + tx, err := r.db.BeginTxx(ctx, nil) + if err != nil { + return nil, 0, fmt.Errorf("%w: %w", ErrDBQuery, err) } + defer tx.Rollback() + + rows, err := tx.QueryContext(ctx, `SELECT id, name, task_count, alive, alive_history, metadata FROM proplets`) + if err != nil { + return nil, 0, fmt.Errorf("%w: %w", ErrDBQuery, err) + } + defer rows.Close() var filtered []proplet.Proplet - for _, p := range all { + for rows.Next() { + var dbp dbProplet + if err := rows.Scan(&dbp.ID, &dbp.Name, &dbp.TaskCount, &dbp.Alive, &dbp.AliveHistory, &dbp.Metadata); err != nil { + return nil, 0, fmt.Errorf("%w: %w", ErrDBScan, err) + } + p, err := r.toProplet(dbp) + if err != nil { + return nil, 0, fmt.Errorf("%w: %w", ErrDBScan, err) + } isAlive := len(p.AliveHistory) > 0 && !p.AliveHistory[len(p.AliveHistory)-1].Before(since) if isAlive == alive { filtered = append(filtered, p) } } + if err := rows.Err(); err != nil { + return nil, 0, fmt.Errorf("%w: %w", ErrDBQuery, err) + } filteredTotal := uint64(len(filtered)) if offset >= filteredTotal { return []proplet.Proplet{}, filteredTotal, nil } - end := min(offset+limit, filteredTotal) - return filtered[offset:end], filteredTotal, nil + return filtered[offset:min(offset+limit, filteredTotal)], filteredTotal, nil } func (r *propletRepo) Delete(ctx context.Context, id string) error { From 7dd30bf7554001262a4adab6ec3b14e9613ae9e9 Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Tue, 7 Apr 2026 12:32:15 +0300 Subject: [PATCH 13/14] chore(mocks): remove comments from ListByAlive mock methods --- pkg/storage/mocks/proplet_repository.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/pkg/storage/mocks/proplet_repository.go b/pkg/storage/mocks/proplet_repository.go index afc00706..39e4fabf 100644 --- a/pkg/storage/mocks/proplet_repository.go +++ b/pkg/storage/mocks/proplet_repository.go @@ -299,7 +299,6 @@ func (_c *MockPropletRepository_List_Call) RunAndReturn(run func(ctx context.Con return _c } -// ListByAlive provides a mock function for the type MockPropletRepository func (_mock *MockPropletRepository) ListByAlive(ctx context.Context, offset uint64, limit uint64, alive bool, since time.Time) ([]proplet.Proplet, uint64, error) { ret := _mock.Called(ctx, offset, limit, alive, since) @@ -333,17 +332,10 @@ func (_mock *MockPropletRepository) ListByAlive(ctx context.Context, offset uint return r0, r1, r2 } -// MockPropletRepository_ListByAlive_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListByAlive' type MockPropletRepository_ListByAlive_Call struct { *mock.Call } -// ListByAlive is a helper method to define mock.On call -// - ctx context.Context -// - offset uint64 -// - limit uint64 -// - alive bool -// - since time.Time func (_e *MockPropletRepository_Expecter) ListByAlive(ctx interface{}, offset interface{}, limit interface{}, alive interface{}, since interface{}) *MockPropletRepository_ListByAlive_Call { return &MockPropletRepository_ListByAlive_Call{Call: _e.mock.On("ListByAlive", ctx, offset, limit, alive, since)} } From e9bd958d0ea2bdaef89a7ffed7c29439cd68f078 Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Wed, 8 Apr 2026 11:34:15 +0300 Subject: [PATCH 14/14] fix(storage): resolve linter errors across storage and manager packages --- manager/service.go | 1 - pkg/storage/badger/proplets.go | 5 +---- pkg/storage/postgres/proplets.go | 4 ++-- pkg/storage/sqlite/proplets.go | 2 +- 4 files changed, 4 insertions(+), 8 deletions(-) diff --git a/manager/service.go b/manager/service.go index bd50059f..893d3100 100644 --- a/manager/service.go +++ b/manager/service.go @@ -1719,7 +1719,6 @@ func (svc *service) listAllTasks(ctx context.Context) ([]task.Task, error) { return allTasks, nil } - func (svc *service) pinTaskToProplet(ctx context.Context, taskID, propletID string) error { return svc.taskPropletRepo.Create(ctx, taskID, propletID) } diff --git a/pkg/storage/badger/proplets.go b/pkg/storage/badger/proplets.go index 089d385c..09ef6925 100644 --- a/pkg/storage/badger/proplets.go +++ b/pkg/storage/badger/proplets.go @@ -104,10 +104,7 @@ func (r *propletRepo) ListByAlive(ctx context.Context, offset, limit uint64, ali if offset >= filteredTotal { return []proplet.Proplet{}, filteredTotal, nil } - end := offset + limit - if end > filteredTotal { - end = filteredTotal - } + end := min(offset+limit, filteredTotal) return filtered[offset:end], filteredTotal, nil } diff --git a/pkg/storage/postgres/proplets.go b/pkg/storage/postgres/proplets.go index 7fa854cc..d7d47f87 100644 --- a/pkg/storage/postgres/proplets.go +++ b/pkg/storage/postgres/proplets.go @@ -132,10 +132,10 @@ func (r *propletRepo) ListByAlive(ctx context.Context, offset, limit uint64, ali if err != nil { return nil, 0, fmt.Errorf("%w: %w", ErrDBQuery, err) } - defer tx.Rollback() + defer func() { _ = tx.Rollback() }() var total uint64 - if err := tx.GetContext(ctx, &total, fmt.Sprintf("SELECT COUNT(*) FROM proplets %s", whereClause), since); err != nil { + if err := tx.GetContext(ctx, &total, "SELECT COUNT(*) FROM proplets "+whereClause, since); err != nil { return nil, 0, fmt.Errorf("%w: %w", ErrDBQuery, err) } diff --git a/pkg/storage/sqlite/proplets.go b/pkg/storage/sqlite/proplets.go index 51e68015..0ad0e432 100644 --- a/pkg/storage/sqlite/proplets.go +++ b/pkg/storage/sqlite/proplets.go @@ -126,7 +126,7 @@ func (r *propletRepo) ListByAlive(ctx context.Context, offset, limit uint64, ali if err != nil { return nil, 0, fmt.Errorf("%w: %w", ErrDBQuery, err) } - defer tx.Rollback() + defer func() { _ = tx.Rollback() }() rows, err := tx.QueryContext(ctx, `SELECT id, name, task_count, alive, alive_history, metadata FROM proplets`) if err != nil {