Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor live-repair start and send #1673

Merged
merged 2 commits into from
Mar 18, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 65 additions & 72 deletions upstairs/src/downstairs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1061,14 +1061,7 @@ impl Downstairs {
}

// Submit the initial repair jobs, which kicks everything off
self.begin_repair_for(
ExtentId(0),
Some(self.ddef.unwrap().extent_count()),
false,
&repair_downstairs,
source_downstairs,
up_state,
);
self.start_live_repair(&repair_downstairs, source_downstairs, up_state);

info!(
self.log,
Expand Down Expand Up @@ -1248,31 +1241,9 @@ impl Downstairs {
flush_job: PendingJob::new(flush_id),
}
} else {
// Keep going!
repair.active_extent = next_extent;

let repair_downstairs = repair.repair_downstairs.clone();
let active_extent = repair.active_extent;
let aborting = repair.aborting_repair;
let source_downstairs = repair.source_downstairs;

let repair_id = repair.id;
let extent_count = repair.extent_count;

self.notify_live_repair_progress(
repair_id,
active_extent,
extent_count,
);

self.begin_repair_for(
active_extent,
None,
aborting,
&repair_downstairs,
source_downstairs,
up_state,
);
self.notify_live_repair_progress(repair_id, next_extent);
self.send_next_live_repair(up_state);
};
}
LiveRepairState::FinalFlush { .. } => {
Expand Down Expand Up @@ -1386,29 +1357,69 @@ impl Downstairs {
}
}

/// Begins live-repair for the given extent
///
/// Claims initial IDs and submits initial jobs. If `extent_count` is set,
/// then we also set `self.repair` here; otherwise, we update the current
/// state (`self.repair.as_mut().unwrap().state`).
/// Begins live-repair
///
/// If `aborting` is true, then all of the submitted jobs are no-ops.
/// Claims initial IDs and submits initial jobs, initializing `self.repair`
///
/// # Panics
/// If the upstairs is not in `UpstairsState::Active`, or we _are not_
/// aborting the repair but either (1) the source downstairs is not
/// `DsState::Active`, or (2) the repair downstairs are not all
/// `DsState::LiveRepair`.
#[allow(clippy::too_many_arguments)]
fn begin_repair_for(
/// If the upstairs is not in `UpstairsState::Active`, or either (1) the
/// source downstairs is not `DsState::Active`, or (2) the repair downstairs
/// are not all `DsState::LiveRepair`.
fn start_live_repair(
&mut self,
extent: ExtentId,
extent_count: Option<u32>,
aborting: bool,
repair_downstairs: &[ClientId],
source_downstairs: ClientId,
up_state: &UpstairsState,
) {
let extent_count = self.ddef.unwrap().extent_count();
// Load a partially valid repair state, with a dummy `state` (to be
// updated in `send_live_repair_jobs`).
self.repair = Some(LiveRepairData {
id: Uuid::new_v4(),
extent_count,
repair_downstairs: repair_downstairs.to_vec(),
source_downstairs,
aborting_repair: false,
active_extent: ExtentId(0),
min_id: self.peek_next_id(), // upcoming close_id
repair_job_ids: BTreeMap::new(),
state: LiveRepairState::dummy(), // fixed when sending jobs
});
self.send_live_repair_jobs(up_state);
}

/// Advances `self.repair.active_extent` by one and submits jobs for it
///
/// # Panics
/// If `self.repair` is `None`, or if any state is invalid
fn send_next_live_repair(&mut self, up_state: &UpstairsState) {
    // Bumping the extent leaves the stored repair state out of date; the
    // call below is expected to bring it back in sync right away.
    self.repair.as_mut().unwrap().active_extent += 1;
    self.send_live_repair_jobs(up_state);
}

/// Begins live-repair for the next extent, based on `self.repair`
///
/// Claims initial IDs and submits live-repair jobs, updating
/// `self.repair.state` with the new state. If
/// `self.repair.aborting_repair` is true, then all of the submitted jobs are
/// no-ops.
///
/// # Panics
/// - If `self.repair` is `None`
/// - If the upstairs is not in `UpstairsState::Active`, or we _are not_
/// aborting the repair but either (1) the source downstairs is not
/// `DsState::Active`, or (2) the repair downstairs are not all
/// `DsState::LiveRepair`.
fn send_live_repair_jobs(&mut self, up_state: &UpstairsState) {
// Keep going!
let repair = self.repair.as_mut().unwrap();

let repair_downstairs = repair.repair_downstairs.clone();
let extent = repair.active_extent;
let aborting = repair.aborting_repair;
let source_downstairs = repair.source_downstairs;

// Invariant checking to begin
assert!(
matches!(up_state, UpstairsState::Active),
Expand All @@ -1422,7 +1433,7 @@ impl Downstairs {
self.clients[source_downstairs].state(),
DsState::Active
);
for &c in repair_downstairs {
for &c in &repair_downstairs {
assert_eq!(self.clients[c].state(), DsState::LiveRepair);
}
}
Expand Down Expand Up @@ -1452,42 +1463,24 @@ impl Downstairs {
close_deps,
);

let state = LiveRepairState::Closing {
let repair = self.repair.as_mut().unwrap(); // reborrow
repair.state = LiveRepairState::Closing {
close_job: PendingJob::new(close_id),
repair_job: PendingJob::new(repair_id),
reopen_job: PendingJob::new(reopen_id),
noop_job: PendingJob::new(noop_id),
};
if let Some(extent_count) = extent_count {
self.repair = Some(LiveRepairData {
id: Uuid::new_v4(),
extent_count,
repair_downstairs: repair_downstairs.to_vec(),
source_downstairs,
aborting_repair: false,
active_extent: ExtentId(0),
min_id: close_id,
repair_job_ids: BTreeMap::new(),
state,
});
} else {
self.repair.as_mut().unwrap().state = state;
}

if aborting {
self.create_and_enqueue_noop_io(vec![noop_id], reopen_id)
self.create_and_enqueue_noop_io(vec![noop_id], reopen_id);
self.create_and_enqueue_noop_io(close_deps, close_id);
} else {
self.create_and_enqueue_reopen_io(extent, vec![noop_id], reopen_id);
};

if aborting {
self.create_and_enqueue_noop_io(close_deps, close_id)
} else {
self.create_and_enqueue_close_io(
extent,
close_deps,
close_id,
repair_downstairs,
&repair_downstairs,
)
};
}
Expand Down Expand Up @@ -3836,9 +3829,9 @@ impl Downstairs {
&self,
repair_id: Uuid,
current_extent: ExtentId,
extent_count: u32,
) {
if let Some(notify) = &self.notify {
let extent_count = self.ddef.unwrap().extent_count();
notify.send(NotifyRequest::LiveRepairProgress {
upstairs_id: self.cfg.upstairs_id,
repair_id,
Expand Down