Skip to content

Commit 51347bd

Browse files
authored
Increment write backpressure before deferred encryption (#1444)
(Staged on top of #1443 ) This PR tightens the feedback loop for backpressure, by incrementing our counters _before_ encryption is complete. It uses the new RAII `BackpressureGuard` type, so we don't need to worry about weird accounting failures. Before: ``` ┌────────────┐ incr ┌──┤backpressure│◄───┐ │ └────────────┘ │ ┌─────┐ ┌───▼──┐ ┌──────────┐ │ ┌───────┐ │Guest├─►│submit├───►│encryption├─┼───►│clients│ └───▲─┘ └──────┘ └──────────┘ │ └───────┘ │ │ └─────────────────────────────┘ ack ``` After: ``` ┌────────────┐ ┌──┤backpressure│ │ └─▲──────────┘ │ │incr ┌─────┐ ┌───▼──┐ │ ┌──────────┐ ┌───────┐ │Guest├─►│submit├─┴─►│encryption├─┬───►│clients│ └───▲─┘ └──────┘ └──────────┘ │ └───────┘ │ │ └─────────────────────────────┘ ack ```
1 parent 54f2e0c commit 51347bd

File tree

6 files changed

+81
-5
lines changed

6 files changed

+81
-5
lines changed

upstairs/src/backpressure.rs

+30
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,22 @@ impl Drop for BackpressureGuard {
5959
}
6060
}
6161

62+
impl BackpressureGuard {
63+
#[cfg(test)]
64+
pub fn dummy() -> Self {
65+
let counter = Arc::new(BackpressureCountersInner {
66+
write_bytes: 0.into(),
67+
io_bytes: 0.into(),
68+
jobs: 1.into(),
69+
});
70+
Self {
71+
counter,
72+
write_bytes: 0,
73+
io_bytes: 0,
74+
}
75+
}
76+
}
77+
6278
impl BackpressureCounters {
6379
pub fn new() -> Self {
6480
Self(Arc::new(BackpressureCountersInner {
@@ -80,6 +96,20 @@ impl BackpressureCounters {
8096
self.0.jobs.load(Ordering::Relaxed)
8197
}
8298

99+
/// Stores write / IO bytes (and 1 job) for a pending write
100+
#[must_use]
101+
pub fn early_write_increment(&mut self, bytes: u64) -> BackpressureGuard {
102+
self.0.write_bytes.fetch_add(bytes, Ordering::Relaxed);
103+
self.0.io_bytes.fetch_add(bytes, Ordering::Relaxed);
104+
self.0.jobs.fetch_add(1, Ordering::Relaxed);
105+
BackpressureGuard {
106+
counter: self.0.clone(),
107+
write_bytes: bytes,
108+
io_bytes: bytes,
109+
// implicit 1 job
110+
}
111+
}
112+
83113
/// Stores write / IO bytes (and 1 job) in the backpressure counters
84114
#[must_use]
85115
pub fn increment(&mut self, io: &IOop) -> BackpressureGuard {

upstairs/src/client.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,10 @@ impl DownstairsClient {
379379
// backpressure. Remove the backpressure guard for this client,
380380
// which decrements backpressure counters on drop.
381381
job.backpressure_guard.take(&self.client_id);
382-
} else if is_running && !was_running {
382+
} else if is_running
383+
&& !was_running
384+
&& !job.backpressure_guard.contains(&self.client_id)
385+
{
383386
// This should only happen if a job is replayed, but that still
384387
// counts!
385388
job.backpressure_guard.insert(
@@ -935,7 +938,8 @@ impl DownstairsClient {
935938
IOState::New
936939
}
937940
};
938-
if r == IOState::New {
941+
if r == IOState::New && !io.backpressure_guard.contains(&self.client_id)
942+
{
939943
io.backpressure_guard.insert(
940944
self.client_id,
941945
self.backpressure_counters.increment(&io.work),

upstairs/src/deferred.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@
33
use std::sync::Arc;
44

55
use crate::{
6-
client::ConnectionId, upstairs::UpstairsConfig, BlockContext, BlockOp,
7-
BlockRes, ClientId, ImpactedBlocks, Message, RawWrite, Validation,
6+
backpressure::BackpressureGuard, client::ConnectionId,
7+
upstairs::UpstairsConfig, BlockContext, BlockOp, BlockRes, ClientData,
8+
ClientId, ImpactedBlocks, Message, RawWrite, Validation,
89
};
910
use bytes::BytesMut;
1011
use crucible_common::{integrity_hash, CrucibleError, RegionDefinition};
@@ -114,6 +115,7 @@ pub(crate) struct DeferredWrite {
114115
pub res: BlockRes,
115116
pub is_write_unwritten: bool,
116117
pub cfg: Arc<UpstairsConfig>,
118+
pub guard: ClientData<BackpressureGuard>,
117119
}
118120

119121
/// Result of a deferred `BlockOp`
@@ -135,6 +137,7 @@ pub(crate) struct EncryptedWrite {
135137
pub impacted_blocks: ImpactedBlocks,
136138
pub res: BlockRes,
137139
pub is_write_unwritten: bool,
140+
pub guard: ClientData<BackpressureGuard>,
138141
}
139142

140143
impl DeferredWrite {
@@ -183,6 +186,7 @@ impl DeferredWrite {
183186
impacted_blocks: self.impacted_blocks,
184187
res: self.res,
185188
is_write_unwritten: self.is_write_unwritten,
189+
guard: self.guard,
186190
}
187191
}
188192
}

upstairs/src/downstairs.rs

+20-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::{
77
};
88

99
use crate::{
10+
backpressure::BackpressureGuard,
1011
cdt,
1112
client::{ClientAction, ClientStopReason, DownstairsClient},
1213
guest::GuestWork,
@@ -1822,6 +1823,7 @@ impl Downstairs {
18221823
GuestWorkId(10),
18231824
request,
18241825
is_write_unwritten,
1826+
ClientData::from_fn(|_| BackpressureGuard::dummy()),
18251827
)
18261828
}
18271829

@@ -1831,6 +1833,7 @@ impl Downstairs {
18311833
gw_id: GuestWorkId,
18321834
write: RawWrite,
18331835
is_write_unwritten: bool,
1836+
bp_guard: ClientData<BackpressureGuard>,
18341837
) -> JobId {
18351838
let ds_id = self.next_id();
18361839
let dependencies = self.ds_active.deps_for_write(ds_id, blocks);
@@ -1872,7 +1875,7 @@ impl Downstairs {
18721875
replay: false,
18731876
data: None,
18741877
read_validations: Vec::new(),
1875-
backpressure_guard: ClientMap::new(),
1878+
backpressure_guard: bp_guard.into(),
18761879
};
18771880
self.enqueue(io);
18781881
ds_id
@@ -2453,6 +2456,7 @@ impl Downstairs {
24532456
blocks: ImpactedBlocks,
24542457
write: RawWrite,
24552458
is_write_unwritten: bool,
2459+
backpressure_guard: ClientData<BackpressureGuard>,
24562460
) -> JobId {
24572461
// If there is a live-repair in progress that intersects with this read,
24582462
// then reserve job IDs for those jobs.
@@ -2463,6 +2467,7 @@ impl Downstairs {
24632467
guest_id,
24642468
write,
24652469
is_write_unwritten,
2470+
backpressure_guard,
24662471
)
24672472
}
24682473

@@ -3674,6 +3679,7 @@ impl Downstairs {
36743679
data,
36753680
},
36763681
is_write_unwritten,
3682+
ClientData::from_fn(|_| BackpressureGuard::dummy()),
36773683
)
36783684
}
36793685

@@ -4422,6 +4428,19 @@ impl Downstairs {
44224428
}
44234429
});
44244430
}
4431+
4432+
/// Assign the given number of write bytes to the backpressure counters
4433+
#[must_use]
4434+
pub(crate) fn early_write_backpressure(
4435+
&mut self,
4436+
bytes: u64,
4437+
) -> ClientData<BackpressureGuard> {
4438+
ClientData::from_fn(|i| {
4439+
self.clients[i]
4440+
.backpressure_counters
4441+
.early_write_increment(bytes)
4442+
})
4443+
}
44254444
}
44264445

44274446
/// Configuration for per-client backpressure

upstairs/src/lib.rs

+15
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,15 @@ impl<T> ClientData<T> {
427427
std::mem::swap(&mut self[c], &mut v);
428428
v
429429
}
430+
431+
/// Builds a `ClientData` from a builder function
432+
pub fn from_fn<F: FnMut(ClientId) -> T>(mut f: F) -> Self {
433+
Self([
434+
f(ClientId::new(0)),
435+
f(ClientId::new(1)),
436+
f(ClientId::new(2)),
437+
])
438+
}
430439
}
431440

432441
/// Map of data associated with clients, keyed by `ClientId`
@@ -466,6 +475,12 @@ impl<T> std::ops::Index<ClientId> for ClientMap<T> {
466475
}
467476
}
468477

478+
impl<T> From<ClientData<T>> for ClientMap<T> {
479+
fn from(c: ClientData<T>) -> Self {
480+
Self(ClientData(c.0.map(Option::Some)))
481+
}
482+
}
483+
469484
/*
470485
* These counts describe the various states that a Downstairs IO can
471486
* be in.

upstairs/src/upstairs.rs

+4
Original file line numberDiff line numberDiff line change
@@ -1439,13 +1439,16 @@ impl Upstairs {
14391439
let impacted_blocks =
14401440
extent_from_offset(&ddef, offset, ddef.bytes_to_blocks(data.len()));
14411441

1442+
let guard = self.downstairs.early_write_backpressure(data.len() as u64);
1443+
14421444
Some(DeferredWrite {
14431445
ddef,
14441446
impacted_blocks,
14451447
data,
14461448
res,
14471449
is_write_unwritten,
14481450
cfg: self.cfg.clone(),
1451+
guard,
14491452
})
14501453
}
14511454

@@ -1473,6 +1476,7 @@ impl Upstairs {
14731476
write.impacted_blocks,
14741477
write.data,
14751478
write.is_write_unwritten,
1479+
write.guard,
14761480
)
14771481
},
14781482
Some(GuestBlockRes::Other(write.res)),

0 commit comments

Comments
 (0)