Skip to content

Commit afcce41

Browse files
committed Apr 18, 2024
Use to-infinity backpressure with per-client backpressure as well
1 parent 1e2b998 commit afcce41

File tree

5 files changed

+127
-147
lines changed

5 files changed

+127
-147
lines changed
 

‎upstairs/src/backpressure.rs

+74
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Copyright 2024 Oxide Computer Company
2+
3+
use tokio::time::Duration;
4+
5+
/// Configuration for backpressure
6+
///
7+
/// Backpressure adds an artificial delay, e.g. to host write messages (which are
8+
/// otherwise acked immediately, before actually being complete). The same
/// configuration type is also used for the per-client backpressure stored in
/// `Downstairs`. The delay is
9+
/// varied based on two metrics:
10+
///
11+
/// - number of write bytes outstanding
12+
/// - number of jobs in the queue
13+
///
14+
/// The backpressure delay is the max of byte-based delay and queue-based delay
/// (see `get_backpressure_us`). Each metric has its own curve, described by a
/// `BackpressureScale`.
15+
#[derive(Copy, Clone, Debug)]
16+
pub(crate) struct BackpressureConfig {
17+
/// Configuration for byte-based backpressure (metric unit: bytes)
18+
pub bytes: BackpressureScale,
19+
/// Configuration for job-based backpressure (metric unit: number of jobs)
20+
pub jobs: BackpressureScale,
21+
}
22+
23+
/// Single backpressure curve configuration
///
/// Describes how the delay grows as one metric (outstanding bytes or queued
/// jobs) increases; see [`BackpressureScale::curve`] for the exact shape.
#[derive(Copy, Clone, Debug)]
pub(crate) struct BackpressureScale {
    /// Metric value up to which the delay is zero
    pub start: u64,
    /// Metric value at (and beyond) which the delay is saturated
    pub max: u64,
    /// Delay reached at the midpoint between `start` and `max`; a zero
    /// scale produces a zero delay everywhere
    pub scale: Duration,
}

impl BackpressureScale {
    /// Largest delay returned by [`curve`](Self::curve): one hour, which is
    /// basically infinite.
    const MAX_DELAY: Duration = Duration::from_secs(60 * 60);

    /// Computes the delay for a metric value of `n`.
    ///
    /// Our chosen backpressure curve is quadratic for 1/2 of its range, then
    /// goes to infinity in the second half. This gives C0 + C1 continuity.
    ///
    /// - `n <= start` (or a zero `scale`) yields `Duration::ZERO`
    /// - at the midpoint between `start` and `max` the delay equals `scale`
    /// - the delay never exceeds one hour; `n >= max` always yields one hour
    ///
    /// This function does not panic, even for a degenerate configuration
    /// with `max <= start`.
    pub fn curve(&self, n: u64) -> Duration {
        if self.scale == Duration::ZERO || n <= self.start {
            return Duration::ZERO;
        }
        if n >= self.max {
            return Self::MAX_DELAY;
        }

        // From here on `start < n < max`, so neither subtraction can
        // underflow and the divisor is nonzero.
        //
        // Calculate a value from 0-2
        let frac = (n - self.start) as f64 / (self.max - self.start) as f64
            * 2.0;

        let v = if frac < 1.0 {
            frac
        } else {
            // Diverges as `frac` approaches 2; may be infinite if `frac`
            // rounds to exactly 2.0 for very large ranges.
            1.0 / (1.0 - (frac - 1.0))
        };
        let factor = v.powi(2);

        // `Duration::mul_f64` panics on non-finite or overflowing results, so
        // check the product first and saturate instead.
        let secs = self.scale.as_secs_f64() * factor;
        if secs.is_finite() && secs < Self::MAX_DELAY.as_secs_f64() {
            self.scale.mul_f64(factor)
        } else {
            Self::MAX_DELAY
        }
    }
}
48+
49+
impl BackpressureConfig {
50+
#[cfg(test)]
51+
pub fn disable(&mut self) {
52+
self.bytes.scale = Duration::ZERO;
53+
self.jobs.scale = Duration::ZERO;
54+
}
55+
56+
pub fn get_backpressure_us(&self, bytes: u64, jobs: u64) -> u64 {
57+
// Special case if backpressure is disabled
58+
if self.bytes.scale == Duration::ZERO
59+
&& self.jobs.scale == Duration::ZERO
60+
{
61+
return 0;
62+
}
63+
64+
// Saturate at 1 hour per job, which is basically infinite
65+
if bytes >= self.bytes.max || jobs >= self.jobs.max {
66+
return Duration::from_secs(60 * 60).as_micros() as u64;
67+
}
68+
69+
let delay_bytes = self.bytes.curve(bytes).as_micros() as u64;
70+
let delay_jobs = self.jobs.curve(jobs).as_micros() as u64;
71+
72+
delay_bytes.max(delay_jobs)
73+
}
74+
}

