From bb7603abed99e80f4ec9de9de14f74f3ee0701e3 Mon Sep 17 00:00:00 2001 From: Nathan Stocks Date: Fri, 27 Mar 2026 17:01:20 -0600 Subject: [PATCH] implement S3VersionStore::store_version_from_reader --- .claude/CLAUDE.md | 1 + Cargo.lock | 201 +++++++++++++++++------------------ Cargo.toml | 4 +- crates/lib/src/error.rs | 6 ++ crates/lib/src/storage/s3.rs | 191 +++++++++++++++++++++++++++++++-- 5 files changed, 288 insertions(+), 115 deletions(-) diff --git a/.claude/CLAUDE.md b/.claude/CLAUDE.md index 0ab3d288d..ee6d64b30 100644 --- a/.claude/CLAUDE.md +++ b/.claude/CLAUDE.md @@ -107,6 +107,7 @@ oxen push origin main # Push to remote - When calling `get_staged_db_manager`, follow the doc comment on that function: drop the returned `StagedDBManager` as soon as possible (via a block scope or explicit `drop()`) to avoid holding the shared database handle longer than necessary. - When altering the `OxenError` enum, consider whether a hint needs to be added or updated in the `hint` method. - After changing any Rust code, verify that tests pass with the `bin/test-rust` script (not `cargo`). The script is documented in a comment at the top of its file. +- Prefer using inline code over creating a new function when the function would only be called once and the function body would be less than 15 lines. # Testing Rules - Use the test helpers in `crates/lib/src/test.rs` (e.g., `run_empty_local_repo_test`) for unit tests in the lib code. diff --git a/Cargo.lock b/Cargo.lock index d2c995e93..fa9f6db91 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -38,7 +38,7 @@ dependencies = [ "derive_more 2.1.1", "encoding_rs", "flate2", - "foldhash", + "foldhash 0.1.5", "futures-core", "h2 0.3.27", "http 0.2.12", @@ -219,7 +219,7 @@ dependencies = [ "cookie", "derive_more 2.1.1", "encoding_rs", - "foldhash", + "foldhash 0.1.5", "futures-core", "futures-util", "impl-more", @@ -1078,9 +1078,9 @@ dependencies = [ [[package]] name = "aws-config" -version = "1.8.13" +version = "1.8.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c456581cb3c77fafcc8c67204a70680d40b61112d6da78c77bd31d945b65f1b5" +checksum = "11493b0bad143270fb8ad284a096dd529ba91924c5409adeac856cc1bf047dbc" dependencies = [ "aws-credential-types", "aws-runtime", @@ -1088,8 +1088,8 @@ dependencies = [ "aws-sdk-ssooidc", "aws-sdk-sts", "aws-smithy-async", - "aws-smithy-http 0.63.3", - "aws-smithy-json 0.62.3", + "aws-smithy-http", + "aws-smithy-json", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", @@ -1098,7 +1098,7 @@ dependencies = [ "fastrand", "hex", "http 1.4.0", - "ring 0.17.14", + "sha1", "time", "tokio", "tracing", @@ -1108,9 +1108,9 @@ dependencies = [ [[package]] name = "aws-credential-types" -version = "1.2.11" +version = "1.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cd362783681b15d136480ad555a099e82ecd8e2d10a841e14dfd0078d67fee3" +checksum = "8f20799b373a1be121fe3005fba0c2090af9411573878f224df44b42727fcaf7" dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api", @@ -1142,20 +1142,21 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.6.0" +version = "1.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c635c2dc792cb4a11ce1a4f392a925340d1bdf499289b5ec1ec6810954eb43f5" +checksum = "5fc0651c57e384202e47153c1260b84a9936e19803d747615edf199dc3b98d17" dependencies = [ "aws-credential-types", "aws-sigv4", "aws-smithy-async", "aws-smithy-eventstream", - "aws-smithy-http 0.63.3", + "aws-smithy-http", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", "aws-types", "bytes", + "bytes-utils", "fastrand", "http 0.2.12", "http 1.4.0", @@ -1169,9 +1170,9 @@ dependencies = [ [[package]] name = "aws-sdk-s3" -version = "1.119.0" +version = "1.127.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d65fddc3844f902dfe1864acb8494db5f9342015ee3ab7890270d36fbd2e01c" +checksum = "151783f64e0dcddeb4965d08e36c276b4400a46caa88805a2e36d497deaf031a" dependencies = [ "aws-credential-types", "aws-runtime", @@ -1179,8 +1180,9 @@ dependencies = [ "aws-smithy-async", "aws-smithy-checksums", "aws-smithy-eventstream", - "aws-smithy-http 0.62.6", - "aws-smithy-json 0.61.9", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-observability", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", @@ -1192,8 +1194,8 @@ dependencies = [ "hmac", "http 0.2.12", "http 1.4.0", - "http-body 0.4.6", - "lru 0.12.5", + "http-body 1.0.1", + "lru 0.16.3", "percent-encoding", "regex-lite", "sha2", @@ -1203,15 +1205,15 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.93.0" +version = "1.97.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9dcb38bb33fc0a11f1ffc3e3e85669e0a11a37690b86f77e75306d8f369146a0" +checksum = "9aadc669e184501caaa6beafb28c6267fc1baef0810fb58f9b205485ca3f2567" dependencies = [ "aws-credential-types", "aws-runtime", "aws-smithy-async", - "aws-smithy-http 0.63.3", - "aws-smithy-json 0.62.3", + "aws-smithy-http", + "aws-smithy-json", "aws-smithy-observability", "aws-smithy-runtime", "aws-smithy-runtime-api", @@ -1227,15 +1229,15 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.95.0" +version = "1.99.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ada8ffbea7bd1be1f53df1dadb0f8fdb04badb13185b3321b929d1ee3caad09" +checksum = "1342a7db8f358d3de0aed2007a0b54e875458e39848d54cc1d46700b2bfcb0a8" dependencies = [ "aws-credential-types", "aws-runtime", "aws-smithy-async", - "aws-smithy-http 0.63.3", - "aws-smithy-json 0.62.3", + "aws-smithy-http", + "aws-smithy-json", "aws-smithy-observability", "aws-smithy-runtime", "aws-smithy-runtime-api", @@ -1251,15 +1253,15 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.97.0" +version = "1.101.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6443ccadc777095d5ed13e21f5c364878c9f5bad4e35187a6cdbd863b0afcad" +checksum = "ab41ad64e4051ecabeea802d6a17845a91e83287e1dd249e6963ea1ba78c428a" dependencies = [ "aws-credential-types", "aws-runtime", "aws-smithy-async", - "aws-smithy-http 0.63.3", - "aws-smithy-json 0.62.3", + "aws-smithy-http", + "aws-smithy-json", "aws-smithy-observability", "aws-smithy-query", "aws-smithy-runtime", @@ -1276,13 +1278,13 @@ dependencies = [ [[package]] name = "aws-sigv4" -version = "1.3.8" +version = "1.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efa49f3c607b92daae0c078d48a4571f599f966dce3caee5f1ea55c4d9073f99" +checksum = "b0b660013a6683ab23797778e21f1f854744fdf05f68204b4cca4c8c04b5d1f4" dependencies = [ "aws-credential-types", "aws-smithy-eventstream", - "aws-smithy-http 0.63.3", + "aws-smithy-http", "aws-smithy-runtime-api", "aws-smithy-types", "bytes", @@ -1304,9 +1306,9 @@ dependencies = [ [[package]] name = "aws-smithy-async" -version = "1.2.11" +version = "1.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52eec3db979d18cb807fc1070961cc51d87d069abe9ab57917769687368a8c6c" +checksum = "2ffcaf626bdda484571968400c326a244598634dc75fd451325a54ad1a59acfc" dependencies = [ "futures-util", "pin-project-lite", @@ -1315,17 +1317,18 @@ dependencies = [ [[package]] name = "aws-smithy-checksums" -version = "0.63.12" +version = "0.64.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87294a084b43d649d967efe58aa1f9e0adc260e13a6938eb904c0ae9b45824ae" +checksum = "6750f3dd509b0694a4377f0293ed2f9630d710b1cebe281fa8bac8f099f88bc6" dependencies = [ - "aws-smithy-http 0.62.6", + "aws-smithy-http", "aws-smithy-types", "bytes", "crc-fast", "hex", - "http 0.2.12", - "http-body 0.4.6", + "http 1.4.0", + "http-body 1.0.1", + "http-body-util", "md-5", "pin-project-lite", "sha1", @@ -1335,9 +1338,9 @@ dependencies = [ [[package]] name = "aws-smithy-eventstream" -version = "0.60.18" +version = "0.60.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35b9c7354a3b13c66f60fe4616d6d1969c9fd36b1b5333a5dfb3ee716b33c588" +checksum = "faf09d74e5e32f76b8762da505a3cd59303e367a664ca67295387baa8c1d7548" dependencies = [ "aws-smithy-types", "bytes", @@ -1346,32 +1349,11 @@ dependencies = [ [[package]] name = "aws-smithy-http" -version = "0.62.6" +version = "0.63.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "826141069295752372f8203c17f28e30c464d22899a43a0c9fd9c458d469c88b" +checksum = "ba1ab2dc1c2c3749ead27180d333c42f11be8b0e934058fb4b2258ee8dbe5231" dependencies = [ "aws-smithy-eventstream", - "aws-smithy-runtime-api", - "aws-smithy-types", - "bytes", - "bytes-utils", - "futures-core", - "futures-util", - "http 0.2.12", - "http 1.4.0", - "http-body 0.4.6", - "percent-encoding", - "pin-project-lite", - "pin-utils", - "tracing", -] - -[[package]] -name = "aws-smithy-http" -version = "0.63.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "630e67f2a31094ffa51b210ae030855cb8f3b7ee1329bdd8d085aaf61e8b97fc" -dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", "bytes", @@ -1389,9 +1371,9 @@ dependencies = [ [[package]] name = "aws-smithy-http-client" -version = "1.1.9" +version = "1.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12fb0abf49ff0cab20fd31ac1215ed7ce0ea92286ba09e2854b42ba5cabe7525" +checksum = "6a2f165a7feee6f263028b899d0a181987f4fa7179a6411a32a439fba7c5f769" dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api", @@ -1419,36 +1401,27 @@ dependencies = [ [[package]] name = "aws-smithy-json" -version = "0.61.9" +version = "0.62.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49fa1213db31ac95288d981476f78d05d9cbb0353d22cdf3472cc05bb02f6551" -dependencies = [ - "aws-smithy-types", -] - -[[package]] -name = "aws-smithy-json" -version = "0.62.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cb96aa208d62ee94104645f7b2ecaf77bf27edf161590b6224bfbac2832f979" +checksum = "9648b0bb82a2eedd844052c6ad2a1a822d1f8e3adee5fbf668366717e428856a" dependencies = [ "aws-smithy-types", ] [[package]] name = "aws-smithy-observability" -version = "0.2.4" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0a46543fbc94621080b3cf553eb4cbbdc41dd9780a30c4756400f0139440a1d" +checksum = "a06c2315d173edbf1920da8ba3a7189695827002e4c0fc961973ab1c54abca9c" dependencies = [ "aws-smithy-runtime-api", ] [[package]] name = "aws-smithy-query" -version = "0.60.13" +version = "0.60.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0cebbddb6f3a5bd81553643e9c7daf3cc3dc5b0b5f398ac668630e8a84e6fff0" +checksum = "1a56d79744fb3edb5d722ef79d86081e121d3b9422cb209eb03aea6aa4f21ebd" dependencies = [ "aws-smithy-types", "urlencoding", @@ -1456,12 +1429,12 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.10.0" +version = "1.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3df87c14f0127a0d77eb261c3bc45d5b4833e2a1f63583ebfb728e4852134ee" +checksum = "028999056d2d2fd58a697232f9eec4a643cf73a71cf327690a7edad1d2af2110" dependencies = [ "aws-smithy-async", - "aws-smithy-http 0.63.3", + "aws-smithy-http", "aws-smithy-http-client", "aws-smithy-observability", "aws-smithy-runtime-api", @@ -1481,9 +1454,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "1.11.3" +version = "1.11.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49952c52f7eebb72ce2a754d3866cc0f87b97d2a46146b79f80f3a93fb2b3716" +checksum = "876ab3c9c29791ba4ba02b780a3049e21ec63dabda09268b175272c3733a79e6" dependencies = [ "aws-smithy-async", "aws-smithy-types", @@ -1498,9 +1471,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.4.3" +version = "1.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b3a26048eeab0ddeba4b4f9d51654c79af8c3b32357dc5f336cee85ab331c33" +checksum = "9d73dbfbaa8e4bc57b9045137680b958d274823509a360abfd8e1d514d40c95c" dependencies = [ "base64-simd", "bytes", @@ -1524,18 +1497,18 @@ dependencies = [ [[package]] name = "aws-smithy-xml" -version = "0.60.13" +version = "0.60.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11b2f670422ff42bf7065031e72b45bc52a3508bd089f743ea90731ca2b6ea57" +checksum = "0ce02add1aa3677d022f8adf81dcbe3046a95f17a1b1e8979c145cd21d3d22b3" dependencies = [ "xmlparser", ] [[package]] name = "aws-types" -version = "1.3.11" +version = "1.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d980627d2dd7bfc32a3c025685a033eeab8d365cc840c631ef59d1b8f428164" +checksum = "47c8323699dd9b3c8d5b3c13051ae9cdef58fd179957c882f8374dd8725962d9" dependencies = [ "aws-credential-types", "aws-smithy-async", @@ -2404,9 +2377,9 @@ dependencies = [ [[package]] name = "crc" -version = "3.4.0" +version = "3.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5eb8a2a1cd12ab0d987a5d5e825195d372001a4094a0376319d5a0ad71c1ba0d" +checksum = "9710d3b3739c2e349eb44fe848ad0b7c8cb1e42bd87ee49371df2f7acaf3e675" dependencies = [ "crc-catalog", ] @@ -2419,15 +2392,14 @@ checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" [[package]] name = "crc-fast" -version = "1.6.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ddc2d09feefeee8bd78101665bd8645637828fa9317f9f292496dbbd8c65ff3" +checksum = "2fd92aca2c6001b1bf5ba0ff84ee74ec8501b52bbef0cac80bf25a6c1d87a83d" dependencies = [ "crc", "digest", - "rand 0.9.2", - "regex", "rustversion", + "spin 0.10.0", ] [[package]] @@ -3261,6 +3233,12 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "foldhash" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" + [[package]] name = "form_urlencoded" version = "1.2.2" @@ -3590,7 +3568,7 @@ checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" dependencies = [ "allocator-api2", "equivalent", - "foldhash", + "foldhash 0.1.5", "rayon", "serde", ] @@ -3600,6 +3578,11 @@ name = "hashbrown" version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash 0.2.0", +] [[package]] name = "hashlink" @@ -4669,20 +4652,20 @@ dependencies = [ [[package]] name = "lru" -version = "0.12.5" +version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38" +checksum = "9f8cc7106155f10bdf99a6f379688f543ad6596a415375b36a59a054ceda1198" dependencies = [ "hashbrown 0.15.5", ] [[package]] name = "lru" -version = "0.14.0" +version = "0.16.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f8cc7106155f10bdf99a6f379688f543ad6596a415375b36a59a054ceda1198" +checksum = "a1dc47f592c06f33f8e3aea9591776ec7c9f9e4124778ff8a3c3b87159f7e593" dependencies = [ - "hashbrown 0.15.5", + "hashbrown 0.16.1", ] [[package]] @@ -6062,7 +6045,7 @@ dependencies = [ "bytes", "compact_str", "flate2", - "foldhash", + "foldhash 0.1.5", "hashbrown 0.15.5", "indexmap 2.13.0", "libc", @@ -7651,6 +7634,12 @@ dependencies = [ "lock_api", ] +[[package]] +name = "spin" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5fe4ccb98d9c292d56fec89a5e07da7fc4cf0dc11e156b41793132775d3e591" + [[package]] name = "spki" version = "0.6.0" diff --git a/Cargo.toml b/Cargo.toml index aaf346ab6..09e417631 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,8 +40,8 @@ async-tar = "0.5.0" async-tempfile = "0.7.0" async-trait = "0.1.80" async_zip = { version = "0.0.18", features = ["full"] } -aws-config = "1.8.11" -aws-sdk-s3 = "1.118.0" +aws-config = "1.8.15" +aws-sdk-s3 = "1.127.0" bincode = "1.3.3" bytecount = "0.6.3" bytes = "1.5.0" diff --git a/crates/lib/src/error.rs b/crates/lib/src/error.rs index 353fb34e5..03daa70ae 100644 --- a/crates/lib/src/error.rs +++ b/crates/lib/src/error.rs @@ -147,6 +147,8 @@ pub enum OxenError { ImportFileError(StringError), // External Library Errors + #[error("AWS SDK error: {0}")] + AwsSdkError(Box), #[error("{0}")] IO(#[from] io::Error), #[error("Authentication failed: {0}")] @@ -251,6 +253,10 @@ impl OxenError { Some(hint) } + pub fn aws_sdk_error(e: impl std::error::Error + Send + Sync + 'static) -> Self { + OxenError::AwsSdkError(Box::new(e)) + } + pub fn basic_str(s: impl AsRef) -> Self { OxenError::Basic(StringError::from(s.as_ref())) } diff --git a/crates/lib/src/storage/s3.rs b/crates/lib/src/storage/s3.rs index b9e71fcb8..8a504d2ec 100644 --- a/crates/lib/src/storage/s3.rs +++ b/crates/lib/src/storage/s3.rs @@ -4,16 +4,20 @@ use async_trait::async_trait; use aws_config::meta::region::RegionProviderChain; use aws_sdk_s3::error::SdkError; use aws_sdk_s3::operation::head_object::HeadObjectError; +use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart}; use aws_sdk_s3::{Client, config::Region, primitives::ByteStream}; use bytes::Bytes; +use futures::TryStreamExt; +use futures::stream::StreamExt; use log; use std::collections::HashMap; use std::io::Read; use std::path::{Path, PathBuf}; use std::sync::Arc; -use tokio::io::AsyncWriteExt; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::OnceCell; use tokio_stream::Stream; +use tokio_stream::wrappers::ReceiverStream; use super::version_store::{LocalFilePath, VersionStore}; use crate::constants::VERSION_FILE_NAME; @@ -171,13 +175,165 @@ impl VersionStore for S3VersionStore { async fn store_version_from_reader( &self, - _hash: &str, - _reader: &mut (dyn tokio::io::AsyncRead + Send + Unpin), + hash: &str, + reader: &mut (dyn tokio::io::AsyncRead + Send + Unpin), ) -> Result<(), OxenError> { - // TODO: Implement S3 version storage from reader - Err(OxenError::basic_str( - "S3VersionStore store_version_from_reader not yet implemented", - )) + let client = self.init_client().await?; + let key = self.generate_key(hash); + + // S3 multipart upload requires 5MB minimum per part (except the last). + // We use 8MB parts for a balance of memory usage and request count. + const PART_SIZE: usize = 8 * 1024 * 1024; + + let mut reader = tokio::io::BufReader::new(reader); + + // Read the first part to determine if we need multipart upload + let mut first_buf = vec![0u8; PART_SIZE]; + let first_n = read_full(&mut reader, &mut first_buf).await?; + first_buf.truncate(first_n); + + if first_n < PART_SIZE { + // Small file: single put_object + let body = ByteStream::from(first_buf); + client + .put_object() + .bucket(&self.bucket) + .key(&key) + .body(body) + .send() + .await + .map_err(OxenError::aws_sdk_error)?; + return Ok(()); + } + + // Large file: multipart upload + let upload = client + .create_multipart_upload() + .bucket(&self.bucket) + .key(&key) + .send() + .await + .map_err(OxenError::aws_sdk_error)?; + + let upload_id = upload + .upload_id() + .ok_or_else(|| OxenError::basic_str("S3 multipart upload missing upload_id"))? + .to_string(); + + // Pipeline reading and uploading: read parts on this task and send them through a bounded + // channel. A spawned task consumes the channel and uploads parts concurrently via + // buffer_unordered. The bounded channel keeps at most MAX_PARTS_WAITING_FOR_UPLOAD parts in + // memory. + const MAX_CONCURRENT_UPLOADS: usize = 8; + const MAX_PARTS_WAITING_FOR_UPLOAD: usize = 16; + + let (tx, rx) = tokio::sync::mpsc::channel::<(i32, Vec)>(MAX_PARTS_WAITING_FOR_UPLOAD); + + // Spawn the upload consumer -- reads from channel and uploads concurrently + let upload_client = client.clone(); + let upload_bucket = self.bucket.clone(); + let upload_key = key.clone(); + let upload_id_clone = upload_id.clone(); + + let upload_task = tokio::spawn(async move { + let results: Result, OxenError> = ReceiverStream::new(rx) + .map(|(part_num, data)| { + let client = upload_client.clone(); + let bucket = upload_bucket.clone(); + let key = upload_key.clone(); + let upload_id = upload_id_clone.clone(); + async move { + let resp = client + .upload_part() + .bucket(bucket) + .key(key) + .upload_id(upload_id) + .part_number(part_num) + .body(ByteStream::from(data)) + .send() + .await + .map_err(OxenError::aws_sdk_error)?; + + let etag = resp.e_tag().map(|s| s.to_string()).ok_or_else(|| { + OxenError::basic_str("S3 upload_part response missing ETag") + })?; + + Ok(CompletedPart::builder() + .part_number(part_num) + .e_tag(etag) + .build()) + } + }) + .buffer_unordered(MAX_CONCURRENT_UPLOADS) + .try_collect() + .await; + results + }); + + // Run the read-upload-complete pipeline, aborting the multipart upload on any error. + let result = async { + // Read parts on the current task and feed them into the channel + let mut part_num: i32 = 1; + tx.send((part_num, first_buf)).await.map_err(|_| { + OxenError::basic_str("Upload task terminated while sending first part") + })?; + part_num += 1; + + loop { + let mut buf = vec![0u8; PART_SIZE]; + let n = read_full(&mut reader, &mut buf).await?; + if n == 0 { + // We have reached EOF + break; + } + buf.truncate(n); + tx.send((part_num, buf)).await.map_err(|_| { + OxenError::basic_str("Upload task terminated while sending parts") + })?; + part_num += 1; + } + + // Close the channel so the upload task knows we're done + drop(tx); + + // Collect upload results + let mut completed_parts: Vec = upload_task + .await + .map_err(|e| OxenError::basic_str(format!("Upload task panicked: {e}")))??; + + // Sort by part number since buffer_unordered doesn't preserve order + completed_parts.sort_by_key(|p| p.part_number); + + let completed = CompletedMultipartUpload::builder() + .set_parts(Some(completed_parts)) + .build(); + + client + .complete_multipart_upload() + .bucket(&self.bucket) + .key(&key) + .upload_id(&upload_id) + .multipart_upload(completed) + .send() + .await + .map_err(OxenError::aws_sdk_error)?; + + Ok(()) + } + .await; + + if let Err(ref e) = result { + log::error!("Multipart upload failed, aborting: {e}"); + let _ = client + .abort_multipart_upload() + .bucket(&self.bucket) + .key(&key) + .upload_id(&upload_id) + .send() + .await; + } + + result } async fn store_version(&self, hash: &str, data: &[u8]) -> Result<(), OxenError> { @@ -477,6 +633,27 @@ impl VersionStore for S3VersionStore { } } +/// Read from `reader` until `buf` is full or EOF, returning the number of +/// bytes read. Unlike a single `read()` call this won't return a short read +/// unless EOF is reached. +async fn read_full( + reader: &mut (dyn tokio::io::AsyncRead + Send + Unpin), + buf: &mut [u8], +) -> Result { + let mut offset = 0; + while offset < buf.len() { + let n = reader + .read(&mut buf[offset..]) + .await + .map_err(|e| OxenError::basic_str(format!("Failed to read from reader: {e}")))?; + if n == 0 { + break; + } + offset += n; + } + Ok(offset) +} + use std::io; use std::pin::Pin; use std::task::{Context, Poll};