From 464511d300f22a4419ec8bfe7cf1547395490fb0 Mon Sep 17 00:00:00 2001
From: iliana etaoin
Date: Tue, 25 Mar 2025 04:09:46 +0000
Subject: [PATCH 1/4] regression test for #7796

---
 sled-agent/src/artifact_store.rs | 95 +++++++++++++++++++++++++++++++-
 1 file changed, 94 insertions(+), 1 deletion(-)

diff --git a/sled-agent/src/artifact_store.rs b/sled-agent/src/artifact_store.rs
index ab66b0b278..7ee5b577f7 100644
--- a/sled-agent/src/artifact_store.rs
+++ b/sled-agent/src/artifact_store.rs
@@ -927,8 +927,9 @@ mod test {
     use std::collections::BTreeSet;
     use std::sync::Arc;
 
+    use bytes::Bytes;
     use camino_tempfile::Utf8TempDir;
-    use futures::stream;
+    use futures::stream::{self, StreamExt};
     use hex_literal::hex;
     use omicron_common::disk::{
         DatasetConfig, DatasetKind, DatasetName, DatasetsConfig,
@@ -941,6 +942,7 @@ mod test {
     use sled_agent_api::ArtifactConfig;
     use sled_storage::error::Error as StorageError;
     use tokio::io::AsyncReadExt;
+    use tokio::sync::oneshot;
 
     use super::{ArtifactStore, DatasetsManager, Error};
 
@@ -1257,4 +1259,95 @@ mod test {
 
         log.cleanup_successful();
     }
+
+    #[tokio::test]
+    async fn issue_7796() {
+        // Tests a specific multi-writer issue described in omicron#7796.
+        // Previously, creating a writer would create temporary files at
+        // `tmp/{sha256}`, returning an error if there was already a writer in
+        // progress. However, a `tempfile::TempPath` was created before this
+        // check and was dropped when an error was returned, deleting the
+        // temporary file in use by the previous writer.
+        //
+        // One manifestation of the issue is:
+        // 1. Writer A is created and starts writing.
+        // 2. Writer B fails to create, returning `AlreadyInProgress`. This
+        //    triggers a logic error where a `TempPath` is dropped, unlinking
+        //    one of writer A's temporary files.
+        // 2. Writer C fails to create, returning `AlreadyInProgress`. Similarly
+        //    to writer B, this unlinks the other of writer A's temporary files.
+        // 3. Writer D is created successfully because writer A's files are no
+        //    longer present.
+        // 4. Writer A finishes and incorrectly persists writer D's incomplete
+        //    files.
+        // 5. Writer D finishes and fails because its files have already been
+        //    moved.
+        //
+        // We no longer use a temporary file with a known name or fail if
+        // another writer is already in progress, but it's good to have
+        // regression tests.
+
+        let log = test_setup_log("issue_7796");
+        let backend = TestBackend::new(2);
+        let store = ArtifactStore::new(&log.log, backend).await.unwrap();
+
+        let mut config = ArtifactConfig {
+            generation: 1u32.into(),
+            artifacts: BTreeSet::new(),
+        };
+        config.artifacts.insert(TEST_HASH);
+        store.put_config(config.clone()).await.unwrap();
+
+        // Start the first writer.
+        let first_writer = store.writer(TEST_HASH, 1u32.into()).await.unwrap();
+        // Start two additional writers and immediately drop them. Currently
+        // both succeed; in the buggy omicron#7796 implementation both fail
+        // with `AlreadyInProgress`. Two writers are necessary to delete both of
+        // the temporary files from the first writer.
+        for _ in 0..2 {
+            let _ = store.writer(TEST_HASH, 1u32.into()).await;
+        }
+        // Start a fourth writer and partially write to it.
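+        // A oneshot channel gates the fourth writer's second chunk so that
+        // its write is still in progress while the first writer completes.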
+        let fourth_writer = store.writer(TEST_HASH, 1u32.into()).await.unwrap();
+        let (tx, rx) = oneshot::channel();
+        let stream = stream::once(async { Ok(Bytes::from_static(b"I'm an ")) })
+            .chain(stream::once(async {
+                rx.await.unwrap();
+                Ok(Bytes::from_static(b"artifact!\n"))
+            }));
+        let handle =
+            tokio::task::spawn(fourth_writer.write_stream(Box::pin(stream)));
+        // Write to the first writer.
+        first_writer
+            .write_stream(Box::pin(stream::once(async { Ok(TEST_ARTIFACT) })))
+            .await
+            .unwrap();
+        // The artifacts should be complete.
+        for mountpoint in store.storage.artifact_storage_paths().await.unwrap()
+        {
+            assert_eq!(
+                tokio::fs::read(mountpoint.join(TEST_HASH.to_string()))
+                    .await
+                    .unwrap(),
+                TEST_ARTIFACT
+            );
+        }
+        // Allow the fourth writer to finish. It should not have failed.
+        tx.send(()).unwrap();
+        handle.await.unwrap().unwrap();
+        // The artifacts should still be complete.
+        for mountpoint in store.storage.artifact_storage_paths().await.unwrap()
+        {
+            assert_eq!(
+                tokio::fs::read(mountpoint.join(TEST_HASH.to_string()))
+                    .await
+                    .unwrap(),
+                TEST_ARTIFACT
+            );
+        }
+
+        log.cleanup_successful();
+    }
 }

From f67a545c5fd5463aaab050e8cc36c56deb9d733e Mon Sep 17 00:00:00 2001
From: iliana etaoin
Date: Tue, 25 Mar 2025 04:09:32 +0000
Subject: [PATCH 2/4] artifact store: use the atomicwrites crate

---
 Cargo.lock                       |   1 +
 sled-agent/Cargo.toml            |   1 +
 sled-agent/src/artifact_store.rs | 311 +++++++++++++------------------
 3 files changed, 133 insertions(+), 180 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index dff1bec8cf..9f86c4ee2c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7363,6 +7363,7 @@ dependencies = [
  "anyhow",
  "assert_matches",
  "async-trait",
+ "atomicwrites",
  "base64 0.22.1",
  "bootstore",
  "bootstrap-agent-api",
diff --git a/sled-agent/Cargo.toml b/sled-agent/Cargo.toml
index b8ce58c6cb..dfd3191019 100644
--- a/sled-agent/Cargo.toml
+++ b/sled-agent/Cargo.toml
@@ -11,6 +11,7 @@ workspace = true
 [dependencies]
 anyhow.workspace = true
 async-trait.workspace = true
+atomicwrites.workspace = true
 base64.workspace = true
 bootstore.workspace = true
 bootstrap-agent-api.workspace = true
diff --git a/sled-agent/src/artifact_store.rs b/sled-agent/src/artifact_store.rs
index 7ee5b577f7..fe614e551c 100644
--- a/sled-agent/src/artifact_store.rs
+++ b/sled-agent/src/artifact_store.rs
@@ -18,13 +18,14 @@
 //! Nexus and handled by the Sled Agent API.
 
 use std::future::Future;
-use std::io::ErrorKind;
+use std::io::{ErrorKind, Write};
 use std::net::SocketAddrV6;
 use std::str::FromStr;
 use std::time::Duration;
 
+use atomicwrites::{AtomicFile, OverwriteBehavior};
+use bytes::Bytes;
 use camino::Utf8PathBuf;
-use camino_tempfile::{NamedUtf8TempFile, Utf8TempPath};
 use dropshot::{
     Body, ConfigDropshot, FreeformBody, HttpError, HttpResponseOk, Path,
     RequestContext, ServerBuilder, StreamingBody,
@@ -44,9 +45,9 @@
 use sled_storage::error::Error as StorageError;
 use sled_storage::manager::StorageHandle;
-use slog::{Logger, error, info};
+use slog::{Logger, error, info, warn};
 use slog_error_chain::{InlineErrorChain, SlogInlineError};
-use tokio::fs::{File, OpenOptions};
-use tokio::io::AsyncWriteExt;
+use tokio::fs::File;
 use tokio::sync::{mpsc, oneshot, watch};
+use tokio::task::JoinSet;
 
 // These paths are defined under the artifact storage dataset. They
 // cannot conflict with any artifact paths because all artifact paths are
@@ -210,13 +211,19 @@ macro_rules! log_io_err {
     };
 }
 
-macro_rules! log_and_store {
-    ($last_error:expr, $log:expr, $verb:literal, $path:expr, $err:expr) => {{
+macro_rules! log_io_err_into {
+    ($log:expr, $verb:literal, $path:expr, $err:expr) => {{
         log_io_err!($log, $verb, $path, $err);
-        $last_error = Some(Error::File { verb: $verb, path: $path, err: $err });
+        Error::File { verb: $verb, path: $path, err: $err }
     }};
 }
 
+macro_rules! log_and_store {
+    ($last_error:expr, $log:expr, $verb:literal, $path:expr, $err:expr) => {
+        $last_error = Some(log_io_err_into!($log, $verb, $path, $err))
+    };
+}
+
 impl<T: DatasetsManager> ArtifactStore<T> {
     /// Get the current [`ArtifactConfig`].
     pub(crate) fn get_config(&self) -> Option<ArtifactConfig> {
@@ -342,11 +349,9 @@
             return Err(Error::NoConfig);
         }
 
-        let mut files = Vec::new();
+        let mut writer = ArtifactWriter::new(self.log.clone(), sha256);
         let mut last_error = None;
-        let mut datasets = 0;
         for mountpoint in self.storage.artifact_storage_paths().await? {
-            datasets += 1;
             let temp_dir = mountpoint.join(TEMP_SUBDIR);
             if let Err(err) = tokio::fs::create_dir(&temp_dir).await {
                 if err.kind() != ErrorKind::AlreadyExists {
@@ -356,42 +361,12 @@
                     continue;
                 }
             }
-
-            let temp_path =
-                Utf8TempPath::from_path(temp_dir.join(sha256.to_string()));
-            let file = match OpenOptions::new()
-                .write(true)
-                .create_new(true)
-                .open(&temp_path)
-                .await
-            {
-                Ok(file) => file,
-                Err(err) => {
-                    if err.kind() == ErrorKind::AlreadyExists {
-                        return Err(Error::AlreadyInProgress { sha256 });
-                    } else {
-                        let path = temp_path.to_path_buf();
-                        log_and_store!(
-                            last_error, &self.log, "create", path, err
-                        );
-                        continue;
-                    }
-                }
-            };
-            let file = NamedUtf8TempFile::from_parts(file, temp_path);
-
-            files.push(Some((file, mountpoint)));
+            writer.add_path(mountpoint, temp_dir);
         }
-        if files.is_empty() {
+        if writer.write_tasks.is_empty() {
             Err(last_error.unwrap_or(Error::NoUpdateDataset))
         } else {
-            Ok(ArtifactWriter {
-                datasets,
-                hasher: Sha256::new(),
-                files,
-                log: self.log.clone(),
-                sha256,
-            })
+            Ok(writer)
         }
     }
 
@@ -404,7 +379,7 @@
     ) -> Result<ArtifactPutResponse, Error> {
         self.writer(sha256, generation)
             .await?
-            .write_stream(body.into_stream().map_err(Error::Body))
+            .write_stream(Box::pin(body.into_stream().map_err(Error::Body)))
             .await
     }
 
@@ -635,141 +610,129 @@ impl DatasetsManager for StorageHandle {
 
 /// Abstraction that handles writing to several temporary files.
 #[derive(Debug)]
 struct ArtifactWriter {
-    datasets: usize,
-    files: Vec<Option<(NamedUtf8TempFile<File>, Utf8PathBuf)>>,
-    hasher: Sha256,
+    senders: Vec<mpsc::Sender<Bytes>>,
+    write_tasks: JoinSet<Result<(), Error>>,
     log: Logger,
     sha256: ArtifactHash,
 }
 
 impl ArtifactWriter {
-    /// Calls [`ArtifactWriter::write`] for each chunk in the stream, then
-    /// [`ArtifactWriter::finalize`]. See the documentation for these functions
-    /// for error handling information.
-    async fn write_stream(
-        self,
-        stream: impl Stream<Item = Result<impl AsRef<[u8]>, Error>>,
-    ) -> Result<ArtifactPutResponse, Error> {
-        let writer = stream
-            .try_fold(self, |mut writer, chunk| async {
-                writer.write(chunk).await?;
-                Ok(writer)
-            })
-            .await?;
-        writer.finalize().await
+    fn new(log: Logger, sha256: ArtifactHash) -> ArtifactWriter {
+        ArtifactWriter {
+            senders: Vec::new(),
+            write_tasks: JoinSet::new(),
+            log,
+            sha256,
+        }
     }
 
-    /// Write `chunk` to all temporary files.
-    ///
-    /// Errors in this method are considered non-fatal errors. All errors
-    /// are logged. If all files have failed, this method returns the most
-    /// recently-seen non-fatal error as a fatal error.
-    async fn write(&mut self, chunk: impl AsRef<[u8]>) -> Result<(), Error> {
-        self.hasher.update(&chunk);
-
-        let mut last_error = None;
-        for option in &mut self.files {
-            if let Some((mut file, mountpoint)) = option.take() {
-                match file.as_file_mut().write_all(chunk.as_ref()).await {
-                    Ok(()) => {
-                        *option = Some((file, mountpoint));
+    fn add_path(&mut self, mountpoint: Utf8PathBuf, temp_dir: Utf8PathBuf) {
+        let log = self.log.clone();
+        let path = mountpoint.join(self.sha256.to_string());
+        let atomic_file = AtomicFile::new_with_tmpdir(
+            &path,
+            OverwriteBehavior::AllowOverwrite,
+            temp_dir,
+        );
+        let (tx, mut rx) = mpsc::channel(16); // TODO
+        let expected = self.sha256;
+        self.senders.push(tx);
+        self.write_tasks.spawn_blocking(move || {
+            let moved_path = path.clone();
+            atomic_file
+                .write(|file| {
+                    let mut hasher = Sha256::new();
+                    while let Some(bytes) = rx.blocking_recv() {
+                        hasher.update(&bytes);
+                        if let Err(err) = file.write_all(&bytes) {
+                            return Err(log_io_err_into!(
+                                log,
+                                "write to temporary file for",
+                                moved_path,
+                                err
+                            ));
+                        }
                     }
-                    Err(err) => {
-                        let path = file.path().to_owned();
-                        log_and_store!(
-                            last_error, &self.log, "write to", path, err
-                        );
-                        // `file` and `final_path` are dropped here, cleaning up
-                        // the file
+                    let actual = ArtifactHash(hasher.finalize().into());
+                    if expected == actual {
+                        Ok(())
+                    } else {
+                        Err(Error::HashMismatch { expected, actual })
                     }
-                }
-            }
-        }
-
-        self.files.retain(Option::is_some);
-        if self.files.is_empty() {
-            Err(last_error.unwrap_or(Error::NoUpdateDataset))
-        } else {
-            Ok(())
-        }
+                })
+                .map_err(|err| match err {
+                    atomicwrites::Error::Internal(err) => {
+                        log_io_err_into!(
+                            log,
+                            "create or persist temporary file for",
+                            path,
+                            err
+                        )
+                    }
+                    atomicwrites::Error::User(err) => err,
+                })
+        });
     }
 
-    /// Rename all files to their final paths.
-    ///
-    /// Errors in this method are considered non-fatal errors. If all files have
-    /// failed in some way, the most recently-seen error is returned as a fatal
-    /// error.
-    async fn finalize(self) -> Result<ArtifactPutResponse, Error> {
-        let digest = self.hasher.finalize();
-        if digest.as_slice() != self.sha256.as_ref() {
-            return Err(Error::HashMismatch {
-                expected: self.sha256,
-                actual: ArtifactHash(digest.into()),
-            });
+    async fn write_stream(
+        mut self,
+        mut stream: impl Stream<Item = Result<Bytes, Error>> + Unpin,
+    ) -> Result<ArtifactPutResponse, Error> {
+        let mut swap = Vec::with_capacity(self.senders.len());
+        while let Some(chunk) = stream.try_next().await? {
+            // Send the chunk to all the write tasks, pruning any that failed
+            // because the other end hung up.
+            for sender in self.senders.drain(..) {
+                if sender.send(chunk.clone()).await.is_ok() {
+                    swap.push(sender);
+                }
+            }
+            if swap.is_empty() {
+                // All tasks have hung up!
+                warn!(
+                    &self.log,
+                    "All writer tasks have hung up";
+                    "sha256" => &self.sha256.to_string()
+                );
+                break;
+            }
+            std::mem::swap(&mut self.senders, &mut swap);
         }
+        // Drop the senders so that tasks stop waiting for new chunks.
+        drop(self.senders);
+        // `swap` should be empty, but drop it early just in case.
+        debug_assert!(swap.is_empty());
+        drop(swap);
 
-        let mut last_error = None;
+        let datasets = self.write_tasks.len();
         let mut successful_writes = 0;
-        for (mut file, mountpoint) in self.files.into_iter().flatten() {
-            // 1. fsync the temporary file.
-            if let Err(err) = file.as_file_mut().sync_all().await {
-                let path = file.path().to_owned();
-                log_and_store!(last_error, &self.log, "sync", path, err);
-                continue;
-            }
-            // 2. Open the parent directory so we can fsync it.
-            let parent_dir = match File::open(&mountpoint).await {
-                Ok(dir) => dir,
-                Err(err) => {
-                    log_and_store!(
-                        last_error, &self.log, "open", mountpoint, err
+        let mut last_error = None;
+        while let Some(result) = self.write_tasks.join_next().await {
+            match result
+                .map_err(|err| {
+                    error!(
+                        &self.log,
+                        "Failed to join artifact write task";
+                        "error" => InlineErrorChain::new(&err),
+                        "sha256" => &self.sha256.to_string(),
                     );
-                    continue;
-                }
-            };
-            // 3. Rename the temporary file.
-            let final_path = mountpoint.join(self.sha256.to_string());
-            let moved_final_path = final_path.clone();
-            if let Err(err) = tokio::task::spawn_blocking(move || {
-                file.persist(&moved_final_path)
-            })
-            .await?
+                    Error::Join(err)
+                })
+                .and_then(|r| r)
             {
-                error!(
-                    &self.log,
-                    "Failed to rename temporary file";
-                    "error" => &err.error,
-                    "from" => err.file.path().as_str(),
-                    "to" => final_path.as_str(),
-                );
-                last_error = Some(Error::FileRename {
-                    from: err.file.path().to_owned(),
-                    to: final_path,
-                    err: err.error,
-                });
-                continue;
+                Ok(()) => successful_writes += 1,
+                Err(err) => last_error = Some(err),
             }
-            // 4. fsync the parent directory.
-            if let Err(err) = parent_dir.sync_all().await {
-                log_and_store!(last_error, &self.log, "sync", mountpoint, err);
-                continue;
-            }
-
-            successful_writes += 1;
         }
-
         if successful_writes > 0 {
             info!(
                 &self.log,
                 "Wrote artifact";
                 "sha256" => &self.sha256.to_string(),
-                "datasets" => self.datasets,
+                "datasets" => datasets,
                 "successful_writes" => successful_writes,
             );
-            Ok(ArtifactPutResponse {
-                datasets: self.datasets,
-                successful_writes,
-            })
+            Ok(ArtifactPutResponse { datasets, successful_writes })
         } else {
             Err(last_error.unwrap_or(Error::NoUpdateDataset))
         }
@@ -799,9 +762,6 @@ impl RepoDepotApi for RepoDepotImpl {
 
 #[derive(Debug, thiserror::Error, SlogInlineError)]
 pub(crate) enum Error {
-    #[error("Another task is already writing artifact {sha256}")]
-    AlreadyInProgress { sha256: ArtifactHash },
-
     #[error("Error while reading request body")]
     Body(dropshot::HttpError),
 
@@ -824,14 +784,6 @@ pub(crate) enum Error {
         err: std::io::Error,
     },
 
-    #[error("Failed to rename `{from}` to `{to}`")]
-    FileRename {
-        from: Utf8PathBuf,
-        to: Utf8PathBuf,
-        #[source]
-        err: std::io::Error,
-    },
-
    #[error(
         "Attempt to modify config to generation {attempted_generation} \
         while at {current_generation}"
@@ -854,7 +806,7 @@ pub(crate) enum Error {
     HashMismatch { expected: ArtifactHash, actual: ArtifactHash },
 
     #[error("Blocking task failed")]
-    Join(#[from] tokio::task::JoinError),
+    Join(#[source] tokio::task::JoinError),
 
     #[error("Failed to commit ledger")]
     LedgerCommit(#[from] omicron_common::ledger::Error),
@@ -894,13 +846,11 @@ impl From<Error> for HttpError {
                 dropshot::ClientErrorStatusCode::CONFLICT,
                 err.to_string(),
             ),
-            Error::AlreadyInProgress { .. } | Error::GenerationPut { .. } => {
-                HttpError::for_client_error(
-                    None,
-                    dropshot::ClientErrorStatusCode::CONFLICT,
-                    err.to_string(),
-                )
-            }
+            Error::GenerationPut { .. } => HttpError::for_client_error(
+                None,
+                dropshot::ClientErrorStatusCode::CONFLICT,
+                err.to_string(),
+            ),
 
             // 5xx errors: ensure the error chain is logged
             Error::Body(inner) => inner,
@@ -912,7 +862,6 @@ impl From<Error> for HttpError {
             }
             Error::DepotCopy { .. }
             | Error::File { .. }
-            | Error::FileRename { .. }
             | Error::Join(_)
             | Error::LedgerCommit(_)
             | Error::LedgerChannel => HttpError::for_internal_error(
@@ -995,7 +944,7 @@ mod test {
         }
     }
 
-    const TEST_ARTIFACT: &[u8] = b"I'm an artifact!\n";
+    const TEST_ARTIFACT: Bytes = Bytes::from_static(b"I'm an artifact!\n");
     const TEST_HASH: ArtifactHash = ArtifactHash(hex!(
         "ab3581cd62f6645518f61a8e4391af6c062d5d60111edb0e51b37bd84827f5b4"
     ));
@@ -1137,7 +1086,9 @@ mod test {
             .writer(TEST_HASH, 1u32.into())
             .await
             .unwrap()
-            .write_stream(stream::once(async { Ok(TEST_ARTIFACT) }))
+            .write_stream(Box::pin(stream::once(async {
+                Ok(TEST_ARTIFACT)
+            })))
             .await
             .unwrap();
         // list lists the file
@@ -1240,9 +1191,9 @@ mod test {
             .writer(TEST_HASH, 1u32.into())
             .await
             .unwrap()
-            .write_stream(stream::once(async {
-                Ok(b"This isn't right at all.")
-            }))
+            .write_stream(Box::pin(stream::once(async {
+                Ok(Bytes::from_static(b"This isn't right at all."))
+            })))
             .await
             .unwrap_err();
         match err {

From 688ff0101336d94792f22597d2bb9b3b008f3818 Mon Sep 17 00:00:00 2001
From: iliana etaoin
Date: Tue, 25 Mar 2025 04:52:54 +0000
Subject: [PATCH 3/4] remove the read timeout for copy requests

---
 sled-agent/src/artifact_store.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sled-agent/src/artifact_store.rs b/sled-agent/src/artifact_store.rs
index fe614e551c..a10e4335e7 100644
--- a/sled-agent/src/artifact_store.rs
+++ b/sled-agent/src/artifact_store.rs
@@ -152,7 +152,6 @@ impl<T: DatasetsManager> ArtifactStore<T> {
             log,
             reqwest_client: reqwest::ClientBuilder::new()
                 .connect_timeout(Duration::from_secs(15))
-                .read_timeout(Duration::from_secs(15))
                 .build()
                 .unwrap(),
             ledger_tx,

From 36faafb666e626cf55506ab85f65e3e830c6d8f9 Mon Sep 17 00:00:00 2001
From: iliana etaoin
Date: Tue, 25 Mar 2025 04:54:33 +0000
Subject: [PATCH 4/4] fix comment numbering

---
 sled-agent/src/artifact_store.rs | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/sled-agent/src/artifact_store.rs b/sled-agent/src/artifact_store.rs
index a10e4335e7..fd7a81dc41 100644
--- a/sled-agent/src/artifact_store.rs
+++ b/sled-agent/src/artifact_store.rs
@@ -1224,13 +1224,13 @@ mod test {
         // 2. Writer B fails to create, returning `AlreadyInProgress`. This
         //    triggers a logic error where a `TempPath` is dropped, unlinking
         //    one of writer A's temporary files.
-        // 2. Writer C fails to create, returning `AlreadyInProgress`. Similarly
+        // 3. Writer C fails to create, returning `AlreadyInProgress`. Similarly
         //    to writer B, this unlinks the other of writer A's temporary files.
-        // 3. Writer D is created successfully because writer A's files are no
+        // 4. Writer D is created successfully because writer A's files are no
         //    longer present.
-        // 4. Writer A finishes and incorrectly persists writer D's incomplete
+        // 5. Writer A finishes and incorrectly persists writer D's incomplete
        //    files.
-        // 5. Writer D finishes and fails because its files have already been
+        // 6. Writer D finishes and fails because its files have already been
         //    moved.
         //
         // We no longer use a temporary file with a known name or fail if
        // another writer is already in progress, but it's good to have