Implement S3VersionStore::store_version_from_reader#398
📝 Walkthrough

Summary by CodeRabbit

This PR updates AWS SDK dependencies, introduces AWS SDK error handling to the error module, and implements the S3 version storage functionality with support for both single-part PUT and concurrent multipart uploads.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Caller
    participant S3Store as S3VersionStore
    participant S3Client
    participant Reader
    participant PartUploader as Part Uploader Task
    participant S3Service as AWS S3
    Caller->>S3Store: store_version_from_reader(hash, reader)
    S3Store->>S3Client: Initialize AWS S3 client
    S3Store->>Reader: read_full() - Read first chunk
    Reader-->>S3Store: Data chunk
    alt Small file (< PART_SIZE)
        S3Store->>S3Service: put_object(key, data)
        S3Service-->>S3Store: Success
    else Large file (>= PART_SIZE)
        S3Store->>S3Service: create_multipart_upload(key)
        S3Service-->>S3Store: upload_id
        S3Store->>Reader: Read remaining chunks
        Reader-->>S3Store: Chunk 1, 2, 3, ...
        S3Store->>S3Store: Send parts to channel
        activate PartUploader
        PartUploader->>S3Service: upload_part(upload_id, part_num, data)
        S3Service-->>PartUploader: CompletedPart
        PartUploader->>PartUploader: Collect & sort parts
        deactivate PartUploader
        S3Store->>S3Service: complete_multipart_upload(upload_id, parts)
        S3Service-->>S3Store: Success
    end
    S3Store-->>Caller: Result
```
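The first-chunk read in the diagram is what decides between the two branches: if a full `PART_SIZE` buffer cannot be filled, the payload fits in a single `put_object`. A minimal stdlib sketch of that dispatch (the synchronous `read_full` and tiny `PART_SIZE` here are illustrative stand-ins for the PR's async helper and its 8 MiB constant, not the actual implementation):

```rust
use std::io::Read;

// Read until the buffer is full or EOF; returns bytes read.
// A short read (< buf.len()) means we hit EOF: single-PUT path.
fn read_full<R: Read>(reader: &mut R, buf: &mut [u8]) -> std::io::Result<usize> {
    let mut total = 0;
    while total < buf.len() {
        let n = reader.read(&mut buf[total..])?;
        if n == 0 {
            break; // EOF
        }
        total += n;
    }
    Ok(total)
}

fn main() -> std::io::Result<()> {
    const PART_SIZE: usize = 8; // tiny for the demo; the PR uses 8 MiB

    let mut small: &[u8] = b"hi";
    let mut buf = vec![0u8; PART_SIZE];
    let n = read_full(&mut small, &mut buf)?;
    assert!(n < PART_SIZE); // short read -> put_object path

    let mut large: &[u8] = b"0123456789abcdef";
    let n = read_full(&mut large, &mut buf)?;
    assert_eq!(n, PART_SIZE); // full part -> multipart path, first buffer in hand
    println!("dispatch ok");
    Ok(())
}
```

The advantage of this shape is that a length-unknown reader never needs to be buffered fully: one part's worth of lookahead is enough to choose the upload strategy.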
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Actionable comments posted: 2
🧹 Nitpick comments (1)
crates/lib/src/error.rs (1)
150-151: Preserve the wrapped AWS error as the source.

Line 151 stores the SDK error, but `thiserror` only wires `Error::source()` from fields marked `#[source]`, named `source`, or annotated with `#[from]`. As written, callers will see the display message but lose the underlying AWS error chain. (docs.rs)

♻️ Likely fix

```diff
-    AwsSdkError(Box<dyn std::error::Error + Send + Sync>),
+    AwsSdkError(#[source] Box<dyn std::error::Error + Send + Sync>),
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/lib/src/error.rs` around lines 150 - 151, The AwsSdkError enum variant currently wraps the AWS error but doesn't expose it as the source for Error::source(); update the AwsSdkError variant in the error enum (the AwsSdkError(...) variant in error.rs) to mark the inner boxed error as the source (either by adding the #[source] attribute to the variant or by renaming the inner field to source) so thiserror will wire through the underlying AWS error chain; keep the existing #[error("AWS SDK error: {0}")] display message while adding the source annotation to preserve error chaining.
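Why the `#[source]` attribute matters: it is what lets callers walk the chain via `Error::source()`. Below is a hand-rolled stdlib sketch of roughly what `thiserror` would generate for the annotated variant (illustrative only, not the crate's actual macro expansion):

```rust
use std::error::Error;
use std::fmt;

// Stand-in for the derived variant: Display keeps the
// "AWS SDK error: ..." message, and source() exposes the inner
// error, which is what the #[source] attribute wires up for you.
#[derive(Debug)]
struct AwsSdkError(Box<dyn Error + Send + Sync>);

impl fmt::Display for AwsSdkError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "AWS SDK error: {}", self.0)
    }
}

impl Error for AwsSdkError {
    // Without this (i.e. without #[source] on the derived variant),
    // callers get None here and the AWS error chain is lost.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        Some(self.0.as_ref())
    }
}

fn main() {
    let inner: Box<dyn Error + Send + Sync> = "connection reset".to_string().into();
    let err = AwsSdkError(inner);
    // The display message is preserved...
    assert_eq!(err.to_string(), "AWS SDK error: connection reset");
    // ...and the underlying error stays reachable through the chain.
    assert_eq!(err.source().unwrap().to_string(), "connection reset");
    println!("source chain intact");
}
```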
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 3b22fdac-fc5c-48a3-8b7b-9093413ee5d9
⛔ Files ignored due to path filters (1)
`Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (4)
- `.claude/CLAUDE.md`
- `Cargo.toml`
- `crates/lib/src/error.rs`
- `crates/lib/src/storage/s3.rs`
```rust
// S3 multipart upload requires 5MB minimum per part (except the last).
// We use 8MB parts for a balance of memory usage and request count.
const PART_SIZE: usize = 8 * 1024 * 1024;
```
🧩 Analysis chain
🏁 Scripts executed:

```shell
find . -name "s3.rs" -path "*/storage/*" | head -20
cat -n ./crates/lib/src/storage/s3.rs | sed -n '180,200p'
cat -n ./crates/lib/src/storage/s3.rs | sed -n '270,310p'
cat -n ./crates/lib/src/storage/s3.rs | sed -n '240,275p'
cat -n ./crates/lib/src/storage/s3.rs | sed -n '165,195p'
rg -n "MAX_S3_PARTS|10000|10_000|part.*limit|part_num" ./crates/lib/src/storage/s3.rs
# Check if there are any other constraints or error handling around file size
cat -n ./crates/lib/src/storage/s3.rs | sed -n '200,240p'
```
Guard against S3's 10,000 part-count ceiling in the fixed 8 MiB multipart upload.
The implementation at line 186 uses a fixed 8 MiB part size without validating against S3's maximum part limit of 10,000. This means files larger than ~78 GiB (8 MiB × 10,000) will exceed the limit and fail during upload after most data has already been streamed. There is no validation in the loop at lines 276–293 that increments part_num.
Add a guard to return an error when part_num > 10_000, or compute a larger part size up front if the reader length is available.
Minimal guard:

```diff
 const MAX_CONCURRENT_UPLOADS: usize = 8;
 const MAX_PARTS_WAITING_FOR_UPLOAD: usize = 16;
+const MAX_S3_PARTS: i32 = 10_000;
 ...
 loop {
     let mut buf = vec![0u8; PART_SIZE];
     let n = read_full(&mut reader, &mut buf).await?;
     if n == 0 {
         // We have reached EOF
         break;
     }
+    if part_num > MAX_S3_PARTS {
+        return Err(OxenError::basic_str(
+            "S3 multipart upload exceeds the 10,000 part limit; larger files need a bigger part size",
+        ));
+    }
     buf.truncate(n);
     tx.send((part_num, buf)).await.map_err(|_| {
         OxenError::basic_str("Upload task terminated while sending parts")
     })?;
     part_num += 1;
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/lib/src/storage/s3.rs` at line 186, The multipart upload currently
hardcodes PART_SIZE = 8 * 1024 * 1024 and increments part_num in the upload
loop; add a guard to prevent exceeding S3's 10,000-part limit by either (a)
checking the reader/content length up front (if available) and computing a
larger PART_SIZE so total_parts <= 10_000, or (b) if length is unknown, detect
in the upload loop (where part_num is incremented) and return a clear error when
part_num > 10_000. Update the multipart upload function that defines PART_SIZE
and uses part_num to perform this validation and return an error early instead
of continuing to stream.
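Option (a) from the comment — growing the part size when the content length is known up front — is a small calculation. A hypothetical sketch (the helper name and constants are illustrative, not the PR's code; a real implementation would also have to respect S3's 5 GiB maximum part size):

```rust
const MIN_PART_SIZE: u64 = 8 * 1024 * 1024; // mirrors the PR's 8 MiB PART_SIZE
const MAX_S3_PARTS: u64 = 10_000;

// Smallest part size that keeps the part count <= 10,000,
// never dropping below the 8 MiB default.
fn part_size_for(content_len: u64) -> u64 {
    content_len.div_ceil(MAX_S3_PARTS).max(MIN_PART_SIZE)
}

fn main() {
    // A 1 GiB file keeps the default 8 MiB parts (128 of them).
    assert_eq!(part_size_for(1 << 30), MIN_PART_SIZE);

    // A 100 GiB file would need 12,800 default-size parts, so the
    // part size grows (to ~10.2 MiB) to stay within the cap.
    let len: u64 = 100 * (1 << 30);
    let ps = part_size_for(len);
    assert!(ps > MIN_PART_SIZE);
    assert!(len.div_ceil(ps) <= MAX_S3_PARTS);
    println!("ok");
}
```

When the length is unknown (a pure streaming reader), this computation is unavailable, which is why the review also offers option (b): fail fast inside the loop once `part_num` passes 10,000.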
```rust
let upload_task = tokio::spawn(async move {
    let results: Result<Vec<CompletedPart>, OxenError> = ReceiverStream::new(rx)
        .map(|(part_num, data)| {
            let client = upload_client.clone();
            let bucket = upload_bucket.clone();
            let key = upload_key.clone();
            let upload_id = upload_id_clone.clone();
            async move {
                let resp = client
                    .upload_part()
                    .bucket(bucket)
                    .key(key)
                    .upload_id(upload_id)
                    .part_number(part_num)
                    .body(ByteStream::from(data))
                    .send()
                    .await
                    .map_err(OxenError::aws_sdk_error)?;

                let etag = resp.e_tag().map(|s| s.to_string()).ok_or_else(|| {
                    OxenError::basic_str("S3 upload_part response missing ETag")
                })?;

                Ok(CompletedPart::builder()
                    .part_number(part_num)
                    .e_tag(etag)
                    .build())
            }
        })
        .buffer_unordered(MAX_CONCURRENT_UPLOADS)
        .try_collect()
        .await;
    results
});

// Run the read-upload-complete pipeline, aborting the multipart upload on any error.
let result = async {
    // Read parts on the current task and feed them into the channel
    let mut part_num: i32 = 1;
    tx.send((part_num, first_buf)).await.map_err(|_| {
        OxenError::basic_str("Upload task terminated while sending first part")
    })?;
    part_num += 1;

    loop {
        let mut buf = vec![0u8; PART_SIZE];
        let n = read_full(&mut reader, &mut buf).await?;
        if n == 0 {
            // We have reached EOF
            break;
        }
        buf.truncate(n);
        tx.send((part_num, buf)).await.map_err(|_| {
            OxenError::basic_str("Upload task terminated while sending parts")
        })?;
        part_num += 1;
    }

    // Close the channel so the upload task knows we're done
    drop(tx);

    // Collect upload results
    let mut completed_parts: Vec<CompletedPart> = upload_task
        .await
        .map_err(|e| OxenError::basic_str(format!("Upload task panicked: {e}")))??;

    // Sort by part number since buffer_unordered doesn't preserve order
    completed_parts.sort_by_key(|p| p.part_number);

    let completed = CompletedMultipartUpload::builder()
        .set_parts(Some(completed_parts))
        .build();

    client
        .complete_multipart_upload()
        .bucket(&self.bucket)
        .key(&key)
        .upload_id(&upload_id)
        .multipart_upload(completed)
        .send()
        .await
        .map_err(OxenError::aws_sdk_error)?;

    Ok(())
}
.await;

if let Err(ref e) = result {
    log::error!("Multipart upload failed, aborting: {e}");
    let _ = client
        .abort_multipart_upload()
        .bucket(&self.bucket)
        .key(&key)
        .upload_id(&upload_id)
        .send()
        .await;
}

result
```
🧩 Analysis chain
🏁 Scripts executed:

```shell
# Find the S3 storage file to examine the full context
fd -t f "s3\.rs" crates/lib/src/storage/

# Get the full context around the mentioned lines (238-336) to understand scoping
if fd -t f "s3\.rs" crates/lib/src/storage/ | head -1 | xargs wc -l | awk '{print $1}'; then
  fd -t f "s3\.rs" crates/lib/src/storage/ | head -1 | xargs sed -n '200,350p'
fi

# Search for the function definition to understand the full structure
fd -t f "s3\.rs" crates/lib/src/storage/ | head -1 | xargs rg -n "store_version_from_reader" -A 5 -B 5
```
🌐 Web query:
Tokio JoinHandle drop detach task continue running
💡 Result:
In Tokio, dropping a `JoinHandle` detaches the associated task, allowing it to continue running in the background independently. The task is not aborted or cancelled; it runs to completion (or indefinitely if it doesn't terminate), but its return value becomes inaccessible since there's no longer a handle to join on it.

Key points from official documentation:

- "A JoinHandle detaches the associated task when it is dropped, which means that there is no longer any handle to the task, and no way to join on it."
- "If a JoinHandle is dropped, then the task continues running in the background and its return value is lost."

Example from the docs showing a detached task continuing after its `JoinHandle` is dropped:

```rust
let original_task = task::spawn(async {
    let _detached_task = task::spawn(async {
        time::sleep(Duration::from_millis(10)).await;
        println!("♫ Still alive ♫"); // Prints even after the JoinHandle is dropped
    });
    // original_task completes, dropping _detached_task
});
```

Dropping does NOT abort the task; use `handle.abort()` for explicit cancellation.
Citations:
- 1: https://docs.rs/tokio/latest/tokio/task/struct.JoinHandle.html
- 2: https://teaclave.apache.org/api-docs/client-sdk-rust/tokio/task/struct.JoinHandle.html
🌐 Web query:
AWS S3 abort_multipart_upload can upload_part succeed after abort
💡 Result:
No, after calling abort_multipart_upload on an AWS S3 multipart upload, no additional parts can be uploaded using that upload ID. However, if a part upload was already in progress at the time of the abort call, that ongoing upload_part request might or might not succeed due to the asynchronous nature of the operation. Previously completed parts are deleted (storage freed), but to ensure complete cleanup, check ListParts afterward as multiple aborts may be needed if in-flight parts complete.
Citations:
- 1: https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html
- 2: https://docs.aws.amazon.com/AmazonS3/latest/userguide/abort-mpu.html
- 3: https://docs.aws.amazon.com/cli/latest/reference/s3api/abort-multipart-upload.html
- 4: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/multipartupload/abort.html
- 5: https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html
The spawned upload task may continue issuing S3 upload_part calls after abort_multipart_upload is invoked.
If the producer fails before reaching line 300 (upload_task.await), the result block exits early via the ? operator. The upload_task JoinHandle is then dropped without being awaited, which per Tokio's documented behavior detaches the task and allows it to continue running in the background. The subsequent abort_multipart_upload call races against these in-flight part uploads. S3's abort does not retroactively cancel parts already in flight—they may still succeed before the abort propagates. To prevent this, join the upload task with an explicit cancellation (via upload_task.abort() or by ensuring it's awaited) before calling abort_multipart_upload, or restructure to guarantee the spawned task is cancelled on any producer error.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/lib/src/storage/s3.rs` around lines 238 - 336, The spawned upload_task
(JoinHandle named upload_task) can continue issuing upload_part calls after the
producer errors and before abort_multipart_upload runs; ensure the worker is
explicitly cancelled and awaited before calling abort_multipart_upload: on any
early return/error in the producer/result block, call upload_task.abort() (or
send a cancellation signal via tx/closure), then await upload_task (handle the
JoinError) to guarantee all in-flight part uploads stop/complete before invoking
client.abort_multipart_upload(); update the error paths around the producer loop
and the Result handling where upload_task is dropped so upload_task.abort() and
upload_task.await are executed prior to calling abort_multipart_upload.
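The ordering the fix calls for — stop the worker, then join it, then clean up — can be illustrated with a stdlib analogue, where a thread plus an `AtomicBool` stand in for `upload_task.abort()` followed by `upload_task.await` (all names here are illustrative, not the PR's code):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;

fn main() {
    let cancelled = Arc::new(AtomicBool::new(false));
    let (tx, rx) = mpsc::channel::<u32>();

    let flag = Arc::clone(&cancelled);
    // Worker: the analogue of the spawned task issuing upload_part calls.
    let worker = thread::spawn(move || {
        let mut uploaded = Vec::new();
        for part in rx {
            if flag.load(Ordering::SeqCst) {
                break; // stop issuing "uploads" once cancelled
            }
            uploaded.push(part);
        }
        uploaded
    });

    tx.send(1).unwrap();

    // Producer hits an error: signal cancellation, close the channel,
    // and JOIN the worker before any abort-style cleanup runs.
    cancelled.store(true, Ordering::SeqCst);
    drop(tx);
    let uploaded = worker.join().unwrap();

    // Only now is it safe to run the abort_multipart_upload analogue:
    // no detached worker is left racing against the cleanup.
    println!("worker stopped after {} part(s)", uploaded.len());
}
```

The key point is that the join happens on the producer side before cleanup; with Tokio the equivalent shape is `handle.abort()` followed by awaiting the handle (and handling the resulting `JoinError`).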
I'm tackling the S3 implementation one method at a time, and there are many more methods to go, so it will be a while before we're in a state where we can actually try it out.
The `OxenError::AwsSdkError` variant basically uses a `dyn Error` because AWS can return all sorts of different errors, and at this point we don't want to deal with them individually.