From 0a4e3670d90c063fd8f36243132072995567ce9c Mon Sep 17 00:00:00 2001 From: crrow Date: Tue, 31 Mar 2026 12:23:05 +0800 Subject: [PATCH] fix(ingestor): abort multipart upload on reassembly failure (#243) When streaming parts into a multipart upload during reassembly, if any part fetch or read fails, the multipart upload is now explicitly aborted to prevent orphaned uploads leaking storage on S3/GCS. Closes #243 --- crates/ingestor/src/handlers.rs | 44 ++++++++++++++++++++++----------- 1 file changed, 29 insertions(+), 15 deletions(-) diff --git a/crates/ingestor/src/handlers.rs b/crates/ingestor/src/handlers.rs index 883f6ca..a14ce3a 100644 --- a/crates/ingestor/src/handlers.rs +++ b/crates/ingestor/src/handlers.rs @@ -189,29 +189,43 @@ pub async fn complete_upload( let mut writer = WriteMultipart::new(upload); let mut total_bytes: usize = 0; + // Stream each part into the multipart writer. If any part fetch fails, + // abort the multipart upload to prevent orphaned uploads on S3/GCS. for n in 0..num_parts { let part_key = StorePath::from(format!("raw/{video_id}/part_{n}").as_str()); - let data = state - .store - .get(&part_key) - .await - .map_err(|e| IngestorError::Storage { - message: format!("missing part {n}: {e}"), - })? - .bytes() - .await - .map_err(|e| IngestorError::Storage { - message: e.to_string(), - })?; + let data = match state.store.get(&part_key).await { + Ok(result) => match result.bytes().await { + Ok(bytes) => bytes, + Err(e) => { + warn!(video_id, part = n, error = %e, "aborting multipart upload: failed to read part bytes"); + writer.abort().await.ok(); + return Err(IngestorError::Storage { + message: e.to_string(), + }); + } + }, + Err(e) => { + warn!(video_id, part = n, error = %e, "aborting multipart upload: missing part"); + writer.abort().await.ok(); + return Err(IngestorError::Storage { + message: format!("missing part {n}: {e}"), + }); + } + }; total_bytes += data.len(); // Feed bytes into the multipart writer without copying; each chunk is // flushed to storage once the internal buffer reaches its threshold. writer.put(data); } - writer.finish().await.map_err(|e| IngestorError::Storage { - message: e.to_string(), - })?; + if let Err(e) = writer.finish().await { + warn!(video_id, error = %e, "aborting multipart upload: finish failed"); + // finish() already calls abort internally on failure in recent + // object_store versions, but we log the error for observability. + return Err(IngestorError::Storage { + message: e.to_string(), + }); + } // Clean up individual parts for n in 0..num_parts {