From 71616e1d35cd4a0fac391644735a5d5e3bc5ddba Mon Sep 17 00:00:00 2001 From: Matt Keeter <matt@oxide.computer> Date: Mon, 1 Apr 2024 17:32:24 -0400 Subject: [PATCH 1/8] Remove IO limits and tweak backpressure --- Cargo.lock | 1 + Cargo.toml | 1 + crutest/src/main.rs | 4 +- upstairs/Cargo.toml | 7 +- upstairs/src/client.rs | 4 + upstairs/src/guest.rs | 198 +++++++++++++++++++++++++++++---------- upstairs/src/lib.rs | 14 +-- upstairs/src/upstairs.rs | 21 ++--- 8 files changed, 173 insertions(+), 77 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0bdf1eb93..99b33b13f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -774,6 +774,7 @@ dependencies = [ "futures", "futures-core", "http 0.2.12", + "humantime", "internal-dns", "itertools", "libc", diff --git a/Cargo.toml b/Cargo.toml index e10f1b0bd..621f0e6c4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,6 +53,7 @@ hex = "0.4" http = "0.2.12" httptest = "0.15.5" human_bytes = "0.4.3" +humantime = "2.1.0" hyper = { version = "0.14", features = [ "full" ] } hyper-staticfile = "0.9" indicatif = { version = "0.17.8", features = ["rayon"] } diff --git a/crutest/src/main.rs b/crutest/src/main.rs index eafa51784..b7e731ce6 100644 --- a/crutest/src/main.rs +++ b/crutest/src/main.rs @@ -1836,9 +1836,7 @@ async fn generic_workload( } // Make use of dsc to stop and start a downstairs while sending IO. This -// should trigger the replay code path. The IO sent to the downstairs should -// be below the threshold of gone_too_long() so we don't end up faulting the -// downstairs and doing a live repair +// should trigger the replay code path. async fn replay_workload( guest: &Arc<Guest>, wtq: &mut WhenToQuit, diff --git a/upstairs/Cargo.toml b/upstairs/Cargo.toml index 77bc8a06c..0f6d15e6b 100644 --- a/upstairs/Cargo.toml +++ b/upstairs/Cargo.toml @@ -62,12 +62,13 @@ http = { workspace = true, optional = true } [dev-dependencies] expectorate.workspace = true -openapiv3.workspace = true +humantime.workspace = true openapi-lint.workspace = true -tokio-test.workspace = true +openapiv3.workspace = true +proptest.workspace = true tempfile.workspace = true test-strategy.workspace = true -proptest.workspace = true +tokio-test.workspace = true [build-dependencies] version_check = "0.9.4" diff --git a/upstairs/src/client.rs b/upstairs/src/client.rs index d785333d1..e1f48fc67 100644 --- a/upstairs/src/client.rs +++ b/upstairs/src/client.rs @@ -2506,6 +2506,10 @@ impl ClientIoTask { // If we're reconnecting, then add a short delay to avoid constantly // spinning (e.g. if something is fundamentally wrong with the // Downstairs) + // + // The upstairs can still stop us here, e.g. if we need to transition + // from Offline -> Faulted because we hit a job limit, that bounces the + // IO task (whether it *should* is debatable). if self.delay { tokio::select! { s = &mut self.stop => { diff --git a/upstairs/src/guest.rs b/upstairs/src/guest.rs index fba35b480..22dc1ff29 100644 --- a/upstairs/src/guest.rs +++ b/upstairs/src/guest.rs @@ -11,7 +11,7 @@ use std::{ use crate::{ BlockIO, BlockOp, BlockOpWaiter, BlockRes, Buffer, JobId, ReplaceResult, - UpstairsAction, + UpstairsAction, IO_OUTSTANDING_MAX_BYTES, IO_OUTSTANDING_MAX_JOBS, }; use crucible_common::{build_logger, crucible_bail, Block, CrucibleError}; use crucible_protocol::{ReadResponse, SnapshotDetails}; @@ -317,15 +317,49 @@ pub struct Guest { /// the two delays. #[derive(Copy, Clone, Debug)] struct BackpressureConfig { - /// When should backpressure start? 
- bytes_start: f64, - /// Maximum bytes-based backpressure - bytes_max_delay: Duration, - - /// When should queue-based backpressure start? - queue_start: f64, - /// Maximum queue-based delay - queue_max_delay: Duration, + /// When should backpressure start, in units of bytes + bytes_start: u64, + /// Maximum number of bytes (i.e. backpressure goes to infinity) + bytes_max: u64, + /// Scale of bytes-based backpressure + bytes_scale: Duration, + + /// When should backpressure start, in units of jobs + queue_start: u64, + /// Maximum number of jobs (i.e. backpressure goes to infinity) + queue_max: u64, + /// Scale of queue-based delay + queue_scale: Duration, +} + +impl BackpressureConfig { + fn get_backpressure_us(&self, bytes: u64, jobs: u64) -> u64 { + // Saturate at 1 hour per job, which is basically infinite + if bytes >= self.bytes_max || jobs >= self.queue_max { + return Duration::from_secs(60 * 60).as_micros() as u64; + } + + // These ratios start at 0 (at *_start) and hit 1 when backpressure + // should be infinite. + let jobs_frac = jobs.saturating_sub(self.queue_start) as f64 + / (self.queue_max - self.queue_start) as f64; + let bytes_frac = bytes.saturating_sub(self.bytes_start) as f64 + / (self.bytes_max - self.bytes_start) as f64; + + // Delay should be 0 at frac = 0, and infinite at frac = 1 + let delay_bytes = self + .bytes_scale + .mul_f64(1.0 / (1.0 - bytes_frac).powi(2) - 1.0) + .as_micros() as u64; + + // Compute an alternate delay based on queue length + let delay_jobs = self + .queue_scale + .mul_f64(1.0 / (1.0 - jobs_frac).powi(2) - 1.0) + .as_micros() as u64; + + delay_bytes.max(delay_jobs) + } } /* @@ -370,15 +404,7 @@ impl Guest { iop_tokens: 0, bw_tokens: 0, backpressure_us: backpressure_us.clone(), - backpressure_config: BackpressureConfig { - // Byte-based backpressure - bytes_start: 0.05, - bytes_max_delay: Duration::from_millis(100), - - // Queue-based backpressure - queue_start: 0.05, - queue_max_delay: Duration::from_millis(5), - }, + backpressure_config: Self::default_backpressure_config(), log: log.clone(), }; let guest = Guest { @@ -393,6 +419,20 @@ impl Guest { (guest, io) } + fn default_backpressure_config() -> BackpressureConfig { + BackpressureConfig { + // Byte-based backpressure + bytes_start: 50 * 1024u64.pow(2), // 50 MiB + bytes_max: IO_OUTSTANDING_MAX_BYTES * 2, + bytes_scale: Duration::from_millis(25), + + // Queue-based backpressure + queue_start: 500, + queue_max: IO_OUTSTANDING_MAX_JOBS as u64 * 2, + queue_scale: Duration::from_millis(5), + } + } + /* * This is used to submit a new BlockOp IO request to Crucible. 
* @@ -913,50 +953,23 @@ impl GuestIoHandle { #[cfg(test)] pub fn disable_queue_backpressure(&mut self) { - self.backpressure_config.queue_max_delay = Duration::ZERO; + self.backpressure_config.queue_scale = Duration::ZERO; } #[cfg(test)] pub fn disable_byte_backpressure(&mut self) { - self.backpressure_config.bytes_max_delay = Duration::ZERO; + self.backpressure_config.bytes_scale = Duration::ZERO; } #[cfg(test)] pub fn is_queue_backpressure_disabled(&self) -> bool { - self.backpressure_config.queue_max_delay == Duration::ZERO + self.backpressure_config.queue_scale == Duration::ZERO } /// Set `self.backpressure_us` based on outstanding IO ratio pub fn set_backpressure(&self, bytes: u64, jobs: u64) { - let jobs_frac = jobs as f64 / crate::IO_OUTSTANDING_MAX_JOBS as f64; - let bytes_frac = bytes as f64 / crate::IO_OUTSTANDING_MAX_BYTES as f64; - - // Check to see if the number of outstanding write bytes (between - // the upstairs and downstairs) is particularly high. If so, - // apply some backpressure by delaying host operations, with a - // quadratically-increasing delay. - let delay_bytes = self - .backpressure_config - .bytes_max_delay - .mul_f64( - ((bytes_frac - self.backpressure_config.bytes_start).max(0.0) - / (1.0 - self.backpressure_config.bytes_start)) - .powf(2.0), - ) - .as_micros() as u64; - - // Compute an alternate delay based on queue length - let delay_jobs = self - .backpressure_config - .queue_max_delay - .mul_f64( - ((jobs_frac - self.backpressure_config.queue_start).max(0.0) - / (1.0 - self.backpressure_config.queue_start)) - .powf(2.0), - ) - .as_micros() as u64; - self.backpressure_us - .store(delay_jobs.max(delay_bytes), Ordering::SeqCst); + let bp_usec = self.backpressure_config.get_backpressure_us(bytes, jobs); + self.backpressure_us.store(bp_usec, Ordering::SeqCst); } pub fn set_iop_limit(&mut self, bytes_per_iop: usize, limit: usize) { @@ -1459,4 +1472,87 @@ mod test { Ok(()) } + + /// Confirm that the offline timeout is reasonable + #[test] + fn check_offline_timeout() { + for job_size in + [512, 4 * 1024, 16 * 1024, 64 * 1024, 256 * 1024, 1024 * 1024] + { + let mut bytes_in_flight = 0; + let mut jobs_in_flight = 0; + let mut time_usec: u64 = 0; + let cfg = Guest::default_backpressure_config(); + + let (t, desc) = loop { + let bp_usec = + cfg.get_backpressure_us(bytes_in_flight, jobs_in_flight); + time_usec = time_usec.saturating_add(bp_usec); + + if bytes_in_flight >= IO_OUTSTANDING_MAX_BYTES { + break (time_usec, "bytes"); + } + + if jobs_in_flight >= IO_OUTSTANDING_MAX_JOBS as u64 { + break (time_usec, "jobs"); + } + + bytes_in_flight += job_size; + jobs_in_flight += 1; + }; + + let timeout = Duration::from_micros(t); + assert!( + timeout > Duration::from_secs(1), + "offline -> faulted transition happens too quickly \ + with job size {job_size}; expected > 1 sec, got {}", + humantime::format_duration(timeout) + ); + assert!( + timeout < Duration::from_secs(120), + "offline -> faulted transition happens too slowly \ + with job size {job_size}; expected < 2 mins, got {}", + humantime::format_duration(timeout) + ); + + println!( + "job size {job_size:>8}:\n Timeout in {} ({desc})\n", + humantime::format_duration(timeout) + ); + } + } + + #[test] + fn check_max_backpressure() { + let cfg = Guest::default_backpressure_config(); + let t = cfg.get_backpressure_us( + IO_OUTSTANDING_MAX_BYTES * 2 - 1024u64.pow(2), + 0, + ); + let timeout = Duration::from_micros(t); + println!( + "max byte-based delay: {}", + humantime::format_duration(timeout) + ); + assert!( + 
timeout > Duration::from_secs(60 * 60), + "max byte-based backpressure delay is too low; + expected > 1 hr, got {}", + humantime::format_duration(timeout) + ); + + let t = + cfg.get_backpressure_us(0, IO_OUTSTANDING_MAX_JOBS as u64 * 2 - 1); + let timeout = Duration::from_micros(t); + println!( + "max job-based delay: {}", + humantime::format_duration(timeout) + ); + assert!( + timeout > Duration::from_secs(60 * 60), + "max job-based backpressure delay is too low; + expected > 1 hr, got {}", + humantime::format_duration(timeout) + ); + } } diff --git a/upstairs/src/lib.rs b/upstairs/src/lib.rs index f27b07d27..ce350a1a3 100644 --- a/upstairs/src/lib.rs +++ b/upstairs/src/lib.rs @@ -83,17 +83,17 @@ mod downstairs; mod upstairs; use upstairs::{UpCounters, UpstairsAction}; -/// Max number of write bytes between the upstairs and the downstairs +/// Max number of write bytes between the upstairs and an offline downstairs /// -/// If we exceed this value, the upstairs will give up and mark that downstairs -/// as faulted. +/// If we exceed this value, the upstairs will give +/// up and mark the offline downstairs as faulted. const IO_OUTSTANDING_MAX_BYTES: u64 = 1024 * 1024 * 1024; // 1 GiB -/// Max number of outstanding IOs between the upstairs and a downstairs +/// Max number of outstanding IOs between the upstairs and an offline downstairs /// -/// If we exceed this value, the upstairs will give up and mark that downstairs -/// as faulted. -pub const IO_OUTSTANDING_MAX_JOBS: usize = 57000; +/// If we exceed this value, the upstairs will give up and mark that offline +/// downstairs as faulted. +pub const IO_OUTSTANDING_MAX_JOBS: usize = 10000; /// The BlockIO trait behaves like a physical NVMe disk (or a virtio virtual /// disk): there is no contract about what order operations that are submitted diff --git a/upstairs/src/upstairs.rs b/upstairs/src/upstairs.rs index 2307458e8..90caf2374 100644 --- a/upstairs/src/upstairs.rs +++ b/upstairs/src/upstairs.rs @@ -595,8 +595,8 @@ impl Upstairs { } } - // Check whether we need to mark a Downstairs as faulted because too - // many jobs have piled up. + // Check whether we need to mark an offline Downstairs as faulted + // because too many jobs have piled up. self.gone_too_long(); // Check to see whether live-repair can continue @@ -697,8 +697,10 @@ impl Upstairs { /// Check outstanding IOops for each downstairs. /// - /// If the number is too high, then mark that downstairs as failed, scrub - /// any outstanding jobs, and restart the client IO task. + /// We never kick out a Downstairs that is replying to us, but will + /// eventually transition a Downstairs from Offline to Faulted (which then + /// leads to scrubbing any outstanding jobs, and restarting the client IO + /// task). fn gone_too_long(&mut self) { // If we are not active, then just exit. if !matches!(self.state, UpstairsState::Active) { @@ -706,15 +708,8 @@ impl Upstairs { } for cid in ClientId::iter() { - // Only downstairs in these states are checked. 
- match self.downstairs.clients[cid].state() { - DsState::Active - | DsState::LiveRepair - | DsState::Offline - | DsState::Replay => { - self.downstairs.check_gone_too_long(cid, &self.state); - } - _ => {} + if self.downstairs.clients[cid].state() == DsState::Offline { + self.downstairs.check_gone_too_long(cid, &self.state); } } } From ad6bc9c13770b3bbe35040dbdb8ccd05cfd037db Mon Sep 17 00:00:00 2001 From: Matt Keeter <matt@oxide.computer> Date: Fri, 12 Apr 2024 11:24:24 -0400 Subject: [PATCH 2/8] Working on unit tests --- upstairs/src/dummy_downstairs_tests.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/upstairs/src/dummy_downstairs_tests.rs b/upstairs/src/dummy_downstairs_tests.rs index 1ba37f069..f02318c55 100644 --- a/upstairs/src/dummy_downstairs_tests.rs +++ b/upstairs/src/dummy_downstairs_tests.rs @@ -499,8 +499,9 @@ impl TestHarness { read_only, reply_to_ping, - extent_count: 10, - extent_size: Block::new_512(10), + // Slightly over 1 MiB, so we can do max-size writes + extent_count: 25, + extent_size: Block::new_512(100), gen_numbers: vec![0u64; 10], flush_numbers: vec![0u64; 10], @@ -1448,7 +1449,7 @@ async fn test_byte_fault_condition() { // Send enough bytes to hit the IO_OUTSTANDING_MAX_BYTES condition on // downstairs 1, which should mark it as faulted and kick it out. - const WRITE_SIZE: usize = 50 * 1024; // 50 KiB + const WRITE_SIZE: usize = 1024usize.pow(2); // 1 MiB let write_buf = BytesMut::from(vec![1; WRITE_SIZE].as_slice()); // 50 KiB let num_jobs = IO_OUTSTANDING_MAX_BYTES as usize / write_buf.len() + 10; assert!(num_jobs < IO_OUTSTANDING_MAX_JOBS); @@ -1516,7 +1517,7 @@ async fn test_byte_fault_condition_offline() { // `num_jobs` sends enough bytes to hit the IO_OUTSTANDING_MAX_BYTES // condition on downstairs 1, which should mark it as faulted and kick it // out. - const WRITE_SIZE: usize = 50 * 1024; // 50 KiB + const WRITE_SIZE: usize = 1024usize.pow(2); // 1 MiB let write_buf = BytesMut::from(vec![1; WRITE_SIZE].as_slice()); // 50 KiB let num_jobs = IO_OUTSTANDING_MAX_BYTES as usize / write_buf.len() + 10; assert!(num_jobs < IO_OUTSTANDING_MAX_JOBS); From f5de239ec9450e87fb1fe24336ab7d237b4b9ee8 Mon Sep 17 00:00:00 2001 From: Matt Keeter <matt@oxide.computer> Date: Fri, 12 Apr 2024 11:39:38 -0400 Subject: [PATCH 3/8] More unit test work --- upstairs/src/dummy_downstairs_tests.rs | 168 ++++++++++++++----------- 1 file changed, 97 insertions(+), 71 deletions(-) diff --git a/upstairs/src/dummy_downstairs_tests.rs b/upstairs/src/dummy_downstairs_tests.rs index f02318c55..d05c52b7d 100644 --- a/upstairs/src/dummy_downstairs_tests.rs +++ b/upstairs/src/dummy_downstairs_tests.rs @@ -923,10 +923,11 @@ async fn run_live_repair(mut harness: TestHarness) { } else { // All IO above this is skipped for the downstairs under // repair. - assert!(matches!( - harness.ds1().try_recv(), - Err(TryRecvError::Empty) - )); + let r = harness.ds1().try_recv(); + assert!( + matches!(r, Err(TryRecvError::Empty)), + "unexpected response {r:?}" + ); } let m2 = harness.ds2.recv().await.unwrap(); @@ -1031,10 +1032,11 @@ async fn run_live_repair(mut harness: TestHarness) { } else { // All IO above this is skipped for the downstairs under // repair. 
- assert!(matches!( - harness.ds1().try_recv(), - Err(TryRecvError::Empty) - )); + let r = harness.ds1().try_recv(); + assert!( + matches!(r, Err(TryRecvError::Empty)), + "unexpected IO: {r:?}" + ); } let m2 = harness.ds2.recv().await.unwrap(); @@ -1445,16 +1447,34 @@ async fn run_live_repair(mut harness: TestHarness) { /// Test that we will mark a Downstairs as failed if we hit the byte limit #[tokio::test] async fn test_byte_fault_condition() { - let mut harness = TestHarness::new().await; + let mut harness = TestHarness::new_no_ping().await; + + // Send enough bytes such that when we hit the client timeout, we are above + // our bytes-in-flight limit (so the downstairs gets marked as faulted + // instead of offline). + const MARGIN_SECS: f32 = 4.0; + const SEND_JOBS_TIME: f32 = CLIENT_TIMEOUT_SECS - MARGIN_SECS; + let start_time = tokio::time::Instant::now(); - // Send enough bytes to hit the IO_OUTSTANDING_MAX_BYTES condition on - // downstairs 1, which should mark it as faulted and kick it out. + // `num_jobs` sends enough bytes to hit the IO_OUTSTANDING_MAX_BYTES + // condition on downstairs 1, which should mark it as faulted and kick it + // out. const WRITE_SIZE: usize = 1024usize.pow(2); // 1 MiB let write_buf = BytesMut::from(vec![1; WRITE_SIZE].as_slice()); // 50 KiB let num_jobs = IO_OUTSTANDING_MAX_BYTES as usize / write_buf.len() + 10; assert!(num_jobs < IO_OUTSTANDING_MAX_JOBS); + // First, we'll send jobs until the timeout for i in 0..num_jobs { + // Delay so that we hit SEND_JOBS_TIME at the end of this loop + tokio::time::sleep_until( + start_time + + Duration::from_secs_f32( + SEND_JOBS_TIME * i as f32 / (num_jobs / 2) as f32, + ), + ) + .await; + // We must `spawn` here because `write` will wait for the response // to come back before returning let write_buf = write_buf.clone(); @@ -1462,47 +1482,42 @@ async fn test_byte_fault_condition() { guest.write(Block::new_512(0), write_buf).await.unwrap(); }); + // Before we're kicked out, assert we're seeing the read requests + assert!(matches!( + harness.ds1().recv().await.unwrap(), + Message::Write { .. }, + )); harness.ds2.ack_write().await; harness.ds3.ack_write().await; - // With 2x responses, we can now await the write job (which ensures that + // With 2x responses, we can now await the read job (which ensures that // the Upstairs has finished updating its state). h.await.unwrap(); let ds = harness.guest.downstairs_state().await.unwrap(); - if (i + 1) * WRITE_SIZE < IO_OUTSTANDING_MAX_BYTES as usize { - // Before we're kicked out, assert we're seeing the read - // requests - assert!(matches!( - harness.ds1().recv().await.unwrap(), - Message::Write { .. 
}, - )); - assert_eq!(ds[ClientId::new(0)], DsState::Active); - assert_eq!(ds[ClientId::new(1)], DsState::Active); - assert_eq!(ds[ClientId::new(2)], DsState::Active); - } else { - // After ds1 is kicked out, we shouldn't see any more messages - match harness.ds1().try_recv() { - Err(TryRecvError::Empty) => {} - Err(TryRecvError::Disconnected) => {} - x => { - panic!("Read {i} should return EMPTY, but we got:{x:?}"); - } - } - assert_eq!(ds[ClientId::new(0)], DsState::Faulted); - assert_eq!(ds[ClientId::new(1)], DsState::Active); - assert_eq!(ds[ClientId::new(2)], DsState::Active); - } + assert_eq!(ds[ClientId::new(0)], DsState::Active); + assert_eq!(ds[ClientId::new(1)], DsState::Active); + assert_eq!(ds[ClientId::new(2)], DsState::Active); } - // Confirm that live repair still works - run_live_repair(harness).await + // Sleep until we're confident that the Downstairs is kicked out + info!(harness.log, "waiting for Upstairs to kick out DS1"); + tokio::time::sleep(Duration::from_secs_f32(2.0 * MARGIN_SECS)).await; + + // Check to make sure that happened + let ds = harness.guest.downstairs_state().await.unwrap(); + assert_eq!(ds[ClientId::new(0)], DsState::Faulted); + assert_eq!(ds[ClientId::new(1)], DsState::Active); + assert_eq!(ds[ClientId::new(2)], DsState::Active); + + // Confirm that the system comes up after live-repair + run_live_repair(harness).await; } /// Test that we will transition a downstairs from offline -> faulted if we hit /// the byte limit after it's already offline #[tokio::test] -async fn test_byte_fault_condition_offline() { +async fn test_bbyte_fault_condition_offline() { let mut harness = TestHarness::new_no_ping().await; // Two different transitions occur during this test: @@ -1610,15 +1625,26 @@ async fn test_byte_fault_condition_offline() { /// Test that we will mark a Downstairs as failed if we hit the job limit #[tokio::test] async fn test_job_fault_condition() { - let mut harness = TestHarness::new().await; + let mut harness = TestHarness::new_no_ping().await; - // Send 200 more than IO_OUTSTANDING_MAX_JOBS jobs, sending read - // responses from two of the three downstairs. After we have sent - // IO_OUTSTANDING_MAX_JOBS jobs, the Upstairs will set ds1 to faulted, - // and send it no more work. - const NUM_JOBS: usize = IO_OUTSTANDING_MAX_JOBS + 200; + // We're going to queue up > IO_OUTSTANDING_MAX_JOBS in less than our + // timeout time, so that when timeout hits, the downstairs will become + // Faulted instead of Offline. + const MARGIN_SECS: f32 = 4.0; + const SEND_JOBS_TIME: f32 = CLIENT_TIMEOUT_SECS - MARGIN_SECS; + let num_jobs = IO_OUTSTANDING_MAX_JOBS + 200; + let start_time = tokio::time::Instant::now(); + + for i in 0..num_jobs { + // Delay so that we hit SEND_JOBS_TIME at the end of this loop + tokio::time::sleep_until( + start_time + + Duration::from_secs_f32( + SEND_JOBS_TIME * i as f32 / num_jobs as f32, + ), + ) + .await; - for i in 0..NUM_JOBS { // We must `spawn` here because `write` will wait for the response to // come back before returning let h = harness.spawn(|guest| async move { @@ -1626,6 +1652,12 @@ async fn test_job_fault_condition() { guest.read(Block::new_512(0), &mut buffer).await.unwrap(); }); + // DS1 should be receiving messages + assert!(matches!( + harness.ds1().recv().await.unwrap(), + Message::ReadRequest { .. 
}, + )); + // Respond with read responses for downstairs 2 and 3 harness.ds2.ack_read().await; harness.ds3.ack_read().await; @@ -1635,39 +1667,33 @@ async fn test_job_fault_condition() { h.await.unwrap(); let ds = harness.guest.downstairs_state().await.unwrap(); - if i < IO_OUTSTANDING_MAX_JOBS { - // Before we're kicked out, assert we're seeing the read - // requests - assert!(matches!( - harness.ds1().recv().await.unwrap(), - Message::ReadRequest { .. }, - )); - assert_eq!(ds[ClientId::new(0)], DsState::Active); - assert_eq!(ds[ClientId::new(1)], DsState::Active); - assert_eq!(ds[ClientId::new(2)], DsState::Active); - } else { - // After ds1 is kicked out, we shouldn't see any more messages - match harness.ds1().try_recv() { - Err(TryRecvError::Empty) => {} - Err(TryRecvError::Disconnected) => {} - x => { - panic!("Read {i} should return EMPTY, but we got:{x:?}"); - } - } - assert_eq!(ds[ClientId::new(0)], DsState::Faulted); - assert_eq!(ds[ClientId::new(1)], DsState::Active); - assert_eq!(ds[ClientId::new(2)], DsState::Active); - } + assert_eq!(ds[ClientId::new(0)], DsState::Active); + assert_eq!(ds[ClientId::new(1)], DsState::Active); + assert_eq!(ds[ClientId::new(2)], DsState::Active); } - // Confirm that live repair still works - run_live_repair(harness).await + // Sleep until we're confident that the Downstairs is kicked out + // + // Because it has so many pending jobs, it will become Faulted instead of + // Offline (or rather, will transition Active -> Offline -> Faulted + // immediately). + info!(harness.log, "waiting for Upstairs to kick out DS1"); + tokio::time::sleep(Duration::from_secs_f32(2.0 * MARGIN_SECS)).await; + + // Check to make sure that happened + let ds = harness.guest.downstairs_state().await.unwrap(); + assert_eq!(ds[ClientId::new(0)], DsState::Faulted); + assert_eq!(ds[ClientId::new(1)], DsState::Active); + assert_eq!(ds[ClientId::new(2)], DsState::Active); + + // Confirm that the system comes up after live-repair + run_live_repair(harness).await; } /// Test that we will transition a downstairs from offline -> faulted if we hit /// the job limit after it's already offline #[tokio::test] -async fn test_job_fault_condition_offline() { +async fn test_jjob_fault_condition_offline() { let mut harness = TestHarness::new_no_ping().await; // Two different transitions occur during this test: From 476a8c84acdf0fe409c19f0e206f67e6b84eac95 Mon Sep 17 00:00:00 2001 From: Matt Keeter <matt@oxide.computer> Date: Fri, 12 Apr 2024 14:15:08 -0400 Subject: [PATCH 4/8] Unit tests are passing? 
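
Rework the fault-condition tests around the new Offline -> Faulted
transition: DS1 keeps answering pings (plain TestHarness::new()), the test
queues enough work to cross a limit, then sleeps past CLIENT_TIMEOUT_SECS
(panicking if DS2 or DS3 see unexpected traffic) and checks that DS1 has
been kicked out to Faulted while DS2 and DS3 stay Active.  The *_offline
variants instead flip reply_to_ping off on DS1 first.

Rough numbers for the byte-based tests, using the limits from the first
patch (IO_OUTSTANDING_MAX_BYTES = 1 GiB, IO_OUTSTANDING_MAX_JOBS = 10_000):

    1 GiB / 105 KiB is roughly 9_986 writes
    9_986 + 10 = 9_996 jobs, which is < 10_000

so the byte limit is crossed while staying under the job limit; the
job-based tests instead queue IO_OUTSTANDING_MAX_JOBS + 200 reads.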
--- upstairs/src/dummy_downstairs_tests.rs | 138 +++++++++++++------------ 1 file changed, 71 insertions(+), 67 deletions(-) diff --git a/upstairs/src/dummy_downstairs_tests.rs b/upstairs/src/dummy_downstairs_tests.rs index d05c52b7d..db65e26b7 100644 --- a/upstairs/src/dummy_downstairs_tests.rs +++ b/upstairs/src/dummy_downstairs_tests.rs @@ -475,44 +475,38 @@ pub struct TestHarness { impl TestHarness { pub async fn new() -> TestHarness { - Self::new_(false, true).await + Self::new_(false).await } pub async fn new_ro() -> TestHarness { - Self::new_(true, true).await - } - - /// Build a new TestHarness where DS1 doesn't reply to pings - pub async fn new_no_ping() -> TestHarness { - Self::new_(false, false).await + Self::new_(true).await } pub fn ds1(&mut self) -> &mut DownstairsHandle { self.ds1.as_mut().unwrap() } - fn default_config( - read_only: bool, - reply_to_ping: bool, - ) -> DownstairsConfig { + fn default_config(read_only: bool) -> DownstairsConfig { DownstairsConfig { read_only, - reply_to_ping, + reply_to_ping: true, - // Slightly over 1 MiB, so we can do max-size writes + // Extent count is picked so that we can hit + // IO_OUTSTANDING_MAX_BYTES in less than IO_OUTSTANDING_MAX_JOBS, + // i.e. letting us test both byte and job fault conditions. extent_count: 25, - extent_size: Block::new_512(100), + extent_size: Block::new_512(10), - gen_numbers: vec![0u64; 10], - flush_numbers: vec![0u64; 10], - dirty_bits: vec![false; 10], + gen_numbers: vec![0u64; 25], + flush_numbers: vec![0u64; 25], + dirty_bits: vec![false; 25], } } - async fn new_(read_only: bool, reply_to_ping: bool) -> TestHarness { + async fn new_(read_only: bool) -> TestHarness { let log = csl(); - let cfg = Self::default_config(read_only, reply_to_ping); + let cfg = Self::default_config(read_only); let ds1 = cfg.clone().start(log.new(o!("downstairs" => 1))).await; let ds2 = cfg.clone().start(log.new(o!("downstairs" => 2))).await; @@ -813,7 +807,7 @@ async fn run_live_repair(mut harness: TestHarness) { let mut ds2_buffered_messages = vec![]; let mut ds3_buffered_messages = vec![]; - for eid in 0..10 { + for eid in 0..25 { // The Upstairs first sends the close and reopen jobs for _ in 0..2 { ds1_buffered_messages.push(harness.ds1().recv().await.unwrap()); @@ -871,7 +865,7 @@ async fn run_live_repair(mut harness: TestHarness) { let mut responses = vec![Vec::new(); 3]; - for io_eid in 0usize..10 { + for io_eid in 0usize..25 { let mut dep_job_id = [reopen_job_id; 3]; // read harness.spawn(move |guest| async move { @@ -1425,11 +1419,11 @@ async fn run_live_repair(mut harness: TestHarness) { // Expect the live repair to send a final flush { let flush_number = harness.ds1().ack_flush().await; - assert_eq!(flush_number, 12); + assert_eq!(flush_number, 27); let flush_number = harness.ds2.ack_flush().await; - assert_eq!(flush_number, 12); + assert_eq!(flush_number, 27); let flush_number = harness.ds3.ack_flush().await; - assert_eq!(flush_number, 12); + assert_eq!(flush_number, 27); } // Try another read @@ -1447,34 +1441,24 @@ async fn run_live_repair(mut harness: TestHarness) { /// Test that we will mark a Downstairs as failed if we hit the byte limit #[tokio::test] async fn test_byte_fault_condition() { - let mut harness = TestHarness::new_no_ping().await; - // Send enough bytes such that when we hit the client timeout, we are above // our bytes-in-flight limit (so the downstairs gets marked as faulted // instead of offline). 
- const MARGIN_SECS: f32 = 4.0; - const SEND_JOBS_TIME: f32 = CLIENT_TIMEOUT_SECS - MARGIN_SECS; - let start_time = tokio::time::Instant::now(); + // + // Notice that we keep DS1 replying to pings through this process, so it + // doesn't get set to offline early. + let mut harness = TestHarness::new().await; // `num_jobs` sends enough bytes to hit the IO_OUTSTANDING_MAX_BYTES // condition on downstairs 1, which should mark it as faulted and kick it // out. - const WRITE_SIZE: usize = 1024usize.pow(2); // 1 MiB + const WRITE_SIZE: usize = 105 * 1024; // 105 KiB let write_buf = BytesMut::from(vec![1; WRITE_SIZE].as_slice()); // 50 KiB let num_jobs = IO_OUTSTANDING_MAX_BYTES as usize / write_buf.len() + 10; assert!(num_jobs < IO_OUTSTANDING_MAX_JOBS); // First, we'll send jobs until the timeout - for i in 0..num_jobs { - // Delay so that we hit SEND_JOBS_TIME at the end of this loop - tokio::time::sleep_until( - start_time - + Duration::from_secs_f32( - SEND_JOBS_TIME * i as f32 / (num_jobs / 2) as f32, - ), - ) - .await; - + for _ in 0..num_jobs { // We must `spawn` here because `write` will wait for the response // to come back before returning let write_buf = write_buf.clone(); @@ -1490,7 +1474,7 @@ async fn test_byte_fault_condition() { harness.ds2.ack_write().await; harness.ds3.ack_write().await; - // With 2x responses, we can now await the read job (which ensures that + // With 2x responses, we can now await the write job (which ensures that // the Upstairs has finished updating its state). h.await.unwrap(); @@ -1501,8 +1485,23 @@ async fn test_byte_fault_condition() { } // Sleep until we're confident that the Downstairs is kicked out - info!(harness.log, "waiting for Upstairs to kick out DS1"); - tokio::time::sleep(Duration::from_secs_f32(2.0 * MARGIN_SECS)).await; + let sleep_time = CLIENT_TIMEOUT_SECS + 5.0; + info!( + harness.log, + "waiting {sleep_time} secs for Upstairs to kick out DS1" + ); + tokio::select! { + _ = tokio::time::sleep(Duration::from_secs_f32(sleep_time)) => { + // we're done! + } + // we don't listen to ds1 here, so we won't acknowledge any pings! + v = harness.ds2.recv() => { + panic!("received unexpected message on ds2: {v:?}") + } + v = harness.ds3.recv() => { + panic!("received unexpected message on ds3: {v:?}") + } + } // Check to make sure that happened let ds = harness.guest.downstairs_state().await.unwrap(); @@ -1517,8 +1516,9 @@ async fn test_byte_fault_condition() { /// Test that we will transition a downstairs from offline -> faulted if we hit /// the byte limit after it's already offline #[tokio::test] -async fn test_bbyte_fault_condition_offline() { - let mut harness = TestHarness::new_no_ping().await; +async fn test_byte_fault_condition_offline() { + let mut harness = TestHarness::new().await; + harness.ds1().cfg.reply_to_ping = false; // Two different transitions occur during this test: // - We're not replying to pings, so DS1 will eventually transition from @@ -1532,7 +1532,7 @@ async fn test_bbyte_fault_condition_offline() { // `num_jobs` sends enough bytes to hit the IO_OUTSTANDING_MAX_BYTES // condition on downstairs 1, which should mark it as faulted and kick it // out. 
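+    // (A 105 KiB write also fits within the test region of 25 extents x
+    // 10 blocks x 512 B = 125 KiB, so each write stays in bounds.)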
- const WRITE_SIZE: usize = 1024usize.pow(2); // 1 MiB + const WRITE_SIZE: usize = 105 * 1024; // 105 KiB let write_buf = BytesMut::from(vec![1; WRITE_SIZE].as_slice()); // 50 KiB let num_jobs = IO_OUTSTANDING_MAX_BYTES as usize / write_buf.len() + 10; assert!(num_jobs < IO_OUTSTANDING_MAX_JOBS); @@ -1625,26 +1625,13 @@ async fn test_bbyte_fault_condition_offline() { /// Test that we will mark a Downstairs as failed if we hit the job limit #[tokio::test] async fn test_job_fault_condition() { - let mut harness = TestHarness::new_no_ping().await; + let mut harness = TestHarness::new().await; - // We're going to queue up > IO_OUTSTANDING_MAX_JOBS in less than our - // timeout time, so that when timeout hits, the downstairs will become - // Faulted instead of Offline. - const MARGIN_SECS: f32 = 4.0; - const SEND_JOBS_TIME: f32 = CLIENT_TIMEOUT_SECS - MARGIN_SECS; + // We're going to queue up > IO_OUTSTANDING_MAX_JOBS, then wait for a + // timeout, so that when timeout hits, the downstairs will become Faulted + // instead of Offline. let num_jobs = IO_OUTSTANDING_MAX_JOBS + 200; - let start_time = tokio::time::Instant::now(); - - for i in 0..num_jobs { - // Delay so that we hit SEND_JOBS_TIME at the end of this loop - tokio::time::sleep_until( - start_time - + Duration::from_secs_f32( - SEND_JOBS_TIME * i as f32 / num_jobs as f32, - ), - ) - .await; - + for _ in 0..num_jobs { // We must `spawn` here because `write` will wait for the response to // come back before returning let h = harness.spawn(|guest| async move { @@ -1677,8 +1664,24 @@ async fn test_job_fault_condition() { // Because it has so many pending jobs, it will become Faulted instead of // Offline (or rather, will transition Active -> Offline -> Faulted // immediately). + let sleep_time = CLIENT_TIMEOUT_SECS + 5.0; info!(harness.log, "waiting for Upstairs to kick out DS1"); - tokio::time::sleep(Duration::from_secs_f32(2.0 * MARGIN_SECS)).await; + info!( + harness.log, + "waiting {sleep_time} secs for Upstairs to kick out DS1" + ); + tokio::select! { + _ = tokio::time::sleep(Duration::from_secs_f32(sleep_time)) => { + // we're done! + } + // we don't listen to ds1 here, so we won't acknowledge any pings! 
+ v = harness.ds2.recv() => { + panic!("received unexpected message on ds2: {v:?}") + } + v = harness.ds3.recv() => { + panic!("received unexpected message on ds3: {v:?}") + } + } // Check to make sure that happened let ds = harness.guest.downstairs_state().await.unwrap(); @@ -1693,8 +1696,9 @@ async fn test_job_fault_condition() { /// Test that we will transition a downstairs from offline -> faulted if we hit /// the job limit after it's already offline #[tokio::test] -async fn test_jjob_fault_condition_offline() { - let mut harness = TestHarness::new_no_ping().await; +async fn test_job_fault_condition_offline() { + let mut harness = TestHarness::new().await; + harness.ds1().cfg.reply_to_ping = false; // Two different transitions occur during this test: // - We're not replying to pings, so DS1 will eventually transition from From dd8722c600537927f23aae3686c4cafc353d9b59 Mon Sep 17 00:00:00 2001 From: Matt Keeter <matt@oxide.computer> Date: Mon, 15 Apr 2024 14:38:12 -0400 Subject: [PATCH 5/8] Move Imok reply to main IO loop in Downstairs --- downstairs/src/lib.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/downstairs/src/lib.rs b/downstairs/src/lib.rs index 296c3740c..fb3c77366 100644 --- a/downstairs/src/lib.rs +++ b/downstairs/src/lib.rs @@ -1308,12 +1308,7 @@ where return Ok(()); } Some(Ok(msg)) => { - if matches!(msg, Message::Ruok) { - // Respond instantly to pings, don't wait. - if let Err(e) = resp_channel_tx.send(Message::Imok) { - bail!("Failed sending Imok: {}", e); - } - } else if let Err(e) = message_channel_tx.send(msg) { + if let Err(e) = message_channel_tx.send(msg) { bail!("Failed sending message to proc_frame: {}", e); } } @@ -2660,6 +2655,10 @@ impl Downstairs { resp_tx.send(msg)?; None } + Message::Ruok => { + resp_tx.send(Message::Imok)?; + None + } x => bail!("unexpected frame {:?}", x), }; Ok(r) From bf72b38fbd7baf41a3f42bde1763015a947afa23 Mon Sep 17 00:00:00 2001 From: Matt Keeter <matt@oxide.computer> Date: Wed, 17 Apr 2024 10:30:06 -0400 Subject: [PATCH 6/8] Update diagrams --- downstairs/src/lib.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/downstairs/src/lib.rs b/downstairs/src/lib.rs index fb3c77366..91f1397e6 100644 --- a/downstairs/src/lib.rs +++ b/downstairs/src/lib.rs @@ -1140,13 +1140,13 @@ where * │ │ * │ ┌────────────────┴────────────────────┐ * │ │ framed_write_task │ - * │ └─▲─────▲──────────────────▲──────────┘ - * │ │ │ │ - * │ ping│ │invalid │ - * │ ┌────────┘ │frame │responses - * │ │ │errors │ - * │ │ │ │ - * ┌────▼──┴─┐ message ┌┴──────┐ job ┌┴────────┐ + * │ └───────▲──────────────────▲──────────┘ + * │ │ │ + * │ │invalid frame │ + * │ │errors, pings │responses + * │ │ │ + * │ │ │ + * ┌────▼────┐ message ┌┴──────┐ job ┌┴────────┐ * │resp_loop├──────────►│pf_task├─────────►│ dw_task │ * └─────────┘ channel └──┬────┘ channel └▲────────┘ * │ │ From 9cf000dcc383287778b3144fc6d0154915c3a5d9 Mon Sep 17 00:00:00 2001 From: Matt Keeter <matt@oxide.computer> Date: Wed, 17 Apr 2024 14:23:11 -0400 Subject: [PATCH 7/8] Pull extent_count into a constant --- upstairs/src/dummy_downstairs_tests.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/upstairs/src/dummy_downstairs_tests.rs b/upstairs/src/dummy_downstairs_tests.rs index db65e26b7..c7a6c5a72 100644 --- a/upstairs/src/dummy_downstairs_tests.rs +++ b/upstairs/src/dummy_downstairs_tests.rs @@ -473,6 +473,9 @@ pub struct TestHarness { guest: Arc<Guest>, } +/// Number of extents in 
`TestHarness::default_config` +const DEFAULT_EXTENT_COUNT: usize = 25; + impl TestHarness { pub async fn new() -> TestHarness { Self::new_(false).await @@ -494,12 +497,12 @@ impl TestHarness { // Extent count is picked so that we can hit // IO_OUTSTANDING_MAX_BYTES in less than IO_OUTSTANDING_MAX_JOBS, // i.e. letting us test both byte and job fault conditions. - extent_count: 25, + extent_count: DEFAULT_EXTENT_COUNT as u32, extent_size: Block::new_512(10), - gen_numbers: vec![0u64; 25], - flush_numbers: vec![0u64; 25], - dirty_bits: vec![false; 25], + gen_numbers: vec![0u64; DEFAULT_EXTENT_COUNT], + flush_numbers: vec![0u64; DEFAULT_EXTENT_COUNT], + dirty_bits: vec![false; DEFAULT_EXTENT_COUNT], } } @@ -807,7 +810,7 @@ async fn run_live_repair(mut harness: TestHarness) { let mut ds2_buffered_messages = vec![]; let mut ds3_buffered_messages = vec![]; - for eid in 0..25 { + for eid in 0..DEFAULT_EXTENT_COUNT { // The Upstairs first sends the close and reopen jobs for _ in 0..2 { ds1_buffered_messages.push(harness.ds1().recv().await.unwrap()); @@ -865,7 +868,7 @@ async fn run_live_repair(mut harness: TestHarness) { let mut responses = vec![Vec::new(); 3]; - for io_eid in 0usize..25 { + for io_eid in 0usize..DEFAULT_EXTENT_COUNT { let mut dep_job_id = [reopen_job_id; 3]; // read harness.spawn(move |guest| async move { From 1e2b998e3c0774f99e50eef0e64c04b7d3f68fb5 Mon Sep 17 00:00:00 2001 From: Matt Keeter <matt@oxide.computer> Date: Wed, 17 Apr 2024 14:53:17 -0400 Subject: [PATCH 8/8] Tweak backpressure curve to match `master` --- upstairs/src/guest.rs | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/upstairs/src/guest.rs b/upstairs/src/guest.rs index 22dc1ff29..e7880826c 100644 --- a/upstairs/src/guest.rs +++ b/upstairs/src/guest.rs @@ -333,6 +333,19 @@ struct BackpressureConfig { } impl BackpressureConfig { + // Our chosen backpressure curve is quadratic for 1/2 of its range, then + // goes to infinity in the second half. This gives C0 + C1 continuity. 
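+    //
+    // For example, after the remapping below: a ratio of 0.25 gives 0.25x
+    // `scale`, 0.5 gives exactly 1x `scale`, 0.75 gives 4x `scale`, and the
+    // delay grows without bound as the ratio approaches 1.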
+ fn curve(frac: f64, scale: Duration) -> Duration { + // Remap from 0-1 to 0-1.5 for ease of calculation + let frac = frac * 2.0; + let v = if frac < 1.0 { + frac + } else { + 1.0 / (1.0 - (frac - 1.0)) + }; + scale.mul_f64(v.powi(2)) + } + fn get_backpressure_us(&self, bytes: u64, jobs: u64) -> u64 { // Saturate at 1 hour per job, which is basically infinite if bytes >= self.bytes_max || jobs >= self.queue_max { @@ -347,16 +360,10 @@ impl BackpressureConfig { / (self.bytes_max - self.bytes_start) as f64; // Delay should be 0 at frac = 0, and infinite at frac = 1 - let delay_bytes = self - .bytes_scale - .mul_f64(1.0 / (1.0 - bytes_frac).powi(2) - 1.0) - .as_micros() as u64; - - // Compute an alternate delay based on queue length - let delay_jobs = self - .queue_scale - .mul_f64(1.0 / (1.0 - jobs_frac).powi(2) - 1.0) - .as_micros() as u64; + let delay_bytes = + Self::curve(bytes_frac, self.bytes_scale).as_micros() as u64; + let delay_jobs = + Self::curve(jobs_frac, self.queue_scale).as_micros() as u64; delay_bytes.max(delay_jobs) } @@ -424,7 +431,7 @@ impl Guest { // Byte-based backpressure bytes_start: 50 * 1024u64.pow(2), // 50 MiB bytes_max: IO_OUTSTANDING_MAX_BYTES * 2, - bytes_scale: Duration::from_millis(25), + bytes_scale: Duration::from_millis(100), // Queue-based backpressure queue_start: 500, @@ -1509,9 +1516,9 @@ mod test { humantime::format_duration(timeout) ); assert!( - timeout < Duration::from_secs(120), + timeout < Duration::from_secs(180), "offline -> faulted transition happens too slowly \ - with job size {job_size}; expected < 2 mins, got {}", + with job size {job_size}; expected < 3 mins, got {}", humantime::format_duration(timeout) );
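
For a back-of-the-envelope feel for the retuned curve, the standalone sketch
below (not part of the patch) mirrors BackpressureConfig::curve using the
defaults from this series (backpressure starts at 50 MiB, saturates at
2 * IO_OUTSTANDING_MAX_BYTES = 2 GiB, with a 100 ms scale) and prints the
byte-based delay at a few queue depths; the sample depths themselves are
arbitrary:

    use std::time::Duration;

    // Mirror of `BackpressureConfig::curve` above: quadratic up to the
    // halfway point, then 1 / (1 - x) squared, diverging as the ratio
    // approaches 1.
    fn curve(frac: f64, scale: Duration) -> Duration {
        let frac = frac * 2.0;
        let v = if frac < 1.0 {
            frac
        } else {
            1.0 / (1.0 - (frac - 1.0))
        };
        scale.mul_f64(v.powi(2))
    }

    fn main() {
        let mib = 1024.0 * 1024.0_f64;
        // Defaults from `Guest::default_backpressure_config` in this series
        let start = 50.0 * mib; // bytes_start (50 MiB)
        let max = 2048.0 * mib; // bytes_max (2 * IO_OUTSTANDING_MAX_BYTES)
        let scale = Duration::from_millis(100); // bytes_scale

        for in_flight in [100.0, 512.0, 1024.0, 1536.0, 1900.0] {
            let frac = (in_flight * mib - start).max(0.0) / (max - start);
            println!("{in_flight:>6} MiB in flight -> {:?}", curve(frac, scale));
        }
    }

Running it shows sub-millisecond delays near the 50 MiB start point, roughly
100 ms with about 1 GiB in flight, and multi-second delays as the queue
approaches the 2 GiB ceiling.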