Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions manager/api/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

"github.com/absmach/propeller/pkg/cron"
pkgerrors "github.com/absmach/propeller/pkg/errors"
"github.com/absmach/propeller/pkg/task"
apiutil "github.com/absmach/supermq/api/http/util"
)
Expand Down Expand Up @@ -31,6 +32,10 @@ func (t *taskReq) validate() error {
return fmt.Errorf("priority must be between 0 and 100, got %d", t.Priority)
}

if t.Broadcast && t.PropletID != "" {
return fmt.Errorf("%w: broadcast and proplet_id are mutually exclusive", pkgerrors.ErrInvalidValue)
}

return nil
}

Expand Down
15 changes: 14 additions & 1 deletion manager/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,17 @@ func (svc *service) StartTask(ctx context.Context, taskID string) error {
}
}

if t.Broadcast {
if err := svc.persistTaskBeforeStart(ctx, &t); err != nil {
return err
}
if err := svc.publishStart(ctx, t, ""); err != nil {
return err
}

return svc.markTaskRunning(ctx, &t)
}

var p proplet.Proplet
switch t.PropletID {
case "":
Expand Down Expand Up @@ -1687,7 +1698,9 @@ func (svc *service) publishStart(ctx context.Context, t task.Task, propletID str
"encrypted": t.Encrypted,
"kbs_resource_path": t.KBSResourcePath,
"monitoring_profile": t.MonitoringProfile,
"proplet_id": propletID,
}
if propletID != "" {
payload["proplet_id"] = propletID
}

if len(t.DependsOn) > 0 {
Expand Down
9 changes: 9 additions & 0 deletions pkg/storage/postgres/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,15 @@ func (db *Database) Migrate() error {
`ALTER TABLE proplets DROP COLUMN IF EXISTS metadata`,
},
},
{
Id: "4_add_broadcast_column",
Up: []string{
`ALTER TABLE tasks ADD COLUMN IF NOT EXISTS broadcast BOOLEAN NOT NULL DEFAULT FALSE`,
},
Down: []string{
`ALTER TABLE tasks DROP COLUMN IF EXISTS broadcast`,
},
},
},
}

Expand Down
12 changes: 8 additions & 4 deletions pkg/storage/postgres/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,16 @@ type dbTask struct {
RunIf *string `db:"run_if"`
Kind *string `db:"kind"`
Mode *string `db:"mode"`
Broadcast bool `db:"broadcast"`
}

const taskColumns = `id, name, state, image_url, file, cli_args, inputs, env, daemon, encrypted,
kbs_resource_path, proplet_id, results, error, monitoring_profile, start_time, finish_time,
created_at, updated_at, workflow_id, job_id, depends_on, run_if, kind, mode`
created_at, updated_at, workflow_id, job_id, depends_on, run_if, kind, mode, broadcast`

func (r *taskRepo) Create(ctx context.Context, t task.Task) (task.Task, error) {
query := `INSERT INTO tasks (` + taskColumns + `)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25)`
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26)`

