Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for zstd layers #24

Merged
merged 2 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ serde_json = "1.0.64"
tar = "0.4.38"
thiserror = "1"
oci-spec = "0.7.0"
zstd = { version = "0.13.2", optional = true }

[dev-dependencies]
anyhow = "1.0.89"

[features]
zstd = ["dep:zstd"]
zstdmt = ["zstd", "zstd/zstdmt"]
49 changes: 49 additions & 0 deletions examples/zstd.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#[cfg(feature = "zstdmt")]
// Example: build an OCI image inside the system temporary directory with two
// zstd-compressed layers, one holding this crate's `src` directory and one
// holding its `examples` directory (the latter compressed with multiple threads).
fn main() {
    use std::{env, path::PathBuf};

    use oci_spec::image::Platform;
    use ocidir::{cap_std::fs::Dir, new_empty_manifest, OciDir};

    // The OCI directory is opened (or created) directly in the temp directory.
    let dir = Dir::open_ambient_dir(env::temp_dir(), ocidir::cap_std::ambient_authority()).unwrap();
    let oci_dir = OciDir::ensure(&dir).unwrap();

    let mut manifest = new_empty_manifest().build().unwrap();
    let mut config = ocidir::oci_spec::image::ImageConfigurationBuilder::default()
        .build()
        .unwrap();

    // Add the src as a layer
    let mut writer = oci_dir.create_layer_zstd(Some(0)).unwrap();
    writer
        .append_dir_all(".", PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("src"))
        .unwrap();

    // `into_inner` finishes the tar stream and hands back the layer writer;
    // `complete` then finalizes the compressed blob.
    let layer = writer.into_inner().unwrap().complete().unwrap();
    oci_dir.push_layer(&mut manifest, &mut config, layer, "src", None);

    // Add the examples as a layer, using multithreaded compression
    let mut writer = oci_dir.create_layer_zstd_multithread(Some(0), 4).unwrap();
    writer
        .append_dir_all(
            ".",
            PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("examples"),
        )
        .unwrap();
    let layer = writer.into_inner().unwrap().complete().unwrap();
    oci_dir.push_layer(&mut manifest, &mut config, layer, "examples", None);

    println!(
        "Created image with manifest: {}",
        manifest.to_string_pretty().unwrap()
    );

    // Add the image manifest. This is the last use of `manifest`, so it is
    // moved into the call rather than cloned.
    let _descriptor = oci_dir
        .insert_manifest_and_config(manifest, config, None, Platform::default())
        .unwrap();
}

#[cfg(not(feature = "zstdmt"))]
// Fallback entry point used when the example is built without the `zstdmt`
// feature: it only tells the user how to run the real example.
fn main() {
    let hint = "Run this example with `cargo run --example zstd --features zstdmt`";
    println!("{hint}");
}
167 changes: 132 additions & 35 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,14 @@ pub struct Layer {
pub blob: Blob,
/// The uncompressed digest, which will be used for "diffid"s
pub uncompressed_sha256: Sha256Digest,
/// The media type of the layer
pub media_type: MediaType,
}

impl Layer {
/// Return the descriptor for this layer
pub fn descriptor(&self) -> oci_image::DescriptorBuilder {
self.blob.descriptor().media_type(MediaType::ImageLayerGzip)
self.blob.descriptor().media_type(self.media_type.clone())
}

/// Return a Digest instance for the uncompressed SHA-256.
Expand Down Expand Up @@ -137,20 +139,11 @@ impl<'a> Debug for BlobWriter<'a> {
}

/// Create an OCI tar+gzip layer.
pub struct GzipLayerWriter<'a> {
bw: BlobWriter<'a>,
uncompressed_hash: Hasher,
compressor: GzEncoder<Vec<u8>>,
}
pub struct GzipLayerWriter<'a>(Sha256Writer<GzEncoder<BlobWriter<'a>>>);

