From e38436699dd09d641915463456fe4a51fc61db88 Mon Sep 17 00:00:00 2001 From: Tom Fay Date: Wed, 25 Sep 2024 10:03:28 +0100 Subject: [PATCH 1/2] add support for zstd layers behind `zstd` feature - zstd multi-threaded compression supported behind `zstdmt` feature Signed-off-by: Tom Fay --- Cargo.toml | 5 ++ examples/zstd.rs | 49 ++++++++++++++++++ src/lib.rs | 129 ++++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 181 insertions(+), 2 deletions(-) create mode 100644 examples/zstd.rs diff --git a/Cargo.toml b/Cargo.toml index 1cd88fa..08dddb9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,11 @@ serde_json = "1.0.64" tar = "0.4.38" thiserror = "1" oci-spec = "0.7.0" +zstd = { version = "0.13.2", optional = true } [dev-dependencies] anyhow = "1.0.89" + +[features] +zstd = ["dep:zstd"] +zstdmt = ["zstd", "zstd/zstdmt"] diff --git a/examples/zstd.rs b/examples/zstd.rs new file mode 100644 index 0000000..e7ee133 --- /dev/null +++ b/examples/zstd.rs @@ -0,0 +1,49 @@ +#[cfg(feature = "zstdmt")] +fn main() { + use std::{env, path::PathBuf}; + + use oci_spec::image::Platform; + use ocidir::{cap_std::fs::Dir, new_empty_manifest, OciDir}; + let dir = Dir::open_ambient_dir(env::temp_dir(), ocidir::cap_std::ambient_authority()).unwrap(); + let oci_dir = OciDir::ensure(&dir).unwrap(); + + let mut manifest = new_empty_manifest().build().unwrap(); + let mut config = ocidir::oci_spec::image::ImageConfigurationBuilder::default() + .build() + .unwrap(); + + // Add the src as a layer + let mut writer = oci_dir.create_layer_zstd(Some(0)).unwrap(); + writer + .append_dir_all(".", PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("src")) + .unwrap(); + + let layer = writer.into_inner().unwrap().complete().unwrap(); + oci_dir.push_layer(&mut manifest, &mut config, layer, "src", None); + + // Add the examples as a layer, using multithreaded compression + let mut writer = oci_dir.create_layer_zstd_multithread(Some(0), 4).unwrap(); + writer + .append_dir_all( + ".", + 
PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("examples"), + ) + .unwrap(); + let layer = writer.into_inner().unwrap().complete().unwrap(); + oci_dir.push_layer(&mut manifest, &mut config, layer, "examples", None); + + println!( + "Created image with manifest: {}", + manifest.to_string_pretty().unwrap() + ); + + // Add the image manifest + let _descriptor = oci_dir + .insert_manifest_and_config(manifest.clone(), config, None, Platform::default()) + .unwrap(); +} + +#[cfg(not(feature = "zstdmt"))] +fn main() { + println!("Run this example with `cargo run --example zstd --features zstdmt`"); +} diff --git a/src/lib.rs b/src/lib.rs index 3245ae3..77e32e2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -104,12 +104,14 @@ pub struct Layer { pub blob: Blob, /// The uncompressed digest, which will be used for "diffid"s pub uncompressed_sha256: Sha256Digest, + /// The media type of the layer + pub media_type: MediaType, } impl Layer { /// Return the descriptor for this layer pub fn descriptor(&self) -> oci_image::DescriptorBuilder { - self.blob.descriptor().media_type(MediaType::ImageLayerGzip) + self.blob.descriptor().media_type(self.media_type.clone()) } /// Return a Digest instance for the uncompressed SHA-256. @@ -152,6 +154,10 @@ impl<'a> Debug for GzipLayerWriter<'a> { } } +#[cfg(feature = "zstd")] +/// Writer for a OCI tar+zstd layer. +pub struct ZstdLayerWriter<'a>(Sha256Writer>>); + #[derive(Debug)] /// An opened OCI directory. pub struct OciDir { @@ -271,8 +277,41 @@ impl OciDir { Ok(tar::Builder::new(self.create_gzip_layer(c)?)) } - /// Add a layer to the top of the image stack. The firsh pushed layer becomes the root. + #[cfg(feature = "zstd")] + /// Create a tar output stream, backed by a zstd compressed blob + /// + /// This method is only available when the `zstd` feature is enabled. 
+ pub fn create_layer_zstd( + &self, + compression_level: Option, + ) -> Result> { + Ok(tar::Builder::new(ZstdLayerWriter::new( + &self.dir, + compression_level, + )?)) + } + + #[cfg(feature = "zstdmt")] + /// Create a tar output stream, backed by a zstd compressed blob + /// using multithreaded compression. + /// + /// The `n_workers` parameter specifies the number of threads to use for compression, per + /// [zstd::Encoder::multithread] + /// + /// This method is only available when the `zstdmt` feature is enabled. + pub fn create_layer_zstd_multithread( + &self, + compression_level: Option, + n_workers: u32, + ) -> Result> { + Ok(tar::Builder::new(ZstdLayerWriter::multithread( + &self.dir, + compression_level, + n_workers, + )?)) + } + /// Add a layer to the top of the image stack. The firsh pushed layer becomes the root. pub fn push_layer( &self, manifest: &mut oci_image::ImageManifest, @@ -639,6 +678,7 @@ impl<'a> GzipLayerWriter<'a> { Ok(Layer { blob, uncompressed_sha256, + media_type: MediaType::ImageLayerGzip, }) } } @@ -658,6 +698,91 @@ impl<'a> std::io::Write for GzipLayerWriter<'a> { } } +#[cfg(feature = "zstd")] +impl<'a> ZstdLayerWriter<'a> { + /// Create a writer for a zstd compressed layer blob. + fn new(ocidir: &'a Dir, c: Option) -> Result { + let bw = BlobWriter::new(ocidir)?; + let encoder = zstd::Encoder::new(bw, c.unwrap_or(0))?; + Ok(Self(Sha256Writer::new(encoder))) + } + + /// Consume this writer, flushing buffered data and put the blob in place. + pub fn complete(self) -> Result { + let (digest, enc) = self.0.finish(); + let blob = enc.finish()?.complete()?; + let uncompressed_sha256 = Sha256Digest::from_str(&digest).unwrap(); + Ok(Layer { + blob, + uncompressed_sha256, + media_type: MediaType::ImageLayerZstd, + }) + } +} + +#[cfg(feature = "zstdmt")] +impl<'a> ZstdLayerWriter<'a> { + /// Create a writer for a zstd compressed layer blob, with multithreaded compression enabled. 
+ /// + /// The `n_workers` parameter specifies the number of threads to use for compression, per + /// [zstd::Encoder::multithread] + fn multithread(ocidir: &'a Dir, c: Option, n_workers: u32) -> Result { + let bw = BlobWriter::new(ocidir)?; + let mut encoder = zstd::Encoder::new(bw, c.unwrap_or(0))?; + encoder.multithread(n_workers)?; + Ok(Self(Sha256Writer::new(encoder))) + } +} + +#[cfg(feature = "zstd")] +impl<'a> std::io::Write for ZstdLayerWriter<'a> { + fn write(&mut self, data: &[u8]) -> std::io::Result { + self.0.write(data) + } + + fn flush(&mut self) -> std::io::Result<()> { + self.0.flush() + } +} + +#[cfg(feature = "zstd")] +/// Wraps a writer and calculates the sha256 digest of data written to the inner writer +struct Sha256Writer { + inner: W, + sha: openssl::sha::Sha256, +} + +#[cfg(feature = "zstd")] +impl Sha256Writer { + pub(crate) fn new(inner: W) -> Self { + Self { + inner, + sha: openssl::sha::Sha256::new(), + } + } + + /// Return the hex encoded sha256 digest of the written data, and the underlying writer + pub(crate) fn finish(self) -> (String, W) { + (hex::encode(self.sha.finish()), self.inner) + } +} + +#[cfg(feature = "zstd")] +impl Write for Sha256Writer +where + W: Write, +{ + fn write(&mut self, buf: &[u8]) -> std::io::Result { + let len = self.inner.write(buf)?; + self.sha.update(&buf[..len]); + Ok(len) + } + + fn flush(&mut self) -> std::io::Result<()> { + self.inner.flush() + } +} + #[cfg(test)] mod tests { use cap_std::fs::OpenOptions; From ff301f7c1f156723632c26ddf845bcf48b560d64 Mon Sep 17 00:00:00 2001 From: Tom Fay Date: Thu, 26 Sep 2024 06:30:53 +0100 Subject: [PATCH 2/2] use sha256writer in gziplayerwriter Signed-off-by: Tom Fay --- src/lib.rs | 54 +++++++++++++----------------------------------------- 1 file changed, 13 insertions(+), 41 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 77e32e2..10516a8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -139,20 +139,7 @@ impl<'a> Debug for BlobWriter<'a> { } /// Create an OCI 
tar+gzip layer. -pub struct GzipLayerWriter<'a> { - bw: BlobWriter<'a>, - uncompressed_hash: Hasher, - compressor: GzEncoder>, -} - -impl<'a> Debug for GzipLayerWriter<'a> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("GzipLayerWriter") - .field("bw", &self.bw) - .field("compressor", &self.compressor) - .finish() - } -} +pub struct GzipLayerWriter<'a>(Sha256Writer>>); #[cfg(feature = "zstd")] /// Writer for a OCI tar+zstd layer. @@ -660,21 +647,14 @@ impl<'a> GzipLayerWriter<'a> { /// Create a writer for a gzip compressed layer blob. fn new(ocidir: &'a Dir, c: Option) -> Result { let bw = BlobWriter::new(ocidir)?; - Ok(Self { - bw, - uncompressed_hash: Hasher::new(MessageDigest::sha256())?, - compressor: GzEncoder::new(Vec::with_capacity(8192), c.unwrap_or_default()), - }) + let enc = flate2::write::GzEncoder::new(bw, c.unwrap_or_default()); + Ok(Self(Sha256Writer::new(enc))) } /// Consume this writer, flushing buffered data and put the blob in place. 
- pub fn complete(mut self) -> Result { - self.compressor.get_mut().clear(); - let buf = self.compressor.finish()?; - self.bw.write_all(&buf)?; - let blob = self.bw.complete()?; - let uncompressed_sha256 = - Sha256Digest::from_str(&hex::encode(self.uncompressed_hash.finish()?)).unwrap(); + pub fn complete(self) -> Result { + let (uncompressed_sha256, enc) = self.0.finish(); + let blob = enc.finish()?.complete()?; Ok(Layer { blob, uncompressed_sha256, @@ -684,17 +664,12 @@ impl<'a> GzipLayerWriter<'a> { } impl<'a> std::io::Write for GzipLayerWriter<'a> { - fn write(&mut self, srcbuf: &[u8]) -> std::io::Result { - self.compressor.get_mut().clear(); - self.compressor.write_all(srcbuf).unwrap(); - self.uncompressed_hash.update(srcbuf)?; - let compressed_buf = self.compressor.get_mut().as_slice(); - self.bw.write_all(compressed_buf)?; - Ok(srcbuf.len()) + fn write(&mut self, data: &[u8]) -> std::io::Result { + self.0.write(data) } fn flush(&mut self) -> std::io::Result<()> { - self.bw.flush() + self.0.flush() } } @@ -709,9 +684,8 @@ impl<'a> ZstdLayerWriter<'a> { /// Consume this writer, flushing buffered data and put the blob in place. 
pub fn complete(self) -> Result { - let (digest, enc) = self.0.finish(); + let (uncompressed_sha256, enc) = self.0.finish(); let blob = enc.finish()?.complete()?; - let uncompressed_sha256 = Sha256Digest::from_str(&digest).unwrap(); Ok(Layer { blob, uncompressed_sha256, @@ -745,14 +719,12 @@ impl<'a> std::io::Write for ZstdLayerWriter<'a> { } } -#[cfg(feature = "zstd")] /// Wraps a writer and calculates the sha256 digest of data written to the inner writer struct Sha256Writer { inner: W, sha: openssl::sha::Sha256, } -#[cfg(feature = "zstd")] impl Sha256Writer { pub(crate) fn new(inner: W) -> Self { Self { @@ -762,12 +734,12 @@ impl Sha256Writer { } /// Return the hex encoded sha256 digest of the written data, and the underlying writer - pub(crate) fn finish(self) -> (String, W) { - (hex::encode(self.sha.finish()), self.inner) + pub(crate) fn finish(self) -> (Sha256Digest, W) { + let digest = hex::encode(self.sha.finish()); + (Sha256Digest::from_str(&digest).unwrap(), self.inner) } } -#[cfg(feature = "zstd")] impl Write for Sha256Writer where W: Write,