cliArgs, err := jsonBytes(t.CLIArgs)
if err != nil {
Expand Down Expand Up @@ -105,6 +106,7 @@ func (r *taskRepo) Create(ctx context.Context, t task.Task) (task.Task, error) {
nullString(t.RunIf),
nullString(string(t.Kind)),
nullString(string(t.Mode)),
t.Broadcast,
)
if err != nil {
return task.Task{}, fmt.Errorf("%w: %w", ErrCreate, err)
Expand Down Expand Up @@ -135,7 +137,7 @@ func (r *taskRepo) Update(ctx context.Context, t task.Task) error {
env = $8, daemon = $9, encrypted = $10, kbs_resource_path = $11, proplet_id = $12,
results = $13, error = $14, monitoring_profile = $15, start_time = $16,
finish_time = $17, updated_at = $18, workflow_id = $19, job_id = $20,
depends_on = $21, run_if = $22, kind = $23, mode = $24
depends_on = $21, run_if = $22, kind = $23, mode = $24, broadcast = $25
WHERE id = $1`

cliArgs, err := jsonBytes(t.CLIArgs)
Expand Down Expand Up @@ -188,6 +190,7 @@ func (r *taskRepo) Update(ctx context.Context, t task.Task) error {
nullString(t.RunIf),
nullString(string(t.Kind)),
nullString(string(t.Mode)),
t.Broadcast,
)
if err != nil {
return fmt.Errorf("%w: %w", ErrUpdate, err)
Expand Down Expand Up @@ -252,7 +255,7 @@ func (r *taskRepo) scanTasks(ctx context.Context, query string, args ...any) ([]
&dbt.Results, &dbt.Error, &dbt.MonitoringProfile,
&dbt.StartTime, &dbt.FinishTime, &dbt.CreatedAt, &dbt.UpdatedAt,
&dbt.WorkflowID, &dbt.JobID, &dbt.DependsOn, &dbt.RunIf,
&dbt.Kind, &dbt.Mode,
&dbt.Kind, &dbt.Mode, &dbt.Broadcast,
); err != nil {
return nil, fmt.Errorf("%w: %w", ErrDBScan, err)
}
Expand Down Expand Up @@ -335,6 +338,7 @@ func (r *taskRepo) toTask(dbt dbTask) (task.Task, error) {
if dbt.Mode != nil {
t.Mode = task.Mode(*dbt.Mode)
}
t.Broadcast = dbt.Broadcast

return t, nil
}
9 changes: 9 additions & 0 deletions pkg/storage/sqlite/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,15 @@ func (db *Database) Migrate() error {
`ALTER TABLE proplets DROP COLUMN metadata`,
},
},
{
Id: "4_add_broadcast_column",
Up: []string{
`ALTER TABLE tasks ADD COLUMN broadcast INTEGER NOT NULL DEFAULT 0`,
},
Down: []string{
`ALTER TABLE tasks DROP COLUMN broadcast`,
},
},
},
}

Expand Down
12 changes: 8 additions & 4 deletions pkg/storage/sqlite/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,16 @@ type dbTask struct {
RunIf *string `db:"run_if"`
Kind *string `db:"kind"`
Mode *string `db:"mode"`
Broadcast bool `db:"broadcast"`
}

const taskColumns = `id, name, state, image_url, file, cli_args, inputs, env, daemon, encrypted,
kbs_resource_path, proplet_id, results, error, monitoring_profile, start_time, finish_time,
created_at, updated_at, workflow_id, job_id, depends_on, run_if, kind, mode`
created_at, updated_at, workflow_id, job_id, depends_on, run_if, kind, mode, broadcast`

func (r *taskRepo) Create(ctx context.Context, t task.Task) (task.Task, error) {
query := `INSERT INTO tasks (` + taskColumns + `)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`

cliArgs, err := jsonBytes(t.CLIArgs)
if err != nil {
Expand Down Expand Up @@ -96,6 +97,7 @@ func (r *taskRepo) Create(ctx context.Context, t task.Task) (task.Task, error) {
nullString(t.WorkflowID), nullString(t.JobID),
dependsOn, nullString(t.RunIf),
nullString(string(t.Kind)), nullString(string(t.Mode)),
t.Broadcast,
)
if err != nil {
return task.Task{}, fmt.Errorf("%w: %w", ErrCreate, err)
Expand Down Expand Up @@ -126,7 +128,7 @@ func (r *taskRepo) Update(ctx context.Context, t task.Task) error {
env = ?, daemon = ?, encrypted = ?, kbs_resource_path = ?, proplet_id = ?,
results = ?, error = ?, monitoring_profile = ?, start_time = ?,
finish_time = ?, updated_at = ?, workflow_id = ?, job_id = ?,
depends_on = ?, run_if = ?, kind = ?, mode = ?
depends_on = ?, run_if = ?, kind = ?, mode = ?, broadcast = ?
WHERE id = ?`

cliArgs, err := jsonBytes(t.CLIArgs)
Expand Down Expand Up @@ -169,6 +171,7 @@ func (r *taskRepo) Update(ctx context.Context, t task.Task) error {
t.UpdatedAt, nullString(t.WorkflowID), nullString(t.JobID),
dependsOn, nullString(t.RunIf),
nullString(string(t.Kind)), nullString(string(t.Mode)),
t.Broadcast,
t.ID,
)
if err != nil {
Expand Down Expand Up @@ -234,7 +237,7 @@ func (r *taskRepo) scanTasks(ctx context.Context, query string, args ...any) ([]
&dbt.Results, &dbt.Error, &dbt.MonitoringProfile,
&dbt.StartTime, &dbt.FinishTime, &dbt.CreatedAt, &dbt.UpdatedAt,
&dbt.WorkflowID, &dbt.JobID, &dbt.DependsOn, &dbt.RunIf,
&dbt.Kind, &dbt.Mode,
&dbt.Kind, &dbt.Mode, &dbt.Broadcast,
); err != nil {
return nil, fmt.Errorf("%w: %w", ErrDBScan, err)
}
Expand Down Expand Up @@ -329,6 +332,7 @@ func (r *taskRepo) toTask(dbt dbTask) (task.Task, error) {
if dbt.Mode != nil {
t.Mode = task.Mode(*dbt.Mode)
}
t.Broadcast = dbt.Broadcast

return t, nil
}
Expand Down
1 change: 1 addition & 0 deletions pkg/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ type Task struct {
IsRecurring bool `json:"is_recurring,omitempty"`
Timezone string `json:"timezone,omitempty"`
Priority int `json:"priority,omitempty"`
Broadcast bool `json:"broadcast,omitempty"`
}

type TaskPage struct {
Expand Down
6 changes: 6 additions & 0 deletions proplet/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,12 @@ impl PropletService {
})?;
req.validate()?;

if let Some(target_id) = &req.proplet_id {
if target_id != &self.config.client_id {
return Ok(());
}
}

info!("Received start command for task: {}", req.id);

let runtime = if req.encrypted {
Expand Down