Skip to content

Commit 0d60de8

Browse files
authored
Combine enqueue and enqueue_repair (#1466)
The only difference between `enqueue` and `enqueue_repair` is that the latter _always_ sends jobs, instead of checking against live-repair state. We can instead move that logic into `send_io_live_repair`, always sending live-repair IO jobs (based on their `IOop` variants)
1 parent 5cfb23c commit 0d60de8

File tree

2 files changed

+15
-72
lines changed

2 files changed

+15
-72
lines changed

upstairs/src/downstairs.rs

+5-71
Original file line numberDiff line numberDiff line change
@@ -670,6 +670,7 @@ impl Downstairs {
670670
return;
671671
}
672672

673+
debug!(self.log, "REPLAYING {ds_id} on {client_id}");
673674
self.clients[client_id].replay_job(job);
674675
to_send.push((*ds_id, job.work.clone()));
675676
});
@@ -1212,7 +1213,7 @@ impl Downstairs {
12121213

12131214
cdt::gw__noop__start!(|| (gw_noop_id.0));
12141215
gw.insert(gw_noop_id, noop_id);
1215-
self.enqueue_repair(noop_id, gw_noop_id, nio);
1216+
self.enqueue(noop_id, gw_noop_id, nio, ClientMap::new());
12161217
}
12171218

12181219
#[allow(clippy::too_many_arguments)]
@@ -1231,7 +1232,7 @@ impl Downstairs {
12311232
cdt::gw__repair__start!(|| (gw_repair_id.0, eid.0));
12321233

12331234
gw.insert(gw_repair_id, repair_id);
1234-
self.enqueue_repair(repair_id, gw_repair_id, repair_io);
1235+
self.enqueue(repair_id, gw_repair_id, repair_io, ClientMap::new());
12351236
}
12361237

12371238
fn repair_or_noop(
@@ -1450,7 +1451,7 @@ impl Downstairs {
14501451
cdt::gw__reopen__start!(|| (gw_reopen_id.0, eid.0));
14511452

14521453
gw.insert(gw_reopen_id, reopen_id);
1453-
self.enqueue_repair(reopen_id, gw_reopen_id, reopen_io);
1454+
self.enqueue(reopen_id, gw_reopen_id, reopen_io, ClientMap::new());
14541455
}
14551456

14561457
#[cfg(test)]
@@ -1586,7 +1587,7 @@ impl Downstairs {
15861587

15871588
cdt::gw__close__start!(|| (gw_close_id.0, eid.0));
15881589
gw.insert(gw_close_id, close_id);
1589-
self.enqueue_repair(close_id, gw_close_id, close_io);
1590+
self.enqueue(close_id, gw_close_id, close_io, ClientMap::new());
15901591
}
15911592

15921593
/// Get the repair IDs and dependencies for this extent.
@@ -2386,73 +2387,6 @@ impl Downstairs {
23862387
self.clients[client_id].send(message)
23872388
}
23882389

2389-
/// Enqueue a new downstairs live repair request. This enqueue variant will
2390-
/// sneakily bypass [DownstairsClient::enqueue] and insert the job's JobId
2391-
/// directly into the clients' [DownstairsClient::new_jobs] queues. This is
2392-
/// necessary because, under [DsState::LiveRepair], the normal
2393-
/// [DownstairsClient::enqueue] function will skip jobs for any extent which
2394-
/// hasn't been repaired yet. We need a way to queue up the repair work
2395-
/// without it being skipped!
2396-
///
2397-
/// That's what this function provides. To ensure this function is only
2398-
/// used for that purpose, it will panic if [io] is not a repair-related
2399-
/// operation
2400-
fn enqueue_repair(
2401-
&mut self,
2402-
ds_id: JobId,
2403-
guest_id: GuestWorkId,
2404-
io: IOop,
2405-
) {
2406-
let state = ClientData::from_fn(|cid| {
2407-
let current = self.clients[cid].state();
2408-
// If a downstairs is faulted, we can move that job directly
2409-
// to IOState::Skipped.
2410-
let s = match current {
2411-
DsState::Faulted
2412-
| DsState::Replaced
2413-
| DsState::Replacing
2414-
| DsState::LiveRepairReady => {
2415-
// TODO can we even get here?
2416-
self.clients[cid].skipped_jobs.insert(ds_id);
2417-
IOState::Skipped
2418-
}
2419-
_ => {
2420-
self.send(ds_id, io.clone(), cid);
2421-
IOState::InProgress
2422-
}
2423-
};
2424-
self.clients[cid].io_state_count.incr(&s);
2425-
s
2426-
});
2427-
assert!(
2428-
matches!(
2429-
io,
2430-
IOop::ExtentLiveReopen { .. }
2431-
| IOop::ExtentFlushClose { .. }
2432-
| IOop::ExtentLiveRepair { .. }
2433-
| IOop::ExtentLiveNoOp { .. }
2434-
),
2435-
"bad IOop: {:?}",
2436-
io,
2437-
);
2438-
2439-
debug!(self.log, "Enqueue repair job {}", ds_id);
2440-
self.ds_active.insert(
2441-
ds_id,
2442-
DownstairsIO {
2443-
ds_id,
2444-
guest_id,
2445-
work: io,
2446-
state,
2447-
acked: false,
2448-
replay: false,
2449-
data: None,
2450-
read_validations: vec![],
2451-
backpressure_guard: ClientMap::new(),
2452-
},
2453-
);
2454-
}
2455-
24562390
pub(crate) fn replace(
24572391
&mut self,
24582392
id: Uuid,

upstairs/src/lib.rs

+10-1
Original file line numberDiff line numberDiff line change
@@ -1306,7 +1306,16 @@ impl IOop {
13061306
// (skipped).
13071307
// Return true if we should send it.
13081308
fn send_io_live_repair(&self, extent_limit: Option<ExtentId>) -> bool {
1309-
if let Some(extent_limit) = extent_limit {
1309+
// Always send live-repair IOs
1310+
if matches!(
1311+
self,
1312+
IOop::ExtentLiveReopen { .. }
1313+
| IOop::ExtentFlushClose { .. }
1314+
| IOop::ExtentLiveRepair { .. }
1315+
| IOop::ExtentLiveNoOp { .. }
1316+
) {
1317+
true
1318+
} else if let Some(extent_limit) = extent_limit {
13101319
// The extent_limit has been set, so we have repair work in
13111320
// progress. If our IO touches an extent less than or equal
13121321
// to the extent_limit, then we go ahead and send it.

0 commit comments

Comments
 (0)