feat(portal): stream file uploads to reduce memory from O(file_size) to O(5 MiB) #409
lilongen wants to merge 1 commit into boxlite-ai:main
Conversation
…to O(5 MiB)

Replace full-file Vec buffering in upload_tar with a bounded mpsc channel
(capacity=4) and a spawned reader task. This caps peak memory at ~5 MiB
regardless of file size, matching the streaming pattern already used in
download_tar and the guest-side upload handler.

Key changes:
- Extract stream_file_chunks helper accepting impl AsyncRead for testability
- Use std::mem::take for dest_path/container_id (first chunk only, zero-copy)
- Always await reader JoinHandle before checking gRPC result (root-cause priority)
- Return error on receiver drop instead of silent Ok (incomplete upload detection)
- Add 8 unit tests: multi-chunk, single-chunk, partial, empty, receiver-drop,
  read-error (FailingReader mock), data-integrity, and file-not-found

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Pull request overview
This PR updates the portal-side tar upload path to stream file contents instead of buffering the entire archive in memory, bringing upload behavior in line with the already-streaming download path and reducing peak heap usage for large uploads.
Changes:
- Reworked `FilesInterface::upload_tar` to stream `UploadChunk`s via a bounded `mpsc` channel + `ReceiverStream` rather than prebuilding a `Vec<UploadChunk>`.
- Added a `stream_file_chunks` helper to read any `AsyncRead` in `CHUNK_SIZE` increments and emit `UploadChunk`s.
- Added unit tests covering chunking behavior, integrity, cancellation/receiver-drop behavior, and error propagation.
```diff
+ let upload_result = self.client.upload(stream).await;
+
+ // Always join the reader task first — its error is the root cause when
+ // both the reader and the gRPC call fail (e.g. read error causes the
+ // stream to end, which causes the server to report failure).
+ match reader_handle.await {
+     Ok(Err(read_err)) => return Err(read_err),
+     Err(join_err) => {
+         return Err(BoxliteError::Internal(format!(
+             "Upload reader task failed: {}",
+             join_err
+         )));
+     }
+     Ok(Ok(())) => {}
+ }
-
- // Use futures::stream::iter for the upload stream
- let stream = futures::stream::iter(chunks);
-
- let response = self
-     .client
-     .upload(stream)
-     .await
-     .map_err(map_tonic_err)?
-     .into_inner();
+ let response = upload_result.map_err(map_tonic_err)?.into_inner();
```
`upload_tar` returns the reader task error before inspecting the gRPC result. If the server rejects the upload early (e.g. permission denied) and drops the receiver, `stream_file_chunks` will likely return `Internal("Upload stream closed...")`, which then masks the real tonic `Status` from `upload_result`. Consider combining both results: if `upload_result` is `Err`, prefer that error unless the reader error is a real I/O failure (e.g. `Storage("Failed to read...")`), or include both errors in the returned message.
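The reviewer's combination strategy could be sketched roughly like this (the error enum below is a simplified stand-in for `BoxliteError`, not the crate's real type):

```rust
// Simplified stand-ins for the PR's error cases.
#[derive(Debug, PartialEq)]
enum Error {
    Internal(String), // e.g. "Upload stream closed..." when the receiver drops
    Storage(String),  // real I/O failure while reading the file
    Grpc(String),     // tonic Status from the upload RPC
}

/// Combine the reader-task result with the gRPC result, preferring the
/// gRPC error unless the reader hit a real I/O failure (the root cause).
fn combine(reader: Result<(), Error>, upload: Result<(), Error>) -> Result<(), Error> {
    match (reader, upload) {
        (Ok(()), r) => r,
        (Err(e @ Error::Storage(_)), _) => Err(e), // real read failure wins
        (Err(_), Err(grpc)) => Err(grpc),          // receiver-drop noise: report gRPC error
        (Err(e), Ok(())) => Err(e),
    }
}

fn main() {
    // Server rejected early and dropped the receiver: surface the gRPC status.
    let r = combine(
        Err(Error::Internal("Upload stream closed".into())),
        Err(Error::Grpc("permission denied".into())),
    );
    assert_eq!(r, Err(Error::Grpc("permission denied".into())));

    // Real read failure: that is the root cause even if gRPC also failed.
    let r = combine(
        Err(Error::Storage("Failed to read".into())),
        Err(Error::Grpc("stream ended".into())),
    );
    assert_eq!(r, Err(Error::Storage("Failed to read".into())));
    println!("ok");
}
```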
```rust
let upload_result = self.client.upload(stream).await;

// Always join the reader task first — its error is the root cause when
// both the reader and the gRPC call fail (e.g. read error causes the
// stream to end, which causes the server to report failure).
match reader_handle.await {
    Ok(Err(read_err)) => return Err(read_err),
    Err(join_err) => {
        return Err(BoxliteError::Internal(format!(
            "Upload reader task failed: {}",
            join_err
        )));
    }
    Ok(Ok(())) => {}
}
```
After the gRPC call completes, `upload_tar` always awaits `reader_handle`. If the upload fails quickly (connection error) but the file read blocks indefinitely (e.g. a stalled filesystem), this can hang the whole operation and hide the gRPC failure. Consider aborting the reader task (or using a timeout) when `upload_result` is `Err`, while still joining normally on the success path to surface read errors.
Summary
- Replace `Vec<UploadChunk>` buffering in `upload_tar` with a bounded `mpsc::channel(4)` plus a spawned reader task, capping peak memory at ~5 MiB regardless of file size
- Extract a `stream_file_chunks` helper (accepts `impl AsyncRead`) for testability and single responsibility
- Use `std::mem::take` for `dest_path`/`container_id` (first chunk only, zero-copy on subsequent chunks)
- Await the reader `JoinHandle` before checking the gRPC result to surface root-cause errors first
- Return an error instead of silently returning `Ok(())` to detect incomplete uploads

Motivation
`upload_tar` previously read the entire tar file into a `Vec<UploadChunk>` before streaming to the guest. For a 500 MB file, this consumed ~500 MB of heap. The `download_tar` path in the same file already used streaming correctly; this PR closes the asymmetry.

Test plan
- `cargo fmt -- --check` clean
- `cargo clippy -p boxlite --no-default-features --lib -- -D warnings` clean (0 warnings)
- `cargo test -p boxlite --no-default-features --lib`: 608 passed, 0 failed
- New unit tests for `stream_file_chunks`, including a `FailingReader` mock for read errors
- New test for the `upload_tar` file-not-found error path

🤖 Generated with Claude Code