From c932c95f1c741ad5f7ddcb7a2a7e66a93c48b142 Mon Sep 17 00:00:00 2001 From: Alexander Larsson Date: Mon, 30 Jun 2025 10:52:09 +0200 Subject: [PATCH 1/3] Expose ErrnoFilter for other crates Signed-off-by: Alexander Larsson --- crates/composefs/src/util.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/composefs/src/util.rs b/crates/composefs/src/util.rs index 75dbca5..9ba09b1 100644 --- a/crates/composefs/src/util.rs +++ b/crates/composefs/src/util.rs @@ -106,7 +106,7 @@ pub fn parse_sha256(string: impl AsRef) -> Result { Ok(value) } -pub(crate) trait ErrnoFilter { +pub trait ErrnoFilter { fn filter_errno(self, ignored: Errno) -> ErrnoResult>; } From 0f6d69efef6b6582f62e568dedf16760b4e29a57 Mon Sep 17 00:00:00 2001 From: Alexander Larsson Date: Wed, 18 Jun 2025 15:26:15 +0200 Subject: [PATCH 2/3] splitstream: Rework file format This changes the splitstream format a bit. The primary differences are: * The header is not compressed * All referenced fs-verity objects are stored in the header, including external chunks, mapped splitstreams and (a new feature) references that are not used in chunks. * The mapping table is separate from the reference table (and generally smaller), and indexes into it. * There is a magic value to detect the file format. * There is a magic content type to detect the type wrapped in the stream. * We store a tag for what ObjectID format is used. * The total size of the stream is stored in the header. The ability to reference file objects in the repo even if they are not part of the splitstream "content" will be useful for the ostree support to reference file content objects. This change also allows more efficient GC enumeration, because we don't have to parse the entire splitstream to find the referenced objects.
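For orientation, here is a minimal sketch of how a caller drives the reworked
writer API after this change. The content-type constant is a made-up
placeholder (real callers define their own 64-bit magic, such as
TAR_LAYER_CONTENT_TYPE in composefs-oci), and `repo`, `sha256`,
`layer_sha256`, `layer_verity` and `payload` are assumed to be in scope:

    // Hypothetical caller-defined magic identifying the wrapped content.
    const MY_CONTENT_TYPE: u64 = 0x0123456789abcdef;

    let mut stream = repo.create_stream(MY_CONTENT_TYPE, Some(sha256));
    // Adding a sha256 -> fs-verity mapping also records the fs-verity
    // reference in the (now uncompressed) header.
    stream.add_sha256_mapping(&layer_sha256, &layer_verity);
    stream.write_inline(&payload);
    let stream_id = repo.write_stream(stream, None)?;

On the reading side, the expected content type can then be checked up front
via SplitStreamReader::new(file, Some(MY_CONTENT_TYPE)).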
Signed-off-by: Alexander Larsson --- crates/cfsctl/src/main.rs | 2 +- crates/composefs-http/src/lib.rs | 8 +- crates/composefs-oci/src/image.rs | 10 +- crates/composefs-oci/src/lib.rs | 21 ++- crates/composefs-oci/src/skopeo.rs | 21 ++- crates/composefs/src/fsverity/hashvalue.rs | 30 ++- crates/composefs/src/repository.rs | 29 ++- crates/composefs/src/splitstream.rs | 207 ++++++++++++++++----- doc/splitstream.md | 67 ++++--- 9 files changed, 283 insertions(+), 112 deletions(-) diff --git a/crates/cfsctl/src/main.rs b/crates/cfsctl/src/main.rs index dd66a94..2207ec2 100644 --- a/crates/cfsctl/src/main.rs +++ b/crates/cfsctl/src/main.rs @@ -188,7 +188,7 @@ async fn main() -> Result<()> { } } Command::Cat { name } => { - repo.merge_splitstream(&name, None, &mut std::io::stdout())?; + repo.merge_splitstream(&name, None, None, &mut std::io::stdout())?; } Command::ImportImage { reference } => { let image_id = repo.import_image(&reference, &mut std::io::stdin())?; diff --git a/crates/composefs-http/src/lib.rs b/crates/composefs-http/src/lib.rs index 1d98387..390d4a0 100644 --- a/crates/composefs-http/src/lib.rs +++ b/crates/composefs-http/src/lib.rs @@ -13,9 +13,7 @@ use sha2::{Digest, Sha256}; use tokio::task::JoinSet; use composefs::{ - fsverity::FsVerityHashValue, - repository::Repository, - splitstream::{DigestMapEntry, SplitStreamReader}, + fsverity::FsVerityHashValue, repository::Repository, splitstream::SplitStreamReader, util::Sha256Digest, }; @@ -61,7 +59,7 @@ impl Downloader { } fn open_splitstream(&self, id: &ObjectID) -> Result> { - SplitStreamReader::new(File::from(self.repo.open_object(id)?)) + SplitStreamReader::new(File::from(self.repo.open_object(id)?), None) } fn read_object(&self, id: &ObjectID) -> Result> { @@ -107,7 +105,7 @@ impl Downloader { // this part is fast: it only touches the header let mut reader = self.open_splitstream(&id)?; - for DigestMapEntry { verity, body } in &reader.refs.map { + for (body, verity) in reader.iter_mappings() { match splitstreams.insert(verity.clone(), Some(*body)) { // This is the (normal) case if we encounter a splitstream we didn't see yet... None => { diff --git a/crates/composefs-oci/src/image.rs b/crates/composefs-oci/src/image.rs index 93c8c75..8a36bdf 100644 --- a/crates/composefs-oci/src/image.rs +++ b/crates/composefs-oci/src/image.rs @@ -9,6 +9,7 @@ use composefs::{ tree::{Directory, FileSystem, Inode, Leaf}, }; +use crate::skopeo::{OCI_CONFIG_CONTENT_TYPE, TAR_LAYER_CONTENT_TYPE}; use crate::tar::{TarEntry, TarItem}; pub fn process_entry( @@ -74,14 +75,19 @@ pub fn create_filesystem( ) -> Result> { let mut filesystem = FileSystem::default(); - let mut config_stream = repo.open_stream(config_name, config_verity)?; + let mut config_stream = + repo.open_stream(config_name, config_verity, Some(OCI_CONFIG_CONTENT_TYPE))?; let config = ImageConfiguration::from_reader(&mut config_stream)?; for diff_id in config.rootfs().diff_ids() { let layer_sha256 = super::sha256_from_digest(diff_id)?; let layer_verity = config_stream.lookup(&layer_sha256)?; - let mut layer_stream = repo.open_stream(&hex::encode(layer_sha256), Some(layer_verity))?; + let mut layer_stream = repo.open_stream( + &hex::encode(layer_sha256), + Some(layer_verity), + Some(TAR_LAYER_CONTENT_TYPE), + )?; while let Some(entry) = crate::tar::get_entry(&mut layer_stream)? 
{ process_entry(&mut filesystem, entry)?; } diff --git a/crates/composefs-oci/src/lib.rs b/crates/composefs-oci/src/lib.rs index 4617f90..1481050 100644 --- a/crates/composefs-oci/src/lib.rs +++ b/crates/composefs-oci/src/lib.rs @@ -15,6 +15,7 @@ use composefs::{ util::{parse_sha256, Sha256Digest}, }; +use crate::skopeo::{OCI_CONFIG_CONTENT_TYPE, TAR_LAYER_CONTENT_TYPE}; use crate::tar::get_entry; type ContentAndVerity = (Sha256Digest, ObjectID); @@ -39,14 +40,19 @@ pub fn import_layer( name: Option<&str>, tar_stream: &mut impl Read, ) -> Result { - repo.ensure_stream(sha256, |writer| tar::split(tar_stream, writer), name) + repo.ensure_stream( + sha256, + TAR_LAYER_CONTENT_TYPE, + |writer| tar::split(tar_stream, writer), + name, + ) } pub fn ls_layer( repo: &Repository, name: &str, ) -> Result<()> { - let mut split_stream = repo.open_stream(name, None)?; + let mut split_stream = repo.open_stream(name, None, Some(TAR_LAYER_CONTENT_TYPE))?; while let Some(entry) = get_entry(&mut split_stream)? { println!("{entry}"); @@ -81,9 +87,9 @@ pub fn open_config( .with_context(|| format!("Object {name} is unknown to us"))? } }; - let mut stream = repo.open_stream(name, Some(id))?; + let mut stream = repo.open_stream(name, Some(id), Some(OCI_CONFIG_CONTENT_TYPE))?; let config = ImageConfiguration::from_reader(&mut stream)?; - Ok((config, stream.refs)) + Ok((config, stream.get_mappings())) } fn hash(bytes: &[u8]) -> Sha256Digest { @@ -104,7 +110,7 @@ pub fn open_config_shallow( // we need to manually check the content digest let expected_hash = parse_sha256(name) .context("Containers must be referred to by sha256 if verity is missing")?; - let mut stream = repo.open_stream(name, None)?; + let mut stream = repo.open_stream(name, None, Some(OCI_CONFIG_CONTENT_TYPE))?; let mut raw_config = vec![]; stream.read_to_end(&mut raw_config)?; ensure!(hash(&raw_config) == expected_hash, "Data integrity issue"); @@ -121,7 +127,8 @@ pub fn write_config( let json = config.to_string()?; let json_bytes = json.as_bytes(); let sha256 = hash(json_bytes); - let mut stream = repo.create_stream(Some(sha256), Some(refs)); + let mut stream = repo.create_stream(OCI_CONFIG_CONTENT_TYPE, Some(sha256)); + stream.add_sha256_mappings(refs); stream.write_inline(json_bytes); let id = repo.write_stream(stream, None)?; Ok((sha256, id)) @@ -199,7 +206,7 @@ mod test { let id = import_layer(&repo, &layer_id, Some("name"), &mut layer.as_slice()).unwrap(); let mut dump = String::new(); - let mut split_stream = repo.open_stream("refs/name", Some(&id)).unwrap(); + let mut split_stream = repo.open_stream("refs/name", Some(&id), None).unwrap(); while let Some(entry) = tar::get_entry(&mut split_stream).unwrap() { writeln!(dump, "{entry}").unwrap(); } diff --git a/crates/composefs-oci/src/skopeo.rs b/crates/composefs-oci/src/skopeo.rs index 62d981f..2f13744 100644 --- a/crates/composefs-oci/src/skopeo.rs +++ b/crates/composefs-oci/src/skopeo.rs @@ -10,12 +10,13 @@ use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest, MediaType}; use rustix::process::geteuid; use tokio::{io::AsyncReadExt, sync::Semaphore}; -use composefs::{ - fsverity::FsVerityHashValue, repository::Repository, splitstream::DigestMap, util::Sha256Digest, -}; +use composefs::{fsverity::FsVerityHashValue, repository::Repository, util::Sha256Digest}; use crate::{sha256_from_descriptor, sha256_from_digest, tar::split_async, ContentAndVerity}; +pub const TAR_LAYER_CONTENT_TYPE: u64 = 0x2a037edfcae1ffea; +pub const OCI_CONFIG_CONTENT_TYPE: u64 = 0x44218c839727a80b; + struct 
ImageOp { repo: Arc>, proxy: ImageProxy, @@ -78,7 +79,9 @@ impl ImageOp { self.progress .println(format!("Fetching layer {}", hex::encode(layer_sha256)))?; - let mut splitstream = self.repo.create_stream(Some(layer_sha256), None); + let mut splitstream = self + .repo + .create_stream(TAR_LAYER_CONTENT_TYPE, Some(layer_sha256)); match descriptor.media_type() { MediaType::ImageLayer => { split_async(progress, &mut splitstream).await?; @@ -155,15 +158,15 @@ impl ImageOp { entries.push((layer_sha256, future)); } + let mut splitstream = self + .repo + .create_stream(OCI_CONFIG_CONTENT_TYPE, Some(config_sha256)); + // Collect the results. - let mut config_maps = DigestMap::new(); for (layer_sha256, future) in entries { - config_maps.insert(&layer_sha256, &future.await??); + splitstream.add_sha256_mapping(&layer_sha256, &future.await??); } - let mut splitstream = self - .repo - .create_stream(Some(config_sha256), Some(config_maps)); splitstream.write_inline(&raw_config); let config_id = self.repo.write_stream(splitstream, None)?; diff --git a/crates/composefs/src/fsverity/hashvalue.rs b/crates/composefs/src/fsverity/hashvalue.rs index 5cfb6d7..f50abb6 100644 --- a/crates/composefs/src/fsverity/hashvalue.rs +++ b/crates/composefs/src/fsverity/hashvalue.rs @@ -2,6 +2,7 @@ use core::{fmt, hash::Hash}; use hex::FromHexError; use sha2::{digest::FixedOutputReset, digest::Output, Digest, Sha256, Sha512}; +use std::cmp::Ord; use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout, Unaligned}; pub trait FsVerityHashValue @@ -12,6 +13,7 @@ where Self: Hash + Eq, Self: fmt::Debug, Self: Send + Sync + Unpin + 'static, + Self: PartialOrd + Ord, { type Digest: Digest + FixedOutputReset + fmt::Debug; const ALGORITHM: u8; @@ -93,7 +95,19 @@ impl fmt::Debug for Sha512HashValue { } } -#[derive(Clone, Eq, FromBytes, Hash, Immutable, IntoBytes, KnownLayout, PartialEq, Unaligned)] +#[derive( + Clone, + Eq, + FromBytes, + Hash, + Immutable, + IntoBytes, + KnownLayout, + PartialEq, + Unaligned, + PartialOrd, + Ord, +)] #[repr(C)] pub struct Sha256HashValue([u8; 32]); @@ -110,7 +124,19 @@ impl FsVerityHashValue for Sha256HashValue { const ID: &str = "sha256"; } -#[derive(Clone, Eq, FromBytes, Hash, Immutable, IntoBytes, KnownLayout, PartialEq, Unaligned)] +#[derive( + Clone, + Eq, + FromBytes, + Hash, + Immutable, + IntoBytes, + KnownLayout, + PartialEq, + Unaligned, + PartialOrd, + Ord, +)] #[repr(C)] pub struct Sha512HashValue([u8; 64]); diff --git a/crates/composefs/src/repository.rs b/crates/composefs/src/repository.rs index cb4c0bc..01d6686 100644 --- a/crates/composefs/src/repository.rs +++ b/crates/composefs/src/repository.rs @@ -25,7 +25,7 @@ use crate::{ EnableVerityError, FsVerityHashValue, MeasureVerityError, }, mount::{composefs_fsmount, mount_at}, - splitstream::{DigestMap, SplitStreamReader, SplitStreamWriter}, + splitstream::{SplitStreamReader, SplitStreamWriter}, util::{proc_self_fd, replace_symlinkat, ErrnoFilter, Sha256Digest}, }; @@ -223,10 +223,10 @@ impl Repository { /// store the result. 
pub fn create_stream( self: &Arc, + content_type: u64, sha256: Option, - maps: Option>, ) -> SplitStreamWriter { - SplitStreamWriter::new(self, maps, sha256) + SplitStreamWriter::new(self, content_type, sha256) } fn format_object_path(id: &ObjectID) -> String { @@ -272,11 +272,11 @@ impl Repository { Err(other) => Err(other)?, }; let mut context = Sha256::new(); - let mut split_stream = SplitStreamReader::new(File::from(stream))?; + let mut split_stream = SplitStreamReader::new(File::from(stream), None)?; // check the verity of all linked streams - for entry in &split_stream.refs.map { - if self.check_stream(&entry.body)?.as_ref() != Some(&entry.verity) { + for (body, verity) in split_stream.iter_mappings() { + if self.check_stream(body)?.as_ref() != Some(verity) { bail!("reference mismatch"); } } @@ -319,6 +319,12 @@ impl Repository { Ok(object_id) } + pub fn has_named_stream(&self, name: &str) -> bool { + let stream_path = format!("streams/refs/{}", name); + + readlinkat(&self.repository, &stream_path, []).is_ok() + } + /// Assign the given name to a stream. The stream must already exist. After this operation it /// will be possible to refer to the stream by its new name 'refs/{name}'. pub fn name_stream(&self, sha256: Sha256Digest, name: &str) -> Result<()> { @@ -345,6 +351,7 @@ impl Repository { pub fn ensure_stream( self: &Arc, sha256: &Sha256Digest, + content_type: u64, callback: impl FnOnce(&mut SplitStreamWriter) -> Result<()>, reference: Option<&str>, ) -> Result { @@ -353,7 +360,7 @@ impl Repository { let object_id = match self.has_stream(sha256)? { Some(id) => id, None => { - let mut writer = self.create_stream(Some(*sha256), None); + let mut writer = self.create_stream(content_type, Some(*sha256)); callback(&mut writer)?; let object_id = writer.done()?; @@ -375,6 +382,7 @@ impl Repository { &self, name: &str, verity: Option<&ObjectID>, + expected_content_type: Option, ) -> Result> { let filename = format!("streams/{name}"); @@ -386,7 +394,7 @@ impl Repository { .with_context(|| format!("Opening ref 'streams/{name}'"))? 
}); - SplitStreamReader::new(file) + SplitStreamReader::new(file, expected_content_type) } pub fn open_object(&self, id: &ObjectID) -> Result { @@ -397,9 +405,10 @@ impl Repository { &self, name: &str, verity: Option<&ObjectID>, + expected_content_type: Option, stream: &mut impl Write, ) -> Result<()> { - let mut split_stream = self.open_stream(name, verity)?; + let mut split_stream = self.open_stream(name, verity, expected_content_type)?; split_stream.cat(stream, |id| -> Result> { let mut data = vec![]; File::from(self.open_object(id)?).read_to_end(&mut data)?; @@ -618,7 +627,7 @@ impl Repository { println!("{object:?} lives as a stream"); objects.insert(object.clone()); - let mut split_stream = self.open_stream(&object.to_hex(), None)?; + let mut split_stream = self.open_stream(&object.to_hex(), None, None)?; split_stream.get_object_refs(|id| { println!(" with {id:?}"); objects.insert(id.clone()); diff --git a/crates/composefs/src/splitstream.rs b/crates/composefs/src/splitstream.rs index fe50cf4..c5f02f2 100644 --- a/crates/composefs/src/splitstream.rs +++ b/crates/composefs/src/splitstream.rs @@ -8,7 +8,7 @@ use std::{ sync::Arc, }; -use anyhow::{bail, Result}; +use anyhow::{bail, Error, Result}; use sha2::{Digest, Sha256}; use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout}; use zstd::stream::{read::Decoder, write::Encoder}; @@ -19,8 +19,31 @@ use crate::{ util::{read_exactish, Sha256Digest}, }; +pub const SPLITSTREAM_MAGIC: [u8; 7] = [b'S', b'p', b'l', b't', b'S', b't', b'r']; + #[derive(Debug, FromBytes, Immutable, IntoBytes, KnownLayout)] #[repr(C)] +pub struct SplitstreamHeader { + pub magic: [u8; 7], // Contains SPLITSTREAM_MAGIC + pub algorithm: u8, + pub content_type: u64, // User can put whatever magic identifier they want there + pub total_size: u64, // total size of inline chunks and external chunks + pub n_refs: u64, + pub n_mappings: u64, + // Followed by n_refs ObjectIDs, sorted + // Followed by n_mappings MappingEntry, sorted by body + // Followed by zstd compressed chunks +} + +#[derive(Clone, Debug, FromBytes, Immutable, IntoBytes, KnownLayout)] +#[repr(C)] +pub struct MappingEntry { + pub body: Sha256Digest, + pub reference_idx: u64, +} + +// These are used during construction before we know the final reference indexes +#[derive(Debug)] pub struct DigestMapEntry { pub body: Sha256Digest, pub verity: ObjectID, @@ -65,8 +88,12 @@ impl DigestMap { pub struct SplitStreamWriter { repo: Arc>, + refs: Vec, + mappings: DigestMap, inline_content: Vec, + total_size: u64, writer: Encoder<'static, Vec>, + pub content_type: u64, pub sha256: Option<(Sha256, Sha256Digest)>, } @@ -84,30 +111,47 @@ impl std::fmt::Debug for SplitStreamWriter SplitStreamWriter { pub fn new( repo: &Arc>, - refs: Option>, + content_type: u64, sha256: Option, ) -> Self { // SAFETY: we surely can't get an error writing the header to a Vec - let mut writer = Encoder::new(vec![], 0).unwrap(); - - match refs { - Some(DigestMap { map }) => { - writer.write_all(&(map.len() as u64).to_le_bytes()).unwrap(); - writer.write_all(map.as_bytes()).unwrap(); - } - None => { - writer.write_all(&0u64.to_le_bytes()).unwrap(); - } - } + let writer = Encoder::new(vec![], 0).unwrap(); Self { repo: Arc::clone(repo), + content_type, inline_content: vec![], + refs: vec![], + total_size: 0, + mappings: DigestMap::new(), writer, sha256: sha256.map(|x| (Sha256::new(), x)), } } + pub fn add_external_reference(&mut self, verity: &ObjectID) { + match self.refs.binary_search(verity) { + Ok(_) => {} // Already added + Err(idx) 
=> self.refs.insert(idx, verity.clone()), + } + } + + // Note: These are only stable if no more references are added + pub fn lookup_external_reference(&self, verity: &ObjectID) -> Option { + self.refs.binary_search(verity).ok() + } + + pub fn add_sha256_mappings(&mut self, maps: DigestMap) { + for m in maps.map { + self.add_sha256_mapping(&m.body, &m.verity); + } + } + + pub fn add_sha256_mapping(&mut self, digest: &Sha256Digest, verity: &ObjectID) { + self.add_external_reference(verity); + self.mappings.insert(digest, verity) + } + fn write_fragment(writer: &mut impl Write, size: usize, data: &[u8]) -> Result<()> { writer.write_all(&(size as u64).to_le_bytes())?; Ok(writer.write_all(data)?) @@ -121,6 +165,7 @@ impl SplitStreamWriter { self.inline_content.len(), &self.inline_content, )?; + self.total_size += self.inline_content.len() as u64; self.inline_content = new_value; } Ok(()) @@ -152,6 +197,9 @@ impl SplitStreamWriter { sha256.update(&padding); } let id = self.repo.ensure_object(data)?; + + self.add_external_reference(&id); + self.total_size += data.len() as u64; self.write_reference(&id, padding) } @@ -160,7 +208,9 @@ impl SplitStreamWriter { sha256.update(&data); sha256.update(&padding); } + self.total_size += data.len() as u64; let id = self.repo.ensure_object_async(data).await?; + self.add_external_reference(&id); self.write_reference(&id, padding) } @@ -173,7 +223,32 @@ impl SplitStreamWriter { } } - self.repo.ensure_object(&self.writer.finish()?) + let mut buf = vec![]; + let header = SplitstreamHeader { + magic: SPLITSTREAM_MAGIC, + algorithm: ObjectID::ALGORITHM, + content_type: self.content_type, + total_size: u64::to_le(self.total_size), + n_refs: u64::to_le(self.refs.len() as u64), + n_mappings: u64::to_le(self.mappings.map.len() as u64), + }; + buf.extend_from_slice(header.as_bytes()); + + for ref_id in self.refs.iter() { + buf.extend_from_slice(ref_id.as_bytes()); + } + + for mapping in self.mappings.map { + let entry = MappingEntry { + body: mapping.body, + reference_idx: u64::to_le(self.refs.binary_search(&mapping.verity).unwrap() as u64), + }; + buf.extend_from_slice(entry.as_bytes()); + } + + buf.extend_from_slice(&self.writer.finish()?); + + self.repo.ensure_object(&buf) } } @@ -186,8 +261,11 @@ pub enum SplitStreamData { // utility class to help read splitstreams pub struct SplitStreamReader { decoder: Decoder<'static, BufReader>, - pub refs: DigestMap, inline_bytes: usize, + pub content_type: u64, + pub total_size: u64, + pub refs: Vec, + mappings: Vec, } impl std::fmt::Debug for SplitStreamReader { @@ -226,29 +304,74 @@ enum ChunkType { } impl SplitStreamReader { - pub fn new(reader: R) -> Result { - let mut decoder = Decoder::new(reader)?; + pub fn new(mut reader: R, expected_content_type: Option) -> Result { + let header = SplitstreamHeader::read_from_io(&mut reader) + .map_err(|e| Error::msg(format!("Error reading splitstream header: {:?}", e)))?; - let n_map_entries = { - let mut buf = [0u8; 8]; - decoder.read_exact(&mut buf)?; - u64::from_le_bytes(buf) - } as usize; + if header.magic != SPLITSTREAM_MAGIC { + bail!("Invalid splitstream header magic value"); + } - let mut refs = DigestMap:: { - map: Vec::with_capacity(n_map_entries), - }; - for _ in 0..n_map_entries { - refs.map.push(DigestMapEntry::read_from_io(&mut decoder)?); + if header.algorithm != ObjectID::ALGORITHM { + bail!("Invalid splitstream algorithm type"); } + let content_type = u64::from_le(header.content_type); + if let Some(expected) = expected_content_type { + if content_type != expected { 
+ bail!("Invalid splitstream content type"); + } + } + + let total_size = u64::from_le(header.total_size); + let n_refs = usize::try_from(u64::from_le(header.n_refs))?; + let n_mappings = usize::try_from(u64::from_le(header.n_mappings))?; + + let mut refs = Vec::::new(); + for _ in 0..n_refs { + let objid = ObjectID::read_from_io(&mut reader) + .map_err(|e| Error::msg(format!("Invalid refs array {:?}", e)))?; + refs.push(objid.clone()); + } + + let mut mappings = Vec::::new(); + for _ in 0..n_mappings { + let mut m = MappingEntry::read_from_io(&mut reader) + .map_err(|e| Error::msg(format!("Invalid mappings array {:?}", e)))?; + m.reference_idx = u64::from_le(m.reference_idx); + if m.reference_idx >= n_refs as u64 { + bail!("Invalid mapping reference") + } + mappings.push(m.clone()); + } + + let decoder = Decoder::new(reader)?; + Ok(Self { decoder, - refs, inline_bytes: 0, + content_type, + total_size, + refs, + mappings, }) } + pub fn iter_mappings(&self) -> impl Iterator { + self.mappings + .iter() + .map(|m| (&m.body, &self.refs[m.reference_idx as usize])) + } + + pub fn get_mappings(&self) -> DigestMap { + let mut m = DigestMap::new(); + + for (body, verity) in self.iter_mappings() { + m.insert(body, verity); + } + m + } + fn ensure_chunk( &mut self, eof_ok: bool, @@ -347,36 +470,22 @@ impl SplitStreamReader { } pub fn get_object_refs(&mut self, mut callback: impl FnMut(&ObjectID)) -> Result<()> { - let mut buffer = vec![]; - - for entry in &self.refs.map { - callback(&entry.verity); - } - - loop { - match self.ensure_chunk(true, true, 0)? { - ChunkType::Eof => break Ok(()), - ChunkType::Inline => { - read_into_vec(&mut self.decoder, &mut buffer, self.inline_bytes)?; - self.inline_bytes = 0; - } - ChunkType::External(ref id) => { - callback(id); - } - } + for entry in &self.refs { + callback(entry); } + Ok(()) } pub fn get_stream_refs(&mut self, mut callback: impl FnMut(&Sha256Digest)) { - for entry in &self.refs.map { + for entry in &self.mappings { callback(&entry.body); } } pub fn lookup(&self, body: &Sha256Digest) -> Result<&ObjectID> { - match self.refs.lookup(body) { - Some(id) => Ok(id), - None => bail!("Reference is not found in splitstream"), + match self.mappings.binary_search_by_key(body, |e| e.body) { + Ok(idx) => Ok(&self.refs[self.mappings[idx].reference_idx as usize]), + Err(..) => bail!("Reference is not found in splitstream"), } } } diff --git a/doc/splitstream.md b/doc/splitstream.md index 62df66e..2bcb223 100644 --- a/doc/splitstream.md +++ b/doc/splitstream.md @@ -22,39 +22,50 @@ extremely well. ## File format -The file format consists of a header, plus a number of data blocks. +The file format consists of a header, followed by a set of data blocks. -### Mappings +### Header -The file starts with a single u64 le integer which is the number of mapping -structures present in the file. A mapping is a relationship between a file -identified by its sha256 content hash and the fsverity hash of that same file. -These entries are encoded simply as the sha256 hash value (32 bytes) plus the -fsverity hash value (32 bytes) combined together into a single 64 byte record. 
- -For example, if we had a file that mapped `1234..` to `abcd..` and `5678..` to -`efab..`, the header would look like: +The header format looks like this, where all fields are little endian: ``` - 64bit 32 bytes 32 bytes + 32 bytes + 32 bytes - +--------+----------+----------+----------+---------+ - | 2 | 1234 | abcd | 5678 | efab | - +--------+----------+----------+----------+---------+ +pub const SPLITSTREAM_MAGIC: [u8; 7] = [b'S', b'p', b'l', b't', b'S', b't', b'r']; + +struct MappingEntry { + pub body: Sha256Digest, + pub reference_idx: u64, // index into references table +} + +struct SplitstreamHeader { + magic: [u8; 7], // Contains SPLITSTREAM_MAGIC + algorithm: u8, // The fs-verity algorithm used, 1 == sha256, 2 == sha512 + content_type: u64, // Caller-chosen magic identifying the wrapped content + total_size: u64, // total size of inline chunks and external chunks + n_refs: u64, + n_mappings: u64, + refs: [ObjectID; n_refs] // sorted + mappings: [MappingEntry; n_mappings] // sorted by body +} ``` -The mappings in the header are always sorted by their sha256 content hash -values. - -The mappings serve two purposes: - - in the case of splitstreams which refer to other splitstreams without - directly embedding the content of the other stream, this provides a - mechanism to find out which other streams are referenced. This is used for - garbage collection. - - for the same usecase, it provides a mechanism to be able to verify the - content of the referred splitstream (by checking its fsverity digest) before - starting to iterate it +The table of references is used to allow splitstreams to refer to +other splitstreams or regular file content, either because they are +included in the stream or just indirectly referenced. This is primarily +used to keep these objects alive during garbage collection. + +Examples of references are: + * OCI manifests referencing tar layer splitstreams. + * External objects embedded in a splitstream, such as a tar layer + splitstream + * External objects indirectly referenced in a splitstream, such as + references from an ostree commit splitstream + +The mapping table provides a mechanism to map the sha256 digest of a +splitstream to a fs-verity digest. This allows checking of the target +fs-verity digest before use. The primary example here is OCI manifests +which reference the tar layer splitstreams. We could look up such +streams by the sha256 in the streams/ directory, but then we would not +have trusted information about the expected fs-verity digests of those +layers. ### Data blocks @@ -81,6 +92,8 @@ There are two kinds of blocks: - "External" blocks (`size == 0`): in this case the length of the data is 32 bytes. This is the binary form of a sha256 hash value and is a reference to an object in the composefs repository (by its fs-verity digest). + Note that these references are *also* in the header, so there is no need + to read the entire file to find what objects are referenced. That's it, really. There's no header. The stream is over when there are no more blocks. From e88573d0bde028b0a9e55d6ebaa88aee9a777a92 Mon Sep 17 00:00:00 2001 From: Alexander Larsson Date: Mon, 16 Jun 2025 18:27:33 +0200 Subject: [PATCH 3/3] Add composefs-ostree and some basic CLI tools Based on ideas from https://github.com/containers/composefs-rs/issues/141 This is an initial version of ostree support. This allows pulling from local and remote ostree repos, which will create a set of regular file content objects, as well as a blob containing all the remaining ostree objects. From the blob we can create an image.
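As a rough illustration of the intended flow (the repo path, ostree ref and
stream name below are placeholders, and `repo` is an Arc-wrapped composefs
Repository as set up by cfsctl):

    // Pull an ostree commit into the composefs repository, naming the
    // resulting commit splitstream "refs/exampleos".
    let commit_verity = composefs_ostree::pull_local(
        &repo,
        Path::new("/srv/ostree/repo"),
        "exampleos/x86_64/stable",
        Some("exampleos"),
        None, // no explicit base blob
    )
    .await?;

    // Build a composefs image from the stored commit blob.
    let mut fs = composefs_ostree::create_filesystem(&repo, "refs/exampleos")?;
    let image_id = fs.commit_image(&repo, None)?;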
When pulling a commit, a base blob (i.e. "the previous version" can be specified. Any objects in that base blob will not be downloaded. If a name is given for the pulled commit, then pre-existing blobs with the same name will automatically be used as a base blob. This is an initial version and there are several things missing: * Pull operations are completely serial * There is no support for ostree summary files * There is no support for ostree delta files * There is no caching of local file availability (other than base blob) * Local ostree repos only support archive mode * There is no GPG validation on ostree pull Signed-off-by: Alexander Larsson --- Cargo.toml | 1 + crates/cfsctl/Cargo.toml | 4 +- crates/cfsctl/src/main.rs | 81 ++++ crates/composefs-ostree/Cargo.toml | 29 ++ crates/composefs-ostree/src/commit.rs | 534 +++++++++++++++++++++++++ crates/composefs-ostree/src/lib.rs | 102 +++++ crates/composefs-ostree/src/objmap.rs | 461 +++++++++++++++++++++ crates/composefs-ostree/src/repo.rs | 553 ++++++++++++++++++++++++++ crates/composefs/src/repository.rs | 8 +- crates/composefs/src/splitstream.rs | 38 +- 10 files changed, 1794 insertions(+), 17 deletions(-) create mode 100644 crates/composefs-ostree/Cargo.toml create mode 100644 crates/composefs-ostree/src/commit.rs create mode 100644 crates/composefs-ostree/src/lib.rs create mode 100644 crates/composefs-ostree/src/objmap.rs create mode 100644 crates/composefs-ostree/src/repo.rs diff --git a/Cargo.toml b/Cargo.toml index 48dc2a8..b43e5f9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ composefs = { version = "0.3.0", path = "crates/composefs", default-features = f composefs-oci = { version = "0.3.0", path = "crates/composefs-oci", default-features = false } composefs-boot = { version = "0.3.0", path = "crates/composefs-boot", default-features = false } composefs-http = { version = "0.3.0", path = "crates/composefs-http", default-features = false } +composefs-ostree = { version = "0.3.0", path = "crates/composefs-ostree", default-features = false } [profile.dev.package.sha2] # this is *really* slow otherwise diff --git a/crates/cfsctl/Cargo.toml b/crates/cfsctl/Cargo.toml index 4901f49..6ffe0f6 100644 --- a/crates/cfsctl/Cargo.toml +++ b/crates/cfsctl/Cargo.toml @@ -11,9 +11,10 @@ rust-version.workspace = true version.workspace = true [features] -default = ['pre-6.15', 'oci'] +default = ['pre-6.15', 'oci','ostree'] http = ['composefs-http'] oci = ['composefs-oci'] +ostree = ['composefs-ostree'] rhel9 = ['composefs/rhel9'] 'pre-6.15' = ['composefs/pre-6.15'] @@ -24,6 +25,7 @@ composefs = { workspace = true } composefs-boot = { workspace = true } composefs-oci = { workspace = true, optional = true } composefs-http = { workspace = true, optional = true } +composefs-ostree = { workspace = true, optional = true } env_logger = { version = "0.11.0", default-features = false } hex = { version = "0.4.0", default-features = false } rustix = { version = "1.0.0", default-features = false, features = ["fs", "process"] } diff --git a/crates/cfsctl/src/main.rs b/crates/cfsctl/src/main.rs index 2207ec2..7018c53 100644 --- a/crates/cfsctl/src/main.rs +++ b/crates/cfsctl/src/main.rs @@ -92,6 +92,33 @@ enum OciCommand { }, } +#[cfg(feature = "ostree")] +#[derive(Debug, Subcommand)] +enum OstreeCommand { + PullLocal { + repo_path: PathBuf, + ostree_ref: String, + name: Option, + #[clap(long)] + base_name: Option, + }, + Pull { + repo_url: String, + ostree_ref: String, + name: Option, + #[clap(long)] + base_name: Option, + }, + CreateImage { + 
commit_name: String, + #[clap(long)] + image_name: Option, + }, + Inspect { + commit_name: String, + }, +} + #[derive(Debug, Subcommand)] enum Command { /// Take a transaction lock on the repository. @@ -114,6 +141,12 @@ enum Command { #[clap(subcommand)] cmd: OciCommand, }, + /// Commands for dealing with OSTree commits + #[cfg(feature = "ostree")] + Ostree { + #[clap(subcommand)] + cmd: OstreeCommand, + }, /// Mounts a composefs, possibly enforcing fsverity of the image Mount { /// the name of the image to mount, either a sha256 digest or prefixed with 'ref/' @@ -311,6 +344,54 @@ async fn main() -> Result<()> { create_dir_all(state.join("etc/work"))?; } }, + #[cfg(feature = "ostree")] + Command::Ostree { cmd: ostree_cmd } => match ostree_cmd { + OstreeCommand::PullLocal { + ref repo_path, + ref ostree_ref, + name, + base_name, + } => { + let verity = composefs_ostree::pull_local( + &Arc::new(repo), + repo_path, + ostree_ref, + name.as_deref(), + base_name.as_deref(), + ) + .await?; + + println!("verity {}", verity.to_hex()); + } + OstreeCommand::Pull { + ref repo_url, + ref ostree_ref, + name, + base_name, + } => { + let verity = composefs_ostree::pull( + &Arc::new(repo), + repo_url, + ostree_ref, + name.as_deref(), + base_name.as_deref(), + ) + .await?; + + println!("verity {}", verity.to_hex()); + } + OstreeCommand::CreateImage { + ref commit_name, + ref image_name, + } => { + let mut fs = composefs_ostree::create_filesystem(&repo, commit_name)?; + let image_id = fs.commit_image(&repo, image_name.as_deref())?; + println!("{}", image_id.to_id()); + } + OstreeCommand::Inspect { ref commit_name } => { + composefs_ostree::inspect(&repo, commit_name)?; + } + }, Command::ComputeId { ref path, bootable, diff --git a/crates/composefs-ostree/Cargo.toml b/crates/composefs-ostree/Cargo.toml new file mode 100644 index 0000000..24c07db --- /dev/null +++ b/crates/composefs-ostree/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "composefs-ostree" +description = "ostree support for composefs" +keywords = ["composefs", "ostree"] + +edition.workspace = true +license.workspace = true +readme.workspace = true +repository.workspace = true +rust-version.workspace = true +version.workspace = true + +[dependencies] +anyhow = { version = "1.0.87", default-features = false } +composefs = { workspace = true } +configparser = { version = "3.1.0", features = [] } +flate2 = { version = "1.1.2", default-features = true } +gvariant = { version = "0.5.0", default-features = true} +hex = { version = "0.4.0", default-features = false, features = ["std"] } +rustix = { version = "1.0.0", default-features = false, features = ["fs", "mount", "process", "std"] } +sha2 = { version = "0.10.1", default-features = false } +zerocopy = { version = "0.8.0", default-features = false, features = ["derive", "std"] } +reqwest = { version = "0.12.15", features = ["zstd"] } + +[dev-dependencies] +similar-asserts = "1.7.0" + +[lints] +workspace = true diff --git a/crates/composefs-ostree/src/commit.rs b/crates/composefs-ostree/src/commit.rs new file mode 100644 index 0000000..b75576a --- /dev/null +++ b/crates/composefs-ostree/src/commit.rs @@ -0,0 +1,534 @@ +/* Commit objects are stored in a splitstream with the content being just the + * commit data. This means that the content will match the ostree commit id. + * + * Additionally there is an objmap splitstream referenced by a splitstream + * external references. This objmap contains all the external objects referencesd + * by the commit. 
+ */ +use anyhow::{bail, Error, Result}; +use composefs::{ + fsverity::FsVerityHashValue, + repository::Repository, + tree::{Directory, FileSystem, Inode, Leaf, LeafContent, RegularFile, Stat}, + util::Sha256Digest, +}; +use gvariant::aligned_bytes::{AlignedBuf, AlignedSlice, AsAligned, TryAsAligned, A8}; +use gvariant::{gv, Marker, Structure}; +use sha2::{Digest, Sha256}; +use std::{ + collections::{BTreeMap, HashSet, VecDeque}, + ffi::OsStr, + os::unix::ffi::OsStrExt, +}; +use std::{fmt, io::Read, sync::Arc}; + +use crate::objmap::{ObjectMapReader, ObjectMapWriter}; +use crate::repo::{split_sized_variant, ObjectType, OstreeRepo}; + +pub const COMMIT_CONTENT_TYPE: u64 = 0xc72d30f121a31936; + +const S_IFMT: u32 = 0o170000; +const S_IFLNK: u32 = 0o120000; + +#[derive(Debug)] +pub struct OstreeCommit { + data: AlignedBuf, + objmap_id: ObjectID, + objmap: ObjectMapReader, +} + +impl OstreeCommit { + pub fn load(repo: &Repository, commit_name: &str) -> Result { + let mut commit_stream = repo.open_stream(commit_name, None, Some(COMMIT_CONTENT_TYPE))?; + let mut buffer = Vec::new(); + commit_stream.read_to_end(&mut buffer)?; + + // TODO: Should we somehow validate the checksum of the commit? + // We don't have anything (other than the filename) to really tie it down though. + // Maybe gpg validate it per the ostree metadata? + + let Some((_objmap_sha, objmap_id)) = commit_stream.iter_mappings().next() else { + bail!("Missing objmap id mapping") + }; + + let objmap = ObjectMapReader::::load(repo, &objmap_id)?; + + Ok(OstreeCommit { + data: buffer.into(), + objmap_id: objmap_id.clone(), + objmap: objmap, + }) + } + + fn create_filesystem_file(&self, id: &Sha256Digest) -> Result> { + let (maybe_obj_id, file_header) = self.objmap.lookup(id).ok_or(Error::msg(format!( + "Unexpectedly missing ostree file object {}", + hex::encode(id) + )))?; + + let (_sized_data, variant_data, remaining_data) = split_sized_variant(&file_header)?; + + let data = gv!("(tuuuusa(ayay))").cast(variant_data.try_as_aligned()?); + let (size, uid, gid, mode, _zero, symlink_target, xattrs_data) = data.to_tuple(); + let mut xattrs = BTreeMap::, Box<[u8]>>::new(); + for x in xattrs_data.iter() { + let (key, value) = x.to_tuple(); + xattrs.insert(OsStr::from_bytes(key).into(), Box::from(value)); + } + + let stat = Stat { + st_mode: u32::from_be(*mode), + st_uid: u32::from_be(*uid), + st_gid: u32::from_be(*gid), + st_mtim_sec: 0, + xattrs: xattrs.into(), + }; + + let content = if (stat.st_mode & S_IFMT) == S_IFLNK { + LeafContent::Symlink(OsStr::new(symlink_target.to_str()).into()) + } else { + let file = if let Some(obj_id) = maybe_obj_id { + if remaining_data.len() > 0 { + bail!("Unexpected trailing file data"); + } + RegularFile::External(obj_id.clone(), u64::from_be(*size)) + } else { + RegularFile::Inline(remaining_data.into()) + }; + LeafContent::Regular(file) + }; + + Ok(Leaf { stat, content }) + } + + fn create_filesystem_dir( + &self, + dirtree_id: &Sha256Digest, + dirmeta_id: &Sha256Digest, + ) -> Result> { + let (_obj_id, dirmeta) = + self.objmap + .lookup(dirmeta_id.into()) + .ok_or(Error::msg(format!( + "Unexpectedly missing ostree dirmeta object {}", + hex::encode(dirmeta_id) + )))?; + let (_obj_id, dirtree) = + self.objmap + .lookup(dirtree_id.into()) + .ok_or(Error::msg(format!( + "Unexpectedly missing ostree dirtree object {}", + hex::encode(dirtree_id) + )))?; + + let dirmeta_sha = Sha256::digest(dirmeta); + if *dirmeta_sha != *dirmeta_id { + bail!( + "Invalid dirmeta checksum {:?}, expected {:?}", + dirmeta_sha, + 
dirmeta_id + ); + } + let dirtree_sha = Sha256::digest(dirtree); + if *dirtree_sha != *dirtree_id { + bail!( + "Invalid dirtree checksum {:?}, expected {:?}", + dirtree_sha, + dirtree_id + ); + } + + let data = gv!("(uuua(ayay))").cast(dirmeta.as_aligned()); + let (uid, gid, mode, xattrs_data) = data.to_tuple(); + let mut xattrs = BTreeMap::, Box<[u8]>>::new(); + for x in xattrs_data.iter() { + let (key, value) = x.to_tuple(); + xattrs.insert(OsStr::from_bytes(key).into(), Box::from(value)); + } + + let stat = Stat { + st_mode: u32::from_be(*mode), + st_uid: u32::from_be(*uid), + st_gid: u32::from_be(*gid), + st_mtim_sec: 0, + xattrs: xattrs.into(), + }; + + let mut directory = Directory::new(stat); + + let tree_data = gv!("(a(say)a(sayay))").cast(dirtree.as_aligned()); + let (files_data, dirs_data) = tree_data.to_tuple(); + + for f in files_data.iter() { + let (name, checksum) = f.to_tuple(); + + let file = self.create_filesystem_file(checksum.try_into()?)?; + directory.insert(OsStr::new(name.to_str()), Inode::Leaf(file.into())); + } + + for d in dirs_data.iter() { + let (name, tree_checksum, meta_checksum) = d.to_tuple(); + + let subdir = + self.create_filesystem_dir(tree_checksum.try_into()?, meta_checksum.try_into()?)?; + + directory.insert( + OsStr::new(name.to_str()), + Inode::Directory(Box::new(subdir)), + ); + } + + Ok(directory) + } + + pub fn create_filesystem(&self) -> Result> { + let data = gv!("(a{sv}aya(say)sstayay)").cast(&self.data); + let ( + _metadata_data, + _parent_checksum, + _related_objects, + _subject, + _body, + _timestamp, + root_tree, + root_metadata, + ) = data.to_tuple(); + + let root = self.create_filesystem_dir(root_tree.try_into()?, root_metadata.try_into()?)?; + + Ok(FileSystem:: { + root: root, + have_root_stat: true, + }) + } + + fn lookup_dirmeta(&self, id: &Sha256Digest) -> Option<&AlignedSlice> { + if let Some((None, data)) = self.objmap.lookup(id) { + Some(data) + } else { + None + } + } + + fn lookup_dirtree(&self, id: &Sha256Digest) -> Option<&AlignedSlice> { + if let Some((None, data)) = self.objmap.lookup(id) { + Some(data) + } else { + None + } + } + + fn lookup_file(&self, id: &Sha256Digest) -> Option<(&AlignedSlice, Option<&ObjectID>)> { + if let Some((objectid, data)) = self.objmap.lookup(id) { + Some((data, objectid)) + } else { + None + } + } + + pub(crate) fn inspect(&self) { + println!("objmap id: {:?}", &self.objmap_id); + for (ostree_digest, maybe_obj_id, _data) in self.objmap.iter() { + if let Some(obj_id) = maybe_obj_id { + println!("Ostree {} => {:?}", hex::encode(ostree_digest), obj_id); + } + } + } +} + +#[derive(Debug)] +pub struct OstreeCommitWriter { + repo: Arc>, + objmap: ObjectMapWriter, + commit_id: Option, + commit: Option, +} + +impl OstreeCommitWriter { + pub fn new(repo: &Arc>) -> Self { + OstreeCommitWriter { + repo: repo.clone(), + commit: None, + commit_id: None, + objmap: ObjectMapWriter::::new(), + } + } + + pub fn ensure_commit(&self) -> Result<(Sha256Digest, ObjectID)> { + let commit = self + .commit + .as_ref() + .ok_or(Error::msg("No commit was pulled"))?; + + let commit_id = self + .commit_id + .as_ref() + .ok_or(Error::msg("No commit was pulled"))?; + + let (objmap_id, objmap_sha256) = self.objmap.serialize(&self.repo)?; + + let mut stream = self + .repo + .create_stream(COMMIT_CONTENT_TYPE, Some(*commit_id)); + + stream.add_sha256_mapping(&objmap_sha256, &objmap_id); + + stream.write_inline(&commit); + let object_id = self.repo.write_stream(stream, None)?; + + Ok((*commit_id, object_id)) + } + + fn 
insert_dirmeta(&mut self, id: &Sha256Digest, data: &[u8]) { + self.objmap.insert(id, None, data); + } + + fn insert_dirtree(&mut self, id: &Sha256Digest, data: &[u8]) { + self.objmap.insert(id, None, data); + } + + fn insert_file( + &mut self, + id: &Sha256Digest, + obj_id: Option<&ObjectID>, + file_header: AlignedBuf, + ) { + self.objmap.insert(id, obj_id, &file_header); + } +} + +struct Outstanding { + id: Sha256Digest, + obj_type: ObjectType, +} + +impl fmt::Debug for Outstanding { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Outstanding") + .field("id", &hex::encode(self.id)) + .field("obj_type", &self.obj_type) + .finish() + } +} + +#[derive(Debug)] +pub struct PullOperation> { + repo: Arc>, + builder: OstreeCommitWriter, + ostree_repo: RepoType, + base_commits: Vec>, + outstanding: VecDeque, + // All ids that were ever enqueued (including already fetched and currently being fetched) + fetched: HashSet, +} + +impl> + PullOperation +{ + pub fn new(repo: &Arc>, ostree_repo: RepoType) -> Self { + PullOperation { + repo: repo.clone(), + builder: OstreeCommitWriter::::new(repo), + ostree_repo: ostree_repo, + outstanding: VecDeque::new(), + base_commits: vec![], + fetched: HashSet::new(), + } + } + + pub fn add_base(&mut self, base_name: &str) -> Result<()> { + let base = OstreeCommit::::load(&self.repo, base_name)?; + self.base_commits.push(base); + Ok(()) + } + + fn enqueue_fetch(&mut self, id: &Sha256Digest, obj_type: ObjectType) { + // To avoid fetching twice, even if the id is not in the outstanding list + // (for example we may be currenly downloading it) we keep all ids we ever + // fetch in a map + if self.fetched.contains(id) { + return; + } + self.fetched.insert(*id); + // We request metadata objects first + if obj_type == ObjectType::File { + self.outstanding.push_back(Outstanding { + id: *id, + obj_type: obj_type, + }); + } else { + self.outstanding.push_front(Outstanding { + id: *id, + obj_type: obj_type, + }); + } + } + + fn maybe_fetch_file(&mut self, id: &Sha256Digest) { + if self.builder.objmap.contains(id) { + return; + } + + for base in self.base_commits.iter() { + if let Some((file_header, obj_id)) = base.lookup_file(id) { + self.add_file(id, obj_id.cloned().as_ref(), file_header.to_owned()); + return; + } + } + + self.enqueue_fetch(id, ObjectType::File); + } + + fn add_file(&mut self, id: &Sha256Digest, obj_id: Option<&ObjectID>, file_header: AlignedBuf) { + self.builder.insert_file(id, obj_id, file_header); + } + + fn maybe_fetch_dirmeta(&mut self, id: &Sha256Digest) { + if self.builder.objmap.contains(id) { + return; + } + + for base in self.base_commits.iter() { + if let Some(dirmeta) = base.lookup_dirmeta(id) { + self.add_dirmeta(id, dirmeta.to_owned()); + return; + } + } + + self.enqueue_fetch(id, ObjectType::DirMeta); + } + + fn add_dirmeta(&mut self, id: &Sha256Digest, data: AlignedBuf) { + self.builder.insert_dirmeta(id, &data); + } + + fn maybe_fetch_dirtree(&mut self, id: &Sha256Digest) { + if self.builder.objmap.contains(id) { + return; + } + + for base in self.base_commits.iter() { + if let Some(dirtree) = base.lookup_dirtree(id) { + self.add_dirtree(id, dirtree.to_owned()); + return; + } + } + + self.enqueue_fetch(id, ObjectType::DirTree); + } + + fn add_dirtree(&mut self, id: &Sha256Digest, buf: AlignedBuf) { + let data = gv!("(a(say)a(sayay))").cast(buf.as_aligned()); + let (files_data, dirs_data) = data.to_tuple(); + + for f in files_data.iter() { + let (_name, checksum) = f.to_tuple(); + + 
self.maybe_fetch_file(checksum.try_into().unwrap()); + } + + for d in dirs_data.iter() { + let (_name, tree_checksum, meta_checksum) = d.to_tuple(); + + self.maybe_fetch_dirmeta(meta_checksum.try_into().unwrap()); + self.maybe_fetch_dirtree(tree_checksum.try_into().unwrap()); + } + + self.builder.insert_dirtree(id, &buf); + } + + fn add_commit(&mut self, id: &Sha256Digest, commit: AlignedBuf) { + let data = gv!("(a{sv}aya(say)sstayay)").cast(&commit); + let ( + _metadata_data, + _parent_checksum, + _related_objects, + _subject, + _body, + _timestamp, + root_tree, + root_metadata, + ) = data.to_tuple(); + + self.maybe_fetch_dirmeta(root_metadata.try_into().unwrap()); + self.maybe_fetch_dirtree(root_tree.try_into().unwrap()); + + self.builder.commit_id = Some(*id); + self.builder.commit = Some(commit); + } + + pub async fn pull_commit( + &mut self, + commit_id: &Sha256Digest, + ) -> Result<(Sha256Digest, ObjectID)> { + self.enqueue_fetch(commit_id, ObjectType::Commit); + + // TODO: Support deltas + + // TODO: At least for http we should make parallel fetches + while self.outstanding.len() > 0 { + let fetch = self.outstanding.pop_front().unwrap(); + println!( + "Fetching ostree {:?} object {} ", + fetch.obj_type, + hex::encode(fetch.id) + ); + + match fetch.obj_type { + ObjectType::Commit => { + let data = self + .ostree_repo + .fetch_object(&fetch.id, fetch.obj_type) + .await?; + let data_sha = Sha256::digest(&*data); + if *data_sha != fetch.id { + bail!( + "Invalid commit checksum {:?}, expected {:?}", + data_sha, + fetch.id + ); + } + self.add_commit(&fetch.id, data); + } + ObjectType::DirMeta => { + let data = self + .ostree_repo + .fetch_object(&fetch.id, fetch.obj_type) + .await?; + let data_sha = Sha256::digest(&*data); + if *data_sha != fetch.id { + bail!( + "Invalid dirmeta checksum {:?}, expected {:?}", + data_sha, + fetch.id + ); + } + self.add_dirmeta(&fetch.id, data); + } + ObjectType::DirTree => { + let data = self + .ostree_repo + .fetch_object(&fetch.id, fetch.obj_type) + .await?; + let data_sha = Sha256::digest(&*data); + if *data_sha != fetch.id { + bail!( + "Invalid dirtree checksum {:?}, expected {:?}", + data_sha, + fetch.id + ); + } + self.add_dirtree(&fetch.id, data); + } + ObjectType::File => { + let (file_header, obj_id) = self.ostree_repo.fetch_file(&fetch.id).await?; + + self.add_file(&fetch.id, obj_id.as_ref(), file_header); + } + _ => {} + } + } + + self.builder.ensure_commit() + } +} diff --git a/crates/composefs-ostree/src/lib.rs b/crates/composefs-ostree/src/lib.rs new file mode 100644 index 0000000..28b79fb --- /dev/null +++ b/crates/composefs-ostree/src/lib.rs @@ -0,0 +1,102 @@ +use anyhow::Result; +use rustix::fs::CWD; +use std::{path::Path, sync::Arc}; + +use composefs::{fsverity::FsVerityHashValue, repository::Repository, tree::FileSystem}; + +pub mod commit; +pub mod objmap; +pub mod repo; + +use crate::commit::{OstreeCommit, PullOperation}; +use crate::repo::{LocalRepo, RemoteRepo}; + +pub async fn pull_local( + repo: &Arc>, + path: &Path, + ostree_ref: &str, + reference: Option<&str>, + base_reference: Option<&str>, +) -> Result { + let ostree_repo = LocalRepo::open_path(repo, CWD, path)?; + + let commit_checksum = ostree_repo.read_ref(ostree_ref)?; + + let mut op = PullOperation::>::new(repo, ostree_repo); + if let Some(base_name) = base_reference { + op.add_base(base_name)?; + } + + // If we're giving the new image a new, use any existing image + // with that name as a potential base + if let Some(reference) = reference { + if 
repo.has_named_stream(&reference) { + let reference_path = format!("refs/{reference}"); + op.add_base(&reference_path)?; + } + } + + let (sha256, objid) = op.pull_commit(&commit_checksum).await?; + + if let Some(name) = reference { + repo.name_stream(sha256, name)?; + } + + Ok(objid) +} + +pub async fn pull( + repo: &Arc>, + url: &str, + ostree_ref: &str, + reference: Option<&str>, + base_reference: Option<&str>, +) -> Result { + let ostree_repo = RemoteRepo::new(repo, url)?; + + let commit_checksum = ostree_repo.resolve_ref(ostree_ref).await?; + + let mut op = PullOperation::>::new(repo, ostree_repo); + if let Some(base_name) = base_reference { + op.add_base(base_name)?; + } + + // If we're giving the new image a new, use any existing image + // with that name as a potential base + if let Some(reference) = reference { + if repo.has_named_stream(&reference) { + let reference_path = format!("refs/{reference}"); + op.add_base(&reference_path)?; + } + } + + let (sha256, objid) = op.pull_commit(&commit_checksum).await?; + + if let Some(name) = reference { + repo.name_stream(sha256, name)?; + } + + Ok(objid) +} + +/// Creates a filesystem from the given OSTree commit. +pub fn create_filesystem( + repo: &Repository, + commit_name: &str, +) -> Result> { + let image = OstreeCommit::::load(repo, commit_name)?; + let fs = image.create_filesystem()?; + + Ok(fs) +} + +/// Creates a filesystem from the given OSTree commit. +pub fn inspect( + repo: &Repository, + commit_name: &str, +) -> Result<()> { + let image = OstreeCommit::::load(repo, commit_name)?; + image.inspect(); + + Ok(()) +} diff --git a/crates/composefs-ostree/src/objmap.rs b/crates/composefs-ostree/src/objmap.rs new file mode 100644 index 0000000..34b8d3c --- /dev/null +++ b/crates/composefs-ostree/src/objmap.rs @@ -0,0 +1,461 @@ +/* Implementation of the ObjectID Map format + * + * ObjectID maps are mappings from a foreign sha256 digest of some + * form into an header of data, and an optional reference to an + * external ObjectID (i.e. a fsverity) matching a composefs repo + * ObjectID format. + * + * The file format is intended to be inside of a splitstream and + * uses the splitstream header to reference the external object ids. + * + * An object file has this format: + * (All ints are in little endian) + * + * buckets; + * 256 x (indexes are into mapped_ids) + * +-----------------------------------+ + * | u32: end index of bucket | + * +-----------------------------------+ + * + * mapped_ids: + * n_objects x (sorted) + * +-----------------------------------+ + * | [u8; 32] mapped object id | + * +-----------------------------------+ + * + * object_data: + * n_objects x (same order as object_ids) + * +-----------------------------------+ + * | u32: offset to per-object data | + * | u32: length of per-object data | + * | u32: Index of external object ref | + * | or MAXUINT32 if none. | + * +-----------------------------------+ + * + * Offset are 8byte aligned offsets from after the end of the + * object_data array. 
+ * + */ +use anyhow::{Error, Result}; +use gvariant::aligned_bytes::{AlignedBuf, AlignedSlice, TryAsAligned, A8}; +use std::{fmt, fs::File, io::Read, mem::size_of, sync::Arc}; +use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout}; + +use composefs::{ + fsverity::FsVerityHashValue, + repository::Repository, + splitstream::{SplitStreamReader, SplitStreamWriter}, + util::Sha256Digest, +}; + +const OBJMAP_CONTENT_TYPE: u64 = 0xAFE138C18C463EF1; + +#[derive(Debug, FromBytes, Immutable, IntoBytes, KnownLayout)] +#[repr(C)] +struct ObjMapHeader { + bucket_ends: [u32; 256], +} + +#[derive(Debug, FromBytes, Immutable, KnownLayout)] +#[repr(C)] +struct Sha256DigestArray { + ids: [Sha256Digest], +} + +#[derive(Debug, FromBytes, Immutable, KnownLayout)] +#[repr(C)] +struct ObjectIDArray { + ids: [ObjectID], +} + +const NO_EXTERNAL_INDEX: u32 = u32::MAX; + +#[derive(Debug, FromBytes, Immutable, IntoBytes, KnownLayout, Clone)] +#[repr(C)] +struct DataRef { + offset: u32, + size: u32, + external_index: u32, +} + +impl DataRef { + pub fn new(offset: usize, size: usize, external_index: Option) -> Self { + DataRef { + offset: u32::to_le(offset as u32), + size: u32::to_le(size as u32), + external_index: u32::to_le(match external_index { + Some(idx) => idx as u32, + None => NO_EXTERNAL_INDEX, + }), + } + } + pub fn get_offset(&self) -> usize { + return u32::from_le(self.offset) as usize; + } + pub fn get_size(&self) -> usize { + return u32::from_le(self.size) as usize; + } + pub fn get_external_index(&self) -> Option { + match u32::from_le(self.external_index) { + NO_EXTERNAL_INDEX => None, + idx => Some(idx as usize), + } + } +} + +#[derive(Debug, FromBytes, Immutable, IntoBytes, KnownLayout)] +#[repr(C)] +struct DataRefs { + datas: [DataRef], +} + +#[derive(Debug)] +struct WriterMapEntry { + mapped_id: Sha256Digest, + verity: Option, + data: AlignedBuf, +} + +#[derive(Debug)] +pub struct ObjectMapWriter { + map: Vec>, +} + +fn align8(x: usize) -> usize { + (x + 7) & !7 +} + +impl ObjectMapWriter { + pub fn new() -> Self { + ObjectMapWriter { map: vec![] } + } + + pub fn iter( + &self, + ) -> impl Iterator, &AlignedSlice)> { + self.map + .iter() + .map(|e| (&e.mapped_id, e.verity.as_ref(), &e.data[..])) + } + + pub fn contains(&self, mapped_id: &Sha256Digest) -> bool { + match self.map.binary_search_by_key(mapped_id, |e| e.mapped_id) { + Ok(_) => true, + Err(..) => false, + } + } + + pub fn lookup( + &self, + mapped_id: &Sha256Digest, + ) -> Option<(Option<&ObjectID>, &AlignedSlice)> { + match self.map.binary_search_by_key(mapped_id, |e| e.mapped_id) { + Ok(idx) => Some((self.map[idx].verity.as_ref(), &self.map[idx].data)), + Err(..) 
=> None, + } + } + + pub fn insert(&mut self, mapped_id: &Sha256Digest, verity: Option<&ObjectID>, data: &[u8]) { + match self.map.binary_search_by_key(mapped_id, |e| e.mapped_id) { + Ok(_idx) => {} + Err(idx) => { + let mut aligned_data = AlignedBuf::new(); + aligned_data.with_vec(|v| v.extend_from_slice(data)); + self.map.insert( + idx, + WriterMapEntry { + mapped_id: *mapped_id, + verity: verity.cloned(), + data: aligned_data, + }, + ); + } + } + } + + pub fn merge_from(&mut self, reader: &ObjectMapReader) { + for (sha256, objid, data) in reader.iter() { + self.insert(sha256, objid, data); + } + } + + pub fn serialize(&self, repo: &Arc>) -> Result<(ObjectID, Sha256Digest)> { + let mut ss = SplitStreamWriter::::new(repo, OBJMAP_CONTENT_TYPE, true, None); + + /* Ensure we can index and count items using u32 (leaving one for NO_EXTERNAL_INDEX) */ + let item_count = self.map.len(); + if item_count > (NO_EXTERNAL_INDEX - 1) as usize { + return Err(Error::msg("Too many items in object map")); + } + + let mut header = ObjMapHeader { + bucket_ends: [0; 256], + }; + + // Compute data offsets and add external object references + let mut data_size = 0usize; + let mut data_offsets = vec![0usize; item_count]; + for (i, e) in self.map.iter().enumerate() { + data_offsets[i] = data_size; + data_size += align8(e.data.len()); + + if let Some(verity) = &e.verity { + ss.add_external_reference(&verity) + } + } + + // Ensure all data can be indexed by u32 + if data_size > u32::MAX as usize { + return Err(Error::msg("Too large data in object map")); + } + + // Compute bucket ends + for e in self.map.iter() { + // Initially end is just the count + header.bucket_ends[e.mapped_id[0] as usize] += 1; + } + for i in 1..256 { + // Then we sum them up to the end + header.bucket_ends[i] += header.bucket_ends[i - 1]; + } + // Convert buckets to little endian + for i in 0..256 { + header.bucket_ends[i] = u32::to_le(header.bucket_ends[i]); + } + + // Add header + ss.write_inline(header.as_bytes()); + // Add mapped ids + for e in self.map.iter() { + ss.write_inline(&e.mapped_id); + } + // Add data refs + for (i, e) in self.map.iter().enumerate() { + let idx = if let Some(verity) = &e.verity { + ss.lookup_external_reference(&verity) + } else { + None + }; + let d = DataRef::new(data_offsets[i], e.data.len(), idx); + ss.write_inline(d.as_bytes()); + } + + // Add 8-aligned data chunks + for e in self.map.iter() { + ss.write_inline(&e.data); + // Pad to 8 + let padding = align8(e.data.len()) - e.data.len(); + if padding > 0 { + ss.write_inline(&vec![0u8; padding]); + } + } + + let (objid, sha256) = ss.done()?; + + // This is safe because we passed true to compute this above + Ok((objid, sha256.unwrap())) + } +} + +pub struct ObjectMapReader { + data: AlignedBuf, + bucket_ends: [u32; 256], + mapped_ids: Vec, + datas: Vec, + pub refs: Vec, +} + +impl fmt::Debug for ObjectMapReader { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut m = f.debug_map(); + for mapped_id in self.mapped_ids.iter() { + m.entry( + &hex::encode(mapped_id), + &format!("{:?}", self.lookup(mapped_id).unwrap()), + ); + } + m.finish() + } +} + +fn validate_buckets(buckets: &[u32; 256]) -> Result<()> { + for i in 1..256 { + // Bucket ends are (non-strictly) increasing + if buckets[i] < buckets[i - 1] { + return Err(Error::msg(format!("Invalid objmap bucket data"))); + } + } + Ok(()) +} + +impl ObjectMapReader { + pub fn load(repo: &Repository, obj_id: &ObjectID) -> Result { + let fd = repo.open_object(obj_id)?; + + let file = 
File::from(fd); + let mut ss = SplitStreamReader::new(file, Some(OBJMAP_CONTENT_TYPE))?; + + let mut buf = AlignedBuf::new(); + + buf.with_vec(|v| v.resize(size_of::(), 0u8)); + let n_read = ss.read(&mut buf)?; + if n_read != buf.len() { + return Err(Error::msg("Not enough data")); + } + + let h = ObjMapHeader::ref_from_bytes(&buf) + .map_err(|_e| Error::msg(format!("Invalid objmap header")))?; + + let mut buckets: [u32; 256] = h.bucket_ends; + for b in buckets.iter_mut() { + *b = u32::from_le(*b); + } + validate_buckets(&buckets)?; + let item_count = buckets[255] as usize; + + buf.with_vec(|v| v.resize(item_count * size_of::(), 0u8)); + let n_read = ss.read(&mut buf)?; + if n_read != buf.len() { + return Err(Error::msg("Not enough data")); + }; + let mapped_ids = Sha256DigestArray::ref_from_bytes(&buf) + .map_err(|_e| Error::msg(format!("Invalid objmap array")))?; + + if mapped_ids.ids.len() != item_count { + return Err(Error::msg("Invalid objmap array")); + } + let mapped = mapped_ids.ids.to_vec(); + + buf.with_vec(|v| v.resize(item_count * size_of::(), 0u8)); + let n_read = ss.read(&mut buf)?; + if n_read != buf.len() { + return Err(Error::msg("Not enough data")); + }; + + let data_refs = DataRefs::ref_from_bytes(&buf) + .map_err(|_e| Error::msg(format!("Invalid objmap array")))?; + + if data_refs.datas.len() != item_count { + return Err(Error::msg("Invalid objmap array")); + } + + let datas = data_refs.datas.to_vec(); + + buf.with_vec(|v| { + v.resize(0, 0u8); + ss.read_to_end(v) + })?; + + Ok(ObjectMapReader { + data: buf, + bucket_ends: buckets, + mapped_ids: mapped, + datas: datas, + refs: ss.refs.clone(), + }) + } + + fn get_data(&self, data_ref: &DataRef) -> (Option<&ObjectID>, &AlignedSlice) { + let start = data_ref.get_offset(); + let end = start + data_ref.get_size(); + // The unwrap here is safe, because data is always 8 aligned + let data = &self.data[start..end].try_as_aligned().unwrap(); + + if let Some(index) = data_ref.get_external_index() { + (Some(&self.refs[index]), data) + } else { + (None, data) + } + } + + fn get_bucket(&self, mapped_id: &Sha256Digest) -> (usize, usize) { + let first = mapped_id[0] as usize; + let start = if first == 0 { + 0 + } else { + self.bucket_ends[first - 1] + }; + let end = self.bucket_ends[first]; + (start as usize, end as usize) + } + + pub fn contains(&self, mapped_id: &Sha256Digest) -> bool { + let (start, end) = self.get_bucket(mapped_id); + let in_bucket = &self.mapped_ids[start..end]; + match in_bucket.binary_search(mapped_id) { + Ok(_) => true, + Err(..) => false, + } + } + + pub fn lookup( + &self, + mapped_id: &Sha256Digest, + ) -> Option<(Option<&ObjectID>, &AlignedSlice)> { + let (start, end) = self.get_bucket(mapped_id); + let mapped_ids_in_bucket = &self.mapped_ids[start..end]; + let data_refs_in_bucket = &self.datas[start..end]; + let index = match mapped_ids_in_bucket.binary_search(mapped_id) { + Ok(i) => i, + Err(..) 
=> return None, + }; + Some(self.get_data(&data_refs_in_bucket[index])) + } + + pub fn iter( + &self, + ) -> impl Iterator, &AlignedSlice)> { + self.mapped_ids.iter().enumerate().map(|e| { + let (objid, data) = self.get_data(&self.datas[e.0]); + (e.1, objid, data) + }) + } +} + +#[cfg(test)] +mod test { + use super::*; + + use composefs::{fsverity::Sha256HashValue, util::parse_sha256}; + + #[test] + fn test_roundtrip() -> Result<()> { + let mut writer = ObjectMapWriter::::new(); + + let mapped_1 = + parse_sha256("84682bb6f0404ba9b81d5f3b753be2a08f1165389229ee8516acbd5700182cad")?; + let mapped_2 = + parse_sha256("4b37fb400b28a686343ba83f00789608e0b624b13bf50d713bc8a9b0de514e00")?; + let mapped_3 = + parse_sha256("4b37fb400b28a686343ba83f00789608e0b624b13bf50d713bc8a9b0de514e01")?; + let mapped_4 = + parse_sha256("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")?; + let objid_1 = Sha256HashValue::from_hex( + "0a11b9bb6258495dbe1677b2dc4e0d6c4cc86aef8b7c274756d40a878a921a8a", + )?; + let objid_2 = Sha256HashValue::from_hex( + "a0729185616450a10bd8439549221433edc7154d9f87a454768a368de2e5967a", + )?; + let objid_3 = Sha256HashValue::from_hex( + "37d2eeabfa179742b9b490cc3072cc289124e74f5aa3d4bc270862f07890c1cc", + )?; + let data_1 = vec![42u8]; + let data_2 = vec![12u8, 17u8]; + let data_3 = vec![]; + + writer.insert(&mapped_1, Some(&objid_1), &data_1); + writer.insert(&mapped_2, Some(&objid_2), &data_2); + writer.insert(&mapped_3, Some(&objid_3), &data_3); + + let r1 = writer.lookup(&mapped_1); + assert_eq!(r1, Some((Some(&objid_1), data_1.as_slice()))); + let r2 = writer.lookup(&mapped_2); + assert_eq!(r2, Some((Some(&objid_2), data_2.as_slice()))); + let r3 = writer.lookup(&mapped_3); + assert_eq!(r3, Some((Some(&objid_3), data_3.as_slice()))); + let r4 = writer.lookup(&mapped_4); + assert_eq!(r4, None); + + Ok(()) + } +} diff --git a/crates/composefs-ostree/src/repo.rs b/crates/composefs-ostree/src/repo.rs new file mode 100644 index 0000000..00e11f1 --- /dev/null +++ b/crates/composefs-ostree/src/repo.rs @@ -0,0 +1,553 @@ +use anyhow::{bail, Context, Error, Result}; +use configparser::ini::Ini; +use flate2::read::DeflateDecoder; +use gvariant::aligned_bytes::{AlignedBuf, AlignedSlice, A8}; +use gvariant::{gv, Marker, Structure}; +use reqwest::{Client, Url}; +use rustix::fd::AsRawFd; +use rustix::fs::{fstat, openat, readlinkat, FileType, Mode, OFlags}; +use rustix::io::Errno; +use sha2::{Digest, Sha256}; +use std::{ + fs::File, + future::Future, + io::{empty, Read}, + os::fd::{AsFd, OwnedFd}, + path::Path, + sync::Arc, +}; +use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout}; + +use composefs::{ + fsverity::FsVerityHashValue, + repository::Repository, + util::{parse_sha256, ErrnoFilter, Sha256Digest}, + INLINE_CONTENT_MAX, +}; + +#[derive(Debug, PartialEq, Copy, Clone)] +pub enum RepoMode { + Bare, + Archive, + BareUser, + BareUserOnly, + BareSplitXAttrs, +} + +#[derive(Debug, PartialEq)] +pub enum ObjectType { + File, + DirTree, + DirMeta, + Commit, + TombstoneCommit, + PayloadLink, + FileXAttrs, + FileXAttrsLink, +} + +impl ObjectType { + pub fn extension(&self, repo_mode: RepoMode) -> &'static str { + match self { + ObjectType::File => { + if repo_mode == RepoMode::Archive { + ".filez" + } else { + ".file" + } + } + ObjectType::DirTree => ".dirtree", + ObjectType::DirMeta => ".dirmeta", + ObjectType::Commit => ".commit", + ObjectType::TombstoneCommit => ".commit-tombstone", + ObjectType::PayloadLink => ".payload-link", + ObjectType::FileXAttrs => ".file-xattrs", 
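+            // file-xattrs-link objects are per-file links to the shared file-xattrs objects, used by bare-split-xattrs repositories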
+ ObjectType::FileXAttrsLink => ".file-xattrs-link", + } + } +} + +impl RepoMode { + pub fn parse(s: &str) -> Result { + match s { + "bare" => Ok(RepoMode::Bare), + "archive" => Ok(RepoMode::Archive), + "archive-z2" => Ok(RepoMode::Archive), + "bare-user" => Ok(RepoMode::BareUser), + "bare-user-only" => Ok(RepoMode::BareUserOnly), + "bare-split-xattrs" => Ok(RepoMode::BareSplitXAttrs), + _ => Err(Error::msg(format!("Unsupported repo mode {}", s))), + } + } +} + +/* Source for locally available data about ostree objects, typically + * in-memory caches */ +pub trait ObjectStore { + fn lookup_dirmeta(&self, _id: &Sha256Digest) -> Option<&AlignedSlice>; + fn lookup_dirtree(&self, _id: &Sha256Digest) -> Option<&AlignedSlice>; + fn lookup_file(&self, _id: &Sha256Digest) -> Option<(&AlignedSlice, &ObjectID)>; +} + +fn get_object_pathname(mode: RepoMode, checksum: &Sha256Digest, object_type: ObjectType) -> String { + format!( + "{:02x}/{}{}", + checksum[0], + hex::encode(&checksum[1..]), + object_type.extension(mode) + ) +} + +fn size_prefix(data: &[u8]) -> AlignedBuf { + let mut buf = AlignedBuf::new(); + let svh = SizedVariantHeader { + size: u32::to_be(data.len() as u32), + padding: 0, + }; + buf.with_vec(|v| v.extend_from_slice(svh.as_bytes())); + buf.with_vec(|v| v.extend_from_slice(data)); + buf +} + +pub(crate) fn get_sized_variant_size(data: &[u8]) -> Result { + let variant_header_size = size_of::(); + if data.len() < variant_header_size { + bail!("Sized variant too small"); + } + + let aligned: AlignedBuf = data[0..variant_header_size].to_vec().into(); + let h = SizedVariantHeader::ref_from_bytes(&aligned) + .map_err(|e| Error::msg(format!("Sized variant header: {:?}", e)))?; + Ok(u32::from_be(h.size) as usize) +} + +pub(crate) fn split_sized_variant(data: &[u8]) -> Result<(&[u8], &[u8], &[u8])> { + let variant_size = get_sized_variant_size(data)?; + let header_size = size_of::(); + if data.len() < header_size + variant_size { + bail!("Sized variant too small"); + } + + let sized_data = &data[0..header_size + variant_size]; + let variant_data = &data[header_size..header_size + variant_size]; + let remaining_data = &data[header_size + variant_size..]; + + Ok((sized_data, variant_data, remaining_data)) +} + +pub(crate) fn ostree_zlib_file_header_to_regular(zlib_header_data: &AlignedSlice) -> Vec { + let data = gv!("(tuuuusa(ayay))").cast(zlib_header_data); + let (_size, uid, gid, mode, zero, symlink_target, xattrs_data) = data.to_tuple(); + let mut s = Vec::<(&[u8], &[u8])>::new(); + for x in xattrs_data.iter() { + let (key, value) = x.to_tuple(); + s.push((key, value)) + } + + gv!("(uuuusa(ayay))").serialize_to_vec(&(*uid, *gid, *mode, *zero, symlink_target.to_str(), &s)) +} + +/* This is how ostree stores gvariants on disk when used as a header for filez objects */ +#[derive(Debug, FromBytes, Immutable, IntoBytes, KnownLayout)] +#[repr(C)] +pub(crate) struct SizedVariantHeader { + size: u32, + padding: u32, +} + +pub trait OstreeRepo { + fn fetch_object( + &self, + checksum: &Sha256Digest, + object_type: ObjectType, + ) -> impl Future>; + fn fetch_file( + &self, + checksum: &Sha256Digest, + ) -> impl Future)>>; +} + +#[derive(Debug)] +pub struct RemoteRepo { + repo: Arc>, + client: Client, + url: Url, +} + +impl RemoteRepo { + pub fn new(repo: &Arc>, url: &str) -> Result { + Ok(RemoteRepo { + repo: repo.clone(), + client: Client::new(), + url: Url::parse(url)?, + }) + } + + pub async fn resolve_ref(&self, ref_name: &str) -> Result { + // TODO: Support summary format + let path = 
format!("refs/heads/{}", ref_name); + let url = self.url.join(&path)?; + + let t = self + .client + .get(url.clone()) + .send() + .await? + .text() + .await + .with_context(|| format!("Cannot get ostree ref at {}", url))?; + + Ok(parse_sha256(&t.trim())?) + } +} + +impl OstreeRepo for RemoteRepo { + async fn fetch_object( + &self, + checksum: &Sha256Digest, + object_type: ObjectType, + ) -> Result { + let path = format!( + "objects/{}", + get_object_pathname(RepoMode::Archive, checksum, object_type) + ); + let url = self.url.join(&path)?; + + let response = self.client.get(url.clone()).send().await?; + response.error_for_status_ref()?; + let b = response + .bytes() + .await + .with_context(|| format!("Cannot get ostree object at {}", url))?; + + Ok(b.to_vec().into()) + } + + async fn fetch_file(&self, checksum: &Sha256Digest) -> Result<(AlignedBuf, Option)> { + let path = format!( + "objects/{}", + get_object_pathname(RepoMode::Archive, checksum, ObjectType::File) + ); + let url = self.url.join(&path)?; + + let response = self.client.get(url.clone()).send().await?; + response.error_for_status_ref()?; + + let data = response + .bytes() + .await + .with_context(|| format!("Cannot get ostree file at {}", url))?; + + let (file_header, variant_data, compressed_data) = split_sized_variant(&data)?; + + // Force align the data as there is a gvariant-rs bug (https://github.com/ostreedev/gvariant-rs/pull/9) + let mut aligned_variant_data = AlignedBuf::new(); + aligned_variant_data.with_vec(|v| v.extend_from_slice(variant_data)); + + // Compute the checksum of (regular) header + data + let mut hasher = Sha256::new(); + let regular_header = ostree_zlib_file_header_to_regular(&aligned_variant_data); + let sized_regular_header = size_prefix(®ular_header); + hasher.update(&*sized_regular_header); + + // Decompress rest + let mut uncompressed = DeflateDecoder::new(compressed_data); + + // TODO: Stream files into repo instead of reading it all + + let mut file_content = Vec::new(); + uncompressed.read_to_end(&mut file_content)?; + + hasher.update(&file_content); + let actual_checksum = hasher.finalize(); + if *actual_checksum != *checksum { + bail!( + "Unexpected file checksum {:?}, expected {:?}", + actual_checksum, + checksum + ); + } + + let mut file_data = file_header.to_vec(); + let obj_id = if file_content.len() <= INLINE_CONTENT_MAX { + file_data.extend_from_slice(&file_content); + None + } else { + Some(self.repo.ensure_object(&file_content)?) 
+ }; + + Ok((file_data.into(), obj_id)) + } +} + +#[derive(Debug)] +pub struct LocalRepo { + repo: Arc>, + mode: RepoMode, + dir: OwnedFd, + objects: OwnedFd, +} + +impl LocalRepo { + pub fn open_path( + repo: &Arc>, + dirfd: impl AsFd, + path: impl AsRef, + ) -> Result { + let path = path.as_ref(); + let repofd = openat( + &dirfd, + path, + OFlags::RDONLY | OFlags::CLOEXEC, + Mode::empty(), + ) + .with_context(|| format!("Cannot open ostree repository at {}", path.display()))?; + + let configfd = openat( + &repofd, + "config", + OFlags::RDONLY | OFlags::CLOEXEC, + Mode::empty(), + ) + .with_context(|| format!("Cannot open ostree repo config file at {}", path.display()))?; + + let mut config_data = String::new(); + + File::from(configfd) + .read_to_string(&mut config_data) + .with_context(|| format!("Can't read config file"))?; + + let mut config = Ini::new(); + let map = config + .read(config_data) + .map_err(Error::msg) + .with_context(|| format!("Can't read config file"))?; + + let core = if let Some(core_map) = map.get("core") { + core_map + } else { + return Err(Error::msg(format!("No [core] section in config"))); + }; + + let mode = if let Some(Some(mode)) = core.get("mode") { + RepoMode::parse(mode)? + } else { + return Err(Error::msg(format!("No mode in [core] section in config"))); + }; + + if mode != RepoMode::Archive && mode != RepoMode::BareUserOnly { + return Err(Error::msg(format!("Unsupported repo mode {mode:?}"))); + } + + let objectsfd = openat( + &repofd, + "objects", + OFlags::PATH | OFlags::CLOEXEC | OFlags::DIRECTORY, + 0o666.into(), + ) + .with_context(|| { + format!( + "Cannot open ostree repository objects directory at {}", + path.display() + ) + })?; + + Ok(Self { + repo: repo.clone(), + mode: mode, + dir: repofd, + objects: objectsfd, + }) + } + + pub fn open_object_flags( + &self, + checksum: &Sha256Digest, + object_type: ObjectType, + flags: OFlags, + ) -> Result { + let cs = checksum.into(); + let path = get_object_pathname(self.mode, cs, object_type); + + openat(&self.objects, &path, flags | OFlags::CLOEXEC, Mode::empty()) + .with_context(|| format!("Cannot open ostree objects object at {}", path)) + } + + pub fn open_object(&self, checksum: &Sha256Digest, object_type: ObjectType) -> Result { + self.open_object_flags(checksum, object_type, OFlags::RDONLY | OFlags::NOFOLLOW) + } + + pub fn read_ref(&self, ref_name: &str) -> Result { + let path1 = format!("refs/{}", ref_name); + let path2 = format!("refs/heads/{}", ref_name); + + let fd1 = openat( + &self.dir, + &path1, + OFlags::RDONLY | OFlags::CLOEXEC, + Mode::empty(), + ) + .filter_errno(Errno::NOENT) + .with_context(|| format!("Cannot open ostree ref at {}", path1))?; + + let fd = if let Some(fd) = fd1 { + fd + } else { + openat( + &self.dir, + &path2, + OFlags::RDONLY | OFlags::CLOEXEC, + Mode::empty(), + ) + .with_context(|| format!("Cannot open ostree ref at {}", path2))? + }; + + let mut buffer = String::new(); + File::from(fd) + .read_to_string(&mut buffer) + .with_context(|| format!("Can't read ref file"))?; + + Ok(parse_sha256(&buffer.trim())?) + } + + async fn fetch_file_bare( + &self, + checksum: &Sha256Digest, + ) -> Result<(AlignedBuf, Box)> { + let path_fd = self.open_object_flags( + checksum.into(), + ObjectType::File, + OFlags::PATH | OFlags::NOFOLLOW, + )?; + + let st = fstat(&path_fd)?; + + let filetype = FileType::from_raw_mode(st.st_mode); + + let symlink_target = if filetype.is_symlink() { + readlinkat(&path_fd, "", [])?.into_string()? 
+ } else { + String::from("") + }; + + let xattrs = Vec::<(&[u8], &[u8])>::new(); + + let (uid, gid, mode) = match self.mode { + RepoMode::Bare => { + // TODO: Read xattrs from disk + (st.st_uid, st.st_gid, st.st_mode) + } + RepoMode::BareUser => { + // TODO: read user.ostreemeta xattr + bail!("BareUser not supported yet") + } + RepoMode::BareUserOnly => (0, 0, st.st_mode), + _ => { + bail!("Unsupported repo mode {:?}", self.mode) + } + }; + + let v = gv!("(tuuuusa(ayay))").serialize_to_vec(&( + u64::to_be(st.st_size as u64), + u32::to_be(uid), + u32::to_be(gid), + u32::to_be(mode), + u32::to_be(0), // rdev + &symlink_target, + &xattrs, + )); + + let zlib_header = size_prefix(&v); + + if filetype.is_symlink() { + Ok((zlib_header, Box::new(empty()))) + } else { + let fd_path = format!("/proc/self/fd/{}", path_fd.as_fd().as_raw_fd()); + Ok((zlib_header, Box::new(File::open(fd_path)?))) + } + } + + async fn fetch_file_archive( + &self, + checksum: &Sha256Digest, + ) -> Result<(AlignedBuf, Box)> { + let fd = self.open_object(checksum.into(), ObjectType::File)?; + let mut file = File::from(fd); + + let mut header_buf = AlignedBuf::new(); + + // Read variant size header + let header_size = size_of::(); + header_buf.with_vec(|v| { + v.resize(header_size, 0u8); + file.read_exact(v) + })?; + + // Read variant + let variant_size = get_sized_variant_size(&header_buf)?; + header_buf.with_vec(|v| { + v.resize(header_size + variant_size, 0u8); + file.read_exact(&mut v[header_size..]) + })?; + + // Decompress rest + Ok((header_buf, Box::new(DeflateDecoder::new(file)))) + } +} + +impl OstreeRepo for LocalRepo { + async fn fetch_object( + &self, + checksum: &Sha256Digest, + object_type: ObjectType, + ) -> Result { + let fd = self.open_object(checksum.into(), object_type)?; + + let mut buffer = Vec::new(); + File::from(fd).read_to_end(&mut buffer)?; + Ok(buffer.into()) + } + + async fn fetch_file(&self, checksum: &Sha256Digest) -> Result<(AlignedBuf, Option)> { + let (mut header_buf, mut rest) = if self.mode == RepoMode::Archive { + self.fetch_file_archive(checksum).await? + } else { + self.fetch_file_bare(checksum).await? + }; + + // Force align the data as there is a gvariant-rs bug (https://github.com/ostreedev/gvariant-rs/pull/9) + let mut aligned_variant_data = AlignedBuf::new(); + let header_size = size_of::(); + aligned_variant_data.with_vec(|v| v.extend_from_slice(&header_buf[header_size..])); + + // Compute the checksum of (regular) header + data + let mut hasher = Sha256::new(); + let regular_header = ostree_zlib_file_header_to_regular(&aligned_variant_data); + let sized_regular_header = size_prefix(®ular_header); + hasher.update(&*sized_regular_header); + + // TODO: Stream files into repo instead of reading it all + let mut file_content = Vec::new(); + rest.read_to_end(&mut file_content)?; + hasher.update(&file_content); + + // Ensure matching checksum + let actual_checksum = hasher.finalize(); + if *actual_checksum != *checksum { + bail!( + "Unexpected file checksum {}, expected {}", + hex::encode(actual_checksum), + hex::encode(checksum) + ); + } + + let obj_id = if file_content.len() <= INLINE_CONTENT_MAX { + header_buf.with_vec(|v| v.extend_from_slice(&file_content)); + None + } else { + Some(self.repo.ensure_object(&file_content)?) 
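+            // content larger than INLINE_CONTENT_MAX becomes its own object in the composefs repository; smaller content is appended inline after the ostree file header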
+ }; + + Ok((header_buf.into(), obj_id)) + } +} diff --git a/crates/composefs/src/repository.rs b/crates/composefs/src/repository.rs index 01d6686..831765f 100644 --- a/crates/composefs/src/repository.rs +++ b/crates/composefs/src/repository.rs @@ -226,7 +226,7 @@ impl Repository { content_type: u64, sha256: Option, ) -> SplitStreamWriter { - SplitStreamWriter::new(self, content_type, sha256) + SplitStreamWriter::new(self, content_type, false, sha256) } fn format_object_path(id: &ObjectID) -> String { @@ -303,11 +303,11 @@ impl Repository { writer: SplitStreamWriter, reference: Option<&str>, ) -> Result { - let Some((.., ref sha256)) = writer.sha256 else { + let Some(sha256) = writer.expected_sha256 else { bail!("Writer doesn't have sha256 enabled"); }; let stream_path = format!("streams/{}", hex::encode(sha256)); - let object_id = writer.done()?; + let (object_id, _) = writer.done()?; let object_path = Self::format_object_path(&object_id); self.symlink(&stream_path, &object_path)?; @@ -362,7 +362,7 @@ impl Repository { None => { let mut writer = self.create_stream(content_type, Some(*sha256)); callback(&mut writer)?; - let object_id = writer.done()?; + let (object_id, _) = writer.done()?; let object_path = Self::format_object_path(&object_id); self.symlink(&stream_path, &object_path)?; diff --git a/crates/composefs/src/splitstream.rs b/crates/composefs/src/splitstream.rs index c5f02f2..0e0bdf5 100644 --- a/crates/composefs/src/splitstream.rs +++ b/crates/composefs/src/splitstream.rs @@ -94,7 +94,8 @@ pub struct SplitStreamWriter { total_size: u64, writer: Encoder<'static, Vec>, pub content_type: u64, - pub sha256: Option<(Sha256, Sha256Digest)>, + pub sha256: Option, + pub expected_sha256: Option, } impl std::fmt::Debug for SplitStreamWriter { @@ -103,6 +104,7 @@ impl std::fmt::Debug for SplitStreamWriter SplitStreamWriter { pub fn new( repo: &Arc>, content_type: u64, - sha256: Option, + compute_sha256: bool, + expected_sha256: Option, ) -> Self { // SAFETY: we surely can't get an error writing the header to a Vec let writer = Encoder::new(vec![], 0).unwrap(); @@ -125,7 +128,12 @@ impl SplitStreamWriter { total_size: 0, mappings: DigestMap::new(), writer, - sha256: sha256.map(|x| (Sha256::new(), x)), + sha256: if compute_sha256 || expected_sha256.is_some() { + Some(Sha256::new()) + } else { + None + }, + expected_sha256, } } @@ -174,7 +182,7 @@ impl SplitStreamWriter { /// really, "add inline content to the buffer" /// you need to call .flush_inline() later pub fn write_inline(&mut self, data: &[u8]) { - if let Some((ref mut sha256, ..)) = self.sha256 { + if let Some(ref mut sha256) = self.sha256 { sha256.update(data); } self.inline_content.extend(data); @@ -192,7 +200,7 @@ impl SplitStreamWriter { } pub fn write_external(&mut self, data: &[u8], padding: Vec) -> Result<()> { - if let Some((ref mut sha256, ..)) = self.sha256 { + if let Some(ref mut sha256, ..) = self.sha256 { sha256.update(data); sha256.update(&padding); } @@ -204,7 +212,7 @@ impl SplitStreamWriter { } pub async fn write_external_async(&mut self, data: Vec, padding: Vec) -> Result<()> { - if let Some((ref mut sha256, ..)) = self.sha256 { + if let Some(ref mut sha256, ..) 
= self.sha256 {
             sha256.update(&data);
             sha256.update(&padding);
         }
@@ -214,14 +222,20 @@ impl SplitStreamWriter {
         self.write_reference(&id, padding)
     }
 
-    pub fn done(mut self) -> Result<ObjectID> {
+    pub fn done(mut self) -> Result<(ObjectID, Option<Sha256Digest>)> {
         self.flush_inline(vec![])?;
 
-        if let Some((context, expected)) = self.sha256 {
-            if Into::<Sha256Digest>::into(context.finalize()) != expected {
-                bail!("Content doesn't have expected SHA256 hash value!");
+        let sha256_digest = if let Some(sha256) = self.sha256 {
+            let actual = Into::<Sha256Digest>::into(sha256.finalize());
+            if let Some(expected) = self.expected_sha256 {
+                if actual != expected {
+                    bail!("Content doesn't have expected SHA256 hash value!");
+                }
             }
-        }
+            Some(actual)
+        } else {
+            None
+        };
 
         let mut buf = vec![];
 
         let header = SplitstreamHeader {
@@ -248,7 +262,7 @@ impl SplitStreamWriter {
 
         buf.extend_from_slice(&self.writer.finish()?);
 
-        self.repo.ensure_object(&buf)
+        Ok((self.repo.ensure_object(&buf)?, sha256_digest))
     }
 }
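With the reworked writer API above, done() returns both the stream's fs-verity object ID and, when requested, the computed content digest. A minimal caller sketch, under assumptions: EXAMPLE_CONTENT_TYPE and the store_stream helper are made up for illustration, and the repository handle is assumed to come from elsewhere.

    use std::sync::Arc;

    use anyhow::Result;
    use composefs::{
        fsverity::Sha256HashValue, repository::Repository, splitstream::SplitStreamWriter,
        util::Sha256Digest,
    };

    // Hypothetical 64-bit content-type magic for this sketch; real callers define their own.
    const EXAMPLE_CONTENT_TYPE: u64 = 0x0123456789abcdef;

    // Store `blob` as a splitstream, asking the writer to compute (but not verify) its SHA-256.
    fn store_stream(
        repo: &Arc<Repository<Sha256HashValue>>,
        blob: &[u8],
    ) -> Result<(Sha256HashValue, Option<Sha256Digest>)> {
        // compute_sha256 = true, expected_sha256 = None: the digest is computed and returned,
        // but nothing is checked against an expected value.
        let mut writer = SplitStreamWriter::new(repo, EXAMPLE_CONTENT_TYPE, true, None);
        writer.write_inline(blob);
        // done() now yields (stream object ID, Option<Sha256Digest> of the content).
        writer.done()
    }

This mirrors how ObjectMapWriter::serialize() uses OBJMAP_CONTENT_TYPE and unwraps the digest it explicitly requested; on the read side the same content-type value is passed to SplitStreamReader::new() so the reader can check the wrapped type when the stream is opened.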