impl<'a> Debug for GzipLayerWriter<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("GzipLayerWriter")
.field("bw", &self.bw)
.field("compressor", &self.compressor)
.finish()
}
}
#[cfg(feature = "zstd")]
/// Writer for an OCI tar+zstd layer.
///
/// Data written here passes through a `Sha256Writer` (which tracks the SHA-256
/// of the bytes written, i.e. the uncompressed stream), then through a
/// `zstd::Encoder`, and finally into a `BlobWriter`.
///
/// This type is only available when the `zstd` feature is enabled.
pub struct ZstdLayerWriter<'a>(Sha256Writer<zstd::Encoder<'static, BlobWriter<'a>>>);

#[derive(Debug)]
/// An opened OCI directory.
Expand Down Expand Up @@ -271,8 +264,41 @@ impl OciDir {
Ok(tar::Builder::new(self.create_gzip_layer(c)?))
}

/// Add a layer to the top of the image stack. The firsh pushed layer becomes the root.
#[cfg(feature = "zstd")]
/// Open a new tar output stream whose contents are stored as a
/// zstd-compressed blob in this OCI directory.
///
/// `compression_level` is forwarded unchanged to the zstd layer writer.
///
/// This method is only available when the `zstd` feature is enabled.
pub fn create_layer_zstd(
    &self,
    compression_level: Option<i32>,
) -> Result<tar::Builder<ZstdLayerWriter>> {
    let layer_writer = ZstdLayerWriter::new(&self.dir, compression_level)?;
    Ok(tar::Builder::new(layer_writer))
}

#[cfg(feature = "zstdmt")]
/// Create a tar output stream, backed by a zstd compressed blob
/// using multithreaded compression.
///
/// The `n_workers` parameter specifies the number of threads to use for compression, per
/// [`zstd::Encoder::multithread`].
///
/// `compression_level` is forwarded unchanged to the zstd layer writer.
///
/// This method is only available when the `zstdmt` feature is enabled.
pub fn create_layer_zstd_multithread(
    &self,
    compression_level: Option<i32>,
    n_workers: u32,
) -> Result<tar::Builder<ZstdLayerWriter>> {
    Ok(tar::Builder::new(ZstdLayerWriter::multithread(
        &self.dir,
        compression_level,
        n_workers,
    )?))
}

