From 415b4e9a2d87f7703ebd7d4a0fdc01f5da663929 Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Mon, 30 Mar 2026 13:36:04 +0300 Subject: [PATCH 1/8] feat(task): add Metadata field to Task struct --- pkg/task/task.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/task/task.go b/pkg/task/task.go index 32961ce0..a73e0e76 100644 --- a/pkg/task/task.go +++ b/pkg/task/task.go @@ -120,6 +120,7 @@ type Task struct { IsRecurring bool `json:"is_recurring,omitempty"` Timezone string `json:"timezone,omitempty"` Priority int `json:"priority,omitempty"` + Metadata map[string]any `json:"metadata,omitempty"` } type TaskPage struct { From bfd3557ec4b7b13a1bd390feaed9d67acb2e2c50 Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Mon, 30 Mar 2026 13:36:09 +0300 Subject: [PATCH 2/8] feat(storage): add migration 4 for task metadata column --- pkg/storage/postgres/init.go | 12 ++++++++++++ pkg/storage/sqlite/init.go | 12 ++++++++++++ 2 files changed, 24 insertions(+) diff --git a/pkg/storage/postgres/init.go b/pkg/storage/postgres/init.go index 6465d4b6..cf6ca286 100644 --- a/pkg/storage/postgres/init.go +++ b/pkg/storage/postgres/init.go @@ -47,6 +47,7 @@ type TaskRepository interface { Get(ctx context.Context, id string) (task.Task, error) Update(ctx context.Context, t task.Task) error List(ctx context.Context, offset, limit uint64) ([]task.Task, uint64, error) + ListByMetadataFilter(ctx context.Context, filter map[string]string, offset, limit uint64) ([]task.Task, uint64, error) ListByWorkflowID(ctx context.Context, workflowID string) ([]task.Task, error) ListByJobID(ctx context.Context, jobID string) ([]task.Task, error) Delete(ctx context.Context, id string) error @@ -250,6 +251,17 @@ func (db *Database) Migrate() error { `ALTER TABLE proplets DROP COLUMN IF EXISTS metadata`, }, }, + { + Id: "4_add_task_metadata", + Up: []string{ + `ALTER TABLE tasks ADD COLUMN IF NOT EXISTS metadata JSONB`, + `CREATE INDEX IF NOT EXISTS idx_tasks_metadata ON tasks USING GIN (metadata jsonb_path_ops)`, + }, + Down: []string{ + `DROP INDEX IF EXISTS idx_tasks_metadata`, + `ALTER TABLE tasks DROP COLUMN IF EXISTS metadata`, + }, + }, }, } diff --git a/pkg/storage/sqlite/init.go b/pkg/storage/sqlite/init.go index 2ad7da03..9681e970 100644 --- a/pkg/storage/sqlite/init.go +++ b/pkg/storage/sqlite/init.go @@ -47,6 +47,7 @@ type TaskRepository interface { Get(ctx context.Context, id string) (task.Task, error) Update(ctx context.Context, t task.Task) error List(ctx context.Context, offset, limit uint64) ([]task.Task, uint64, error) + ListByMetadataFilter(ctx context.Context, filter map[string]string, offset, limit uint64) ([]task.Task, uint64, error) ListByWorkflowID(ctx context.Context, workflowID string) ([]task.Task, error) ListByJobID(ctx context.Context, jobID string) ([]task.Task, error) Delete(ctx context.Context, id string) error @@ -243,6 +244,17 @@ func (db *Database) Migrate() error { `ALTER TABLE proplets DROP COLUMN metadata`, }, }, + { + Id: "4_add_task_metadata", + Up: []string{ + `ALTER TABLE tasks ADD COLUMN metadata TEXT`, + `CREATE INDEX IF NOT EXISTS idx_tasks_metadata ON tasks (json_extract(metadata, '$'))`, + }, + Down: []string{ + `DROP INDEX IF EXISTS idx_tasks_metadata`, + `ALTER TABLE tasks DROP COLUMN metadata`, + }, + }, }, } From 4211e0451710140603821cb5e8227568f9585de9 Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Mon, 30 Mar 2026 13:36:17 +0300 Subject: [PATCH 3/8] feat(storage): implement task metadata persistence across all backends --- pkg/storage/badger/init.go | 1 + pkg/storage/badger/tasks.go | 30 +++++++++++ pkg/storage/factory.go | 12 +++++ pkg/storage/memory_adapter.go | 44 ++++++++++++++++ pkg/storage/postgres/tasks.go | 71 +++++++++++++++++++++++-- pkg/storage/repository.go | 1 + pkg/storage/sqlite/tasks.go | 97 +++++++++++++++++++++++++---------- 7 files changed, 224 insertions(+), 32 deletions(-) diff --git a/pkg/storage/badger/init.go b/pkg/storage/badger/init.go index 0155a1b4..e1b7af08 100644 --- a/pkg/storage/badger/init.go +++ b/pkg/storage/badger/init.go @@ -45,6 +45,7 @@ type TaskRepository interface { Get(ctx context.Context, id string) (task.Task, error) Update(ctx context.Context, t task.Task) error List(ctx context.Context, offset, limit uint64) ([]task.Task, uint64, error) + ListByMetadataFilter(ctx context.Context, filter map[string]string, offset, limit uint64) ([]task.Task, uint64, error) ListByWorkflowID(ctx context.Context, workflowID string) ([]task.Task, error) ListByJobID(ctx context.Context, jobID string) ([]task.Task, error) Delete(ctx context.Context, id string) error diff --git a/pkg/storage/badger/tasks.go b/pkg/storage/badger/tasks.go index 494495e8..0c295100 100644 --- a/pkg/storage/badger/tasks.go +++ b/pkg/storage/badger/tasks.go @@ -79,6 +79,36 @@ func (r *taskRepo) List(ctx context.Context, offset, limit uint64) ([]task.Task, return tasks, total, nil } +func (r *taskRepo) ListByMetadataFilter(ctx context.Context, filter map[string]string, offset, limit uint64) ([]task.Task, uint64, error) { + all, err := r.listBy(ctx, func(t task.Task) bool { + if t.Metadata == nil { + return false + } + for k, v := range filter { + val, exists := t.Metadata[k] + if !exists { + return false + } + s, ok := val.(string) + if !ok || s != v { + return false + } + } + + return true + }) + if err != nil { + return nil, 0, err + } + total := uint64(len(all)) + if offset >= total { + return []task.Task{}, total, nil + } + end := min(offset+limit, total) + + return all[offset:end], total, nil +} + func (r *taskRepo) ListByWorkflowID(ctx context.Context, workflowID string) ([]task.Task, error) { return r.listBy(ctx, func(t task.Task) bool { return t.WorkflowID == workflowID diff --git a/pkg/storage/factory.go b/pkg/storage/factory.go index f4d37798..53dc91c1 100644 --- a/pkg/storage/factory.go +++ b/pkg/storage/factory.go @@ -152,6 +152,10 @@ func (a *postgresTaskAdapter) List(ctx context.Context, offset, limit uint64) ([ return a.repo.List(ctx, offset, limit) } +func (a *postgresTaskAdapter) ListByMetadataFilter(ctx context.Context, filter map[string]string, offset, limit uint64) ([]task.Task, uint64, error) { + return a.repo.ListByMetadataFilter(ctx, filter, offset, limit) +} + func (a *postgresTaskAdapter) ListByWorkflowID(ctx context.Context, workflowID string) ([]task.Task, error) { return a.repo.ListByWorkflowID(ctx, workflowID) } @@ -295,6 +299,10 @@ func (a *sqliteTaskAdapter) List(ctx context.Context, offset, limit uint64) ([]t return a.repo.List(ctx, offset, limit) } +func (a *sqliteTaskAdapter) ListByMetadataFilter(ctx context.Context, filter map[string]string, offset, limit uint64) ([]task.Task, uint64, error) { + return a.repo.ListByMetadataFilter(ctx, filter, offset, limit) +} + func (a *sqliteTaskAdapter) ListByWorkflowID(ctx context.Context, workflowID string) ([]task.Task, error) { return a.repo.ListByWorkflowID(ctx, workflowID) } @@ -438,6 +446,10 @@ func (a *badgerTaskAdapter) List(ctx context.Context, offset, limit uint64) ([]t return a.repo.List(ctx, offset, limit) } +func (a *badgerTaskAdapter) ListByMetadataFilter(ctx context.Context, filter map[string]string, offset, limit uint64) ([]task.Task, uint64, error) { + return a.repo.ListByMetadataFilter(ctx, filter, offset, limit) +} + func (a *badgerTaskAdapter) ListByWorkflowID(ctx context.Context, workflowID string) ([]task.Task, error) { return a.repo.ListByWorkflowID(ctx, workflowID) } diff --git a/pkg/storage/memory_adapter.go b/pkg/storage/memory_adapter.go index 77399099..545609a4 100644 --- a/pkg/storage/memory_adapter.go +++ b/pkg/storage/memory_adapter.go @@ -62,6 +62,50 @@ func (r *memoryTaskRepo) List(ctx context.Context, offset, limit uint64) ([]task return tasks, total, nil } +func (r *memoryTaskRepo) ListByMetadataFilter(ctx context.Context, filter map[string]string, offset, limit uint64) ([]task.Task, uint64, error) { + data, _, err := r.storage.List(ctx, 0, maxMemoryFetch) + if err != nil { + return nil, 0, err + } + + matched := make([]task.Task, 0) + for _, d := range data { + t, ok := d.(task.Task) + if !ok { + continue + } + if t.Metadata == nil { + continue + } + match := true + for k, v := range filter { + val, exists := t.Metadata[k] + if !exists { + match = false + + break + } + s, ok := val.(string) + if !ok || s != v { + match = false + + break + } + } + if match { + matched = append(matched, t) + } + } + + total := uint64(len(matched)) + if offset >= total { + return []task.Task{}, total, nil + } + end := min(offset+limit, total) + + return matched[offset:end], total, nil +} + func (r *memoryTaskRepo) ListByWorkflowID(ctx context.Context, workflowID string) ([]task.Task, error) { data, _, err := r.storage.List(ctx, 0, maxMemoryFetch) if err != nil { diff --git a/pkg/storage/postgres/tasks.go b/pkg/storage/postgres/tasks.go index 4fb4b685..5e0eae35 100644 --- a/pkg/storage/postgres/tasks.go +++ b/pkg/storage/postgres/tasks.go @@ -3,8 +3,10 @@ package postgres import ( "context" "database/sql" + "encoding/json" "errors" "fmt" + "strings" "github.com/absmach/propeller/pkg/task" ) @@ -43,15 +45,16 @@ type dbTask struct { RunIf *string `db:"run_if"` Kind *string `db:"kind"` Mode *string `db:"mode"` + Metadata []byte `db:"metadata"` } const taskColumns = `id, name, state, image_url, file, cli_args, inputs, env, daemon, encrypted, kbs_resource_path, proplet_id, results, error, monitoring_profile, start_time, finish_time, - created_at, updated_at, workflow_id, job_id, depends_on, run_if, kind, mode` + created_at, updated_at, workflow_id, job_id, depends_on, run_if, kind, mode, metadata` func (r *taskRepo) Create(ctx context.Context, t task.Task) (task.Task, error) { query := `INSERT INTO tasks (` + taskColumns + `) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25)` + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26)` cliArgs, err := jsonBytes(t.CLIArgs) if err != nil { @@ -83,6 +86,11 @@ func (r *taskRepo) Create(ctx context.Context, t task.Task) (task.Task, error) { return task.Task{}, fmt.Errorf("%w: %w", ErrDBQuery, err) } + metadata, err := jsonBytes(t.Metadata) + if err != nil { + return task.Task{}, fmt.Errorf("%w: %w", ErrDBQuery, err) + } + _, err = r.db.ExecContext(ctx, query, t.ID, t.Name, uint8(t.State), nullString(t.ImageURL), @@ -105,6 +113,7 @@ func (r *taskRepo) Create(ctx context.Context, t task.Task) (task.Task, error) { nullString(t.RunIf), nullString(string(t.Kind)), nullString(string(t.Mode)), + metadata, ) if err != nil { return task.Task{}, fmt.Errorf("%w: %w", ErrCreate, err) @@ -135,7 +144,7 @@ func (r *taskRepo) Update(ctx context.Context, t task.Task) error { env = $8, daemon = $9, encrypted = $10, kbs_resource_path = $11, proplet_id = $12, results = $13, error = $14, monitoring_profile = $15, start_time = $16, finish_time = $17, updated_at = $18, workflow_id = $19, job_id = $20, - depends_on = $21, run_if = $22, kind = $23, mode = $24 + depends_on = $21, run_if = $22, kind = $23, mode = $24, metadata = $25 WHERE id = $1` cliArgs, err := jsonBytes(t.CLIArgs) @@ -168,6 +177,11 @@ func (r *taskRepo) Update(ctx context.Context, t task.Task) error { return fmt.Errorf("%w: %w", ErrDBQuery, err) } + metadata, err := jsonBytes(t.Metadata) + if err != nil { + return fmt.Errorf("%w: %w", ErrDBQuery, err) + } + _, err = r.db.ExecContext(ctx, query, t.ID, t.Name, uint8(t.State), nullString(t.ImageURL), @@ -188,6 +202,7 @@ func (r *taskRepo) Update(ctx context.Context, t task.Task) error { nullString(t.RunIf), nullString(string(t.Kind)), nullString(string(t.Mode)), + metadata, ) if err != nil { return fmt.Errorf("%w: %w", ErrUpdate, err) @@ -213,6 +228,49 @@ func (r *taskRepo) List(ctx context.Context, offset, limit uint64) ([]task.Task, return tasks, total, nil } +func (r *taskRepo) ListByMetadataFilter(ctx context.Context, filter map[string]string, offset, limit uint64) ([]task.Task, uint64, error) { + whereClause, args := buildPostgresMetadataWhere(filter) + countArgs := args + args = append(args, limit, offset) + + var total uint64 + countQuery := "SELECT COUNT(*) FROM tasks" + whereClause + if err := r.db.GetContext(ctx, &total, countQuery, countArgs...); err != nil { + return nil, 0, fmt.Errorf("%w: %w", ErrDBQuery, err) + } + + n := len(filter) + 1 + query := `SELECT ` + taskColumns + ` FROM tasks` + whereClause + + fmt.Sprintf(` ORDER BY created_at DESC LIMIT $%d OFFSET $%d`, n, n+1) + tasks, err := r.scanTasks(ctx, query, args...) + if err != nil { + return nil, 0, err + } + + return tasks, total, nil +} + +func buildPostgresMetadataWhere(filter map[string]string) (clause string, args []any) { + if len(filter) == 0 { + return "", nil + } + var sb strings.Builder + sb.WriteString(" WHERE metadata IS NOT NULL") + args = make([]any, 0, len(filter)) + i := 1 + for k, v := range filter { + fmt.Fprintf(&sb, ` AND metadata @> $%d::jsonb`, i) + b, err := json.Marshal(map[string]string{k: v}) + if err != nil { + return "", nil + } + args = append(args, string(b)) + i++ + } + + return sb.String(), args +} + func (r *taskRepo) ListByWorkflowID(ctx context.Context, workflowID string) ([]task.Task, error) { query := `SELECT ` + taskColumns + ` FROM tasks WHERE workflow_id = $1 ORDER BY created_at` @@ -252,7 +310,7 @@ func (r *taskRepo) scanTasks(ctx context.Context, query string, args ...any) ([] &dbt.Results, &dbt.Error, &dbt.MonitoringProfile, &dbt.StartTime, &dbt.FinishTime, &dbt.CreatedAt, &dbt.UpdatedAt, &dbt.WorkflowID, &dbt.JobID, &dbt.DependsOn, &dbt.RunIf, - &dbt.Kind, &dbt.Mode, + &dbt.Kind, &dbt.Mode, &dbt.Metadata, ); err != nil { return nil, fmt.Errorf("%w: %w", ErrDBScan, err) } @@ -335,6 +393,11 @@ func (r *taskRepo) toTask(dbt dbTask) (task.Task, error) { if dbt.Mode != nil { t.Mode = task.Mode(*dbt.Mode) } + if dbt.Metadata != nil { + if err := jsonUnmarshal(dbt.Metadata, &t.Metadata); err != nil { + return task.Task{}, err + } + } return t, nil } diff --git a/pkg/storage/repository.go b/pkg/storage/repository.go index 0379139c..a27598c8 100644 --- a/pkg/storage/repository.go +++ b/pkg/storage/repository.go @@ -13,6 +13,7 @@ type TaskRepository interface { Get(ctx context.Context, id string) (task.Task, error) Update(ctx context.Context, t task.Task) error List(ctx context.Context, offset, limit uint64) ([]task.Task, uint64, error) + ListByMetadataFilter(ctx context.Context, filter map[string]string, offset, limit uint64) ([]task.Task, uint64, error) ListByWorkflowID(ctx context.Context, workflowID string) ([]task.Task, error) ListByJobID(ctx context.Context, jobID string) ([]task.Task, error) Delete(ctx context.Context, id string) error diff --git a/pkg/storage/sqlite/tasks.go b/pkg/storage/sqlite/tasks.go index 99c46427..43f4291d 100644 --- a/pkg/storage/sqlite/tasks.go +++ b/pkg/storage/sqlite/tasks.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "strings" "time" "github.com/absmach/propeller/pkg/task" @@ -45,15 +46,16 @@ type dbTask struct { RunIf *string `db:"run_if"` Kind *string `db:"kind"` Mode *string `db:"mode"` + Metadata []byte `db:"metadata"` } const taskColumns = `id, name, state, image_url, file, cli_args, inputs, env, daemon, encrypted, kbs_resource_path, proplet_id, results, error, monitoring_profile, start_time, finish_time, - created_at, updated_at, workflow_id, job_id, depends_on, run_if, kind, mode` + created_at, updated_at, workflow_id, job_id, depends_on, run_if, kind, mode, metadata` func (r *taskRepo) Create(ctx context.Context, t task.Task) (task.Task, error) { query := `INSERT INTO tasks (` + taskColumns + `) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` cliArgs, err := jsonBytes(t.CLIArgs) if err != nil { @@ -85,6 +87,11 @@ func (r *taskRepo) Create(ctx context.Context, t task.Task) (task.Task, error) { return task.Task{}, fmt.Errorf("marshal error: %w", err) } + metadata, err := jsonBytes(t.Metadata) + if err != nil { + return task.Task{}, fmt.Errorf("marshal error: %w", err) + } + _, err = r.db.ExecContext(ctx, query, t.ID, t.Name, uint8(t.State), nullString(t.ImageURL), t.File, cliArgs, inputs, env, @@ -96,6 +103,7 @@ func (r *taskRepo) Create(ctx context.Context, t task.Task) (task.Task, error) { nullString(t.WorkflowID), nullString(t.JobID), dependsOn, nullString(t.RunIf), nullString(string(t.Kind)), nullString(string(t.Mode)), + metadata, ) if err != nil { return task.Task{}, fmt.Errorf("%w: %w", ErrCreate, err) @@ -126,7 +134,7 @@ func (r *taskRepo) Update(ctx context.Context, t task.Task) error { env = ?, daemon = ?, encrypted = ?, kbs_resource_path = ?, proplet_id = ?, results = ?, error = ?, monitoring_profile = ?, start_time = ?, finish_time = ?, updated_at = ?, workflow_id = ?, job_id = ?, - depends_on = ?, run_if = ?, kind = ?, mode = ? + depends_on = ?, run_if = ?, kind = ?, mode = ?, metadata = ? WHERE id = ?` cliArgs, err := jsonBytes(t.CLIArgs) @@ -159,6 +167,11 @@ func (r *taskRepo) Update(ctx context.Context, t task.Task) error { return fmt.Errorf("marshal error: %w", err) } + metadata, err := jsonBytes(t.Metadata) + if err != nil { + return fmt.Errorf("marshal error: %w", err) + } + _, err = r.db.ExecContext(ctx, query, t.Name, uint8(t.State), nullString(t.ImageURL), t.File, cliArgs, inputs, env, @@ -169,6 +182,7 @@ func (r *taskRepo) Update(ctx context.Context, t task.Task) error { t.UpdatedAt, nullString(t.WorkflowID), nullString(t.JobID), dependsOn, nullString(t.RunIf), nullString(string(t.Kind)), nullString(string(t.Mode)), + metadata, t.ID, ) if err != nil { @@ -195,6 +209,42 @@ func (r *taskRepo) List(ctx context.Context, offset, limit uint64) ([]task.Task, return tasks, total, nil } +func (r *taskRepo) ListByMetadataFilter(ctx context.Context, filter map[string]string, offset, limit uint64) ([]task.Task, uint64, error) { + whereClause, args := buildSQLiteMetadataWhere(filter) + countArgs := args + args = append(args, limit, offset) + + var total uint64 + if err := r.db.GetContext(ctx, &total, "SELECT COUNT(*) FROM tasks"+whereClause, countArgs...); err != nil { + return nil, 0, fmt.Errorf("%w: %w", ErrDBQuery, err) + } + + query := `SELECT ` + taskColumns + ` FROM tasks` + whereClause + ` ORDER BY created_at DESC LIMIT ? OFFSET ?` + tasks, err := r.scanTasks(ctx, query, args...) + if err != nil { + return nil, 0, err + } + + return tasks, total, nil +} + +func buildSQLiteMetadataWhere(filter map[string]string) (clause string, args []any) { + if len(filter) == 0 { + return "", nil + } + var sb strings.Builder + sb.WriteString(" WHERE metadata IS NOT NULL") + args = make([]any, 0, len(filter)) + for k, v := range filter { + sb.WriteString(` AND json_extract(metadata, '$."`) + sb.WriteString(k) + sb.WriteString(`"') = ?`) + args = append(args, v) + } + + return sb.String(), args +} + func (r *taskRepo) ListByWorkflowID(ctx context.Context, workflowID string) ([]task.Task, error) { query := `SELECT ` + taskColumns + ` FROM tasks WHERE workflow_id = ? ORDER BY created_at` @@ -234,7 +284,7 @@ func (r *taskRepo) scanTasks(ctx context.Context, query string, args ...any) ([] &dbt.Results, &dbt.Error, &dbt.MonitoringProfile, &dbt.StartTime, &dbt.FinishTime, &dbt.CreatedAt, &dbt.UpdatedAt, &dbt.WorkflowID, &dbt.JobID, &dbt.DependsOn, &dbt.RunIf, - &dbt.Kind, &dbt.Mode, + &dbt.Kind, &dbt.Mode, &dbt.Metadata, ); err != nil { return nil, fmt.Errorf("%w: %w", ErrDBScan, err) } @@ -269,20 +319,14 @@ func (r *taskRepo) toTask(dbt dbTask) (task.Task, error) { if dbt.ImageURL != nil { t.ImageURL = *dbt.ImageURL } - if dbt.CLIArgs != nil { - if err := jsonUnmarshal(dbt.CLIArgs, &t.CLIArgs); err != nil { - return task.Task{}, err - } + if err := jsonUnmarshal(dbt.CLIArgs, &t.CLIArgs); err != nil { + return task.Task{}, err } - if dbt.Inputs != nil { - if err := jsonUnmarshal(dbt.Inputs, &t.Inputs); err != nil { - return task.Task{}, err - } + if err := jsonUnmarshal(dbt.Inputs, &t.Inputs); err != nil { + return task.Task{}, err } - if dbt.Env != nil { - if err := jsonUnmarshal(dbt.Env, &t.Env); err != nil { - return task.Task{}, err - } + if err := jsonUnmarshal(dbt.Env, &t.Env); err != nil { + return task.Task{}, err } if dbt.KBSResourcePath != nil { t.KBSResourcePath = *dbt.KBSResourcePath @@ -290,18 +334,14 @@ func (r *taskRepo) toTask(dbt dbTask) (task.Task, error) { if dbt.PropletID != nil { t.PropletID = *dbt.PropletID } - if dbt.Results != nil { - if err := jsonUnmarshal(dbt.Results, &t.Results); err != nil { - return task.Task{}, err - } + if err := jsonUnmarshal(dbt.Results, &t.Results); err != nil { + return task.Task{}, err } if dbt.Error != nil { t.Error = *dbt.Error } - if dbt.MonitoringProfile != nil { - if err := jsonUnmarshal(dbt.MonitoringProfile, &t.MonitoringProfile); err != nil { - return task.Task{}, err - } + if err := jsonUnmarshal(dbt.MonitoringProfile, &t.MonitoringProfile); err != nil { + return task.Task{}, err } if dbt.StartTime.Valid { t.StartTime = dbt.StartTime.Time @@ -315,10 +355,8 @@ func (r *taskRepo) toTask(dbt dbTask) (task.Task, error) { if dbt.JobID != nil { t.JobID = *dbt.JobID } - if dbt.DependsOn != nil { - if err := jsonUnmarshal(dbt.DependsOn, &t.DependsOn); err != nil { - return task.Task{}, err - } + if err := jsonUnmarshal(dbt.DependsOn, &t.DependsOn); err != nil { + return task.Task{}, err } if dbt.RunIf != nil { t.RunIf = *dbt.RunIf @@ -329,6 +367,9 @@ func (r *taskRepo) toTask(dbt dbTask) (task.Task, error) { if dbt.Mode != nil { t.Mode = task.Mode(*dbt.Mode) } + if err := jsonUnmarshal(dbt.Metadata, &t.Metadata); err != nil { + return task.Task{}, err + } return t, nil } From 9da4ca2a5e5860e00d7fddb19a0e77ca4ede86f1 Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Mon, 30 Mar 2026 13:36:22 +0300 Subject: [PATCH 4/8] feat(sdk): add MetadataFilter to PageMetadata --- pkg/sdk/sdk.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/sdk/sdk.go b/pkg/sdk/sdk.go index c3cb6b28..8ce8df0e 100644 --- a/pkg/sdk/sdk.go +++ b/pkg/sdk/sdk.go @@ -11,8 +11,9 @@ import ( const CTJSON string = "application/json" type PageMetadata struct { - Offset uint64 `json:"offset"` - Limit uint64 `json:"limit"` + Offset uint64 `json:"offset"` + Limit uint64 `json:"limit"` + MetadataFilter map[string]string `json:"metadata_filter,omitempty"` } type SDK interface { From ba4c969dc4e7cdce72340caf97c62e2cbed01305 Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Mon, 30 Mar 2026 13:36:29 +0300 Subject: [PATCH 5/8] feat(manager): add ListTasksByFilter and refactor publishStart --- manager/manager.go | 2 ++ manager/middleware/logging.go | 20 ++++++++++++++++++++ manager/middleware/metrics.go | 10 ++++++++++ manager/middleware/tracing.go | 11 +++++++++++ manager/service.go | 35 ++++++++++++++++++----------------- 5 files changed, 61 insertions(+), 17 deletions(-) diff --git a/manager/manager.go b/manager/manager.go index e38fc7af..d1b45702 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -4,6 +4,7 @@ import ( "context" "github.com/absmach/propeller/pkg/proplet" + "github.com/absmach/propeller/pkg/sdk" "github.com/absmach/propeller/pkg/task" ) @@ -22,6 +23,7 @@ type Service interface { StartJob(ctx context.Context, jobID string) error StopJob(ctx context.Context, jobID string) error ListTasks(ctx context.Context, offset, limit uint64) (task.TaskPage, error) + ListTasksByFilter(ctx context.Context, pm sdk.PageMetadata) (task.TaskPage, error) UpdateTask(ctx context.Context, task task.Task) (task.Task, error) DeleteTask(ctx context.Context, taskID string) error StartTask(ctx context.Context, taskID string) error diff --git a/manager/middleware/logging.go b/manager/middleware/logging.go index 9ef334a3..a1edc194 100644 --- a/manager/middleware/logging.go +++ b/manager/middleware/logging.go @@ -7,6 +7,7 @@ import ( "github.com/absmach/propeller/manager" "github.com/absmach/propeller/pkg/proplet" + "github.com/absmach/propeller/pkg/sdk" "github.com/absmach/propeller/pkg/task" ) @@ -165,6 +166,25 @@ func (lm *loggingMiddleware) ListTasks(ctx context.Context, offset, limit uint64 return lm.svc.ListTasks(ctx, offset, limit) } +func (lm *loggingMiddleware) ListTasksByFilter(ctx context.Context, pm sdk.PageMetadata) (resp task.TaskPage, err error) { + defer func(begin time.Time) { + args := []any{ + slog.String("duration", time.Since(begin).String()), + slog.Uint64("offset", pm.Offset), + slog.Uint64("limit", pm.Limit), + } + if err != nil { + args = append(args, slog.Any("error", err)) + lm.logger.Warn("List tasks by filter failed", args...) + + return + } + lm.logger.Info("List tasks by filter completed successfully", args...) + }(time.Now()) + + return lm.svc.ListTasksByFilter(ctx, pm) +} + func (lm *loggingMiddleware) UpdateTask(ctx context.Context, t task.Task) (resp task.Task, err error) { defer func(begin time.Time) { args := []any{ diff --git a/manager/middleware/metrics.go b/manager/middleware/metrics.go index c4ae24db..fd4703e0 100644 --- a/manager/middleware/metrics.go +++ b/manager/middleware/metrics.go @@ -6,6 +6,7 @@ import ( "github.com/absmach/propeller/manager" "github.com/absmach/propeller/pkg/proplet" + "github.com/absmach/propeller/pkg/sdk" "github.com/absmach/propeller/pkg/task" "github.com/go-kit/kit/metrics" ) @@ -89,6 +90,15 @@ func (mm *metricsMiddleware) ListTasks(ctx context.Context, offset, limit uint64 return mm.svc.ListTasks(ctx, offset, limit) } +func (mm *metricsMiddleware) ListTasksByFilter(ctx context.Context, pm sdk.PageMetadata) (task.TaskPage, error) { + defer func(begin time.Time) { + mm.counter.With("method", "list-tasks-by-filter").Add(1) + mm.latency.With("method", "list-tasks-by-filter").Observe(time.Since(begin).Seconds()) + }(time.Now()) + + return mm.svc.ListTasksByFilter(ctx, pm) +} + func (mm *metricsMiddleware) UpdateTask(ctx context.Context, t task.Task) (task.Task, error) { defer func(begin time.Time) { mm.counter.With("method", "update-task").Add(1) diff --git a/manager/middleware/tracing.go b/manager/middleware/tracing.go index 3000a4f0..a876a47a 100644 --- a/manager/middleware/tracing.go +++ b/manager/middleware/tracing.go @@ -5,6 +5,7 @@ import ( "github.com/absmach/propeller/manager" "github.com/absmach/propeller/pkg/proplet" + "github.com/absmach/propeller/pkg/sdk" "github.com/absmach/propeller/pkg/task" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -90,6 +91,16 @@ func (tm *tracing) ListTasks(ctx context.Context, offset, limit uint64) (resp ta return tm.svc.ListTasks(ctx, offset, limit) } +func (tm *tracing) ListTasksByFilter(ctx context.Context, pm sdk.PageMetadata) (resp task.TaskPage, err error) { + ctx, span := tm.tracer.Start(ctx, "list-tasks-by-filter", trace.WithAttributes( + attribute.Int64("offset", int64(pm.Offset)), + attribute.Int64("limit", int64(pm.Limit)), + )) + defer span.End() + + return tm.svc.ListTasksByFilter(ctx, pm) +} + func (tm *tracing) UpdateTask(ctx context.Context, t task.Task) (resp task.Task, err error) { ctx, span := tm.tracer.Start(ctx, "update-task", trace.WithAttributes( attribute.String("id", resp.ID), diff --git a/manager/service.go b/manager/service.go index c06d40f1..cb660be1 100644 --- a/manager/service.go +++ b/manager/service.go @@ -23,6 +23,7 @@ import ( "github.com/absmach/propeller/pkg/mqtt" "github.com/absmach/propeller/pkg/proplet" "github.com/absmach/propeller/pkg/scheduler" + "github.com/absmach/propeller/pkg/sdk" "github.com/absmach/propeller/pkg/storage" "github.com/absmach/propeller/pkg/task" "github.com/google/uuid" @@ -531,6 +532,20 @@ func (svc *service) ListTasks(ctx context.Context, offset, limit uint64) (task.T }, nil } +func (svc *service) ListTasksByFilter(ctx context.Context, pm sdk.PageMetadata) (task.TaskPage, error) { + tasks, total, err := svc.taskRepo.ListByMetadataFilter(ctx, pm.MetadataFilter, pm.Offset, pm.Limit) + if err != nil { + return task.TaskPage{}, err + } + + return task.TaskPage{ + Offset: pm.Offset, + Limit: pm.Limit, + Total: total, + Tasks: tasks, + }, nil +} + func (svc *service) UpdateTask(ctx context.Context, t task.Task) (task.Task, error) { dbT, err := svc.GetTask(ctx, t.ID) if err != nil { @@ -1674,21 +1689,7 @@ func (svc *service) persistTaskBeforeStart(ctx context.Context, t *task.Task) er } func (svc *service) publishStart(ctx context.Context, t task.Task, propletID string) error { - payload := map[string]any{ - "id": t.ID, - "name": t.Name, - "state": t.State, - "image_url": t.ImageURL, - "file": t.File, - "inputs": t.Inputs, - "cli_args": t.CLIArgs, - "daemon": t.Daemon, - "env": t.Env, - "encrypted": t.Encrypted, - "kbs_resource_path": t.KBSResourcePath, - "monitoring_profile": t.MonitoringProfile, - "proplet_id": propletID, - } + t.PropletID = propletID if len(t.DependsOn) > 0 { parentResults, err := svc.GetParentResults(ctx, t.ID) @@ -1696,12 +1697,12 @@ func (svc *service) publishStart(ctx context.Context, t task.Task, propletID str svc.logger.WarnContext(ctx, "failed to get parent results", "task_id", t.ID, "error", err) parentResults = make(map[string]any) } - payload["parent_results"] = parentResults + t.Results = parentResults } topic := svc.baseTopic + "/control/manager/start" - return svc.pubsub.Publish(ctx, topic, payload) + return svc.pubsub.Publish(ctx, topic, t) } func (svc *service) bumpPropletTaskCount(ctx context.Context, p proplet.Proplet, delta int64) error { From 4d3ac0d87f9ea814cbd6edd02716990724346c4a Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Mon, 30 Mar 2026 13:36:34 +0300 Subject: [PATCH 6/8] feat(api): add metadata filter support to list tasks endpoint --- manager/api/endpoint.go | 42 ++++++++++++++++++++++++++-------------- manager/api/requests.go | 24 +++++++++++++++++++++++ manager/api/transport.go | 39 ++++++++++++++++++++++++++++++++++++- 3 files changed, 90 insertions(+), 15 deletions(-) diff --git a/manager/api/endpoint.go b/manager/api/endpoint.go index 3f68a577..7047b4c1 100644 --- a/manager/api/endpoint.go +++ b/manager/api/endpoint.go @@ -6,6 +6,7 @@ import ( "github.com/absmach/propeller/manager" pkgerrors "github.com/absmach/propeller/pkg/errors" + "github.com/absmach/propeller/pkg/sdk" apiutil "github.com/absmach/supermq/api/http/util" "github.com/go-kit/kit/endpoint" ) @@ -224,22 +225,35 @@ func stopJobEndpoint(svc manager.Service) endpoint.Endpoint { func listTasksEndpoint(svc manager.Service) endpoint.Endpoint { return func(ctx context.Context, request any) (any, error) { - req, ok := request.(listEntityReq) - if !ok { + switch req := request.(type) { + case listEntityReq: + if err := req.validate(); err != nil { + return listTaskResponse{}, errors.Join(apiutil.ErrValidation, err) + } + tasks, err := svc.ListTasks(ctx, req.offset, req.limit) + if err != nil { + return listTaskResponse{}, err + } + + return listTaskResponse{TaskPage: tasks}, nil + case listTasksReq: + if err := req.validate(); err != nil { + return listTaskResponse{}, errors.Join(apiutil.ErrValidation, err) + } + pm := sdk.PageMetadata{ + Offset: req.offset, + Limit: req.limit, + MetadataFilter: req.metadataFilter, + } + tasks, err := svc.ListTasksByFilter(ctx, pm) + if err != nil { + return listTaskResponse{}, err + } + + return listTaskResponse{TaskPage: tasks}, nil + default: return listTaskResponse{}, errors.Join(apiutil.ErrValidation, pkgerrors.ErrInvalidData) } - if err := req.validate(); err != nil { - return listTaskResponse{}, errors.Join(apiutil.ErrValidation, err) - } - - tasks, err := svc.ListTasks(ctx, req.offset, req.limit) - if err != nil { - return listTaskResponse{}, err - } - - return listTaskResponse{ - TaskPage: tasks, - }, nil } } diff --git a/manager/api/requests.go b/manager/api/requests.go index 3be90bbe..6d031ddb 100644 --- a/manager/api/requests.go +++ b/manager/api/requests.go @@ -1,6 +1,8 @@ package api import ( + "encoding/json" + "errors" "fmt" "github.com/absmach/propeller/pkg/cron" @@ -8,6 +10,8 @@ import ( apiutil "github.com/absmach/supermq/api/http/util" ) +const maxMetadataBytes = 65536 + type taskReq struct { task.Task `json:",inline"` } @@ -31,6 +35,16 @@ func (t *taskReq) validate() error { return fmt.Errorf("priority must be between 0 and 100, got %d", t.Priority) } + if len(t.Metadata) > 0 { + b, err := json.Marshal(t.Metadata) + if err != nil { + return fmt.Errorf("invalid metadata: %w", err) + } + if len(b) > maxMetadataBytes { + return errors.New("metadata exceeds 64KB limit") + } + } + return nil } @@ -100,6 +114,16 @@ func (e *listEntityReq) validate() error { return nil } +type listTasksReq struct { + offset uint64 + limit uint64 + metadataFilter map[string]string +} + +func (r *listTasksReq) validate() error { + return nil +} + type metricsReq struct { id string offset, limit uint64 diff --git a/manager/api/transport.go b/manager/api/transport.go index faf0e260..67b4402a 100644 --- a/manager/api/transport.go +++ b/manager/api/transport.go @@ -7,6 +7,7 @@ import ( "io" "log/slog" "net/http" + "regexp" "strings" "github.com/absmach/propeller/manager" @@ -69,7 +70,7 @@ func MakeHandler(svc manager.Service, logger *slog.Logger, instanceID string) ht ), "create-task").ServeHTTP) r.Get("/", otelhttp.NewHandler(kithttp.NewServer( listTasksEndpoint(svc), - decodeListEntityReq, + decodeListTasksReq, api.EncodeResponse, opts..., ), "list-tasks").ServeHTTP) @@ -312,6 +313,42 @@ func decodeListEntityReq(_ context.Context, r *http.Request) (any, error) { }, nil } +var metadataKeyRe = regexp.MustCompile(`^[a-zA-Z0-9._/\-]+$`) + +func decodeListTasksReq(_ context.Context, r *http.Request) (any, error) { + o, err := apiutil.ReadNumQuery[uint64](r, api.OffsetKey, api.DefOffset) + if err != nil { + return nil, errors.Join(apiutil.ErrValidation, err) + } + + l, err := apiutil.ReadNumQuery[uint64](r, api.LimitKey, api.DefLimit) + if err != nil { + return nil, errors.Join(apiutil.ErrValidation, err) + } + + filter := map[string]string{} + for key, vals := range r.URL.Query() { + if !strings.HasPrefix(key, "metadata[") || !strings.HasSuffix(key, "]") { + continue + } + metaKey := key[len("metadata[") : len(key)-1] + if !metadataKeyRe.MatchString(metaKey) { + return nil, errors.Join(apiutil.ErrValidation, errors.New("metadata key contains invalid characters")) + } + filter[metaKey] = vals[0] + } + + if len(filter) == 0 { + return listEntityReq{offset: o, limit: l}, nil + } + + return listTasksReq{ + offset: o, + limit: l, + metadataFilter: filter, + }, nil +} + func decodeMetricsReq(key string) kithttp.DecodeRequestFunc { return func(_ context.Context, r *http.Request) (any, error) { o, err := apiutil.ReadNumQuery[uint64](r, api.OffsetKey, api.DefOffset) From 568acd45075d0ec8d9b107daa217fb3af9922fd9 Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Mon, 30 Mar 2026 13:36:39 +0300 Subject: [PATCH 7/8] chore(mocks): regenerate mocks after interface changes --- manager/mocks/service.go | 67 ++++++++++++++++++++++ pkg/storage/mocks/task_repository.go | 86 ++++++++++++++++++++++++++++ 2 files changed, 153 insertions(+) diff --git a/manager/mocks/service.go b/manager/mocks/service.go index 77f39d3f..5f8bc62e 100644 --- a/manager/mocks/service.go +++ b/manager/mocks/service.go @@ -9,6 +9,7 @@ import ( "github.com/absmach/propeller/manager" "github.com/absmach/propeller/pkg/proplet" + "github.com/absmach/propeller/pkg/sdk" "github.com/absmach/propeller/pkg/task" mock "github.com/stretchr/testify/mock" ) @@ -1277,6 +1278,72 @@ func (_c *MockService_ListTasks_Call) RunAndReturn(run func(ctx context.Context, return _c } +// ListTasksByFilter provides a mock function for the type MockService +func (_mock *MockService) ListTasksByFilter(ctx context.Context, pm sdk.PageMetadata) (task.TaskPage, error) { + ret := _mock.Called(ctx, pm) + + if len(ret) == 0 { + panic("no return value specified for ListTasksByFilter") + } + + var r0 task.TaskPage + var r1 error + if returnFunc, ok := ret.Get(0).(func(context.Context, sdk.PageMetadata) (task.TaskPage, error)); ok { + return returnFunc(ctx, pm) + } + if returnFunc, ok := ret.Get(0).(func(context.Context, sdk.PageMetadata) task.TaskPage); ok { + r0 = returnFunc(ctx, pm) + } else { + r0 = ret.Get(0).(task.TaskPage) + } + if returnFunc, ok := ret.Get(1).(func(context.Context, sdk.PageMetadata) error); ok { + r1 = returnFunc(ctx, pm) + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// MockService_ListTasksByFilter_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListTasksByFilter' +type MockService_ListTasksByFilter_Call struct { + *mock.Call +} + +// ListTasksByFilter is a helper method to define mock.On call +// - ctx context.Context +// - pm sdk.PageMetadata +func (_e *MockService_Expecter) ListTasksByFilter(ctx interface{}, pm interface{}) *MockService_ListTasksByFilter_Call { + return &MockService_ListTasksByFilter_Call{Call: _e.mock.On("ListTasksByFilter", ctx, pm)} +} + +func (_c *MockService_ListTasksByFilter_Call) Run(run func(ctx context.Context, pm sdk.PageMetadata)) *MockService_ListTasksByFilter_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 sdk.PageMetadata + if args[1] != nil { + arg1 = args[1].(sdk.PageMetadata) + } + run( + arg0, + arg1, + ) + }) + return _c +} + +func (_c *MockService_ListTasksByFilter_Call) Return(taskPage task.TaskPage, err error) *MockService_ListTasksByFilter_Call { + _c.Call.Return(taskPage, err) + return _c +} + +func (_c *MockService_ListTasksByFilter_Call) RunAndReturn(run func(ctx context.Context, pm sdk.PageMetadata) (task.TaskPage, error)) *MockService_ListTasksByFilter_Call { + _c.Call.Return(run) + return _c +} + // PostFLUpdate provides a mock function for the type MockService func (_mock *MockService) PostFLUpdate(ctx context.Context, update manager.FLUpdate) error { ret := _mock.Called(ctx, update) diff --git a/pkg/storage/mocks/task_repository.go b/pkg/storage/mocks/task_repository.go index f699cdcb..790c9458 100644 --- a/pkg/storage/mocks/task_repository.go +++ b/pkg/storage/mocks/task_repository.go @@ -375,6 +375,92 @@ func (_c *MockTaskRepository_ListByJobID_Call) RunAndReturn(run func(ctx context return _c } +// ListByMetadataFilter provides a mock function for the type MockTaskRepository +func (_mock *MockTaskRepository) ListByMetadataFilter(ctx context.Context, filter map[string]string, offset uint64, limit uint64) ([]task.Task, uint64, error) { + ret := _mock.Called(ctx, filter, offset, limit) + + if len(ret) == 0 { + panic("no return value specified for ListByMetadataFilter") + } + + var r0 []task.Task + var r1 uint64 + var r2 error + if returnFunc, ok := ret.Get(0).(func(context.Context, map[string]string, uint64, uint64) ([]task.Task, uint64, error)); ok { + return returnFunc(ctx, filter, offset, limit) + } + if returnFunc, ok := ret.Get(0).(func(context.Context, map[string]string, uint64, uint64) []task.Task); ok { + r0 = returnFunc(ctx, filter, offset, limit) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]task.Task) + } + } + if returnFunc, ok := ret.Get(1).(func(context.Context, map[string]string, uint64, uint64) uint64); ok { + r1 = returnFunc(ctx, filter, offset, limit) + } else { + r1 = ret.Get(1).(uint64) + } + if returnFunc, ok := ret.Get(2).(func(context.Context, map[string]string, uint64, uint64) error); ok { + r2 = returnFunc(ctx, filter, offset, limit) + } else { + r2 = ret.Error(2) + } + return r0, r1, r2 +} + +// MockTaskRepository_ListByMetadataFilter_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListByMetadataFilter' +type MockTaskRepository_ListByMetadataFilter_Call struct { + *mock.Call +} + +// ListByMetadataFilter is a helper method to define mock.On call +// - ctx context.Context +// - filter map[string]string +// - offset uint64 +// - limit uint64 +func (_e *MockTaskRepository_Expecter) ListByMetadataFilter(ctx interface{}, filter interface{}, offset interface{}, limit interface{}) *MockTaskRepository_ListByMetadataFilter_Call { + return &MockTaskRepository_ListByMetadataFilter_Call{Call: _e.mock.On("ListByMetadataFilter", ctx, filter, offset, limit)} +} + +func (_c *MockTaskRepository_ListByMetadataFilter_Call) Run(run func(ctx context.Context, filter map[string]string, offset uint64, limit uint64)) *MockTaskRepository_ListByMetadataFilter_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 map[string]string + if args[1] != nil { + arg1 = args[1].(map[string]string) + } + var arg2 uint64 + if args[2] != nil { + arg2 = args[2].(uint64) + } + var arg3 uint64 + if args[3] != nil { + arg3 = args[3].(uint64) + } + run( + arg0, + arg1, + arg2, + arg3, + ) + }) + return _c +} + +func (_c *MockTaskRepository_ListByMetadataFilter_Call) Return(tasks []task.Task, v uint64, err error) *MockTaskRepository_ListByMetadataFilter_Call { + _c.Call.Return(tasks, v, err) + return _c +} + +func (_c *MockTaskRepository_ListByMetadataFilter_Call) RunAndReturn(run func(ctx context.Context, filter map[string]string, offset uint64, limit uint64) ([]task.Task, uint64, error)) *MockTaskRepository_ListByMetadataFilter_Call { + _c.Call.Return(run) + return _c +} + // ListByWorkflowID provides a mock function for the type MockTaskRepository func (_mock *MockTaskRepository) ListByWorkflowID(ctx context.Context, workflowID string) ([]task.Task, error) { ret := _mock.Called(ctx, workflowID) From 2082b35183ae0a1a03e88bc0f8041d76facca0ab Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Mon, 30 Mar 2026 13:36:45 +0300 Subject: [PATCH 8/8] fix(http-server): fix clippy warnings --- examples/http-server/src/lib.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/examples/http-server/src/lib.rs b/examples/http-server/src/lib.rs index d96ac677..13b34fb9 100644 --- a/examples/http-server/src/lib.rs +++ b/examples/http-server/src/lib.rs @@ -116,7 +116,7 @@ fn headers_response(request: IncomingRequest, response_out: ResponseOutparam) { let out_headers = Fields::new(); out_headers - .set(&"content-type".to_string(), &[b"text/plain".to_vec()]) + .set("content-type", &[b"text/plain".to_vec()]) .unwrap(); let response = OutgoingResponse::new(out_headers); response.set_status_code(200).unwrap(); @@ -152,9 +152,8 @@ fn read_body(request: IncomingRequest) -> Vec { Ok(chunk) => data.extend_from_slice(&chunk), Err(_) => break, } - match stream.read(0) { - Err(_) => break, - Ok(_) => {} + if stream.read(0).is_err() { + break; } } data