Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions src/auth.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
use base64::Engine;

/// Extracts a user id from an optional bearer token, emitting log output
/// on failure or success.
///
/// Convenience wrapper around `get_user_id_from_bearer_with_logging`
/// with logging enabled (`with_logging = true`); falls back to the
/// default user id when the token is missing or cannot be decoded.
pub fn get_user_id_from_bearer(bearer: Option<&str>) -> String {
    get_user_id_from_bearer_with_logging(bearer, true)
}

pub fn get_user_id_from_bearer_with_logging(bearer: Option<&str>, with_logging: bool) -> String {
if bearer.is_none() || bearer.map(|s| s.is_empty()).unwrap_or(true) {
crate::output::warning("Bearer token is None or empty, using default user_id");
if with_logging {
crate::output::warning("Bearer token is None or empty, using default user_id");
}
return "__current_user__".to_string();
}

Expand All @@ -15,14 +21,18 @@ pub fn get_user_id_from_bearer(bearer: Option<&str>) -> String {

match decode_jwt_sub(token) {
Ok(user_id) => {
crate::output::info(format!("Successfully extracted user_id: {}", user_id));
if with_logging {
crate::output::info(format!("Successfully extracted user_id: {}", user_id));
}
user_id
}
Err(e) => {
crate::output::warning(format!(
"Failed to decode JWT: {}, using default user_id",
e
));
if with_logging {
crate::output::warning(format!(
"Failed to decode JWT: {}, using default user_id",
e
));
}
"__current_user__".to_string()
}
}
Expand Down
146 changes: 91 additions & 55 deletions src/commands/upload/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,18 +419,21 @@ fn run_upload(args: UploadCmdArgs) -> Result<(), String> {

let cfg = api_config::create_config();
let api_key = api_config::get_api_key(None)?;
output::info(format!("API base: {}", cfg.base_path));
if !args.machine_readable {
output::info(format!("API base: {}", cfg.base_path));
}

let bearer_header_for_auth = api_config::get_bearer_header(args.auth_bearer.clone());
let user_id = auth::get_user_id_from_bearer(bearer_header_for_auth.as_deref());
let user_id =
auth::get_user_id_from_bearer_with_logging(bearer_header_for_auth.as_deref(), !args.machine_readable);

if !args.force_upload {
let before = original_files.len();
original_files.retain(|path| {
!super::utils::is_already_uploaded(path, &user_id, &base_dir, &args.in_app_path)
});
let skipped = before - original_files.len();
if skipped > 0 {
if skipped > 0 && !args.machine_readable {
output::info(format!("Skipped {} already uploaded file(s)", skipped));
}
if original_files.is_empty() {
Expand All @@ -442,19 +445,23 @@ fn run_upload(args: UploadCmdArgs) -> Result<(), String> {

if args.local_encoding {
ensure_ffmpeg_available()?;
output::info(format!(
"Local encoding: downscale + upload queues ({} quality)",
args.qualities
.iter()
.map(|q| q.as_label())
.collect::<Vec<_>>()
.join(", ")
));
if !args.machine_readable {
output::info(format!(
"Local encoding: downscale + upload queues ({} quality)",
args.qualities
.iter()
.map(|q| q.as_label())
.collect::<Vec<_>>()
.join(", ")
));
}
if args.qualities.len() > 1 {
return Err("Only supporting single quality for now".to_string());
}
let work_items: Vec<DownscaleWork> = build_downscale_work(&original_files)?;
output::info(format!("{} file(s) in downscale queue", work_items.len()));
if !args.machine_readable {
output::info(format!("{} file(s) in downscale queue", work_items.len()));
}
let uploaded_asset_ids = run_two_queue_pipeline(
work_items,
&base_dir,
Expand Down Expand Up @@ -513,15 +520,17 @@ fn run_upload(args: UploadCmdArgs) -> Result<(), String> {
}

let mut files_to_upload: Vec<FileToUpload> = Vec::new();
output::info(format!(
"Discovered {} file(s) to upload",
original_files.len()
));
for f in &original_files {
if let Ok(md) = std::fs::metadata(f) {
output::item(format!("{} ({} bytes)", f.display(), md.len()));
} else {
output::item(format!("{}", f.display()));
if !args.machine_readable {
output::info(format!(
"Discovered {} file(s) to upload",
original_files.len()
));
for f in &original_files {
if let Ok(md) = std::fs::metadata(f) {
output::item(format!("{} ({} bytes)", f.display(), md.len()));
} else {
output::item(format!("{}", f.display()));
}
}
}
for original_file in original_files {
Expand All @@ -542,16 +551,28 @@ fn run_upload(args: UploadCmdArgs) -> Result<(), String> {
.map_err(|e| format!("failed to start runtime: {}", e))?
.block_on(async move {
let bearer_header = api_config::get_bearer_header(args.auth_bearer.clone());
let mut progress =
crate::tui::InlineProgress::new("Uploading Files", files_to_upload.len())?;
let progress_handle = progress.clone_handle();
let render_handle = progress.start_render_loop(progress_handle.clone());

let _ = progress_handle.add_info(format!(
"One presigned request per file ({} files), {} parallel upload(s)",
files_to_upload.len(),
args.parallel_uploads
));
let mut progress = if args.machine_readable {
None
} else {
Some(crate::tui::InlineProgress::new(
"Uploading Files",
files_to_upload.len(),
)?)
};
let progress_handle = progress.as_ref().map(|p| p.clone_handle());
let render_handle = if let (Some(p), Some(ph)) = (progress.as_mut(), progress_handle.as_ref()) {
Some(p.start_render_loop(ph.clone()))
} else {
None
};

if let Some(ph) = progress_handle.as_ref() {
let _ = ph.add_info(format!(
"One presigned request per file ({} files), {} parallel upload(s)",
files_to_upload.len(),
args.parallel_uploads
));
}

let upload_result = upload_with_per_file_presigned(
&files_to_upload,
Expand All @@ -560,7 +581,7 @@ fn run_upload(args: UploadCmdArgs) -> Result<(), String> {
&upload_request_id,
&user_id,
args.parallel_uploads,
&progress_handle,
progress_handle.as_ref(),
&cfg,
&api_key,
bearer_header.as_deref(),
Expand All @@ -569,18 +590,23 @@ fn run_upload(args: UploadCmdArgs) -> Result<(), String> {
)
.await;

if let Err(ref e) = upload_result {
let _ = progress_handle.add_error(format!("Upload failed: {}", e));
if let (Err(e), Some(ph)) = (&upload_result, progress_handle.as_ref()) {
let _ = ph.add_error(format!("Upload failed: {}", e));
}
let uploaded_asset_ids = upload_result?;

let _ = progress_handle.add_success("All uploads completed");

crate::tui::InlineProgress::stop_render_loop(render_handle).await;
progress.finish()?;
if let Some(ph) = progress_handle.as_ref() {
let _ = ph.add_success("All uploads completed");
}

// Add empty line after progress display
println!();
if let Some(handle) = render_handle {
crate::tui::InlineProgress::stop_render_loop(handle).await;
}
if let Some(mut p) = progress {
p.finish()?;
// Add empty line after progress display
println!();
}

if let Some(mode) = status_wait_mode {
let wait_result = wait_for_asset_processing_status(
Expand Down Expand Up @@ -1132,7 +1158,7 @@ async fn upload_with_per_file_presigned(
upload_request_id: &str,
user_id: &str,
max_concurrent: usize,
progress_handle: &ProgressHandle,
progress_handle: Option<&ProgressHandle>,
cfg: &Configuration,
api_key: &str,
bearer_opt: Option<&str>,
Expand Down Expand Up @@ -1163,7 +1189,7 @@ async fn upload_with_per_file_presigned(
let file_info = file_info.clone();
let http_clone = Arc::clone(&http);
let semaphore_clone = Arc::clone(&semaphore);
let progress_handle_clone = progress_handle.clone();
let progress_handle_clone = progress_handle.cloned();
let cfg_clone = cfg.clone();
let api_key_clone = api_key.clone();
let bearer_clone = bearer_opt.clone();
Expand Down Expand Up @@ -1204,7 +1230,9 @@ async fn upload_with_per_file_presigned(
.unwrap_or_default()
.to_string_lossy()
.to_string();
let _ = progress_handle_clone.start_task(task_id, file_name.clone(), file_size);
if let Some(ph) = progress_handle_clone.as_ref() {
let _ = ph.start_task(task_id, file_name.clone(), file_size);
}

let result = upload_single_file(
&file_info.upload_path,
Expand All @@ -1215,7 +1243,7 @@ async fn upload_with_per_file_presigned(
&user_id_clone,
task_id,
http_clone.as_ref(),
&progress_handle_clone,
progress_handle_clone.as_ref(),
file_size,
&cfg_clone,
&api_key_clone,
Expand All @@ -1224,7 +1252,9 @@ async fn upload_with_per_file_presigned(
.await;

let success = result.is_ok();
let _ = progress_handle_clone.finish_task(task_id, success);
if let Some(ph) = progress_handle_clone.as_ref() {
let _ = ph.finish_task(task_id, success);
}

if success {
if let Ok(mut guard) = uploaded_asset_ids_clone.lock() {
Expand Down Expand Up @@ -1254,10 +1284,9 @@ async fn upload_with_per_file_presigned(
)
.await
{
let _ = progress_handle_clone.add_error(format!(
"Failed to trigger preprocess: {}",
e
));
if let Some(ph) = progress_handle_clone.as_ref() {
let _ = ph.add_error(format!("Failed to trigger preprocess: {}", e));
}
return Err(format!("preprocess: {}", e));
}
}
Expand Down Expand Up @@ -1728,7 +1757,7 @@ async fn upload_single_file(
user_id: &str,
task_id: usize,
http: &reqwest::Client,
progress_handle: &ProgressHandle,
progress_handle: Option<&ProgressHandle>,
total_bytes: u64,
_cfg: &Configuration,
_api_key: &str,
Expand All @@ -1753,7 +1782,9 @@ async fn upload_single_file(
}
buf.extend_from_slice(&chunk[..n]);
uploaded += n as u64;
let _ = progress_handle.update_task(task_id, uploaded);
if let Some(ph) = progress_handle {
let _ = ph.update_task(task_id, uploaded);
}
}

let content_type = mime_guess::from_path(file_path)
Expand All @@ -1770,7 +1801,9 @@ async fn upload_single_file(
.await
.map_err(|e| format!("upload failed for {}: {}", file_path.display(), e))?;

let _ = progress_handle.update_task(task_id, total_bytes);
if let Some(ph) = progress_handle {
let _ = ph.update_task(task_id, total_bytes);
}

if !put_res.status().is_success() {
let status = put_res.status();
Expand All @@ -1784,7 +1817,9 @@ async fn upload_single_file(
status,
body
);
let _ = progress_handle.add_error(error_msg.clone());
if let Some(ph) = progress_handle {
let _ = ph.add_error(error_msg.clone());
}
return Err(error_msg);
}

Expand All @@ -1795,8 +1830,9 @@ async fn upload_single_file(
&upload_resp.asset_id,
upload_request_id,
) {
let _ =
progress_handle.add_warning(format!("Failed to record upload in tracking file: {}", e));
if let Some(ph) = progress_handle {
let _ = ph.add_warning(format!("Failed to record upload in tracking file: {}", e));
}
}

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion src/commands/upload/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ mod dry_run;
mod main;
mod utils;

pub use main::{run, UploadArgs};
pub use main::{run, UploadArgs, UploadCommand};


8 changes: 7 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,14 @@ fn main() {
}
}
Some(cli::Command::Upload(upload_args)) => {
let suppress_plain_error = matches!(
&upload_args.command,
commands::upload::UploadCommand::Upload(args) if args.machine_readable
);
if let Err(error) = commands::upload::run(upload_args) {
eprintln!("error: {}", error);
if !suppress_plain_error {
eprintln!("error: {}", error);
}
std::process::exit(1);
}
}
Expand Down
Loading