From 5c86f7258d952fe240d0064560b2270df04973df Mon Sep 17 00:00:00 2001 From: Matt Keeter Date: Mon, 17 Mar 2025 12:05:36 -0400 Subject: [PATCH 1/2] Refactor live-repair start and send --- upstairs/src/downstairs.rs | 143 ++++++++++++++++++------------------- 1 file changed, 71 insertions(+), 72 deletions(-) diff --git a/upstairs/src/downstairs.rs b/upstairs/src/downstairs.rs index 885a870c7..3e5bc9ae7 100644 --- a/upstairs/src/downstairs.rs +++ b/upstairs/src/downstairs.rs @@ -1061,14 +1061,7 @@ impl Downstairs { } // Submit the initial repair jobs, which kicks everything off - self.begin_repair_for( - ExtentId(0), - Some(self.ddef.unwrap().extent_count()), - false, - &repair_downstairs, - source_downstairs, - up_state, - ); + self.start_live_repair(&repair_downstairs, source_downstairs, up_state); info!( self.log, @@ -1248,31 +1241,9 @@ impl Downstairs { flush_job: PendingJob::new(flush_id), } } else { - // Keep going! - repair.active_extent = next_extent; - - let repair_downstairs = repair.repair_downstairs.clone(); - let active_extent = repair.active_extent; - let aborting = repair.aborting_repair; - let source_downstairs = repair.source_downstairs; - let repair_id = repair.id; - let extent_count = repair.extent_count; - - self.notify_live_repair_progress( - repair_id, - active_extent, - extent_count, - ); - - self.begin_repair_for( - active_extent, - None, - aborting, - &repair_downstairs, - source_downstairs, - up_state, - ); + self.notify_live_repair_progress(repair_id, next_extent); + self.send_next_live_repair(up_state); }; } LiveRepairState::FinalFlush { .. } => { @@ -1386,29 +1357,74 @@ impl Downstairs { } } - /// Begins live-repair for the given extent + /// Begins live-repair /// - /// Claims initial IDs and submits initial jobs. If `extent_count` is set, - /// then we also set `self.repair` here; otherwise, we update the current - /// state (`self.repair.as_mut().unwrap().state`). 
- /// - /// If `aborting` is true, then all of the submitted jobs are no-ops. + /// Claims initial IDs and submits initial jobs, initializing `self.repair` /// /// # Panics - /// If the upstairs is not in `UpstairsState::Active`, or we _are not_ - /// aborting the repair but either (1) the source downstairs is not - /// `DsState::Active`, or (2) the repair downstairs are not all - /// `DsState::LiveRepair`. - #[allow(clippy::too_many_arguments)] - fn begin_repair_for( + /// If the upstairs is not in `UpstairsState::Active`, or either (1) the + /// source downstairs is not `DsState::Active`, or (2) the repair downstairs + /// are not all `DsState::LiveRepair`. + fn start_live_repair( &mut self, - extent: ExtentId, - extent_count: Option, - aborting: bool, repair_downstairs: &[ClientId], source_downstairs: ClientId, up_state: &UpstairsState, ) { + let extent_count = self.ddef.unwrap().extent_count(); + // Load a partially valid repair state (`min_id` and `state` are bogus); + // `state` will be updated in `send_live_repair_jobs`, and `min_jobs` + // will be set once we know the close job ID. 
+ self.repair = Some(LiveRepairData { + id: Uuid::new_v4(), + extent_count, + repair_downstairs: repair_downstairs.to_vec(), + source_downstairs, + aborting_repair: false, + active_extent: ExtentId(0), + min_id: JobId(0), // fixed below + repair_job_ids: BTreeMap::new(), + state: LiveRepairState::dummy(), // fixed when sending jobs + }); + let extent_repair_ids = self.send_live_repair_jobs(up_state); + self.repair.as_mut().unwrap().min_id = extent_repair_ids.close_id; + } + + /// Increments `self.repair.active_extent` and sends new jobs + /// + /// # Panics + /// If any state is invalid, or `self.repair` is `None` + fn send_next_live_repair(&mut self, up_state: &UpstairsState) { + // This invalidates repair state, but we're about to update it + let repair = self.repair.as_mut().unwrap(); + repair.active_extent += 1; + self.send_live_repair_jobs(up_state); + } + + /// Begins live-repair for the next extent, based on `self.repair` + /// + /// Claims initial IDs and submits live-repair jobs, updating + /// `self.repair.state` with the new state. If + /// `self.repair.aborting_repair` is true, all submitted jobs are no-ops. + /// + /// # Panics + /// - If `self.repair` is `None` + /// - If the upstairs is not in `UpstairsState::Active`, or we _are not_ + /// aborting the repair but either (1) the source downstairs is not + /// `DsState::Active`, or (2) the repair downstairs are not all + /// `DsState::LiveRepair`. + fn send_live_repair_jobs( + &mut self, + up_state: &UpstairsState, + ) -> ExtentRepairIDs { + // Keep going! 
+ let repair = self.repair.as_mut().unwrap(); + + let repair_downstairs = repair.repair_downstairs.clone(); + let extent = repair.active_extent; + let aborting = repair.aborting_repair; + let source_downstairs = repair.source_downstairs; + // Invariant checking to begin assert!( matches!(up_state, UpstairsState::Active), @@ -1422,7 +1438,7 @@ impl Downstairs { self.clients[source_downstairs].state(), DsState::Active ); - for &c in repair_downstairs { + for &c in &repair_downstairs { assert_eq!(self.clients[c].state(), DsState::LiveRepair); } } @@ -1452,44 +1468,27 @@ impl Downstairs { close_deps, ); - let state = LiveRepairState::Closing { + let repair = self.repair.as_mut().unwrap(); // reborrow + repair.state = LiveRepairState::Closing { close_job: PendingJob::new(close_id), repair_job: PendingJob::new(repair_id), reopen_job: PendingJob::new(reopen_id), noop_job: PendingJob::new(noop_id), }; - if let Some(extent_count) = extent_count { - self.repair = Some(LiveRepairData { - id: Uuid::new_v4(), - extent_count, - repair_downstairs: repair_downstairs.to_vec(), - source_downstairs, - aborting_repair: false, - active_extent: ExtentId(0), - min_id: close_id, - repair_job_ids: BTreeMap::new(), - state, - }); - } else { - self.repair.as_mut().unwrap().state = state; - } if aborting { - self.create_and_enqueue_noop_io(vec![noop_id], reopen_id) + self.create_and_enqueue_noop_io(vec![noop_id], reopen_id); + self.create_and_enqueue_noop_io(close_deps, close_id); } else { self.create_and_enqueue_reopen_io(extent, vec![noop_id], reopen_id); - }; - - if aborting { - self.create_and_enqueue_noop_io(close_deps, close_id) - } else { self.create_and_enqueue_close_io( extent, close_deps, close_id, - repair_downstairs, + &repair_downstairs, ) }; + extent_repair_ids } /// Creates a [DownstairsIO] job for an [IOop::ExtentLiveReopen], and @@ -3836,9 +3835,9 @@ impl Downstairs { &self, repair_id: Uuid, current_extent: ExtentId, - extent_count: u32, ) { if let Some(notify) = &self.notify 
{ + let extent_count = self.ddef.unwrap().extent_count(); notify.send(NotifyRequest::LiveRepairProgress { upstairs_id: self.cfg.upstairs_id, repair_id, From 50e16fdefcc2595deeb44f1977988aac06d4b440 Mon Sep 17 00:00:00 2001 From: Matt Keeter Date: Tue, 18 Mar 2025 10:01:42 -0400 Subject: [PATCH 2/2] Initialize min_id to peek_next_id --- upstairs/src/downstairs.rs | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/upstairs/src/downstairs.rs b/upstairs/src/downstairs.rs index 3e5bc9ae7..78430f721 100644 --- a/upstairs/src/downstairs.rs +++ b/upstairs/src/downstairs.rs @@ -1372,9 +1372,8 @@ impl Downstairs { up_state: &UpstairsState, ) { let extent_count = self.ddef.unwrap().extent_count(); - // Load a partially valid repair state (`min_id` and `state` are bogus); - // `state` will be updated in `send_live_repair_jobs`, and `min_jobs` - // will be set once we know the close job ID. + // Load a partially valid repair state, with a dummy `state` (to be + // updated in `send_live_repair_jobs`). self.repair = Some(LiveRepairData { id: Uuid::new_v4(), extent_count, @@ -1382,12 +1381,11 @@ impl Downstairs { source_downstairs, aborting_repair: false, active_extent: ExtentId(0), - min_id: JobId(0), // fixed below + min_id: self.peek_next_id(), // upcoming close_id repair_job_ids: BTreeMap::new(), state: LiveRepairState::dummy(), // fixed when sending jobs }); - let extent_repair_ids = self.send_live_repair_jobs(up_state); - self.repair.as_mut().unwrap().min_id = extent_repair_ids.close_id; + self.send_live_repair_jobs(up_state); } /// Increments `self.repair.active_extent` and sends new jobs @@ -1413,10 +1411,7 @@ impl Downstairs { /// aborting the repair but either (1) the source downstairs is not /// `DsState::Active`, or (2) the repair downstairs are not all /// `DsState::LiveRepair`. 
- fn send_live_repair_jobs( - &mut self, - up_state: &UpstairsState, - ) -> ExtentRepairIDs { + fn send_live_repair_jobs(&mut self, up_state: &UpstairsState) { // Keep going! let repair = self.repair.as_mut().unwrap(); @@ -1488,7 +1483,6 @@ impl Downstairs { &repair_downstairs, ) }; - extent_repair_ids } /// Creates a [DownstairsIO] job for an [IOop::ExtentLiveReopen], and