diff --git a/Cargo.toml b/Cargo.toml
index 8c45cfe2..d11df8f3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -20,12 +20,13 @@ rhel9 = ['pre-6.15']
 anyhow = { version = "1.0.87", default-features = false }
 async-compression = { version = "0.4.0", default-features = false, features = ["tokio", "zstd", "gzip"] }
 clap = { version = "4.0.1", default-features = false, features = ["std", "help", "usage", "derive"] }
-containers-image-proxy = "0.7.0"
+containers-image-proxy = "0.7.1"
 env_logger = "0.11.0"
 hex = "0.4.0"
 indicatif = { version = "0.17.0", features = ["tokio"] }
 log = "0.4.8"
 oci-spec = "0.7.0"
+once_cell = { version = "1.21.3", default-features = false }
 regex-automata = { version = "0.4.4", default-features = false }
 rustix = { version = "1.0.0", features = ["fs", "mount", "process"] }
 serde = "1.0.145"
@@ -33,7 +34,7 @@ sha2 = "0.10.1"
 tar = { version = "0.4.38", default-features = false }
 tempfile = "3.8.0"
 thiserror = "2.0.0"
-tokio = "1.24.2"
+tokio = { version = "1.24.2", features = ["rt-multi-thread"] }
 toml = "0.8.0"
 xxhash-rust = { version = "0.8.2", features = ["xxh32"] }
 zerocopy = { version = "0.8.0", features = ["derive", "std"] }
@@ -41,7 +42,6 @@ zstd = "0.13.0"
 
 [dev-dependencies]
 insta = "1.42.2"
-once_cell = "1.21.3"
 similar-asserts = "1.7.0"
 test-with = { version = "0.14", default-features = false, features = ["executable", "runtime"] }
 tokio-test = "0.4.4"
diff --git a/src/bin/cfsctl.rs b/src/bin/cfsctl.rs
index 8cca5aaa..893af1df 100644
--- a/src/bin/cfsctl.rs
+++ b/src/bin/cfsctl.rs
@@ -1,4 +1,4 @@
-use std::path::PathBuf;
+use std::{path::PathBuf, sync::Arc};
 
 use anyhow::Result;
 use clap::{Parser, Subcommand};
@@ -106,7 +106,8 @@ enum Command {
     },
 }
 
-fn main() -> Result<()> {
+#[tokio::main]
+async fn main() -> Result<()> {
     env_logger::init();
 
     let args = App::parse();
@@ -140,7 +141,7 @@ fn main() -> Result<()> {
         Command::Oci { cmd: oci_cmd } => match oci_cmd {
             OciCommand::ImportLayer { name, sha256 } => {
                 let object_id = oci::import_layer(
-                    &repo,
+                    &Arc::new(repo),
                     &parse_sha256(sha256)?,
                     name.as_deref(),
                     &mut std::io::stdin(),
@@ -158,16 +159,11 @@ fn main() -> Result<()> {
                 println!("{}", image_id.to_hex());
             }
             OciCommand::Pull { ref image, name } => {
-                let runtime = tokio::runtime::Builder::new_current_thread()
-                    .enable_all()
-                    .build()
-                    .expect("Failed to build tokio runtime");
-                // And invoke the async_main
-                runtime.block_on(async move { oci::pull(&repo, image, name.as_deref()).await })?;
+                oci::pull(&Arc::new(repo), image, name.as_deref()).await?
             }
             OciCommand::Seal { verity, ref name } => {
                 let (sha256, verity) = oci::seal(
-                    &repo,
+                    &Arc::new(repo),
                     name,
                     verity.map(Sha256HashValue::from_hex).transpose()?.as_ref(),
                 )?;
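Note on the cfsctl change above: `#[tokio::main]` defaults to the multi-thread runtime, which is why Cargo.toml now enables tokio's `rt-multi-thread` feature (the `macros` feature is presumably already pulled in elsewhere in the dependency graph). As a rough, illustrative sketch only — not the exact macro output — the attribute turns `async fn main()` into something like:

```rust
// Approximate expansion of #[tokio::main]; details differ in the real macro.
fn main() -> anyhow::Result<()> {
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("Failed building the Runtime")
        .block_on(async {
            // body of the former async fn main() goes here
            Ok(())
        })
}
```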
diff --git a/src/bin/composefs-setup-root.rs b/src/bin/composefs-setup-root.rs
index 479aa759..10b9b5aa 100644
--- a/src/bin/composefs-setup-root.rs
+++ b/src/bin/composefs-setup-root.rs
@@ -169,7 +169,7 @@ fn open_root_fs(path: &Path) -> Result {
 fn mount_composefs_image(sysroot: &OwnedFd, name: &str) -> Result {
     let repo = Repository::::open_path(sysroot, "composefs")?;
     let image = repo.open_image(name)?;
-    composefs_fsmount(image, name, repo.object_dir()?).context("Failed to mount composefs image")
+    composefs_fsmount(image, name, repo.objects_dir()?).context("Failed to mount composefs image")
 }
 
 fn mount_subdir(
diff --git a/src/fsverity/hashvalue.rs b/src/fsverity/hashvalue.rs
index e75f1800..5cfb6d71 100644
--- a/src/fsverity/hashvalue.rs
+++ b/src/fsverity/hashvalue.rs
@@ -11,6 +11,7 @@ where
     Self: FromBytes + Immutable + IntoBytes + KnownLayout + Unaligned,
     Self: Hash + Eq,
     Self: fmt::Debug,
+    Self: Send + Sync + Unpin + 'static,
 {
     type Digest: Digest + FixedOutputReset + fmt::Debug;
     const ALGORITHM: u8;
@@ -60,17 +61,17 @@ where
     }
 
     fn to_object_pathname(&self) -> String {
-        format!("{:02x}/{}", self.as_bytes()[0], self.to_object_basename())
+        format!(
+            "{:02x}/{}",
+            self.as_bytes()[0],
+            hex::encode(&self.as_bytes()[1..])
+        )
     }
 
     fn to_object_dir(&self) -> String {
         format!("{:02x}", self.as_bytes()[0])
     }
 
-    fn to_object_basename(&self) -> String {
-        hex::encode(&self.as_bytes()[1..])
-    }
-
     fn to_hex(&self) -> String {
         hex::encode(self.as_bytes())
     }
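For context on `to_object_pathname()` above: objects are addressed with a two-level fan-out, the first digest byte naming the subdirectory and the remaining bytes naming the file. A minimal illustration using a plain byte slice instead of the `FsVerityHashValue` trait (`fanout_path` is an invented name for the example):

```rust
fn fanout_path(digest: &[u8]) -> String {
    // first byte selects the subdirectory, the rest becomes the file name
    format!("{:02x}/{}", digest[0], hex::encode(&digest[1..]))
}

fn main() {
    assert_eq!(fanout_path(&[0xab, 0xcd, 0xef, 0x01]), "ab/cdef01");
}
```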
image")?; let progress = MultiProgress::new(); Ok(ImageOp { - repo, + repo: Arc::clone(repo), proxy, img, progress, @@ -96,16 +96,16 @@ impl<'repo, ObjectID: FsVerityHashValue> ImageOp<'repo, ObjectID> { pub async fn ensure_layer( &self, - layer_sha256: &Sha256Digest, + layer_sha256: Sha256Digest, descriptor: &Descriptor, ) -> Result { // We need to use the per_manifest descriptor to download the compressed layer but it gets // stored in the repository via the per_config descriptor. Our return value is the // fsverity digest for the corresponding splitstream. - if let Some(layer_id) = self.repo.check_stream(layer_sha256)? { + if let Some(layer_id) = self.repo.check_stream(&layer_sha256)? { self.progress - .println(format!("Already have layer {layer_sha256:?}"))?; + .println(format!("Already have layer {}", hex::encode(layer_sha256)))?; Ok(layer_id) } else { // Otherwise, we need to fetch it... @@ -122,7 +122,7 @@ impl<'repo, ObjectID: FsVerityHashValue> ImageOp<'repo, ObjectID> { self.progress .println(format!("Fetching layer {}", hex::encode(layer_sha256)))?; - let mut splitstream = self.repo.create_stream(Some(*layer_sha256), None); + let mut splitstream = self.repo.create_stream(Some(layer_sha256), None); match descriptor.media_type() { MediaType::ImageLayer => { split_async(progress, &mut splitstream).await?; @@ -136,13 +136,19 @@ impl<'repo, ObjectID: FsVerityHashValue> ImageOp<'repo, ObjectID> { other => bail!("Unsupported layer media type {:?}", other), }; let layer_id = self.repo.write_stream(splitstream, None)?; - driver.await?; + + // We intentionally explicitly ignore this, even though we're supposed to check it. + // See https://github.com/containers/containers-image-proxy-rs/issues/80 for discussion + // about why. Note: we only care about the uncompressed layer tar, and we checksum it + // ourselves. + drop(driver); + Ok(layer_id) } } pub async fn ensure_config( - &self, + self: &Arc, manifest_layers: &[Descriptor], descriptor: &Descriptor, ) -> Result> { @@ -172,14 +178,31 @@ impl<'repo, ObjectID: FsVerityHashValue> ImageOp<'repo, ObjectID> { let raw_config = config?; let config = ImageConfiguration::from_reader(&raw_config[..])?; + // We want to sort the layers based on size so we can get started on the big layers + // first. The last thing we want is to start on the biggest layer right at the end. + let mut layers: Vec<_> = zip(manifest_layers, config.rootfs().diff_ids()).collect(); + layers.sort_by_key(|(mld, ..)| Reverse(mld.size())); + + // Bound the number of tasks to the available parallelism. + let threads = available_parallelism()?; + let sem = Arc::new(Semaphore::new(threads.into())); + let mut entries = vec![]; + for (mld, diff_id) in layers { + let self_ = Arc::clone(self); + let permit = Arc::clone(&sem).acquire_owned().await?; + let layer_sha256 = sha256_from_digest(diff_id)?; + let descriptor = mld.clone(); + let future = tokio::spawn(async move { + let _permit = permit; + self_.ensure_layer(layer_sha256, &descriptor).await + }); + entries.push((layer_sha256, future)); + } + + // Collect the results. 
diff --git a/src/oci/tar.rs b/src/oci/tar.rs
index 4f3a0cb5..1358ff37 100644
--- a/src/oci/tar.rs
+++ b/src/oci/tar.rs
@@ -75,7 +75,7 @@ pub fn split(
 
 pub async fn split_async(
     mut tar_stream: impl AsyncRead + Unpin,
-    writer: &mut SplitStreamWriter<'_, impl FsVerityHashValue>,
+    writer: &mut SplitStreamWriter,
 ) -> Result<()> {
     while let Some(header) = read_header_async(&mut tar_stream).await? {
         // the header always gets stored as inline data
@@ -94,7 +94,7 @@ pub async fn split_async(
         if header.entry_type() == EntryType::Regular && actual_size > INLINE_CONTENT_MAX {
             // non-empty regular file: store the data in the object store
             let padding = buffer.split_off(actual_size);
-            writer.write_external(&buffer, padding)?;
+            writer.write_external_async(buffer, padding).await?;
         } else {
             // else: store the data inline in the split stream
             writer.write_inline(&buffer);
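On `write_external_async` taking `Vec<u8>` by value rather than a slice: the buffer has to be owned (`'static`) so it can be handed to a blocking worker thread (see `ensure_object_async` in src/repository.rs below). A minimal sketch of that hand-off, with `store_blocking` as an invented name:

```rust
async fn store_blocking(data: Vec<u8>) -> anyhow::Result<usize> {
    // The closure takes ownership of `data`, so no lifetime ties it to the caller.
    tokio::task::spawn_blocking(move || {
        // blocking hashing / fs-verity / IO work would happen here
        Ok(data.len())
    })
    .await?
}
```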
diff --git a/src/repository.rs b/src/repository.rs
index 66d343fa..76986b42 100644
--- a/src/repository.rs
+++ b/src/repository.rs
@@ -2,16 +2,18 @@ use std::{
     collections::HashSet,
     ffi::CStr,
     fs::File,
-    io::{ErrorKind, Read, Write},
+    io::{Read, Write},
     os::fd::{AsFd, OwnedFd},
     path::{Path, PathBuf},
+    sync::Arc,
 };
 
 use anyhow::{bail, ensure, Context, Result};
+use once_cell::sync::OnceCell;
 use rustix::{
     fs::{
-        accessat, fdatasync, flock, linkat, mkdirat, open, openat, readlinkat, symlinkat, Access,
-        AtFlags, Dir, FileType, FlockOperation, Mode, OFlags, CWD,
+        fdatasync, flock, linkat, mkdirat, open, openat, readlinkat, symlinkat, AtFlags, Dir,
+        FileType, FlockOperation, Mode, OFlags, CWD,
     },
     io::{Errno, Result as ErrnoResult},
 };
@@ -26,9 +28,35 @@ use crate::{
     util::{proc_self_fd, Sha256Digest},
 };
 
+/// Call openat() on the named subdirectory of "dirfd", possibly creating it first.
+///
+/// We assume that the directory will probably exist (ie: we try the open first), and on ENOENT, we
+/// mkdirat() and retry.
+fn ensure_dir_and_openat(dirfd: impl AsFd, filename: &str, flags: OFlags) -> ErrnoResult {
+    match openat(
+        &dirfd,
+        filename,
+        flags | OFlags::CLOEXEC | OFlags::DIRECTORY,
+        0o666.into(),
+    ) {
+        Ok(file) => Ok(file),
+        Err(Errno::NOENT) => match mkdirat(&dirfd, filename, 0o777.into()) {
+            Ok(()) | Err(Errno::EXIST) => openat(
+                dirfd,
+                filename,
+                flags | OFlags::CLOEXEC | OFlags::DIRECTORY,
+                0o666.into(),
+            ),
+            Err(other) => Err(other),
+        },
+        Err(other) => Err(other),
+    }
+}
+
 #[derive(Debug)]
 pub struct Repository {
     repository: OwnedFd,
+    objects: OnceCell,
     _data: std::marker::PhantomData,
 }
 
@@ -39,11 +67,9 @@ impl Drop for Repository {
 }
 
 impl Repository {
-    pub fn object_dir(&self) -> ErrnoResult {
-        self.openat(
-            "objects",
-            OFlags::PATH | OFlags::DIRECTORY | OFlags::CLOEXEC,
-        )
+    pub fn objects_dir(&self) -> ErrnoResult<&OwnedFd> {
+        self.objects
+            .get_or_try_init(|| ensure_dir_and_openat(&self.repository, "objects", OFlags::PATH))
     }
 
     pub fn open_path(dirfd: impl AsFd, path: impl AsRef) -> Result {
@@ -58,6 +84,7 @@ impl Repository {
         Ok(Self {
             repository,
+            objects: OnceCell::new(),
             _data: std::marker::PhantomData,
         })
     }
@@ -79,49 +106,67 @@ impl Repository {
         })
     }
 
+    pub async fn ensure_object_async(self: &Arc, data: Vec) -> Result {
+        let self_ = Arc::clone(self);
+        tokio::task::spawn_blocking(move || self_.ensure_object(&data)).await?
+    }
+
     pub fn ensure_object(&self, data: &[u8]) -> Result {
-        let digest: ObjectID = compute_verity(data);
-        let dir = PathBuf::from(format!("objects/{}", digest.to_object_dir()));
-        let file = dir.join(digest.to_object_basename());
+        let dirfd = self.objects_dir()?;
+        let id: ObjectID = compute_verity(data);
 
-        // fairly common...
-        if accessat(&self.repository, &file, Access::READ_OK, AtFlags::empty()) == Ok(()) {
-            return Ok(digest);
-        }
+        let path = id.to_object_pathname();
 
-        self.ensure_dir("objects")?;
-        self.ensure_dir(&dir)?;
+        // the usual case is that the file will already exist
+        match openat(
+            dirfd,
+            &path,
+            OFlags::RDONLY | OFlags::CLOEXEC,
+            Mode::empty(),
+        ) {
+            Ok(fd) => {
+                // measure the existing file to ensure that it's correct
+                // TODO: try to replace file if it's broken?
+                ensure_verity_equal(fd, &id)?;
+                return Ok(id);
+            }
+            Err(Errno::NOENT) => {
+                // in this case we'll create the file
+            }
+            Err(other) => {
+                return Err(other).context("Checking for existing object in repository")?;
+            }
+        }
 
-        let fd = openat(
-            &self.repository,
-            &dir,
-            OFlags::RDWR | OFlags::CLOEXEC | OFlags::TMPFILE,
-            0o666.into(),
-        )?;
-        rustix::io::write(&fd, data)?; // TODO: no write_all() here...
-        fdatasync(&fd)?;
+        let fd = ensure_dir_and_openat(dirfd, &id.to_object_dir(), OFlags::RDWR | OFlags::TMPFILE)?;
+        let mut file = File::from(fd);
+        file.write_all(data)?;
+        fdatasync(&file)?;
 
         // We can't enable verity with an open writable fd, so re-open and close the old one.
-        let ro_fd = open(proc_self_fd(&fd), OFlags::RDONLY, Mode::empty())?;
-        drop(fd);
+        let ro_fd = open(proc_self_fd(&file), OFlags::RDONLY, Mode::empty())?;
+        drop(file);
         enable_verity::(&ro_fd).context("Enabling verity digest")?;
-        ensure_verity_equal(&ro_fd, &digest).context("Double-checking verity digest")?;
+        ensure_verity_equal(&ro_fd, &id).context("Double-checking verity digest")?;
 
-        if let Err(err) = linkat(
+        match linkat(
             CWD,
             proc_self_fd(&ro_fd),
-            &self.repository,
-            file,
+            dirfd,
+            path,
             AtFlags::SYMLINK_FOLLOW,
         ) {
-            if err.kind() != ErrorKind::AlreadyExists {
-                return Err(err.into());
+            Ok(()) => {}
+            Err(Errno::EXIST) => {
+                // TODO: strictly, we should measure the newly-appeared file
+            }
+            Err(other) => {
+                return Err(other).context("Linking created object file");
             }
         }
 
-        drop(ro_fd);
-        Ok(digest)
+        Ok(id)
     }
 
     fn open_with_verity(&self, filename: &str, expected_verity: &ObjectID) -> Result {
@@ -134,7 +179,7 @@ impl Repository {
     /// You should write the data to the returned object and then pass it to .store_stream() to
     /// store the result.
     pub fn create_stream(
-        &self,
+        self: &Arc,
         sha256: Option,
         maps: Option>,
     ) -> SplitStreamWriter {
@@ -246,7 +291,7 @@ impl Repository {
     /// On success, the object ID of the new object is returned. It is expected that this object
     /// ID will be used when referring to the stream from other linked streams.
    pub fn ensure_stream(
-        &self,
+        self: &Arc,
         sha256: &Sha256Digest,
         callback: impl FnOnce(&mut SplitStreamWriter) -> Result<()>,
         reference: Option<&str>,
@@ -350,7 +395,7 @@ impl Repository {
         Ok(mount_composefs_at(
             image,
             name,
-            &self.object_dir()?,
+            self.objects_dir()?,
             mountpoint,
         )?)
     }
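The new `ensure_dir_and_openat()` helper above is the usual "optimistic open, create on ENOENT, retry" idiom. The same idea expressed with plain std::fs, as an illustrative sketch only (the real helper works on directory fds via rustix, and the path below is just a demo value):

```rust
use std::{fs, io};

fn open_or_create_dir(path: &str) -> io::Result<fs::ReadDir> {
    match fs::read_dir(path) {
        Ok(dir) => Ok(dir),
        Err(e) if e.kind() == io::ErrorKind::NotFound => {
            // A racing creator is fine: AlreadyExists just means someone beat us to it.
            match fs::create_dir(path) {
                Ok(()) => {}
                Err(e) if e.kind() == io::ErrorKind::AlreadyExists => {}
                Err(e) => return Err(e),
            }
            fs::read_dir(path)
        }
        Err(e) => Err(e),
    }
}

fn main() -> io::Result<()> {
    let entries = open_or_create_dir("/tmp/objects-demo")?;
    println!("{} entries", entries.count());
    Ok(())
}
```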
diff --git a/src/splitstream.rs b/src/splitstream.rs
index 0924ff06..c61f5b87 100644
--- a/src/splitstream.rs
+++ b/src/splitstream.rs
@@ -3,7 +3,10 @@
  * See doc/splitstream.md
  */
 
-use std::io::{BufReader, Read, Write};
+use std::{
+    io::{BufReader, Read, Write},
+    sync::Arc,
+};
 
 use anyhow::{bail, Result};
 use sha2::{Digest, Sha256};
@@ -60,14 +63,14 @@ impl DigestMap {
     }
 }
 
-pub struct SplitStreamWriter<'a, ObjectID: FsVerityHashValue> {
-    repo: &'a Repository,
+pub struct SplitStreamWriter {
+    repo: Arc>,
     inline_content: Vec,
-    writer: Encoder<'a, Vec>,
+    writer: Encoder<'static, Vec>,
     pub sha256: Option<(Sha256, Sha256Digest)>,
 }
 
-impl std::fmt::Debug for SplitStreamWriter<'_, ObjectID> {
+impl std::fmt::Debug for SplitStreamWriter {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         // writer doesn't impl Debug
         f.debug_struct("SplitStreamWriter")
@@ -78,9 +81,9 @@ impl std::fmt::Debug for SplitStreamWriter<'_, Obje
     }
 }
 
-impl<'a, ObjectID: FsVerityHashValue> SplitStreamWriter<'a, ObjectID> {
+impl SplitStreamWriter {
     pub fn new(
-        repo: &'a Repository,
+        repo: &Arc>,
         refs: Option>,
         sha256: Option,
     ) -> Self {
@@ -98,7 +101,7 @@ impl<'a, ObjectID: FsVerityHashValue> SplitStreamWriter<'a, ObjectID> {
         }
 
         Self {
-            repo,
+            repo: Arc::clone(repo),
            inline_content: vec![],
            writer,
            sha256: sha256.map(|x| (Sha256::new(), x)),
@@ -152,6 +155,15 @@ impl<'a, ObjectID: FsVerityHashValue> SplitStreamWriter<'a, ObjectID> {
         self.write_reference(&id, padding)
     }
 
+    pub async fn write_external_async(&mut self, data: Vec, padding: Vec) -> Result<()> {
+        if let Some((ref mut sha256, ..)) = self.sha256 {
+            sha256.update(&data);
+            sha256.update(&padding);
+        }
+        let id = self.repo.ensure_object_async(data).await?;
+        self.write_reference(&id, padding)
+    }
+
     pub fn done(mut self) -> Result {
         self.flush_inline(vec![])?;
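Closing note on the splitstream change above: holding a reference-counted repository handle instead of a borrow makes the writer `'static`, so it (and the buffers it owns) can move into spawned tasks. A schematic before/after with invented stand-in types, not the actual composefs definitions:

```rust
use std::sync::Arc;

struct Repo;

// before (schematic): the writer borrows the repository and cannot outlive it
struct BorrowingWriter<'a> {
    repo: &'a Repo,
}

// after (schematic): the writer owns a counted handle and is 'static
struct OwningWriter {
    repo: Arc<Repo>,
}

fn main() {
    let repo = Arc::new(Repo);
    let writer = OwningWriter { repo: Arc::clone(&repo) };

    // An owning writer can be moved onto another thread or task:
    std::thread::spawn(move || drop(writer)).join().unwrap();

    // A borrowing writer works too, but only within the repo's lifetime.
    let _short_lived = BorrowingWriter { repo: &repo };
}
```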