diff --git a/src/commands/upload/main.rs b/src/commands/upload/main.rs index 7c58661..a0942eb 100644 --- a/src/commands/upload/main.rs +++ b/src/commands/upload/main.rs @@ -110,6 +110,9 @@ pub struct UploadCmdArgs { #[arg(long, default_value_t = false)] pub show_status_until_transcoded: bool, + + #[arg(long, default_value_t = false)] + pub machine_readable: bool, } #[derive(Clone)] @@ -140,6 +143,26 @@ struct AssetTaskProgress { deep_analyze: Option<(String, f64)>, } +#[derive(Clone, Debug)] +struct StatusWaitOutcome { + success: bool, + error: Option, + assets: Vec, +} + +#[derive(Clone, Debug)] +struct UploadedAssetInfo { + asset_id: String, + local_path: String, +} + +#[derive(Clone, Debug)] +struct MachineAssetStatus { + asset_id: String, + local_path: String, + status: String, +} + fn parse_generate_proxy(s: &str) -> Result { match s.trim() { "360" => Ok(GenerateProxy::Variant360), @@ -446,14 +469,45 @@ fn run_upload(args: UploadCmdArgs) -> Result<(), String> { if let Some(mode) = status_wait_mode { let rt = tokio::runtime::Runtime::new() .map_err(|e| format!("failed to start runtime: {}", e))?; - rt.block_on(wait_for_asset_processing_status( + let wait_result = rt.block_on(wait_for_asset_processing_status( &cfg, &api_key, bearer_header_for_auth.as_deref(), &uploaded_asset_ids, mode, script_start, - ))?; + args.machine_readable, + )); + match wait_result { + Ok(outcome) => { + if args.machine_readable { + print_machine_readable_result(&outcome, Some(script_start.elapsed().as_secs())); + } + if !outcome.success { + return Err(outcome + .error + .unwrap_or_else(|| "one or more assets failed watched tasks".to_string())); + } + } + Err(e) => { + if args.machine_readable { + let failure_outcome = StatusWaitOutcome { + success: false, + error: Some(e.clone()), + assets: uploaded_asset_ids + .iter() + .map(|a| MachineAssetStatus { + asset_id: a.asset_id.clone(), + local_path: a.local_path.clone(), + status: "unknown".to_string(), + }) + .collect(), + }; + print_machine_readable_result(&failure_outcome, None); + } + return Err(e); + } + } } return Ok(()); } @@ -529,15 +583,49 @@ fn run_upload(args: UploadCmdArgs) -> Result<(), String> { println!(); if let Some(mode) = status_wait_mode { - wait_for_asset_processing_status( + let wait_result = wait_for_asset_processing_status( &cfg, &api_key, bearer_header.as_deref(), &uploaded_asset_ids, mode, script_start, + args.machine_readable, ) - .await?; + .await; + match wait_result { + Ok(outcome) => { + if args.machine_readable { + print_machine_readable_result( + &outcome, + Some(script_start.elapsed().as_secs()), + ); + } + if !outcome.success { + return Err(outcome + .error + .unwrap_or_else(|| "one or more assets failed watched tasks".to_string())); + } + } + Err(e) => { + if args.machine_readable { + let failure_outcome = StatusWaitOutcome { + success: false, + error: Some(e.clone()), + assets: uploaded_asset_ids + .iter() + .map(|a| MachineAssetStatus { + asset_id: a.asset_id.clone(), + local_path: a.local_path.clone(), + status: "unknown".to_string(), + }) + .collect(), + }; + print_machine_readable_result(&failure_outcome, None); + } + return Err(e); + } + } } Ok(()) @@ -676,7 +764,7 @@ fn run_two_queue_pipeline( bearer_opt: Option<&str>, user_id: &str, upload_request_id: &str, -) -> Result, String> { +) -> Result, String> { let (upload_tx, mut upload_rx) = tokio_mpsc::channel::(64); let mut progress = TwoQueueProgress::new()?; @@ -753,7 +841,8 @@ fn run_two_queue_pipeline( }; drop(upload_tx); - let uploaded_asset_ids: Arc>> = Arc::new(std::sync::Mutex::new(Vec::new())); + let uploaded_asset_ids: Arc>> = + Arc::new(std::sync::Mutex::new(Vec::new())); let uploaded_asset_ids_consumer = Arc::clone(&uploaded_asset_ids); let consumer = async move { while let Some(file_info) = upload_rx.recv().await { @@ -812,7 +901,10 @@ fn run_two_queue_pipeline( )); } if let Ok(mut guard) = uploaded_asset_ids_consumer.lock() { - guard.push(upload_resp.asset_id.clone()); + guard.push(UploadedAssetInfo { + asset_id: upload_resp.asset_id.clone(), + local_path: file_info.original_path.to_string_lossy().to_string(), + }); } // Call preprocess as soon as this upload finishes let _ = progress_handle.add_info("Triggering preprocessing..."); @@ -1046,7 +1138,7 @@ async fn upload_with_per_file_presigned( bearer_opt: Option<&str>, disable_description_generation: bool, generate_proxy: Option<&Vec>, -) -> Result, String> { +) -> Result, String> { let http = Arc::new( reqwest::Client::builder() .timeout(std::time::Duration::from_secs(60)) @@ -1056,7 +1148,8 @@ async fn upload_with_per_file_presigned( let semaphore = Arc::new(Semaphore::new(max_concurrent)); let mut upload_tasks = Vec::new(); - let uploaded_asset_ids: Arc>> = Arc::new(std::sync::Mutex::new(Vec::new())); + let uploaded_asset_ids: Arc>> = + Arc::new(std::sync::Mutex::new(Vec::new())); let cfg = cfg.clone(); let api_key = api_key.to_string(); let bearer_opt = bearer_opt.map(String::from); @@ -1135,7 +1228,10 @@ async fn upload_with_per_file_presigned( if success { if let Ok(mut guard) = uploaded_asset_ids_clone.lock() { - guard.push(upload_resp.asset_id.clone()); + guard.push(UploadedAssetInfo { + asset_id: upload_resp.asset_id.clone(), + local_path: file_info.original_path.to_string_lossy().to_string(), + }); } let mut preproc_req = ProcessAssetsRequest::new( vec![upload_resp], @@ -1303,33 +1399,87 @@ fn render_status_rows( rows } +fn print_machine_readable_result(outcome: &StatusWaitOutcome, elapsed_seconds: Option) { + let assets_json: Vec = outcome + .assets + .iter() + .map(|a| { + serde_json::json!({ + "local_path": a.local_path, + "asset_id": a.asset_id, + "status": a.status, + }) + }) + .collect(); + let payload = if outcome.success { + serde_json::json!({ + "success": true, + "assets": assets_json, + "elapsed_seconds": elapsed_seconds.unwrap_or_default(), + }) + } else { + serde_json::json!({ + "success": false, + "assets": assets_json, + "error": outcome + .error + .clone() + .unwrap_or_else(|| "one or more assets failed watched tasks".to_string()), + }) + }; + println!("{}", payload); +} + async fn wait_for_asset_processing_status( cfg: &Configuration, api_key: &str, bearer_opt: Option<&str>, - uploaded_asset_ids: &[String], + uploaded_assets: &[UploadedAssetInfo], mode: StatusWaitMode, script_start: std::time::Instant, -) -> Result<(), String> { - if uploaded_asset_ids.is_empty() { - output::info("No uploaded asset_id found; skipping task polling"); - return Ok(()); + machine_readable: bool, +) -> Result { + if uploaded_assets.is_empty() { + if !machine_readable { + output::info("No uploaded asset_id found; skipping task polling"); + } + return Ok(StatusWaitOutcome { + success: true, + error: None, + assets: Vec::new(), + }); } - let mut progress_by_asset: HashMap = uploaded_asset_ids + let mut progress_by_asset: HashMap = uploaded_assets .iter() - .cloned() - .map(|id| (id, AssetTaskProgress::default())) + .map(|a| (a.asset_id.clone(), AssetTaskProgress::default())) .collect(); + let ordered_asset_ids: Vec = uploaded_assets.iter().map(|a| a.asset_id.clone()).collect(); - output::info("Polling /users/tasks every 2s for processing status..."); - let mut status_progress = - crate::tui::InlineProgress::new("Processing Uploaded Assets", uploaded_asset_ids.len())?; - let status_handle = status_progress.clone_handle(); - let _ = status_handle.set_show_elapsed(false); - for (task_id, asset_id) in uploaded_asset_ids.iter().enumerate() { - let _ = status_handle.start_task(task_id, asset_id.clone(), 100); + if !machine_readable { + output::info("Polling /users/tasks every 2s for processing status..."); } - let status_render_handle = status_progress.start_render_loop(status_handle.clone()); + let mut status_progress = if machine_readable { + None + } else { + Some(crate::tui::InlineProgress::new( + "Processing Uploaded Assets", + uploaded_assets.len(), + )?) + }; + let status_handle = status_progress.as_ref().map(|p| p.clone_handle()); + if let Some(handle) = status_handle.as_ref() { + let _ = handle.set_show_elapsed(false); + for (task_id, asset) in uploaded_assets.iter().enumerate() { + let _ = handle.start_task(task_id, asset.asset_id.clone(), 100); + } + } + let status_render_handle = if let (Some(progress), Some(handle)) = + (status_progress.as_mut(), status_handle.as_ref()) + { + Some(progress.start_render_loop(handle.clone())) + } else { + None + }; loop { let finish_before_seconds = (script_start.elapsed().as_secs() as i32) + 60; let tasks = api::get_tasks_users_tasks_get( @@ -1362,56 +1512,58 @@ async fn wait_for_asset_processing_status( } } - let rendered_rows = render_status_rows(uploaded_asset_ids, &progress_by_asset); - for (task_id, asset_id) in uploaded_asset_ids.iter().enumerate() { - if let Some(asset_progress) = progress_by_asset.get(asset_id) { - let row = rendered_rows - .get(task_id) - .cloned() - .unwrap_or_else(|| render_status_row(asset_id, asset_progress)); - let _ = status_handle.set_task_label(task_id, row); - let pct = match mode { - StatusWaitMode::Done => { - let mut sum = 0.0; - let mut count = 0.0; - if let Some((_, p)) = asset_progress.analyze_asset.as_ref() { - sum += task_progress_to_percent(*p); - count += 1.0; - } - if let Some((_, p)) = asset_progress.downscaling.as_ref() { - sum += task_progress_to_percent(*p); - count += 1.0; - } - if let Some((_, p)) = asset_progress.deep_analyze.as_ref() { - sum += task_progress_to_percent(*p); - count += 1.0; - } - if count > 0.0 { sum / count } else { 0.0 } - } - StatusWaitMode::Analysed => { - let mut sum = 0.0; - let mut count = 0.0; - if let Some((_, p)) = asset_progress.analyze_asset.as_ref() { - sum += task_progress_to_percent(*p); - count += 1.0; + if let Some(handle) = status_handle.as_ref() { + let rendered_rows = render_status_rows(&ordered_asset_ids, &progress_by_asset); + for (task_id, asset_id) in ordered_asset_ids.iter().enumerate() { + if let Some(asset_progress) = progress_by_asset.get(asset_id) { + let row = rendered_rows + .get(task_id) + .cloned() + .unwrap_or_else(|| render_status_row(asset_id, asset_progress)); + let _ = handle.set_task_label(task_id, row); + let pct = match mode { + StatusWaitMode::Done => { + let mut sum = 0.0; + let mut count = 0.0; + if let Some((_, p)) = asset_progress.analyze_asset.as_ref() { + sum += task_progress_to_percent(*p); + count += 1.0; + } + if let Some((_, p)) = asset_progress.downscaling.as_ref() { + sum += task_progress_to_percent(*p); + count += 1.0; + } + if let Some((_, p)) = asset_progress.deep_analyze.as_ref() { + sum += task_progress_to_percent(*p); + count += 1.0; + } + if count > 0.0 { sum / count } else { 0.0 } } - if let Some((_, p)) = asset_progress.deep_analyze.as_ref() { - sum += task_progress_to_percent(*p); - count += 1.0; + StatusWaitMode::Analysed => { + let mut sum = 0.0; + let mut count = 0.0; + if let Some((_, p)) = asset_progress.analyze_asset.as_ref() { + sum += task_progress_to_percent(*p); + count += 1.0; + } + if let Some((_, p)) = asset_progress.deep_analyze.as_ref() { + sum += task_progress_to_percent(*p); + count += 1.0; + } + if count > 0.0 { sum / count } else { 0.0 } } - if count > 0.0 { sum / count } else { 0.0 } - } - StatusWaitMode::Transcoded => asset_progress - .downscaling - .as_ref() - .map(|(_, p)| task_progress_to_percent(*p)) - .unwrap_or(0.0), - }; - let _ = status_handle.set_task_progress_pct(task_id, pct); + StatusWaitMode::Transcoded => asset_progress + .downscaling + .as_ref() + .map(|(_, p)| task_progress_to_percent(*p)) + .unwrap_or(0.0), + }; + let _ = handle.set_task_progress_pct(task_id, pct); + } } } - let all_done = uploaded_asset_ids.iter().all(|asset_id| { + let all_done = ordered_asset_ids.iter().all(|asset_id| { progress_by_asset .get(asset_id) .map(|p| asset_done_for_mode(mode, p)) @@ -1419,35 +1571,77 @@ async fn wait_for_asset_processing_status( }); if all_done { - for (task_id, asset_id) in uploaded_asset_ids.iter().enumerate() { - let has_error = progress_by_asset + let had_error = ordered_asset_ids.iter().any(|asset_id| { + progress_by_asset .get(asset_id) .map(|p| has_error_for_mode(mode, p)) - .unwrap_or(false); - let _ = status_handle.finish_task(task_id, !has_error); - if let Some(asset_progress) = progress_by_asset.get(asset_id) { - if let Some((status, _)) = asset_progress.downscaling.as_ref() { - if status.eq_ignore_ascii_case("error") - || status.eq_ignore_ascii_case("failed") - { - let _ = status_handle.add_warning(format!( - "asset_id={} reached terminal status with downscaling={}", - asset_id, status - )); + .unwrap_or(false) + }); + if let Some(handle) = status_handle.as_ref() { + for (task_id, asset_id) in ordered_asset_ids.iter().enumerate() { + let has_error = progress_by_asset + .get(asset_id) + .map(|p| has_error_for_mode(mode, p)) + .unwrap_or(false); + let _ = handle.finish_task(task_id, !has_error); + if let Some(asset_progress) = progress_by_asset.get(asset_id) { + if let Some((status, _)) = asset_progress.downscaling.as_ref() { + if status.eq_ignore_ascii_case("error") + || status.eq_ignore_ascii_case("failed") + { + let _ = handle.add_warning(format!( + "asset_id={} reached terminal status with downscaling={}", + asset_id, status + )); + } } } } } - crate::tui::InlineProgress::stop_render_loop(status_render_handle).await; - status_progress.finish()?; - println!(); - output::success("All watched assets reached terminal state for selected tasks"); - break; + if let Some(render_handle) = status_render_handle { + crate::tui::InlineProgress::stop_render_loop(render_handle).await; + } + if let Some(progress) = status_progress.as_mut() { + progress.finish()?; + println!(); + } + if !machine_readable { + output::success("All watched assets reached terminal state for selected tasks"); + } + let assets: Vec = uploaded_assets + .iter() + .map(|asset| { + let status = progress_by_asset + .get(&asset.asset_id) + .map(|p| { + if has_error_for_mode(mode, p) { + "error" + } else { + "success" + } + }) + .unwrap_or("unknown") + .to_string(); + MachineAssetStatus { + asset_id: asset.asset_id.clone(), + local_path: asset.local_path.clone(), + status, + } + }) + .collect(); + return Ok(StatusWaitOutcome { + success: !had_error, + error: if had_error { + Some("one or more assets failed watched tasks".to_string()) + } else { + None + }, + assets, + }); } sleep(Duration::from_secs(2)).await; } - Ok(()) } fn single_put_url(resp: &AssetUploadResponse) -> String {