‎upstairs/src/downstairs.rs

+33-64
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,12 @@ use crate::{
1313
live_repair::ExtentInfo,
1414
stats::UpStatOuter,
1515
upstairs::{UpstairsConfig, UpstairsState},
16-
AckStatus, ActiveJobs, AllocRingBuffer, ClientData, ClientIOStateCount,
17-
ClientId, ClientMap, CrucibleError, DownstairsIO, DownstairsMend, DsState,
18-
ExtentFix, ExtentRepairIDs, GuestWorkId, IOState, IOStateCount, IOop,
19-
ImpactedBlocks, JobId, Message, ReadRequest, ReadResponse, ReconcileIO,
20-
ReconciliationId, RegionDefinition, ReplaceResult, SnapshotDetails,
21-
WorkSummary,
16+
AckStatus, ActiveJobs, AllocRingBuffer, BackpressureConfig,
17+
BackpressureScale, ClientData, ClientIOStateCount, ClientId, ClientMap,
18+
CrucibleError, DownstairsIO, DownstairsMend, DsState, ExtentFix,
19+
ExtentRepairIDs, GuestWorkId, IOState, IOStateCount, IOop, ImpactedBlocks,
20+
JobId, Message, ReadRequest, ReadResponse, ReconcileIO, ReconciliationId,
21+
RegionDefinition, ReplaceResult, SnapshotDetails, WorkSummary,
2222
};
2323
use crucible_protocol::{RawWrite, WriteHeader};
2424

