From 7ec9a18fd9922c013677d52087936564a8e7e688 Mon Sep 17 00:00:00 2001
From: Allison Karlitskaya
Date: Tue, 22 Apr 2025 10:24:16 +0200
Subject: [PATCH 1/7] oci: fix printing of "already" layer IDs

When we already have a layer, we print a message saying as much.  In
fb1de9f482aa ("src/fsverity: up our FsVerityHashValue trait game") we
accidentally changed this to use the debug trait.  This would have been
correct if we were printing a layer ID, but this is an array, so we see
the bytes if we do that.

Revert to the previous behaviour.

Signed-off-by: Allison Karlitskaya
---
 src/oci/mod.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/oci/mod.rs b/src/oci/mod.rs
index 63497416..589c4048 100644
--- a/src/oci/mod.rs
+++ b/src/oci/mod.rs
@@ -105,7 +105,7 @@ impl<'repo, ObjectID: FsVerityHashValue> ImageOp<'repo, ObjectID> {
 
         if let Some(layer_id) = self.repo.check_stream(layer_sha256)? {
             self.progress
-                .println(format!("Already have layer {layer_sha256:?}"))?;
+                .println(format!("Already have layer {}", hex::encode(layer_sha256)))?;
             Ok(layer_id)
         } else {
             // Otherwise, we need to fetch it...

From f286208a104ee3350043dcff7201885deb49d3fd Mon Sep 17 00:00:00 2001
From: Allison Karlitskaya
Date: Sat, 19 Apr 2025 14:06:16 +0200
Subject: [PATCH 2/7] repository: use OnceCell for objects_dir

Rename our `Repository::object_dir()` accessor to `.objects_dir()` (the
directory is called `"objects"`) and change its implementation to use a
OnceCell.  ostree uses this "initialize on first use" pattern for some
of its directories as well, and I like it.

We use the external `once_cell::` crate (which we already had as a
dev-dependency) because the `try` version of the API
(`get_or_try_init()`) is not yet stable in the standard library.

Clean up our ensure_object() implementation to use `.objects_dir()` and
generally make things a bit more robust:

 - we avoid unnecessary mkdir() calls for directories which almost
   certainly already exist

 - instead of checking merely for the existence of an object file with
   the correct name, we actually measure it now

 - we play around less with joining pathnames (and we can drop a
   now-unused trait helper method on FsVerityHashValue)

Signed-off-by: Allison Karlitskaya
---
 Cargo.toml                      |   2 +-
 src/bin/composefs-setup-root.rs |   2 +-
 src/fsverity/hashvalue.rs       |  10 +--
 src/repository.rs               | 111 +++++++++++++++++++++-----------
 4 files changed, 82 insertions(+), 43 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 8c45cfe2..0d05d267 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -26,6 +26,7 @@ hex = "0.4.0"
 indicatif = { version = "0.17.0", features = ["tokio"] }
 log = "0.4.8"
 oci-spec = "0.7.0"
+once_cell = { version = "1.21.3", default-features = false }
 regex-automata = { version = "0.4.4", default-features = false }
 rustix = { version = "1.0.0", features = ["fs", "mount", "process"] }
 serde = "1.0.145"
@@ -41,7 +42,6 @@ zstd = "0.13.0"
 
 [dev-dependencies]
 insta = "1.42.2"
-once_cell = "1.21.3"
 similar-asserts = "1.7.0"
 test-with = { version = "0.14", default-features = false, features = ["executable", "runtime"] }
 tokio-test = "0.4.4"
diff --git a/src/bin/composefs-setup-root.rs b/src/bin/composefs-setup-root.rs
index 479aa759..10b9b5aa 100644
--- a/src/bin/composefs-setup-root.rs
+++ b/src/bin/composefs-setup-root.rs
@@ -169,7 +169,7 @@ fn open_root_fs(path: &Path) -> Result<OwnedFd> {
 fn mount_composefs_image(sysroot: &OwnedFd, name: &str) -> Result<OwnedFd> {
     let repo = Repository::<Sha256HashValue>::open_path(sysroot, "composefs")?;
     let image = repo.open_image(name)?;
-    composefs_fsmount(image, name, repo.object_dir()?).context("Failed to mount composefs image")
+    composefs_fsmount(image, name, repo.objects_dir()?).context("Failed to mount composefs image")
 }
 
 fn mount_subdir(
diff --git a/src/fsverity/hashvalue.rs b/src/fsverity/hashvalue.rs
index e75f1800..6a833900 100644
--- a/src/fsverity/hashvalue.rs
+++ b/src/fsverity/hashvalue.rs
@@ -60,17 +60,17 @@ where
     }
 
     fn to_object_pathname(&self) -> String {
-        format!("{:02x}/{}", self.as_bytes()[0], self.to_object_basename())
+        format!(
+            "{:02x}/{}",
+            self.as_bytes()[0],
+            hex::encode(&self.as_bytes()[1..])
+        )
     }
 
     fn to_object_dir(&self) -> String {
         format!("{:02x}", self.as_bytes()[0])
     }
 
-    fn to_object_basename(&self) -> String {
-        hex::encode(&self.as_bytes()[1..])
-    }
-
     fn to_hex(&self) -> String {
         hex::encode(self.as_bytes())
     }
diff --git a/src/repository.rs b/src/repository.rs
index 66d343fa..c10703af 100644
--- a/src/repository.rs
+++ b/src/repository.rs
@@ -2,16 +2,17 @@ use std::{
     collections::HashSet,
     ffi::CStr,
     fs::File,
-    io::{ErrorKind, Read, Write},
+    io::{Read, Write},
     os::fd::{AsFd, OwnedFd},
     path::{Path, PathBuf},
 };
 
 use anyhow::{bail, ensure, Context, Result};
+use once_cell::sync::OnceCell;
 use rustix::{
     fs::{
-        accessat, fdatasync, flock, linkat, mkdirat, open, openat, readlinkat, symlinkat, Access,
-        AtFlags, Dir, FileType, FlockOperation, Mode, OFlags, CWD,
+        fdatasync, flock, linkat, mkdirat, open, openat, readlinkat, symlinkat, AtFlags, Dir,
+        FileType, FlockOperation, Mode, OFlags, CWD,
     },
     io::{Errno, Result as ErrnoResult},
 };
@@ -26,9 +27,35 @@ use crate::{
     util::{proc_self_fd, Sha256Digest},
 };
 
+/// Call openat() on the named subdirectory of "dirfd", possibly creating it first.
+///
+/// We assume that the directory will probably exist (ie: we try the open first), and on ENOENT, we
+/// mkdirat() and retry.
+fn ensure_dir_and_openat(dirfd: impl AsFd, filename: &str, flags: OFlags) -> ErrnoResult<OwnedFd> {
+    match openat(
+        &dirfd,
+        filename,
+        flags | OFlags::CLOEXEC | OFlags::DIRECTORY,
+        0o666.into(),
+    ) {
+        Ok(file) => Ok(file),
+        Err(Errno::NOENT) => match mkdirat(&dirfd, filename, 0o777.into()) {
+            Ok(()) | Err(Errno::EXIST) => openat(
+                dirfd,
+                filename,
+                flags | OFlags::CLOEXEC | OFlags::DIRECTORY,
+                0o666.into(),
+            ),
+            Err(other) => Err(other),
+        },
+        Err(other) => Err(other),
+    }
+}
+
 #[derive(Debug)]
 pub struct Repository<ObjectID: FsVerityHashValue> {
     repository: OwnedFd,
+    objects: OnceCell<OwnedFd>,
     _data: std::marker::PhantomData<ObjectID>,
 }
 
@@ -39,11 +66,9 @@ impl<ObjectID: FsVerityHashValue> Drop for Repository<ObjectID> {
 }
 
 impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
-    pub fn object_dir(&self) -> ErrnoResult<OwnedFd> {
-        self.openat(
-            "objects",
-            OFlags::PATH | OFlags::DIRECTORY | OFlags::CLOEXEC,
-        )
+    pub fn objects_dir(&self) -> ErrnoResult<&OwnedFd> {
+        self.objects
+            .get_or_try_init(|| ensure_dir_and_openat(&self.repository, "objects", OFlags::PATH))
     }
 
     pub fn open_path(dirfd: impl AsFd, path: impl AsRef<Path>) -> Result<Self> {
@@ -58,6 +83,7 @@
 
         Ok(Self {
            repository,
+            objects: OnceCell::new(),
             _data: std::marker::PhantomData,
         })
     }
@@ -80,48 +106,61 @@
     }
 
     pub fn ensure_object(&self, data: &[u8]) -> Result<ObjectID> {
-        let digest: ObjectID = compute_verity(data);
-        let dir = PathBuf::from(format!("objects/{}", digest.to_object_dir()));
-        let file = dir.join(digest.to_object_basename());
+        let dirfd = self.objects_dir()?;
+        let id: ObjectID = compute_verity(data);
 
-        // fairly common...
-        if accessat(&self.repository, &file, Access::READ_OK, AtFlags::empty()) == Ok(()) {
-            return Ok(digest);
-        }
+        let path = id.to_object_pathname();
 
-        self.ensure_dir("objects")?;
-        self.ensure_dir(&dir)?;
+        // the usual case is that the file will already exist
+        match openat(
+            dirfd,
+            &path,
+            OFlags::RDONLY | OFlags::CLOEXEC,
+            Mode::empty(),
+        ) {
+            Ok(fd) => {
+                // measure the existing file to ensure that it's correct
+                // TODO: try to replace file if it's broken?
+                ensure_verity_equal(fd, &id)?;
+                return Ok(id);
+            }
+            Err(Errno::NOENT) => {
+                // in this case we'll create the file
+            }
+            Err(other) => {
+                return Err(other).context("Checking for existing object in repository")?;
+            }
+        }
 
-        let fd = openat(
-            &self.repository,
-            &dir,
-            OFlags::RDWR | OFlags::CLOEXEC | OFlags::TMPFILE,
-            0o666.into(),
-        )?;
-        rustix::io::write(&fd, data)?; // TODO: no write_all() here...
-        fdatasync(&fd)?;
+        let fd = ensure_dir_and_openat(dirfd, &id.to_object_dir(), OFlags::RDWR | OFlags::TMPFILE)?;
+        let mut file = File::from(fd);
+        file.write_all(data)?;
+        fdatasync(&file)?;
 
         // We can't enable verity with an open writable fd, so re-open and close the old one.
-        let ro_fd = open(proc_self_fd(&fd), OFlags::RDONLY, Mode::empty())?;
-        drop(fd);
+        let ro_fd = open(proc_self_fd(&file), OFlags::RDONLY, Mode::empty())?;
+        drop(file);
 
         enable_verity::<ObjectID>(&ro_fd).context("Enabling verity digest")?;
-        ensure_verity_equal(&ro_fd, &digest).context("Double-checking verity digest")?;
+        ensure_verity_equal(&ro_fd, &id).context("Double-checking verity digest")?;
 
-        if let Err(err) = linkat(
+        match linkat(
             CWD,
             proc_self_fd(&ro_fd),
-            &self.repository,
-            file,
+            dirfd,
+            path,
             AtFlags::SYMLINK_FOLLOW,
         ) {
-            if err.kind() != ErrorKind::AlreadyExists {
-                return Err(err.into());
+            Ok(()) => {}
+            Err(Errno::EXIST) => {
+                // TODO: strictly, we should measure the newly-appeared file
+            }
+            Err(other) => {
+                return Err(other).context("Linking created object file");
             }
         }
-        drop(ro_fd);
-
-        Ok(digest)
+        Ok(id)
     }
 
     fn open_with_verity(&self, filename: &str, expected_verity: &ObjectID) -> Result<OwnedFd> {
@@ -350,7 +389,7 @@
         Ok(mount_composefs_at(
             image,
             name,
-            &self.object_dir()?,
+            self.objects_dir()?,
             mountpoint,
         )?)
     }

From 64269f6a2801bc74f4a0194302d5507f0dae1067 Mon Sep 17 00:00:00 2001
From: Allison Karlitskaya
Date: Mon, 21 Apr 2025 13:59:18 +0200
Subject: [PATCH 3/7] cfsctl: use #[tokio::main]

Let's just have an `async fn main()` instead of doing this ourselves.

This also gets us access to the multithreaded executor, which we'll
start using soon.
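
For the record, `#[tokio::main]` is roughly sugar for the setup we were
doing by hand, except that it builds the default multi-threaded runtime
instead of a current-thread one.  Something like (a sketch of the
documented expansion, not the exact generated code):

    #[tokio::main]
    async fn main() -> Result<()> { /* body */ }

    // expands to approximately:
    fn main() -> Result<()> {
        tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .expect("Failed building the Runtime")
            .block_on(async { /* body */ })
    }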
Signed-off-by: Allison Karlitskaya
---
 Cargo.toml        |  2 +-
 src/bin/cfsctl.rs | 10 +++-------
 2 files changed, 4 insertions(+), 8 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 0d05d267..15727905 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -34,7 +34,7 @@ sha2 = "0.10.1"
 tar = { version = "0.4.38", default-features = false }
 tempfile = "3.8.0"
 thiserror = "2.0.0"
-tokio = "1.24.2"
+tokio = { version = "1.24.2", features = ["rt-multi-thread"] }
 toml = "0.8.0"
 xxhash-rust = { version = "0.8.2", features = ["xxh32"] }
 zerocopy = { version = "0.8.0", features = ["derive", "std"] }
diff --git a/src/bin/cfsctl.rs b/src/bin/cfsctl.rs
index 8cca5aaa..bfd8ed95 100644
--- a/src/bin/cfsctl.rs
+++ b/src/bin/cfsctl.rs
@@ -106,7 +106,8 @@ enum Command {
     },
 }
 
-fn main() -> Result<()> {
+#[tokio::main]
+async fn main() -> Result<()> {
     env_logger::init();
 
     let args = App::parse();
@@ -158,12 +159,7 @@ fn main() -> Result<()> {
             println!("{}", image_id.to_hex());
         }
         OciCommand::Pull { ref image, name } => {
-            let runtime = tokio::runtime::Builder::new_current_thread()
-                .enable_all()
-                .build()
-                .expect("Failed to build tokio runtime");
-            // And invoke the async_main
-            runtime.block_on(async move { oci::pull(&repo, image, name.as_deref()).await })?;
+            oci::pull(&repo, image, name.as_deref()).await?
         }
         OciCommand::Seal { verity, ref name } => {
             let (sha256, verity) = oci::seal(

From ba2ba48cb4ceae24f0bc57171c8a5ba9302c1105 Mon Sep 17 00:00:00 2001
From: Allison Karlitskaya
Date: Mon, 21 Apr 2025 14:02:13 +0200
Subject: [PATCH 4/7] fsverity: mark FsVerityHashValue: Send + Sync + 'static

Our implementations trivially satisfy all of these constraints.

Signed-off-by: Allison Karlitskaya
---
 src/fsverity/hashvalue.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/fsverity/hashvalue.rs b/src/fsverity/hashvalue.rs
index 6a833900..2761b96b 100644
--- a/src/fsverity/hashvalue.rs
+++ b/src/fsverity/hashvalue.rs
@@ -7,6 +7,7 @@ use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout, Unaligned};
 pub trait FsVerityHashValue
 where
     Self: Clone,
+    Self: Send + Sync + 'static,
     Self: From<Output<Self::Digest>>,
     Self: FromBytes + Immutable + IntoBytes + KnownLayout + Unaligned,
     Self: Hash + Eq,

From 9ab1b73a99116ad9b5d4427d18e42dd92706685b Mon Sep 17 00:00:00 2001
From: Allison Karlitskaya
Date: Mon, 21 Apr 2025 14:22:04 +0200
Subject: [PATCH 5/7] src: start using a bit of refcounting

Our `SplitStreamWriter` and `oci::ImageOp` structs contain simple
references to `Repository`, which results in some awkward lifetime
rules on those structs.  We can simplify things substantially if we
lean into ref-counting a bit more.

I'm not yet ready to declare that Repository is always refcounted, but
for operations involving splitstreams (including oci downloads) it is
now required.

The ergonomics of this change surprised me.  The Deref trait on `Arc<>`
and the ability to define `self: &Arc<Self>` methods make this all
quite nice to use.
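
A minimal sketch of the receiver pattern in question (illustrative
only, not our real API):

    use std::sync::Arc;

    struct ImageOp { /* ... */ }

    impl ImageOp {
        // Only callable via (a reference to) an Arc<ImageOp>.
        fn spawn_work(self: &Arc<Self>) {
            let this = Arc::clone(self); // cheap refcount bump
            tokio::spawn(async move {
                // `this` keeps the ImageOp alive while the task runs
                let _ = &this;
            });
        }
    }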
Signed-off-by: Allison Karlitskaya
---
 src/bin/cfsctl.rs  |  8 ++++----
 src/oci/mod.rs     | 28 ++++++++++++++--------------
 src/oci/tar.rs     |  2 +-
 src/repository.rs  |  5 +++--
 src/splitstream.rs | 19 +++++++++++--------
 5 files changed, 33 insertions(+), 29 deletions(-)

diff --git a/src/bin/cfsctl.rs b/src/bin/cfsctl.rs
index bfd8ed95..893af1df 100644
--- a/src/bin/cfsctl.rs
+++ b/src/bin/cfsctl.rs
@@ -1,4 +1,4 @@
-use std::path::PathBuf;
+use std::{path::PathBuf, sync::Arc};
 
 use anyhow::Result;
 use clap::{Parser, Subcommand};
@@ -141,7 +141,7 @@ async fn main() -> Result<()> {
         Command::Oci { cmd: oci_cmd } => match oci_cmd {
             OciCommand::ImportLayer { name, sha256 } => {
                 let object_id = oci::import_layer(
-                    &repo,
+                    &Arc::new(repo),
                     &parse_sha256(sha256)?,
                     name.as_deref(),
                     &mut std::io::stdin(),
@@ -159,11 +159,11 @@ async fn main() -> Result<()> {
                 println!("{}", image_id.to_hex());
             }
             OciCommand::Pull { ref image, name } => {
-                oci::pull(&repo, image, name.as_deref()).await?
+                oci::pull(&Arc::new(repo), image, name.as_deref()).await?
             }
             OciCommand::Seal { verity, ref name } => {
                 let (sha256, verity) = oci::seal(
-                    &repo,
+                    &Arc::new(repo),
                     name,
                     verity.map(Sha256HashValue::from_hex).transpose()?.as_ref(),
                 )?;
diff --git a/src/oci/mod.rs b/src/oci/mod.rs
index 589c4048..bf92a8cc 100644
--- a/src/oci/mod.rs
+++ b/src/oci/mod.rs
@@ -3,7 +3,7 @@ use std::process::Command;
 pub mod image;
 pub mod tar;
 
-use std::{collections::HashMap, io::Read, iter::zip, path::Path};
+use std::{collections::HashMap, io::Read, iter::zip, path::Path, sync::Arc};
 
 use anyhow::{bail, ensure, Context, Result};
 use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder};
@@ -23,7 +23,7 @@ use crate::{
 };
 
 pub fn import_layer<ObjectID: FsVerityHashValue>(
-    repo: &Repository<ObjectID>,
+    repo: &Arc<Repository<ObjectID>>,
     sha256: &Sha256Digest,
     name: Option<&str>,
     tar_stream: &mut impl Read,
@@ -44,8 +44,8 @@ pub fn ls_layer(
     Ok(())
 }
 
-struct ImageOp<'repo, ObjectID: FsVerityHashValue> {
-    repo: &'repo Repository<ObjectID>,
+struct ImageOp<ObjectID: FsVerityHashValue> {
+    repo: Arc<Repository<ObjectID>>,
     proxy: ImageProxy,
     img: OpenedImage,
     progress: MultiProgress,
@@ -67,8 +67,8 @@ fn sha256_from_digest(digest: &str) -> Result<Sha256Digest> {
 
 type ContentAndVerity<ObjectID> = (Sha256Digest, ObjectID);
 
-impl<'repo, ObjectID: FsVerityHashValue> ImageOp<'repo, ObjectID> {
-    async fn new(repo: &'repo Repository<ObjectID>, imgref: &str) -> Result<Self> {
+impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
+    async fn new(repo: &Arc<Repository<ObjectID>>, imgref: &str) -> Result<Self> {
         // See https://github.com/containers/skopeo/issues/2563
         let skopeo_cmd = if imgref.starts_with("containers-storage:") {
             let mut cmd = Command::new("podman");
@@ -87,7 +87,7 @@
         let img = proxy.open_image(imgref).await.context("Opening image")?;
         let progress = MultiProgress::new();
         Ok(ImageOp {
-            repo,
+            repo: Arc::clone(repo),
             proxy,
             img,
             progress,
@@ -142,7 +142,7 @@
     }
 
     pub async fn ensure_config(
-        &self,
+        self: &Arc<Self>,
         manifest_layers: &[Descriptor],
         descriptor: &Descriptor,
     ) -> Result<ContentAndVerity<ObjectID>> {
@@ -192,7 +192,7 @@
         }
     }
 
-    pub async fn pull(&self) -> Result<ContentAndVerity<ObjectID>> {
+    pub async fn pull(self: &Arc<Self>) -> Result<ContentAndVerity<ObjectID>> {
         let (_manifest_digest, raw_manifest) = self
             .proxy
             .fetch_manifest_raw_oci(&self.img)
@@ -213,11 +213,11 @@
 /// Pull the target image, and add the provided tag.  If this is a mountable
 /// image (i.e. not an artifact), it is *not* unpacked by default.
 pub async fn pull<ObjectID: FsVerityHashValue>(
-    repo: &Repository<ObjectID>,
+    repo: &Arc<Repository<ObjectID>>,
     imgref: &str,
     reference: Option<&str>,
 ) -> Result<()> {
-    let op = ImageOp::new(repo, imgref).await?;
+    let op = Arc::new(ImageOp::new(repo, imgref).await?);
     let (sha256, id) = op
         .pull()
         .await
@@ -280,7 +280,7 @@ pub fn open_config_shallow<ObjectID: FsVerityHashValue>(
 }
 
 pub fn write_config<ObjectID: FsVerityHashValue>(
-    repo: &Repository<ObjectID>,
+    repo: &Arc<Repository<ObjectID>>,
     config: &ImageConfiguration,
     refs: DigestMap<ObjectID>,
 ) -> Result<ContentAndVerity<ObjectID>> {
@@ -294,7 +294,7 @@
 }
 
 pub fn seal<ObjectID: FsVerityHashValue>(
-    repo: &Repository<ObjectID>,
+    repo: &Arc<Repository<ObjectID>>,
     name: &str,
     verity: Option<&ObjectID>,
 ) -> Result<ContentAndVerity<ObjectID>> {
@@ -421,7 +421,7 @@ mod test {
         let layer_id: [u8; 32] = context.finalize().into();
 
         let repo_dir = tempdir();
-        let repo = Repository::<Sha256HashValue>::open_path(CWD, &repo_dir).unwrap();
+        let repo = Arc::new(Repository::<Sha256HashValue>::open_path(CWD, &repo_dir).unwrap());
         let id = import_layer(&repo, &layer_id, Some("name"), &mut layer.as_slice()).unwrap();
 
         let mut dump = String::new();
diff --git a/src/oci/tar.rs b/src/oci/tar.rs
index 4f3a0cb5..5bf698fd 100644
--- a/src/oci/tar.rs
+++ b/src/oci/tar.rs
@@ -75,7 +75,7 @@ pub fn split(
 
 pub async fn split_async(
     mut tar_stream: impl AsyncRead + Unpin,
-    writer: &mut SplitStreamWriter<'_, impl FsVerityHashValue>,
+    writer: &mut SplitStreamWriter<impl FsVerityHashValue>,
 ) -> Result<()> {
     while let Some(header) = read_header_async(&mut tar_stream).await? {
         // the header always gets stored as inline data
diff --git a/src/repository.rs b/src/repository.rs
index c10703af..f75c4892 100644
--- a/src/repository.rs
+++ b/src/repository.rs
@@ -5,6 +5,7 @@ use std::{
     io::{Read, Write},
     os::fd::{AsFd, OwnedFd},
     path::{Path, PathBuf},
+    sync::Arc,
 };
 
 use anyhow::{bail, ensure, Context, Result};
@@ -173,7 +174,7 @@ impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
     /// You should write the data to the returned object and then pass it to .store_stream() to
     /// store the result.
     pub fn create_stream(
-        &self,
+        self: &Arc<Self>,
         sha256: Option<Sha256Digest>,
         maps: Option<DigestMap<ObjectID>>,
     ) -> SplitStreamWriter<ObjectID> {
@@ -285,7 +286,7 @@
     /// On success, the object ID of the new object is returned.  It is expected that this object
     /// ID will be used when referring to the stream from other linked streams.
     pub fn ensure_stream(
-        &self,
+        self: &Arc<Self>,
         sha256: &Sha256Digest,
         callback: impl FnOnce(&mut SplitStreamWriter<ObjectID>) -> Result<()>,
         reference: Option<&str>,
diff --git a/src/splitstream.rs b/src/splitstream.rs
index 0924ff06..6527f873 100644
--- a/src/splitstream.rs
+++ b/src/splitstream.rs
@@ -3,7 +3,10 @@
  * See doc/splitstream.md
 */
 
-use std::io::{BufReader, Read, Write};
+use std::{
+    io::{BufReader, Read, Write},
+    sync::Arc,
+};
 
 use anyhow::{bail, Result};
 use sha2::{Digest, Sha256};
@@ -60,14 +63,14 @@ impl<ObjectID: FsVerityHashValue> DigestMap<ObjectID> {
     }
 }
 
-pub struct SplitStreamWriter<'a, ObjectID: FsVerityHashValue> {
-    repo: &'a Repository<ObjectID>,
+pub struct SplitStreamWriter<ObjectID: FsVerityHashValue> {
+    repo: Arc<Repository<ObjectID>>,
     inline_content: Vec<u8>,
-    writer: Encoder<'a, Vec<u8>>,
+    writer: Encoder<'static, Vec<u8>>,
     pub sha256: Option<(Sha256, Sha256Digest)>,
 }
 
-impl<ObjectID: FsVerityHashValue> std::fmt::Debug for SplitStreamWriter<'_, ObjectID> {
+impl<ObjectID: FsVerityHashValue> std::fmt::Debug for SplitStreamWriter<ObjectID> {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         // writer doesn't impl Debug
         f.debug_struct("SplitStreamWriter")
@@ -78,9 +81,9 @@ impl<ObjectID: FsVerityHashValue> std::fmt::Debug for SplitStreamWriter<'_, Obje
     }
 }
 
-impl<'a, ObjectID: FsVerityHashValue> SplitStreamWriter<'a, ObjectID> {
+impl<ObjectID: FsVerityHashValue> SplitStreamWriter<ObjectID> {
     pub fn new(
-        repo: &'a Repository<ObjectID>,
+        repo: &Arc<Repository<ObjectID>>,
         refs: Option<DigestMap<ObjectID>>,
        sha256: Option<Sha256Digest>,
     ) -> Self {
@@ -98,7 +101,7 @@ impl<'a, ObjectID: FsVerityHashValue> SplitStreamWriter<'a, ObjectID> {
         }
 
         Self {
-            repo,
+            repo: Arc::clone(repo),
             inline_content: vec![],
             writer,
             sha256: sha256.map(|x| (Sha256::new(), x)),

From 6790c6bde390a45bcb9ae9822a8e68d72c1462e8 Mon Sep 17 00:00:00 2001
From: Allison Karlitskaya
Date: Mon, 21 Apr 2025 14:32:24 +0200
Subject: [PATCH 6/7] oci: make `oci pull` multi-threaded async

Add an async version of `Repository::ensure_object()` and wire it
through `SplitStreamWriter::write_external()`.  Call that when we're
splitting OCI layer tarballs to offload the writing of external objects
(and the `fdatasync()` that goes with it) to a separate thread.

This is prep work for what we've been trying to accomplish for a while
in #62, but it doesn't come close to the complete picture (since it
still writes the objects sequentially).

Modify the (already) async code in oci::ImageOp to download layers in
parallel.  This is a big deal for images with many layers (as is often
the case for bootc images, due to the splitting heuristics).

This takes a pull of the Fedora Silverblue 42 container image (when
pulled from a local `oci-dir`) from ~90s to ~8.5s on my laptop.
Unfortunately, container images made from large single layers are not
substantially improved.

In order to make this change we need to depend on a new version of
containers-image-proxy-rs which makes ImageProxy: Send + Sync, so bump
our required version to the one released today.
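
The shape of the new concurrency, as a rough sketch (the real code is
in `ImageOp::ensure_config()` and `Repository::ensure_object_async()`;
`LayerRef` is a hypothetical stand-in for the descriptor/diff-id pair):

    use std::{sync::Arc, thread::available_parallelism};
    use tokio::sync::Semaphore;

    struct LayerRef; // stand-in type

    async fn fetch_layers(layers: Vec<LayerRef>) -> anyhow::Result<()> {
        // one task per layer, but at most N running at once
        let sem = Arc::new(Semaphore::new(available_parallelism()?.into()));
        let mut tasks = vec![];
        for layer in layers {
            let permit = Arc::clone(&sem).acquire_owned().await?;
            tasks.push(tokio::spawn(async move {
                let _permit = permit; // released when the task finishes
                // fetch + split the layer; the fdatasync()-heavy object
                // writes go to the blocking pool via spawn_blocking()
                let _ = layer;
                Ok::<_, anyhow::Error>(())
            }));
        }
        for task in tasks {
            task.await??;
        }
        Ok(())
    }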
Signed-off-by: Allison Karlitskaya
---
 Cargo.toml                |  2 +-
 src/fsverity/hashvalue.rs |  2 +-
 src/oci/mod.rs            | 41 +++++++++++++++++++++++++++------------
 src/oci/tar.rs            |  2 +-
 src/repository.rs         |  5 +++++
 src/splitstream.rs        |  9 +++++++++
 6 files changed, 46 insertions(+), 15 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 15727905..d11df8f3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -20,7 +20,7 @@ rhel9 = ['pre-6.15']
 anyhow = { version = "1.0.87", default-features = false }
 async-compression = { version = "0.4.0", default-features = false, features = ["tokio", "zstd", "gzip"] }
 clap = { version = "4.0.1", default-features = false, features = ["std", "help", "usage", "derive"] }
-containers-image-proxy = "0.7.0"
+containers-image-proxy = "0.7.1"
 env_logger = "0.11.0"
 hex = "0.4.0"
 indicatif = { version = "0.17.0", features = ["tokio"] }
diff --git a/src/fsverity/hashvalue.rs b/src/fsverity/hashvalue.rs
index 2761b96b..5cfb6d71 100644
--- a/src/fsverity/hashvalue.rs
+++ b/src/fsverity/hashvalue.rs
@@ -7,11 +7,11 @@ use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout, Unaligned};
 pub trait FsVerityHashValue
 where
     Self: Clone,
-    Self: Send + Sync + 'static,
     Self: From<Output<Self::Digest>>,
     Self: FromBytes + Immutable + IntoBytes + KnownLayout + Unaligned,
     Self: Hash + Eq,
     Self: fmt::Debug,
+    Self: Send + Sync + Unpin + 'static,
 {
     type Digest: Digest + FixedOutputReset + fmt::Debug;
     const ALGORITHM: u8;
diff --git a/src/oci/mod.rs b/src/oci/mod.rs
index bf92a8cc..dad71ea5 100644
--- a/src/oci/mod.rs
+++ b/src/oci/mod.rs
@@ -1,4 +1,4 @@
-use std::process::Command;
+use std::{cmp::Reverse, process::Command, thread::available_parallelism};
 
 pub mod image;
 pub mod tar;
@@ -11,7 +11,7 @@ use containers_image_proxy::{ImageProxy, ImageProxyConfig, OpenedImage};
 use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
 use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest, MediaType};
 use sha2::{Digest, Sha256};
-use tokio::io::AsyncReadExt;
+use tokio::{io::AsyncReadExt, sync::Semaphore};
 
 use crate::{
     fs::write_to_path,
@@ -96,14 +96,14 @@ impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
 
     pub async fn ensure_layer(
         &self,
-        layer_sha256: &Sha256Digest,
+        layer_sha256: Sha256Digest,
         descriptor: &Descriptor,
     ) -> Result<ObjectID> {
         // We need to use the per_manifest descriptor to download the compressed layer but it gets
         // stored in the repository via the per_config descriptor.  Our return value is the
         // fsverity digest for the corresponding splitstream.
 
-        if let Some(layer_id) = self.repo.check_stream(layer_sha256)? {
+        if let Some(layer_id) = self.repo.check_stream(&layer_sha256)? {
             self.progress
                 .println(format!("Already have layer {}", hex::encode(layer_sha256)))?;
             Ok(layer_id)
@@ -122,7 +122,7 @@
             self.progress
                 .println(format!("Fetching layer {}", hex::encode(layer_sha256)))?;
 
-            let mut splitstream = self.repo.create_stream(Some(*layer_sha256), None);
+            let mut splitstream = self.repo.create_stream(Some(layer_sha256), None);
             match descriptor.media_type() {
                 MediaType::ImageLayer => {
                     split_async(progress, &mut splitstream).await?;
@@ -172,14 +172,31 @@
         let raw_config = config?;
         let config = ImageConfiguration::from_reader(&raw_config[..])?;
 
+        // We want to sort the layers based on size so we can get started on the big layers
+        // first.  The last thing we want is to start on the biggest layer right at the end.
+        let mut layers: Vec<_> = zip(manifest_layers, config.rootfs().diff_ids()).collect();
+        layers.sort_by_key(|(mld, ..)| Reverse(mld.size()));
+
+        // Bound the number of tasks to the available parallelism.
+        let threads = available_parallelism()?;
+        let sem = Arc::new(Semaphore::new(threads.into()));
+        let mut entries = vec![];
+        for (mld, diff_id) in layers {
+            let self_ = Arc::clone(self);
+            let permit = Arc::clone(&sem).acquire_owned().await?;
+            let layer_sha256 = sha256_from_digest(diff_id)?;
+            let descriptor = mld.clone();
+            let future = tokio::spawn(async move {
+                let _permit = permit;
+                self_.ensure_layer(layer_sha256, &descriptor).await
+            });
+            entries.push((layer_sha256, future));
+        }
+
+        // Collect the results.
         let mut config_maps = DigestMap::new();
-        for (mld, cld) in zip(manifest_layers, config.rootfs().diff_ids()) {
-            let layer_sha256 = sha256_from_digest(cld)?;
-            let layer_id = self
-                .ensure_layer(&layer_sha256, mld)
-                .await
-                .with_context(|| format!("Failed to fetch layer {cld} via {mld:?}"))?;
-            config_maps.insert(&layer_sha256, &layer_id);
+        for (layer_sha256, future) in entries {
+            config_maps.insert(&layer_sha256, &future.await??);
         }
 
         let mut splitstream = self
diff --git a/src/oci/tar.rs b/src/oci/tar.rs
index 5bf698fd..1358ff37 100644
--- a/src/oci/tar.rs
+++ b/src/oci/tar.rs
@@ -94,7 +94,7 @@ pub async fn split_async(
         if header.entry_type() == EntryType::Regular && actual_size > INLINE_CONTENT_MAX {
             // non-empty regular file: store the data in the object store
             let padding = buffer.split_off(actual_size);
-            writer.write_external(&buffer, padding)?;
+            writer.write_external_async(buffer, padding).await?;
         } else {
             // else: store the data inline in the split stream
             writer.write_inline(&buffer);
diff --git a/src/repository.rs b/src/repository.rs
index f75c4892..76986b42 100644
--- a/src/repository.rs
+++ b/src/repository.rs
@@ -106,6 +106,11 @@
         })
     }
 
+    pub async fn ensure_object_async(self: &Arc<Self>, data: Vec<u8>) -> Result<ObjectID> {
+        let self_ = Arc::clone(self);
+        tokio::task::spawn_blocking(move || self_.ensure_object(&data)).await?
+    }
+
     pub fn ensure_object(&self, data: &[u8]) -> Result<ObjectID> {
         let dirfd = self.objects_dir()?;
         let id: ObjectID = compute_verity(data);
diff --git a/src/splitstream.rs b/src/splitstream.rs
index 6527f873..c61f5b87 100644
--- a/src/splitstream.rs
+++ b/src/splitstream.rs
@@ -155,6 +155,15 @@ impl<ObjectID: FsVerityHashValue> SplitStreamWriter<ObjectID> {
         self.write_reference(&id, padding)
     }
 
+    pub async fn write_external_async(&mut self, data: Vec<u8>, padding: Vec<u8>) -> Result<()> {
+        if let Some((ref mut sha256, ..)) = self.sha256 {
+            sha256.update(&data);
+            sha256.update(&padding);
+        }
+        let id = self.repo.ensure_object_async(data).await?;
+        self.write_reference(&id, padding)
+    }
+
     pub fn done(mut self) -> Result<ObjectID> {
         self.flush_inline(vec![])?;
 

From 4311a3d353c29dc17000b8e9173f2ac5e56443bc Mon Sep 17 00:00:00 2001
From: Allison Karlitskaya
Date: Mon, 28 Apr 2025 15:48:45 +0200
Subject: [PATCH 7/7] oci: don't await the image-proxy GetBlob driver

This can cause the proxy's control channel to experience lockups in
heavily-threaded situations, and we don't gain any benefit from
awaiting it anyway.  Just drop it.
See https://github.com/containers/containers-image-proxy-rs/issues/80

Signed-off-by: Allison Karlitskaya
---
 src/oci/mod.rs | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/src/oci/mod.rs b/src/oci/mod.rs
index dad71ea5..8d78b0a3 100644
--- a/src/oci/mod.rs
+++ b/src/oci/mod.rs
@@ -136,7 +136,13 @@ impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
                 other => bail!("Unsupported layer media type {:?}", other),
             };
             let layer_id = self.repo.write_stream(splitstream, None)?;
-            driver.await?;
+
+            // We intentionally explicitly ignore this, even though we're supposed to check it.
+            // See https://github.com/containers/containers-image-proxy-rs/issues/80 for discussion
+            // about why.  Note: we only care about the uncompressed layer tar, and we checksum it
+            // ourselves.
+            drop(driver);
+
             Ok(layer_id)
         }
    }