diff --git a/manager/api/requests.go b/manager/api/requests.go index 3be90bbe..8e6b4fb0 100644 --- a/manager/api/requests.go +++ b/manager/api/requests.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/absmach/propeller/pkg/cron" + pkgerrors "github.com/absmach/propeller/pkg/errors" "github.com/absmach/propeller/pkg/task" apiutil "github.com/absmach/supermq/api/http/util" ) @@ -31,6 +32,10 @@ func (t *taskReq) validate() error { return fmt.Errorf("priority must be between 0 and 100, got %d", t.Priority) } + if t.Broadcast && t.PropletID != "" { + return fmt.Errorf("%w: broadcast and proplet_id are mutually exclusive", pkgerrors.ErrInvalidValue) + } + return nil } diff --git a/manager/service.go b/manager/service.go index c06d40f1..21c2ca04 100644 --- a/manager/service.go +++ b/manager/service.go @@ -624,6 +624,17 @@ func (svc *service) StartTask(ctx context.Context, taskID string) error { } } + if t.Broadcast { + if err := svc.persistTaskBeforeStart(ctx, &t); err != nil { + return err + } + if err := svc.publishStart(ctx, t, ""); err != nil { + return err + } + + return svc.markTaskRunning(ctx, &t) + } + var p proplet.Proplet switch t.PropletID { case "": @@ -1687,7 +1698,9 @@ func (svc *service) publishStart(ctx context.Context, t task.Task, propletID str "encrypted": t.Encrypted, "kbs_resource_path": t.KBSResourcePath, "monitoring_profile": t.MonitoringProfile, - "proplet_id": propletID, + } + if propletID != "" { + payload["proplet_id"] = propletID } if len(t.DependsOn) > 0 { diff --git a/pkg/storage/postgres/init.go b/pkg/storage/postgres/init.go index 6465d4b6..d3bcb2a0 100644 --- a/pkg/storage/postgres/init.go +++ b/pkg/storage/postgres/init.go @@ -250,6 +250,15 @@ func (db *Database) Migrate() error { `ALTER TABLE proplets DROP COLUMN IF EXISTS metadata`, }, }, + { + Id: "4_add_broadcast_column", + Up: []string{ + `ALTER TABLE tasks ADD COLUMN IF NOT EXISTS broadcast BOOLEAN NOT NULL DEFAULT FALSE`, + }, + Down: []string{ + `ALTER TABLE tasks DROP COLUMN IF EXISTS broadcast`, + }, + }, }, } diff --git a/pkg/storage/postgres/tasks.go b/pkg/storage/postgres/tasks.go index 4fb4b685..e511c926 100644 --- a/pkg/storage/postgres/tasks.go +++ b/pkg/storage/postgres/tasks.go @@ -43,15 +43,16 @@ type dbTask struct { RunIf *string `db:"run_if"` Kind *string `db:"kind"` Mode *string `db:"mode"` + Broadcast bool `db:"broadcast"` } const taskColumns = `id, name, state, image_url, file, cli_args, inputs, env, daemon, encrypted, kbs_resource_path, proplet_id, results, error, monitoring_profile, start_time, finish_time, - created_at, updated_at, workflow_id, job_id, depends_on, run_if, kind, mode` + created_at, updated_at, workflow_id, job_id, depends_on, run_if, kind, mode, broadcast` func (r *taskRepo) Create(ctx context.Context, t task.Task) (task.Task, error) { query := `INSERT INTO tasks (` + taskColumns + `) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25)` + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26)` cliArgs, err := jsonBytes(t.CLIArgs) if err != nil { @@ -105,6 +106,7 @@ func (r *taskRepo) Create(ctx context.Context, t task.Task) (task.Task, error) { nullString(t.RunIf), nullString(string(t.Kind)), nullString(string(t.Mode)), + t.Broadcast, ) if err != nil { return task.Task{}, fmt.Errorf("%w: %w", ErrCreate, err) @@ -135,7 +137,7 @@ func (r *taskRepo) Update(ctx context.Context, t task.Task) error { env = $8, daemon = $9, encrypted = $10, kbs_resource_path = $11, proplet_id = $12, results = $13, error = $14, monitoring_profile = $15, start_time = $16, finish_time = $17, updated_at = $18, workflow_id = $19, job_id = $20, - depends_on = $21, run_if = $22, kind = $23, mode = $24 + depends_on = $21, run_if = $22, kind = $23, mode = $24, broadcast = $25 WHERE id = $1` cliArgs, err := jsonBytes(t.CLIArgs) @@ -188,6 +190,7 @@ func (r *taskRepo) Update(ctx context.Context, t task.Task) error { nullString(t.RunIf), nullString(string(t.Kind)), nullString(string(t.Mode)), + t.Broadcast, ) if err != nil { return fmt.Errorf("%w: %w", ErrUpdate, err) @@ -252,7 +255,7 @@ func (r *taskRepo) scanTasks(ctx context.Context, query string, args ...any) ([] &dbt.Results, &dbt.Error, &dbt.MonitoringProfile, &dbt.StartTime, &dbt.FinishTime, &dbt.CreatedAt, &dbt.UpdatedAt, &dbt.WorkflowID, &dbt.JobID, &dbt.DependsOn, &dbt.RunIf, - &dbt.Kind, &dbt.Mode, + &dbt.Kind, &dbt.Mode, &dbt.Broadcast, ); err != nil { return nil, fmt.Errorf("%w: %w", ErrDBScan, err) } @@ -335,6 +338,7 @@ func (r *taskRepo) toTask(dbt dbTask) (task.Task, error) { if dbt.Mode != nil { t.Mode = task.Mode(*dbt.Mode) } + t.Broadcast = dbt.Broadcast return t, nil } diff --git a/pkg/storage/sqlite/init.go b/pkg/storage/sqlite/init.go index 2ad7da03..bb410b0d 100644 --- a/pkg/storage/sqlite/init.go +++ b/pkg/storage/sqlite/init.go @@ -243,6 +243,15 @@ func (db *Database) Migrate() error { `ALTER TABLE proplets DROP COLUMN metadata`, }, }, + { + Id: "4_add_broadcast_column", + Up: []string{ + `ALTER TABLE tasks ADD COLUMN broadcast INTEGER NOT NULL DEFAULT 0`, + }, + Down: []string{ + `ALTER TABLE tasks DROP COLUMN broadcast`, + }, + }, }, } diff --git a/pkg/storage/sqlite/tasks.go b/pkg/storage/sqlite/tasks.go index 99c46427..e01e3769 100644 --- a/pkg/storage/sqlite/tasks.go +++ b/pkg/storage/sqlite/tasks.go @@ -45,15 +45,16 @@ type dbTask struct { RunIf *string `db:"run_if"` Kind *string `db:"kind"` Mode *string `db:"mode"` + Broadcast bool `db:"broadcast"` } const taskColumns = `id, name, state, image_url, file, cli_args, inputs, env, daemon, encrypted, kbs_resource_path, proplet_id, results, error, monitoring_profile, start_time, finish_time, - created_at, updated_at, workflow_id, job_id, depends_on, run_if, kind, mode` + created_at, updated_at, workflow_id, job_id, depends_on, run_if, kind, mode, broadcast` func (r *taskRepo) Create(ctx context.Context, t task.Task) (task.Task, error) { query := `INSERT INTO tasks (` + taskColumns + `) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` cliArgs, err := jsonBytes(t.CLIArgs) if err != nil { @@ -96,6 +97,7 @@ func (r *taskRepo) Create(ctx context.Context, t task.Task) (task.Task, error) { nullString(t.WorkflowID), nullString(t.JobID), dependsOn, nullString(t.RunIf), nullString(string(t.Kind)), nullString(string(t.Mode)), + t.Broadcast, ) if err != nil { return task.Task{}, fmt.Errorf("%w: %w", ErrCreate, err) @@ -126,7 +128,7 @@ func (r *taskRepo) Update(ctx context.Context, t task.Task) error { env = ?, daemon = ?, encrypted = ?, kbs_resource_path = ?, proplet_id = ?, results = ?, error = ?, monitoring_profile = ?, start_time = ?, finish_time = ?, updated_at = ?, workflow_id = ?, job_id = ?, - depends_on = ?, run_if = ?, kind = ?, mode = ? + depends_on = ?, run_if = ?, kind = ?, mode = ?, broadcast = ? WHERE id = ?` cliArgs, err := jsonBytes(t.CLIArgs) @@ -169,6 +171,7 @@ func (r *taskRepo) Update(ctx context.Context, t task.Task) error { t.UpdatedAt, nullString(t.WorkflowID), nullString(t.JobID), dependsOn, nullString(t.RunIf), nullString(string(t.Kind)), nullString(string(t.Mode)), + t.Broadcast, t.ID, ) if err != nil { @@ -234,7 +237,7 @@ func (r *taskRepo) scanTasks(ctx context.Context, query string, args ...any) ([] &dbt.Results, &dbt.Error, &dbt.MonitoringProfile, &dbt.StartTime, &dbt.FinishTime, &dbt.CreatedAt, &dbt.UpdatedAt, &dbt.WorkflowID, &dbt.JobID, &dbt.DependsOn, &dbt.RunIf, - &dbt.Kind, &dbt.Mode, + &dbt.Kind, &dbt.Mode, &dbt.Broadcast, ); err != nil { return nil, fmt.Errorf("%w: %w", ErrDBScan, err) } @@ -329,6 +332,7 @@ func (r *taskRepo) toTask(dbt dbTask) (task.Task, error) { if dbt.Mode != nil { t.Mode = task.Mode(*dbt.Mode) } + t.Broadcast = dbt.Broadcast return t, nil } diff --git a/pkg/task/task.go b/pkg/task/task.go index 32961ce0..c90bd35f 100644 --- a/pkg/task/task.go +++ b/pkg/task/task.go @@ -120,6 +120,7 @@ type Task struct { IsRecurring bool `json:"is_recurring,omitempty"` Timezone string `json:"timezone,omitempty"` Priority int `json:"priority,omitempty"` + Broadcast bool `json:"broadcast,omitempty"` } type TaskPage struct { diff --git a/proplet/src/service.rs b/proplet/src/service.rs index aa9e1775..987a1419 100644 --- a/proplet/src/service.rs +++ b/proplet/src/service.rs @@ -430,6 +430,12 @@ impl PropletService { })?; req.validate()?; + if let Some(target_id) = &req.proplet_id { + if target_id != &self.config.client_id { + return Ok(()); + } + } + info!("Received start command for task: {}", req.id); let runtime = if req.encrypted {