/// Add a layer to the top of the image stack. The first pushed layer becomes the root.
pub fn push_layer(
&self,
manifest: &mut oci_image::ImageManifest,
Expand Down Expand Up @@ -621,40 +647,111 @@ impl<'a> GzipLayerWriter<'a> {
/// Create a writer for a gzip compressed layer blob.
fn new(ocidir: &'a Dir, c: Option<flate2::Compression>) -> Result<Self> {
let bw = BlobWriter::new(ocidir)?;
Ok(Self {
bw,
uncompressed_hash: Hasher::new(MessageDigest::sha256())?,
compressor: GzEncoder::new(Vec::with_capacity(8192), c.unwrap_or_default()),
})
let enc = flate2::write::GzEncoder::new(bw, c.unwrap_or_default());
Ok(Self(Sha256Writer::new(enc)))
}

/// Consume this writer, flushing buffered data and put the blob in place.
pub fn complete(mut self) -> Result<Layer> {
self.compressor.get_mut().clear();
let buf = self.compressor.finish()?;
self.bw.write_all(&buf)?;
let blob = self.bw.complete()?;
let uncompressed_sha256 =
Sha256Digest::from_str(&hex::encode(self.uncompressed_hash.finish()?)).unwrap();
pub fn complete(self) -> Result<Layer> {
let (uncompressed_sha256, enc) = self.0.finish();
let blob = enc.finish()?.complete()?;
Ok(Layer {
blob,
uncompressed_sha256,
media_type: MediaType::ImageLayerGzip,
})
}
}

impl<'a> std::io::Write for GzipLayerWriter<'a> {
fn write(&mut self, srcbuf: &[u8]) -> std::io::Result<usize> {
self.compressor.get_mut().clear();
self.compressor.write_all(srcbuf).unwrap();
self.uncompressed_hash.update(srcbuf)?;
let compressed_buf = self.compressor.get_mut().as_slice();
self.bw.write_all(compressed_buf)?;
Ok(srcbuf.len())
fn write(&mut self, data: &[u8]) -> std::io::Result<usize> {
self.0.write(data)
}

fn flush(&mut self) -> std::io::Result<()> {
self.0.flush()
}
}

#[cfg(feature = "zstd")]
impl<'a> ZstdLayerWriter<'a> {
    /// Create a writer for a zstd compressed layer blob.
    ///
    /// `c` is the zstd compression level; `None` is passed to the encoder as
    /// level `0`, which the zstd crate documents as "use zstd's default level".
    fn new(ocidir: &'a Dir, c: Option<i32>) -> Result<Self> {
        let bw = BlobWriter::new(ocidir)?;
        let encoder = zstd::Encoder::new(bw, c.unwrap_or(0))?;
        Ok(Self(Sha256Writer::new(encoder)))
    }

    /// Consume this writer, flushing buffered data and put the blob in place.
    ///
    /// The returned [`Layer`] carries the completed blob, the SHA-256 of the
    /// uncompressed data written to this writer, and the zstd layer media type.
    pub fn complete(self) -> Result<Layer> {
        // Unwrap in order: hashing wrapper -> zstd encoder -> blob writer.
        let (uncompressed_sha256, enc) = self.0.finish();
        let blob = enc.finish()?.complete()?;
        Ok(Layer {
            blob,
            uncompressed_sha256,
            media_type: MediaType::ImageLayerZstd,
        })
    }
}

#[cfg(feature = "zstdmt")]
impl<'a> ZstdLayerWriter<'a> {
    /// Create a writer for a zstd compressed layer blob, with multithreaded compression enabled.
    ///
    /// The `n_workers` parameter specifies the number of threads to use for compression, per
    /// [`zstd::Encoder::multithread`].
    ///
    /// `c` is the zstd compression level; `None` is passed to the encoder as level `0`.
    fn multithread(ocidir: &'a Dir, c: Option<i32>, n_workers: u32) -> Result<Self> {
        let bw = BlobWriter::new(ocidir)?;
        let mut encoder = zstd::Encoder::new(bw, c.unwrap_or(0))?;
        // Must be configured on the encoder before it is wrapped and written to.
        encoder.multithread(n_workers)?;
        Ok(Self(Sha256Writer::new(encoder)))
    }
}

#[cfg(feature = "zstd")]
impl<'a> std::io::Write for ZstdLayerWriter<'a> {
    /// Forward `buf` to the wrapped hashing/compressing writer and report how
    /// many bytes it accepted.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let wrapped = &mut self.0;
        wrapped.write(buf)
    }

    /// Flush the wrapped writer.
    fn flush(&mut self) -> std::io::Result<()> {
        let wrapped = &mut self.0;
        wrapped.flush()
    }
}

/// Wraps a writer and calculates the sha256 digest of data written to the inner writer
struct Sha256Writer<W> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm this could be factored out and reused for gzip too right?

Copy link
Collaborator Author

@tofay tofay Sep 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, done. didn't do it for blobwriter since that also determines the size (and I thought adding a SizeWriter that keeps track of data size, and making blobwriter have a SizeWriter or similar was too much!)

inner: W,
sha: openssl::sha::Sha256,
}

impl<W> Sha256Writer<W> {
    /// Wrap `inner`, starting from a fresh SHA-256 state.
    pub(crate) fn new(inner: W) -> Self {
        let sha = openssl::sha::Sha256::new();
        Self { inner, sha }
    }

    /// Consume the wrapper, yielding the SHA-256 digest of the data written
    /// through it together with the wrapped writer.
    pub(crate) fn finish(self) -> (Sha256Digest, W) {
        let Self { inner, sha } = self;
        let hex_digest = hex::encode(sha.finish());
        // NOTE(review): presumably infallible, since the input is the hex
        // encoding of a freshly computed SHA-256 — confirm against `Sha256Digest`.
        let digest = Sha256Digest::from_str(&hex_digest).unwrap();
        (digest, inner)
    }
}

impl<W> Write for Sha256Writer<W>
where
W: Write,
{
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let len = self.inner.write(buf)?;
self.sha.update(&buf[..len]);
Comment on lines +748 to +749
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah you know what, this is clearly better code than what it replaces. We need to handle short writes, and I think that's what I messed up in the original.

Ok(len)
}

fn flush(&mut self) -> std::io::Result<()> {
self.bw.flush()
self.inner.flush()
}
}

Expand Down