
Refactor live-repair start and send #1673

Merged (2 commits) on Mar 18, 2025

Changes from 1 commit
143 changes: 71 additions & 72 deletions upstairs/src/downstairs.rs
@@ -1061,14 +1061,7 @@ impl Downstairs {
}

// Submit the initial repair jobs, which kicks everything off
self.begin_repair_for(
ExtentId(0),
Some(self.ddef.unwrap().extent_count()),
false,
&repair_downstairs,
source_downstairs,
up_state,
);
self.start_live_repair(&repair_downstairs, source_downstairs, up_state);

info!(
self.log,
@@ -1248,31 +1241,9 @@ impl Downstairs {
flush_job: PendingJob::new(flush_id),
}
} else {
// Keep going!
repair.active_extent = next_extent;

let repair_downstairs = repair.repair_downstairs.clone();
let active_extent = repair.active_extent;
let aborting = repair.aborting_repair;
let source_downstairs = repair.source_downstairs;

let repair_id = repair.id;
let extent_count = repair.extent_count;

self.notify_live_repair_progress(
repair_id,
active_extent,
extent_count,
);

self.begin_repair_for(
active_extent,
None,
aborting,
&repair_downstairs,
source_downstairs,
up_state,
);
self.notify_live_repair_progress(repair_id, next_extent);
self.send_next_live_repair(up_state);
};
}
LiveRepairState::FinalFlush { .. } => {
@@ -1386,29 +1357,74 @@ impl Downstairs {
}
}

/// Begins live-repair for the given extent
/// Begins live-repair
///
/// Claims initial IDs and submits initial jobs. If `extent_count` is set,
/// then we also set `self.repair` here; otherwise, we update the current
/// state (`self.repair.as_mut().unwrap().state`).
///
/// If `aborting` is true, then all of the submitted jobs are no-ops.
/// Claims initial IDs and submits initial jobs, initializing `self.repair`
///
/// # Panics
/// If the upstairs is not in `UpstairsState::Active`, or we _are not_
/// aborting the repair but either (1) the source downstairs is not
/// `DsState::Active`, or (2) the repair downstairs are not all
/// `DsState::LiveRepair`.
#[allow(clippy::too_many_arguments)]
fn begin_repair_for(
/// If the upstairs is not in `UpstairsState::Active`, or either (1) the
/// source downstairs is not `DsState::Active`, or (2) the repair downstairs
/// are not all `DsState::LiveRepair`.
fn start_live_repair(
&mut self,
extent: ExtentId,
extent_count: Option<u32>,
aborting: bool,
repair_downstairs: &[ClientId],
source_downstairs: ClientId,
up_state: &UpstairsState,
) {
let extent_count = self.ddef.unwrap().extent_count();
// Load a partially valid repair state (`min_id` and `state` are bogus);
// `state` will be updated in `send_live_repair_jobs`, and `min_id`
// will be set once we know the close job ID.
self.repair = Some(LiveRepairData {
id: Uuid::new_v4(),
extent_count,
repair_downstairs: repair_downstairs.to_vec(),
source_downstairs,
aborting_repair: false,
active_extent: ExtentId(0),
min_id: JobId(0), // fixed below
Contributor:

If we send this first repair with min_id not set, is it possible this could be
a problem with dependency removal?

The path for the first repair goes like this:
send_live_repair_jobs()
-> create_and_enqueue_..()
-> self.enqueue()
-> self.send()
In send, we prune_deps(), where we check min_id.

I think it's going to be okay, but I want to be sure (this was a problem with the initial live-repair, and min_id was added to prevent a hang).

  • A non-LiveRepair downstairs should not prune deps, as it must honor them.
  • For a downstairs under repair, this is the first actual repair it is getting, so any previous IO should already have been skipped and should not be on the dependency list?

At least, that's what I think will happen. Does that make sense? Even if true, I still feel like we are relying on some behavior that may not be obvious.
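
To make the pruning concern concrete, here is a rough, hypothetical sketch of the behavior being discussed. This is not Crucible's actual prune_deps; the bare-integer JobId alias and the filter condition are assumptions for illustration. The idea is that a downstairs in LiveRepair never saw jobs issued before the repair began, so dependencies below min_id must be dropped or the new job would wait forever on skipped work.

// Hypothetical stand-in for the pruning step discussed above.
type JobId = u64; // assumption: the real JobId is a newtype, not a bare u64

fn prune_deps(deps: &[JobId], min_id: JobId) -> Vec<JobId> {
    // Keep only dependencies at or above the repair's first job ID.
    deps.iter().copied().filter(|&dep| dep >= min_id).collect()
}

fn main() {
    // Jobs 100 and 101 were skipped before the repair started; 205 is part of it.
    let deps = [100, 101, 205];
    // With min_id set correctly, the stale dependencies are pruned:
    assert_eq!(prune_deps(&deps, 200), vec![205]);
    // With a bogus min_id of 0, nothing is pruned and the job could hang:
    assert_eq!(prune_deps(&deps, 0), vec![100, 101, 205]);
}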

Contributor Author:

I agree that it should work as written, because there should be no IO going to a downstairs that's LiveRepairReady. However, I realized it's an easy fix to initialize min_id to self.peek_next_id(), so let's just do that instead to remove all doubt!

Fixed in 50e16fd
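
For reference, a minimal sketch of how that initialization might look with the change, assuming peek_next_id returns the next JobId that will be claimed; the real change is whatever landed in 50e16fd.

self.repair = Some(LiveRepairData {
    id: Uuid::new_v4(),
    extent_count,
    repair_downstairs: repair_downstairs.to_vec(),
    source_downstairs,
    aborting_repair: false,
    active_extent: ExtentId(0),
    // Use the next job ID that will be claimed rather than a bogus JobId(0),
    // so dependency pruning never sees an incorrect lower bound.
    min_id: self.peek_next_id(),
    repair_job_ids: BTreeMap::new(),
    state: LiveRepairState::dummy(), // fixed when sending jobs
});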

repair_job_ids: BTreeMap::new(),
state: LiveRepairState::dummy(), // fixed when sending jobs
});
let extent_repair_ids = self.send_live_repair_jobs(up_state);
self.repair.as_mut().unwrap().min_id = extent_repair_ids.close_id;
}

/// Increments `self.repair.active_extent` and sends new jobs
///
/// # Panics
/// If any state is invalid, or `self.repair` is `None`
fn send_next_live_repair(&mut self, up_state: &UpstairsState) {
// This invalidates repair state, but we're about to update it
let repair = self.repair.as_mut().unwrap();
repair.active_extent += 1;
self.send_live_repair_jobs(up_state);
}

/// Begins live-repair for the next extent, based on `self.repair`
///
/// Claims initial IDs and submits live-repair jobs, updating
/// `self.repair.state` with the new state. If `self.repair.aborting` is
/// true, then all of the submitted jobs are no-ops.
///
/// # Panics
/// - If `self.repair` is `None`
/// - If the upstairs is not in `UpstairsState::Active`, or we _are not_
/// aborting the repair but either (1) the source downstairs is not
/// `DsState::Active`, or (2) the repair downstairs are not all
/// `DsState::LiveRepair`.
fn send_live_repair_jobs(
&mut self,
up_state: &UpstairsState,
) -> ExtentRepairIDs {
// Keep going!
let repair = self.repair.as_mut().unwrap();

let repair_downstairs = repair.repair_downstairs.clone();
let extent = repair.active_extent;
let aborting = repair.aborting_repair;
let source_downstairs = repair.source_downstairs;

// Invariant checking to begin
assert!(
matches!(up_state, UpstairsState::Active),
@@ -1422,7 +1438,7 @@
self.clients[source_downstairs].state(),
DsState::Active
);
for &c in repair_downstairs {
for &c in &repair_downstairs {
assert_eq!(self.clients[c].state(), DsState::LiveRepair);
}
}
@@ -1452,44 +1468,27 @@
close_deps,
);

let state = LiveRepairState::Closing {
let repair = self.repair.as_mut().unwrap(); // reborrow
repair.state = LiveRepairState::Closing {
close_job: PendingJob::new(close_id),
repair_job: PendingJob::new(repair_id),
reopen_job: PendingJob::new(reopen_id),
noop_job: PendingJob::new(noop_id),
};
if let Some(extent_count) = extent_count {
self.repair = Some(LiveRepairData {
id: Uuid::new_v4(),
extent_count,
repair_downstairs: repair_downstairs.to_vec(),
source_downstairs,
aborting_repair: false,
active_extent: ExtentId(0),
min_id: close_id,
repair_job_ids: BTreeMap::new(),
state,
});
} else {
self.repair.as_mut().unwrap().state = state;
}

if aborting {
self.create_and_enqueue_noop_io(vec![noop_id], reopen_id)
self.create_and_enqueue_noop_io(vec![noop_id], reopen_id);
self.create_and_enqueue_noop_io(close_deps, close_id);
} else {
self.create_and_enqueue_reopen_io(extent, vec![noop_id], reopen_id);
};

if aborting {
self.create_and_enqueue_noop_io(close_deps, close_id)
} else {
self.create_and_enqueue_close_io(
extent,
close_deps,
close_id,
repair_downstairs,
&repair_downstairs,
)
};
extent_repair_ids
}

/// Creates a [DownstairsIO] job for an [IOop::ExtentLiveReopen], and
@@ -3836,9 +3835,9 @@ impl Downstairs {
&self,
repair_id: Uuid,
current_extent: ExtentId,
extent_count: u32,
) {
if let Some(notify) = &self.notify {
let extent_count = self.ddef.unwrap().extent_count();
notify.send(NotifyRequest::LiveRepairProgress {
upstairs_id: self.cfg.upstairs_id,
repair_id,