diff --git a/README.MD b/README.MD index 5e37854..66e8d4a 100644 --- a/README.MD +++ b/README.MD @@ -9,6 +9,7 @@ For more information on the reconstruction service, please refer to our [whitepa - [Minimum Requirements](docs/minimum-requirements.md) - [Deployment](docs/deployment.md) - [Development Guide](docs/development.md) +- [Compute Capability](docs/compute-capability.md) ## Acknowledgments diff --git a/docs/compute-capability.md b/docs/compute-capability.md new file mode 100644 index 0000000..95d26d4 --- /dev/null +++ b/docs/compute-capability.md @@ -0,0 +1,146 @@ +# Compute Capability + +Expected Input and Output Files from each compute capability + +
+ /reconstruction/local-refinement/v1 + + ### Input + ```bash + # Input Files + {job_root_path} + ├── datasets + │ └── {dataset} + │ ├── Frames.mp4 + │ ├── Accel.csv + │ ├── ARposes.csv + │ ├── CameraIntrinsics.csv + │ ├── FeaturePoints.ply + │ ├── gyro_accel.csv + │ ├── Gyro.csv + │ ├── PortalDetections.csv + │ └── Manifest.json + ``` + + ### Output + ```bash + # Output Files + {job_root_path} + ├── refined + │ └── local + │ └── {dataset} + │ ├── colmap_rec + │ │ ├── cameras.bin + │ │ ├── frames.bin + │ │ ├── images.bin + │ │ ├── points3D.bin + │ │ └── rigs.bin + │ ├── sfm + │ │ ├── cameras.bin # Uploaded as zip + │ │ ├── database.db + │ │ ├── feature.h5 + │ │ ├── frames.bin # Uploaded as zip + │ │ ├── global_features.h5 + │ │ ├── images.bin # Uploaded as zip + │ │ ├── matches.bin + │ │ ├── pairs-sfm.txt + │ │ ├── points3D.bin # Uploaded as zip + │ │ ├── portals.csv # Portal poses relative to colmap world coordinates. Uploaded as zip + │ │ └── rigs.bin # Uploaded as zip + │ └── local_logs + ``` + +
+ + +
+ /reconstruction/global-refinement/v1 + + ### Input + ```bash + # Input Files + {job_root_path} + ├── refined + │ └── local + │ └── {dataset} + │ └── reconstruction_refined_x1.zip # this is what is expected to downloaded from domain server + ``` + + ### Output + ```bash + # Output Files + {job_root_path} + ├── refined + │ └── global + │ ├── refined_sfm_combined + │ │ ├── cameras.bin + │ │ ├── frames.bin + │ │ ├── images.bin + │ │ ├── points3D.bin + │ │ └── rigs.bin + │ ├── topology + │ │ ├── topology_downsampled_0.111.glb # Uploaded as zip + │ │ ├── topology_downsampled_0.111.obj + │ │ ├── topology_downsampled_0.333.glb + │ │ ├── topology_downsampled_0.333.obj # Uploaded as zip + │ │ ├── topology.glb + │ │ └── topology.obj # Uploaded as zip + │ ├── refined_manifest.json + │ ├── RefinedPointCloud.ply + │ ├── RefinedPointCloud.ply.drc + │ ├── RefinedPointCloudFloat.ply + │ ├── RefinedPointCloudReduced.ply + │ └── global_logs + ``` + +
+ +
+ /reconstruction/update-refinement/v1 + + ### Input + ```bash + # Input Files + {job_root_path} + ├── refined + │ ├── local + │ │ └── {dataset} + │ │ └── reconstruction_refined_x1.zip # this is what is expected to downloaded from domain server + │ └── global + │ ├── refined_sfm_combined + │ │ ├── cameras.bin + │ │ ├── frames.bin + │ │ ├── images.bin + │ │ ├── points3D.bin + │ │ └── rigs.bin + │ └── refined_manifest.json + ``` + + ### Output + ```bash + # Output Files + {job_root_path} + ├── refined + │ └── update + │ ├── refined_sfm_combined + │ │ ├── cameras.bin + │ │ ├── frames.bin + │ │ ├── images.bin + │ │ ├── points3D.bin + │ │ └── rigs.bin + │ ├── topology + │ │ ├── topology_downsampled_0.111.glb # Uploaded as zip + │ │ ├── topology_downsampled_0.111.obj + │ │ ├── topology_downsampled_0.333.glb + │ │ ├── topology_downsampled_0.333.obj # Uploaded as zip + │ │ ├── topology.glb + │ │ └── topology.obj # Uploaded as zip + │ ├── refined_manifest.json + │ ├── RefinedPointCloud.ply + │ ├── RefinedPointCloud.ply.drc + │ ├── RefinedPointCloudFloat.ply + │ ├── RefinedPointCloudReduced.ply + │ └── update_logs + ``` + +
\ No newline at end of file diff --git a/global_main.py b/global_main.py index 98dbc3a..d936940 100644 --- a/global_main.py +++ b/global_main.py @@ -3,33 +3,7 @@ import os from utils.data_utils import get_data_paths, setup_logger from utils.dataset_utils import stitching_helper -from utils.point_cloud_utils import filter_ply, downsample_ply_to_max_size, reduce_decimals_ply, draco_compress_ply - -def post_process_ply(output_path, logger): - ply_path = output_path / "RefinedPointCloud.ply" - filter_ply(ply_path, ply_path, convert_opencv_to_opengl=True, logger=logger) - - # Ensure ply fits in domain data - logger.info("Downsampling ply if needed to be under 20 MB file size...") - ply_path_reduced = output_path / "RefinedPointCloudReduced.ply" - try: - downsample_ply_to_max_size(ply_path, ply_path_reduced, 20000000, logger=logger) - except Exception as e: - logger.error(f"Failed to downsample PLY file: {str(e)}") - - logger.info("Draco compressing the PLY file...") - try: - # Must be float to do draco compression, but open3d outputs double precision. - ply_path_float = output_path / "RefinedPointCloudFloat.ply" - try: - reduce_decimals_ply(ply_path, ply_path_float, 3, logger=logger) - except Exception as e: - logger.error(f"Failed to reduce decimals in PLY file: {str(e)}") - - draco_compress_ply(ply_path_float, output_path / "RefinedPointCloud.ply.drc", logger=logger) - except Exception as e: - logger.error(f"Failed to draco compress the PLY file: {str(e)}") - +from utils.point_cloud_utils import post_process_ply def main(args): # Create and configure logger diff --git a/server/rust/Cargo.lock b/server/rust/Cargo.lock index 7d6799a..0c5f728 100644 --- a/server/rust/Cargo.lock +++ b/server/rust/Cargo.lock @@ -194,6 +194,7 @@ dependencies = [ "posemesh-compute-node-runner-api", "runner-reconstruction-global", "runner-reconstruction-local", + "runner-reconstruction-update", "semver", "tokio", ] @@ -1699,6 +1700,25 @@ dependencies = [ "zip", ] +[[package]] +name = "runner-reconstruction-update" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "chrono", + "posemesh-compute-node-runner-api", + "posemesh-domain-http", + "serde", + "serde_json", + "tempfile", + "tokio", + "tokio-util", + "tracing", + "uuid", + "zip", +] + [[package]] name = "rustc-hash" version = "2.1.1" diff --git a/server/rust/Cargo.toml b/server/rust/Cargo.toml index e048dd6..e841fad 100644 --- a/server/rust/Cargo.toml +++ b/server/rust/Cargo.toml @@ -1,5 +1,6 @@ [workspace] members = [ + "runner-reconstruction-update", "runner-reconstruction-global", "runner-reconstruction-local", "bin", diff --git a/server/rust/bin/Cargo.toml b/server/rust/bin/Cargo.toml index 66050ca..e791239 100644 --- a/server/rust/bin/Cargo.toml +++ b/server/rust/bin/Cargo.toml @@ -9,6 +9,7 @@ compute-runner-api = { package = "posemesh-compute-node-runner-api", version = " posemesh-compute-node = { version = "0.3.1" } runner-reconstruction-local = { path = "../runner-reconstruction-local" } runner-reconstruction-global = { path = "../runner-reconstruction-global" } +runner-reconstruction-update = { path = "../runner-reconstruction-update" } anyhow = "1" tokio = { version = "1", features = ["rt-multi-thread", "macros", "net"] } diff --git a/server/rust/bin/src/main.rs b/server/rust/bin/src/main.rs index 918985a..0bea8fa 100644 --- a/server/rust/bin/src/main.rs +++ b/server/rust/bin/src/main.rs @@ -15,6 +15,9 @@ async fn main() -> anyhow::Result<()> { for runner in runner_reconstruction_global::RunnerReconstructionGlobal::for_all_capabilities() { reg = reg.register(runner); } + for runner in runner_reconstruction_update::RunnerReconstructionUpdate::for_all_capabilities() { + reg = reg.register(runner); + } let capabilities = reg.capabilities(); posemesh_compute_node::dds::register::spawn_registration_if_configured(&cfg, &capabilities)?; diff --git a/server/rust/bin/tests/bin_wiring.rs b/server/rust/bin/tests/bin_wiring.rs index 42601b4..8055e76 100644 --- a/server/rust/bin/tests/bin_wiring.rs +++ b/server/rust/bin/tests/bin_wiring.rs @@ -38,6 +38,9 @@ async fn registry_contains_scaffold_runners_and_run_node_ok() { for runner in runner_reconstruction_global::RunnerReconstructionGlobal::for_all_capabilities() { reg = reg.register(runner); } + for runner in runner_reconstruction_update::RunnerReconstructionUpdate::for_all_capabilities() { + reg = reg.register(runner); + } for cap in runner_reconstruction_local::CAPABILITIES { assert!(reg.get(cap).is_some()); @@ -45,6 +48,9 @@ async fn registry_contains_scaffold_runners_and_run_node_ok() { for cap in runner_reconstruction_global::CAPABILITIES { assert!(reg.get(cap).is_some()); } + for cap in runner_reconstruction_update::CAPABILITIES { + assert!(reg.get(cap).is_some()); + } // Engine now waits for shutdown; ensure it stays pending. let result = timeout( diff --git a/server/rust/runner-reconstruction-update/Cargo.toml b/server/rust/runner-reconstruction-update/Cargo.toml new file mode 100644 index 0000000..9cbc7a1 --- /dev/null +++ b/server/rust/runner-reconstruction-update/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "runner-reconstruction-update" +version = "0.1.0" +edition = "2021" +publish = false + +[lib] +path = "src/lib.rs" + +[dependencies] +compute-runner-api = { package = "posemesh-compute-node-runner-api", version = "0.1.2" } +anyhow = "1" +async-trait = "0.1" +chrono = { version = "0.4", features = ["serde"] } +serde = { version = "1", features = ["derive"] } +serde_json = "1" +tempfile = "3" +tokio = { version = "1", features = ["rt-multi-thread", "macros", "process", "io-util", "sync", "time", "fs"] } +tokio-util = { version = "0.7", features = ["rt"] } +tracing = "0.1" +posemesh-domain-http = "1.5.1" +uuid = { version = "1", features = ["v4"] } +zip = { version = "0.6", default-features = false, features = ["deflate"] } + +[package.metadata] +description = "Scaffold runner for update refinement, no HTTP wiring." diff --git a/server/rust/runner-reconstruction-update/src/input.rs b/server/rust/runner-reconstruction-update/src/input.rs new file mode 100644 index 0000000..a1dbe5c --- /dev/null +++ b/server/rust/runner-reconstruction-update/src/input.rs @@ -0,0 +1,445 @@ +use std::path::{Path, PathBuf}; + +use anyhow::{anyhow, Context, Error, Result}; +use compute_runner_api::TaskCtx; +use posemesh_domain_http::domain_data::{download_by_id, download_metadata_v1, DownloadQuery}; +use tokio::fs; +use tracing::info; +use uuid::Uuid; + +use crate::strategy::unzip_refined_scan; +use crate::workspace::Workspace; + +const REQUIRED_SFM_FILES: &[&str] = &["images.bin", "cameras.bin", "points3D.bin", "portals.csv"]; +const REQUIRED_GLOBAL_SFM_FILES: &[&str] = &[ + "images.bin", + "cameras.bin", + "points3D.bin", + "frames.bin", + "rigs.bin", +]; + +/// Data captured for each materialized refined scan. +#[allow(dead_code)] +#[derive(Debug, Clone)] +pub struct MaterializedRefinedScan { + pub name: String, + pub data_id: String, + pub scan_name: String, + pub dataset_dir: PathBuf, + pub zip_path: PathBuf, + pub refined_sfm_dir: PathBuf, +} + +/// Materialize each refined scan name into the workspace, downloading from Domain, +/// keeping the zip under datasets, and extracting into refined/local//sfm. +pub async fn materialize_refined_scans( + ctx: &TaskCtx<'_>, + workspace: &Workspace, +) -> Result> { + let domain_url = ctx + .lease + .domain_server_url + .as_ref() + .map(|u| u.to_string()) + .unwrap_or_default(); + let domain_url = domain_url.trim().trim_end_matches('/').to_string(); + + let domain_id = ctx + .lease + .domain_id + .map(|id| id.to_string()) + .unwrap_or_default(); + + if domain_url.is_empty() { + return Err(anyhow!( + "cannot resolve refined scan by name: no domain_server_url in lease" + )); + } + if domain_id.is_empty() { + return Err(anyhow!( + "cannot resolve refined scan by name: no domain_id in lease" + )); + } + + let client_id = get_client_id(); + let token = ctx.access_token.get(); + + let mut scans = Vec::new(); + for name in &ctx.lease.task.inputs_cids { + if is_url(name) { + return Err(anyhow!( + "update refinement expects refined scan names, got URL: {}", + name + )); + } + + let meta = match resolve_by_name(&domain_url, &client_id, &token, &domain_id, name) + .await + .with_context(|| format!("resolve refined scan name {}", name)) + { + Ok(meta) => meta, + Err(_) => continue, + }; + + info!( + target: "runner_reconstruction_update", + name = %name, + data_id = %meta.id, + "resolved refined scan name to domain data ID" + ); + + let bytes = download_by_id(&domain_url, &client_id, &token, &domain_id, &meta.id) + .await + .map_err(|e| anyhow!("failed to download refined scan '{}': {}", name, e))?; + + let scan_name = strip_refined_prefix(name); + let dataset_dir = workspace.datasets().join(&scan_name); + fs::create_dir_all(&dataset_dir) + .await + .with_context(|| format!("create dataset directory {}", dataset_dir.display()))?; + + let zip_path = dataset_dir.join("RefinedScan.zip"); + fs::write(&zip_path, &bytes) + .await + .with_context(|| format!("write refined scan zip {}", zip_path.display()))?; + + let refined_sfm_dir = workspace.refined_local().join(&scan_name).join("sfm"); + let _ = unzip_refined_scan(bytes, &refined_sfm_dir) + .await + .with_context(|| { + format!( + "unzip refined scan {} into {}", + zip_path.display(), + refined_sfm_dir.display() + ) + })?; + + if !has_required_sfm_files(&refined_sfm_dir) { + return Err(anyhow!( + "refined scan '{}' missing required sfm files under {}", + scan_name, + refined_sfm_dir.display() + )); + } + + scans.push(MaterializedRefinedScan { + name: name.to_string(), + data_id: meta.id, + scan_name, + dataset_dir, + zip_path, + refined_sfm_dir, + }); + } + + Ok(scans) +} + +#[allow(dead_code)] +pub struct MaterializedRefinedGlobal { + pub name: String, + pub data_id: String, + pub scan_name: String, + pub dataset_dir: PathBuf, + pub refined_sfm_dir: PathBuf, +} + +pub async fn materialize_global_colmap( + ctx: &TaskCtx<'_>, + workspace: &Workspace, +) -> Result { + let domain_url = ctx + .lease + .domain_server_url + .as_ref() + .map(|u| u.to_string()) + .unwrap_or_default(); + let domain_url = domain_url.trim().trim_end_matches('/').to_string(); + + let domain_id = ctx + .lease + .domain_id + .map(|id| id.to_string()) + .unwrap_or_default(); + + if domain_url.is_empty() { + return Err(anyhow!( + "cannot resolve refined scan by name: no domain_server_url in lease" + )); + } + if domain_id.is_empty() { + return Err(anyhow!( + "cannot resolve refined scan by name: no domain_id in lease" + )); + } + + let client_id = get_client_id(); + let token = ctx.access_token.get(); + + let expected_colmap = [ + ("colmap_images_bin", "images.bin"), + ("colmap_cameras_bin", "cameras.bin"), + ("colmap_points3d_bin", "points3D.bin"), + ("colmap_frames_bin", "frames.bin"), + ("colmap_rigs_bin", "rigs.bin"), + ]; + + let mut global_refinement_name = ""; + + fs::create_dir_all(&workspace.refined_global().join("refined_sfm_combined")) + .await + .with_context(|| { + format!( + "create dataset directory {}", + &workspace + .refined_global() + .join("refined_sfm_combined") + .display() + ) + })?; + + for (display_name, file_name) in &expected_colmap { + for name in &ctx.lease.task.inputs_cids { + if is_url(name) { + return Err(anyhow!( + "update refinement expects refined scan names, got URL: {}", + name + )); + } + + let meta = match resolve_by_name_and_type( + &domain_url, + &client_id, + &token, + &domain_id, + name, + display_name, + ) + .await + .with_context(|| format!("resolve colmap file name {}", name)) + { + Ok(meta) => meta, + Err(_) => continue, + }; + + info!( + target: "runner_reconstruction_update", + name = %name, + data_id = %meta.id, + "resolved refined scan name to domain data ID" + ); + + let bytes = download_by_id(&domain_url, &client_id, &token, &domain_id, &meta.id) + .await + .map_err(|e| anyhow!("failed to download colmap file '{}': {}", name, e))?; + + if global_refinement_name.is_empty() { + let prefix = format!("{}{}", display_name, "_"); + global_refinement_name = name.as_str().strip_prefix(&prefix).unwrap_or(name); + } + + let file_path = &workspace + .root() + .join("refined") + .join("global") + .join("refined_sfm_combined") + .join(file_name); + fs::write(&file_path, &bytes) + .await + .with_context(|| format!("write file {}", file_path.display()))?; + } + } + + let sfm_dir = workspace + .root() + .join("refined") + .join("global") + .join("refined_sfm_combined"); + + if !has_required_global_sfm_files(&sfm_dir) { + return Err(anyhow!( + "global colmap files' missing required sfm files under {}", + workspace + .root() + .join("refined") + .join("global") + .join("refined_sfm_combined") + .display() + )); + } + + let result = Some(MaterializedRefinedGlobal { + name: global_refinement_name.to_string(), + data_id: "".to_string(), + scan_name: global_refinement_name.to_string(), + dataset_dir: PathBuf::new(), + refined_sfm_dir: sfm_dir, + }); + + result.ok_or_else(|| anyhow!("could not materialize any global colmap refined scan")) +} + +pub async fn materialize_refine_manifest( + ctx: &TaskCtx<'_>, + workspace: &Workspace, +) -> Result<(), Error> { + let domain_url = ctx + .lease + .domain_server_url + .as_ref() + .map(|u| u.to_string()) + .unwrap_or_default(); + let domain_url = domain_url.trim().trim_end_matches('/').to_string(); + + let domain_id = ctx + .lease + .domain_id + .map(|id| id.to_string()) + .unwrap_or_default(); + + if domain_url.is_empty() { + return Err(anyhow!( + "cannot resolve refined scan by name: no domain_server_url in lease" + )); + } + if domain_id.is_empty() { + return Err(anyhow!( + "cannot resolve refined scan by name: no domain_id in lease" + )); + } + + let client_id = get_client_id(); + let token = ctx.access_token.get(); + + for name in &ctx.lease.task.inputs_cids { + if is_url(name) { + return Err(anyhow!( + "update refinement expects refined scan names, got URL: {}", + name + )); + } + + let meta = match resolve_by_name_and_type( + &domain_url, + &client_id, + &token, + &domain_id, + name, + "refined_manifest_json", + ) + .await + .with_context(|| format!("resolve refined_manifest name {}", name)) + { + Ok(meta) => meta, + Err(_) => continue, + }; + + info!( + target: "runner_reconstruction_update", + name = %name, + data_id = %meta.id, + "resolved refined_manifest to domain data ID" + ); + + let bytes = download_by_id(&domain_url, &client_id, &token, &domain_id, &meta.id) + .await + .map_err(|e| anyhow!("failed to download refined_manifest '{}': {}", name, e))?; + + let file_path = workspace + .root() + .join("refined") + .join("global") + .join("refined_manifest.json"); + fs::write(&file_path, &bytes) + .await + .with_context(|| format!("write file {}", file_path.display()))?; + } + Ok(()) +} + +async fn resolve_by_name( + domain_url: &str, + client_id: &str, + token: &str, + domain_id: &str, + name: &str, +) -> Result { + let metas = download_metadata_v1( + domain_url, + client_id, + token, + domain_id, + &DownloadQuery { + ids: Vec::new(), + name: Some(name.to_string()), + data_type: Some("refined_scan_zip".to_string()), + }, + ) + .await + .map_err(|e| anyhow!("failed to query Domain for artifact '{}': {}", name, e))?; + + metas + .into_iter() + .find(|m| m.name == name && m.data_type == "refined_scan_zip") + .ok_or_else(|| anyhow!("artifact '{}' not found in domain {}", name, domain_id)) +} + +async fn resolve_by_name_and_type( + domain_url: &str, + client_id: &str, + token: &str, + domain_id: &str, + name: &str, + data_type: &str, +) -> Result { + let metas = download_metadata_v1( + domain_url, + client_id, + token, + domain_id, + &DownloadQuery { + ids: Vec::new(), + name: Some(name.to_string()), + data_type: Some(data_type.to_string()), + }, + ) + .await + .map_err(|e| anyhow!("failed to query Domain for artifact '{}': {}", name, e))?; + + metas + .into_iter() + .find(|m| m.name == name && m.data_type == data_type) + .ok_or_else(|| anyhow!("artifact '{}' not found in domain {}", name, domain_id)) +} + +fn strip_refined_prefix(name: &str) -> String { + name.strip_prefix("refined_scan_") + .unwrap_or(name) + .to_string() +} + +fn has_required_sfm_files(sfm: &Path) -> bool { + REQUIRED_SFM_FILES + .iter() + .all(|name| sfm.join(name).exists()) +} + +fn has_required_global_sfm_files(sfm: &Path) -> bool { + REQUIRED_GLOBAL_SFM_FILES + .iter() + .all(|name| sfm.join(name).exists()) +} + +fn is_url(cid: &str) -> bool { + cid.starts_with("http://") || cid.starts_with("https://") +} + +fn get_client_id() -> String { + if let Ok(id) = std::env::var("CLIENT_ID") { + if !id.trim().is_empty() { + return id; + } + } + format!("posemesh-compute-node/{}", Uuid::new_v4()) +} diff --git a/server/rust/runner-reconstruction-update/src/lib.rs b/server/rust/runner-reconstruction-update/src/lib.rs new file mode 100644 index 0000000..e121d8b --- /dev/null +++ b/server/rust/runner-reconstruction-update/src/lib.rs @@ -0,0 +1,549 @@ +//! runner-reconstruction-update: runner for update refinement. + +use std::{ + env, + path::{Path, PathBuf}, + pin::Pin, +}; + +use anyhow::{Context, Result}; +use chrono::Utc; +use compute_runner_api::runner::{DomainArtifactContent, DomainArtifactRequest}; +use compute_runner_api::{ArtifactSink, Runner, TaskCtx}; +use serde::Serialize; +use serde_json::json; +use tokio::fs; +use tokio_util::sync::CancellationToken; +use tracing::{info, warn}; + +mod input; +mod output; +mod python; +mod strategy; +mod workspace; + +/// Public crate identifier used by workspace smoke tests. +pub const CRATE_NAME: &str = "runner-reconstruction-update"; + +/// Capability handled by this runner (update refinement). +pub const CAPABILITY: &str = "/reconstruction/update-refinement/v1"; + +/// Convenience slice for wiring all supported capabilities. +pub const CAPABILITIES: [&str; 1] = [CAPABILITY]; + +/// Scaffold runner for update refinement. +pub struct RunnerReconstructionUpdate { + config: RunnerConfig, + capability: &'static str, +} + +impl RunnerReconstructionUpdate { + /// Create a new update refinement runner. + pub fn new() -> Self { + Self::with_capability(CAPABILITY, load_config()) + } + + pub fn with_capability(capability: &'static str, config: RunnerConfig) -> Self { + Self { config, capability } + } + + pub fn for_all_capabilities() -> Vec { + let config = load_config(); + CAPABILITIES + .iter() + .map(|cap| Self::with_capability(cap, config.clone())) + .collect() + } + + /// Access the runner configuration. + pub fn config(&self) -> &RunnerConfig { + &self.config + } + + /// Create a workspace for the given domain/job identifiers using the runner configuration. + pub fn create_workspace( + &self, + domain_id: &str, + job_id: Option<&str>, + task_id: &str, + ) -> Result { + workspace::Workspace::create( + self.config.workspace_root.as_deref(), + domain_id, + job_id, + task_id, + ) + } +} + +impl Default for RunnerReconstructionUpdate { + fn default() -> Self { + Self::new() + } +} + +#[async_trait::async_trait] +impl Runner for RunnerReconstructionUpdate { + fn capability(&self) -> &'static str { + self.capability + } + + async fn run(&self, ctx: TaskCtx<'_>) -> anyhow::Result<()> { + let lease = ctx.lease; + let domain_id = lease + .domain_id + .map(|id| id.to_string()) + .unwrap_or_else(|| "domain".into()); + let job_id = lease.task.job_id.map(|id| id.to_string()); + let task_id = lease.task.id.to_string(); + + if ctx.ctrl.is_cancelled().await { + anyhow::bail!("task cancelled before execution"); + } + + let workspace = self.create_workspace(&domain_id, job_id.as_deref(), &task_id)?; + struct WorkspaceCleanup(std::path::PathBuf); + impl Drop for WorkspaceCleanup { + fn drop(&mut self) { + let _ = std::fs::remove_dir_all(&self.0); + } + } + let _workspace_cleanup = WorkspaceCleanup(workspace.root().to_path_buf()); + + let job_ctx = JobContext::from_lease(lease)?; + job_ctx + .persist_metadata(workspace.job_metadata_path()) + .await?; + + info!( + capability = self.capability, + domain_id = %job_ctx.metadata.domain_id, + job_id = %job_ctx.metadata.name, + task_id = %task_id, + workspace = %workspace.root().display(), + configured_workspace_root = %self + .config + .workspace_root + .as_ref() + .map(|p| p.display().to_string()) + .unwrap_or_else(|| "".into()), + "workspace prepared" + ); + let _ = ctx + .ctrl + .progress(json!({"pct": 5, "stage": "workspace", "status": "prepared"})) + .await; + let _ = ctx + .ctrl + .log_event(json!({ + "level": "info", + "stage": "workspace", + "message": "workspace prepared", + "task_id": task_id, + "job_id": job_ctx.metadata.name, + "timestamp": Utc::now().to_rfc3339(), + })) + .await; + + let materialized = input::materialize_refined_scans(&ctx, &workspace).await?; + if ctx.ctrl.is_cancelled().await { + anyhow::bail!("task cancelled during input materialization"); + } + + let scan_names: Vec = materialized + .iter() + .map(|scan| scan.scan_name.clone()) + .collect(); + let _ = ctx + .ctrl + .progress(json!({"pct": 15, "stage": "inputs", "scans": scan_names.len()})) + .await; + let _ = ctx + .ctrl + .log_event(json!({ + "level": "info", + "stage": "inputs", + "message": "refined scans materialized", + "count": scan_names.len(), + "task_id": task_id, + "job_id": job_ctx.metadata.name, + "timestamp": Utc::now().to_rfc3339(), + })) + .await; + + let _ = input::materialize_global_colmap(&ctx, &workspace).await?; + if ctx.ctrl.is_cancelled().await { + anyhow::bail!("task cancelled during input global colmap materialization"); + } + let _ = ctx + .ctrl + .log_event(json!({ + "level": "info", + "stage": "inputs", + "message": "refined global colmap materialized", + "count": scan_names.len(), + "task_id": task_id, + "job_id": job_ctx.metadata.name, + "timestamp": Utc::now().to_rfc3339(), + })) + .await; + + let _ = input::materialize_refine_manifest(&ctx, &workspace).await?; + if ctx.ctrl.is_cancelled().await { + anyhow::bail!("task cancelled during input global colmap materialization"); + } + let _ = ctx + .ctrl + .log_event(json!({ + "level": "info", + "stage": "inputs", + "message": "refined global manifest materialized", + "count": scan_names.len(), + "task_id": task_id, + "job_id": job_ctx.metadata.name, + "timestamp": Utc::now().to_rfc3339(), + })) + .await; + + let python_args = build_python_args(&self.config, &job_ctx, &workspace); + let _ = ctx + .ctrl + .progress(json!({"pct": 20, "stage": "python", "status": "starting"})) + .await; + let _ = ctx + .ctrl + .log_event(json!({ + "level": "info", + "stage": "python", + "message": "python pipeline starting", + "task_id": task_id, + "job_id": job_ctx.metadata.name, + "timestamp": Utc::now().to_rfc3339(), + })) + .await; + + let cancel_token = CancellationToken::new(); + let python_bin = self.config.python_bin.clone(); + // let python_script = self.config.python_script.clone(); + let python_script = self.config.python_script.clone(); + let python_args_clone = python_args.clone(); + let cancel = cancel_token.clone(); + let job_root = workspace.root().to_path_buf(); + let mut python_future: Pin< + Box> + Send>, + > = Box::pin(async move { + python::run_script( + &python_bin, + &python_script, + &python_args_clone, + &cancel, + Some(&job_root), + ) + .await + }); + + let python_result = loop { + tokio::select! { + res = &mut python_future => break res, + cancelled = ctx.ctrl.is_cancelled() => { + if cancelled { + cancel_token.cancel(); + } + } + } + }; + match &python_result { + Ok(()) => { + let _ = ctx + .ctrl + .progress(json!({"pct": 85, "stage": "python", "status": "completed"})) + .await; + let _ = ctx + .ctrl + .log_event(json!({ + "level": "info", + "stage": "python", + "message": "python pipeline completed", + "task_id": task_id, + "job_id": job_ctx.metadata.name, + "timestamp": Utc::now().to_rfc3339(), + })) + .await; + } + Err(err) => { + let _ = ctx + .ctrl + .progress(json!({"pct": 20, "stage": "python", "status": "failed"})) + .await; + let _ = ctx + .ctrl + .log_event(json!({ + "level": "error", + "stage": "python", + "message": err.to_string(), + "task_id": task_id, + "job_id": job_ctx.metadata.name, + "timestamp": Utc::now().to_rfc3339(), + })) + .await; + } + } + if let Err(err) = upload_task_log(ctx.output, &workspace, &task_id).await { + warn!(error = %err, task_id = %task_id, "failed to upload task log"); + } + python_result?; + + if ctx.ctrl.is_cancelled().await { + anyhow::bail!("task cancelled before upload"); + } + + let name_suffix = job_ctx.domain_data_name_suffix(); + output::upload_final_outputs(&workspace, ctx.output, &name_suffix, None).await?; + let _ = ctx + .ctrl + .progress(json!({"pct": 95, "stage": "upload"})) + .await; + let _ = ctx + .ctrl + .log_event(json!({ + "level": "info", + "stage": "upload", + "message": "outputs uploaded", + "task_id": task_id, + "job_id": job_ctx.metadata.name, + "timestamp": Utc::now().to_rfc3339(), + })) + .await; + + ctx.ctrl + .progress(json!({"progress": 100, "status": "succeeded"})) + .await?; + + Ok(()) + } +} + +async fn upload_task_log( + sink: &dyn ArtifactSink, + workspace: &workspace::Workspace, + task_id: &str, +) -> Result<()> { + let log_path = workspace.root().join("log.txt"); + if !log_path.exists() { + warn!(task_id = %task_id, path = %log_path.display(), "task log missing"); + return Ok(()); + } + + let rel_path = format!("logs/{task_id}.txt"); + let name = format!("task_log_{task_id}"); + sink.put_domain_artifact(DomainArtifactRequest { + rel_path: &rel_path, + name: &name, + data_type: "task_log_txt", + existing_id: None, + content: DomainArtifactContent::File(&log_path), + }) + .await + .with_context(|| format!("upload task log {}", log_path.display()))?; + Ok(()) +} + +#[derive(Serialize)] +struct JobMetadataRecord { + id: String, + name: String, + domain_id: String, + processing_type: String, + created_at: String, + domain_server_url: String, + reconstruction_server_url: Option, + data_ids: Vec, +} + +struct JobContext { + metadata: JobMetadataRecord, +} + +impl JobContext { + fn from_lease(lease: &compute_runner_api::LeaseEnvelope) -> Result { + let job_id = lease + .task + .job_id + .map(|id| id.to_string()) + .unwrap_or_else(|| lease.task.id.to_string()); + let job_name = format!("job_{}", job_id); + + let domain_server_url = lease + .domain_server_url + .as_ref() + .map(|url| url.to_string()) + .unwrap_or_default(); + let domain_server_url = domain_server_url.trim_end_matches('/').to_string(); + + let reconstruction_server_url = None; + + let data_ids: Vec = lease + .task + .inputs_cids + .iter() + .map(|cid| extract_last_segment(cid)) + .collect(); + + let metadata = JobMetadataRecord { + id: job_id.clone(), + name: job_name, + domain_id: lease.domain_id.map(|id| id.to_string()).unwrap_or_default(), + processing_type: "update_refinement".to_string(), + created_at: Utc::now().to_rfc3339(), + domain_server_url, + reconstruction_server_url, + data_ids, + }; + + Ok(Self { metadata }) + } + + fn domain_data_name_suffix(&self) -> String { + if let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(&self.metadata.created_at) { + return parsed.format("%Y-%m-%d_%H-%M-%S").to_string(); + } + if !self.metadata.name.trim().is_empty() { + return self.metadata.name.clone(); + } + "update_job".to_string() + } + + async fn persist_metadata(&self, path: &Path) -> Result<()> { + if let Some(parent) = path.parent() { + fs::create_dir_all(parent) + .await + .with_context(|| format!("create metadata directory {}", parent.display()))?; + } + let bytes = serde_json::to_vec_pretty(&self.metadata)?; + fs::write(path, bytes) + .await + .with_context(|| format!("write job metadata to {}", path.display()))?; + Ok(()) + } +} + +/// Configuration for the update reconstruction runner. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RunnerConfig { + /// Optional base directory for job workspaces. + pub workspace_root: Option, + /// Python executable used to launch the refinement pipeline. + pub python_bin: PathBuf, + /// Python entrypoint script. + pub python_script: PathBuf, + /// Additional arguments passed to the python script. + pub python_args: Vec, + /// Number of CPU workers granted to the pipeline. + pub cpu_workers: usize, +} + +impl RunnerConfig { + pub const ENV_WORKSPACE_ROOT: &'static str = "UPDATE_RUNNER_WORKSPACE_ROOT"; + pub const ENV_PYTHON_BIN: &'static str = "UPDATE_RUNNER_PYTHON_BIN"; + pub const ENV_PYTHON_SCRIPT: &'static str = "UPDATE_RUNNER_PYTHON_SCRIPT"; + pub const ENV_PYTHON_ARGS: &'static str = "UPDATE_RUNNER_PYTHON_ARGS"; + pub const ENV_CPU_WORKERS: &'static str = "UPDATE_RUNNER_CPU_WORKERS"; + + pub const DEFAULT_PYTHON_BIN: &'static str = "python3"; + pub const DEFAULT_PYTHON_SCRIPT: &'static str = "update_main.py"; + pub const DEFAULT_CPU_WORKERS: usize = 2; + + /// Build a config from environment variables. + pub fn from_env() -> Result { + let workspace_root = match env::var(Self::ENV_WORKSPACE_ROOT) { + Ok(v) if !v.trim().is_empty() => Some(PathBuf::from(v)), + _ => None, + }; + + let python_bin = env::var(Self::ENV_PYTHON_BIN) + .map(PathBuf::from) + .unwrap_or_else(|_| PathBuf::from(Self::DEFAULT_PYTHON_BIN)); + + let python_script = env::var(Self::ENV_PYTHON_SCRIPT) + .map(PathBuf::from) + .unwrap_or_else(|_| PathBuf::from(Self::DEFAULT_PYTHON_SCRIPT)); + + let python_args = env::var(Self::ENV_PYTHON_ARGS) + .map(|raw| raw.split_whitespace().map(|s| s.to_string()).collect()) + .unwrap_or_else(|_| Vec::new()); + + let cpu_workers = match env::var(Self::ENV_CPU_WORKERS) { + Ok(val) if !val.trim().is_empty() => val + .parse::() + .with_context(|| format!("invalid {} value", Self::ENV_CPU_WORKERS))?, + _ => Self::DEFAULT_CPU_WORKERS, + }; + + Ok(Self { + workspace_root, + python_bin, + python_script, + python_args, + cpu_workers, + }) + } +} + +impl Default for RunnerConfig { + fn default() -> Self { + Self { + workspace_root: None, + python_bin: PathBuf::from(Self::DEFAULT_PYTHON_BIN), + python_script: PathBuf::from(Self::DEFAULT_PYTHON_SCRIPT), + python_args: Vec::new(), + cpu_workers: Self::DEFAULT_CPU_WORKERS, + } + } +} + +fn load_config() -> RunnerConfig { + RunnerConfig::from_env().unwrap_or_else(|err| { + warn!(error = %err, "failed to read update runner config; using defaults"); + RunnerConfig::default() + }) +} + +fn build_python_args( + config: &RunnerConfig, + job_ctx: &JobContext, + workspace: &workspace::Workspace, +) -> Vec { + let mut args = config.python_args.clone(); + args.push("--data_path".to_string()); + args.push( + workspace + .root() + .join("refined") + .join("local") + .display() + .to_string(), + ); + args.push("--output_path".to_string()); + args.push( + workspace + .root() + .join("refined") + .join("update") + .display() + .to_string(), + ); + args.push("--domain_id".to_string()); + args.push(job_ctx.metadata.domain_id.clone()); + args.push("--job_id".to_string()); + args.push(job_ctx.metadata.name.clone()); + args +} + +/// Extract the last non-empty segment from a CID/URL-like string. +fn extract_last_segment(input: &str) -> String { + let trimmed = input.trim_end_matches('/'); + match trimmed.rsplit('/').next() { + Some(seg) if !seg.is_empty() => seg.to_string(), + _ => input.to_string(), + } +} diff --git a/server/rust/runner-reconstruction-update/src/output.rs b/server/rust/runner-reconstruction-update/src/output.rs new file mode 100644 index 0000000..7fe20cd --- /dev/null +++ b/server/rust/runner-reconstruction-update/src/output.rs @@ -0,0 +1,218 @@ +use std::collections::HashMap; +use std::path::Path; + +use anyhow::{Context, Result}; +use compute_runner_api::runner::{DomainArtifactContent, DomainArtifactRequest}; +use compute_runner_api::ArtifactSink; +use tracing::{debug, info}; + +use crate::workspace::Workspace; + +#[derive(Debug, Clone)] +pub struct OutputSpec { + pub relative_path: &'static str, + pub display_name: &'static str, + pub mandatory: bool, +} + +const UPDATE_OUTPUTS: &[OutputSpec] = &[ + OutputSpec { + relative_path: "refined/update/refined_manifest.json", + display_name: "refined_manifest", + mandatory: true, + }, + OutputSpec { + relative_path: "refined/update/RefinedPointCloudReduced.ply", + display_name: "refined_pointcloud", + mandatory: true, + }, + OutputSpec { + relative_path: "refined/update/RefinedPointCloud.ply.drc", + display_name: "refined_pointcloud_full_draco", + mandatory: false, + }, + OutputSpec { + relative_path: "refined/update/topology/topology_downsampled_0.111.obj", + display_name: "topologymesh_v1_lowpoly_obj", + mandatory: false, + }, + OutputSpec { + relative_path: "refined/update/topology/topology_downsampled_0.111.glb", + display_name: "topologymesh_v1_lowpoly_glb", + mandatory: false, + }, + OutputSpec { + relative_path: "refined/update/topology/topology_downsampled_0.333.obj", + display_name: "topologymesh_v1_midpoly_obj", + mandatory: false, + }, + OutputSpec { + relative_path: "refined/update/topology/topology_downsampled_0.333.glb", + display_name: "topologymesh_v1_midpoly_glb", + mandatory: false, + }, + OutputSpec { + relative_path: "refined/update/topology/topology.obj", + display_name: "topologymesh_v1_highpoly_obj", + mandatory: false, + }, + OutputSpec { + relative_path: "refined/update/topology/topology.glb", + display_name: "topologymesh_v1_highpoly_glb", + mandatory: false, + }, + OutputSpec { + relative_path: "refined/update/refined_sfm_combined/cameras.bin", + display_name: "colmap_cameras_bin", + mandatory: false, + }, + OutputSpec { + relative_path: "refined/update/refined_sfm_combined/frames.bin", + display_name: "colmap_frames_bin", + mandatory: false, + }, + OutputSpec { + relative_path: "refined/update/refined_sfm_combined/images.bin", + display_name: "colmap_images_bin", + mandatory: false, + }, + OutputSpec { + relative_path: "refined/update/refined_sfm_combined/points3D.bin", + display_name: "colmap_points3d_bin", + mandatory: false, + }, + OutputSpec { + relative_path: "refined/update/refined_sfm_combined/rigs.bin", + display_name: "colmap_rigs_bin", + mandatory: false, + }, +]; + +/// Upload the final global outputs expected by downstream systems. +/// Returns a map from display name to the artifact path used for upload. +pub async fn upload_final_outputs( + workspace: &Workspace, + sink: &dyn ArtifactSink, + name_suffix: &str, + override_manifest_id: Option<&str>, +) -> Result> { + let mut uploaded = HashMap::new(); + + for spec in UPDATE_OUTPUTS { + let path = workspace.root().join(spec.relative_path); + if !path.exists() { + if spec.mandatory { + return Err(anyhow::anyhow!( + "missing mandatory output '{}' at {}", + spec.display_name, + path.display() + )); + } + debug!( + display = spec.display_name, + rel_path = spec.relative_path, + abs_path = %path.display(), + "optional output missing; skipping upload" + ); + continue; + } + let size = std::fs::metadata(&path) + .map(|m| m.len()) + .unwrap_or_default(); + info!( + display = spec.display_name, + rel_path = spec.relative_path, + size_bytes = size, + "uploading output" + ); + let name = format!("{}_{}", spec.display_name, name_suffix); + let existing_id = if spec.display_name == "refined_manifest" { + override_manifest_id + } else { + None + }; + sink.put_domain_artifact(DomainArtifactRequest { + rel_path: spec.relative_path, + name: &name, + data_type: data_type_for_display(spec.display_name), + existing_id, + content: DomainArtifactContent::File(&path), + }) + .await + .with_context(|| format!("upload output {}", spec.display_name))?; + uploaded.insert( + spec.display_name.to_string(), + spec.relative_path.to_string(), + ); + } + + upload_json_if_exists(sink, "outputs_index.json", workspace.root(), name_suffix).await?; + upload_json_if_exists(sink, "result.json", workspace.root(), name_suffix).await?; + upload_json_if_exists( + sink, + "scan_data_summary.json", + workspace.root(), + name_suffix, + ) + .await?; + + Ok(uploaded) +} + +async fn upload_json_if_exists( + sink: &dyn ArtifactSink, + file_name: &str, + root: &Path, + name_suffix: &str, +) -> Result<()> { + let path = root.join(file_name); + if !path.exists() { + debug!(file = file_name, path = %path.display(), "json output missing; skipping upload"); + return Ok(()); + } + let bytes = tokio::fs::read(&path) + .await + .with_context(|| format!("read output {}", path.display()))?; + info!( + file = file_name, + size_bytes = bytes.len(), + "uploading json output" + ); + if let Some((name, data_type)) = crate::strategy::describe_known_output(file_name, name_suffix) + { + sink.put_domain_artifact(DomainArtifactRequest { + rel_path: file_name, + name: &name, + data_type: &data_type, + existing_id: None, + content: DomainArtifactContent::Bytes(&bytes), + }) + .await + .with_context(|| format!("upload output {}", file_name))?; + } else { + sink.put_bytes(file_name, &bytes) + .await + .with_context(|| format!("upload output {}", file_name))?; + } + Ok(()) +} + +fn data_type_for_display(display: &str) -> &str { + match display { + "refined_manifest" => "refined_manifest_json", + "refined_pointcloud" => "refined_pointcloud_ply", + "refined_pointcloud_full_draco" => "refined_pointcloud_ply_draco", + "topologymesh_v1_lowpoly_obj" => "obj", + "topologymesh_v1_lowpoly_glb" => "glb", + "topologymesh_v1_midpoly_obj" => "obj", + "topologymesh_v1_midpoly_glb" => "glb", + "topologymesh_v1_highpoly_obj" => "obj", + "topologymesh_v1_highpoly_glb" => "glb", + "colmap_cameras_bin" => "colmap_cameras_bin", + "colmap_frames_bin" => "colmap_frames_bin", + "colmap_images_bin" => "colmap_images_bin", + "colmap_points3d_bin" => "colmap_points3d_bin", + "colmap_rigs_bin" => "colmap_rigs_bin", + _ => "binary", + } +} diff --git a/server/rust/runner-reconstruction-update/src/python.rs b/server/rust/runner-reconstruction-update/src/python.rs new file mode 100644 index 0000000..5172df1 --- /dev/null +++ b/server/rust/runner-reconstruction-update/src/python.rs @@ -0,0 +1,147 @@ +use std::{path::Path, process::Stdio}; + +use anyhow::{anyhow, Context, Result}; +use tokio::{ + io::{AsyncBufReadExt, BufReader}, + process::Command, +}; +use tokio_util::sync::CancellationToken; +use tracing::Level; + +/// Run the given python script with provided arguments, streaming stdout/stderr to the log file. +/// If `job_root` is provided, attempts to read `fail_reason.txt` from it on failure. +pub async fn run_script( + python_bin: &Path, + script_path: &Path, + args: &[String], + cancel: &CancellationToken, + job_root: Option<&Path>, +) -> Result<()> { + // Remove stale fail_reason.txt from previous runs + if let Some(root) = job_root { + let fail_reason_path = root.join("fail_reason.txt"); + if fail_reason_path.exists() { + let _ = std::fs::remove_file(&fail_reason_path); + } + } + + let mut cmd = Command::new(python_bin); + cmd.arg(script_path) + .args(args) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()); + + let mut child = cmd.spawn().with_context(|| { + format!( + "spawn python process {} {}", + python_bin.display(), + script_path.display() + ) + })?; + + let mut tasks = Vec::new(); + if let Some(stdout) = child.stdout.take() { + tasks.push(tokio::spawn(forward_stream(BufReader::new(stdout), false))); + } + if let Some(stderr) = child.stderr.take() { + tasks.push(tokio::spawn(forward_stream(BufReader::new(stderr), true))); + } + + let exit_status = tokio::select! { + status = child.wait() => status, + _ = cancel.cancelled() => { + let _ = child.kill().await; + let _ = child.wait().await; + return Err(anyhow!("python execution canceled")); + } + }?; + + for task in tasks { + match task.await { + Ok(Ok(())) => {} + Ok(Err(err)) => return Err(err.context("forwarding python output")), + Err(join_err) => return Err(anyhow!("forward task panicked: {}", join_err)), + } + } + + if exit_status.success() { + Ok(()) + } else { + let code = exit_status + .code() + .map(|c| c.to_string()) + .unwrap_or_else(|| "signal".into()); + + // Try to read fail_reason.txt from job_root for a detailed error message + let fail_reason = job_root + .map(|root| root.join("fail_reason.txt")) + .filter(|p| p.exists()) + .and_then(|p| std::fs::read_to_string(&p).ok()); + + match fail_reason { + Some(reason) => Err(anyhow!("{}", reason.trim())), + None => Err(anyhow!("python execution failed (exit code {})", code)), + } + } +} + +struct ParsedLogInfo { + level: Level, + msg: String, +} + +/// If the provided line is valid JSON, parses "level" and "message" fields. +/// This ensures we propagate log levels correctly from python side. +/// If not JSON or no "message" field, the whole string is logged, as INFO. +/// If JSON is valid but no log level, the "message" is logged as INFO. +fn parse_log_line(line: &str) -> ParsedLogInfo { + if let Ok(json) = serde_json::from_str::(line) { + let level_str = json.get("level").and_then(|v| v.as_str()).unwrap_or("INFO"); + let level = match level_str.to_uppercase().as_str() { + "ERROR" => Level::ERROR, + "WARN" | "WARNING" => Level::WARN, + "DEBUG" => Level::DEBUG, + "TRACE" => Level::TRACE, + _ => Level::INFO, + }; + let msg = json + .get("message") + .and_then(|v| v.as_str()) + .unwrap_or(line) + .to_string(); + ParsedLogInfo { level, msg } + } else { + ParsedLogInfo { + level: Level::INFO, + msg: line.to_string(), + } + } +} + +/// Emits a tracing event at the given level. Macro dispatch required since tracing::event! needs compile-time level. +macro_rules! emit_log { + ($level:expr, $stream:expr, $msg:expr) => { + match $level { + Level::ERROR => tracing::error!(stream = $stream, message = %$msg), + Level::WARN => tracing::warn!(stream = $stream, message = %$msg), + Level::DEBUG => tracing::debug!(stream = $stream, message = %$msg), + Level::TRACE => tracing::trace!(stream = $stream, message = %$msg), + _ => tracing::info!(stream = $stream, message = %$msg), + } + }; +} + +async fn forward_stream(mut reader: BufReader, is_stderr: bool) -> Result<()> +where + R: tokio::io::AsyncRead + Unpin, +{ + let mut line = String::new(); + while reader.read_line(&mut line).await? != 0 { + let trimmed = line.trim_end_matches(&['\r', '\n'][..]); + let parsed = parse_log_line(trimmed); + let stream_name = if is_stderr { "stderr" } else { "stdout" }; + emit_log!(parsed.level, stream_name, parsed.msg); + line.clear(); + } + Ok(()) +} diff --git a/server/rust/runner-reconstruction-update/src/strategy.rs b/server/rust/runner-reconstruction-update/src/strategy.rs new file mode 100644 index 0000000..59f6edf --- /dev/null +++ b/server/rust/runner-reconstruction-update/src/strategy.rs @@ -0,0 +1,76 @@ +//! Reconstruction-specific strategies for artifact naming and input handling. + +use anyhow::Result; +use std::path::{Path, PathBuf}; +use tokio::task; +use zip::ZipArchive; + +/// Map a well-known reconstruction output path to a preferred (name, data_type) +/// pair used when uploading artifacts to Domain. +pub fn describe_known_output(rel_path: &str, suffix: &str) -> Option<(String, String)> { + let name = |base: &str| format!("{}_{}", base, suffix); + match rel_path.trim_start_matches('/') { + "refined/global/refined_manifest.json" => { + Some((name("refined_manifest"), "refined_manifest_json".into())) + } + "refined/global/RefinedPointCloudReduced.ply" => Some(( + name("refined_pointcloud_reduced"), + "refined_pointcloud_ply".into(), + )), + "refined/global/RefinedPointCloud.ply.drc" => Some(( + name("refined_pointcloud_full_draco"), + "refined_pointcloud_ply_draco".into(), + )), + "refined/global/topology/topology_downsampled_0.111.obj" => { + Some((name("topologymesh_v1_lowpoly_obj"), "obj".into())) + } + "refined/global/topology/topology_downsampled_0.111.glb" => { + Some((name("topologymesh_v1_lowpoly_glb"), "glb".into())) + } + "refined/global/topology/topology_downsampled_0.333.obj" => { + Some((name("topologymesh_v1_midpoly_obj"), "obj".into())) + } + "refined/global/topology/topology_downsampled_0.333.glb" => { + Some((name("topologymesh_v1_midpoly_glb"), "glb".into())) + } + "refined/global/topology/topology.obj" => { + Some((name("topologymesh_v1_highpoly_obj"), "obj".into())) + } + "refined/global/topology/topology.glb" => { + Some((name("topologymesh_v1_highpoly_glb"), "glb".into())) + } + "outputs_index.json" => Some((name("outputs_index"), "json".into())), + "result.json" => Some((name("result"), "json".into())), + "scan_data_summary.json" => Some((name("scan_data_summary"), "json".into())), + _ => None, + } +} + +/// Unzip a refined scan zip (bytes) into `unzip_root`, returning the list of +/// extracted file paths. +pub async fn unzip_refined_scan(zip_bytes: Vec, unzip_root: &Path) -> Result> { + let unzip_root = unzip_root.to_path_buf(); + let result = task::spawn_blocking(move || { + std::fs::create_dir_all(&unzip_root)?; + let cursor = std::io::Cursor::new(zip_bytes); + let mut archive = ZipArchive::new(cursor)?; + let mut extracted = Vec::new(); + for idx in 0..archive.len() { + let mut file = archive.by_index(idx)?; + if file.is_dir() { + continue; + } + let mut buf = Vec::new(); + std::io::Read::read_to_end(&mut file, &mut buf)?; + let out_path = unzip_root.join(file.name()); + if let Some(parent) = out_path.parent() { + std::fs::create_dir_all(parent)?; + } + std::fs::write(&out_path, &buf)?; + extracted.push(out_path); + } + Ok::<_, anyhow::Error>(extracted) + }) + .await?; + result +} diff --git a/server/rust/runner-reconstruction-update/src/workspace.rs b/server/rust/runner-reconstruction-update/src/workspace.rs new file mode 100644 index 0000000..4291047 --- /dev/null +++ b/server/rust/runner-reconstruction-update/src/workspace.rs @@ -0,0 +1,132 @@ +use std::{ + fs, + path::{Path, PathBuf}, +}; + +use anyhow::{Context, Result}; +use tempfile::TempDir; + +/// Represents the on-disk layout for a reconstruction job. +pub struct Workspace { + root: PathBuf, + datasets: PathBuf, + refined_local: PathBuf, + refined_global: PathBuf, + refined_update: PathBuf, + request: PathBuf, + metadata: PathBuf, + _temp_guard: Option, +} + +impl Workspace { + /// Create a workspace using the optional base directory. + pub fn create( + base_root: Option<&Path>, + domain_id: &str, + job_id: Option<&str>, + task_id: &str, + ) -> Result { + let domain_segment = sanitize_segment(domain_id); + let job_segment = job_id + .filter(|s| !s.trim().is_empty()) + .map(|val| format!("job_{}", sanitize_segment(val))) + .unwrap_or_else(|| format!("task_{}", sanitize_segment(task_id))); + + let (temp_guard, base_dir) = match base_root { + Some(base) => (None, base.to_path_buf()), + None => { + let temp = TempDir::new().context("create temporary workspace base dir")?; + let base_path = temp.path().to_path_buf(); + (Some(temp), base_path) + } + }; + + let root = base_dir.join("jobs").join(domain_segment).join(job_segment); + + let datasets = root.join("datasets"); + let refined_local = root.join("refined").join("local"); + let refined_global = root.join("refined").join("global"); + let refined_update = root.join("refined").join("update"); + let request = root.join("job_request.json"); + let metadata = root.join("job_metadata.json"); + + create_dir(&datasets)?; + create_dir(&refined_local)?; + create_dir(&refined_global)?; + create_dir(&refined_update)?; + + Ok(Self { + root, + datasets, + refined_local, + refined_global, + refined_update, + request, + metadata, + _temp_guard: temp_guard, + }) + } + + /// Root directory for the job workspace. + pub fn root(&self) -> &Path { + &self.root + } + + /// Path where datasets are materialized. + pub fn datasets(&self) -> &Path { + &self.datasets + } + + /// Path containing local refinement outputs. + pub fn refined_local(&self) -> &Path { + &self.refined_local + } + + /// Path containing global refinement outputs. + pub fn refined_global(&self) -> &Path { + &self.refined_global + } + + /// Path containing updated refinement outputs. + pub fn refined_update(&self) -> &Path { + &self.refined_update + } + + /// Path to the job request JSON file (unused but preserved for parity). + pub fn job_request_path(&self) -> &Path { + &self.request + } + + /// Path to the job metadata JSON file. + pub fn job_metadata_path(&self) -> &Path { + &self.metadata + } +} + +fn create_dir(path: &Path) -> Result<()> { + if let Some(parent) = path.parent() { + fs::create_dir_all(parent) + .with_context(|| format!("create directory {}", parent.display()))?; + } + if path.metadata().map(|m| m.is_dir()).unwrap_or(false) { + return Ok(()); + } + fs::create_dir_all(path).with_context(|| format!("create directory {}", path.display()))?; + Ok(()) +} + +fn sanitize_segment(input: &str) -> String { + if input.is_empty() { + return "unnamed".into(); + } + input + .chars() + .map(|c| { + if c.is_ascii_alphanumeric() || matches!(c, '-' | '_') { + c + } else { + '_' + } + }) + .collect() +} diff --git a/update_main.py b/update_main.py new file mode 100644 index 0000000..b442afb --- /dev/null +++ b/update_main.py @@ -0,0 +1,64 @@ +from pathlib import Path +import argparse +import os +import sys +from utils.data_utils import get_data_paths, setup_logger +from utils.dataset_utils import update_helper +from utils.point_cloud_utils import post_process_ply + +def main(args): + # Create and configure logger + output_path = args.output_path + os.makedirs(output_path, exist_ok=True) + update_log_file = str(output_path) + "/update_logs" + logger = setup_logger( + name="update_refinement", + log_file=update_log_file, + domain_id=args.domain_id, + job_id=args.job_id, + level=args.log_level + ) + + # Find all stitch paths + _, dataset_paths = get_data_paths(args.data_path, "update_refinement") + + # Sort dataset paths by timestamp (indirectly, since folders are named by timestamp) + # Starting with oldest scan keeps the origin portal consistent. + dataset_paths.sort() + + if dataset_paths: + logger.info(f"Found {len(dataset_paths)} dataset paths for stitching:") + for path in dataset_paths: + logger.info(f" - {path}") + else: + logger.warning("No dataset paths found for stitching. Please check the data directory.") + + # Perform Update Refinement + result = update_helper( + dataset_paths=dataset_paths, + job_root_path=args.data_path.parent.parent, + logger_name="update_refinement" + ) + + if not result: + logger.error("Update refinement failed") + return + + post_process_ply(output_path, logger=logger) + + logger.info("Update refinement completed successfully") + return + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--data_path", type=Path, default="./local", help="Path to datasets directory") + parser.add_argument("--output_path", type=Path, default="./merged", help="Path for output files") + parser.add_argument("--domain_id", type=str, default="") + parser.add_argument("--job_id", type=str, default="") + parser.add_argument("--log_level", type=str, default="INFO", + choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], + help="Set the logging level (default: INFO)" + ) + args = parser.parse_args() + main(args) \ No newline at end of file diff --git a/utils/data_utils.py b/utils/data_utils.py index 5aec847..8e84eed 100644 --- a/utils/data_utils.py +++ b/utils/data_utils.py @@ -18,7 +18,7 @@ import subprocess from dateutil import parser from pathlib import Path -from typing import NamedTuple, Dict +from typing import List, NamedTuple, Dict, Tuple floor_rotation = pycolmap.Rotation3d(np.array([0, 0.7071068, 0, 0.7071068])) floor_rotation_inv = pycolmap.Rotation3d(np.array([0, -0.7071068, 0, 0.7071068])) @@ -424,7 +424,7 @@ def save_failed_manifest_json(json_path, job_root_path, job_status_details): save_manifest_json({}, json_path, job_root_path, job_status="failed", job_progress=100, job_status_details=job_status_details) -def save_manifest_json(portal_poses, json_path, job_root_path, job_status=None, job_progress=None, job_status_details=None, portal_sizes=None): +def save_manifest_json(portal_poses, json_path, job_root_path, job_status=None, job_progress=None, job_status_details=None, portal_sizes=None, previous_scan_files=None): job_root_path = Path(job_root_path) @@ -478,6 +478,9 @@ def save_manifest_json(portal_poses, json_path, job_root_path, job_status=None, manifest_data["domainServerURL"] = job_metadata_json.get("domain_server_url", None) manifest_data["processingType"] = job_metadata_json["processing_type"] manifest_data["dataIDs"] = job_metadata_json["data_ids"] + if previous_scan_files is not None: + manifest_data["dataIDs"].extend(previous_scan_files) + except Exception as e: print("No job metadata found for manifest") print(e) @@ -616,6 +619,37 @@ def save_manifest_json(portal_poses, json_path, job_root_path, job_status=None, json.dump(manifest_data, json_file, indent=4) +def parse_info_from_manifest(manifest_path: Path) -> Tuple[Dict[str, Tuple[np.ndarray, np.ndarray]], List[str]]: + """ + Returns dict: shortId -> (R_world_portal, t_world_portal) + Assumes 'pose' is the transform from portal frame to world frame (world_T_portal). + """ + data = json.loads(Path(manifest_path).read_text()) + portals = data.get('portals', []) + out = {} + for p in portals: + sid = p.get('shortId') or p.get('shortID') or p.get('id') + pose = p.get('pose') or p.get('averagePose') # use pose; fallback to averagePose + if sid is None or pose is None: + continue + pos = pose.get('position', {}) + rot = pose.get('rotation', {}) + if not all(k in pos for k in ('x','y','z')): + continue + if not all(k in rot for k in ('x','y','z','w')): + continue + t = np.array([pos['x'], pos['y'], pos['z']], dtype=np.float64) + R = quaternion_to_rotation_matrix((rot['x'], rot['y'], rot['z'], rot['w'])) + size = p.get('physicalSize', None) + out[sid] = (R, t, size) + + data_id = data.get('dataIDs', []) + + data_id = [str(d) for d in data_id if "refined_scan" in str(d)] + + return out, data_id + + def vec3_angle(v, w): value = v.dot(w)/(norm(v)*norm(w)) diff --git a/utils/dataset_utils.py b/utils/dataset_utils.py index 99306ed..08d3c05 100644 --- a/utils/dataset_utils.py +++ b/utils/dataset_utils.py @@ -1,4 +1,4 @@ -from typing import Dict, List, Optional, Set +from typing import Dict, List, Optional, Set, Tuple import time import pycolmap import os @@ -12,15 +12,18 @@ from dataclasses import dataclass from utils.data_utils import ( + convert_pose_colmap_to_opengl, mean_pose, convert_pose_opengl_to_colmap, precompute_arkit_offsets, get_world_space_qr_codes, save_manifest_json, - export_rec_as_ply + export_rec_as_ply, + parse_info_from_manifest ) from utils.geometry_utils import align_reconstruction_chunks -from utils.io import Model, read_portal_csv +from utils.io import Model, read_portal_csv, read_model, merge_models, write_model, apply_similarity_to_new_model, validate_model_consistency +from utils.voxel_raycast_utils import carve_outdated_reference_geometry class NoOverlapException(Exception): @@ -80,7 +83,7 @@ class Paths: output_path: Path dataset_dir: Path refined_group_dir: Path - + reference_path: Optional[Path] = None # Path for reference reconstruction (global refinement that set as canonical) only used in update_helper def stitching_helper( dataset_paths: List[Path], @@ -111,7 +114,7 @@ def stitching_helper( logger = logging.getLogger(logger_name) # Initialize paths and data - paths = _initialize_paths(group_folder) + paths = _initialize_paths(group_folder.parent, "stitching_helper") stitch_data = StitchingData() # Process datasets @@ -174,6 +177,163 @@ def stitching_helper( ) +def update_helper( + dataset_paths: List[Path], + job_root_path: Path, + logger_name: Optional[str] = None +) -> bool: + """Main function to stitch multiple reconstructions together. + + Args: + dataset_paths: List of paths to datasets to stitch + job_root_path: Path to the root folder for the update job + logger_name: Name of logger to use + + Returns: + StitchingResult containing basic and refined reconstructions + """ + + logger = logging.getLogger(logger_name) + + # Initialize paths and data + paths = _initialize_paths(job_root_path, "update_helper") + + pending_update_rec = [] + + + dataset_rec_paths = [ + _get_refined_rec_dir( + True, + paths.refined_group_dir, + scan_name, + logger + )for scan_name in [path.stem for path in dataset_paths]] + + if len(dataset_rec_paths) > 1: + logger.info("Multiple dataset paths found. Proceeding with stitching.") + + # TODO: Replace this function with bundle scans and perform basic stitch. + pending_update_rec.extend(dataset_rec_paths) + + elif len(dataset_rec_paths) == 1: + pending_update_rec.append(dataset_rec_paths[0]) + logger.info("Only one dataset path found. Skipping stitching and preparing for update refinement.") + else: + logger.error("No dataset paths found. Exiting.") + return False + + # Loading the reference model that will be refined. This should be the model that set to be canonical, which is the latest colmap model of the domain. + cams_r, imgs_r, pts_r = read_model(paths.reference_path / "refined_sfm_combined", ".bin",logger=logger) + portal_r, refined_files_r = parse_info_from_manifest(paths.reference_path / "refined_manifest.json") # return a dict of portal_id -> (R, t, size) + portal_sizes = {pid: portal[2] for pid, portal in portal_r.items()} + portal_r = {pid: pycolmap.Rigid3d(pycolmap.Rotation3d(portal_r[pid][0]), portal_r[pid][1]) for pid in portal_r.keys()} + + # Process datasets + for pending_update_rec_dir in pending_update_rec: + logger.info(f"Processing dataset for update refinement: {pending_update_rec_dir}") + + # Loading the new reconstruction that contains the new geometry to be merged in. + # This should be the local refined reconstruction of the new scan that will be merged in. + cams_u, imgs_u, pts_u = read_model(pending_update_rec_dir, ".bin", logger=logger) + portals_u, portal_sizes_u = load_qr_detections_from_local_refinement(pending_update_rec_dir, logger) + + # Align the new reconstruction to the reference model using the detected QR code portals as anchors. + # This gives us a rough alignment that is good enough for culling out outdated geometry from the reference model. + alignment_mat = _calculate_alignment_transform(portals_u, portal_r, logger) + logger.info(f"Calculated alignment transform for update refinement: \n{alignment_mat.matrix()}") + cams_u_aligned, imgs_u_aligned, pts_u_aligned = apply_similarity_to_new_model(cams_u, imgs_u, pts_u, alignment_mat.matrix()) + + # Pruning the reference model by carving out points that violate the new free-space constraints observed in the new scan. + pruned_imgs_r, pruned_pts_r = carve_outdated_reference_geometry( + ref_imgs=imgs_r, + ref_pts=pts_r, + new_imgs=imgs_u_aligned, + new_pts=pts_u_aligned, + voxel_size=0.15, # Adjust based on scene scale (e.g. 10cm) + clearance_margin=0.1, # Stop 10cm before the target to avoid false collisions + min_surviving_points=50, # Drop old images with < 50 valid points left + logger=logger + ) + logger.info(f"Pruned reference model has {len(pruned_imgs_r)} images and {len(pruned_pts_r)} points (out of original {len(imgs_r)} images and {len(pts_r)} points).") + if logger.level <= logging.DEBUG: + validate_model_consistency(cams_r, pruned_imgs_r, pruned_pts_r, logger=logger) + os.makedirs(paths.output_path / f"pruned_update_{pending_update_rec_dir.parent.name}", exist_ok=True) + write_model(cams_r, pruned_imgs_r, pruned_pts_r, paths.output_path / f"pruned_update_{pending_update_rec_dir.parent.name}") + logger.debug(f"Exported pruned reference model to {paths.output_path / f'pruned_update_{pending_update_rec_dir.parent.name}'}. Model contains {len(cams_r)} cameras, {len(pruned_imgs_r)} images, and {len(pruned_pts_r)} points.") + + # Merging the pruned reference model with the new aligned reconstruction to produce the updated reconstruction. + cams_r, imgs_r, pts_r, _ = merge_models( + (cams_r, pruned_imgs_r, pruned_pts_r), # Use the freshly carved reference map + (cams_u_aligned, imgs_u_aligned, pts_u_aligned) + ) + if logger.level <= logging.DEBUG: + validate_model_consistency(cams_r, imgs_r, pts_r, logger=logger) + os.makedirs(paths.output_path / f"merged_update_{pending_update_rec_dir.parent.name}", exist_ok=True) + write_model(cams_r, imgs_r, pts_r, paths.output_path / f"merged_update_{pending_update_rec_dir.parent.name}") + logger.debug(f"Exported merged model to {paths.output_path / f'merged_update_{pending_update_rec_dir.parent.name}'}. Model contains {len(cams_r)} cameras, {len(imgs_r)} images, and {len(pts_r)} points.") + + # Transform and Merge Portals + # we only add the new portals, do not modify the existing portals in the reference model to avoid instability of portal poses across updates. + # This means the portal poses in the manifest may not be perfectly aligned with the geometry in the case of noisy detections, + # but it avoids the risk of breaking all existing portals in the reference model when a bad alignment happens. + # We can consider more sophisticated strategies for portal merging in the future when we have more experience with real data. + for pid, portal in portals_u.items(): + if pid in portal_r: + logger.info(f"Portal {pid} already exists in reference model. Skipping.") + continue + alignment_sim3d = pycolmap.Sim3d(1.0, alignment_mat.rotation, alignment_mat.translation) + transformed_portal = transform_with_scale(alignment_sim3d, portal) + portal_r[pid] = transformed_portal + portal_sizes[pid] = portal_sizes_u[pid] + + + # Export the merged model for inspection + os.makedirs(paths.output_path / "refined_sfm_combined", exist_ok=True) + write_model(cams_r, imgs_r, pts_r, paths.output_path / "refined_sfm_combined") + logger.debug(f"Exported updated reconstruction to {paths.output_path / 'refined_sfm_combined'}. Model contains {len(cams_r)} cameras, {len(imgs_r)} images, and {len(pts_r)} points.") + validate_model_consistency(cams_r, imgs_r, pts_r, logger=logger) + + manifest_path = paths.output_path / 'refined_manifest.json' + portals_opengl = {pid: convert_pose_colmap_to_opengl(portal.translation, portal.rotation.quat) for pid, portal in portal_r.items()} + portals = {pid: [pycolmap.Rigid3d(pycolmap.Rotation3d(np.array(pose[1])), np.array(pose[0]))] for pid, pose in portals_opengl.items()} + save_manifest_json( + portals, + manifest_path, + paths.parent_dir, + job_status="refined", + job_progress=100, + portal_sizes=portal_sizes, + previous_scan_files=refined_files_r + ) + + ply_path = paths.refined_group_dir / 'update' / "RefinedPointCloud.ply" + rec = pycolmap.Reconstruction() + for point in pts_r.values(): + x,y,z = point.xyz + _ = rec.add_point3D(np.array([x,y,z]), pycolmap.Track(), point.rgb) + export_rec_as_ply(rec, ply_path) # Outputs binary PLY in openCV coords. We convert it to OpenGL in the post_process_ply + + return True + +def load_qr_detections_from_local_refinement(rec_dir: Path, logger) -> Tuple[List[Dict], Dict[str, float]]: + portals_u_dict = read_portal_csv(rec_dir / "portals.csv") + portals_u_list = [] + portal_sizes = {} + for portal in portals_u_dict.values(): + portal_sizes[portal.short_id] = portal.size + gl_tvec, gl_qvec = convert_pose_colmap_to_opengl(portal.tvec, portal.qvec) + portals_u_list.append({ + "short_id": portal.short_id, + "tvec": gl_tvec, + "qvec": gl_qvec, + "image_id": portal.image_id, + "size": portal.size, + "corners": portal.corners, + "pose": pycolmap.Rigid3d(pycolmap.Rotation3d(np.array(gl_qvec)), np.array(gl_tvec)) + }) + chunk_detections_per_qr = _group_detections_by_qr(portals_u_list) + return _calculate_mean_qr_poses(chunk_detections_per_qr), portal_sizes + def load_partial( unzip_folder: Path, truth_portal_poses: Dict[str, pycolmap.Rigid3d], @@ -471,17 +631,26 @@ def _process_reconstruction( image_id_old_to_new[detection["image_id"]] ) -def _initialize_paths(group_folder: Path) -> Paths: +def _initialize_paths(job_root_path: Path, function: str = "stitching_helper") -> Paths: """Initialize all required paths.""" - parent_dir = group_folder.parent - output_path = parent_dir / "refined" / "global" + parent_dir = job_root_path + + if function == "stitching_helper": + output_path = parent_dir / "refined" / "global" + elif function == "update_helper": + output_path = parent_dir / "refined" / "update" + reference_path = parent_dir / "refined" / "global" + else: + raise ValueError(f"Unknown function: {function}") + + dataset_dir = parent_dir / "datasets" refined_group_dir = parent_dir / "refined" os.makedirs(refined_group_dir, exist_ok=True) os.makedirs(dataset_dir, exist_ok=True) - return Paths(parent_dir, output_path, dataset_dir, refined_group_dir) + return Paths(parent_dir, output_path, dataset_dir, refined_group_dir, reference_path if function == "update_helper" else None) def _process_datasets( dataset_paths: List[Path], @@ -705,7 +874,7 @@ def _get_refined_results( paths.parent_dir, job_status="refined", job_progress=100, - portal_sizes=stitch_data.portal_sizes + portal_sizes=stitch_data.portal_sizes, ) if with_3dpoints: diff --git a/utils/io.py b/utils/io.py index 07f8c01..ca9b2cc 100644 --- a/utils/io.py +++ b/utils/io.py @@ -1,3 +1,4 @@ +from typing import Dict, Optional, Tuple import numpy as np import logging import os @@ -8,6 +9,7 @@ import trimesh import uuid import csv +from scipy.spatial.transform import Rotation as R from utils.data_utils import (convert_pose_colmap_to_opengl) CameraModel = collections.namedtuple( @@ -527,7 +529,74 @@ def write_portal_csv(portals, csv_path): csv_writer.writerow(row) return -def read_model(path, ext="", logger=None): + +def validate_model_consistency(cameras: Dict[int, 'Camera'], images: Dict[int, 'Image'], points3D: Dict[int, 'Point3D'], logger=None) -> bool: + """Check camera/image/point3D cross-references for consistency. + + Returns True if valid, False if inconsistencies were detected. + """ + if logger is None: + logger = logging.getLogger() + + valid = True + # Camera references in images + for iid, img in images.items(): + if img.camera_id not in cameras: + logger.error("Invalid model: image %d references missing camera id %d", iid, img.camera_id) + valid = False + + # Image references in points3D + for pid, p in points3D.items(): + for (iid, p2d_idx) in zip(p.image_ids, p.point2D_idxs): + if iid not in images: + logger.error("Invalid model: point3D %d references missing image id %d", pid, iid) + valid = False + continue + img = images[iid] + if p2d_idx < 0 or p2d_idx >= len(img.xys): + logger.error("Invalid model: point3D %d references image %d point2D index %d out of bounds (len=%d)", pid, iid, p2d_idx, len(img.xys)) + valid = False + + # Inverse cross-check image point3D_ids vs point tracks + for iid, img in images.items(): + if img.point3D_ids is None: + continue + if img.xys is not None and len(img.point3D_ids) != len(img.xys): + logger.error("Invalid model: image %d has %d point3D_ids but %d xys", iid, len(img.point3D_ids), len(img.xys)) + valid = False + + for j, pid in enumerate(img.point3D_ids): + if pid < 0: + continue + if pid not in points3D: + logger.error("Invalid model: image %d observation %d references missing point3D id %d", iid, j, pid) + valid = False + continue + p = points3D[pid] + if not any(image_id == iid and point2D_idx == j for image_id, point2D_idx in zip(p.image_ids, p.point2D_idxs)): + logger.error("Invalid model: image %d observation %d -> point3D %d has no matching track entry", iid, j, pid) + valid = False + + # Optional: verify every point3D track is mirrored in image point3D_ids + for pid, p in points3D.items(): + for (iid, p2d_idx) in zip(p.image_ids, p.point2D_idxs): + if iid not in images: + continue + img = images[iid] + if p2d_idx < 0 or p2d_idx >= len(img.point3D_ids): + continue + if img.point3D_ids[p2d_idx] != pid: + logger.error("Invalid model: point3D %d track says image %d idx %d but image lists %d", pid, iid, p2d_idx, img.point3D_ids[p2d_idx]) + valid = False + + if valid: + logger.info("Model consistency check passed: cameras=%d, images=%d, points3D=%d", len(cameras), len(images), len(points3D)) + else: + logger.error("Model consistency check failed: cameras=%d, images=%d, points3D=%d", len(cameras), len(images), len(points3D)) + return valid + + +def read_model(path, ext="", logger=None, validate_consistency=True): if logger is None: logger = logging.getLogger() @@ -549,6 +618,10 @@ def read_model(path, ext="", logger=None): cameras = read_cameras_binary(os.path.join(path, "cameras" + ext)) images = read_images_binary(os.path.join(path, "images" + ext)) points3D = read_points3D_binary(os.path.join(path, "points3D") + ext) + + if validate_consistency: + validate_model_consistency(cameras, images, points3D, logger=logger) + return cameras, images, points3D @@ -564,6 +637,128 @@ def write_model(cameras, images, points3D, path, ext=".bin"): return cameras, images, points3D +def apply_similarity_to_new_model(cams: Dict[int, 'Camera'], imgs: Dict[int, 'BaseImage'], pts: Dict[int, 'Point3D'], + T_a: np.ndarray) -> Tuple[Dict[int, 'Camera'], Dict[int, 'BaseImage'], Dict[int, 'Point3D']]: + """ + Transform the entire new model (cameras & points) into the reference frame + using a 4x4 similarity transformation matrix. + """ + + # Extract scale and pure rotation from the 4x4 matrix + # T_a[:3, :3] contains s * R_a. We can find 's' by taking the norm of the first column. + s = np.linalg.norm(T_a[:3, 0]) + R_a = T_a[:3, :3] / s + + # transform images: update rotations and translations via centers + new_imgs = {} + for iid, im in imgs.items(): + Rn = qvec2rotmat(im.qvec) + tn = im.tvec + Cn = cam_center_from_extrinsics(Rn, tn) + + # Apply 4x4 transformation to the camera center using homogeneous coordinates + Cn_hom = np.append(Cn, 1.0) + C_ref = (T_a @ Cn_hom)[:3] + + R_ref = Rn @ R_a.T + t_ref = -R_ref @ C_ref + + rot_ref = R.from_matrix(R_ref) + q_ref2 = rot_ref.as_quat() + # SciPy returns [x, y, z, w], but COLMAP uses [w, x, y, z] + q_ref2 = np.array([q_ref2[3], q_ref2[0], q_ref2[1], q_ref2[2]]) + new_imgs[iid] = BaseImage(iid, q_ref2, t_ref, im.camera_id, im.name, im.xys, im.point3D_ids) + + # transform points + new_pts = {} + for pid, p in pts.items(): + # Apply 4x4 transformation to 3D points + p_hom = np.append(p.xyz, 1.0) + X_ref = (T_a @ p_hom)[:3] + new_pts[pid] = Point3D(pid, X_ref, p.rgb, p.error, p.image_ids, p.point2D_idxs) + + # cameras unchanged (intrinsics) + return cams, new_imgs, new_pts + + +def merge_models(reference_model: Tuple[Dict[int,Camera], Dict[int,Image], Dict[int,Point3D]], + new_model: Tuple[Dict[int,Camera], Dict[int,Image], Dict[int,Point3D]], + new_name_prefix: Optional[str] = "new/") -> Tuple[Dict[int,Camera], Dict[int,Image], Dict[int,Point3D], Dict[int,int]]: + cams_r, imgs_r, pts_r = reference_model + cams_n, imgs_n, pts_n = new_model + + next_cam_id = (max(cams_r.keys()) + 1) if cams_r else 1 + next_img_id = (max(imgs_r.keys()) + 1) if imgs_r else 1 + next_pt_id = (max(pts_r.keys()) + 1) if pts_r else 1 + + # 1) Cameras: append and map ids + cam_map = {} + for cid, c in cams_n.items(): + cam_map[cid] = next_cam_id + cams_r[next_cam_id] = Camera(next_cam_id, c.model, c.width, c.height, list(c.params)) + next_cam_id += 1 + + # 2) Images: append, avoid name clashes, remember image id remap + existing_names = set([im.name for im in imgs_r.values()]) + img_map = {} + for iid, im in imgs_n.items(): + new_name = im.name + if new_name in existing_names: + new_name = (new_name_prefix or "new/") + new_name + img_map[iid] = next_img_id + # Temporarily copy point3D_ids; we'll fix them after adding points (once we know pid remap) + imgs_r[next_img_id] = BaseImage( + next_img_id, im.qvec, im.tvec, cam_map[im.camera_id], + new_name, im.xys.copy(), im.point3D_ids.copy() + ) + existing_names.add(new_name) + next_img_id += 1 + + # 3) Points: append with new ids, rewrite tracks with remapped image ids + oldpid_to_newpid: Dict[int, int] = {} + for old_pid, p in pts_n.items(): + new_img_ids = [] + new_pt2ds = [] + for (old_iid, j) in zip(p.image_ids, p.point2D_idxs): + if old_iid in img_map: + new_img_ids.append(img_map[old_iid]) + new_pt2ds.append(j) + + if len(new_img_ids) >= 2: + new_pid = next_pt_id + oldpid_to_newpid[old_pid] = new_pid + pts_r[new_pid] = Point3D(new_pid, p.xyz, p.rgb, p.error, np.array(new_img_ids), np.array(new_pt2ds)) + next_pt_id += 1 + # else: drop point (insufficient observations) + + # Build inverse image map once + inv_img_map = {new_id: old_id for old_id, new_id in img_map.items()} + + for new_iid, im_r in imgs_r.items(): + if new_iid not in inv_img_map: + continue # this is an original reference image; its links stay unchanged + old_iid = inv_img_map[new_iid] + # Fetch the original new image to see original point3D_ids (old pids) + im_old = imgs_n[old_iid] + old_pids = im_old.point3D_ids + if old_pids.size == 0: + continue + # Map old pids to new pids where available; else set to -1 + mapped = old_pids.copy() + mask = mapped >= 0 + if mask.any(): + # vectorized map: for speed, use a dict lookup with fallback -1 + mapped_ids = [] + for pid in mapped[mask]: + mapped_ids.append(oldpid_to_newpid.get(int(pid), -1)) + mapped[mask] = np.array(mapped_ids, dtype=np.int64) + # namedtuple-based Image is immutable; replace with updated copy + imgs_r[new_iid] = im_r._replace(point3D_ids=mapped) + + return cams_r, imgs_r, pts_r, oldpid_to_newpid + + + # Load from COLMAP class Model: def __init__(self): @@ -830,4 +1025,8 @@ def save_meshes_obj(meshes, filename): # Save the combined lines to an OBJ file with open(filename, 'w') as f: f.write('\n'.join(lines)) - return \ No newline at end of file + return + +def cam_center_from_extrinsics(R: np.ndarray, t: np.ndarray) -> np.ndarray: + # world->cam: x_c = R X + t; center C satisfies R C + t = 0 => C = -R^T t + return -R.T @ t \ No newline at end of file diff --git a/utils/point_cloud_utils.py b/utils/point_cloud_utils.py index 9498ea7..635ad6f 100644 --- a/utils/point_cloud_utils.py +++ b/utils/point_cloud_utils.py @@ -112,3 +112,28 @@ def draco_compress_ply(ply_path, draco_path, logger=None): if logger is not None: logger.info(f"Draco compressed point cloud saved: {draco_path}") + +def post_process_ply(output_path, logger=None): + ply_path = output_path / "RefinedPointCloud.ply" + filter_ply(ply_path, ply_path, convert_opencv_to_opengl=True, logger=logger) + + # Ensure ply fits in domain data + logger.info("Downsampling ply if needed to be under 20 MB file size...") + ply_path_reduced = output_path / "RefinedPointCloudReduced.ply" + try: + downsample_ply_to_max_size(ply_path, ply_path_reduced, 20000000, logger=logger) + except Exception as e: + logger.error(f"Failed to downsample PLY file: {str(e)}") + + logger.info("Draco compressing the PLY file...") + try: + # Must be float to do draco compression, but open3d outputs double precision. + ply_path_float = output_path / "RefinedPointCloudFloat.ply" + try: + reduce_decimals_ply(ply_path, ply_path_float, 3, logger=logger) + except Exception as e: + logger.error(f"Failed to reduce decimals in PLY file: {str(e)}") + + draco_compress_ply(ply_path_float, output_path / "RefinedPointCloud.ply.drc", logger=logger) + except Exception as e: + logger.error(f"Failed to draco compress the PLY file: {str(e)}") \ No newline at end of file diff --git a/utils/voxel_raycast_utils.py b/utils/voxel_raycast_utils.py new file mode 100644 index 0000000..00d4092 --- /dev/null +++ b/utils/voxel_raycast_utils.py @@ -0,0 +1,134 @@ +import numpy as np +from typing import Dict, Tuple +from utils.io import Image, Point3D, qvec2rotmat, cam_center_from_extrinsics +from collections import defaultdict +import logging + +def carve_outdated_reference_geometry( + ref_imgs: Dict[int, 'Image'], + ref_pts: Dict[int, 'Point3D'], + new_imgs: Dict[int, 'Image'], + new_pts: Dict[int, 'Point3D'], + voxel_size: float = 0.1, + clearance_margin: float = 0.3, + min_surviving_points: int = 15, + logger: logging.Logger = None +) -> Tuple[Dict[int, 'Image'], Dict[int, 'Point3D']]: + """ + Uses new free-space rays to delete conflicting points from the reference model. + """ + if logger is None: + logger = logging.getLogger() + + # 1. Map voxels to reference point IDs + voxel_to_ref_pids = defaultdict(list) + for pid, p in ref_pts.items(): + vox = tuple(np.floor(p.xyz / voxel_size).astype(int)) + voxel_to_ref_pids[vox].append(pid) + + violated_ref_pids = set() + step_size = voxel_size / 2.0 + + # 2. Cast rays from trusted new cameras + for img in new_imgs.values(): + R = qvec2rotmat(img.qvec) + C = cam_center_from_extrinsics(R, img.tvec) + + for pid in img.point3D_ids: + if pid < 0 or pid not in new_pts: + continue + + X = new_pts[pid].xyz + ray_vec = X - C + dist = np.linalg.norm(ray_vec) + + if dist <= clearance_margin: + continue + + ray_dir = ray_vec / dist + t_max = dist - clearance_margin + t_vals = np.arange(clearance_margin, t_max, step_size) + + if len(t_vals) == 0: + continue + + ray_pts = C + t_vals[:, np.newaxis] * ray_dir + ray_voxels = np.floor(ray_pts / voxel_size).astype(int) + + # 3. Collect old points that violate the new free space + for vox in ray_voxels: + vox_tuple = tuple(vox) + if vox_tuple in voxel_to_ref_pids: + violated_ref_pids.update(voxel_to_ref_pids[vox_tuple]) + # Remove from dict so we don't process it multiple times + del voxel_to_ref_pids[vox_tuple] + + logger.info(f"[prune] Vaporizing {len(violated_ref_pids)} outdated reference points.") + + # 4. Rebuild the reference model without the violated points + pruned_ref_pts = {pid: p for pid, p in ref_pts.items() if pid not in violated_ref_pids} + + pruned_ref_imgs = {} + violated_array = np.array(list(violated_ref_pids)) + + dropped_img_names = [] + + for iid, img in ref_imgs.items(): + new_pids = img.point3D_ids.copy() + + # Set deleted points to -1 + mask = np.isin(new_pids, violated_array) + new_pids[mask] = -1 + + # Check if the old image still sees enough valid points to survive + valid_count = np.sum(new_pids >= 0) + if valid_count >= min_surviving_points: + img = img._replace(point3D_ids=new_pids) + pruned_ref_imgs[iid] = img + else: + # If it doesn't have enough points, we drop it and record its name + dropped_img_names.append((img.name, valid_count)) + # Print the summary of dropped images + if dropped_img_names: + logger.info(f"[prune] Dropped {len(dropped_img_names)} old reference images (fell below {min_surviving_points} valid points):") + for name, remaining_pts in dropped_img_names: + logger.debug(f" -> {name} (Only {remaining_pts} points left)") + else: + logger.info(f"[prune] No reference images were completely dropped.") + + logger.info(f"[prune] Kept {len(pruned_ref_imgs)}/{len(ref_imgs)} reference images.") + + final_ref_pts = {} + valid_pids = set() + kept_img_ids = set(pruned_ref_imgs.keys()) + + # Filter the tracks of the surviving points + for pid, p in pruned_ref_pts.items(): + # Keep only track observations from images that survived the prune + new_image_ids = [] + new_point2D_idxs = [] + for (iid, idx) in zip(p.image_ids, p.point2D_idxs): + if iid in kept_img_ids: + new_image_ids.append(iid) + new_point2D_idxs.append(idx) + + + # A 3D point must be seen by at least 2 cameras to physically exist in COLMAP + if len(new_image_ids) >= 2: + final_ref_pts[pid] = Point3D(pid, p.xyz, p.rgb, p.error, np.array(new_image_ids), np.array(new_point2D_idxs)) + valid_pids.add(pid) + + # Final sweep: disconnect the surviving images from any points we just deleted + valid_pids_array = np.array(list(valid_pids)) + for iid, img in list(pruned_ref_imgs.items()): + # Find point IDs that are >= 0 BUT are no longer in our valid_pids set + mask = ~np.isin(img.point3D_ids, valid_pids_array) & (img.point3D_ids >= 0) + if mask.any(): + updated_pids = img.point3D_ids.copy() + updated_pids[mask] = -1 + pruned_ref_imgs[iid] = img._replace(point3D_ids=updated_pids) + + pts_removed = len(pruned_ref_pts) - len(final_ref_pts) + logger.info(f"[prune] Final cleanup: Removed {pts_removed} ghost points that lost camera support.") + + return pruned_ref_imgs, final_ref_pts \ No newline at end of file