@@ -65,7 +65,17 @@ pub(crate) struct Downstairs {
6565
pub(crate) clients: ClientData<DownstairsClient>,
6666

6767
/// Per-client backpressure configuration
68-
backpressure_config: DownstairsBackpressureConfig,
68+
///
69+
/// This backpressure adds an artificial delay to the client queues, to keep
70+
/// the three clients relatively in sync. The delay is varied based on two
71+
/// metrics:
72+
///
73+
/// - number of write bytes outstanding
74+
/// - queue length
75+
///
76+
/// These metrics are _relative_ to the slowest downstairs; the goal is to
77+
/// slow down the faster Downstairs to keep the gap bounded.
78+
backpressure_config: BackpressureConfig,
6979

7080
/// The active list of IO for the downstairs.
7181
pub(crate) ds_active: ActiveJobs,
@@ -304,22 +314,20 @@ impl Downstairs {
304314
let clients = clients.map(Option::unwrap);
305315
Self {
306316
clients: ClientData(clients),
307-
backpressure_config: DownstairsBackpressureConfig {
317+
backpressure_config: BackpressureConfig {
308318
// start byte-based backpressure at 25 MiB of difference
309-
bytes_start: 25 * 1024 * 1024,
310-
// bytes_scale is chosen to have 1.5 ms of delay at 100 MiB of
311-
// discrepancy
312-
bytes_scale: 5e-7,
319+
bytes: BackpressureScale {
320+
start: 25 * 1024 * 1024,
321+
max: 1024 * 1024 * 200, // max difference is 200 MiB
322+
scale: Duration::from_micros(1500), // 1.5 ms at the curve midpoint (112.5 MiB)
323+
},
313324

314325
// start job-based backpressure at 100 jobs of discrepancy
315-
jobs_start: 100,
316-
// job_scale is chosen to have 1 ms of of per-job delay at 900
317-
// jobs of discrepancy
318-
jobs_scale: 0.04,
319-
320-
// max delay is 100 ms, chosen experimentally to keep downstairs
321-
// in sync even in heavily loaded systems
322-
max_delay: Duration::from_millis(100),
326+
jobs: BackpressureScale {
327+
start: 100,
328+
max: 2000,
329+
scale: Duration::from_millis(1), // 1 ms at the curve midpoint (1050 jobs)
330+
},
323331
},
324332
cfg,
325333
next_flush: 0,
@@ -3801,7 +3809,7 @@ impl Downstairs {
38013809

38023810
#[cfg(test)]
38033811
pub(crate) fn disable_client_backpressure(&mut self) {
3804-
self.backpressure_config.max_delay = Duration::ZERO;
3812+
self.backpressure_config.disable();
38053813
}
38063814

38073815
pub(crate) fn set_client_backpressure(&self) {
@@ -3839,22 +3847,10 @@ impl Downstairs {
38393847
let bytes_gap =
38403848
(max_bytes - c.total_bytes_outstanding()) as u64;
38413849

3842-
let job_delay_us = (job_gap
3843-
.saturating_sub(self.backpressure_config.jobs_start)
3844-
as f64
3845-
* self.backpressure_config.jobs_scale)
3846-
.powf(2.0);
3847-
3848-
let bytes_delay_us = (bytes_gap
3849-
.saturating_sub(self.backpressure_config.bytes_start)
3850-
as f64
3851-
* self.backpressure_config.bytes_scale)
3852-
.powf(2.0);
3853-
3854-
let delay_us = (job_delay_us.max(bytes_delay_us) as u64)
3855-
.min(self.backpressure_config.max_delay.as_micros() as u64);
3856-
3857-
delays[i] = Some(delay_us);
3850+
delays[i] = Some(
3851+
self.backpressure_config
3852+
.get_backpressure_us(bytes_gap, job_gap),
3853+
);
38583854
}
38593855
}
38603856
// Cancel out any common delay, because it wouldn't be useful.
@@ -4427,33 +4423,6 @@ impl Downstairs {
44274423
}
44284424
}
44294425

4430-
/// Configuration for per-client backpressure
4431-
///
4432-
/// Per-client backpressure adds an artificial delay to the client queues, to
4433-
/// keep the three clients relatively in sync. The delay is varied based on two
4434-
/// metrics:
4435-
///
4436-
/// - number of write bytes outstanding
4437-
/// - queue length
4438-
///
4439-
/// These metrics are _relative_ to the slowest downstairs; the goal is to slow
4440-
/// down the faster Downstairs to keep the gap bounded.
4441-
#[derive(Copy, Clone, Debug)]
4442-
struct DownstairsBackpressureConfig {
4443-
/// When should backpressure start (in bytes)?
4444-
bytes_start: u64,
4445-
/// Scale for byte-based quadratic backpressure
4446-
bytes_scale: f64,
4447-
4448-
/// When should job-count-based backpressure start?
4449-
jobs_start: u64,
4450-
/// Scale for job-count-based quadratic backpressure
4451-
jobs_scale: f64,
4452-
4453-
/// Maximum delay
4454-
max_delay: Duration,
4455-
}
4456-
44574426
#[cfg(test)]
44584427
pub(crate) mod test {
44594428
use super::Downstairs;

‎upstairs/src/dummy_downstairs_tests.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -518,8 +518,7 @@ impl TestHarness {
518518
// Configure our guest without backpressure, to speed up tests which
519519
// require triggering a timeout
520520
let (g, mut io) = Guest::new(Some(log.clone()));
521-
io.disable_queue_backpressure();
522-
io.disable_byte_backpressure();
521+
io.disable_backpressure();
523522
let guest = Arc::new(g);
524523

525524
let crucible_opts = CrucibleOpts {

‎upstairs/src/guest.rs

+15-80
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use std::{
1010
};
1111

1212
use crate::{
13+
backpressure::{BackpressureConfig, BackpressureScale},
1314
BlockIO, BlockOp, BlockOpWaiter, BlockRes, Buffer, JobId, ReplaceResult,
1415
UpstairsAction, IO_OUTSTANDING_MAX_BYTES, IO_OUTSTANDING_MAX_JOBS,
1516
};
@@ -304,71 +305,6 @@ pub struct Guest {
304305
log: Logger,
305306
}
306307

307-
/// Configuration for host-side backpressure
308-
///
309-
/// Backpressure adds an artificial delay to host write messages (which are
310-
/// otherwise acked immediately, before actually being complete). The delay is
311-
/// varied based on two metrics:
312-
///
313-
/// - number of write bytes outstanding (as a fraction of max)
314-
/// - queue length as a fraction (where 1.0 is full)
315-
///
316-
/// These two metrics are used for quadratic backpressure, picking the larger of
317-
/// the two delays.
318-
#[derive(Copy, Clone, Debug)]
319-
struct BackpressureConfig {
320-
/// When should backpressure start, in units of bytes
321-
bytes_start: u64,
322-
/// Maximum number of bytes (i.e. backpressure goes to infinity)
323-
bytes_max: u64,
324-
/// Scale of bytes-based backpressure
325-
bytes_scale: Duration,
326-
327-
/// When should backpressure start, in units of jobs
328-
queue_start: u64,
329-
/// Maximum number of jobs (i.e. backpressure goes to infinity)
330-
queue_max: u64,
331-
/// Scale of queue-based delay
332-
queue_scale: Duration,
333-
}
334-
335-
impl BackpressureConfig {
336-
// Our chosen backpressure curve is quadratic for 1/2 of its range, then
337-
// goes to infinity in the second half. This gives C0 + C1 continuity.
338-
fn curve(frac: f64, scale: Duration) -> Duration {
339-
// Remap from 0-1 to 0-1.5 for ease of calculation
340-
let frac = frac * 2.0;
341-
let v = if frac < 1.0 {
342-
frac
343-
} else {
344-
1.0 / (1.0 - (frac - 1.0))
345-
};
346-
scale.mul_f64(v.powi(2))
347-
}
348-
349-
fn get_backpressure_us(&self, bytes: u64, jobs: u64) -> u64 {
350-
// Saturate at 1 hour per job, which is basically infinite
351-
if bytes >= self.bytes_max || jobs >= self.queue_max {
352-
return Duration::from_secs(60 * 60).as_micros() as u64;
353-
}
354-
355-
// These ratios start at 0 (at *_start) and hit 1 when backpressure
356-
// should be infinite.
357-
let jobs_frac = jobs.saturating_sub(self.queue_start) as f64
358-
/ (self.queue_max - self.queue_start) as f64;
359-
let bytes_frac = bytes.saturating_sub(self.bytes_start) as f64
360-
/ (self.bytes_max - self.bytes_start) as f64;
361-
362-
// Delay should be 0 at frac = 0, and infinite at frac = 1
363-
let delay_bytes =
364-
Self::curve(bytes_frac, self.bytes_scale).as_micros() as u64;
365-
let delay_jobs =
366-
Self::curve(jobs_frac, self.queue_scale).as_micros() as u64;
367-
368-
delay_bytes.max(delay_jobs)
369-
}
370-
}
371-
372308
/*
373309
* These methods are used to add or check for new work on the Guest struct
374310
*/
@@ -429,14 +365,18 @@ impl Guest {
429365
fn default_backpressure_config() -> BackpressureConfig {
430366
BackpressureConfig {
431367
// Byte-based backpressure
432-
bytes_start: 50 * 1024u64.pow(2), // 50 MiB
433-
bytes_max: IO_OUTSTANDING_MAX_BYTES * 2,
434-
bytes_scale: Duration::from_millis(100),
368+
bytes: BackpressureScale {
369+
start: 50 * 1024u64.pow(2), // 50 MiB
370+
max: IO_OUTSTANDING_MAX_BYTES * 2,
371+
scale: Duration::from_millis(100),
372+
},
435373

436374
// Queue-based backpressure
437-
queue_start: 500,
438-
queue_max: IO_OUTSTANDING_MAX_JOBS as u64 * 2,
439-
queue_scale: Duration::from_millis(5),
375+
jobs: BackpressureScale {
376+
start: 500,
377+
max: IO_OUTSTANDING_MAX_JOBS as u64 * 2,
378+
scale: Duration::from_millis(5),
379+
},
440380
}
441381
}
442382

@@ -959,18 +899,13 @@ impl GuestIoHandle {
959899
}
960900

961901
#[cfg(test)]
962-
pub fn disable_queue_backpressure(&mut self) {
963-
self.backpressure_config.queue_scale = Duration::ZERO;
964-
}
965-
966-
#[cfg(test)]
967-
pub fn disable_byte_backpressure(&mut self) {
968-
self.backpressure_config.bytes_scale = Duration::ZERO;
902+
pub fn disable_backpressure(&mut self) {
903+
self.backpressure_config.disable();
969904
}
970905

971906
#[cfg(test)]
972-
pub fn is_queue_backpressure_disabled(&self) -> bool {
973-
self.backpressure_config.queue_scale == Duration::ZERO
907+
pub fn is_backpressure_disabled(&self) -> bool {
908+
self.backpressure_config.jobs.scale == Duration::ZERO
974909
}
975910

976911
/// Set `self.backpressure_us` based on outstanding IO ratio

‎upstairs/src/lib.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ mod buffer;
5555
pub use buffer::Buffer; // used in BlockIO::Read, so it must be public
5656
pub(crate) use buffer::UninitializedBuffer; // only used in pub(crate) functions
5757

58+
mod backpressure;
59+
pub(crate) use backpressure::{BackpressureConfig, BackpressureScale};
60+
5861
mod mend;
5962
pub use mend::{DownstairsMend, ExtentFix, RegionMetadata};
6063
pub use pseudo_file::CruciblePseudoFile;
@@ -1700,7 +1703,7 @@ pub fn up_main(
17001703
};
17011704

17021705
#[cfg(test)]
1703-
let disable_backpressure = guest.is_queue_backpressure_disabled();
1706+
let disable_backpressure = guest.is_backpressure_disabled();
17041707

17051708
/*
17061709
* Build the Upstairs struct that we use to share data between

0 commit comments

Comments
 (0)