diff --git a/Cargo.toml b/Cargo.toml index 1cd88fa..08dddb9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,11 @@ serde_json = "1.0.64" tar = "0.4.38" thiserror = "1" oci-spec = "0.7.0" +zstd = { version = "0.13.2", optional = true } [dev-dependencies] anyhow = "1.0.89" + +[features] +zstd = ["dep:zstd"] +zstdmt = ["zstd", "zstd/zstdmt"] diff --git a/examples/zstd.rs b/examples/zstd.rs new file mode 100644 index 0000000..e7ee133 --- /dev/null +++ b/examples/zstd.rs @@ -0,0 +1,49 @@ +#[cfg(feature = "zstdmt")] +fn main() { + use std::{env, path::PathBuf}; + + use oci_spec::image::Platform; + use ocidir::{cap_std::fs::Dir, new_empty_manifest, OciDir}; + let dir = Dir::open_ambient_dir(env::temp_dir(), ocidir::cap_std::ambient_authority()).unwrap(); + let oci_dir = OciDir::ensure(&dir).unwrap(); + + let mut manifest = new_empty_manifest().build().unwrap(); + let mut config = ocidir::oci_spec::image::ImageConfigurationBuilder::default() + .build() + .unwrap(); + + // Add the src as a layer + let mut writer = oci_dir.create_layer_zstd(Some(0)).unwrap(); + writer + .append_dir_all(".", PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("src")) + .unwrap(); + + let layer = writer.into_inner().unwrap().complete().unwrap(); + oci_dir.push_layer(&mut manifest, &mut config, layer, "src", None); + + // Add the examples as a layer, using multithreaded compression + let mut writer = oci_dir.create_layer_zstd_multithread(Some(0), 4).unwrap(); + writer + .append_dir_all( + ".", + PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("examples"), + ) + .unwrap(); + let layer = writer.into_inner().unwrap().complete().unwrap(); + oci_dir.push_layer(&mut manifest, &mut config, layer, "examples", None); + + println!( + "Created image with manifest: {}", + manifest.to_string_pretty().unwrap() + ); + + // Add the image manifest + let _descriptor = oci_dir + .insert_manifest_and_config(manifest.clone(), config, None, Platform::default()) + .unwrap(); +} + +#[cfg(not(feature = "zstdmt"))] +fn 
main() { + println!("Run this example with `cargo run --example zstd --features zstdmt`"); +} diff --git a/src/lib.rs b/src/lib.rs index 3245ae3..10516a8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -104,12 +104,14 @@ pub struct Layer { pub blob: Blob, /// The uncompressed digest, which will be used for "diffid"s pub uncompressed_sha256: Sha256Digest, + /// The media type of the layer + pub media_type: MediaType, } impl Layer { /// Return the descriptor for this layer pub fn descriptor(&self) -> oci_image::DescriptorBuilder { - self.blob.descriptor().media_type(MediaType::ImageLayerGzip) + self.blob.descriptor().media_type(self.media_type.clone()) } /// Return a Digest instance for the uncompressed SHA-256. @@ -137,20 +139,11 @@ impl<'a> Debug for BlobWriter<'a> { } /// Create an OCI tar+gzip layer. -pub struct GzipLayerWriter<'a> { - bw: BlobWriter<'a>, - uncompressed_hash: Hasher, - compressor: GzEncoder>, -} +pub struct GzipLayerWriter<'a>(Sha256Writer>>); -impl<'a> Debug for GzipLayerWriter<'a> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("GzipLayerWriter") - .field("bw", &self.bw) - .field("compressor", &self.compressor) - .finish() - } -} +#[cfg(feature = "zstd")] +/// Writer for an OCI tar+zstd layer. +pub struct ZstdLayerWriter<'a>(Sha256Writer>>); #[derive(Debug)] /// An opened OCI directory. @@ -271,8 +264,41 @@ impl OciDir { Ok(tar::Builder::new(self.create_gzip_layer(c)?)) } - /// Add a layer to the top of the image stack. The firsh pushed layer becomes the root. + #[cfg(feature = "zstd")] + /// Create a tar output stream, backed by a zstd compressed blob + /// + /// This method is only available when the `zstd` feature is enabled. 
+ pub fn create_layer_zstd( + &self, + compression_level: Option, + ) -> Result> { + Ok(tar::Builder::new(ZstdLayerWriter::new( + &self.dir, + compression_level, + )?)) + } + + #[cfg(feature = "zstdmt")] + /// Create a tar output stream, backed by a zstd compressed blob + /// using multithreaded compression. + /// + /// The `n_workers` parameter specifies the number of threads to use for compression, per + /// [zstd::Encoder::multithread] + /// + /// This method is only available when the `zstdmt` feature is enabled. + pub fn create_layer_zstd_multithread( + &self, + compression_level: Option, + n_workers: u32, + ) -> Result> { + Ok(tar::Builder::new(ZstdLayerWriter::multithread( + &self.dir, + compression_level, + n_workers, + )?)) + } + /// Add a layer to the top of the image stack. The firsh pushed layer becomes the root. pub fn push_layer( &self, manifest: &mut oci_image::ImageManifest, @@ -621,40 +647,111 @@ impl<'a> GzipLayerWriter<'a> { /// Create a writer for a gzip compressed layer blob. fn new(ocidir: &'a Dir, c: Option) -> Result { let bw = BlobWriter::new(ocidir)?; - Ok(Self { - bw, - uncompressed_hash: Hasher::new(MessageDigest::sha256())?, - compressor: GzEncoder::new(Vec::with_capacity(8192), c.unwrap_or_default()), - }) + let enc = flate2::write::GzEncoder::new(bw, c.unwrap_or_default()); + Ok(Self(Sha256Writer::new(enc))) } /// Consume this writer, flushing buffered data and put the blob in place. 
- pub fn complete(mut self) -> Result { - self.compressor.get_mut().clear(); - let buf = self.compressor.finish()?; - self.bw.write_all(&buf)?; - let blob = self.bw.complete()?; - let uncompressed_sha256 = - Sha256Digest::from_str(&hex::encode(self.uncompressed_hash.finish()?)).unwrap(); + pub fn complete(self) -> Result { + let (uncompressed_sha256, enc) = self.0.finish(); + let blob = enc.finish()?.complete()?; Ok(Layer { blob, uncompressed_sha256, + media_type: MediaType::ImageLayerGzip, }) } } impl<'a> std::io::Write for GzipLayerWriter<'a> { - fn write(&mut self, srcbuf: &[u8]) -> std::io::Result { - self.compressor.get_mut().clear(); - self.compressor.write_all(srcbuf).unwrap(); - self.uncompressed_hash.update(srcbuf)?; - let compressed_buf = self.compressor.get_mut().as_slice(); - self.bw.write_all(compressed_buf)?; - Ok(srcbuf.len()) + fn write(&mut self, data: &[u8]) -> std::io::Result { + self.0.write(data) + } + + fn flush(&mut self) -> std::io::Result<()> { + self.0.flush() + } +} + +#[cfg(feature = "zstd")] +impl<'a> ZstdLayerWriter<'a> { + /// Create a writer for a zstd compressed layer blob. + fn new(ocidir: &'a Dir, c: Option) -> Result { + let bw = BlobWriter::new(ocidir)?; + let encoder = zstd::Encoder::new(bw, c.unwrap_or(0))?; + Ok(Self(Sha256Writer::new(encoder))) + } + + /// Consume this writer, flushing buffered data and put the blob in place. + pub fn complete(self) -> Result { + let (uncompressed_sha256, enc) = self.0.finish(); + let blob = enc.finish()?.complete()?; + Ok(Layer { + blob, + uncompressed_sha256, + media_type: MediaType::ImageLayerZstd, + }) + } +} + +#[cfg(feature = "zstdmt")] +impl<'a> ZstdLayerWriter<'a> { + /// Create a writer for a zstd compressed layer blob, with multithreaded compression enabled. 
+ /// + /// The `n_workers` parameter specifies the number of threads to use for compression, per + /// [zstd::Encoder::multithread] + fn multithread(ocidir: &'a Dir, c: Option, n_workers: u32) -> Result { + let bw = BlobWriter::new(ocidir)?; + let mut encoder = zstd::Encoder::new(bw, c.unwrap_or(0))?; + encoder.multithread(n_workers)?; + Ok(Self(Sha256Writer::new(encoder))) + } +} + +#[cfg(feature = "zstd")] +impl<'a> std::io::Write for ZstdLayerWriter<'a> { + fn write(&mut self, data: &[u8]) -> std::io::Result { + self.0.write(data) + } + + fn flush(&mut self) -> std::io::Result<()> { + self.0.flush() + } +} + +/// Wraps a writer and calculates the sha256 digest of data written to the inner writer +struct Sha256Writer { + inner: W, + sha: openssl::sha::Sha256, +} + +impl Sha256Writer { + pub(crate) fn new(inner: W) -> Self { + Self { + inner, + sha: openssl::sha::Sha256::new(), + } + } + + /// Return the hex encoded sha256 digest of the written data, and the underlying writer + pub(crate) fn finish(self) -> (Sha256Digest, W) { + let digest = hex::encode(self.sha.finish()); + (Sha256Digest::from_str(&digest).unwrap(), self.inner) + } +} + +impl Write for Sha256Writer +where + W: Write, +{ + fn write(&mut self, buf: &[u8]) -> std::io::Result { + let len = self.inner.write(buf)?; + self.sha.update(&buf[..len]); + Ok(len) } fn flush(&mut self) -> std::io::Result<()> { - self.bw.flush() + self.inner.flush() } }