From 927b5099ecff386b5f8f5a5d9e2ebac2503ba2d3 Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Mon, 23 Mar 2026 13:48:21 +0300 Subject: [PATCH 01/11] feat(task): add wasm_http_url field to task and manager dispatch --- manager/service.go | 1 + pkg/sdk/task.go | 3 ++- pkg/task/task.go | 1 + 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/manager/service.go b/manager/service.go index c06d40f1..2c9f3f05 100644 --- a/manager/service.go +++ b/manager/service.go @@ -1679,6 +1679,7 @@ func (svc *service) publishStart(ctx context.Context, t task.Task, propletID str "name": t.Name, "state": t.State, "image_url": t.ImageURL, + "wasm_http_url": t.WasmHTTPURL, "file": t.File, "inputs": t.Inputs, "cli_args": t.CLIArgs, diff --git a/pkg/sdk/task.go b/pkg/sdk/task.go index 47ab7321..6bf3e8f3 100644 --- a/pkg/sdk/task.go +++ b/pkg/sdk/task.go @@ -16,7 +16,8 @@ type Task struct { Kind string `json:"kind,omitempty"` State uint8 `json:"state,omitempty"` Mode string `json:"mode,omitempty"` - ImageURL string `json:"image_url,omitempty"` + ImageURL string `json:"image_url,omitempty"` + WasmHTTPURL string `json:"wasm_http_url,omitempty"` JobID string `json:"job_id,omitempty"` CLIArgs []string `json:"cli_args,omitempty"` Env map[string]string `json:"env,omitempty"` diff --git a/pkg/task/task.go b/pkg/task/task.go index 4d8bdafe..37587278 100644 --- a/pkg/task/task.go +++ b/pkg/task/task.go @@ -68,6 +68,7 @@ type Task struct { Kind TaskKind `json:"kind,omitempty"` State State `json:"state"` ImageURL string `json:"image_url,omitempty"` + WasmHTTPURL string `json:"wasm_http_url,omitempty"` File []byte `json:"file,omitempty"` CLIArgs []string `json:"cli_args"` Inputs []uint64 `json:"inputs,omitempty"` From d8e9a618b5321dbb0e03cc0b060960622c0bd215 Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Mon, 23 Mar 2026 13:48:28 +0300 Subject: [PATCH 02/11] feat(storage): add wasm_http_url column to postgres and sqlite task repos --- pkg/storage/postgres/init.go | 9 +++++ pkg/storage/postgres/tasks.go | 22 +++++++----- pkg/storage/sqlite/init.go | 9 +++++ pkg/storage/sqlite/tasks.go | 16 +++++---- pkg/storage/sqlite/tasks_test.go | 59 ++++++++++++++++++++++++++++++++ 5 files changed, 101 insertions(+), 14 deletions(-) create mode 100644 pkg/storage/sqlite/tasks_test.go diff --git a/pkg/storage/postgres/init.go b/pkg/storage/postgres/init.go index 6465d4b6..a78553b0 100644 --- a/pkg/storage/postgres/init.go +++ b/pkg/storage/postgres/init.go @@ -250,6 +250,15 @@ func (db *Database) Migrate() error { `ALTER TABLE proplets DROP COLUMN IF EXISTS metadata`, }, }, + { + Id: "4_add_wasm_http_url", + Up: []string{ + `ALTER TABLE tasks ADD COLUMN IF NOT EXISTS wasm_http_url TEXT`, + }, + Down: []string{ + `ALTER TABLE tasks DROP COLUMN IF EXISTS wasm_http_url`, + }, + }, }, } diff --git a/pkg/storage/postgres/tasks.go b/pkg/storage/postgres/tasks.go index 4fb4b685..342d4f3d 100644 --- a/pkg/storage/postgres/tasks.go +++ b/pkg/storage/postgres/tasks.go @@ -22,6 +22,7 @@ type dbTask struct { Name string `db:"name"` State uint8 `db:"state"` ImageURL *string `db:"image_url"` + WasmHTTPURL *string `db:"wasm_http_url"` File []byte `db:"file"` CLIArgs []byte `db:"cli_args"` Inputs []byte `db:"inputs"` @@ -45,13 +46,13 @@ type dbTask struct { Mode *string `db:"mode"` } -const taskColumns = `id, name, state, image_url, file, cli_args, inputs, env, daemon, encrypted, +const taskColumns = `id, name, state, image_url, wasm_http_url, file, cli_args, inputs, env, daemon, encrypted, kbs_resource_path, proplet_id, results, error, monitoring_profile, start_time, finish_time, created_at, updated_at, workflow_id, job_id, depends_on, run_if, kind, mode` func (r *taskRepo) Create(ctx context.Context, t task.Task) (task.Task, error) { query := `INSERT INTO tasks (` + taskColumns + `) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25)` + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26)` cliArgs, err := jsonBytes(t.CLIArgs) if err != nil { @@ -86,6 +87,7 @@ func (r *taskRepo) Create(ctx context.Context, t task.Task) (task.Task, error) { _, err = r.db.ExecContext(ctx, query, t.ID, t.Name, uint8(t.State), nullString(t.ImageURL), + nullString(t.WasmHTTPURL), t.File, cliArgs, inputs, @@ -131,11 +133,11 @@ func (r *taskRepo) Get(ctx context.Context, id string) (task.Task, error) { func (r *taskRepo) Update(ctx context.Context, t task.Task) error { query := `UPDATE tasks SET - name = $2, state = $3, image_url = $4, file = $5, cli_args = $6, inputs = $7, - env = $8, daemon = $9, encrypted = $10, kbs_resource_path = $11, proplet_id = $12, - results = $13, error = $14, monitoring_profile = $15, start_time = $16, - finish_time = $17, updated_at = $18, workflow_id = $19, job_id = $20, - depends_on = $21, run_if = $22, kind = $23, mode = $24 + name = $2, state = $3, image_url = $4, wasm_http_url = $5, file = $6, cli_args = $7, inputs = $8, + env = $9, daemon = $10, encrypted = $11, kbs_resource_path = $12, proplet_id = $13, + results = $14, error = $15, monitoring_profile = $16, start_time = $17, + finish_time = $18, updated_at = $19, workflow_id = $20, job_id = $21, + depends_on = $22, run_if = $23, kind = $24, mode = $25 WHERE id = $1` cliArgs, err := jsonBytes(t.CLIArgs) @@ -171,6 +173,7 @@ func (r *taskRepo) Update(ctx context.Context, t task.Task) error { _, err = r.db.ExecContext(ctx, query, t.ID, t.Name, uint8(t.State), nullString(t.ImageURL), + nullString(t.WasmHTTPURL), t.File, cliArgs, inputs, env, t.Daemon, t.Encrypted, @@ -246,7 +249,7 @@ func (r *taskRepo) scanTasks(ctx context.Context, query string, args ...any) ([] for rows.Next() { var dbt dbTask if err := rows.Scan( - &dbt.ID, &dbt.Name, &dbt.State, &dbt.ImageURL, + &dbt.ID, &dbt.Name, &dbt.State, &dbt.ImageURL, &dbt.WasmHTTPURL, &dbt.File, &dbt.CLIArgs, &dbt.Inputs, &dbt.Env, &dbt.Daemon, &dbt.Encrypted, &dbt.KBSResourcePath, &dbt.PropletID, &dbt.Results, &dbt.Error, &dbt.MonitoringProfile, @@ -287,6 +290,9 @@ func (r *taskRepo) toTask(dbt dbTask) (task.Task, error) { if dbt.ImageURL != nil { t.ImageURL = *dbt.ImageURL } + if dbt.WasmHTTPURL != nil { + t.WasmHTTPURL = *dbt.WasmHTTPURL + } if err := jsonUnmarshal(dbt.CLIArgs, &t.CLIArgs); err != nil { return task.Task{}, err } diff --git a/pkg/storage/sqlite/init.go b/pkg/storage/sqlite/init.go index 2ad7da03..eb0112ce 100644 --- a/pkg/storage/sqlite/init.go +++ b/pkg/storage/sqlite/init.go @@ -243,6 +243,15 @@ func (db *Database) Migrate() error { `ALTER TABLE proplets DROP COLUMN metadata`, }, }, + { + Id: "4_add_wasm_http_url", + Up: []string{ + `ALTER TABLE tasks ADD COLUMN wasm_http_url TEXT`, + }, + Down: []string{ + `ALTER TABLE tasks DROP COLUMN wasm_http_url`, + }, + }, }, } diff --git a/pkg/storage/sqlite/tasks.go b/pkg/storage/sqlite/tasks.go index 99c46427..b6c8121e 100644 --- a/pkg/storage/sqlite/tasks.go +++ b/pkg/storage/sqlite/tasks.go @@ -24,6 +24,7 @@ type dbTask struct { Name string `db:"name"` State uint8 `db:"state"` ImageURL *string `db:"image_url"` + WasmHTTPURL *string `db:"wasm_http_url"` File []byte `db:"file"` CLIArgs []byte `db:"cli_args"` Inputs []byte `db:"inputs"` @@ -47,13 +48,13 @@ type dbTask struct { Mode *string `db:"mode"` } -const taskColumns = `id, name, state, image_url, file, cli_args, inputs, env, daemon, encrypted, +const taskColumns = `id, name, state, image_url, wasm_http_url, file, cli_args, inputs, env, daemon, encrypted, kbs_resource_path, proplet_id, results, error, monitoring_profile, start_time, finish_time, created_at, updated_at, workflow_id, job_id, depends_on, run_if, kind, mode` func (r *taskRepo) Create(ctx context.Context, t task.Task) (task.Task, error) { query := `INSERT INTO tasks (` + taskColumns + `) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` cliArgs, err := jsonBytes(t.CLIArgs) if err != nil { @@ -86,7 +87,7 @@ func (r *taskRepo) Create(ctx context.Context, t task.Task) (task.Task, error) { } _, err = r.db.ExecContext(ctx, query, - t.ID, t.Name, uint8(t.State), nullString(t.ImageURL), + t.ID, t.Name, uint8(t.State), nullString(t.ImageURL), nullString(t.WasmHTTPURL), t.File, cliArgs, inputs, env, t.Daemon, t.Encrypted, nullString(t.KBSResourcePath), nullString(t.PropletID), @@ -122,7 +123,7 @@ func (r *taskRepo) Get(ctx context.Context, id string) (task.Task, error) { func (r *taskRepo) Update(ctx context.Context, t task.Task) error { query := `UPDATE tasks SET - name = ?, state = ?, image_url = ?, file = ?, cli_args = ?, inputs = ?, + name = ?, state = ?, image_url = ?, wasm_http_url = ?, file = ?, cli_args = ?, inputs = ?, env = ?, daemon = ?, encrypted = ?, kbs_resource_path = ?, proplet_id = ?, results = ?, error = ?, monitoring_profile = ?, start_time = ?, finish_time = ?, updated_at = ?, workflow_id = ?, job_id = ?, @@ -160,7 +161,7 @@ func (r *taskRepo) Update(ctx context.Context, t task.Task) error { } _, err = r.db.ExecContext(ctx, query, - t.Name, uint8(t.State), nullString(t.ImageURL), + t.Name, uint8(t.State), nullString(t.ImageURL), nullString(t.WasmHTTPURL), t.File, cliArgs, inputs, env, t.Daemon, t.Encrypted, nullString(t.KBSResourcePath), nullString(t.PropletID), @@ -228,7 +229,7 @@ func (r *taskRepo) scanTasks(ctx context.Context, query string, args ...any) ([] for rows.Next() { var dbt dbTask if err := rows.Scan( - &dbt.ID, &dbt.Name, &dbt.State, &dbt.ImageURL, + &dbt.ID, &dbt.Name, &dbt.State, &dbt.ImageURL, &dbt.WasmHTTPURL, &dbt.File, &dbt.CLIArgs, &dbt.Inputs, &dbt.Env, &dbt.Daemon, &dbt.Encrypted, &dbt.KBSResourcePath, &dbt.PropletID, &dbt.Results, &dbt.Error, &dbt.MonitoringProfile, @@ -269,6 +270,9 @@ func (r *taskRepo) toTask(dbt dbTask) (task.Task, error) { if dbt.ImageURL != nil { t.ImageURL = *dbt.ImageURL } + if dbt.WasmHTTPURL != nil { + t.WasmHTTPURL = *dbt.WasmHTTPURL + } if dbt.CLIArgs != nil { if err := jsonUnmarshal(dbt.CLIArgs, &t.CLIArgs); err != nil { return task.Task{}, err diff --git a/pkg/storage/sqlite/tasks_test.go b/pkg/storage/sqlite/tasks_test.go new file mode 100644 index 00000000..6b427f55 --- /dev/null +++ b/pkg/storage/sqlite/tasks_test.go @@ -0,0 +1,59 @@ +package sqlite_test + +import ( + "context" + "testing" + "time" + + "github.com/absmach/propeller/pkg/storage/sqlite" + "github.com/absmach/propeller/pkg/task" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestTaskCreate_WithWasmHTTPURL(t *testing.T) { + t.Parallel() + repo := sqlite.NewTaskRepository(newTestDB(t)) + + url := "http://example.com/module.wasm" + tk := task.Task{ + ID: uuid.NewString(), + Name: "http-wasm-task", + State: task.Pending, + WasmHTTPURL: url, + CreatedAt: time.Now().UTC().Truncate(time.Second), + UpdatedAt: time.Now().UTC().Truncate(time.Second), + } + + created, err := repo.Create(context.Background(), tk) + require.NoError(t, err) + assert.Equal(t, tk.ID, created.ID) + assert.Equal(t, url, created.WasmHTTPURL) + + fetched, err := repo.Get(context.Background(), tk.ID) + require.NoError(t, err) + assert.Equal(t, url, fetched.WasmHTTPURL) +} + +func TestTaskCreate_WasmHTTPURLNull(t *testing.T) { + t.Parallel() + repo := sqlite.NewTaskRepository(newTestDB(t)) + + tk := task.Task{ + ID: uuid.NewString(), + Name: "no-http-url-task", + State: task.Pending, + ImageURL: "oci://example.com/image:latest", + CreatedAt: time.Now().UTC().Truncate(time.Second), + UpdatedAt: time.Now().UTC().Truncate(time.Second), + } + + created, err := repo.Create(context.Background(), tk) + require.NoError(t, err) + assert.Empty(t, created.WasmHTTPURL) + + fetched, err := repo.Get(context.Background(), tk.ID) + require.NoError(t, err) + assert.Empty(t, fetched.WasmHTTPURL) +} From eb9509c9a1e1e917411139478d38baeef8ad31e5 Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Mon, 23 Mar 2026 13:48:34 +0300 Subject: [PATCH 03/11] feat(proplet): fetch wasm binary from plain HTTP URL Adds wasm_http_url field to StartRequest. When set, the proplet fetches the wasm binary directly via HTTP instead of MQTT chunking or OCI registry. Enforces a 100MB streaming size limit and validates the URL scheme. --- proplet/Cargo.lock | 68 +++++++++++++++++++ proplet/Cargo.toml | 4 ++ proplet/src/service.rs | 129 +++++++++++++++++++++++++++++++++++ proplet/src/types.rs | 148 ++++++++++++++++++++++++++++++++++++++++- 4 files changed, 346 insertions(+), 3 deletions(-) diff --git a/proplet/Cargo.lock b/proplet/Cargo.lock index 9e6cfb06..a88cc775 100644 --- a/proplet/Cargo.lock +++ b/proplet/Cargo.lock @@ -204,6 +204,16 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" +[[package]] +name = "assert-json-diff" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e4f2b81832e72834d7518d8487a0396a28cc408186a2e8854c0f98011faf12" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "astral-tokio-tar" version = "0.5.6" @@ -1390,6 +1400,24 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "deadpool" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0be2b1d1d6ec8d846f05e137292d0b89133caf95ef33695424c09568bdd39b1b" +dependencies = [ + "deadpool-runtime", + "lazy_static", + "num_cpus", + "tokio", +] + +[[package]] +name = "deadpool-runtime" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b" + [[package]] name = "debugid" version = "0.8.0" @@ -2179,6 +2207,12 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "hermit-abi" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" + [[package]] name = "hex" version = "0.4.3" @@ -3219,6 +3253,16 @@ dependencies = [ "libm", ] +[[package]] +name = "num_cpus" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "oauth2" version = "5.0.0" @@ -3930,6 +3974,7 @@ dependencies = [ "wasmtime", "wasmtime-wasi", "wasmtime-wasi-http", + "wiremock", ] [[package]] @@ -6820,6 +6865,29 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "wiremock" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08db1edfb05d9b3c1542e521aea074442088292f00b5f28e435c714a98f85031" +dependencies = [ + "assert-json-diff", + "base64 0.22.1", + "deadpool", + "futures", + "http", + "http-body-util", + "hyper", + "hyper-util", + "log", + "once_cell", + "regex", + "serde", + "serde_json", + "tokio", + "url", +] + [[package]] name = "wit-bindgen" version = "0.30.0" diff --git a/proplet/Cargo.toml b/proplet/Cargo.toml index 7c1b37a3..487b5137 100644 --- a/proplet/Cargo.toml +++ b/proplet/Cargo.toml @@ -42,6 +42,10 @@ futures-util = { version = "0.3" } # ELASTIC TEE HAL — hardware abstraction layer for TEE workloads elastic-tee-hal = { git = "https://github.com/elasticproject-eu/wasmhal", default-features = false, features = ["amd-sev"] } +[dev-dependencies] +wiremock = "0.6" +tokio = { version = "1.42", features = ["full"] } + [features] default = [] diff --git a/proplet/src/service.rs b/proplet/src/service.rs index aa9e1775..3ec55a51 100644 --- a/proplet/src/service.rs +++ b/proplet/src/service.rs @@ -1,4 +1,6 @@ use crate::config::PropletConfig; + +const WASM_FETCH_MAX_BYTES: usize = 100 * 1024 * 1024; use crate::metrics::MetricsCollector; use crate::monitoring::{system::SystemMonitor, ProcessMonitor}; use crate::mqtt::{build_topic, MqttMessage, PubSub}; @@ -497,6 +499,17 @@ impl PropletService { } } } + } else if let Some(ref url) = req.wasm_http_url { + match self.fetch_wasm_from_http(url).await { + Ok(binary) => binary, + Err(e) => { + error!("Failed to fetch wasm for task {}: {}", req.id, e); + self.running_tasks.lock().await.remove(&req.id); + self.publish_result(&req.id, Vec::new(), Some(e.to_string())) + .await?; + return Err(e); + } + } } else { let err = anyhow::anyhow!("No wasm binary or image URL provided"); error!("Validation error for task {}: {}", req.id, err); @@ -977,6 +990,10 @@ impl PropletService { } } + async fn fetch_wasm_from_http(&self, url: &str) -> Result> { + fetch_wasm_from_http(&self.http_client, url).await + } + async fn try_assemble_chunks(&self, app_name: &str) -> Result>> { let mut assembly = self.chunk_assembly.lock().await; @@ -1212,3 +1229,115 @@ fn build_fl_update_envelope( "metrics": {} }) } + +async fn fetch_wasm_from_http(client: &HttpClient, url: &str) -> Result> { + use futures_util::StreamExt; + + let response = client + .get(url) + .send() + .await + .with_context(|| format!("Failed to connect to {}", url))?; + + let status = response.status(); + if status.is_client_error() || status.is_server_error() { + return Err(anyhow::anyhow!("HTTP {} fetching wasm from {}", status, url)); + } + + if let Some(content_length) = response.content_length() { + if content_length as usize > WASM_FETCH_MAX_BYTES { + return Err(anyhow::anyhow!( + "wasm response from {} exceeds size limit ({} > {} bytes)", + url, + content_length, + WASM_FETCH_MAX_BYTES + )); + } + } + + let mut binary = Vec::new(); + let mut stream = response.bytes_stream(); + while let Some(chunk) = stream.next().await { + let chunk = chunk.with_context(|| format!("Failed to read response body from {}", url))?; + binary.extend_from_slice(&chunk); + if binary.len() > WASM_FETCH_MAX_BYTES { + return Err(anyhow::anyhow!( + "wasm response from {} exceeds size limit ({} bytes)", + url, + WASM_FETCH_MAX_BYTES + )); + } + } + + info!("Fetched wasm from {}, size: {} bytes", url, binary.len()); + Ok(binary) +} + +#[cfg(test)] +mod tests { + use super::*; + use wiremock::matchers::method; + use wiremock::{Mock, MockServer, ResponseTemplate}; + + fn make_client() -> HttpClient { + HttpClient::new() + } + + #[tokio::test] + async fn test_fetch_wasm_200_ok() { + let server = MockServer::start().await; + let wasm_bytes = b"\x00asm\x01\x00\x00\x00"; + + Mock::given(method("GET")) + .respond_with(ResponseTemplate::new(200).set_body_bytes(wasm_bytes.to_vec())) + .mount(&server) + .await; + + let result = fetch_wasm_from_http(&make_client(), &server.uri()).await; + assert!(result.is_ok()); + assert_eq!(result.unwrap(), wasm_bytes); + } + + #[tokio::test] + async fn test_fetch_wasm_404() { + let server = MockServer::start().await; + + Mock::given(method("GET")) + .respond_with(ResponseTemplate::new(404)) + .mount(&server) + .await; + + let result = fetch_wasm_from_http(&make_client(), &server.uri()).await; + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("404")); + } + + #[tokio::test] + async fn test_fetch_wasm_500() { + let server = MockServer::start().await; + + Mock::given(method("GET")) + .respond_with(ResponseTemplate::new(500)) + .mount(&server) + .await; + + let result = fetch_wasm_from_http(&make_client(), &server.uri()).await; + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("500")); + } + + #[tokio::test] + async fn test_fetch_wasm_streaming_exceeds_limit() { + let server = MockServer::start().await; + let over_limit_body = vec![0u8; WASM_FETCH_MAX_BYTES + 1]; + + Mock::given(method("GET")) + .respond_with(ResponseTemplate::new(200).set_body_bytes(over_limit_body)) + .mount(&server) + .await; + + let result = fetch_wasm_from_http(&make_client(), &server.uri()).await; + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("size limit")); + } +} diff --git a/proplet/src/types.rs b/proplet/src/types.rs index 80cc1270..c7a775dc 100644 --- a/proplet/src/types.rs +++ b/proplet/src/types.rs @@ -66,6 +66,8 @@ pub struct StartRequest { pub mode: Option, #[serde(default)] pub proplet_id: Option, + #[serde(default)] + pub wasm_http_url: Option, } fn deserialize_null_default<'de, D, T>(deserializer: D) -> std::result::Result @@ -97,8 +99,18 @@ impl StartRequest { "encrypted workloads should only use image_url, not file" )); } - } else if self.file.is_empty() && self.image_url.is_empty() { - return Err(anyhow::anyhow!("either file or image_url must be provided")); + } else if self.file.is_empty() && self.image_url.is_empty() && self.wasm_http_url.is_none() { + return Err(anyhow::anyhow!( + "either file, image_url, or wasm_http_url must be provided" + )); + } + + if let Some(ref url) = self.wasm_http_url { + if !url.starts_with("http://") && !url.starts_with("https://") { + return Err(anyhow::anyhow!( + "wasm_http_url must start with http:// or https://" + )); + } } Ok(()) } @@ -312,6 +324,7 @@ mod tests { kbs_resource_path: None, mode: None, proplet_id: None, + wasm_http_url: None, }; assert!(req.validate().is_ok()); @@ -334,6 +347,7 @@ mod tests { kbs_resource_path: None, mode: None, proplet_id: None, + wasm_http_url: None, }; assert!(req.validate().is_ok()); @@ -356,6 +370,7 @@ mod tests { kbs_resource_path: None, mode: None, proplet_id: None, + wasm_http_url: None, }; let result = req.validate(); @@ -380,6 +395,7 @@ mod tests { kbs_resource_path: None, mode: None, proplet_id: None, + wasm_http_url: None, }; let result = req.validate(); @@ -404,13 +420,14 @@ mod tests { kbs_resource_path: None, mode: None, proplet_id: None, + wasm_http_url: None, }; let result = req.validate(); assert!(result.is_err()); assert_eq!( result.unwrap_err().to_string(), - "either file or image_url must be provided" + "either file, image_url, or wasm_http_url must be provided" ); } @@ -431,6 +448,7 @@ mod tests { kbs_resource_path: Some("default/key1/value".to_string()), mode: None, proplet_id: None, + wasm_http_url: None, }; assert!(req.validate().is_ok()); @@ -453,6 +471,7 @@ mod tests { kbs_resource_path: Some("default/key1/value".to_string()), mode: None, proplet_id: None, + wasm_http_url: None, }; let result = req.validate(); @@ -480,6 +499,7 @@ mod tests { kbs_resource_path: Some("default/key1/value".to_string()), mode: None, proplet_id: None, + wasm_http_url: None, }; let result = req.validate(); @@ -737,6 +757,7 @@ mod tests { kbs_resource_path: None, mode: None, proplet_id: None, + wasm_http_url: None, }; assert_eq!(req.env.as_ref().unwrap().len(), 2); @@ -802,6 +823,7 @@ mod tests { kbs_resource_path: None, mode: None, proplet_id: None, + wasm_http_url: None, }; let json = serde_json::to_string(&req).unwrap(); @@ -813,4 +835,124 @@ mod tests { Some(&"/opt/intel/openvino".to_string()) ); } + + #[test] + fn test_start_request_validate_success_with_wasm_http_url() { + let req = StartRequest { + id: "task-http".to_string(), + cli_args: vec![], + name: "http_func".to_string(), + state: 0, + file: String::new(), + image_url: String::new(), + inputs: vec![], + daemon: false, + env: None, + monitoring_profile: None, + encrypted: false, + kbs_resource_path: None, + mode: None, + proplet_id: None, + wasm_http_url: Some("http://fileserver/app.wasm".to_string()), + }; + + assert!(req.validate().is_ok()); + } + + #[test] + fn test_start_request_validate_success_with_wasm_https_url() { + let req = StartRequest { + id: "task-https".to_string(), + cli_args: vec![], + name: "https_func".to_string(), + state: 0, + file: String::new(), + image_url: String::new(), + inputs: vec![], + daemon: false, + env: None, + monitoring_profile: None, + encrypted: false, + kbs_resource_path: None, + mode: None, + proplet_id: None, + wasm_http_url: Some("https://releases.example.com/app.wasm".to_string()), + }; + + assert!(req.validate().is_ok()); + } + + #[test] + fn test_start_request_validate_invalid_wasm_http_url_scheme() { + let req = StartRequest { + id: "task-ftp".to_string(), + cli_args: vec![], + name: "ftp_func".to_string(), + state: 0, + file: String::new(), + image_url: String::new(), + inputs: vec![], + daemon: false, + env: None, + monitoring_profile: None, + encrypted: false, + kbs_resource_path: None, + mode: None, + proplet_id: None, + wasm_http_url: Some("ftp://fileserver/app.wasm".to_string()), + }; + + let result = req.validate(); + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().to_string(), + "wasm_http_url must start with http:// or https://" + ); + } + + #[test] + fn test_start_request_validate_no_source() { + let req = StartRequest { + id: "task-nosource".to_string(), + cli_args: vec![], + name: "func".to_string(), + state: 0, + file: String::new(), + image_url: String::new(), + inputs: vec![], + daemon: false, + env: None, + monitoring_profile: None, + encrypted: false, + kbs_resource_path: None, + mode: None, + proplet_id: None, + wasm_http_url: None, + }; + + let result = req.validate(); + assert!(result.is_err()); + assert_eq!( + result.unwrap_err().to_string(), + "either file, image_url, or wasm_http_url must be provided" + ); + } + + #[test] + fn test_start_request_deserialize_wasm_http_url() { + let json_data = serde_json::json!({ + "id": "task-deser", + "name": "deser_func", + "file": "", + "image_url": "", + "wasm_http_url": "http://example.com/app.wasm", + "state": 0 + }); + + let req: StartRequest = serde_json::from_value(json_data).unwrap(); + assert_eq!( + req.wasm_http_url, + Some("http://example.com/app.wasm".to_string()) + ); + } } From ce807b282ab192ef267d5884a41e8333fcdcbc94 Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Mon, 23 Mar 2026 14:04:44 +0300 Subject: [PATCH 04/11] fix(sdk): gofmt formatting for Task struct --- pkg/sdk/task.go | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/pkg/sdk/task.go b/pkg/sdk/task.go index 6bf3e8f3..8dac8f33 100644 --- a/pkg/sdk/task.go +++ b/pkg/sdk/task.go @@ -11,21 +11,21 @@ import ( const tasksEndpoint = "/tasks" type Task struct { - ID string `json:"id,omitempty"` - Name string `json:"name"` - Kind string `json:"kind,omitempty"` - State uint8 `json:"state,omitempty"` - Mode string `json:"mode,omitempty"` + ID string `json:"id,omitempty"` + Name string `json:"name"` + Kind string `json:"kind,omitempty"` + State uint8 `json:"state,omitempty"` + Mode string `json:"mode,omitempty"` ImageURL string `json:"image_url,omitempty"` WasmHTTPURL string `json:"wasm_http_url,omitempty"` - JobID string `json:"job_id,omitempty"` - CLIArgs []string `json:"cli_args,omitempty"` - Env map[string]string `json:"env,omitempty"` - StartTime time.Time `json:"start_time"` - FinishTime time.Time `json:"finish_time"` - CreatedAt time.Time `json:"created_at"` - UpdatedAt time.Time `json:"updated_at"` - Results any `json:"results,omitempty"` + JobID string `json:"job_id,omitempty"` + CLIArgs []string `json:"cli_args,omitempty"` + Env map[string]string `json:"env,omitempty"` + StartTime time.Time `json:"start_time"` + FinishTime time.Time `json:"finish_time"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` + Results any `json:"results,omitempty"` } type TaskPage struct { From 029f41128b1c5f29ac20dc32149df243db6d395a Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Mon, 23 Mar 2026 14:40:00 +0300 Subject: [PATCH 05/11] fix(proplet): cargo fmt formatting --- proplet/src/service.rs | 6 +++++- proplet/src/types.rs | 3 ++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/proplet/src/service.rs b/proplet/src/service.rs index 3ec55a51..82cf91e5 100644 --- a/proplet/src/service.rs +++ b/proplet/src/service.rs @@ -1241,7 +1241,11 @@ async fn fetch_wasm_from_http(client: &HttpClient, url: &str) -> Result> let status = response.status(); if status.is_client_error() || status.is_server_error() { - return Err(anyhow::anyhow!("HTTP {} fetching wasm from {}", status, url)); + return Err(anyhow::anyhow!( + "HTTP {} fetching wasm from {}", + status, + url + )); } if let Some(content_length) = response.content_length() { diff --git a/proplet/src/types.rs b/proplet/src/types.rs index c7a775dc..a83617d3 100644 --- a/proplet/src/types.rs +++ b/proplet/src/types.rs @@ -99,7 +99,8 @@ impl StartRequest { "encrypted workloads should only use image_url, not file" )); } - } else if self.file.is_empty() && self.image_url.is_empty() && self.wasm_http_url.is_none() { + } else if self.file.is_empty() && self.image_url.is_empty() && self.wasm_http_url.is_none() + { return Err(anyhow::anyhow!( "either file, image_url, or wasm_http_url must be provided" )); From 977520e6cd50bfb0b12b111155d65441fb3f0733 Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Wed, 25 Mar 2026 10:13:35 +0300 Subject: [PATCH 06/11] refactor(task): detect HTTP delivery via image_url scheme instead of separate field Remove the wasm_http_url field from Task and StartRequest. Proplets now detect plain HTTP delivery by checking if image_url starts with http:// or https://, falling through to registry fetch otherwise. Drops the DB migration and storage columns added for wasm_http_url. --- manager/service.go | 1 - pkg/sdk/task.go | 1 - pkg/storage/postgres/init.go | 9 ---- pkg/storage/postgres/tasks.go | 22 +++----- pkg/storage/sqlite/init.go | 9 ---- pkg/storage/sqlite/tasks.go | 16 +++--- pkg/storage/sqlite/tasks_test.go | 59 ---------------------- pkg/task/task.go | 1 - proplet/src/service.rs | 22 ++++---- proplet/src/types.rs | 86 +++----------------------------- 10 files changed, 33 insertions(+), 193 deletions(-) delete mode 100644 pkg/storage/sqlite/tasks_test.go diff --git a/manager/service.go b/manager/service.go index 2c9f3f05..c06d40f1 100644 --- a/manager/service.go +++ b/manager/service.go @@ -1679,7 +1679,6 @@ func (svc *service) publishStart(ctx context.Context, t task.Task, propletID str "name": t.Name, "state": t.State, "image_url": t.ImageURL, - "wasm_http_url": t.WasmHTTPURL, "file": t.File, "inputs": t.Inputs, "cli_args": t.CLIArgs, diff --git a/pkg/sdk/task.go b/pkg/sdk/task.go index 8dac8f33..8a7c42a3 100644 --- a/pkg/sdk/task.go +++ b/pkg/sdk/task.go @@ -17,7 +17,6 @@ type Task struct { State uint8 `json:"state,omitempty"` Mode string `json:"mode,omitempty"` ImageURL string `json:"image_url,omitempty"` - WasmHTTPURL string `json:"wasm_http_url,omitempty"` JobID string `json:"job_id,omitempty"` CLIArgs []string `json:"cli_args,omitempty"` Env map[string]string `json:"env,omitempty"` diff --git a/pkg/storage/postgres/init.go b/pkg/storage/postgres/init.go index a78553b0..17c9eb2d 100644 --- a/pkg/storage/postgres/init.go +++ b/pkg/storage/postgres/init.go @@ -250,16 +250,7 @@ func (db *Database) Migrate() error { `ALTER TABLE proplets DROP COLUMN IF EXISTS metadata`, }, }, - { - Id: "4_add_wasm_http_url", - Up: []string{ - `ALTER TABLE tasks ADD COLUMN IF NOT EXISTS wasm_http_url TEXT`, - }, - Down: []string{ - `ALTER TABLE tasks DROP COLUMN IF EXISTS wasm_http_url`, - }, }, - }, } _, err := migrate.Exec(db.DB.DB, "postgres", migrations, migrate.Up) diff --git a/pkg/storage/postgres/tasks.go b/pkg/storage/postgres/tasks.go index 342d4f3d..4fb4b685 100644 --- a/pkg/storage/postgres/tasks.go +++ b/pkg/storage/postgres/tasks.go @@ -22,7 +22,6 @@ type dbTask struct { Name string `db:"name"` State uint8 `db:"state"` ImageURL *string `db:"image_url"` - WasmHTTPURL *string `db:"wasm_http_url"` File []byte `db:"file"` CLIArgs []byte `db:"cli_args"` Inputs []byte `db:"inputs"` @@ -46,13 +45,13 @@ type dbTask struct { Mode *string `db:"mode"` } -const taskColumns = `id, name, state, image_url, wasm_http_url, file, cli_args, inputs, env, daemon, encrypted, +const taskColumns = `id, name, state, image_url, file, cli_args, inputs, env, daemon, encrypted, kbs_resource_path, proplet_id, results, error, monitoring_profile, start_time, finish_time, created_at, updated_at, workflow_id, job_id, depends_on, run_if, kind, mode` func (r *taskRepo) Create(ctx context.Context, t task.Task) (task.Task, error) { query := `INSERT INTO tasks (` + taskColumns + `) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26)` + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25)` cliArgs, err := jsonBytes(t.CLIArgs) if err != nil { @@ -87,7 +86,6 @@ func (r *taskRepo) Create(ctx context.Context, t task.Task) (task.Task, error) { _, err = r.db.ExecContext(ctx, query, t.ID, t.Name, uint8(t.State), nullString(t.ImageURL), - nullString(t.WasmHTTPURL), t.File, cliArgs, inputs, @@ -133,11 +131,11 @@ func (r *taskRepo) Get(ctx context.Context, id string) (task.Task, error) { func (r *taskRepo) Update(ctx context.Context, t task.Task) error { query := `UPDATE tasks SET - name = $2, state = $3, image_url = $4, wasm_http_url = $5, file = $6, cli_args = $7, inputs = $8, - env = $9, daemon = $10, encrypted = $11, kbs_resource_path = $12, proplet_id = $13, - results = $14, error = $15, monitoring_profile = $16, start_time = $17, - finish_time = $18, updated_at = $19, workflow_id = $20, job_id = $21, - depends_on = $22, run_if = $23, kind = $24, mode = $25 + name = $2, state = $3, image_url = $4, file = $5, cli_args = $6, inputs = $7, + env = $8, daemon = $9, encrypted = $10, kbs_resource_path = $11, proplet_id = $12, + results = $13, error = $14, monitoring_profile = $15, start_time = $16, + finish_time = $17, updated_at = $18, workflow_id = $19, job_id = $20, + depends_on = $21, run_if = $22, kind = $23, mode = $24 WHERE id = $1` cliArgs, err := jsonBytes(t.CLIArgs) @@ -173,7 +171,6 @@ func (r *taskRepo) Update(ctx context.Context, t task.Task) error { _, err = r.db.ExecContext(ctx, query, t.ID, t.Name, uint8(t.State), nullString(t.ImageURL), - nullString(t.WasmHTTPURL), t.File, cliArgs, inputs, env, t.Daemon, t.Encrypted, @@ -249,7 +246,7 @@ func (r *taskRepo) scanTasks(ctx context.Context, query string, args ...any) ([] for rows.Next() { var dbt dbTask if err := rows.Scan( - &dbt.ID, &dbt.Name, &dbt.State, &dbt.ImageURL, &dbt.WasmHTTPURL, + &dbt.ID, &dbt.Name, &dbt.State, &dbt.ImageURL, &dbt.File, &dbt.CLIArgs, &dbt.Inputs, &dbt.Env, &dbt.Daemon, &dbt.Encrypted, &dbt.KBSResourcePath, &dbt.PropletID, &dbt.Results, &dbt.Error, &dbt.MonitoringProfile, @@ -290,9 +287,6 @@ func (r *taskRepo) toTask(dbt dbTask) (task.Task, error) { if dbt.ImageURL != nil { t.ImageURL = *dbt.ImageURL } - if dbt.WasmHTTPURL != nil { - t.WasmHTTPURL = *dbt.WasmHTTPURL - } if err := jsonUnmarshal(dbt.CLIArgs, &t.CLIArgs); err != nil { return task.Task{}, err } diff --git a/pkg/storage/sqlite/init.go b/pkg/storage/sqlite/init.go index eb0112ce..bd04b805 100644 --- a/pkg/storage/sqlite/init.go +++ b/pkg/storage/sqlite/init.go @@ -243,16 +243,7 @@ func (db *Database) Migrate() error { `ALTER TABLE proplets DROP COLUMN metadata`, }, }, - { - Id: "4_add_wasm_http_url", - Up: []string{ - `ALTER TABLE tasks ADD COLUMN wasm_http_url TEXT`, - }, - Down: []string{ - `ALTER TABLE tasks DROP COLUMN wasm_http_url`, - }, }, - }, } _, err := migrate.Exec(db.DB.DB, "sqlite3", migrations, migrate.Up) diff --git a/pkg/storage/sqlite/tasks.go b/pkg/storage/sqlite/tasks.go index b6c8121e..99c46427 100644 --- a/pkg/storage/sqlite/tasks.go +++ b/pkg/storage/sqlite/tasks.go @@ -24,7 +24,6 @@ type dbTask struct { Name string `db:"name"` State uint8 `db:"state"` ImageURL *string `db:"image_url"` - WasmHTTPURL *string `db:"wasm_http_url"` File []byte `db:"file"` CLIArgs []byte `db:"cli_args"` Inputs []byte `db:"inputs"` @@ -48,13 +47,13 @@ type dbTask struct { Mode *string `db:"mode"` } -const taskColumns = `id, name, state, image_url, wasm_http_url, file, cli_args, inputs, env, daemon, encrypted, +const taskColumns = `id, name, state, image_url, file, cli_args, inputs, env, daemon, encrypted, kbs_resource_path, proplet_id, results, error, monitoring_profile, start_time, finish_time, created_at, updated_at, workflow_id, job_id, depends_on, run_if, kind, mode` func (r *taskRepo) Create(ctx context.Context, t task.Task) (task.Task, error) { query := `INSERT INTO tasks (` + taskColumns + `) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` cliArgs, err := jsonBytes(t.CLIArgs) if err != nil { @@ -87,7 +86,7 @@ func (r *taskRepo) Create(ctx context.Context, t task.Task) (task.Task, error) { } _, err = r.db.ExecContext(ctx, query, - t.ID, t.Name, uint8(t.State), nullString(t.ImageURL), nullString(t.WasmHTTPURL), + t.ID, t.Name, uint8(t.State), nullString(t.ImageURL), t.File, cliArgs, inputs, env, t.Daemon, t.Encrypted, nullString(t.KBSResourcePath), nullString(t.PropletID), @@ -123,7 +122,7 @@ func (r *taskRepo) Get(ctx context.Context, id string) (task.Task, error) { func (r *taskRepo) Update(ctx context.Context, t task.Task) error { query := `UPDATE tasks SET - name = ?, state = ?, image_url = ?, wasm_http_url = ?, file = ?, cli_args = ?, inputs = ?, + name = ?, state = ?, image_url = ?, file = ?, cli_args = ?, inputs = ?, env = ?, daemon = ?, encrypted = ?, kbs_resource_path = ?, proplet_id = ?, results = ?, error = ?, monitoring_profile = ?, start_time = ?, finish_time = ?, updated_at = ?, workflow_id = ?, job_id = ?, @@ -161,7 +160,7 @@ func (r *taskRepo) Update(ctx context.Context, t task.Task) error { } _, err = r.db.ExecContext(ctx, query, - t.Name, uint8(t.State), nullString(t.ImageURL), nullString(t.WasmHTTPURL), + t.Name, uint8(t.State), nullString(t.ImageURL), t.File, cliArgs, inputs, env, t.Daemon, t.Encrypted, nullString(t.KBSResourcePath), nullString(t.PropletID), @@ -229,7 +228,7 @@ func (r *taskRepo) scanTasks(ctx context.Context, query string, args ...any) ([] for rows.Next() { var dbt dbTask if err := rows.Scan( - &dbt.ID, &dbt.Name, &dbt.State, &dbt.ImageURL, &dbt.WasmHTTPURL, + &dbt.ID, &dbt.Name, &dbt.State, &dbt.ImageURL, &dbt.File, &dbt.CLIArgs, &dbt.Inputs, &dbt.Env, &dbt.Daemon, &dbt.Encrypted, &dbt.KBSResourcePath, &dbt.PropletID, &dbt.Results, &dbt.Error, &dbt.MonitoringProfile, @@ -270,9 +269,6 @@ func (r *taskRepo) toTask(dbt dbTask) (task.Task, error) { if dbt.ImageURL != nil { t.ImageURL = *dbt.ImageURL } - if dbt.WasmHTTPURL != nil { - t.WasmHTTPURL = *dbt.WasmHTTPURL - } if dbt.CLIArgs != nil { if err := jsonUnmarshal(dbt.CLIArgs, &t.CLIArgs); err != nil { return task.Task{}, err diff --git a/pkg/storage/sqlite/tasks_test.go b/pkg/storage/sqlite/tasks_test.go deleted file mode 100644 index 6b427f55..00000000 --- a/pkg/storage/sqlite/tasks_test.go +++ /dev/null @@ -1,59 +0,0 @@ -package sqlite_test - -import ( - "context" - "testing" - "time" - - "github.com/absmach/propeller/pkg/storage/sqlite" - "github.com/absmach/propeller/pkg/task" - "github.com/google/uuid" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestTaskCreate_WithWasmHTTPURL(t *testing.T) { - t.Parallel() - repo := sqlite.NewTaskRepository(newTestDB(t)) - - url := "http://example.com/module.wasm" - tk := task.Task{ - ID: uuid.NewString(), - Name: "http-wasm-task", - State: task.Pending, - WasmHTTPURL: url, - CreatedAt: time.Now().UTC().Truncate(time.Second), - UpdatedAt: time.Now().UTC().Truncate(time.Second), - } - - created, err := repo.Create(context.Background(), tk) - require.NoError(t, err) - assert.Equal(t, tk.ID, created.ID) - assert.Equal(t, url, created.WasmHTTPURL) - - fetched, err := repo.Get(context.Background(), tk.ID) - require.NoError(t, err) - assert.Equal(t, url, fetched.WasmHTTPURL) -} - -func TestTaskCreate_WasmHTTPURLNull(t *testing.T) { - t.Parallel() - repo := sqlite.NewTaskRepository(newTestDB(t)) - - tk := task.Task{ - ID: uuid.NewString(), - Name: "no-http-url-task", - State: task.Pending, - ImageURL: "oci://example.com/image:latest", - CreatedAt: time.Now().UTC().Truncate(time.Second), - UpdatedAt: time.Now().UTC().Truncate(time.Second), - } - - created, err := repo.Create(context.Background(), tk) - require.NoError(t, err) - assert.Empty(t, created.WasmHTTPURL) - - fetched, err := repo.Get(context.Background(), tk.ID) - require.NoError(t, err) - assert.Empty(t, fetched.WasmHTTPURL) -} diff --git a/pkg/task/task.go b/pkg/task/task.go index 37587278..4d8bdafe 100644 --- a/pkg/task/task.go +++ b/pkg/task/task.go @@ -68,7 +68,6 @@ type Task struct { Kind TaskKind `json:"kind,omitempty"` State State `json:"state"` ImageURL string `json:"image_url,omitempty"` - WasmHTTPURL string `json:"wasm_http_url,omitempty"` File []byte `json:"file,omitempty"` CLIArgs []string `json:"cli_args"` Inputs []uint64 `json:"inputs,omitempty"` diff --git a/proplet/src/service.rs b/proplet/src/service.rs index 82cf91e5..cebdf1dc 100644 --- a/proplet/src/service.rs +++ b/proplet/src/service.rs @@ -484,6 +484,17 @@ impl PropletService { if req.encrypted { info!("Encrypted workload with image_url: {}", req.image_url); Vec::new() + } else if req.image_url.starts_with("http://") || req.image_url.starts_with("https://") { + match self.fetch_wasm_from_http(&req.image_url).await { + Ok(binary) => binary, + Err(e) => { + error!("Failed to fetch wasm for task {}: {}", req.id, e); + self.running_tasks.lock().await.remove(&req.id); + self.publish_result(&req.id, Vec::new(), Some(e.to_string())) + .await?; + return Err(e); + } + } } else { info!("Requesting binary from registry: {}", req.image_url); self.request_binary_from_registry(&req.image_url).await?; @@ -499,17 +510,6 @@ impl PropletService { } } } - } else if let Some(ref url) = req.wasm_http_url { - match self.fetch_wasm_from_http(url).await { - Ok(binary) => binary, - Err(e) => { - error!("Failed to fetch wasm for task {}: {}", req.id, e); - self.running_tasks.lock().await.remove(&req.id); - self.publish_result(&req.id, Vec::new(), Some(e.to_string())) - .await?; - return Err(e); - } - } } else { let err = anyhow::anyhow!("No wasm binary or image URL provided"); error!("Validation error for task {}: {}", req.id, err); diff --git a/proplet/src/types.rs b/proplet/src/types.rs index a83617d3..63b2292f 100644 --- a/proplet/src/types.rs +++ b/proplet/src/types.rs @@ -66,8 +66,6 @@ pub struct StartRequest { pub mode: Option, #[serde(default)] pub proplet_id: Option, - #[serde(default)] - pub wasm_http_url: Option, } fn deserialize_null_default<'de, D, T>(deserializer: D) -> std::result::Result @@ -99,20 +97,11 @@ impl StartRequest { "encrypted workloads should only use image_url, not file" )); } - } else if self.file.is_empty() && self.image_url.is_empty() && self.wasm_http_url.is_none() - { + } else if self.file.is_empty() && self.image_url.is_empty() { return Err(anyhow::anyhow!( - "either file, image_url, or wasm_http_url must be provided" + "either file or image_url must be provided" )); } - - if let Some(ref url) = self.wasm_http_url { - if !url.starts_with("http://") && !url.starts_with("https://") { - return Err(anyhow::anyhow!( - "wasm_http_url must start with http:// or https://" - )); - } - } Ok(()) } } @@ -325,7 +314,6 @@ mod tests { kbs_resource_path: None, mode: None, proplet_id: None, - wasm_http_url: None, }; assert!(req.validate().is_ok()); @@ -348,7 +336,6 @@ mod tests { kbs_resource_path: None, mode: None, proplet_id: None, - wasm_http_url: None, }; assert!(req.validate().is_ok()); @@ -371,7 +358,6 @@ mod tests { kbs_resource_path: None, mode: None, proplet_id: None, - wasm_http_url: None, }; let result = req.validate(); @@ -396,7 +382,6 @@ mod tests { kbs_resource_path: None, mode: None, proplet_id: None, - wasm_http_url: None, }; let result = req.validate(); @@ -421,14 +406,13 @@ mod tests { kbs_resource_path: None, mode: None, proplet_id: None, - wasm_http_url: None, }; let result = req.validate(); assert!(result.is_err()); assert_eq!( result.unwrap_err().to_string(), - "either file, image_url, or wasm_http_url must be provided" + "either file or image_url must be provided" ); } @@ -449,7 +433,6 @@ mod tests { kbs_resource_path: Some("default/key1/value".to_string()), mode: None, proplet_id: None, - wasm_http_url: None, }; assert!(req.validate().is_ok()); @@ -472,7 +455,6 @@ mod tests { kbs_resource_path: Some("default/key1/value".to_string()), mode: None, proplet_id: None, - wasm_http_url: None, }; let result = req.validate(); @@ -500,7 +482,6 @@ mod tests { kbs_resource_path: Some("default/key1/value".to_string()), mode: None, proplet_id: None, - wasm_http_url: None, }; let result = req.validate(); @@ -758,7 +739,6 @@ mod tests { kbs_resource_path: None, mode: None, proplet_id: None, - wasm_http_url: None, }; assert_eq!(req.env.as_ref().unwrap().len(), 2); @@ -824,7 +804,6 @@ mod tests { kbs_resource_path: None, mode: None, proplet_id: None, - wasm_http_url: None, }; let json = serde_json::to_string(&req).unwrap(); @@ -838,14 +817,14 @@ mod tests { } #[test] - fn test_start_request_validate_success_with_wasm_http_url() { + fn test_start_request_validate_success_with_http_image_url() { let req = StartRequest { id: "task-http".to_string(), cli_args: vec![], name: "http_func".to_string(), state: 0, file: String::new(), - image_url: String::new(), + image_url: "http://fileserver/app.wasm".to_string(), inputs: vec![], daemon: false, env: None, @@ -854,21 +833,20 @@ mod tests { kbs_resource_path: None, mode: None, proplet_id: None, - wasm_http_url: Some("http://fileserver/app.wasm".to_string()), }; assert!(req.validate().is_ok()); } #[test] - fn test_start_request_validate_success_with_wasm_https_url() { + fn test_start_request_validate_success_with_https_image_url() { let req = StartRequest { id: "task-https".to_string(), cli_args: vec![], name: "https_func".to_string(), state: 0, file: String::new(), - image_url: String::new(), + image_url: "https://releases.example.com/app.wasm".to_string(), inputs: vec![], daemon: false, env: None, @@ -877,40 +855,11 @@ mod tests { kbs_resource_path: None, mode: None, proplet_id: None, - wasm_http_url: Some("https://releases.example.com/app.wasm".to_string()), }; assert!(req.validate().is_ok()); } - #[test] - fn test_start_request_validate_invalid_wasm_http_url_scheme() { - let req = StartRequest { - id: "task-ftp".to_string(), - cli_args: vec![], - name: "ftp_func".to_string(), - state: 0, - file: String::new(), - image_url: String::new(), - inputs: vec![], - daemon: false, - env: None, - monitoring_profile: None, - encrypted: false, - kbs_resource_path: None, - mode: None, - proplet_id: None, - wasm_http_url: Some("ftp://fileserver/app.wasm".to_string()), - }; - - let result = req.validate(); - assert!(result.is_err()); - assert_eq!( - result.unwrap_err().to_string(), - "wasm_http_url must start with http:// or https://" - ); - } - #[test] fn test_start_request_validate_no_source() { let req = StartRequest { @@ -928,32 +877,13 @@ mod tests { kbs_resource_path: None, mode: None, proplet_id: None, - wasm_http_url: None, }; let result = req.validate(); assert!(result.is_err()); assert_eq!( result.unwrap_err().to_string(), - "either file, image_url, or wasm_http_url must be provided" - ); - } - - #[test] - fn test_start_request_deserialize_wasm_http_url() { - let json_data = serde_json::json!({ - "id": "task-deser", - "name": "deser_func", - "file": "", - "image_url": "", - "wasm_http_url": "http://example.com/app.wasm", - "state": 0 - }); - - let req: StartRequest = serde_json::from_value(json_data).unwrap(); - assert_eq!( - req.wasm_http_url, - Some("http://example.com/app.wasm".to_string()) + "either file or image_url must be provided" ); } } From 058d05888cae36cf04b13bcb37798895b8a4320b Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Wed, 25 Mar 2026 12:54:25 +0300 Subject: [PATCH 07/11] feat(proplet): route http/https image_url to HTTP fetch instead of registry --- proplet/src/service.rs | 46 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/proplet/src/service.rs b/proplet/src/service.rs index cebdf1dc..32168f48 100644 --- a/proplet/src/service.rs +++ b/proplet/src/service.rs @@ -7,7 +7,10 @@ use crate::mqtt::{build_topic, MqttMessage, PubSub}; use crate::runtime::{Runtime, RuntimeContext, StartConfig}; use crate::types::*; use anyhow::{Context, Result}; +use futures_util::StreamExt; use reqwest::Client as HttpClient; + +const WASM_FETCH_MAX_BYTES: usize = 100 * 1024 * 1024; use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use std::time::SystemTime; @@ -1045,6 +1048,49 @@ impl PropletService { Ok(()) } + async fn fetch_wasm_from_http(&self, url: &str) -> Result> { + let response = self + .http_client + .get(url) + .send() + .await + .with_context(|| format!("Failed to connect to {}", url))?; + + let status = response.status(); + if status.is_client_error() || status.is_server_error() { + return Err(anyhow::anyhow!("HTTP {} fetching wasm from {}", status, url)); + } + + if let Some(content_length) = response.content_length() { + if content_length as usize > WASM_FETCH_MAX_BYTES { + return Err(anyhow::anyhow!( + "wasm response from {} exceeds size limit ({} > {} bytes)", + url, + content_length, + WASM_FETCH_MAX_BYTES + )); + } + } + + let mut binary = Vec::new(); + let mut stream = response.bytes_stream(); + while let Some(chunk) = stream.next().await { + let chunk = + chunk.with_context(|| format!("Failed to read response body from {}", url))?; + binary.extend_from_slice(&chunk); + if binary.len() > WASM_FETCH_MAX_BYTES { + return Err(anyhow::anyhow!( + "wasm response from {} exceeds size limit ({} bytes)", + url, + WASM_FETCH_MAX_BYTES + )); + } + } + + info!("Fetched wasm from {}, size: {} bytes", url, binary.len()); + Ok(binary) + } + #[allow(dead_code)] fn parse_http_requests_from_stderr(stderr: &str) -> Vec { let mut requests = Vec::new(); From 55dbbb412140d1c33dc83cea67f93b36453d8e04 Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Wed, 25 Mar 2026 13:13:52 +0300 Subject: [PATCH 08/11] fix(proplet): cargo fmt formatting --- proplet/src/service.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/proplet/src/service.rs b/proplet/src/service.rs index 32168f48..69306cf9 100644 --- a/proplet/src/service.rs +++ b/proplet/src/service.rs @@ -487,7 +487,8 @@ impl PropletService { if req.encrypted { info!("Encrypted workload with image_url: {}", req.image_url); Vec::new() - } else if req.image_url.starts_with("http://") || req.image_url.starts_with("https://") { + } else if req.image_url.starts_with("http://") || req.image_url.starts_with("https://") + { match self.fetch_wasm_from_http(&req.image_url).await { Ok(binary) => binary, Err(e) => { @@ -1058,7 +1059,11 @@ impl PropletService { let status = response.status(); if status.is_client_error() || status.is_server_error() { - return Err(anyhow::anyhow!("HTTP {} fetching wasm from {}", status, url)); + return Err(anyhow::anyhow!( + "HTTP {} fetching wasm from {}", + status, + url + )); } if let Some(content_length) = response.content_length() { From a95befda84e0f64cc307e4cc021e33161b1456fa Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Fri, 27 Mar 2026 13:09:42 +0300 Subject: [PATCH 09/11] fix(ci): fix gci import formatting and remove duplicate Rust definitions --- pkg/sdk/task.go | 28 ++++++++++----------- pkg/storage/postgres/init.go | 9 ++++--- pkg/storage/sqlite/init.go | 9 ++++--- proplet/src/service.rs | 49 ------------------------------------ 4 files changed, 24 insertions(+), 71 deletions(-) diff --git a/pkg/sdk/task.go b/pkg/sdk/task.go index 8a7c42a3..47ab7321 100644 --- a/pkg/sdk/task.go +++ b/pkg/sdk/task.go @@ -11,20 +11,20 @@ import ( const tasksEndpoint = "/tasks" type Task struct { - ID string `json:"id,omitempty"` - Name string `json:"name"` - Kind string `json:"kind,omitempty"` - State uint8 `json:"state,omitempty"` - Mode string `json:"mode,omitempty"` - ImageURL string `json:"image_url,omitempty"` - JobID string `json:"job_id,omitempty"` - CLIArgs []string `json:"cli_args,omitempty"` - Env map[string]string `json:"env,omitempty"` - StartTime time.Time `json:"start_time"` - FinishTime time.Time `json:"finish_time"` - CreatedAt time.Time `json:"created_at"` - UpdatedAt time.Time `json:"updated_at"` - Results any `json:"results,omitempty"` + ID string `json:"id,omitempty"` + Name string `json:"name"` + Kind string `json:"kind,omitempty"` + State uint8 `json:"state,omitempty"` + Mode string `json:"mode,omitempty"` + ImageURL string `json:"image_url,omitempty"` + JobID string `json:"job_id,omitempty"` + CLIArgs []string `json:"cli_args,omitempty"` + Env map[string]string `json:"env,omitempty"` + StartTime time.Time `json:"start_time"` + FinishTime time.Time `json:"finish_time"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` + Results any `json:"results,omitempty"` } type TaskPage struct { diff --git a/pkg/storage/postgres/init.go b/pkg/storage/postgres/init.go index 17c9eb2d..58b59e5c 100644 --- a/pkg/storage/postgres/init.go +++ b/pkg/storage/postgres/init.go @@ -6,12 +6,13 @@ import ( "fmt" "time" - "github.com/absmach/propeller/pkg/job" - "github.com/absmach/propeller/pkg/proplet" - "github.com/absmach/propeller/pkg/task" _ "github.com/jackc/pgx/v5/stdlib" "github.com/jmoiron/sqlx" migrate "github.com/rubenv/sql-migrate" + + "github.com/absmach/propeller/pkg/job" + "github.com/absmach/propeller/pkg/proplet" + "github.com/absmach/propeller/pkg/task" ) var ( @@ -250,7 +251,7 @@ func (db *Database) Migrate() error { `ALTER TABLE proplets DROP COLUMN IF EXISTS metadata`, }, }, - }, + }, } _, err := migrate.Exec(db.DB.DB, "postgres", migrations, migrate.Up) diff --git a/pkg/storage/sqlite/init.go b/pkg/storage/sqlite/init.go index bd04b805..7ea85e65 100644 --- a/pkg/storage/sqlite/init.go +++ b/pkg/storage/sqlite/init.go @@ -6,12 +6,13 @@ import ( "fmt" "time" - "github.com/absmach/propeller/pkg/job" - "github.com/absmach/propeller/pkg/proplet" - "github.com/absmach/propeller/pkg/task" "github.com/jmoiron/sqlx" _ "github.com/mattn/go-sqlite3" migrate "github.com/rubenv/sql-migrate" + + "github.com/absmach/propeller/pkg/job" + "github.com/absmach/propeller/pkg/proplet" + "github.com/absmach/propeller/pkg/task" ) var ( @@ -243,7 +244,7 @@ func (db *Database) Migrate() error { `ALTER TABLE proplets DROP COLUMN metadata`, }, }, - }, + }, } _, err := migrate.Exec(db.DB.DB, "sqlite3", migrations, migrate.Up) diff --git a/proplet/src/service.rs b/proplet/src/service.rs index 69306cf9..56bf67f7 100644 --- a/proplet/src/service.rs +++ b/proplet/src/service.rs @@ -7,10 +7,8 @@ use crate::mqtt::{build_topic, MqttMessage, PubSub}; use crate::runtime::{Runtime, RuntimeContext, StartConfig}; use crate::types::*; use anyhow::{Context, Result}; -use futures_util::StreamExt; use reqwest::Client as HttpClient; -const WASM_FETCH_MAX_BYTES: usize = 100 * 1024 * 1024; use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use std::time::SystemTime; @@ -1049,53 +1047,6 @@ impl PropletService { Ok(()) } - async fn fetch_wasm_from_http(&self, url: &str) -> Result> { - let response = self - .http_client - .get(url) - .send() - .await - .with_context(|| format!("Failed to connect to {}", url))?; - - let status = response.status(); - if status.is_client_error() || status.is_server_error() { - return Err(anyhow::anyhow!( - "HTTP {} fetching wasm from {}", - status, - url - )); - } - - if let Some(content_length) = response.content_length() { - if content_length as usize > WASM_FETCH_MAX_BYTES { - return Err(anyhow::anyhow!( - "wasm response from {} exceeds size limit ({} > {} bytes)", - url, - content_length, - WASM_FETCH_MAX_BYTES - )); - } - } - - let mut binary = Vec::new(); - let mut stream = response.bytes_stream(); - while let Some(chunk) = stream.next().await { - let chunk = - chunk.with_context(|| format!("Failed to read response body from {}", url))?; - binary.extend_from_slice(&chunk); - if binary.len() > WASM_FETCH_MAX_BYTES { - return Err(anyhow::anyhow!( - "wasm response from {} exceeds size limit ({} bytes)", - url, - WASM_FETCH_MAX_BYTES - )); - } - } - - info!("Fetched wasm from {}, size: {} bytes", url, binary.len()); - Ok(binary) - } - #[allow(dead_code)] fn parse_http_requests_from_stderr(stderr: &str) -> Vec { let mut requests = Vec::new(); From f0f087c45b54b643c1341b8bd0eaeea6e68e5277 Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Fri, 27 Mar 2026 13:50:11 +0300 Subject: [PATCH 10/11] fix(ci): fix gci import ordering in postgres and sqlite init files --- pkg/storage/postgres/init.go | 7 +++---- pkg/storage/sqlite/init.go | 7 +++---- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/pkg/storage/postgres/init.go b/pkg/storage/postgres/init.go index 58b59e5c..6465d4b6 100644 --- a/pkg/storage/postgres/init.go +++ b/pkg/storage/postgres/init.go @@ -6,13 +6,12 @@ import ( "fmt" "time" - _ "github.com/jackc/pgx/v5/stdlib" - "github.com/jmoiron/sqlx" - migrate "github.com/rubenv/sql-migrate" - "github.com/absmach/propeller/pkg/job" "github.com/absmach/propeller/pkg/proplet" "github.com/absmach/propeller/pkg/task" + _ "github.com/jackc/pgx/v5/stdlib" + "github.com/jmoiron/sqlx" + migrate "github.com/rubenv/sql-migrate" ) var ( diff --git a/pkg/storage/sqlite/init.go b/pkg/storage/sqlite/init.go index 7ea85e65..2ad7da03 100644 --- a/pkg/storage/sqlite/init.go +++ b/pkg/storage/sqlite/init.go @@ -6,13 +6,12 @@ import ( "fmt" "time" - "github.com/jmoiron/sqlx" - _ "github.com/mattn/go-sqlite3" - migrate "github.com/rubenv/sql-migrate" - "github.com/absmach/propeller/pkg/job" "github.com/absmach/propeller/pkg/proplet" "github.com/absmach/propeller/pkg/task" + "github.com/jmoiron/sqlx" + _ "github.com/mattn/go-sqlite3" + migrate "github.com/rubenv/sql-migrate" ) var ( From e645fe1bfaf5025c7f67f2ee91d39f53024bc0ac Mon Sep 17 00:00:00 2001 From: JeffMboya Date: Fri, 27 Mar 2026 13:51:17 +0300 Subject: [PATCH 11/11] fix(ci): fix rustfmt formatting in types.rs --- proplet/src/types.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/proplet/src/types.rs b/proplet/src/types.rs index 63b2292f..ea9571d3 100644 --- a/proplet/src/types.rs +++ b/proplet/src/types.rs @@ -98,9 +98,7 @@ impl StartRequest { )); } } else if self.file.is_empty() && self.image_url.is_empty() { - return Err(anyhow::anyhow!( - "either file or image_url must be provided" - )); + return Err(anyhow::anyhow!("either file or image_url must be provided")); } Ok(()) }