diff --git a/Cargo.lock b/Cargo.lock
index 6997faf2b..dfdf7b556 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -988,6 +988,7 @@ dependencies = [
  "hex",
  "httptest",
  "rand 0.8.5",
+ "repair-client",
  "reqwest",
  "serde",
  "serde_json",
diff --git a/downstairs/src/lib.rs b/downstairs/src/lib.rs
index 492272396..72205726c 100644
--- a/downstairs/src/lib.rs
+++ b/downstairs/src/lib.rs
@@ -39,6 +39,7 @@ mod stats;
 mod extent_inner_raw;
 mod extent_inner_sqlite;
 
+use extent::ExtentState;
 use region::Region;
 
 pub use admin::run_dropshot;
@@ -3256,7 +3257,7 @@ pub async fn start_downstairs(
     let repair_log = d.lock().await.log.new(o!("task" => "repair".to_string()));
 
     let repair_listener =
-        match repair::repair_main(&dss, repair_address, &repair_log).await {
+        match repair::repair_main(dss, repair_address, &repair_log).await {
             Err(e) => {
                 // TODO tear down other things if repair server can't be
                 // started?
diff --git a/downstairs/src/region.rs b/downstairs/src/region.rs
index 29db6d79f..6178ae4ff 100644
--- a/downstairs/src/region.rs
+++ b/downstairs/src/region.rs
@@ -507,7 +507,9 @@ impl Region {
      *
      * Let us assume we are repairing extent 012
      * 1. Make new 012.copy dir (extent name plus: .copy)
-     * 2. Get all extent files from source side, put in 012.copy directory
+     * 2. Get all extent files from source side, put in 012.copy directory.
+     *    Verify after the copy completes that the remote side still has
+     *    the extent closed (or the region is read only).
      * 3. fsync files we just downloaded
      * 4. Rename 012.copy dir to 012.replace dir
      * 5. fsync extent directory ( 00/000/ where the extent files live)
@@ -670,6 +672,29 @@ impl Region {
             count += 1;
         }
 
+        // We have copied over the extent. Verify that the remote side
+        // still believes that it is valid for repair so we know we have
+        // received a valid copy.
+        info!(self.log, "Verify extent {eid} still ready for copy");
+        let rc = match repair_server.extent_repair_ready(eid as u32).await {
+            Ok(rc) => rc.into_inner(),
+            Err(e) => {
+                crucible_bail!(
+                    RepairRequestError,
+                    "Failed to verify extent is still valid for repair {:?}",
+                    e,
+                );
+            }
+        };
+
+        if !rc {
+            warn!(self.log, "The repair of {eid} is being aborted.");
+            crucible_bail!(
+                RepairRequestError,
+                "Extent {eid} on remote repair server is in incorrect state",
+            );
+        }
+
         // After we have all files: move the repair dir.
         info!(
             self.log,
diff --git a/downstairs/src/repair.rs b/downstairs/src/repair.rs
index 458cc9fa7..18f61e8a6 100644
--- a/downstairs/src/repair.rs
+++ b/downstairs/src/repair.rs
@@ -25,6 +25,7 @@ pub struct FileServerContext {
     region_dir: PathBuf,
     read_only: bool,
     region_definition: RegionDefinition,
+    downstairs: Arc<Mutex<Downstairs>>,
 }
 
 pub fn write_openapi<W: Write>(f: &mut W) -> Result<()> {
@@ -39,13 +40,14 @@ fn build_api() -> ApiDescription<Arc<FileServerContext>> {
     api.register(get_files_for_extent).unwrap();
     api.register(get_region_info).unwrap();
     api.register(get_region_mode).unwrap();
+    api.register(extent_repair_ready).unwrap();
 
     api
 }
 
 /// Returns Ok(listen address) if everything launched ok, Err otherwise
 pub async fn repair_main(
-    ds: &Mutex<Downstairs>,
+    downstairs: Arc<Mutex<Downstairs>>,
     addr: SocketAddr,
     log: &Logger,
 ) -> Result<SocketAddr> {
@@ -67,7 +69,7 @@ pub async fn repair_main(
      * Record the region directory where all the extents and metadata
      * files live.
      */
-    let ds = ds.lock().await;
+    let ds = downstairs.lock().await;
     let region_dir = ds.region.dir.clone();
     let read_only = ds.read_only;
     let region_definition = ds.region.def();
@@ -78,6 +80,7 @@
         region_dir,
         read_only,
         region_definition,
+        downstairs,
     };
 
     /*
@@ -186,6 +189,31 @@ async fn get_a_file(path: PathBuf) -> Result<Response<Body>, HttpError> {
     }
 }
 
+/// Return true if the provided extent is closed or the region is read only
+#[endpoint {
+    method = GET,
+    path = "/extent/{eid}/repair-ready",
+}]
+async fn extent_repair_ready(
+    rqctx: RequestContext<Arc<FileServerContext>>,
+    path: Path<Eid>,
+) -> Result<HttpResponseOk<bool>, HttpError> {
+    let eid: usize = path.into_inner().eid as usize;
+    let downstairs = &rqctx.context().downstairs;
+
+    // If the region is read only, the extent is always ready.
+    if rqctx.context().read_only {
+        return Ok(HttpResponseOk(true));
+    }
+
+    let res = {
+        let ds = downstairs.lock().await;
+        matches!(ds.region.extents[eid], ExtentState::Closed)
+    };
+
+    Ok(HttpResponseOk(res))
+}
+
 /**
  * Get the list of files related to an extent.
  *
diff --git a/integration_tests/Cargo.toml b/integration_tests/Cargo.toml
index 113173f12..8ba51dad0 100644
--- a/integration_tests/Cargo.toml
+++ b/integration_tests/Cargo.toml
@@ -26,6 +26,7 @@ futures.workspace = true
 hex.workspace = true
 httptest.workspace = true
 rand.workspace = true
+repair-client.workspace = true
 reqwest.workspace = true
 serde.workspace = true
 serde_json.workspace = true
diff --git a/integration_tests/src/lib.rs b/integration_tests/src/lib.rs
index bf818a1f5..67fc59b8b 100644
--- a/integration_tests/src/lib.rs
+++ b/integration_tests/src/lib.rs
@@ -16,6 +16,7 @@ mod test {
     use futures::lock::Mutex;
     use httptest::{matchers::*, responders::*, Expectation, Server};
     use rand::Rng;
+    use repair_client::Client;
     use sha2::Digest;
     use slog::{info, o, warn, Drain, Logger};
     use tempfile::*;
@@ -2818,6 +2819,74 @@ mod test {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn integration_test_repair_ready_not_closed() -> Result<()> {
+        // Create a new downstairs.
+        // Verify repair ready returns false when an extent is open
+
+        // Create a downstairs
+        let new_ds = TestDownstairs::new(
+            "127.0.0.1".parse().unwrap(),
+            true,  // encrypted
+            false, // read only
+            5,
+            2,
+            false,
+            Backend::RawFile,
+        )
+        .await
+        .unwrap();
+
+        let repair_addr = new_ds.repair_address().await;
+
+        let url = format!("http://{:?}", repair_addr);
+        let repair_server = Client::new(&url);
+
+        // Extents are open, so the repair ready request should return false.
+        let rc = repair_server.extent_repair_ready(0).await;
+        assert!(!rc.unwrap().into_inner());
+
+        let rc = repair_server.extent_repair_ready(1).await;
+        assert!(!rc.unwrap().into_inner());
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn integration_test_repair_ready_read_only() -> Result<()> {
+        // Create a new downstairs.
+        // Verify repair ready returns true for read only downstairs,
+        // even when extents are open.
+
+        // Create a downstairs
+        let new_ds = TestDownstairs::new(
+            "127.0.0.1".parse().unwrap(),
+            true, // encrypted
+            true, // read only
+            5,
+            2,
+            false,
+            Backend::RawFile,
+        )
+        .await
+        .unwrap();
+
+        let repair_addr = new_ds.repair_address().await;
+
+        let url = format!("http://{:?}", repair_addr);
+        let repair_server = Client::new(&url);
+
+        // Extents are not closed, but the region is read only, so the
+        // requests should all return true.
+        let rc = repair_server.extent_repair_ready(0).await;
+        assert!(rc.unwrap().into_inner());
+
+        let rc = repair_server.extent_repair_ready(1).await;
+        assert!(rc.unwrap().into_inner());
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn integration_test_clone_diff_ec() -> Result<()> {
         // Test downstairs region clone.
diff --git a/openapi/downstairs-repair.json b/openapi/downstairs-repair.json
index c2d74b888..c2aeaa05d 100644
--- a/openapi/downstairs-repair.json
+++ b/openapi/downstairs-repair.json
@@ -46,6 +46,43 @@
         }
       }
     },
+    "/extent/{eid}/repair-ready": {
+      "get": {
+        "summary": "Return true if the provided extent is closed or the region is read only",
+        "operationId": "extent_repair_ready",
+        "parameters": [
+          {
+            "in": "path",
+            "name": "eid",
+            "required": true,
+            "schema": {
+              "type": "integer",
+              "format": "uint32",
+              "minimum": 0
+            }
+          }
+        ],
+        "responses": {
+          "200": {
+            "description": "successful operation",
+            "content": {
+              "application/json": {
+                "schema": {
+                  "title": "Boolean",
+                  "type": "boolean"
+                }
+              }
+            }
+          },
+          "4XX": {
+            "$ref": "#/components/responses/Error"
+          },
+          "5XX": {
+            "$ref": "#/components/responses/Error"
+          }
+        }
+      }
+    },
     "/newextent/{eid}/{file_type}": {
       "get": {
         "operationId": "get_extent_file",
diff --git a/upstairs/src/client.rs b/upstairs/src/client.rs
index d09896e60..cd0c02934 100644
--- a/upstairs/src/client.rs
+++ b/upstairs/src/client.rs
@@ -1247,8 +1247,8 @@ impl DownstairsClient {
         if old_state != IOState::InProgress {
             // This job is in an unexpected state.
             panic!(
-                "[{}] Job {} completed while not InProgress: {:?}",
-                self.client_id, ds_id, old_state
+                "[{}] Job {} completed while not InProgress: {:?} {:?}",
+                self.client_id, ds_id, old_state, job
            );
        }
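
For anyone who wants to exercise the new endpoint by hand, below is a minimal sketch (not part of the patch) of the verification step the repair path now performs, written against the generated repair-client the integration tests above use. The helper name verify_extent_still_ready and the use of anyhow for error plumbing are illustrative assumptions; Client::new, extent_repair_ready, and the /extent/{eid}/repair-ready semantics come from this change.

use repair_client::Client;

// Illustrative helper (an assumption, not in this patch): re-check that
// `eid` is still valid for repair on the remote downstairs before trusting
// the copied files, mirroring the new check added in downstairs/src/region.rs
// above.
async fn verify_extent_still_ready(
    repair_addr: std::net::SocketAddr,
    eid: u32,
) -> anyhow::Result<()> {
    let url = format!("http://{:?}", repair_addr);
    let repair_server = Client::new(&url);

    // GET /extent/{eid}/repair-ready returns true if the extent is closed
    // on the remote side, or if the remote region is read only.
    let ready = match repair_server.extent_repair_ready(eid).await {
        Ok(rc) => rc.into_inner(),
        Err(e) => anyhow::bail!("repair-ready request failed: {e:?}"),
    };

    if !ready {
        anyhow::bail!("extent {eid} is no longer valid for repair; aborting");
    }
    Ok(())
}

Running a check like this right after the file copy, as region.rs now does with crucible_bail!, turns a racing extent reopen on the source side into a clean repair abort instead of a silently suspect copy.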