Verify extent under repair is valid after copying files #1159

Merged (2 commits, Feb 21, 2024)
Changes from 1 commit

1 change: 1 addition & 0 deletions Cargo.lock


73 changes: 64 additions & 9 deletions downstairs/src/lib.rs
@@ -39,7 +39,9 @@ mod stats;
mod extent_inner_raw;
mod extent_inner_sqlite;

use extent::ExtentState;
use region::Region;
use repair::RepairReadyRequest;

pub use admin::run_dropshot;
pub use dump::dump_region;
@@ -1595,6 +1597,40 @@ where
.await
}

// Repair info task: listen for requests from the repair API asking
// whether a specific extent is closed or the entire region is read only.
async fn repair_info(
ads: &Arc<Mutex<Downstairs>>,
mut repair_request_rx: mpsc::Receiver<RepairReadyRequest>,
) -> Result<()> {
info!(ads.lock().await.log, "Started repair info task");
while let Some(repair_request) = repair_request_rx.recv().await {
let ds = ads.lock().await;
let state = if ds.read_only {
true
} else {
matches!(ds.region.extents[repair_request.eid], ExtentState::Closed)
};
drop(ds);

if let Err(e) = repair_request.tx.send(state) {
info!(
ads.lock().await.log,
"Error {e} sending repair info for extent {}",
repair_request.eid,
);
} else {
info!(
ads.lock().await.log,
"Repair info sent {state} for extent {}", repair_request.eid,
);
}
}
warn!(ads.lock().await.log, "Repair info task ends");
Ok(())
}

async fn reply_task<WT>(
mut resp_channel_rx: mpsc::Receiver<Message>,
mut fw: FramedWrite<WT, CrucibleEncoder>,
@@ -3255,23 +3291,42 @@ pub async fn start_downstairs(
let dss = d.clone();
let repair_log = d.lock().await.log.new(o!("task" => "repair".to_string()));

let repair_listener =
match repair::repair_main(&dss, repair_address, &repair_log).await {
Err(e) => {
// TODO tear down other things if repair server can't be
// started?
bail!("got {:?} from repair main", e);
}
let (repair_request_tx, repair_request_rx) = mpsc::channel(10);
let repair_listener = match repair::repair_main(
Contributor

Instead of spawning a separate task, could we handle this new functionality in repair::repair_main? We're already passing in an Arc<Mutex<Downstairs>> (as the &dss argument), so I'm not sure what the extra task buys us.

Contributor Author

We do pass in the downstairs mutex, but we don't keep it. We only use it to pull out a few fields. I was actually modeling this after what you did in upstairs/src/control.rs for the single-task refactoring (#1058).

Contributor

I think it's 6 of one, half-dozen of the other.

Having any task holding on to an Arc<Mutex<Downstairs>> is something that will have to change if we switch to a single-task implementation (with a dedicated task owning a Downstairs). It doesn't much matter whether the Arc<Mutex<Downstairs>> is held by repair_main or the new repair_info.

Contributor Author

My hope here was to make an eventual transition to a similar one-task implementation easier, and make the downstairs similar to the upstairs in that regard. Is this at least a step in that direction?

Contributor

Previously, there were only two categories of tasks using the Arc<Mutex<Downstairs>>:

  • The main listener task
  • Any tasks spawned by it via proc_stream, i.e. pf_task and dw_task

After this PR, the important change is that we'll have a third category of task using the Arc<Mutex<Downstairs>>.

This will either be repair_info (in the current implementation) or repair_main (in my suggestion). Which one isn't important; the important part (architecturally) is that it's a third thing sharing that mutex, and that's what we'll have to deal with in a future single-task refactoring.

From that perspective, either implementation is making a single-task refactoring harder.

That being said, it's not going to make such a refactoring significantly harder; we can just make requests to the Downstairs task through a queue, instead of calling into its APIs directly.

Since both implementations are architecturally equivalent (w.r.t. a single-task refactoring), I'd vote for the one that doesn't proliferate tasks further (i.e. adding this API to repair_main). Does that make sense? I don't feel too strongly about it, so I'd defer to your judgement if you still think a separate task is the Right Choice.
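
To illustrate the request-queue pattern described above, here is a minimal standalone sketch (not code from this PR): a single task owns the downstairs state and answers readiness queries sent over an mpsc channel, replying on a oneshot. DownstairsState and ReadyQuery are hypothetical names used only for the example.

use tokio::sync::{mpsc, oneshot};

struct DownstairsState {
    read_only: bool,
    closed: Vec<bool>, // closed[eid] is true if that extent is closed
}

struct ReadyQuery {
    eid: usize,
    reply: oneshot::Sender<bool>,
}

// The single owner of the state; callers never need an Arc<Mutex<..>>.
async fn downstairs_task(state: DownstairsState, mut rx: mpsc::Receiver<ReadyQuery>) {
    while let Some(q) = rx.recv().await {
        let ready = state.read_only || state.closed.get(q.eid).copied().unwrap_or(false);
        let _ = q.reply.send(ready);
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(10);
    let state = DownstairsState { read_only: false, closed: vec![true, false] };
    tokio::spawn(downstairs_task(state, rx));

    // A caller (for example, the repair server) asks whether extent 0 may be repaired.
    let (reply_tx, reply_rx) = oneshot::channel();
    tx.send(ReadyQuery { eid: 0, reply: reply_tx }).await.unwrap();
    assert!(reply_rx.await.unwrap());
}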

Contributor Author

I put it all in repair_main(), no more repair task.
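
For context, a minimal sketch of roughly where the discussion landed: the readiness check is answered from the repair server's shared context rather than a dedicated task. The Downstairs, ExtentState, and FileServerContext definitions below are simplified stand-ins for illustration, not the merged code.

use std::sync::Arc;
use tokio::sync::Mutex;

enum ExtentState { Open, Closed }

struct Downstairs {
    read_only: bool,
    extents: Vec<ExtentState>,
}

struct FileServerContext {
    downstairs: Arc<Mutex<Downstairs>>,
}

impl FileServerContext {
    // True if the extent is closed or the whole region is read only.
    async fn extent_repair_ready(&self, eid: usize) -> bool {
        let ds = self.downstairs.lock().await;
        ds.read_only || matches!(ds.extents.get(eid), Some(ExtentState::Closed))
    }
}

#[tokio::main]
async fn main() {
    let ctx = FileServerContext {
        downstairs: Arc::new(Mutex::new(Downstairs {
            read_only: false,
            extents: vec![ExtentState::Closed, ExtentState::Open],
        })),
    };
    assert!(ctx.extent_repair_ready(0).await);
    assert!(!ctx.extent_repair_ready(1).await);
}

The check itself mirrors the one in repair_info above: a read-only region is always considered ready, and otherwise the extent must be closed.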

&dss,
repair_address,
&repair_log,
repair_request_tx,
)
.await
{
Err(e) => {
// TODO tear down other things if repair server can't be
// started?
bail!("got {:?} from repair main", e);
}

Ok(socket_addr) => socket_addr,
};
Ok(socket_addr) => socket_addr,
};

{
let mut ds = d.lock().await;
ds.repair_address = Some(repair_listener);
}
info!(log, "Using repair address: {:?}", repair_listener);

let rd = d.clone();
let _repair_info_handle = tokio::spawn(async move {
if let Err(e) = repair_info(&rd, repair_request_rx).await {
error!(
rd.lock().await.log,
"repair info exits with error: {:?}", e
);
} else {
info!(rd.lock().await.log, "repair info connection all done");
}
});

// Optionally require SSL connections
let ssl_acceptor = if let Some(cert_pem_path) = cert_pem {
let key_pem_path = key_pem.unwrap();
27 changes: 26 additions & 1 deletion downstairs/src/region.rs
@@ -507,7 +507,9 @@ impl Region {
*
* Let us assume we are repairing extent 012
* 1. Make new 012.copy dir (extent name plus: .copy)
* 2. Get all extent files from source side, put in 012.copy directory
* 2. Get all extent files from source side, put in 012.copy directory.
* Verify after the copy completes that the remote side still has
* the extent closed (or the region is read only).
* 3. fsync files we just downloaded
* 4. Rename 012.copy dir to 012.replace dir
* 5. fsync extent directory ( 00/000/ where the extent files live)
@@ -670,6 +672,29 @@ impl Region {
count += 1;
}

// We have copied over the extent. Verify that the remote side
// still believes that it is valid for repair so we know we have
// received a valid copy.
info!(self.log, "Verify extent {eid} still ready for copy");
let rc = match repair_server.extent_repair_ready(eid as u32).await {
Ok(rc) => rc.into_inner(),
Err(e) => {
crucible_bail!(
RepairRequestError,
"Failed to verify extent is still valid for repair {:?}",
e,
);
}
};

if !rc {
warn!(self.log, "The repair of {eid} is being aborted.");
crucible_bail!(
RepairRequestError,
"Extent {eid} on remote repair server is in incorrect state",
);
}

// After we have all files: move the repair dir.
info!(
self.log,
27 changes: 27 additions & 0 deletions downstairs/src/repair.rs
@@ -25,6 +25,7 @@ pub struct FileServerContext {
region_dir: PathBuf,
read_only: bool,
region_definition: RegionDefinition,
repair_request_tx: mpsc::Sender<RepairReadyRequest>,
}

pub fn write_openapi<W: Write>(f: &mut W) -> Result<()> {
@@ -39,6 +40,7 @@ fn build_api() -> ApiDescription<Arc<FileServerContext>> {
api.register(get_files_for_extent).unwrap();
api.register(get_region_info).unwrap();
api.register(get_region_mode).unwrap();
api.register(extent_repair_ready).unwrap();

api
}
@@ -48,6 +50,7 @@ pub async fn repair_main(
ds: &Mutex<Downstairs>,
addr: SocketAddr,
log: &Logger,
repair_request_tx: mpsc::Sender<RepairReadyRequest>,
) -> Result<SocketAddr, String> {
/*
* We must specify a configuration with a bind address.
@@ -78,6 +81,7 @@ pub async fn repair_main(
region_dir,
read_only,
region_definition,
repair_request_tx,
};

/*
@@ -186,6 +190,29 @@ async fn get_a_file(path: PathBuf) -> Result<Response<Body>, HttpError> {
}
}

pub struct RepairReadyRequest {
pub tx: oneshot::Sender<bool>,
pub eid: usize,
}

/// Return true if the provided extent is closed or the region is read only
#[endpoint {
method = GET,
path = "/extent/{eid}/repair-ready",
}]
async fn extent_repair_ready(
rqctx: RequestContext<Arc<FileServerContext>>,
path: Path<Eid>,
) -> Result<HttpResponseOk<bool>, HttpError> {
let eid: usize = path.into_inner().eid as usize;

let (tx, rx) = oneshot::channel();
let rrr = RepairReadyRequest { tx, eid };
rqctx.context().repair_request_tx.send(rrr).await.unwrap();
let res = rx.await.unwrap();
Ok(HttpResponseOk(res))
}

/**
* Get the list of files related to an extent.
*
1 change: 1 addition & 0 deletions integration_tests/Cargo.toml
@@ -26,6 +26,7 @@ futures.workspace = true
hex.workspace = true
httptest.workspace = true
rand.workspace = true
repair-client.workspace = true
reqwest.workspace = true
serde.workspace = true
serde_json.workspace = true
69 changes: 69 additions & 0 deletions integration_tests/src/lib.rs
@@ -16,6 +16,7 @@ mod test {
use futures::lock::Mutex;
use httptest::{matchers::*, responders::*, Expectation, Server};
use rand::Rng;
use repair_client::Client;
use sha2::Digest;
use slog::{info, o, warn, Drain, Logger};
use tempfile::*;
@@ -2818,6 +2819,74 @@ mod test {
Ok(())
}

#[tokio::test]
async fn integration_test_repair_ready_not_closed() -> Result<()> {
// Create a new downstairs.
// Verify repair ready returns false when an extent is open

// Create a downstairs
let new_ds = TestDownstairs::new(
"127.0.0.1".parse().unwrap(),
true, // encrypted
false, // read only
5,
2,
false,
Backend::RawFile,
)
.await
.unwrap();

let repair_addr = new_ds.repair_address().await;

let url = format!("http://{:?}", repair_addr);
let repair_server = Client::new(&url);

// Extents are open, so the repair ready request should return false.
let rc = repair_server.extent_repair_ready(0).await;
assert!(!rc.unwrap().into_inner());

let rc = repair_server.extent_repair_ready(1).await;
assert!(!rc.unwrap().into_inner());

Ok(())
}

#[tokio::test]
async fn integration_test_repair_ready_read_only() -> Result<()> {
// Create a new downstairs.
// Verify repair ready returns true for read only downstairs,
// even when extents are open.

// Create a downstairs
let new_ds = TestDownstairs::new(
"127.0.0.1".parse().unwrap(),
true, // encrypted
true, // read only
5,
2,
false,
Backend::RawFile,
)
.await
.unwrap();

let repair_addr = new_ds.repair_address().await;

let url = format!("http://{:?}", repair_addr);
let repair_server = Client::new(&url);

// Extents are open, but the region is read only, so the requests
// should all return true.
let rc = repair_server.extent_repair_ready(0).await;
assert!(rc.unwrap().into_inner());

let rc = repair_server.extent_repair_ready(1).await;
assert!(rc.unwrap().into_inner());

Ok(())
}

#[tokio::test]
async fn integration_test_clone_diff_ec() -> Result<()> {
// Test downstairs region clone.
37 changes: 37 additions & 0 deletions openapi/downstairs-repair.json
@@ -46,6 +46,43 @@
}
}
},
"/extent/{eid}/repair-ready": {
"get": {
"summary": "Return true if the provided extent is closed or the region is read only",
"operationId": "extent_repair_ready",
"parameters": [
{
"in": "path",
"name": "eid",
"required": true,
"schema": {
"type": "integer",
"format": "uint32",
"minimum": 0
}
}
],
"responses": {
"200": {
"description": "successful operation",
"content": {
"application/json": {
"schema": {
"title": "Boolean",
"type": "boolean"
}
}
}
},
"4XX": {
"$ref": "#/components/responses/Error"
},
"5XX": {
"$ref": "#/components/responses/Error"
}
}
}
},
"/newextent/{eid}/{file_type}": {
"get": {
"operationId": "get_extent_file",
4 changes: 2 additions & 2 deletions upstairs/src/client.rs
@@ -1247,8 +1247,8 @@ impl DownstairsClient {
if old_state != IOState::InProgress {
// This job is in an unexpected state.
panic!(
"[{}] Job {} completed while not InProgress: {:?}",
self.client_id, ds_id, old_state
"[{}] Job {} completed while not InProgress: {:?} {:?}",
self.client_id, ds_id, old_state, job
);
}
