Merged
6 changes: 3 additions & 3 deletions Cargo.toml
@@ -20,28 +20,28 @@ rhel9 = ['pre-6.15']
anyhow = { version = "1.0.87", default-features = false }
async-compression = { version = "0.4.0", default-features = false, features = ["tokio", "zstd", "gzip"] }
clap = { version = "4.0.1", default-features = false, features = ["std", "help", "usage", "derive"] }
containers-image-proxy = "0.7.0"
containers-image-proxy = "0.7.1"
env_logger = "0.11.0"
hex = "0.4.0"
indicatif = { version = "0.17.0", features = ["tokio"] }
log = "0.4.8"
oci-spec = "0.7.0"
+once_cell = { version = "1.21.3", default-features = false }
Collaborator:

This change looks spurious? Also there's no reason to use the external crate since the functionality got merged into std.

Collaborator (Author):

I mentioned that in the commit message: f286208

We use the external once_cell:: crate (which we already had as a -devel dependency) because the .try() version of the API is not yet stable in the standard library.
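For context, a minimal sketch of the fallible-initialization pattern at issue, using once_cell's sync::OnceCell (the config-file example is hypothetical; std's equivalent, OnceLock::get_or_try_init, is still unstable):

```rust
use once_cell::sync::OnceCell;

static CONFIG: OnceCell<String> = OnceCell::new();

fn config() -> std::io::Result<&'static String> {
    // Fallible one-time initialization: on error the cell stays empty,
    // so a later call can retry instead of poisoning the value.
    CONFIG.get_or_try_init(|| std::fs::read_to_string("/etc/example.conf"))
}
```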

regex-automata = { version = "0.4.4", default-features = false }
rustix = { version = "1.0.0", features = ["fs", "mount", "process"] }
serde = "1.0.145"
sha2 = "0.10.1"
tar = { version = "0.4.38", default-features = false }
tempfile = "3.8.0"
thiserror = "2.0.0"
tokio = "1.24.2"
tokio = { version = "1.24.2", features = ["rt-multi-thread"] }
toml = "0.8.0"
xxhash-rust = { version = "0.8.2", features = ["xxh32"] }
zerocopy = { version = "0.8.0", features = ["derive", "std"] }
zstd = "0.13.0"

[dev-dependencies]
insta = "1.42.2"
once_cell = "1.21.3"
similar-asserts = "1.7.0"
test-with = { version = "0.14", default-features = false, features = ["executable", "runtime"] }
tokio-test = "0.4.4"
16 changes: 6 additions & 10 deletions src/bin/cfsctl.rs
@@ -1,4 +1,4 @@
-use std::path::PathBuf;
+use std::{path::PathBuf, sync::Arc};

use anyhow::Result;
use clap::{Parser, Subcommand};
@@ -106,7 +106,8 @@ enum Command {
},
}

-fn main() -> Result<()> {
+#[tokio::main]
+async fn main() -> Result<()> {
env_logger::init();

let args = App::parse();
@@ -140,7 +141,7 @@ fn main() -> Result<()> {
Command::Oci { cmd: oci_cmd } => match oci_cmd {
OciCommand::ImportLayer { name, sha256 } => {
let object_id = oci::import_layer(
-&repo,
+&Arc::new(repo),
&parse_sha256(sha256)?,
name.as_deref(),
&mut std::io::stdin(),
@@ -158,16 +159,11 @@ fn main() -> Result<()> {
println!("{}", image_id.to_hex());
}
OciCommand::Pull { ref image, name } => {
-let runtime = tokio::runtime::Builder::new_current_thread()
-.enable_all()
-.build()
-.expect("Failed to build tokio runtime");
-// And invoke the async_main
-runtime.block_on(async move { oci::pull(&repo, image, name.as_deref()).await })?;
+oci::pull(&Arc::new(repo), image, name.as_deref()).await?
}
OciCommand::Seal { verity, ref name } => {
let (sha256, verity) = oci::seal(
-&repo,
+&Arc::new(repo),
name,
verity.map(Sha256HashValue::from_hex).transpose()?.as_ref(),
)?;
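For reference, `#[tokio::main]` expands to roughly the following, which is what makes the new `rt-multi-thread` feature on tokio necessary (a sketch of the macro expansion, not code from this PR):

```rust
fn main() -> anyhow::Result<()> {
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("Failed building the Runtime")
        .block_on(async {
            // ...the body of the async main goes here...
            Ok(())
        })
}
```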
2 changes: 1 addition & 1 deletion src/bin/composefs-setup-root.rs
@@ -169,7 +169,7 @@ fn open_root_fs(path: &Path) -> Result<OwnedFd> {
fn mount_composefs_image(sysroot: &OwnedFd, name: &str) -> Result<OwnedFd> {
let repo = Repository::<Sha256HashValue>::open_path(sysroot, "composefs")?;
let image = repo.open_image(name)?;
-composefs_fsmount(image, name, repo.object_dir()?).context("Failed to mount composefs image")
+composefs_fsmount(image, name, repo.objects_dir()?).context("Failed to mount composefs image")
}

fn mount_subdir(
11 changes: 6 additions & 5 deletions src/fsverity/hashvalue.rs
@@ -11,6 +11,7 @@ where
Self: FromBytes + Immutable + IntoBytes + KnownLayout + Unaligned,
Self: Hash + Eq,
Self: fmt::Debug,
+Self: Send + Sync + Unpin + 'static,
{
type Digest: Digest + FixedOutputReset + fmt::Debug;
const ALGORITHM: u8;
@@ -60,17 +61,17 @@
}

fn to_object_pathname(&self) -> String {
format!("{:02x}/{}", self.as_bytes()[0], self.to_object_basename())
format!(
"{:02x}/{}",
self.as_bytes()[0],
hex::encode(&self.as_bytes()[1..])
)
}

fn to_object_dir(&self) -> String {
format!("{:02x}", self.as_bytes()[0])
}

-fn to_object_basename(&self) -> String {
-hex::encode(&self.as_bytes()[1..])
-}

fn to_hex(&self) -> String {
hex::encode(self.as_bytes())
}
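The net effect of to_object_pathname, for illustration (a sketch assuming a 32-byte digest): the first byte becomes a two-hex-digit fan-out directory and the remaining bytes the basename:

```rust
fn object_pathname(digest: &[u8; 32]) -> String {
    // e.g. a digest starting 0xab, 0xcd, ... becomes "ab/cdef..."
    format!("{:02x}/{}", digest[0], hex::encode(&digest[1..]))
}
```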
79 changes: 51 additions & 28 deletions src/oci/mod.rs
@@ -1,17 +1,17 @@
-use std::process::Command;
+use std::{cmp::Reverse, process::Command, thread::available_parallelism};

pub mod image;
pub mod tar;

-use std::{collections::HashMap, io::Read, iter::zip, path::Path};
+use std::{collections::HashMap, io::Read, iter::zip, path::Path, sync::Arc};

use anyhow::{bail, ensure, Context, Result};
use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder};
use containers_image_proxy::{ImageProxy, ImageProxyConfig, OpenedImage};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest, MediaType};
use sha2::{Digest, Sha256};
-use tokio::io::AsyncReadExt;
+use tokio::{io::AsyncReadExt, sync::Semaphore};

use crate::{
fs::write_to_path,
@@ -23,7 +23,7 @@ use crate::{
};

pub fn import_layer<ObjectID: FsVerityHashValue>(
-repo: &Repository<ObjectID>,
+repo: &Arc<Repository<ObjectID>>,
sha256: &Sha256Digest,
name: Option<&str>,
tar_stream: &mut impl Read,
@@ -44,8 +44,8 @@ pub fn ls_layer<ObjectID: FsVerityHashValue>(
Ok(())
}

-struct ImageOp<'repo, ObjectID: FsVerityHashValue> {
-repo: &'repo Repository<ObjectID>,
+struct ImageOp<ObjectID: FsVerityHashValue> {
+repo: Arc<Repository<ObjectID>>,
proxy: ImageProxy,
img: OpenedImage,
progress: MultiProgress,
@@ -67,8 +67,8 @@ fn sha256_from_digest(digest: &str) -> Result<Sha256Digest> {

type ContentAndVerity<ObjectID> = (Sha256Digest, ObjectID);

-impl<'repo, ObjectID: FsVerityHashValue> ImageOp<'repo, ObjectID> {
-async fn new(repo: &'repo Repository<ObjectID>, imgref: &str) -> Result<Self> {
+impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
+async fn new(repo: &Arc<Repository<ObjectID>>, imgref: &str) -> Result<Self> {
// See https://github.com/containers/skopeo/issues/2563
let skopeo_cmd = if imgref.starts_with("containers-storage:") {
let mut cmd = Command::new("podman");
@@ -87,7 +87,7 @@ impl<'repo, ObjectID: FsVerityHashValue> ImageOp<'repo, ObjectID> {
let img = proxy.open_image(imgref).await.context("Opening image")?;
let progress = MultiProgress::new();
Ok(ImageOp {
-repo,
+repo: Arc::clone(repo),
proxy,
img,
progress,
@@ -96,16 +96,16 @@ impl<'repo, ObjectID: FsVerityHashValue> ImageOp<'repo, ObjectID> {

pub async fn ensure_layer(
&self,
-layer_sha256: &Sha256Digest,
+layer_sha256: Sha256Digest,
descriptor: &Descriptor,
) -> Result<ObjectID> {
// We need to use the per_manifest descriptor to download the compressed layer but it gets
// stored in the repository via the per_config descriptor. Our return value is the
// fsverity digest for the corresponding splitstream.

-if let Some(layer_id) = self.repo.check_stream(layer_sha256)? {
+if let Some(layer_id) = self.repo.check_stream(&layer_sha256)? {
self.progress
.println(format!("Already have layer {layer_sha256:?}"))?;
.println(format!("Already have layer {}", hex::encode(layer_sha256)))?;
Ok(layer_id)
} else {
// Otherwise, we need to fetch it...
@@ -122,7 +122,7 @@ impl<'repo, ObjectID: FsVerityHashValue> ImageOp<'repo, ObjectID> {
self.progress
.println(format!("Fetching layer {}", hex::encode(layer_sha256)))?;

-let mut splitstream = self.repo.create_stream(Some(*layer_sha256), None);
+let mut splitstream = self.repo.create_stream(Some(layer_sha256), None);
match descriptor.media_type() {
MediaType::ImageLayer => {
split_async(progress, &mut splitstream).await?;
@@ -136,13 +136,19 @@ impl<'repo, ObjectID: FsVerityHashValue> ImageOp<'repo, ObjectID> {
other => bail!("Unsupported layer media type {:?}", other),
};
let layer_id = self.repo.write_stream(splitstream, None)?;
-driver.await?;

+// We intentionally explicitly ignore this, even though we're supposed to check it.
+// See https://github.com/containers/containers-image-proxy-rs/issues/80 for discussion
+// about why. Note: we only care about the uncompressed layer tar, and we checksum it
+// ourselves.
+drop(driver);

Ok(layer_id)
}
}

pub async fn ensure_config(
-&self,
+self: &Arc<Self>,
manifest_layers: &[Descriptor],
descriptor: &Descriptor,
) -> Result<ContentAndVerity<ObjectID>> {
@@ -172,14 +178,31 @@ impl<'repo, ObjectID: FsVerityHashValue> ImageOp<'repo, ObjectID> {
let raw_config = config?;
let config = ImageConfiguration::from_reader(&raw_config[..])?;

+// We want to sort the layers based on size so we can get started on the big layers
+// first. The last thing we want is to start on the biggest layer right at the end.
+let mut layers: Vec<_> = zip(manifest_layers, config.rootfs().diff_ids()).collect();
+layers.sort_by_key(|(mld, ..)| Reverse(mld.size()));
+
+// Bound the number of tasks to the available parallelism.
+let threads = available_parallelism()?;
Collaborator:

This case is tricky because we're intermixing CPU and I/O work; one problem I've seen in the past is running on servers with large CPU counts (e.g. 64+) but more limited I/O bandwidth. In those cases we end up with 64 threads competing pointlessly for the more limited I/O.

Unfortunately there's no convenient way that I know of to estimate available I/O parallelism, but in some equivalent places I've capped the count at an arbitrary number like 4.

Collaborator (Author):

It's a bit tricky because we do a fair amount of computation in these workers as well: we compute the full Merkle tree twice (once in userspace, once in the kernel). And some of those threads will be sleeping some of the time, because they're doing fdatasync() or blocked on the download or whatever. I considered doing something like 2 * available_parallelism(), in fact.

Different hardware combinations could end up being either CPU- or I/O-bound, but if we use at least available_parallelism() then we stand a decent chance of keeping the CPUs busy. If I/O throttles, it'll end up slowing down the CPUs...
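For illustration, the options discussed here would look roughly like this (worker_limit is a hypothetical helper, not code from this PR):

```rust
use std::thread::available_parallelism;

// Merged approach: bound in-flight layer fetches by CPU count.
// Alternatives floated above (hypothetical): `cpus.min(4)` for hosts where
// I/O is the bottleneck, or `2 * cpus` to hide I/O stalls behind extra tasks.
fn worker_limit() -> std::io::Result<usize> {
    Ok(available_parallelism()?.get())
}
```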

+let sem = Arc::new(Semaphore::new(threads.into()));
+let mut entries = vec![];
+for (mld, diff_id) in layers {
+let self_ = Arc::clone(self);
+let permit = Arc::clone(&sem).acquire_owned().await?;
Collaborator:

Ah yeah, using a semaphore for this makes sense.

Collaborator (Author):

Ya, I think I'll stick with it here...

+let layer_sha256 = sha256_from_digest(diff_id)?;
+let descriptor = mld.clone();
+let future = tokio::spawn(async move {
Collaborator:

See https://github.com/bootc-dev/bootc/blob/2de8e0d23fb89bed76722ff1466614afacec64b3/lib/src/fsck.rs#L195, which uses a JoinSet; that's designed for this, especially in that it enforces structured concurrency.

Collaborator (Author):

Thanks for the pointer. I used this for the HTTP downloader.

+let _permit = permit;
+self_.ensure_layer(layer_sha256, &descriptor).await
+});
+entries.push((layer_sha256, future));
+}

+// Collect the results.
let mut config_maps = DigestMap::new();
-for (mld, cld) in zip(manifest_layers, config.rootfs().diff_ids()) {
-let layer_sha256 = sha256_from_digest(cld)?;
-let layer_id = self
-.ensure_layer(&layer_sha256, mld)
-.await
-.with_context(|| format!("Failed to fetch layer {cld} via {mld:?}"))?;
-config_maps.insert(&layer_sha256, &layer_id);
+for (layer_sha256, future) in entries {
+config_maps.insert(&layer_sha256, &future.await??);
}

let mut splitstream = self
@@ -192,7 +215,7 @@ impl<'repo, ObjectID: FsVerityHashValue> ImageOp<'repo, ObjectID> {
}
}
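As a point of comparison for the JoinSet thread above, a minimal sketch of the JoinSet-plus-Semaphore pattern the reviewer points at (fetch_one is a hypothetical stand-in, not this crate's API):

```rust
use std::sync::Arc;
use tokio::{sync::Semaphore, task::JoinSet};

async fn fetch_one(layer: u32) -> anyhow::Result<()> {
    let _ = layer; // stand-in for the real per-layer work
    Ok(())
}

async fn fetch_all(layers: Vec<u32>, limit: usize) -> anyhow::Result<()> {
    let sem = Arc::new(Semaphore::new(limit));
    let mut set = JoinSet::new();
    for layer in layers {
        // acquire_owned() moves the permit into the task, bounding
        // how many fetches run concurrently.
        let permit = Arc::clone(&sem).acquire_owned().await?;
        set.spawn(async move {
            let _permit = permit;
            fetch_one(layer).await
        });
    }
    // Unlike a bare Vec of JoinHandles, a JoinSet aborts any tasks still
    // running when it is dropped, so an early `?` return cleans up.
    while let Some(res) = set.join_next().await {
        res??;
    }
    Ok(())
}
```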

-pub async fn pull(&self) -> Result<ContentAndVerity<ObjectID>> {
+pub async fn pull(self: &Arc<Self>) -> Result<ContentAndVerity<ObjectID>> {
let (_manifest_digest, raw_manifest) = self
.proxy
.fetch_manifest_raw_oci(&self.img)
@@ -213,11 +236,11 @@ impl<'repo, ObjectID: FsVerityHashValue> ImageOp<'repo, ObjectID> {
/// Pull the target image, and add the provided tag. If this is a mountable
/// image (i.e. not an artifact), it is *not* unpacked by default.
pub async fn pull(
-repo: &Repository<impl FsVerityHashValue>,
+repo: &Arc<Repository<impl FsVerityHashValue>>,
imgref: &str,
reference: Option<&str>,
) -> Result<()> {
-let op = ImageOp::new(repo, imgref).await?;
+let op = Arc::new(ImageOp::new(repo, imgref).await?);
let (sha256, id) = op
.pull()
.await
@@ -280,7 +303,7 @@ pub fn open_config_shallow<ObjectID: FsVerityHashValue>(
}

pub fn write_config<ObjectID: FsVerityHashValue>(
-repo: &Repository<ObjectID>,
+repo: &Arc<Repository<ObjectID>>,
config: &ImageConfiguration,
refs: DigestMap<ObjectID>,
) -> Result<ContentAndVerity<ObjectID>> {
Expand All @@ -294,7 +317,7 @@ pub fn write_config<ObjectID: FsVerityHashValue>(
}

pub fn seal<ObjectID: FsVerityHashValue>(
-repo: &Repository<ObjectID>,
+repo: &Arc<Repository<ObjectID>>,
name: &str,
verity: Option<&ObjectID>,
) -> Result<ContentAndVerity<ObjectID>> {
@@ -421,7 +444,7 @@ mod test {
let layer_id: [u8; 32] = context.finalize().into();

let repo_dir = tempdir();
-let repo = Repository::<Sha256HashValue>::open_path(CWD, &repo_dir).unwrap();
+let repo = Arc::new(Repository::<Sha256HashValue>::open_path(CWD, &repo_dir).unwrap());
let id = import_layer(&repo, &layer_id, Some("name"), &mut layer.as_slice()).unwrap();

let mut dump = String::new();
4 changes: 2 additions & 2 deletions src/oci/tar.rs
@@ -75,7 +75,7 @@ pub fn split(

pub async fn split_async(
mut tar_stream: impl AsyncRead + Unpin,
-writer: &mut SplitStreamWriter<'_, impl FsVerityHashValue>,
+writer: &mut SplitStreamWriter<impl FsVerityHashValue>,
) -> Result<()> {
while let Some(header) = read_header_async(&mut tar_stream).await? {
// the header always gets stored as inline data
@@ -94,7 +94,7 @@ pub async fn split_async(
if header.entry_type() == EntryType::Regular && actual_size > INLINE_CONTENT_MAX {
// non-empty regular file: store the data in the object store
let padding = buffer.split_off(actual_size);
-writer.write_external(&buffer, padding)?;
+writer.write_external_async(buffer, padding).await?;
} else {
// else: store the data inline in the split stream
writer.write_inline(&buffer);