Skip to content

Commit 8f21faf

Browse files
committed
Trying to simplify enqueue
1 parent 381047a commit 8f21faf

File tree

2 files changed

+53
-59
lines changed

2 files changed

+53
-59
lines changed

upstairs/src/client.rs

+36-57
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,7 @@ use crate::{
66
DsState, EncryptionContext, IOState, IOop, JobId, Message, RawReadResponse,
77
ReconcileIO, ReconcileIOState, RegionDefinitionStatus, RegionMetadata,
88
};
9-
use crucible_common::{
10-
x509::TLSContext, ExtentId, NegotiationError, VerboseTimeout,
11-
};
9+
use crucible_common::{x509::TLSContext, NegotiationError, VerboseTimeout};
1210
use crucible_protocol::{
1311
MessageWriter, ReconciliationId, CRUCIBLE_MESSAGE_VERSION,
1412
};
@@ -809,29 +807,16 @@ impl DownstairsClient {
809807
self.state = DsState::Active;
810808
}
811809

812-
/// Checks whether the given job should be sent
813-
///
814-
/// Returns an [`EnqueueResult`] indicating how the caller should handle the
815-
/// packet.
810+
/// Applies an [`EnqueueResult`] for the given job
816811
///
817812
/// If the job should be skipped, then it is added to `self.skipped_jobs`.
818813
/// `self.io_state_job_count` is updated with the incoming job state.
819-
#[must_use]
820-
pub(crate) fn enqueue(
814+
pub(crate) fn apply_enqueue_result(
821815
&mut self,
822816
ds_id: JobId,
823817
io: &IOop,
824-
last_repair_extent: Option<ExtentId>,
825-
) -> EnqueueResult {
826-
// If a downstairs is faulted or ready for repair, we can move
827-
// that job directly to IOState::Skipped
828-
// If a downstairs is in repair, then we need to see if this
829-
// IO is on a repaired extent or not. If an IO spans extents
830-
// where some are repaired and some are not, then this IO had
831-
// better have the dependencies already set to reflect the
832-
// requirement that a repair IO will need to finish first.
833-
let should_send = self.should_send(io, last_repair_extent);
834-
818+
should_send: EnqueueResult,
819+
) {
835820
// Update our set of skipped jobs if we're not sending this one
836821
if matches!(should_send, EnqueueResult::Skip) {
837822
self.skipped_jobs.insert(ds_id);
@@ -841,20 +826,11 @@ impl DownstairsClient {
841826
let state = should_send.state();
842827
self.io_state_job_count[&state] += 1;
843828
self.io_state_byte_count[&state] += io.job_bytes();
844-
845-
should_send
846829
}
847830

848-
/// Checks whether the given job should be sent or skipped
849-
///
850-
/// Returns an [`EnqueueResult`] indicating how the caller should handle the
851-
/// packet.
831+
/// Checks whether the client is accepting IO
852832
#[must_use]
853-
fn should_send(
854-
&self,
855-
io: &IOop,
856-
last_repair_extent: Option<ExtentId>,
857-
) -> EnqueueResult {
833+
pub fn should_send(&self) -> ShouldSendResult {
858834
match self.state {
859835
// We never send jobs if we're in certain inactive states
860836
DsState::Connecting {
@@ -863,7 +839,7 @@ impl DownstairsClient {
863839
} if self.cfg.read_only => {
864840
// Read only upstairs can connect with just a single downstairs
865841
// ready, we skip jobs on the other downstairs till they connect.
866-
EnqueueResult::Skip
842+
ShouldSendResult::Skip
867843
}
868844
DsState::Connecting {
869845
mode: ConnectionMode::Faulted | ConnectionMode::Replaced,
@@ -874,34 +850,22 @@ impl DownstairsClient {
874850
| ClientStopReason::Disabled
875851
| ClientStopReason::Replacing
876852
| ClientStopReason::NegotiationFailed(..),
877-
) => EnqueueResult::Skip,
878-
879-
// We conditionally send jobs if we're in live-repair, depending on
880-
// the current extent.
881-
DsState::LiveRepair => {
882-
// Pick the latest repair limit that's relevant for this
883-
// downstairs. This is either the extent under repair (if
884-
// there are no reserved repair jobs), or the last extent
885-
// for which we have reserved a repair job ID; either way, the
886-
// caller has provided it to us.
887-
if io.send_io_live_repair(last_repair_extent) {
888-
EnqueueResult::Send
889-
} else {
890-
EnqueueResult::Skip
891-
}
892-
}
893-
894-
// Send jobs if the client is active or offline
895-
//
896-
// Sending jobs to an offline client seems counter-intuitive, but it
897-
// means that those jobs are marked as InProgress, so they aren't
898-
// cleared out by a subsequent flush (so we'll be able to bring that
899-
// client back into compliance by replaying jobs).
900-
DsState::Active => EnqueueResult::Send,
853+
) => ShouldSendResult::Skip,
854+
855+
// Send jobs if the client is active or in live-repair. The caller
856+
// is responsible for checking whether live-repair jobs should be
857+
// skipped, and this happens outside of this function
858+
DsState::Active => ShouldSendResult::Send,
859+
DsState::LiveRepair => ShouldSendResult::CheckLiveRepair,
860+
861+
// Holding jobs for an offline client means that those jobs are
862+
// marked as InProgress, so they aren't cleared out by a subsequent
863+
// flush (so we'll be able to bring that client back into compliance
864+
// by replaying jobs).
901865
DsState::Connecting {
902866
mode: ConnectionMode::Offline,
903867
..
904-
} => EnqueueResult::Hold,
868+
} => ShouldSendResult::Hold,
905869

906870
DsState::Stopping(ClientStopReason::Deactivated)
907871
| DsState::Connecting {
@@ -2046,6 +2010,7 @@ pub(crate) enum NegotiationResult {
20462010
}
20472011

20482012
/// Result value from [`DownstairsClient::enqueue`]
2013+
#[derive(Copy, Clone)]
20492014
pub(crate) enum EnqueueResult {
20502015
/// The given job should be marked as in progress and sent
20512016
Send,
@@ -2061,6 +2026,20 @@ pub(crate) enum EnqueueResult {
20612026
Skip,
20622027
}
20632028

2029+
/// Result value from [`DownstairsClient::should_send`]
2030+
///
2031+
/// This is a superset of [`EnqueueResult`], which includes a value indicating
2032+
/// that the client is in live-repair and the caller should figure it out based
2033+
/// on the last active live-repair extent.
2034+
pub(crate) enum ShouldSendResult {
2035+
Send,
2036+
Hold,
2037+
Skip,
2038+
2039+
/// The caller should check against our active live-repair extent
2040+
CheckLiveRepair,
2041+
}
2042+
20642043
impl EnqueueResult {
20652044
pub(crate) fn state(&self) -> IOState {
20662045
match self {

upstairs/src/downstairs.rs

+17-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use crate::{
1111
client::{
1212
ClientAction, ClientFaultReason, ClientNegotiationFailed,
1313
ClientRunResult, ClientStopReason, DownstairsClient, EnqueueResult,
14-
NegotiationState,
14+
NegotiationState, ShouldSendResult,
1515
},
1616
guest::GuestBlockRes,
1717
io_limits::{IOLimitGuard, IOLimits},
@@ -2288,7 +2288,22 @@ impl Downstairs {
22882288
// Send the job to each client!
22892289
let state = ClientData::from_fn(|cid| {
22902290
let client = &mut self.clients[cid];
2291-
let r = client.enqueue(ds_id, &io, last_repair_extent);
2291+
// should_send -> ShouldSendResult
2292+
// (Send, Hold, Skip, CheckLiveRepair)
2293+
// then convert from that into EnqueueResult and apply it
2294+
let r = match client.should_send() {
2295+
ShouldSendResult::Send => EnqueueResult::Send,
2296+
ShouldSendResult::Hold => EnqueueResult::Hold,
2297+
ShouldSendResult::Skip => EnqueueResult::Skip,
2298+
ShouldSendResult::CheckLiveRepair => {
2299+
if io.send_io_live_repair(last_repair_extent) {
2300+
EnqueueResult::Send
2301+
} else {
2302+
EnqueueResult::Skip
2303+
}
2304+
}
2305+
};
2306+
client.apply_enqueue_result(ds_id, &io, r);
22922307
match r {
22932308
EnqueueResult::Send => self.send(ds_id, io.clone(), cid),
22942309
EnqueueResult::Hold => (),

0 commit comments

Comments
 (0)