diff --git a/Cargo.lock b/Cargo.lock index 9f48f2e63..addf2005d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3847,6 +3847,7 @@ version = "0.0.1" dependencies = [ "anyhow", "chrono", + "crucible-common", "crucible-workspace-hack", "percent-encoding", "progenitor", @@ -3854,6 +3855,7 @@ dependencies = [ "schemars", "serde", "serde_json", + "uuid", ] [[package]] diff --git a/common/src/lib.rs b/common/src/lib.rs index af8d06909..b21ed0500 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -158,6 +158,9 @@ pub enum CrucibleError { #[error("missing block context for non-empty block")] MissingBlockContext, + + #[error("Incompatible RegionDefinition {0}")] + RegionIncompatible(String), } impl From for CrucibleError { @@ -363,6 +366,7 @@ impl From for dropshot::HttpError { | CrucibleError::ModifyingReadOnlyRegion | CrucibleError::OffsetInvalid | CrucibleError::OffsetUnaligned + | CrucibleError::RegionIncompatible(_) | CrucibleError::ReplaceRequestInvalid(_) | CrucibleError::SnapshotExistsAlready(_) | CrucibleError::Unsupported(_) => { diff --git a/common/src/region.rs b/common/src/region.rs index 46c2f4371..a0eae700b 100644 --- a/common/src/region.rs +++ b/common/src/region.rs @@ -1,5 +1,6 @@ // Copyright 2021 Oxide Computer Company use anyhow::{bail, Result}; +use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use uuid::Uuid; @@ -17,7 +18,16 @@ use super::*; * downstairs expects Block { 2, 12 }. */ #[derive( - Deserialize, Serialize, Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, + Deserialize, + Serialize, + Copy, + Clone, + Debug, + PartialEq, + Eq, + JsonSchema, + PartialOrd, + Ord, )] pub struct Block { // Value could mean a size or offset @@ -118,7 +128,7 @@ impl Block { } } -#[derive(Deserialize, Serialize, Copy, Clone, Debug, PartialEq)] +#[derive(Deserialize, Serialize, Copy, Clone, Debug, JsonSchema, PartialEq)] pub struct RegionDefinition { /** * The size of each block in bytes. Must be a power of 2, minimum 512. 
@@ -170,6 +180,55 @@ impl RegionDefinition {
         })
     }
 
+    // Compare two RegionDefinitions and verify they are compatible.
+    // compatible is valid if all fields are the same, except for the
+    // UUID. The UUID should be different.
+    pub fn compatible(
+        self,
+        other: RegionDefinition,
+    ) -> Result<(), CrucibleError> {
+        // These fields should be the same.
+        if self.block_size != other.block_size {
+            return Err(CrucibleError::RegionIncompatible(
+                "block_size".to_string(),
+            ));
+        }
+        if self.extent_size != other.extent_size {
+            return Err(CrucibleError::RegionIncompatible(
+                "extent_size".to_string(),
+            ));
+        }
+        if self.extent_count != other.extent_count {
+            return Err(CrucibleError::RegionIncompatible(
+                "extent_count".to_string(),
+            ));
+        }
+        if self.encrypted != other.encrypted {
+            return Err(CrucibleError::RegionIncompatible(
+                "encrypted".to_string(),
+            ));
+        }
+        if self.database_read_version != other.database_read_version {
+            return Err(CrucibleError::RegionIncompatible(
+                "database_read_version".to_string(),
+            ));
+        }
+        if self.database_write_version != other.database_write_version {
+            return Err(CrucibleError::RegionIncompatible(
+                "database_write_version".to_string(),
+            ));
+        }
+
+        // If the UUIDs are the same, this is invalid.
+        if self.uuid == other.uuid {
+            return Err(CrucibleError::RegionIncompatible(
+                "UUIDs are the same".to_string(),
+            ));
+        }
+
+        Ok(())
+    }
+
     pub fn database_read_version(&self) -> usize {
         self.database_read_version
     }
@@ -489,4 +548,113 @@ mod test {
          */
         assert!(rd.validate_io(Block::new(1, 9), 2048).is_err());
     }
