From fadebd1b4ff050134bb0c20491500a1511e69ece Mon Sep 17 00:00:00 2001 From: Alan Hanson Date: Mon, 12 Feb 2024 18:02:47 +0000 Subject: [PATCH 1/2] Verify extent under repair is valid after copying files Added a call to the downstairs repair API that will verify that an extent is closed (or the region is read only), meaning the extent is okay to copy. This prevents a rare but possible condition where an extent is closed when a remote side contacts it for repair, but is re-opened before the extent file copy starts and the resulting copy is invalid. --- Cargo.lock | 1 + downstairs/src/lib.rs | 73 +++++++++++++++++++++++++++++----- downstairs/src/region.rs | 27 ++++++++++++- downstairs/src/repair.rs | 27 +++++++++++++ integration_tests/Cargo.toml | 1 + integration_tests/src/lib.rs | 69 ++++++++++++++++++++++++++++++++ openapi/downstairs-repair.json | 37 +++++++++++++++++ upstairs/src/client.rs | 4 +- 8 files changed, 227 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6997faf2b..dfdf7b556 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -988,6 +988,7 @@ dependencies = [ "hex", "httptest", "rand 0.8.5", + "repair-client", "reqwest", "serde", "serde_json", diff --git a/downstairs/src/lib.rs b/downstairs/src/lib.rs index 492272396..8a61fbdc6 100644 --- a/downstairs/src/lib.rs +++ b/downstairs/src/lib.rs @@ -39,7 +39,9 @@ mod stats; mod extent_inner_raw; mod extent_inner_sqlite; +use extent::ExtentState; use region::Region; +use repair::RepairReadyRequest; pub use admin::run_dropshot; pub use dump::dump_region; @@ -1595,6 +1597,40 @@ where .await } +// Repair info task +// listen for the repair API to request if a specific extent is closed +// or if the entire region is read only. 
+async fn repair_info( + ads: &Arc>, + mut repair_request_rx: mpsc::Receiver, +) -> Result<()> { + info!(ads.lock().await.log, "Started repair info task"); + while let Some(repair_request) = repair_request_rx.recv().await { + let ds = ads.lock().await; + let state = if ds.read_only { + true + } else { + matches!(ds.region.extents[repair_request.eid], ExtentState::Closed) + }; + drop(ds); + + if let Err(e) = repair_request.tx.send(state) { + info!( + ads.lock().await.log, + "Error {e} sending repair info for extent {}", + repair_request.eid, + ); + } else { + info!( + ads.lock().await.log, + "Repair info sent {state} for extent {}", repair_request.eid, + ); + } + } + warn!(ads.lock().await.log, "Repair info task ends"); + Ok(()) +} + async fn reply_task( mut resp_channel_rx: mpsc::Receiver, mut fw: FramedWrite, @@ -3255,16 +3291,23 @@ pub async fn start_downstairs( let dss = d.clone(); let repair_log = d.lock().await.log.new(o!("task" => "repair".to_string())); - let repair_listener = - match repair::repair_main(&dss, repair_address, &repair_log).await { - Err(e) => { - // TODO tear down other things if repair server can't be - // started? - bail!("got {:?} from repair main", e); - } + let (repair_request_tx, repair_request_rx) = mpsc::channel(10); + let repair_listener = match repair::repair_main( + &dss, + repair_address, + &repair_log, + repair_request_tx, + ) + .await + { + Err(e) => { + // TODO tear down other things if repair server can't be + // started? 
+ bail!("got {:?} from repair main", e); + } - Ok(socket_addr) => socket_addr, - }; + Ok(socket_addr) => socket_addr, + }; { let mut ds = d.lock().await; @@ -3272,6 +3315,18 @@ pub async fn start_downstairs( } info!(log, "Using repair address: {:?}", repair_listener); + let rd = d.clone(); + let _repair_info_handle = tokio::spawn(async move { + if let Err(e) = repair_info(&rd, repair_request_rx).await { + error!( + rd.lock().await.log, + "repair info exits with error: {:?}", e + ); + } else { + info!(rd.lock().await.log, "repair info connection all done"); + } + }); + // Optionally require SSL connections let ssl_acceptor = if let Some(cert_pem_path) = cert_pem { let key_pem_path = key_pem.unwrap(); diff --git a/downstairs/src/region.rs b/downstairs/src/region.rs index 29db6d79f..6178ae4ff 100644 --- a/downstairs/src/region.rs +++ b/downstairs/src/region.rs @@ -507,7 +507,9 @@ impl Region { * * Let us assume we are repairing extent 012 * 1. Make new 012.copy dir (extent name plus: .copy) - * 2. Get all extent files from source side, put in 012.copy directory + * 2. Get all extent files from source side, put in 012.copy directory. + * Verify after the copy completes that the remote side still has + * the extent closed (or the region is read only). * 3. fsync files we just downloaded * 4. Rename 012.copy dir to 012.replace dir * 5. fsync extent directory ( 00/000/ where the extent files live) @@ -670,6 +672,29 @@ impl Region { count += 1; } + // We have copied over the extent. Verify that the remote side + // still believes that it is valid for repair so we know we have + // received a valid copy. 
+ info!(self.log, "Verify extent {eid} still ready for copy"); + let rc = match repair_server.extent_repair_ready(eid as u32).await { + Ok(rc) => rc.into_inner(), + Err(e) => { + crucible_bail!( + RepairRequestError, + "Failed to verify extent is still valid for repair {:?}", + e, + ); + } + }; + + if !rc { + warn!(self.log, "The repair of {eid} is being aborted."); + crucible_bail!( + RepairRequestError, + "Extent {eid} on remote repair server is in incorrect state", + ); + } + // After we have all files: move the repair dir. info!( self.log, diff --git a/downstairs/src/repair.rs b/downstairs/src/repair.rs index 458cc9fa7..d06039f09 100644 --- a/downstairs/src/repair.rs +++ b/downstairs/src/repair.rs @@ -25,6 +25,7 @@ pub struct FileServerContext { region_dir: PathBuf, read_only: bool, region_definition: RegionDefinition, + repair_request_tx: mpsc::Sender, } pub fn write_openapi(f: &mut W) -> Result<()> { @@ -39,6 +40,7 @@ fn build_api() -> ApiDescription> { api.register(get_files_for_extent).unwrap(); api.register(get_region_info).unwrap(); api.register(get_region_mode).unwrap(); + api.register(extent_repair_ready).unwrap(); api } @@ -48,6 +50,7 @@ pub async fn repair_main( ds: &Mutex, addr: SocketAddr, log: &Logger, + repair_request_tx: mpsc::Sender, ) -> Result { /* * We must specify a configuration with a bind address. 
@@ -78,6 +81,7 @@ pub async fn repair_main( region_dir, read_only, region_definition, + repair_request_tx, }; /* @@ -186,6 +190,29 @@ async fn get_a_file(path: PathBuf) -> Result, HttpError> { } } +pub struct RepairReadyRequest { + pub tx: oneshot::Sender, + pub eid: usize, +} + +/// Return true if the provided extent is closed or the region is read only +#[endpoint { + method = GET, + path = "/extent/{eid}/repair-ready", +}] +async fn extent_repair_ready( + rqctx: RequestContext>, + path: Path, +) -> Result, HttpError> { + let eid: usize = path.into_inner().eid as usize; + + let (tx, rx) = oneshot::channel(); + let rrr = RepairReadyRequest { tx, eid }; + rqctx.context().repair_request_tx.send(rrr).await.unwrap(); + let res = rx.await.unwrap(); + Ok(HttpResponseOk(res)) +} + /** * Get the list of files related to an extent. * diff --git a/integration_tests/Cargo.toml b/integration_tests/Cargo.toml index 113173f12..8ba51dad0 100644 --- a/integration_tests/Cargo.toml +++ b/integration_tests/Cargo.toml @@ -26,6 +26,7 @@ futures.workspace = true hex.workspace = true httptest.workspace = true rand.workspace = true +repair-client.workspace = true reqwest.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/integration_tests/src/lib.rs b/integration_tests/src/lib.rs index bf818a1f5..67fc59b8b 100644 --- a/integration_tests/src/lib.rs +++ b/integration_tests/src/lib.rs @@ -16,6 +16,7 @@ mod test { use futures::lock::Mutex; use httptest::{matchers::*, responders::*, Expectation, Server}; use rand::Rng; + use repair_client::Client; use sha2::Digest; use slog::{info, o, warn, Drain, Logger}; use tempfile::*; @@ -2818,6 +2819,74 @@ mod test { Ok(()) } + #[tokio::test] + async fn integration_test_repair_ready_not_closed() -> Result<()> { + // Create a new downstairs. 
+ // Verify repair ready returns false when an extent is open + + // Create a downstairs + let new_ds = TestDownstairs::new( + "127.0.0.1".parse().unwrap(), + true, // encrypted + false, // read only + 5, + 2, + false, + Backend::RawFile, + ) + .await + .unwrap(); + + let repair_addr = new_ds.repair_address().await; + + let url = format!("http://{:?}", repair_addr); + let repair_server = Client::new(&url); + + // Extents are open, so the repair ready request should return false. + let rc = repair_server.extent_repair_ready(0).await; + assert!(!rc.unwrap().into_inner()); + + let rc = repair_server.extent_repair_ready(1).await; + assert!(!rc.unwrap().into_inner()); + + Ok(()) + } + + #[tokio::test] + async fn integration_test_repair_ready_read_only() -> Result<()> { + // Create a new downstairs. + // Verify repair ready returns true for read only downstairs, + // even when extents are open. + + // Create a downstairs + let new_ds = TestDownstairs::new( + "127.0.0.1".parse().unwrap(), + true, // encrypted + true, // read only + 5, + 2, + false, + Backend::RawFile, + ) + .await + .unwrap(); + + let repair_addr = new_ds.repair_address().await; + + let url = format!("http://{:?}", repair_addr); + let repair_server = Client::new(&url); + + // Extents are open, but the region is read only, so the requests + // should all return true. + let rc = repair_server.extent_repair_ready(0).await; + assert!(rc.unwrap().into_inner()); + + let rc = repair_server.extent_repair_ready(1).await; + assert!(rc.unwrap().into_inner()); + + Ok(()) + } + + #[tokio::test] async fn integration_test_clone_diff_ec() -> Result<()> { // Test downstairs region clone. 
diff --git a/openapi/downstairs-repair.json b/openapi/downstairs-repair.json index c2d74b888..c2aeaa05d 100644 --- a/openapi/downstairs-repair.json +++ b/openapi/downstairs-repair.json @@ -46,6 +46,43 @@ } } }, + "/extent/{eid}/repair-ready": { + "get": { + "summary": "Return true if the provided extent is closed or the region is read only", + "operationId": "extent_repair_ready", + "parameters": [ + { + "in": "path", + "name": "eid", + "required": true, + "schema": { + "type": "integer", + "format": "uint32", + "minimum": 0 + } + } + ], + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "title": "Boolean", + "type": "boolean" + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } + }, "/newextent/{eid}/{file_type}": { "get": { "operationId": "get_extent_file", diff --git a/upstairs/src/client.rs b/upstairs/src/client.rs index d09896e60..cd0c02934 100644 --- a/upstairs/src/client.rs +++ b/upstairs/src/client.rs @@ -1247,8 +1247,8 @@ impl DownstairsClient { if old_state != IOState::InProgress { // This job is in an unexpected state. 
panic!( - "[{}] Job {} completed while not InProgress: {:?}", - self.client_id, ds_id, old_state + "[{}] Job {} completed while not InProgress: {:?} {:?}", + self.client_id, ds_id, old_state, job ); } From 2d5d05060994af90718a3e3e5d43d63ecf3e3f2f Mon Sep 17 00:00:00 2001 From: Alan Hanson Date: Tue, 13 Feb 2024 20:49:42 +0000 Subject: [PATCH 2/2] Keep a mutex for the repair task --- downstairs/src/lib.rs | 72 +++++----------------------------------- downstairs/src/repair.rs | 29 ++++++++-------- 2 files changed, 24 insertions(+), 77 deletions(-) diff --git a/downstairs/src/lib.rs b/downstairs/src/lib.rs index 8a61fbdc6..72205726c 100644 --- a/downstairs/src/lib.rs +++ b/downstairs/src/lib.rs @@ -41,7 +41,6 @@ mod extent_inner_sqlite; use extent::ExtentState; use region::Region; -use repair::RepairReadyRequest; pub use admin::run_dropshot; pub use dump::dump_region; @@ -1597,40 +1596,6 @@ where .await } -// Repair info task -// listen for the repair API to request if a specific extent is closed -// or if the entire region is read only. 
-async fn repair_info( - ads: &Arc>, - mut repair_request_rx: mpsc::Receiver, -) -> Result<()> { - info!(ads.lock().await.log, "Started repair info task"); - while let Some(repair_request) = repair_request_rx.recv().await { - let ds = ads.lock().await; - let state = if ds.read_only { - true - } else { - matches!(ds.region.extents[repair_request.eid], ExtentState::Closed) - }; - drop(ds); - - if let Err(e) = repair_request.tx.send(state) { - info!( - ads.lock().await.log, - "Error {e} sending repair info for extent {}", - repair_request.eid, - ); - } else { - info!( - ads.lock().await.log, - "Repair info sent {state} for extent {}", repair_request.eid, - ); - } - } - warn!(ads.lock().await.log, "Repair info task ends"); - Ok(()) -} - async fn reply_task( mut resp_channel_rx: mpsc::Receiver, mut fw: FramedWrite, @@ -3291,23 +3256,16 @@ pub async fn start_downstairs( let dss = d.clone(); let repair_log = d.lock().await.log.new(o!("task" => "repair".to_string())); - let (repair_request_tx, repair_request_rx) = mpsc::channel(10); - let repair_listener = match repair::repair_main( - &dss, - repair_address, - &repair_log, - repair_request_tx, - ) - .await - { - Err(e) => { - // TODO tear down other things if repair server can't be - // started? - bail!("got {:?} from repair main", e); - } + let repair_listener = + match repair::repair_main(dss, repair_address, &repair_log).await { + Err(e) => { + // TODO tear down other things if repair server can't be + // started? 
+ bail!("got {:?} from repair main", e); + } - Ok(socket_addr) => socket_addr, - }; + Ok(socket_addr) => socket_addr, + }; { let mut ds = d.lock().await; @@ -3315,18 +3273,6 @@ pub async fn start_downstairs( } info!(log, "Using repair address: {:?}", repair_listener); - let rd = d.clone(); - let _repair_info_handle = tokio::spawn(async move { - if let Err(e) = repair_info(&rd, repair_request_rx).await { - error!( - rd.lock().await.log, - "repair info exits with error: {:?}", e - ); - } else { - info!(rd.lock().await.log, "repair info connection all done"); - } - }); - // Optionally require SSL connections let ssl_acceptor = if let Some(cert_pem_path) = cert_pem { let key_pem_path = key_pem.unwrap(); diff --git a/downstairs/src/repair.rs b/downstairs/src/repair.rs index d06039f09..18f61e8a6 100644 --- a/downstairs/src/repair.rs +++ b/downstairs/src/repair.rs @@ -25,7 +25,7 @@ pub struct FileServerContext { region_dir: PathBuf, read_only: bool, region_definition: RegionDefinition, - repair_request_tx: mpsc::Sender, + downstairs: Arc>, } pub fn write_openapi(f: &mut W) -> Result<()> { @@ -47,10 +47,9 @@ fn build_api() -> ApiDescription> { /// Returns Ok(listen address) if everything launched ok, Err otherwise pub async fn repair_main( - ds: &Mutex, + downstairs: Arc>, addr: SocketAddr, log: &Logger, - repair_request_tx: mpsc::Sender, ) -> Result { /* * We must specify a configuration with a bind address. @@ -70,7 +69,7 @@ pub async fn repair_main( * Record the region directory where all the extents and metadata * files live. 
*/ - let ds = ds.lock().await; + let ds = downstairs.lock().await; let region_dir = ds.region.dir.clone(); let read_only = ds.read_only; let region_definition = ds.region.def(); @@ -81,7 +80,7 @@ pub async fn repair_main( region_dir, read_only, region_definition, - repair_request_tx, + downstairs, }; /* @@ -190,11 +189,6 @@ async fn get_a_file(path: PathBuf) -> Result, HttpError> { } } -pub struct RepairReadyRequest { - pub tx: oneshot::Sender, - pub eid: usize, -} - /// Return true if the provided extent is closed or the region is read only #[endpoint { method = GET, @@ -205,11 +199,18 @@ async fn extent_repair_ready( path: Path, ) -> Result, HttpError> { let eid: usize = path.into_inner().eid as usize; + let downstairs = &rqctx.context().downstairs; + + // If the region is read only, the extent is always ready. + if rqctx.context().read_only { + return Ok(HttpResponseOk(true)); + } + + let res = { + let ds = downstairs.lock().await; + matches!(ds.region.extents[eid], ExtentState::Closed) + }; - let (tx, rx) = oneshot::channel(); - let rrr = RepairReadyRequest { tx, eid }; - rqctx.context().repair_request_tx.send(rrr).await.unwrap(); - let res = rx.await.unwrap(); Ok(HttpResponseOk(res)) }