From e2251419e92d8a1e70e0d8d4a81744d2787dd8ed Mon Sep 17 00:00:00 2001 From: tatiesmars Date: Fri, 27 Feb 2026 10:04:54 +0700 Subject: [PATCH 1/2] feat: support cancellation & cleaner events/stage --- server/rust/runner/src/lib.rs | 684 +++++++++++++++++++--------------- 1 file changed, 393 insertions(+), 291 deletions(-) diff --git a/server/rust/runner/src/lib.rs b/server/rust/runner/src/lib.rs index f55eb42..217ceec 100644 --- a/server/rust/runner/src/lib.rs +++ b/server/rust/runner/src/lib.rs @@ -44,6 +44,23 @@ fn parse_domain_from_cid(cid: &str) -> Option<(String, String)> { Some((base, domain_id)) } +const TASK_CANCELLED_PREFIX: &str = "task cancelled"; + +async fn ensure_task_not_cancelled( + ctx: &compute_runner_api::TaskCtx<'_>, + stage: &str, +) -> Result<()> { + if ctx.ctrl.is_cancelled().await { + return Err(anyhow!("{TASK_CANCELLED_PREFIX}: {stage}")); + } + Ok(()) +} + +fn is_task_cancelled_error(err: &anyhow::Error) -> bool { + err.chain() + .any(|cause| cause.to_string().contains(TASK_CANCELLED_PREFIX)) +} + #[async_trait] impl compute_runner_api::Runner for HelloRunner { fn capability(&self) -> &'static str { @@ -80,7 +97,25 @@ impl compute_runner_api::Runner for HelloRunner { let datasets_dir = job_root.join("datasets"); let task_result: Result<()> = async { - ctx.ctrl.progress(json!({ "status": "started" })).await?; + ensure_task_not_cancelled(&ctx, "before execution").await?; + ctx.ctrl + .progress(json!({ + "pct": 5, + "stage": "workspace", + "status": "prepared", + "job_root": job_root.display().to_string(), + })) + .await?; + let _ = ctx + .ctrl + .log_event(json!({ + "level": "info", + "stage": "workspace", + "message": "workspace prepared", + "task_id": lease.task.id, + "job_id": lease.task.job_id, + })) + .await; // If an input CID is provided, materialize it and set up job inputs. 
let maybe_cid = lease.task.inputs_cids.first().cloned(); @@ -93,15 +128,24 @@ impl compute_runner_api::Runner for HelloRunner { ]; let mut datasets_downloaded = 0usize; let mut downloaded_recordings: HashSet<String> = HashSet::new(); + let mut metadata_chunks = 0usize; + let mut metadata_items = 0usize; + let mut recording_download_failures = 0usize; + let mut derived_download_failures = 0usize; + let mut colmap_downloaded = 0usize; + let mut colmap_failed = 0usize; + + let cid = maybe_cid + .as_deref() + .ok_or_else(|| anyhow!("no input cid provided; cannot run splatter job"))?; + + ensure_task_not_cancelled(&ctx, "before input materialization").await?; + let materialized = ctx + .input + .materialize_cid_with_meta(cid) + .await + .with_context(|| format!("materialize cid {}", cid))?; - if let Some(cid) = maybe_cid.as_deref() { - let materialized = ctx - .input - .materialize_cid_with_meta(cid) - .await - .with_context(|| format!("materialize cid {}", cid))?; - - // Print metadata returned with the CID. let metadata_json = json!({ "cid": materialized.cid, "data_id": materialized.data_id, @@ -113,14 +157,6 @@ impl compute_runner_api::Runner for HelloRunner { "extracted_paths": materialized.extracted_paths, }); info!(%cid, metadata = %metadata_json, "input cid metadata"); - ctx.ctrl - .log_event(json!({ - "level": "info", - "message": "input cid metadata", - "cid": cid, - "metadata": metadata_json - })) - .await?; if let Some(name) = materialized.name.as_deref() { if let Some(suffix) = name.strip_prefix("refined_manifest") { @@ -170,14 +206,6 @@ impl compute_runner_api::Runner for HelloRunner { }; info!(%cid, dataIDs = %data_ids, "parsed input dataIDs"); - ctx.ctrl - .log_event(json!({ - "level": "info", - "message": "parsed input json dataIDs", - "cid": cid, - "dataIDs": data_ids - })) - .await?; // Fetch and print metadata for all dataIDs using domain metadata endpoint. 
if !data_id_list.is_empty() { @@ -187,9 +215,11 @@ impl compute_runner_api::Runner for HelloRunner { ) { let domain_base = domain_base_raw.trim_end_matches('/').to_string(); - // Try single-ID requests to avoid oversized query strings. + // Try chunked requests to avoid oversized query strings. let chunk_size = 50; for chunk in data_id_list.chunks(chunk_size) { + ensure_task_not_cancelled(&ctx, "resolving dataset metadata").await?; + metadata_chunks += 1; let query = DownloadQuery { ids: chunk.to_vec(), name: None, @@ -205,6 +235,7 @@ impl compute_runner_api::Runner for HelloRunner { .await { Ok(metadata) => { + metadata_items += metadata.len(); if metadata.is_empty() { warn!( chunk_len = chunk.len(), @@ -218,74 +249,57 @@ impl compute_runner_api::Runner for HelloRunner { data_type = %m.data_type, "dataID metadata" ); - let name_matches = m.name.starts_with("dmt_recording_"); - if name_matches { - if downloaded_recordings.contains(&m.name) { - continue; + if !m.name.starts_with("dmt_recording_") { + continue; + } + if downloaded_recordings.contains(&m.name) { + continue; + } + ensure_task_not_cancelled(&ctx, "downloading input recording") + .await?; + let data_id = m.id.clone(); + let domain_id_for_download = m.domain_id.clone(); + let folder_name = m + .name + .strip_prefix("dmt_recording_") + .unwrap_or(m.name.as_str()) + .to_string(); + let folder_name = if folder_name.is_empty() { + data_id.clone() + } else { + folder_name + }; + match download_by_id( + &domain_base, + &client_id, + &token, + &domain_id_for_download, + &data_id, + ) + .await + { + Ok(bytes) => { + let dest_dir = datasets_dir.join(&folder_name); + tokio::fs::create_dir_all(&dest_dir).await?; + let dest_path = dest_dir.join("Frames.mp4"); + tokio::fs::write(&dest_path, &bytes).await?; + datasets_downloaded += 1; + downloaded_recordings.insert(m.name.clone()); + info!( + data_id = %data_id, + folder = %folder_name, + bytes = bytes.len(), + dest = %dest_path.display(), + "downloaded dmt recording" 
+ ); } - let data_id = m.id.clone(); - let domain_id_for_download = m.domain_id.clone(); - let folder_name = m - .name - .strip_prefix("dmt_recording_") - .unwrap_or(m.name.as_str()) - .to_string(); - let folder_name = if folder_name.is_empty() { - data_id.clone() - } else { - folder_name - }; - match download_by_id( - &domain_base, - &client_id, - &token, - &domain_id_for_download, - &data_id, - ) - .await - { - Ok(bytes) => { - let dest_dir = datasets_dir.join(&folder_name); - tokio::fs::create_dir_all(&dest_dir).await?; - let dest_path = dest_dir.join("Frames.mp4"); - tokio::fs::write(&dest_path, &bytes).await?; - datasets_downloaded += 1; - downloaded_recordings.insert(m.name.clone()); - - ctx.ctrl - .log_event(json!({ - "level": "info", - "message": "downloaded dmt recording", - "data_id": data_id, - "folder": folder_name, - "bytes": bytes.len(), - "dest": dest_path.display().to_string(), - })) - .await?; - - info!( - data_id = %data_id, - folder = %folder_name, - bytes = bytes.len(), - dest = %dest_path.display(), - "downloaded dmt recording" - ); - } - Err(err) => { - ctx.ctrl - .log_event(json!({ - "level": "error", - "message": "failed to download dmt recording", - "data_id": data_id, - "error": err.to_string(), - })) - .await?; - warn!( - data_id = %data_id, - error = %err, - "failed to download dmt recording" - ); - } + Err(err) => { + recording_download_failures += 1; + warn!( + data_id = %data_id, + error = %err, + "failed to download dmt recording" + ); } } } @@ -317,6 +331,8 @@ impl compute_runner_api::Runner for HelloRunner { if downloaded_recordings.contains(&recording_name) { break; } + ensure_task_not_cancelled(&ctx, "resolving derived recording metadata") + .await?; let query = DownloadQuery { ids: vec![], name: Some(recording_name.clone()), @@ -335,6 +351,11 @@ impl compute_runner_api::Runner for HelloRunner { if let Some(meta) = meta_list.into_iter().next() { let data_id = meta.id.clone(); let domain_id_for_download = meta.domain_id.clone(); + 
ensure_task_not_cancelled( + &ctx, + "downloading derived recording", + ) + .await?; match download_by_id( &domain_base, &client_id, @@ -355,19 +376,8 @@ impl compute_runner_api::Runner for HelloRunner { let dest_path = dest_dir.join("Frames.mp4"); tokio::fs::write(&dest_path, &bytes).await?; datasets_downloaded += 1; - downloaded_recordings.insert(recording_name.clone()); - - ctx.ctrl - .log_event(json!({ - "level": "info", - "message": "downloaded derived dmt recording", - "data_id": data_id, - "folder": folder_name, - "bytes": bytes.len(), - "dest": dest_path.display().to_string(), - })) - .await?; - + downloaded_recordings + .insert(recording_name.clone()); info!( data_id = %data_id, folder = %folder_name, @@ -378,14 +388,7 @@ impl compute_runner_api::Runner for HelloRunner { break; } Err(err) => { - ctx.ctrl - .log_event(json!({ - "level": "error", - "message": "failed to download derived dmt recording", - "data_id": data_id, - "error": err.to_string(), - })) - .await?; + derived_download_failures += 1; warn!( data_id = %data_id, error = %err, @@ -411,6 +414,7 @@ impl compute_runner_api::Runner for HelloRunner { let mut missing = Vec::new(); for (prefix, _) in expected_colmap { + ensure_task_not_cancelled(&ctx, "resolving colmap metadata").await?; let expected_name = format!("{prefix}{suffix}"); let query = DownloadQuery { ids: vec![], @@ -464,6 +468,8 @@ impl compute_runner_api::Runner for HelloRunner { if let Some((data_id, domain_id_for_download)) = colmap_refs.get(prefix) { + ensure_task_not_cancelled(&ctx, "downloading colmap binary") + .await?; match download_by_id( &domain_base, &client_id, @@ -476,17 +482,7 @@ impl compute_runner_api::Runner for HelloRunner { Ok(bytes) => { let dest_path = dest_dir.join(target_name); tokio::fs::write(&dest_path, &bytes).await?; - - ctx.ctrl - .log_event(json!({ - "level": "info", - "message": "downloaded colmap binary", - "data_id": data_id, - "bytes": bytes.len(), - "dest": dest_path.display().to_string(), - })) - 
.await?; - + colmap_downloaded += 1; info!( name = %prefix, bytes = bytes.len(), @@ -495,14 +491,7 @@ impl compute_runner_api::Runner for HelloRunner { ); } Err(err) => { - ctx.ctrl - .log_event(json!({ - "level": "error", - "message": "failed to download colmap binary", - "data_id": data_id, - "error": err.to_string(), - })) - .await?; + colmap_failed += 1; warn!( name = %prefix, data_id = %data_id, @@ -514,14 +503,7 @@ impl compute_runner_api::Runner for HelloRunner { } } } else { - ctx.ctrl - .log_event(json!({ - "level": "warn", - "message": "missing colmap binaries", - "suffix": suffix, - "missing": missing, - })) - .await?; + colmap_failed += missing.len(); warn!( suffix = %suffix, missing = ?missing, @@ -533,26 +515,51 @@ impl compute_runner_api::Runner for HelloRunner { warn!(%cid, "could not resolve domain info from cid or lease"); } } - } else { - return Err(anyhow!("no input cid provided; cannot run splatter job")); - } - // Ensure at least one dataset is available before running the pipeline. - let mut datasets_present = datasets_downloaded > 0; - if !datasets_present { - if let Ok(mut rd) = tokio::fs::read_dir(&datasets_dir).await { - if rd.next_entry().await?.is_some() { - datasets_present = true; + // Ensure at least one dataset is available before running the pipeline. + let mut datasets_present = datasets_downloaded > 0; + if !datasets_present { + if let Ok(mut rd) = tokio::fs::read_dir(&datasets_dir).await { + if rd.next_entry().await?.is_some() { + datasets_present = true; + } } } - } if !datasets_present { return Err(anyhow!( "no datasets downloaded; expected at least one dmt_recording_* input" )); } - // Run the Python pipeline and upload the splat. 
+ ctx.ctrl + .progress(json!({ + "pct": 20, + "stage": "inputs", + "status": "materialized", + "datasets": datasets_downloaded, + "metadata_chunks": metadata_chunks, + "metadata_items": metadata_items, + "recording_failures": recording_download_failures + derived_download_failures, + "colmap_downloaded": colmap_downloaded, + "colmap_failed": colmap_failed, + })) + .await?; + let _ = ctx + .ctrl + .log_event(json!({ + "level": "info", + "stage": "inputs", + "message": "inputs materialized", + "datasets": datasets_downloaded, + "metadata_chunks": metadata_chunks, + "metadata_items": metadata_items, + "recording_failures": recording_download_failures + derived_download_failures, + "colmap_downloaded": colmap_downloaded, + "colmap_failed": colmap_failed, + })) + .await; + + // Run the Python pipeline and upload the splat. let Some(domain_id_str) = lease .domain_id .map(|d| d.to_string()) @@ -563,174 +570,240 @@ impl compute_runner_api::Runner for HelloRunner { )); }; - let job_id_str = lease.task.job_id.map(|j| j.to_string()).unwrap_or_else(|| { - // Fallback to task id so the script always has a value. - lease.task.id.to_string() - }); - - // Resolve run.py path at runtime so the container layout is flexible. - let exe_dir = env::current_exe()? - .parent() - .map(PathBuf::from) - .unwrap_or_else(|| PathBuf::from(".")); - let run_py = env::var_os("SPLATTER_RUN_PY") - .map(PathBuf::from) - .unwrap_or_else(|| exe_dir.join("run.py")); - let project_root = run_py - .parent() - .map(PathBuf::from) - .unwrap_or_else(|| exe_dir.clone()); + let job_id_str = lease.task.job_id.map(|j| j.to_string()).unwrap_or_else(|| { + // Fallback to task id so the script always has a value. + lease.task.id.to_string() + }); + // Resolve run.py path at runtime so the container layout is flexible. + let exe_dir = env::current_exe()? 
+ .parent() + .map(PathBuf::from) + .unwrap_or_else(|| PathBuf::from(".")); + let run_py = env::var_os("SPLATTER_RUN_PY") + .map(PathBuf::from) + .unwrap_or_else(|| exe_dir.join("run.py")); + let project_root = run_py + .parent() + .map(PathBuf::from) + .unwrap_or_else(|| exe_dir.clone()); + + ensure_task_not_cancelled(&ctx, "before python start").await?; ctx.ctrl .progress(json!({ - "status": "running_python", - "job_root_path": job_root, - "script": run_py + "pct": 35, + "stage": "python", + "status": "starting", + "job_root_path": job_root.display().to_string(), + "script": run_py.display().to_string(), })) .await?; - - let mut child = Command::new("python3") - .arg(&run_py) - .arg("--domain_id") - .arg(&domain_id_str) - .arg("--job_id") - .arg(&job_id_str) - .arg("--job_root_path") - .arg(&job_root) - .arg("--log_level") - .arg("info") - .env("PYTHONUNBUFFERED", "1") - .current_dir(&project_root) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn() - .with_context(|| format!("spawn python3 {}", run_py.display()))?; - - let start = Instant::now(); - let stdout = child - .stdout - .take() - .ok_or_else(|| anyhow!("child missing stdout handle"))?; - let stderr = child - .stderr - .take() - .ok_or_else(|| anyhow!("child missing stderr handle"))?; - - let mut stdout_reader = BufReader::new(stdout).lines(); - let mut stderr_reader = BufReader::new(stderr).lines(); + let _ = ctx + .ctrl + .log_event(json!({ + "level": "info", + "stage": "python", + "message": "python pipeline starting", + "script": run_py.display().to_string(), + })) + .await; + + let mut child = Command::new("python3") + .arg(&run_py) + .arg("--domain_id") + .arg(&domain_id_str) + .arg("--job_id") + .arg(&job_id_str) + .arg("--job_root_path") + .arg(&job_root) + .arg("--log_level") + .arg("info") + .env("PYTHONUNBUFFERED", "1") + .current_dir(&project_root) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .with_context(|| format!("spawn python3 {}", run_py.display()))?; + + 
let start = Instant::now(); + let stdout = child + .stdout + .take() + .ok_or_else(|| anyhow!("child missing stdout handle"))?; + let stderr = child + .stderr + .take() + .ok_or_else(|| anyhow!("child missing stderr handle"))?; + + let mut stdout_reader = BufReader::new(stdout).lines(); + let mut stderr_reader = BufReader::new(stderr).lines(); let mut tail: VecDeque<String> = VecDeque::with_capacity(200); - - // Read both streams concurrently to avoid deadlocks and keep logs structured. - let mut stdout_done = false; - let mut stderr_done = false; - while !stdout_done || !stderr_done { - tokio::select! { - line = stdout_reader.next_line(), if !stdout_done => { - match line { - Ok(Some(l)) => { - if tail.len() == 200 { tail.pop_front(); } - tail.push_back(format!("stdout: {l}")); - info!(line = %l, "python stdout"); - let _ = ctx.ctrl.log_event(json!({ - "level": "info", - "message": l, - "source": "python_stdout" - })).await; - } - Ok(None) => stdout_done = true, - Err(err) => { - warn!(%err, "failed to read python stdout"); - stdout_done = true; - } + let mut stdout_lines = 0usize; + let mut stderr_lines = 0usize; + let mut cancel_check = tokio::time::interval(std::time::Duration::from_millis(500)); + cancel_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + // Read both streams concurrently to avoid deadlocks and keep logs structured. + let mut stdout_done = false; + let mut stderr_done = false; + while !stdout_done || !stderr_done { + tokio::select! 
{ + _ = cancel_check.tick() => { + if ctx.ctrl.is_cancelled().await { + let _ = child.kill().await; + let _ = child.wait().await; + return Err(anyhow!("{TASK_CANCELLED_PREFIX}: python execution")); } } - line = stderr_reader.next_line(), if !stderr_done => { - match line { - Ok(Some(l)) => { - if tail.len() == 200 { tail.pop_front(); } - tail.push_back(format!("stderr: {l}")); - warn!(line = %l, "python stderr"); - let _ = ctx.ctrl.log_event(json!({ - "level": "warn", - "message": l, - "source": "python_stderr" - })).await; + line = stdout_reader.next_line(), if !stdout_done => { + match line { + Ok(Some(l)) => { + if tail.len() == 200 { tail.pop_front(); } + tail.push_back(format!("stdout: {l}")); + stdout_lines += 1; + info!(line = %l, "python stdout"); + } + Ok(None) => stdout_done = true, + Err(err) => { + warn!(%err, "failed to read python stdout"); + stdout_done = true; + } } - Ok(None) => stderr_done = true, - Err(err) => { - warn!(%err, "failed to read python stderr"); - stderr_done = true; + } + line = stderr_reader.next_line(), if !stderr_done => { + match line { + Ok(Some(l)) => { + if tail.len() == 200 { tail.pop_front(); } + tail.push_back(format!("stderr: {l}")); + stderr_lines += 1; + warn!(line = %l, "python stderr"); + } + Ok(None) => stderr_done = true, + Err(err) => { + warn!(%err, "failed to read python stderr"); + stderr_done = true; + } } } } } - } let status = child.wait().await.with_context(|| "wait for python job")?; - let duration = start.elapsed(); + let status = child.wait().await.with_context(|| "wait for python job")?; + let duration = start.elapsed(); + + if !status.success() { + let summary_tail: Vec<String> = tail.into_iter().collect(); + let _ = ctx + .ctrl + .progress(json!({ + "pct": 35, + "stage": "python", + "status": "failed", + })) + .await; + let _ = ctx + .ctrl + .log_event(json!({ + "level": "error", + "stage": "python", + "message": "python job failed", + "status": status.code(), + "duration_ms": duration.as_millis(), + 
"stdout_lines": stdout_lines, + "stderr_lines": stderr_lines, + "tail": summary_tail, + })) + .await; + return Err(anyhow!("python job failed: status={:?}", status.code())); + } - if !status.success() { - let summary_tail: Vec<String> = tail.into_iter().collect(); ctx.ctrl - .log_event(json!({ - "level": "error", - "message": "python job failed", - "status": status.code(), + .progress(json!({ + "pct": 80, + "stage": "python", + "status": "completed", "duration_ms": duration.as_millis(), - "tail": summary_tail, })) .await?; - return Err(anyhow!( - "python job failed: status={:?}", - status.code() - )); - } - - ctx.ctrl + let _ = ctx + .ctrl .log_event(json!({ "level": "info", - "message": "python job completed", + "stage": "python", + "message": "python pipeline completed", "status": status.code(), "duration_ms": duration.as_millis(), + "stdout_lines": stdout_lines, + "stderr_lines": stderr_lines, })) - .await?; - - // Upload splat_rot.splat if it exists. - let splat_rel = PathBuf::from("refined") - .join("splatter") - .join("splat_rot.splat"); - let splat_abs = job_root.join(&splat_rel); - if !splat_abs.exists() { - return Err(anyhow!("expected output missing: {}", splat_abs.display())); - } + .await; + + // Upload splat_rot.splat if it exists. 
+ ensure_task_not_cancelled(&ctx, "before upload").await?; + let splat_rel = PathBuf::from("refined") + .join("splatter") + .join("splat_rot.splat"); + let splat_abs = job_root.join(&splat_rel); + if !splat_abs.exists() { + return Err(anyhow!("expected output missing: {}", splat_abs.display())); + } - let upload_key = if let Some(suffix) = refined_suffix.as_deref().filter(|s| !s.is_empty()) { - if suffix.starts_with('_') { - format!("refined_splat{suffix}") + let upload_key = if let Some(suffix) = + refined_suffix.as_deref().filter(|s| !s.is_empty()) + { + if suffix.starts_with('_') { + format!("refined_splat{suffix}") + } else { + format!("refined_splat_{suffix}") + } } else { - format!("refined_splat_{suffix}") - } - } else { - warn!("refined manifest suffix missing; uploading as splat_data without timestamp"); - "refined_splat".to_string() - }; - - ctx.output - .put_domain_artifact(compute_runner_api::runner::DomainArtifactRequest { - rel_path: upload_key.as_str(), - name: upload_key.as_str(), - data_type: "splat_data", - existing_id: None, - content: compute_runner_api::runner::DomainArtifactContent::File(&splat_abs), - }) - .await - .with_context(|| format!("upload {} as {}", splat_abs.display(), upload_key))?; + warn!("refined manifest suffix missing; uploading as splat_data without timestamp"); + "refined_splat".to_string() + }; ctx.ctrl .progress(json!({ - "status": "finished", - "uploaded": upload_key, - "splat_path": splat_abs, + "pct": 90, + "stage": "upload", + "status": "starting", + "artifact": upload_key.as_str(), + })) + .await?; + + ctx.output + .put_domain_artifact(compute_runner_api::runner::DomainArtifactRequest { + rel_path: upload_key.as_str(), + name: upload_key.as_str(), + data_type: "splat_data", + existing_id: None, + content: compute_runner_api::runner::DomainArtifactContent::File(&splat_abs), + }) + .await + .with_context(|| format!("upload {} as {}", splat_abs.display(), upload_key))?; + + ctx.ctrl + .progress(json!({ + "pct": 95, + 
"stage": "upload", + "status": "completed", + "uploaded": upload_key.as_str(), + "splat_path": splat_abs.display().to_string(), + })) + .await?; + let _ = ctx + .ctrl + .log_event(json!({ + "level": "info", + "stage": "upload", + "message": "output uploaded", + "uploaded": upload_key.as_str(), + })) + .await; + ctx.ctrl + .progress(json!({ + "progress": 100, + "stage": "complete", + "status": "succeeded", })) .await?; @@ -738,6 +811,35 @@ impl compute_runner_api::Runner for HelloRunner { } .await; + if let Err(err) = &task_result { + if is_task_cancelled_error(err) { + let _ = ctx + .ctrl + .progress(json!({ + "stage": "cancelled", + "status": "cancelled", + })) + .await; + let _ = ctx + .ctrl + .log_event(json!({ + "level": "warn", + "stage": "cancelled", + "message": err.to_string(), + })) + .await; + } else { + let _ = ctx + .ctrl + .log_event(json!({ + "level": "error", + "stage": "runner", + "message": err.to_string(), + })) + .await; + } + } + if !tasks_cleanup_disabled() { // Best-effort cleanup of this task workspace to avoid disk growth. if let Err(err) = tokio::fs::remove_dir_all(&job_root).await { From 9916ba6254ef19cf4a4451a6f29f44d34757c9b5 Mon Sep 17 00:00:00 2001 From: tatiesmars Date: Fri, 27 Feb 2026 11:38:11 +0700 Subject: [PATCH 2/2] feat: add versioning at build time --- .github/workflows/tag.yml | 2 ++ Dockerfile | 6 ++++++ server/rust/.env.example | 1 - server/rust/Cargo.lock | 1 + server/rust/bin/Cargo.toml | 3 +++ server/rust/bin/build.rs | 30 ++++++++++++++++++++++++++++++ server/rust/bin/src/main.rs | 5 ++++- 7 files changed, 46 insertions(+), 2 deletions(-) create mode 100644 server/rust/bin/build.rs diff --git a/.github/workflows/tag.yml b/.github/workflows/tag.yml index 7dc31db..f8ca743 100644 --- a/.github/workflows/tag.yml +++ b/.github/workflows/tag.yml @@ -55,6 +55,8 @@ jobs: with: context: . 
file: Dockerfile + build-args: | + SPLATTER_VERSION=${{ github.ref_name }} push: true platforms: linux/amd64 cache-from: type=gha diff --git a/Dockerfile b/Dockerfile index a2e9fd9..0f6859d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,7 +1,10 @@ ## ## Build the Rust compute-node binary (splatter-bin) ## +ARG SPLATTER_VERSION=0.0.0-local FROM --platform=$BUILDPLATFORM rust:1.89-bullseye AS rust-build +ARG SPLATTER_VERSION +ENV SPLATTER_VERSION="${SPLATTER_VERSION}" WORKDIR /app COPY server/rust/ server/rust/ RUN cargo build --release -p splatter-bin --manifest-path server/rust/Cargo.toml @@ -11,6 +14,9 @@ RUN cargo build --release -p splatter-bin --manifest-path server/rust/Cargo.toml ## FROM ghcr.io/nerfstudio-project/nerfstudio:latest +ARG SPLATTER_VERSION +ENV SPLATTER_SERVER_VERSION="${SPLATTER_VERSION}" + ARG USERNAME=splatter-server ARG USER_UID=1000 ARG USER_GID=$USER_UID diff --git a/server/rust/.env.example b/server/rust/.env.example index 5a6f97b..9dd8eca 100644 --- a/server/rust/.env.example +++ b/server/rust/.env.example @@ -8,5 +8,4 @@ SECP256K1_PRIVHEX=00000000000000000000000000000000000000000000000000000000000000 # Optional quality-of-life settings LOG_FORMAT=json -NODE_VERSION=v0.0.0 ENABLE_NOOP=false diff --git a/server/rust/Cargo.lock b/server/rust/Cargo.lock index 837f816..76cc043 100644 --- a/server/rust/Cargo.lock +++ b/server/rust/Cargo.lock @@ -1886,6 +1886,7 @@ dependencies = [ "axum", "dotenvy", "posemesh-compute-node", + "semver", "splatter-runner", "tokio", "tracing", diff --git a/server/rust/bin/Cargo.toml b/server/rust/bin/Cargo.toml index 253111c..5ae813b 100644 --- a/server/rust/bin/Cargo.toml +++ b/server/rust/bin/Cargo.toml @@ -13,3 +13,6 @@ tracing = "0.1.40" posemesh-compute-node = { version = "=0.3.1" } splatter-runner = { path = "../runner" } + +[build-dependencies] +semver = "1" diff --git a/server/rust/bin/build.rs b/server/rust/bin/build.rs new file mode 100644 index 0000000..5c4075d --- /dev/null +++ 
b/server/rust/bin/build.rs @@ -0,0 +1,30 @@ +use std::env; + +fn normalize_version(raw: &str) -> Option<String> { + let trimmed = raw.trim(); + if trimmed.is_empty() { + return None; + } + + let normalized = trimmed + .strip_prefix('v') + .or_else(|| trimmed.strip_prefix('V')) + .unwrap_or(trimmed); + + if semver::Version::parse(normalized).is_ok() { + Some(normalized.to_string()) + } else { + None + } +} + +fn main() { + println!("cargo:rerun-if-env-changed=SPLATTER_VERSION"); + + let raw = env::var("SPLATTER_VERSION").unwrap_or_else(|_| "0.0.0-local".to_string()); + let normalized = normalize_version(&raw).unwrap_or_else(|| { + panic!("Invalid SPLATTER_VERSION={raw:?}. Expected semver like 0.3.0 or v0.3.0") + }); + + println!("cargo:rustc-env=SPLATTER_NODE_VERSION={normalized}"); +} diff --git a/server/rust/bin/src/main.rs b/server/rust/bin/src/main.rs index 5ff19e3..e65fcb5 100644 --- a/server/rust/bin/src/main.rs +++ b/server/rust/bin/src/main.rs @@ -6,6 +6,8 @@ use std::path::PathBuf; use tracing::info; use tracing::warn; +const SPLATTER_NODE_VERSION: &str = env!("SPLATTER_NODE_VERSION"); + fn tasks_cleanup_disabled() -> bool { match env::var("DISABLE_TASKS_CLEANUP") { Ok(v) => matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on"), @@ -55,7 +57,8 @@ async fn main() -> Result<()> { let _ = axum::serve(listener, app).await; }); - let cfg = posemesh_compute_node::config::NodeConfig::from_env()?; + let mut cfg = posemesh_compute_node::config::NodeConfig::from_env()?; + cfg.node_version = SPLATTER_NODE_VERSION.to_string(); let registry: RunnerRegistry = splatter_runner::registry(); let capabilities = registry.capabilities();