+
+    fn test_rd() -> RegionDefinition {
+        RegionDefinition {
+            block_size: 512,
+            extent_size: Block::new(10, 9),
+            extent_count: 8,
+            uuid: Uuid::new_v4(),
+            encrypted: false,
+            database_read_version: DATABASE_READ_VERSION,
+            database_write_version: DATABASE_WRITE_VERSION,
+        }
+    }
+
+    #[test]
+    fn test_region_compare_block() {
+        let mut rd1 = test_rd();
+        let rd2 = test_rd();
+
+        // Basic positive test first.
+ assert_eq!(rd1.compatible(rd2), Ok(())); + + rd1.block_size = 4096; + assert!(rd1.compatible(rd2).is_err()); + + let rd1 = test_rd(); + let mut rd2 = test_rd(); + rd2.block_size = 4096; + assert!(rd1.compatible(rd2).is_err()); + } + + #[test] + fn test_region_compare_extent_size() { + let mut rd1 = test_rd(); + let rd2 = test_rd(); + + rd1.extent_size = Block::new(2, 9); + assert!(rd1.compatible(rd2).is_err()); + + let rd1 = test_rd(); + let mut rd2 = test_rd(); + rd2.extent_size = Block::new(2, 9); + assert!(rd1.compatible(rd2).is_err()); + } + + #[test] + fn test_region_compare_extent_count() { + let mut rd1 = test_rd(); + let rd2 = test_rd(); + + rd1.extent_count = 9; + assert!(rd1.compatible(rd2).is_err()); + + let rd1 = test_rd(); + let mut rd2 = test_rd(); + rd2.extent_count = 9; + assert!(rd1.compatible(rd2).is_err()); + } + + #[test] + fn test_region_compare_uuid() { + // Verify region compare, UUIDs must be different + let mut rd1 = test_rd(); + let rd2 = test_rd(); + + rd1.uuid = rd2.uuid; + assert!(rd1.compatible(rd2).is_err()); + } + + #[test] + fn test_region_compare_encrypted() { + let mut rd1 = test_rd(); + let rd2 = test_rd(); + + rd1.encrypted = true; + assert!(rd1.compatible(rd2).is_err()); + + let rd1 = test_rd(); + let mut rd2 = test_rd(); + rd2.encrypted = true; + assert!(rd1.compatible(rd2).is_err()); + } + + #[test] + fn test_region_compare_db_read_version() { + let mut rd1 = test_rd(); + let rd2 = test_rd(); + + rd1.database_read_version = DATABASE_READ_VERSION + 1; + assert!(rd1.compatible(rd2).is_err()); + + let rd1 = test_rd(); + let mut rd2 = test_rd(); + rd2.database_read_version = DATABASE_READ_VERSION + 1; + assert!(rd1.compatible(rd2).is_err()); + } + + #[test] + fn test_region_compare_db_write_version() { + let mut rd1 = test_rd(); + let rd2 = test_rd(); + + rd1.database_write_version = DATABASE_WRITE_VERSION + 1; + assert!(rd1.compatible(rd2).is_err()); + + let rd1 = test_rd(); + let mut rd2 = test_rd(); + 
rd2.database_write_version = DATABASE_WRITE_VERSION + 1; + assert!(rd1.compatible(rd2).is_err()); + } } diff --git a/downstairs/src/extent.rs b/downstairs/src/extent.rs index 345751233..322fa4dcc 100644 --- a/downstairs/src/extent.rs +++ b/downstairs/src/extent.rs @@ -144,7 +144,7 @@ impl ExtentMeta { } } -#[derive(Debug, Clone)] +#[derive(Debug, Copy, Clone)] #[allow(clippy::enum_variant_names)] pub enum ExtentType { Data, @@ -169,7 +169,7 @@ impl fmt::Display for ExtentType { * FileType from the repair client. */ impl ExtentType { - pub fn to_file_type(&self) -> FileType { + pub fn to_file_type(self) -> FileType { match self { ExtentType::Data => FileType::Data, ExtentType::Db => FileType::Db, @@ -308,7 +308,7 @@ impl Extent { "Extent {} found replacement dir, finishing replacement", number ); - move_replacement_extent(dir, number as usize, log)?; + move_replacement_extent(dir, number as usize, log, false)?; } // We will migrate every read-write extent with a SQLite file present. @@ -618,6 +618,7 @@ pub(crate) fn move_replacement_extent>( region_dir: P, eid: usize, log: &Logger, + clone: bool, ) -> Result<(), CrucibleError> { let destination_dir = extent_dir(®ion_dir, eid as u32); let extent_file_name = extent_file_name(eid as u32, ExtentType::Data); @@ -653,13 +654,31 @@ pub(crate) fn move_replacement_extent>( sync_path(&original_file, log)?; // We distinguish between SQLite-backend and raw-file extents based on the - // presence of the `.db` file. We should never do live migration across - // different extent formats; in fact, we should never live-migrate - // SQLite-backed extents at all, but must still handle the case of - // unfinished migrations. + // presence of the `.db` file. We should never do extent repair across + // different extent formats; it must be SQLite-to-SQLite or raw-to-raw. + // + // It is uncommon to perform extent repair on SQLite-backed extents at all, + // because they are automatically migrated into raw file extents or + // read-only. 
However, it must be supported for two cases: + // + // - If there was an unfinished replacement, we must finish that replacement + // before migrating from SQLite -> raw file backend, which happens + // automatically later in startup. + // - We use this same code path to perform clones of read-only regions, + // which may be SQLite-backed (and will never migrate to raw files, + // because they are read only). This is only the case when the `clone` + // argument is `true`. + // + // In the first case, we know that we are repairing an SQLite-based extent + // because the target (original) extent includes a `.db` file. + // + // In the second case, the target (original) extent is not present, so we + // check whether the new files include a `.db` file. + new_file.set_extension("db"); original_file.set_extension("db"); - if original_file.exists() { + + if original_file.exists() || (new_file.exists() && clone) { if let Err(e) = std::fs::copy(new_file.clone(), original_file.clone()) { crucible_bail!( IoError, @@ -690,6 +709,10 @@ pub(crate) fn move_replacement_extent>( } sync_path(&original_file, log)?; } else if original_file.exists() { + // If we are cloning, then our new region will have been + // created with Backend::RawFile, and we should have no SQLite + // files. + assert!(!clone); info!( log, "Remove old file {:?} as there is no replacement", @@ -714,6 +737,10 @@ pub(crate) fn move_replacement_extent>( } sync_path(&original_file, log)?; } else if original_file.exists() { + // If we are cloning, then our new region will have been + // created with Backend::RawFile, and we should have no SQLite + // files. 
+ assert!(!clone); info!( log, "Remove old file {:?} as there is no replacement", diff --git a/downstairs/src/lib.rs b/downstairs/src/lib.rs index eb1585cd2..523e18663 100644 --- a/downstairs/src/lib.rs +++ b/downstairs/src/lib.rs @@ -16,6 +16,7 @@ use std::time::Duration; use crucible::*; use crucible_common::{build_logger, Block, CrucibleError, MAX_BLOCK_SIZE}; +use repair_client::Client; use anyhow::{bail, Result}; use bytes::BytesMut; @@ -935,7 +936,7 @@ where ); match d .region - .repair_extent(extent_id, source_repair_address) + .repair_extent(extent_id, source_repair_address, false) .await { Ok(()) => Message::RepairAckId { repair_id }, @@ -2304,7 +2305,7 @@ impl Downstairs { Err(CrucibleError::UpstairsInactive) } else { self.region - .repair_extent(*extent, *source_repair_address) + .repair_extent(*extent, *source_repair_address, false) .await }; debug!( @@ -3291,7 +3292,7 @@ pub async fn start_downstairs( * it and wait for another connection. Downstairs can handle * multiple Upstairs connecting but only one active one. */ - info!(log, "listening on {}", listen_on); + info!(log, "downstairs listening on {}", listen_on); loop { let (sock, raddr) = listener.accept().await?; @@ -3350,6 +3351,72 @@ pub async fn start_downstairs( Ok(join_handle) } +/// Clone the extent files in a region from another running downstairs. +/// +/// Use the reconcile/repair extent methods to copy another downstairs. +/// The source downstairs must have the same RegionDefinition as we do, +/// and both downstairs must be running in read only mode. 
+pub async fn clone_region( + d: Arc>, + source: SocketAddr, +) -> Result<()> { + let info = crucible_common::BuildInfo::default(); + let log = d.lock().await.log.new(o!("task" => "clone".to_string())); + info!(log, "Crucible Version: {}", info); + info!( + log, + "Upstairs <-> Downstairs Message Version: {}", CRUCIBLE_MESSAGE_VERSION + ); + + info!(log, "Connecting to {source} to obtain our extent files."); + + let url = format!("http://{:?}", source); + let repair_server = Client::new(&url); + let source_def = match repair_server.get_region_info().await { + Ok(def) => def.into_inner(), + Err(e) => { + bail!("Failed to get source region definition: {e}"); + } + }; + info!(log, "The source RegionDefinition is: {:?}", source_def); + + let source_ro_mode = match repair_server.get_region_mode().await { + Ok(ro) => ro.into_inner(), + Err(e) => { + bail!("Failed to get source mode: {e}"); + } + }; + + info!(log, "The source mode is: {:?}", source_ro_mode); + if !source_ro_mode { + bail!("Source downstairs is not read only"); + } + + let mut ds = d.lock().await; + + let my_def = ds.region.def(); + info!(log, "my def is {:?}", my_def); + + if let Err(e) = my_def.compatible(source_def) { + bail!("Incompatible region definitions: {e}"); + } + + if let Err(e) = ds.region.close_all_extents().await { + bail!("Failed to close all extents: {e}"); + } + + for eid in 0..my_def.extent_count() as usize { + info!(log, "Repair extent {eid}"); + + if let Err(e) = ds.region.repair_extent(eid, source, true).await { + bail!("repair extent {eid} returned: {e}"); + } + } + info!(log, "Region has been cloned"); + + Ok(()) +} + #[cfg(test)] mod test { use super::*; @@ -6412,7 +6479,7 @@ mod test { Ok(()) } #[tokio::test] - async fn test_version_uprev_compatable() -> Result<()> { + async fn test_version_uprev_compatible() -> Result<()> { // Test sending the +1 version to the DS, but also include the // current version on the supported list. 
The downstairs should // see that and respond with the version it does support. diff --git a/downstairs/src/main.rs b/downstairs/src/main.rs index 6ad009975..08d3c8e51 100644 --- a/downstairs/src/main.rs +++ b/downstairs/src/main.rs @@ -41,6 +41,22 @@ impl std::str::FromStr for Mode { #[derive(Debug, Parser)] #[clap(about = "disk-side storage component")] enum Args { + /// Clone the extents from another region into this region. + /// + /// The other downstairs should be running read-only. All data in + /// the region here will be replaced. + Clone { + /// Directory where the region is located. + #[clap(short, long, name = "DIRECTORY", action)] + data: PathBuf, + + /// Source IP:Port where the extent files will come from. + #[clap(short, long, action)] + source: SocketAddr, + + #[clap(short, long, action)] + trace_endpoint: Option, + }, Create { #[clap(long, default_value = "512", action)] block_size: u64, @@ -239,6 +255,41 @@ async fn main() -> Result<()> { let log = build_logger(); match args { + Args::Clone { + data, + source, + trace_endpoint, + } => { + // Instrumentation is shared. 
+ if let Some(endpoint) = trace_endpoint { + let tracer = opentelemetry_jaeger::new_agent_pipeline() + .with_endpoint(endpoint) // usually port 6831 + .with_service_name("downstairs") + .install_simple() + .expect("Error initializing Jaeger exporter"); + + let telemetry = + tracing_opentelemetry::layer().with_tracer(tracer); + + tracing_subscriber::registry() + .with(telemetry) + .try_init() + .expect("Error init tracing subscriber"); + } + + let d = build_downstairs_for_region( + &data, + false, + false, + false, + false, + true, // read_only + Some(log), + ) + .await?; + + clone_region(d, source).await + } Args::Create { block_size, data, diff --git a/downstairs/src/region.rs b/downstairs/src/region.rs index b2acbbb8d..b3344afdf 100644 --- a/downstairs/src/region.rs +++ b/downstairs/src/region.rs @@ -17,7 +17,6 @@ use tracing::instrument; use crucible_common::*; use crucible_protocol::SnapshotDetails; -use repair_client::types::FileType; use repair_client::Client; use super::*; @@ -36,6 +35,26 @@ pub fn validate_repair_files(eid: usize, files: &[String]) -> bool { files == [extent_file_name(eid as u32, ExtentType::Data)] } +/// Validate the possible files during a clone. +/// +/// During a clone of a downstairs region, we have one, two or four +/// possible files we expect to see. 
+pub fn validate_clone_files(eid: usize, files: &[String]) -> bool { + let one = vec![extent_file_name(eid as u32, ExtentType::Data)]; + + let mut some = one.clone(); + some.extend(vec![extent_file_name(eid as u32, ExtentType::Db)]); + + let mut all = some.clone(); + all.extend(vec![ + extent_file_name(eid as u32, ExtentType::DbShm), + extent_file_name(eid as u32, ExtentType::DbWal), + ]); + + // For replacement, we require one, some, or all + files == one || files == some || files == all +} + /// Wrapper type for either a job or reconciliation ID /// /// This is useful for debug logging / DTrace probes, and not much else @@ -75,7 +94,7 @@ pub struct Region { def: RegionDefinition, pub extents: Vec>>, - /// extents which are dirty and need to be flushed. should be true if the + /// Extents which are dirty and need to be flushed. Should be true if the /// dirty flag in the extent's metadata is set. When an extent is opened, if /// it's dirty, it's added to here. When a write is issued to an extent, /// it's added to here. If the write doesn't actually make the extent dirty @@ -389,6 +408,20 @@ impl Region { Ok(()) } + /// Walk the list of extents and close each one. + /// + /// If we fail to close an extent, we exit right away, leaving the + /// remaining extents alone. + pub async fn close_all_extents(&mut self) -> Result<()> { + for eid in 0..self.def.extent_count() as usize { + if let Err(e) = self.close_extent(eid).await { + bail!("Failed closing extent {eid} with {e}"); + } + } + + Ok(()) + } + /** * Walk the list of all extents and find any that are not open. * Open any extents that are not. @@ -501,25 +534,35 @@ impl Region { * C. If xxx.replace dir found start at step 4 above and continue * on through 6. * D. Only then, open extent. + * + * If clone == true, then we are replacing our extent files + * with ones from a remote downstairs. 
We make a few different + * decisions as that remote downstairs might have a different + * backend than our region was created with. */ pub async fn repair_extent( &self, eid: usize, repair_addr: SocketAddr, + clone: bool, ) -> Result<(), CrucibleError> { // Make sure the extent: - // is currently closed, matches our eid, is not read-only + // is currently closed, matches our eid let mg = self.extents[eid].lock().await; assert!(matches!(*mg, ExtentState::Closed)); drop(mg); - assert!(!self.read_only); - self.get_extent_copy(eid, repair_addr).await?; + // If this is not a clone, then we must not be read_only + if !clone { + assert!(!self.read_only); + } + + self.get_extent_copy(eid, repair_addr, clone).await?; // Returning from get_extent_copy means we have copied all our // files and moved the copy directory to replace directory. // Now, replace the current extent files with the replacement ones. - move_replacement_extent(&self.dir, eid, &self.log)?; + move_replacement_extent(&self.dir, eid, &self.log, clone)?; Ok(()) } @@ -535,6 +578,7 @@ impl Region { &self, eid: usize, repair_addr: SocketAddr, + clone: bool, ) -> Result<(), CrucibleError> { // An extent must be closed before we replace its files. let mg = self.extents[eid].lock().await; @@ -578,9 +622,17 @@ impl Region { "eid:{} Found repair files: {:?}", eid, repair_files ); - // The repair file list should always contain the extent data - // file itself, and nothing else. - if !validate_repair_files(eid, &repair_files) { + // Depending on if this is a clone or not, we have a different + // set of files we expect to find. + if clone { + if !validate_clone_files(eid, &repair_files) { + crucible_bail!( + RepairFilesInvalid, + "Invalid clone file list: {:?}", + repair_files, + ); + } + } else if !validate_repair_files(eid, &repair_files) { crucible_bail!( RepairFilesInvalid, "Invalid repair file list: {:?}", @@ -588,28 +640,47 @@ impl Region { ); } - // First, copy the main extent data file. 
- let extent_copy = Self::create_copy_file(copy_dir.clone(), eid, None)?; - let repair_stream = match repair_server - .get_extent_file(eid as u32, FileType::Data) - .await - { - Ok(rs) => rs, - Err(e) => { - crucible_bail!( - RepairRequestError, - "Failed to get extent {} data file: {:?}", - eid, - e, - ); + // Replace local files with their remote copies. + // If we are replacing our region with one from an older version + // that contained SQLite files, then we need to copy those files + // over as well. + let mut count = 0; + for opt_file in &[ + ExtentType::Data, + ExtentType::Db, + ExtentType::DbShm, + ExtentType::DbWal, + ] { + let filename = extent_file_name(eid as u32, *opt_file); + + if !repair_files.contains(&filename) { + continue; } - }; - save_stream_to_file(extent_copy, repair_stream.into_inner()).await?; + let local_file = + Self::create_copy_file(copy_dir.clone(), eid, *opt_file)?; + let repair_stream = match repair_server + .get_extent_file(eid as u32, opt_file.to_file_type()) + .await + { + Ok(rs) => rs, + Err(e) => { + crucible_bail!( + RepairRequestError, + "Failed to get extent {} {} file: {:?}", + eid, + opt_file, + e, + ); + } + }; + save_stream_to_file(local_file, repair_stream.into_inner()).await?; + count += 1; + } // After we have all files: move the repair dir. 
info!( self.log, - "Repair files downloaded, move directory {:?} to {:?}", + "{count} repair files downloaded, move directory {:?} to {:?}", copy_dir, rd ); @@ -1012,15 +1083,14 @@ impl Region { pub fn create_copy_file( mut copy_dir: PathBuf, eid: usize, - extension: Option, + extent_type: ExtentType, ) -> Result { // Get the base extent name before we consider the actual Type let name = extent_file_name(eid as u32, ExtentType::Data); copy_dir.push(name); - if let Some(extension) = extension { - let ext = format!("{}", extension); - copy_dir.set_extension(ext); - } + + let ext = format!("{}", extent_type); + copy_dir.set_extension(ext); let copy_path = copy_dir; if Path::new(©_path).exists() { @@ -1180,7 +1250,7 @@ impl<'a> BatchedPwritev<'a> { } } -fn config_path>(dir: P) -> PathBuf { +pub fn config_path>(dir: P) -> PathBuf { let mut out = dir.as_ref().to_path_buf(); out.push("region.json"); out @@ -2047,10 +2117,12 @@ pub(crate) mod test { Ok(()) } + // wrapper to send to sub-tests for true/false here #[test] fn validate_repair_files_empty() { // No repair files is a failure assert!(!validate_repair_files(1, &Vec::new())); + assert!(!validate_clone_files(1, &Vec::new())); } #[test] @@ -2059,6 +2131,7 @@ pub(crate) mod test { let good_files: Vec = vec!["001".to_string()]; assert!(validate_repair_files(1, &good_files)); + assert!(validate_clone_files(1, &good_files)); } #[test] @@ -2067,23 +2140,106 @@ pub(crate) mod test { let good_files: Vec = vec!["002".to_string(), "002".to_string()]; assert!(!validate_repair_files(2, &good_files)); + assert!(!validate_clone_files(2, &good_files)); } #[test] - fn validate_repair_files_offbyon() { + fn validate_repair_files_offbyon_data() { // Incorrect file names for extent 2 let good_files: Vec = vec!["001".to_string()]; assert!(!validate_repair_files(2, &good_files)); + assert!(!validate_clone_files(2, &good_files)); } #[test] - fn validate_repair_files_old() { - // Old extent files + fn validate_repair_files_db() { + // db 
file only exists on replacement let good_files: Vec = vec!["001".to_string(), "001.db".to_string()]; assert!(!validate_repair_files(1, &good_files)); + // Valid for replacement + assert!(validate_clone_files(1, &good_files)); + } + + #[test] + fn validate_repair_files_db_more() { + // This list can only exist for replacement + let many_files: Vec = vec![ + "001".to_string(), + "001.db".to_string(), + "001.db-shm".to_string(), + "001.db-wal".to_string(), + ]; + + assert!(!validate_repair_files(1, &many_files)); + assert!(validate_clone_files(1, &many_files)); + } + + #[test] + fn validate_repair_files_duplicate_pair() { + // duplicate file names for extent 2 + let good_files: Vec = vec![ + "002".to_string(), + "002".to_string(), + "002.db".to_string(), + "002.db".to_string(), + ]; + assert!(!validate_repair_files(2, &good_files)); + assert!(!validate_clone_files(2, &good_files)); + } + + #[test] + fn validate_repair_files_quad_duplicate() { + // Duplicate db-shm file + let good_files: Vec = vec![ + "001".to_string(), + "001.db".to_string(), + "001.db-shm".to_string(), + "001.db-shm".to_string(), + ]; + assert!(!validate_repair_files(1, &good_files)); + assert!(!validate_clone_files(1, &good_files)); + } + + #[test] + fn validate_repair_files_offbyon_all() { + // Incorrect file names for extent 2 + let good_files: Vec = vec![ + "001".to_string(), + "001.db".to_string(), + "001.db-shm".to_string(), + "001.db-wal".to_string(), + ]; + assert!(!validate_repair_files(2, &good_files)); + assert!(!validate_clone_files(2, &good_files)); + } + + #[test] + fn validate_repair_files_too_good() { + // Duplicate data file in list + let good_files: Vec = vec![ + "001".to_string(), + "001".to_string(), + "001.db".to_string(), + "001.db-shm".to_string(), + "001.db-wal".to_string(), + ]; + assert!(!validate_repair_files(1, &good_files)); + assert!(!validate_clone_files(1, &good_files)); + } + + #[test] + fn validate_repair_files_not_good_enough() { + // Replacement requires 1, 2 or 4 
files, not 3 + let good_files: Vec = vec![ + "001".to_string(), + "001.db".to_string(), + "001.db-wal".to_string(), + ]; + assert!(!validate_repair_files(1, &good_files)); + assert!(!validate_clone_files(1, &good_files)); } #[tokio::test] diff --git a/downstairs/src/repair.rs b/downstairs/src/repair.rs index 3130fff33..458cc9fa7 100644 --- a/downstairs/src/repair.rs +++ b/downstairs/src/repair.rs @@ -23,6 +23,8 @@ use crate::extent::{extent_dir, extent_file_name, extent_path, ExtentType}; */ pub struct FileServerContext { region_dir: PathBuf, + read_only: bool, + region_definition: RegionDefinition, } pub fn write_openapi(f: &mut W) -> Result<()> { @@ -35,6 +37,8 @@ fn build_api() -> ApiDescription> { let mut api = ApiDescription::new(); api.register(get_extent_file).unwrap(); api.register(get_files_for_extent).unwrap(); + api.register(get_region_info).unwrap(); + api.register(get_region_mode).unwrap(); api } @@ -65,11 +69,17 @@ pub async fn repair_main( */ let ds = ds.lock().await; let region_dir = ds.region.dir.clone(); + let read_only = ds.read_only; + let region_definition = ds.region.def(); drop(ds); - let context = FileServerContext { region_dir }; + info!(log, "Repair listens on {} for path:{:?}", addr, region_dir); + let context = FileServerContext { + region_dir, + read_only, + region_definition, + }; - info!(log, "Repair listens on {}", addr); /* * Set up the server. 
*/ @@ -126,10 +136,10 @@ async fn get_extent_file( extent_path.set_extension("db"); } FileType::DatabaseSharedMemory => { - extent_path.set_extension("db-wal"); + extent_path.set_extension("db-shm"); } FileType::DatabaseLog => { - extent_path.set_extension("db-shm"); + extent_path.set_extension("db-wal"); } FileType::Data => (), }; @@ -226,13 +236,19 @@ fn extent_file_list( eid: u32, ) -> Result, HttpError> { let mut files = Vec::new(); - let possible_files = vec![(extent_file_name(eid, ExtentType::Data), true)]; + let possible_files = [ + (ExtentType::Data, true), + (ExtentType::Db, false), + (ExtentType::DbShm, false), + (ExtentType::DbWal, false), + ]; for (file, required) in possible_files.into_iter() { let mut fullname = extent_dir.clone(); - fullname.push(file.clone()); + let file_name = extent_file_name(eid, file); + fullname.push(file_name.clone()); if fullname.exists() { - files.push(file); + files.push(file_name); } else if required { return Err(HttpError::for_bad_request(None, "EBADF".to_string())); } @@ -240,6 +256,31 @@ fn extent_file_list( Ok(files) } +/// Return the RegionDefinition describing our region. +#[endpoint { + method = GET, + path = "/region-info", +}] +async fn get_region_info( + rqctx: RequestContext>, +) -> Result, HttpError> { + let region_definition = rqctx.context().region_definition; + + Ok(HttpResponseOk(region_definition)) +} + +/// Return the region-mode describing our region. +#[endpoint { + method = GET, + path = "/region-mode", +}] +async fn get_region_mode( + rqctx: RequestContext>, +) -> Result, HttpError> { + let read_only = rqctx.context().read_only; + + Ok(HttpResponseOk(read_only)) +} #[cfg(test)] mod test { diff --git a/integration_tests/src/lib.rs b/integration_tests/src/lib.rs index 115786e72..bf818a1f5 100644 --- a/integration_tests/src/lib.rs +++ b/integration_tests/src/lib.rs @@ -150,10 +150,36 @@ mod test { Ok(()) } + // Take a downstairs and start it with clone option. 
This should + // attempt to clone extents from the provided source IP:Port downstairs. + // + // The Result is returned to the caller. + pub async fn reboot_clone(&mut self, source: SocketAddr) -> Result<()> { + let log = csl(); + self.downstairs = build_downstairs_for_region( + self.tempdir.path(), + false, /* lossy */ + false, /* read errors */ + false, /* write errors */ + false, /* flush errors */ + true, + Some(log.clone()), + ) + .await?; + + clone_region(self.downstairs.clone(), source).await + } + pub async fn address(&self) -> SocketAddr { // If start_downstairs returned Ok, then address will be populated self.downstairs.lock().await.address.unwrap() } + + // Return the repair address for a running downstairs + pub async fn repair_address(&self) -> SocketAddr { + // If start_downstairs returned Ok, then address will be populated + self.downstairs.lock().await.repair_address.unwrap() + } } #[derive(Debug)] @@ -2570,6 +2596,370 @@ mod test { Ok(()) } + #[tokio::test] + async fn integration_test_clone_raw() -> Result<()> { + // Test downstairs region clone. + // Create three downstairs with raw backend, write some data to them. + // Restart them all read only. + // Create a new downstairs. + // Clone a read only downstairs to the new downstairs + // Make a new volume with two old and the one new downstairs and verify + // the original data we wrote. + const BLOCK_SIZE: usize = 512; + + // boot three downstairs, write some data to them, then change to + // read-only. 
+ let mut test_downstairs_set = TestDownstairsSet::small(false).await?; + + let mut volume = Volume::new(BLOCK_SIZE as u64, csl()); + volume + .add_subvolume_create_guest( + test_downstairs_set.opts(), + volume::RegionExtentInfo { + block_size: BLOCK_SIZE as u64, + blocks_per_extent: test_downstairs_set.blocks_per_extent(), + extent_count: test_downstairs_set.extent_count(), + }, + 1, + None, + ) + .await?; + + volume.activate().await?; + + let random_buffer = { + let mut random_buffer = + vec![0u8; volume.total_size().await? as usize]; + rand::thread_rng().fill(&mut random_buffer[..]); + random_buffer + }; + + volume + .write( + Block::new(0, BLOCK_SIZE.trailing_zeros()), + Bytes::from(random_buffer.clone()), + ) + .await?; + + volume.deactivate().await.unwrap(); + drop(volume); + + // Restart all the downstairs read only + test_downstairs_set.reboot_read_only().await?; + + // Make the new downstairs + let mut new_ds = TestDownstairs::new( + "127.0.0.1".parse().unwrap(), + true, + true, + 5, + 2, + false, + Backend::RawFile, + ) + .await + .unwrap(); + + // Clone an original downstairs to our new downstairs + let clone_source = + test_downstairs_set.downstairs1.repair_address().await; + new_ds.reboot_clone(clone_source).await.unwrap(); + + // Restart the new downstairs read only. + new_ds.reboot_read_only().await.unwrap(); + let new_target = new_ds.address().await; + + // Take our original opts, and replace a target with the downstairs + // we just cloned. + let mut new_opts = test_downstairs_set.opts(); + new_opts.target[0] = new_target; + + let mut volume = Volume::new(BLOCK_SIZE as u64, csl()); + volume + .add_subvolume_create_guest( + new_opts, + volume::RegionExtentInfo { + block_size: BLOCK_SIZE as u64, + blocks_per_extent: test_downstairs_set.blocks_per_extent(), + extent_count: test_downstairs_set.extent_count(), + }, + 3, + None, + ) + .await?; + + volume.activate().await?; + + // Verify our volume still has the random data we wrote. 
+ let volume_size = volume.total_size().await? as usize; + let mut buffer = Buffer::new(volume_size / BLOCK_SIZE, BLOCK_SIZE); + volume + .read(Block::new(0, BLOCK_SIZE.trailing_zeros()), &mut buffer) + .await?; + + assert_eq!(&buffer[BLOCK_SIZE..], &random_buffer[BLOCK_SIZE..]); + + Ok(()) + } + + #[tokio::test] + async fn integration_test_clone_sql() -> Result<()> { + // Test downstairs region clone. + // Create three downstairs with sql backend, write some data to them. + // Restart them all read only. + // Create a new downstairs. + // Clone a read only downstairs to the new downstairs + // Make a new volume with two old and the one new downstairs and verify + // the original data we wrote. + const BLOCK_SIZE: usize = 512; + + // boot three downstairs, write some data to them, then change to + // read-only. + let mut test_downstairs_set = + TestDownstairsSet::small_sqlite(false).await?; + + // This must be a SQLite extent! + assert!(test_downstairs_set + .downstairs1 + .tempdir + .path() + .join("00/000/000.db") + .exists()); + + let mut volume = Volume::new(BLOCK_SIZE as u64, csl()); + volume + .add_subvolume_create_guest( + test_downstairs_set.opts(), + volume::RegionExtentInfo { + block_size: BLOCK_SIZE as u64, + blocks_per_extent: test_downstairs_set.blocks_per_extent(), + extent_count: test_downstairs_set.extent_count(), + }, + 1, + None, + ) + .await?; + + volume.activate().await?; + + let random_buffer = { + let mut random_buffer = + vec![0u8; volume.total_size().await? as usize]; + rand::thread_rng().fill(&mut random_buffer[..]); + random_buffer + }; + + volume + .write( + Block::new(0, BLOCK_SIZE.trailing_zeros()), + Bytes::from(random_buffer.clone()), + ) + .await?; + + volume.deactivate().await.unwrap(); + drop(volume); + + // Restart the original downstairs read only. 
+ test_downstairs_set.reboot_read_only().await?; + + // Make the new downstairs + let mut new_ds = TestDownstairs::new( + "127.0.0.1".parse().unwrap(), + true, + true, + 5, + 2, + false, + Backend::RawFile, + ) + .await + .unwrap(); + + // Clone a read only downstairs to our new downstairs. + let clone_source = + test_downstairs_set.downstairs1.repair_address().await; + new_ds.reboot_clone(clone_source).await.unwrap(); + + new_ds.reboot_read_only().await.unwrap(); + let new_target = new_ds.address().await; + + // The cloned region should have the .db file + assert!(new_ds.tempdir.path().join("00/000/000.db").exists()); + + // Replace one of the targets in our original with the new downstairs. + let mut new_opts = test_downstairs_set.opts(); + new_opts.target[0] = new_target; + + let mut volume = Volume::new(BLOCK_SIZE as u64, csl()); + volume + .add_subvolume_create_guest( + new_opts, + volume::RegionExtentInfo { + block_size: BLOCK_SIZE as u64, + blocks_per_extent: test_downstairs_set.blocks_per_extent(), + extent_count: test_downstairs_set.extent_count(), + }, + 3, + None, + ) + .await?; + + volume.activate().await?; + + // Verify our volume still has the random data we wrote. + let volume_size = volume.total_size().await? as usize; + let mut buffer = Buffer::new(volume_size / BLOCK_SIZE, BLOCK_SIZE); + volume + .read(Block::new(0, BLOCK_SIZE.trailing_zeros()), &mut buffer) + .await?; + + assert_eq!(&buffer[BLOCK_SIZE..], &random_buffer[BLOCK_SIZE..]); + + Ok(()) + } + + #[tokio::test] + async fn integration_test_clone_diff_ec() -> Result<()> { + // Test downstairs region clone. + // Verify different extent count will fail. 
+ + let mut ds_one = TestDownstairs::new( + "127.0.0.1".parse().unwrap(), + true, + true, + 5, + 3, + false, + Backend::RawFile, + ) + .await + .unwrap(); + + ds_one.reboot_read_only().await.unwrap(); + let clone_source = ds_one.repair_address().await; + + let mut ds_two = TestDownstairs::new( + "127.0.0.1".parse().unwrap(), + true, + true, + 5, + 2, // <- Different than above + false, + Backend::RawFile, + ) + .await + .unwrap(); + + assert!(ds_two.reboot_clone(clone_source).await.is_err()); + Ok(()) + } + + #[tokio::test] + async fn integration_test_clone_diff_es() -> Result<()> { + // Test downstairs region clone. + // Verify different extent size will fail. + + let mut ds_one = TestDownstairs::new( + "127.0.0.1".parse().unwrap(), + true, + true, + 9, + 2, + false, + Backend::RawFile, + ) + .await + .unwrap(); + + ds_one.reboot_read_only().await.unwrap(); + let clone_source = ds_one.repair_address().await; + + let mut ds_two = TestDownstairs::new( + "127.0.0.1".parse().unwrap(), + true, + true, + 5, // <- Different than above + 2, + false, + Backend::RawFile, + ) + .await + .unwrap(); + + assert!(ds_two.reboot_clone(clone_source).await.is_err()); + Ok(()) + } + + #[tokio::test] + async fn integration_test_clone_not_ro() -> Result<()> { + // Test downstairs region clone. + // Verify you can't clone from a RW downstairs + + let ds_one = TestDownstairs::new( + "127.0.0.1".parse().unwrap(), + true, + false, // <- RO is false. + 5, + 2, + false, + Backend::RawFile, + ) + .await + .unwrap(); + + let clone_source = ds_one.repair_address().await; + + let mut ds_two = TestDownstairs::new( + "127.0.0.1".parse().unwrap(), + true, + true, + 5, + 2, + false, + Backend::RawFile, + ) + .await + .unwrap(); + + assert!(ds_two.reboot_clone(clone_source).await.is_err()); + Ok(()) + } + + #[tokio::test] + async fn integration_test_clone_diff_encrypted() -> Result<()> { + // Test downstairs region clone. + // Verify downstairs encryption state must match. 
+ + let ds_one = TestDownstairs::new( + "127.0.0.1".parse().unwrap(), + false, + true, + 5, + 2, + false, + Backend::RawFile, + ) + .await + .unwrap(); + + let clone_source = ds_one.repair_address().await; + + let mut ds_two = TestDownstairs::new( + "127.0.0.1".parse().unwrap(), + true, // <- Encrypted is different + true, + 5, + 2, + false, + Backend::RawFile, + ) + .await + .unwrap(); + + assert!(ds_two.reboot_clone(clone_source).await.is_err()); + Ok(()) + } + #[tokio::test] async fn integration_test_volume_replace_downstairs() -> Result<()> { // Replace a downstairs with a new one @@ -2662,6 +3052,142 @@ mod test { Ok(()) } + #[tokio::test] + async fn integration_test_volume_clone_replace_ro_downstairs() -> Result<()> + { + // Replace a read only downstairs with a new one, that we cloned + // from the original ro downstairs. + const BLOCK_SIZE: usize = 512; + + // boot three downstairs, write some data to them, then change to + // read-only. + let mut test_downstairs_set = TestDownstairsSet::small(false).await?; + + let mut volume = Volume::new(BLOCK_SIZE as u64, csl()); + volume + .add_subvolume_create_guest( + test_downstairs_set.opts(), + volume::RegionExtentInfo { + block_size: BLOCK_SIZE as u64, + blocks_per_extent: test_downstairs_set.blocks_per_extent(), + extent_count: test_downstairs_set.extent_count(), + }, + 1, + None, + ) + .await?; + + volume.activate().await?; + + let random_buffer = { + let mut random_buffer = + vec![0u8; volume.total_size().await? as usize]; + rand::thread_rng().fill(&mut random_buffer[..]); + random_buffer + }; + + volume + .write( + Block::new(0, BLOCK_SIZE.trailing_zeros()), + Bytes::from(random_buffer.clone()), + ) + .await?; + + volume.deactivate().await.unwrap(); + + // Restart the three downstairs in read only mode. + test_downstairs_set.reboot_read_only().await?; + + // Restart our volume with the restarted read-only downstairs. 
+ let mut volume = Volume::new(BLOCK_SIZE as u64, csl()); + volume + .add_subvolume_create_guest( + test_downstairs_set.opts(), + volume::RegionExtentInfo { + block_size: BLOCK_SIZE as u64, + blocks_per_extent: test_downstairs_set.blocks_per_extent(), + extent_count: test_downstairs_set.extent_count(), + }, + 2, + None, + ) + .await?; + + volume.activate().await?; + + // Make the new downstairs, but don't start it + let mut new_ds = TestDownstairs::new( + "127.0.0.1".parse().unwrap(), + true, + true, + 5, + 2, + false, + Backend::RawFile, + ) + .await + .unwrap(); + + let clone_source = + test_downstairs_set.downstairs1.repair_address().await; + + // Clone a read only downstairs into the new downstairs region. + new_ds.reboot_clone(clone_source).await.unwrap(); + // Start the new downstairs read only. + new_ds.reboot_read_only().await.unwrap(); + + // Replace a downstairs in our RO volume with the new downstairs we + // just cloned. + let res = volume + .replace_downstairs( + test_downstairs_set.opts().id, + test_downstairs_set.downstairs1.address().await, + new_ds.address().await, + ) + .await + .unwrap(); + + assert_eq!(res, ReplaceResult::Started); + + // We can use the result from calling replace_downstairs to + // intuit status on progress of the replacement. + loop { + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + match volume + .replace_downstairs( + test_downstairs_set.opts().id, + test_downstairs_set.downstairs1.address().await, + new_ds.address().await, + ) + .await + .unwrap() + { + ReplaceResult::StartedAlready => { + println!("Waiting for replacement to finish"); + } + ReplaceResult::CompletedAlready => { + println!("Downstairs replacement completed"); + break; + } + x => { + panic!("Bad result from replace_downstairs: {:?}", x); + } + } + } + + // Read back what we wrote. + let volume_size = volume.total_size().await? 
as usize; + assert_eq!(volume_size % BLOCK_SIZE, 0); + let mut buffer = Buffer::new(volume_size / BLOCK_SIZE, BLOCK_SIZE); + volume + .read(Block::new(0, BLOCK_SIZE.trailing_zeros()), &mut buffer) + .await?; + + assert_eq!(&buffer[BLOCK_SIZE..], &random_buffer[BLOCK_SIZE..]); + + Ok(()) + } + #[tokio::test] async fn integration_test_volume_replace_bad_downstairs() -> Result<()> { // Attempt to replace a downstairs that we don't have. diff --git a/openapi/downstairs-repair.json b/openapi/downstairs-repair.json index da562e3f0..c2d74b888 100644 --- a/openapi/downstairs-repair.json +++ b/openapi/downstairs-repair.json @@ -80,10 +80,78 @@ } } } + }, + "/region-info": { + "get": { + "summary": "Return the RegionDefinition describing our region.", + "operationId": "get_region_info", + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/RegionDefinition" + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } + }, + "/region-mode": { + "get": { + "summary": "Return the region-mode describing our region.", + "operationId": "get_region_mode", + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "title": "Boolean", + "type": "boolean" + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } } }, "components": { "schemas": { + "Block": { + "type": "object", + "properties": { + "shift": { + "type": "integer", + "format": "uint32", + "minimum": 0 + }, + "value": { + "type": "integer", + "format": "uint64", + "minimum": 0 + } + }, + "required": [ + "shift", + "value" + ] + }, "Error": { "description": "Error information from a response.", "type": "object", @@ -103,6 +171,61 @@ "request_id" ] }, + "RegionDefinition": { + "type": "object", + 
"properties": { + "block_size": { + "description": "The size of each block in bytes. Must be a power of 2, minimum 512.", + "type": "integer", + "format": "uint64", + "minimum": 0 + }, + "database_read_version": { + "description": "The database version format for reading an extent database file.", + "type": "integer", + "format": "uint", + "minimum": 0 + }, + "database_write_version": { + "description": "The database version format for writing an extent database file.", + "type": "integer", + "format": "uint", + "minimum": 0 + }, + "encrypted": { + "description": "region data will be encrypted", + "type": "boolean" + }, + "extent_count": { + "description": "How many whole extents comprise this region?", + "type": "integer", + "format": "uint32", + "minimum": 0 + }, + "extent_size": { + "description": "How many blocks should appear in each extent?", + "allOf": [ + { + "$ref": "#/components/schemas/Block" + } + ] + }, + "uuid": { + "description": "UUID for this region", + "type": "string", + "format": "uuid" + } + }, + "required": [ + "block_size", + "database_read_version", + "database_write_version", + "encrypted", + "extent_count", + "extent_size", + "uuid" + ] + }, "FileType": { "type": "string", "enum": [ diff --git a/repair-client/Cargo.toml b/repair-client/Cargo.toml index 7d51bee15..5ec7db161 100644 --- a/repair-client/Cargo.toml +++ b/repair-client/Cargo.toml @@ -13,4 +13,6 @@ reqwest.workspace = true schemars.workspace = true serde.workspace = true serde_json.workspace = true +crucible-common.workspace = true crucible-workspace-hack.workspace = true +uuid.workspace = true diff --git a/repair-client/src/lib.rs b/repair-client/src/lib.rs index 9fb3220ff..9faafb817 100644 --- a/repair-client/src/lib.rs +++ b/repair-client/src/lib.rs @@ -5,4 +5,7 @@ use progenitor::generate_api; generate_api!( spec = "../openapi/downstairs-repair.json", derives = [schemars::JsonSchema], + replace = { + RegionDefinition = crucible_common::RegionDefinition, + } ); diff --git 
a/tools/README.md b/tools/README.md index 6d7feb21b..a1b83dc4e 100644 --- a/tools/README.md +++ b/tools/README.md @@ -35,6 +35,8 @@ downstairs UUID and is intended to provide a sample to build off of. ## test_ds.sh Test import then export for crucible downstairs. +Then, test the clone subcommand and verify that the cloned downstairs +exports the same file as the original downstairs. ## test_nightly.sh This runs a selection of tests from this directory and reports their diff --git a/tools/test_ds.sh b/tools/test_ds.sh index 27fdfa685..d85277cca 100755 --- a/tools/test_ds.sh +++ b/tools/test_ds.sh @@ -31,18 +31,47 @@ mkdir "${testdir}" set -o errexit uuid="12345678-1234-1234-1234-000000000001" -dir="${testdir}/export" +region_dir="${testdir}/region" exp="${testdir}/exported_file" imp="${testdir}/import" +clone_dir="${testdir}/clone" +clone_exp="${testdir}/clone_export_file" echo "Create file for import" dd if=/dev/urandom of="$imp" bs=512 count=300 echo "Import region" -${cds} create -i "$imp" -u $uuid -d "$dir" +${cds} create -i "$imp" -u $uuid -d "$region_dir" echo "Export region" -${cds} export -d "$dir" -e "$exp" --count 300 +${cds} export -d "$region_dir" -e "$exp" --count 300 diff $imp $exp - echo "Import Export test passed" + +# We can make use of the export function to test downstairs clone +echo "Test clone" +echo "Starting downstairs" +${cds} run -d "$region_dir" -p 8810 --mode ro > ${testdir}/ds_out.txt & +ds_pid=$! + +sleep 1 +if ! ps -p $ds_pid; then + echo "Failed to start downstairs" + exit 1 +else + echo "Downstairs running" +fi + +echo "Creating new downstairs" +${cds} create -u $(uuidgen) -d "$clone_dir" --extent-size 100 --extent-count 15 --block-size 512 +echo "Cloning existing downstairs" +${cds} clone -d "$clone_dir" -s 127.0.0.1:12810 + +echo "Verify clone using export" +${cds} export -d "$clone_dir" -e "$clone_exp" --count 300 + +diff $imp $clone_exp + +echo "Stopping downstairs" +kill "$ds_pid" +echo "Clone test passed" rm -rf ${testdir}