From 71616e1d35cd4a0fac391644735a5d5e3bc5ddba Mon Sep 17 00:00:00 2001 From: Matt Keeter <matt@oxide.computer> Date: Mon, 1 Apr 2024 17:32:24 -0400 Subject: [PATCH 1/8] Remove IO limits and tweak backpressure --- Cargo.lock | 1 + Cargo.toml | 1 + crutest/src/main.rs | 4 +- upstairs/Cargo.toml | 7 +- upstairs/src/client.rs | 4 + upstairs/src/guest.rs | 198 +++++++++++++++++++++++++++++---------- upstairs/src/lib.rs | 14 +-- upstairs/src/upstairs.rs | 21 ++--- 8 files changed, 173 insertions(+), 77 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0bdf1eb93..99b33b13f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -774,6 +774,7 @@ dependencies = [ "futures", "futures-core", "http 0.2.12", + "humantime", "internal-dns", "itertools", "libc", diff --git a/Cargo.toml b/Cargo.toml index e10f1b0bd..621f0e6c4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,6 +53,7 @@ hex = "0.4" http = "0.2.12" httptest = "0.15.5" human_bytes = "0.4.3" +humantime = "2.1.0" hyper = { version = "0.14", features = [ "full" ] } hyper-staticfile = "0.9" indicatif = { version = "0.17.8", features = ["rayon"] } diff --git a/crutest/src/main.rs b/crutest/src/main.rs index eafa51784..b7e731ce6 100644 --- a/crutest/src/main.rs +++ b/crutest/src/main.rs @@ -1836,9 +1836,7 @@ async fn generic_workload( } // Make use of dsc to stop and start a downstairs while sending IO. This -// should trigger the replay code path. The IO sent to the downstairs should -// be below the threshold of gone_too_long() so we don't end up faulting the -// downstairs and doing a live repair +// should trigger the replay code path. async fn replay_workload( guest: &Arc<Guest>, wtq: &mut WhenToQuit, diff --git a/upstairs/Cargo.toml b/upstairs/Cargo.toml index 77bc8a06c..0f6d15e6b 100644 --- a/upstairs/Cargo.toml +++ b/upstairs/Cargo.toml @@ -62,12 +62,13 @@ http = { workspace = true, optional = true } [dev-dependencies] expectorate.workspace = true -openapiv3.workspace = true +humantime.workspace = true openapi-lint.workspace = true -tokio-test.workspace = true +openapiv3.workspace = true +proptest.workspace = true tempfile.workspace = true test-strategy.workspace = true -proptest.workspace = true +tokio-test.workspace = true [build-dependencies] version_check = "0.9.4" diff --git a/upstairs/src/client.rs b/upstairs/src/client.rs index d785333d1..e1f48fc67 100644 --- a/upstairs/src/client.rs +++ b/upstairs/src/client.rs @@ -2506,6 +2506,10 @@ impl ClientIoTask { // If we're reconnecting, then add a short delay to avoid constantly // spinning (e.g. if something is fundamentally wrong with the // Downstairs) + // + // The upstairs can still stop us here, e.g. if we need to transition + // from Offline -> Faulted because we hit a job limit, that bounces the + // IO task (whether it *should* is debatable). if self.delay { tokio::select! { s = &mut self.stop => { diff --git a/upstairs/src/guest.rs b/upstairs/src/guest.rs index fba35b480..22dc1ff29 100644 --- a/upstairs/src/guest.rs +++ b/upstairs/src/guest.rs @@ -11,7 +11,7 @@ use std::{ use crate::{ BlockIO, BlockOp, BlockOpWaiter, BlockRes, Buffer, JobId, ReplaceResult, - UpstairsAction, + UpstairsAction, IO_OUTSTANDING_MAX_BYTES, IO_OUTSTANDING_MAX_JOBS, }; use crucible_common::{build_logger, crucible_bail, Block, CrucibleError}; use crucible_protocol::{ReadResponse, SnapshotDetails}; @@ -317,15 +317,49 @@ pub struct Guest { /// the two delays. #[derive(Copy, Clone, Debug)] struct BackpressureConfig { - /// When should backpressure start? 
- bytes_start: f64, - /// Maximum bytes-based backpressure - bytes_max_delay: Duration, - - /// When should queue-based backpressure start? - queue_start: f64, - /// Maximum queue-based delay - queue_max_delay: Duration, + /// When should backpressure start, in units of bytes + bytes_start: u64, + /// Maximum number of bytes (i.e. backpressure goes to infinity) + bytes_max: u64, + /// Scale of bytes-based backpressure + bytes_scale: Duration, + + /// When should backpressure start, in units of jobs + queue_start: u64, + /// Maximum number of jobs (i.e. backpressure goes to infinity) + queue_max: u64, + /// Scale of queue-based delay + queue_scale: Duration, +} + +impl BackpressureConfig { + fn get_backpressure_us(&self, bytes: u64, jobs: u64) -> u64 { + // Saturate at 1 hour per job, which is basically infinite + if bytes >= self.bytes_max || jobs >= self.queue_max { + return Duration::from_secs(60 * 60).as_micros() as u64; + } + + // These ratios start at 0 (at *_start) and hit 1 when backpressure + // should be infinite. + let jobs_frac = jobs.saturating_sub(self.queue_start) as f64 + / (self.queue_max - self.queue_start) as f64; + let bytes_frac = bytes.saturating_sub(self.bytes_start) as f64 + / (self.bytes_max - self.bytes_start) as f64; + + // Delay should be 0 at frac = 0, and infinite at frac = 1 + let delay_bytes = self + .bytes_scale + .mul_f64(1.0 / (1.0 - bytes_frac).powi(2) - 1.0) + .as_micros() as u64; + + // Compute an alternate delay based on queue length + let delay_jobs = self + .queue_scale + .mul_f64(1.0 / (1.0 - jobs_frac).powi(2) - 1.0) + .as_micros() as u64; + + delay_bytes.max(delay_jobs) + } } /* @@ -370,15 +404,7 @@ impl Guest { iop_tokens: 0, bw_tokens: 0, backpressure_us: backpressure_us.clone(), - backpressure_config: BackpressureConfig { - // Byte-based backpressure - bytes_start: 0.05, - bytes_max_delay: Duration::from_millis(100), - - // Queue-based backpressure - queue_start: 0.05, - queue_max_delay: Duration::from_millis(5), - }, + backpressure_config: Self::default_backpressure_config(), log: log.clone(), }; let guest = Guest { @@ -393,6 +419,20 @@ impl Guest { (guest, io) } + fn default_backpressure_config() -> BackpressureConfig { + BackpressureConfig { + // Byte-based backpressure + bytes_start: 50 * 1024u64.pow(2), // 50 MiB + bytes_max: IO_OUTSTANDING_MAX_BYTES * 2, + bytes_scale: Duration::from_millis(25), + + // Queue-based backpressure + queue_start: 500, + queue_max: IO_OUTSTANDING_MAX_JOBS as u64 * 2, + queue_scale: Duration::from_millis(5), + } + } + /* * This is used to submit a new BlockOp IO request to Crucible. 
* @@ -913,50 +953,23 @@ impl GuestIoHandle { #[cfg(test)] pub fn disable_queue_backpressure(&mut self) { - self.backpressure_config.queue_max_delay = Duration::ZERO; + self.backpressure_config.queue_scale = Duration::ZERO; } #[cfg(test)] pub fn disable_byte_backpressure(&mut self) { - self.backpressure_config.bytes_max_delay = Duration::ZERO; + self.backpressure_config.bytes_scale = Duration::ZERO; } #[cfg(test)] pub fn is_queue_backpressure_disabled(&self) -> bool { - self.backpressure_config.queue_max_delay == Duration::ZERO + self.backpressure_config.queue_scale == Duration::ZERO } /// Set `self.backpressure_us` based on outstanding IO ratio pub fn set_backpressure(&self, bytes: u64, jobs: u64) { - let jobs_frac = jobs as f64 / crate::IO_OUTSTANDING_MAX_JOBS as f64; - let bytes_frac = bytes as f64 / crate::IO_OUTSTANDING_MAX_BYTES as f64; - - // Check to see if the number of outstanding write bytes (between - // the upstairs and downstairs) is particularly high. If so, - // apply some backpressure by delaying host operations, with a - // quadratically-increasing delay. - let delay_bytes = self - .backpressure_config - .bytes_max_delay - .mul_f64( - ((bytes_frac - self.backpressure_config.bytes_start).max(0.0) - / (1.0 - self.backpressure_config.bytes_start)) - .powf(2.0), - ) - .as_micros() as u64; - - // Compute an alternate delay based on queue length - let delay_jobs = self - .backpressure_config - .queue_max_delay - .mul_f64( - ((jobs_frac - self.backpressure_config.queue_start).max(0.0) - / (1.0 - self.backpressure_config.queue_start)) - .powf(2.0), - ) - .as_micros() as u64; - self.backpressure_us - .store(delay_jobs.max(delay_bytes), Ordering::SeqCst); + let bp_usec = self.backpressure_config.get_backpressure_us(bytes, jobs); + self.backpressure_us.store(bp_usec, Ordering::SeqCst); } pub fn set_iop_limit(&mut self, bytes_per_iop: usize, limit: usize) { @@ -1459,4 +1472,87 @@ mod test { Ok(()) } + + /// Confirm that the offline timeout is reasonable + #[test] + fn check_offline_timeout() { + for job_size in + [512, 4 * 1024, 16 * 1024, 64 * 1024, 256 * 1024, 1024 * 1024] + { + let mut bytes_in_flight = 0; + let mut jobs_in_flight = 0; + let mut time_usec: u64 = 0; + let cfg = Guest::default_backpressure_config(); + + let (t, desc) = loop { + let bp_usec = + cfg.get_backpressure_us(bytes_in_flight, jobs_in_flight); + time_usec = time_usec.saturating_add(bp_usec); + + if bytes_in_flight >= IO_OUTSTANDING_MAX_BYTES { + break (time_usec, "bytes"); + } + + if jobs_in_flight >= IO_OUTSTANDING_MAX_JOBS as u64 { + break (time_usec, "jobs"); + } + + bytes_in_flight += job_size; + jobs_in_flight += 1; + }; + + let timeout = Duration::from_micros(t); + assert!( + timeout > Duration::from_secs(1), + "offline -> faulted transition happens too quickly \ + with job size {job_size}; expected > 1 sec, got {}", + humantime::format_duration(timeout) + ); + assert!( + timeout < Duration::from_secs(120), + "offline -> faulted transition happens too slowly \ + with job size {job_size}; expected < 2 mins, got {}", + humantime::format_duration(timeout) + ); + + println!( + "job size {job_size:>8}:\n Timeout in {} ({desc})\n", + humantime::format_duration(timeout) + ); + } + } + + #[test] + fn check_max_backpressure() { + let cfg = Guest::default_backpressure_config(); + let t = cfg.get_backpressure_us( + IO_OUTSTANDING_MAX_BYTES * 2 - 1024u64.pow(2), + 0, + ); + let timeout = Duration::from_micros(t); + println!( + "max byte-based delay: {}", + humantime::format_duration(timeout) + ); + assert!( + 
timeout > Duration::from_secs(60 * 60), + "max byte-based backpressure delay is too low; + expected > 1 hr, got {}", + humantime::format_duration(timeout) + ); + + let t = + cfg.get_backpressure_us(0, IO_OUTSTANDING_MAX_JOBS as u64 * 2 - 1); + let timeout = Duration::from_micros(t); + println!( + "max job-based delay: {}", + humantime::format_duration(timeout) + ); + assert!( + timeout > Duration::from_secs(60 * 60), + "max job-based backpressure delay is too low; + expected > 1 hr, got {}", + humantime::format_duration(timeout) + ); + } } diff --git a/upstairs/src/lib.rs b/upstairs/src/lib.rs index f27b07d27..ce350a1a3 100644 --- a/upstairs/src/lib.rs +++ b/upstairs/src/lib.rs @@ -83,17 +83,17 @@ mod downstairs; mod upstairs; use upstairs::{UpCounters, UpstairsAction}; -/// Max number of write bytes between the upstairs and the downstairs +/// Max number of write bytes between the upstairs and an offline downstairs /// -/// If we exceed this value, the upstairs will give up and mark that downstairs -/// as faulted. +/// If we exceed this value, the upstairs will give +/// up and mark the offline downstairs as faulted. const IO_OUTSTANDING_MAX_BYTES: u64 = 1024 * 1024 * 1024; // 1 GiB -/// Max number of outstanding IOs between the upstairs and a downstairs +/// Max number of outstanding IOs between the upstairs and an offline downstairs /// -/// If we exceed this value, the upstairs will give up and mark that downstairs -/// as faulted. -pub const IO_OUTSTANDING_MAX_JOBS: usize = 57000; +/// If we exceed this value, the upstairs will give up and mark that offline +/// downstairs as faulted. +pub const IO_OUTSTANDING_MAX_JOBS: usize = 10000; /// The BlockIO trait behaves like a physical NVMe disk (or a virtio virtual /// disk): there is no contract about what order operations that are submitted diff --git a/upstairs/src/upstairs.rs b/upstairs/src/upstairs.rs index 2307458e8..90caf2374 100644 --- a/upstairs/src/upstairs.rs +++ b/upstairs/src/upstairs.rs @@ -595,8 +595,8 @@ impl Upstairs { } } - // Check whether we need to mark a Downstairs as faulted because too - // many jobs have piled up. + // Check whether we need to mark an offline Downstairs as faulted + // because too many jobs have piled up. self.gone_too_long(); // Check to see whether live-repair can continue @@ -697,8 +697,10 @@ impl Upstairs { /// Check outstanding IOops for each downstairs. /// - /// If the number is too high, then mark that downstairs as failed, scrub - /// any outstanding jobs, and restart the client IO task. + /// We never kick out a Downstairs that is replying to us, but will + /// eventually transition a Downstairs from Offline to Faulted (which then + /// leads to scrubbing any outstanding jobs, and restarting the client IO + /// task). fn gone_too_long(&mut self) { // If we are not active, then just exit. if !matches!(self.state, UpstairsState::Active) { @@ -706,15 +708,8 @@ impl Upstairs { } for cid in ClientId::iter() { - // Only downstairs in these states are checked. 
- match self.downstairs.clients[cid].state() { - DsState::Active - | DsState::LiveRepair - | DsState::Offline - | DsState::Replay => { - self.downstairs.check_gone_too_long(cid, &self.state); - } - _ => {} + if self.downstairs.clients[cid].state() == DsState::Offline { + self.downstairs.check_gone_too_long(cid, &self.state); } } } From ad6bc9c13770b3bbe35040dbdb8ccd05cfd037db Mon Sep 17 00:00:00 2001 From: Matt Keeter <matt@oxide.computer> Date: Fri, 12 Apr 2024 11:24:24 -0400 Subject: [PATCH 2/8] Working on unit tests --- upstairs/src/dummy_downstairs_tests.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/upstairs/src/dummy_downstairs_tests.rs b/upstairs/src/dummy_downstairs_tests.rs index 1ba37f069..f02318c55 100644 --- a/upstairs/src/dummy_downstairs_tests.rs +++ b/upstairs/src/dummy_downstairs_tests.rs @@ -499,8 +499,9 @@ impl TestHarness { read_only, reply_to_ping, - extent_count: 10, - extent_size: Block::new_512(10), + // Slightly over 1 MiB, so we can do max-size writes + extent_count: 25, + extent_size: Block::new_512(100), gen_numbers: vec![0u64; 10], flush_numbers: vec![0u64; 10], @@ -1448,7 +1449,7 @@ async fn test_byte_fault_condition() { // Send enough bytes to hit the IO_OUTSTANDING_MAX_BYTES condition on // downstairs 1, which should mark it as faulted and kick it out. - const WRITE_SIZE: usize = 50 * 1024; // 50 KiB + const WRITE_SIZE: usize = 1024usize.pow(2); // 1 MiB let write_buf = BytesMut::from(vec![1; WRITE_SIZE].as_slice()); // 50 KiB let num_jobs = IO_OUTSTANDING_MAX_BYTES as usize / write_buf.len() + 10; assert!(num_jobs < IO_OUTSTANDING_MAX_JOBS); @@ -1516,7 +1517,7 @@ async fn test_byte_fault_condition_offline() { // `num_jobs` sends enough bytes to hit the IO_OUTSTANDING_MAX_BYTES // condition on downstairs 1, which should mark it as faulted and kick it // out. - const WRITE_SIZE: usize = 50 * 1024; // 50 KiB + const WRITE_SIZE: usize = 1024usize.pow(2); // 1 MiB let write_buf = BytesMut::from(vec![1; WRITE_SIZE].as_slice()); // 50 KiB let num_jobs = IO_OUTSTANDING_MAX_BYTES as usize / write_buf.len() + 10; assert!(num_jobs < IO_OUTSTANDING_MAX_JOBS); From f5de239ec9450e87fb1fe24336ab7d237b4b9ee8 Mon Sep 17 00:00:00 2001 From: Matt Keeter <matt@oxide.computer> Date: Fri, 12 Apr 2024 11:39:38 -0400 Subject: [PATCH 3/8] More unit test work --- upstairs/src/dummy_downstairs_tests.rs | 168 ++++++++++++++----------- 1 file changed, 97 insertions(+), 71 deletions(-) diff --git a/upstairs/src/dummy_downstairs_tests.rs b/upstairs/src/dummy_downstairs_tests.rs index f02318c55..d05c52b7d 100644 --- a/upstairs/src/dummy_downstairs_tests.rs +++ b/upstairs/src/dummy_downstairs_tests.rs @@ -923,10 +923,11 @@ async fn run_live_repair(mut harness: TestHarness) { } else { // All IO above this is skipped for the downstairs under // repair. - assert!(matches!( - harness.ds1().try_recv(), - Err(TryRecvError::Empty) - )); + let r = harness.ds1().try_recv(); + assert!( + matches!(r, Err(TryRecvError::Empty)), + "unexpected response {r:?}" + ); } let m2 = harness.ds2.recv().await.unwrap(); @@ -1031,10 +1032,11 @@ async fn run_live_repair(mut harness: TestHarness) { } else { // All IO above this is skipped for the downstairs under // repair. 
- assert!(matches!( - harness.ds1().try_recv(), - Err(TryRecvError::Empty) - )); + let r = harness.ds1().try_recv(); + assert!( + matches!(r, Err(TryRecvError::Empty)), + "unexpected IO: {r:?}" + ); } let m2 = harness.ds2.recv().await.unwrap(); @@ -1445,16 +1447,34 @@ async fn run_live_repair(mut harness: TestHarness) { /// Test that we will mark a Downstairs as failed if we hit the byte limit #[tokio::test] async fn test_byte_fault_condition() { - let mut harness = TestHarness::new().await; + let mut harness = TestHarness::new_no_ping().await; + + // Send enough bytes such that when we hit the client timeout, we are above + // our bytes-in-flight limit (so the downstairs gets marked as faulted + // instead of offline). + const MARGIN_SECS: f32 = 4.0; + const SEND_JOBS_TIME: f32 = CLIENT_TIMEOUT_SECS - MARGIN_SECS; + let start_time = tokio::time::Instant::now(); - // Send enough bytes to hit the IO_OUTSTANDING_MAX_BYTES condition on - // downstairs 1, which should mark it as faulted and kick it out. + // `num_jobs` sends enough bytes to hit the IO_OUTSTANDING_MAX_BYTES + // condition on downstairs 1, which should mark it as faulted and kick it + // out. const WRITE_SIZE: usize = 1024usize.pow(2); // 1 MiB let write_buf = BytesMut::from(vec![1; WRITE_SIZE].as_slice()); // 50 KiB let num_jobs = IO_OUTSTANDING_MAX_BYTES as usize / write_buf.len() + 10; assert!(num_jobs < IO_OUTSTANDING_MAX_JOBS); + // First, we'll send jobs until the timeout for i in 0..num_jobs { + // Delay so that we hit SEND_JOBS_TIME at the end of this loop + tokio::time::sleep_until( + start_time + + Duration::from_secs_f32( + SEND_JOBS_TIME * i as f32 / (num_jobs / 2) as f32, + ), + ) + .await; + // We must `spawn` here because `write` will wait for the response // to come back before returning let write_buf = write_buf.clone(); @@ -1462,47 +1482,42 @@ async fn test_byte_fault_condition() { guest.write(Block::new_512(0), write_buf).await.unwrap(); }); + // Before we're kicked out, assert we're seeing the read requests + assert!(matches!( + harness.ds1().recv().await.unwrap(), + Message::Write { .. }, + )); harness.ds2.ack_write().await; harness.ds3.ack_write().await; - // With 2x responses, we can now await the write job (which ensures that + // With 2x responses, we can now await the read job (which ensures that // the Upstairs has finished updating its state). h.await.unwrap(); let ds = harness.guest.downstairs_state().await.unwrap(); - if (i + 1) * WRITE_SIZE < IO_OUTSTANDING_MAX_BYTES as usize { - // Before we're kicked out, assert we're seeing the read - // requests - assert!(matches!( - harness.ds1().recv().await.unwrap(), - Message::Write { .. 
}, - )); - assert_eq!(ds[ClientId::new(0)], DsState::Active); - assert_eq!(ds[ClientId::new(1)], DsState::Active); - assert_eq!(ds[ClientId::new(2)], DsState::Active); - } else { - // After ds1 is kicked out, we shouldn't see any more messages - match harness.ds1().try_recv() { - Err(TryRecvError::Empty) => {} - Err(TryRecvError::Disconnected) => {} - x => { - panic!("Read {i} should return EMPTY, but we got:{x:?}"); - } - } - assert_eq!(ds[ClientId::new(0)], DsState::Faulted); - assert_eq!(ds[ClientId::new(1)], DsState::Active); - assert_eq!(ds[ClientId::new(2)], DsState::Active); - } + assert_eq!(ds[ClientId::new(0)], DsState::Active); + assert_eq!(ds[ClientId::new(1)], DsState::Active); + assert_eq!(ds[ClientId::new(2)], DsState::Active); } - // Confirm that live repair still works - run_live_repair(harness).await + // Sleep until we're confident that the Downstairs is kicked out + info!(harness.log, "waiting for Upstairs to kick out DS1"); + tokio::time::sleep(Duration::from_secs_f32(2.0 * MARGIN_SECS)).await; + + // Check to make sure that happened + let ds = harness.guest.downstairs_state().await.unwrap(); + assert_eq!(ds[ClientId::new(0)], DsState::Faulted); + assert_eq!(ds[ClientId::new(1)], DsState::Active); + assert_eq!(ds[ClientId::new(2)], DsState::Active); + + // Confirm that the system comes up after live-repair + run_live_repair(harness).await; } /// Test that we will transition a downstairs from offline -> faulted if we hit /// the byte limit after it's already offline #[tokio::test] -async fn test_byte_fault_condition_offline() { +async fn test_bbyte_fault_condition_offline() { let mut harness = TestHarness::new_no_ping().await; // Two different transitions occur during this test: @@ -1610,15 +1625,26 @@ async fn test_byte_fault_condition_offline() { /// Test that we will mark a Downstairs as failed if we hit the job limit #[tokio::test] async fn test_job_fault_condition() { - let mut harness = TestHarness::new().await; + let mut harness = TestHarness::new_no_ping().await; - // Send 200 more than IO_OUTSTANDING_MAX_JOBS jobs, sending read - // responses from two of the three downstairs. After we have sent - // IO_OUTSTANDING_MAX_JOBS jobs, the Upstairs will set ds1 to faulted, - // and send it no more work. - const NUM_JOBS: usize = IO_OUTSTANDING_MAX_JOBS + 200; + // We're going to queue up > IO_OUTSTANDING_MAX_JOBS in less than our + // timeout time, so that when timeout hits, the downstairs will become + // Faulted instead of Offline. + const MARGIN_SECS: f32 = 4.0; + const SEND_JOBS_TIME: f32 = CLIENT_TIMEOUT_SECS - MARGIN_SECS; + let num_jobs = IO_OUTSTANDING_MAX_JOBS + 200; + let start_time = tokio::time::Instant::now(); + + for i in 0..num_jobs { + // Delay so that we hit SEND_JOBS_TIME at the end of this loop + tokio::time::sleep_until( + start_time + + Duration::from_secs_f32( + SEND_JOBS_TIME * i as f32 / num_jobs as f32, + ), + ) + .await; - for i in 0..NUM_JOBS { // We must `spawn` here because `write` will wait for the response to // come back before returning let h = harness.spawn(|guest| async move { @@ -1626,6 +1652,12 @@ async fn test_job_fault_condition() { guest.read(Block::new_512(0), &mut buffer).await.unwrap(); }); + // DS1 should be receiving messages + assert!(matches!( + harness.ds1().recv().await.unwrap(), + Message::ReadRequest { .. 
}, + )); + // Respond with read responses for downstairs 2 and 3 harness.ds2.ack_read().await; harness.ds3.ack_read().await; @@ -1635,39 +1667,33 @@ async fn test_job_fault_condition() { h.await.unwrap(); let ds = harness.guest.downstairs_state().await.unwrap(); - if i < IO_OUTSTANDING_MAX_JOBS { - // Before we're kicked out, assert we're seeing the read - // requests - assert!(matches!( - harness.ds1().recv().await.unwrap(), - Message::ReadRequest { .. }, - )); - assert_eq!(ds[ClientId::new(0)], DsState::Active); - assert_eq!(ds[ClientId::new(1)], DsState::Active); - assert_eq!(ds[ClientId::new(2)], DsState::Active); - } else { - // After ds1 is kicked out, we shouldn't see any more messages - match harness.ds1().try_recv() { - Err(TryRecvError::Empty) => {} - Err(TryRecvError::Disconnected) => {} - x => { - panic!("Read {i} should return EMPTY, but we got:{x:?}"); - } - } - assert_eq!(ds[ClientId::new(0)], DsState::Faulted); - assert_eq!(ds[ClientId::new(1)], DsState::Active); - assert_eq!(ds[ClientId::new(2)], DsState::Active); - } + assert_eq!(ds[ClientId::new(0)], DsState::Active); + assert_eq!(ds[ClientId::new(1)], DsState::Active); + assert_eq!(ds[ClientId::new(2)], DsState::Active); } - // Confirm that live repair still works - run_live_repair(harness).await + // Sleep until we're confident that the Downstairs is kicked out + // + // Because it has so many pending jobs, it will become Faulted instead of + // Offline (or rather, will transition Active -> Offline -> Faulted + // immediately). + info!(harness.log, "waiting for Upstairs to kick out DS1"); + tokio::time::sleep(Duration::from_secs_f32(2.0 * MARGIN_SECS)).await; + + // Check to make sure that happened + let ds = harness.guest.downstairs_state().await.unwrap(); + assert_eq!(ds[ClientId::new(0)], DsState::Faulted); + assert_eq!(ds[ClientId::new(1)], DsState::Active); + assert_eq!(ds[ClientId::new(2)], DsState::Active); + + // Confirm that the system comes up after live-repair + run_live_repair(harness).await; } /// Test that we will transition a downstairs from offline -> faulted if we hit /// the job limit after it's already offline #[tokio::test] -async fn test_job_fault_condition_offline() { +async fn test_jjob_fault_condition_offline() { let mut harness = TestHarness::new_no_ping().await; // Two different transitions occur during this test: From 476a8c84acdf0fe409c19f0e206f67e6b84eac95 Mon Sep 17 00:00:00 2001 From: Matt Keeter <matt@oxide.computer> Date: Fri, 12 Apr 2024 14:15:08 -0400 Subject: [PATCH 4/8] Unit tests are passing? 
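
Rework the fault-condition tests around the new Offline -> Faulted
transition: DS1 keeps answering pings (plain TestHarness::new()), the test
queues enough work to cross a limit, then sleeps past CLIENT_TIMEOUT_SECS
(panicking if DS2 or DS3 see unexpected traffic) and checks that DS1 has
been kicked out to Faulted while DS2 and DS3 stay Active.  The *_offline
variants instead flip reply_to_ping off on DS1 first.

Rough numbers for the byte-based tests, using the limits from the first
patch (IO_OUTSTANDING_MAX_BYTES = 1 GiB, IO_OUTSTANDING_MAX_JOBS = 10_000):

    1 GiB / 105 KiB is roughly 9_986 writes
    9_986 + 10 = 9_996 jobs, which is < 10_000

so the byte limit is crossed while staying under the job limit; the
job-based tests instead queue IO_OUTSTANDING_MAX_JOBS + 200 reads.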
--- upstairs/src/dummy_downstairs_tests.rs | 138 +++++++++++++------------ 1 file changed, 71 insertions(+), 67 deletions(-) diff --git a/upstairs/src/dummy_downstairs_tests.rs b/upstairs/src/dummy_downstairs_tests.rs index d05c52b7d..db65e26b7 100644 --- a/upstairs/src/dummy_downstairs_tests.rs +++ b/upstairs/src/dummy_downstairs_tests.rs @@ -475,44 +475,38 @@ pub struct TestHarness { impl TestHarness { pub async fn new() -> TestHarness { - Self::new_(false, true).await + Self::new_(false).await } pub async fn new_ro() -> TestHarness { - Self::new_(true, true).await - } - - /// Build a new TestHarness where DS1 doesn't reply to pings - pub async fn new_no_ping() -> TestHarness { - Self::new_(false, false).await + Self::new_(true).await } pub fn ds1(&mut self) -> &mut DownstairsHandle { self.ds1.as_mut().unwrap() } - fn default_config( - read_only: bool, - reply_to_ping: bool, - ) -> DownstairsConfig { + fn default_config(read_only: bool) -> DownstairsConfig { DownstairsConfig { read_only, - reply_to_ping, + reply_to_ping: true, - // Slightly over 1 MiB, so we can do max-size writes + // Extent count is picked so that we can hit + // IO_OUTSTANDING_MAX_BYTES in less than IO_OUTSTANDING_MAX_JOBS, + // i.e. letting us test both byte and job fault conditions. extent_count: 25, - extent_size: Block::new_512(100), + extent_size: Block::new_512(10), - gen_numbers: vec![0u64; 10], - flush_numbers: vec![0u64; 10], - dirty_bits: vec![false; 10], + gen_numbers: vec![0u64; 25], + flush_numbers: vec![0u64; 25], + dirty_bits: vec![false; 25], } } - async fn new_(read_only: bool, reply_to_ping: bool) -> TestHarness { + async fn new_(read_only: bool) -> TestHarness { let log = csl(); - let cfg = Self::default_config(read_only, reply_to_ping); + let cfg = Self::default_config(read_only); let ds1 = cfg.clone().start(log.new(o!("downstairs" => 1))).await; let ds2 = cfg.clone().start(log.new(o!("downstairs" => 2))).await; @@ -813,7 +807,7 @@ async fn run_live_repair(mut harness: TestHarness) { let mut ds2_buffered_messages = vec![]; let mut ds3_buffered_messages = vec![]; - for eid in 0..10 { + for eid in 0..25 { // The Upstairs first sends the close and reopen jobs for _ in 0..2 { ds1_buffered_messages.push(harness.ds1().recv().await.unwrap()); @@ -871,7 +865,7 @@ async fn run_live_repair(mut harness: TestHarness) { let mut responses = vec![Vec::new(); 3]; - for io_eid in 0usize..10 { + for io_eid in 0usize..25 { let mut dep_job_id = [reopen_job_id; 3]; // read harness.spawn(move |guest| async move { @@ -1425,11 +1419,11 @@ async fn run_live_repair(mut harness: TestHarness) { // Expect the live repair to send a final flush { let flush_number = harness.ds1().ack_flush().await; - assert_eq!(flush_number, 12); + assert_eq!(flush_number, 27); let flush_number = harness.ds2.ack_flush().await; - assert_eq!(flush_number, 12); + assert_eq!(flush_number, 27); let flush_number = harness.ds3.ack_flush().await; - assert_eq!(flush_number, 12); + assert_eq!(flush_number, 27); } // Try another read @@ -1447,34 +1441,24 @@ async fn run_live_repair(mut harness: TestHarness) { /// Test that we will mark a Downstairs as failed if we hit the byte limit #[tokio::test] async fn test_byte_fault_condition() { - let mut harness = TestHarness::new_no_ping().await; - // Send enough bytes such that when we hit the client timeout, we are above // our bytes-in-flight limit (so the downstairs gets marked as faulted // instead of offline). 
- const MARGIN_SECS: f32 = 4.0; - const SEND_JOBS_TIME: f32 = CLIENT_TIMEOUT_SECS - MARGIN_SECS; - let start_time = tokio::time::Instant::now(); + // + // Notice that we keep DS1 replying to pings through this process, so it + // doesn't get set to offline early. + let mut harness = TestHarness::new().await; // `num_jobs` sends enough bytes to hit the IO_OUTSTANDING_MAX_BYTES // condition on downstairs 1, which should mark it as faulted and kick it // out. - const WRITE_SIZE: usize = 1024usize.pow(2); // 1 MiB + const WRITE_SIZE: usize = 105 * 1024; // 105 KiB let write_buf = BytesMut::from(vec![1; WRITE_SIZE].as_slice()); // 50 KiB let num_jobs = IO_OUTSTANDING_MAX_BYTES as usize / write_buf.len() + 10; assert!(num_jobs < IO_OUTSTANDING_MAX_JOBS); // First, we'll send jobs until the timeout - for i in 0..num_jobs { - // Delay so that we hit SEND_JOBS_TIME at the end of this loop - tokio::time::sleep_until( - start_time - + Duration::from_secs_f32( - SEND_JOBS_TIME * i as f32 / (num_jobs / 2) as f32, - ), - ) - .await; - + for _ in 0..num_jobs { // We must `spawn` here because `write` will wait for the response // to come back before returning let write_buf = write_buf.clone(); @@ -1490,7 +1474,7 @@ async fn test_byte_fault_condition() { harness.ds2.ack_write().await; harness.ds3.ack_write().await; - // With 2x responses, we can now await the read job (which ensures that + // With 2x responses, we can now await the write job (which ensures that // the Upstairs has finished updating its state). h.await.unwrap(); @@ -1501,8 +1485,23 @@ async fn test_byte_fault_condition() { } // Sleep until we're confident that the Downstairs is kicked out - info!(harness.log, "waiting for Upstairs to kick out DS1"); - tokio::time::sleep(Duration::from_secs_f32(2.0 * MARGIN_SECS)).await; + let sleep_time = CLIENT_TIMEOUT_SECS + 5.0; + info!( + harness.log, + "waiting {sleep_time} secs for Upstairs to kick out DS1" + ); + tokio::select! { + _ = tokio::time::sleep(Duration::from_secs_f32(sleep_time)) => { + // we're done! + } + // we don't listen to ds1 here, so we won't acknowledge any pings! + v = harness.ds2.recv() => { + panic!("received unexpected message on ds2: {v:?}") + } + v = harness.ds3.recv() => { + panic!("received unexpected message on ds3: {v:?}") + } + } // Check to make sure that happened let ds = harness.guest.downstairs_state().await.unwrap(); @@ -1517,8 +1516,9 @@ async fn test_byte_fault_condition() { /// Test that we will transition a downstairs from offline -> faulted if we hit /// the byte limit after it's already offline #[tokio::test] -async fn test_bbyte_fault_condition_offline() { - let mut harness = TestHarness::new_no_ping().await; +async fn test_byte_fault_condition_offline() { + let mut harness = TestHarness::new().await; + harness.ds1().cfg.reply_to_ping = false; // Two different transitions occur during this test: // - We're not replying to pings, so DS1 will eventually transition from @@ -1532,7 +1532,7 @@ async fn test_bbyte_fault_condition_offline() { // `num_jobs` sends enough bytes to hit the IO_OUTSTANDING_MAX_BYTES // condition on downstairs 1, which should mark it as faulted and kick it // out. 
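+    // (A 105 KiB write also fits within the test region of 25 extents x
+    // 10 blocks x 512 B = 125 KiB, so each write stays in bounds.)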
- const WRITE_SIZE: usize = 1024usize.pow(2); // 1 MiB + const WRITE_SIZE: usize = 105 * 1024; // 105 KiB let write_buf = BytesMut::from(vec![1; WRITE_SIZE].as_slice()); // 50 KiB let num_jobs = IO_OUTSTANDING_MAX_BYTES as usize / write_buf.len() + 10; assert!(num_jobs < IO_OUTSTANDING_MAX_JOBS); @@ -1625,26 +1625,13 @@ async fn test_bbyte_fault_condition_offline() { /// Test that we will mark a Downstairs as failed if we hit the job limit #[tokio::test] async fn test_job_fault_condition() { - let mut harness = TestHarness::new_no_ping().await; + let mut harness = TestHarness::new().await; - // We're going to queue up > IO_OUTSTANDING_MAX_JOBS in less than our - // timeout time, so that when timeout hits, the downstairs will become - // Faulted instead of Offline. - const MARGIN_SECS: f32 = 4.0; - const SEND_JOBS_TIME: f32 = CLIENT_TIMEOUT_SECS - MARGIN_SECS; + // We're going to queue up > IO_OUTSTANDING_MAX_JOBS, then wait for a + // timeout, so that when timeout hits, the downstairs will become Faulted + // instead of Offline. let num_jobs = IO_OUTSTANDING_MAX_JOBS + 200; - let start_time = tokio::time::Instant::now(); - - for i in 0..num_jobs { - // Delay so that we hit SEND_JOBS_TIME at the end of this loop - tokio::time::sleep_until( - start_time - + Duration::from_secs_f32( - SEND_JOBS_TIME * i as f32 / num_jobs as f32, - ), - ) - .await; - + for _ in 0..num_jobs { // We must `spawn` here because `write` will wait for the response to // come back before returning let h = harness.spawn(|guest| async move { @@ -1677,8 +1664,24 @@ async fn test_job_fault_condition() { // Because it has so many pending jobs, it will become Faulted instead of // Offline (or rather, will transition Active -> Offline -> Faulted // immediately). + let sleep_time = CLIENT_TIMEOUT_SECS + 5.0; info!(harness.log, "waiting for Upstairs to kick out DS1"); - tokio::time::sleep(Duration::from_secs_f32(2.0 * MARGIN_SECS)).await; + info!( + harness.log, + "waiting {sleep_time} secs for Upstairs to kick out DS1" + ); + tokio::select! { + _ = tokio::time::sleep(Duration::from_secs_f32(sleep_time)) => { + // we're done! + } + // we don't listen to ds1 here, so we won't acknowledge any pings! 
+ v = harness.ds2.recv() => { + panic!("received unexpected message on ds2: {v:?}") + } + v = harness.ds3.recv() => { + panic!("received unexpected message on ds3: {v:?}") + } + } // Check to make sure that happened let ds = harness.guest.downstairs_state().await.unwrap(); @@ -1693,8 +1696,9 @@ async fn test_job_fault_condition() { /// Test that we will transition a downstairs from offline -> faulted if we hit /// the job limit after it's already offline #[tokio::test] -async fn test_jjob_fault_condition_offline() { - let mut harness = TestHarness::new_no_ping().await; +async fn test_job_fault_condition_offline() { + let mut harness = TestHarness::new().await; + harness.ds1().cfg.reply_to_ping = false; // Two different transitions occur during this test: // - We're not replying to pings, so DS1 will eventually transition from From dd8722c600537927f23aae3686c4cafc353d9b59 Mon Sep 17 00:00:00 2001 From: Matt Keeter <matt@oxide.computer> Date: Mon, 15 Apr 2024 14:38:12 -0400 Subject: [PATCH 5/8] Move Imok reply to main IO loop in Downstairs --- downstairs/src/lib.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/downstairs/src/lib.rs b/downstairs/src/lib.rs index 296c3740c..fb3c77366 100644 --- a/downstairs/src/lib.rs +++ b/downstairs/src/lib.rs @@ -1308,12 +1308,7 @@ where return Ok(()); } Some(Ok(msg)) => { - if matches!(msg, Message::Ruok) { - // Respond instantly to pings, don't wait. - if let Err(e) = resp_channel_tx.send(Message::Imok) { - bail!("Failed sending Imok: {}", e); - } - } else if let Err(e) = message_channel_tx.send(msg) { + if let Err(e) = message_channel_tx.send(msg) { bail!("Failed sending message to proc_frame: {}", e); } } @@ -2660,6 +2655,10 @@ impl Downstairs { resp_tx.send(msg)?; None } + Message::Ruok => { + resp_tx.send(Message::Imok)?; + None + } x => bail!("unexpected frame {:?}", x), }; Ok(r) From bf72b38fbd7baf41a3f42bde1763015a947afa23 Mon Sep 17 00:00:00 2001 From: Matt Keeter <matt@oxide.computer> Date: Wed, 17 Apr 2024 10:30:06 -0400 Subject: [PATCH 6/8] Update diagrams --- downstairs/src/lib.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/downstairs/src/lib.rs b/downstairs/src/lib.rs index fb3c77366..91f1397e6 100644 --- a/downstairs/src/lib.rs +++ b/downstairs/src/lib.rs @@ -1140,13 +1140,13 @@ where * │ │ * │ ┌────────────────┴────────────────────┐ * │ │ framed_write_task │ - * │ └─▲─────▲──────────────────▲──────────┘ - * │ │ │ │ - * │ ping│ │invalid │ - * │ ┌────────┘ │frame │responses - * │ │ │errors │ - * │ │ │ │ - * ┌────▼──┴─┐ message ┌┴──────┐ job ┌┴────────┐ + * │ └───────▲──────────────────▲──────────┘ + * │ │ │ + * │ │invalid frame │ + * │ │errors, pings │responses + * │ │ │ + * │ │ │ + * ┌────▼────┐ message ┌┴──────┐ job ┌┴────────┐ * │resp_loop├──────────►│pf_task├─────────►│ dw_task │ * └─────────┘ channel └──┬────┘ channel └▲────────┘ * │ │ From 9cf000dcc383287778b3144fc6d0154915c3a5d9 Mon Sep 17 00:00:00 2001 From: Matt Keeter <matt@oxide.computer> Date: Wed, 17 Apr 2024 14:23:11 -0400 Subject: [PATCH 7/8] Pull extent_count into a constant --- upstairs/src/dummy_downstairs_tests.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/upstairs/src/dummy_downstairs_tests.rs b/upstairs/src/dummy_downstairs_tests.rs index db65e26b7..c7a6c5a72 100644 --- a/upstairs/src/dummy_downstairs_tests.rs +++ b/upstairs/src/dummy_downstairs_tests.rs @@ -473,6 +473,9 @@ pub struct TestHarness { guest: Arc<Guest>, } +/// Number of extents in 
`TestHarness::default_config` +const DEFAULT_EXTENT_COUNT: usize = 25; + impl TestHarness { pub async fn new() -> TestHarness { Self::new_(false).await @@ -494,12 +497,12 @@ impl TestHarness { // Extent count is picked so that we can hit // IO_OUTSTANDING_MAX_BYTES in less than IO_OUTSTANDING_MAX_JOBS, // i.e. letting us test both byte and job fault conditions. - extent_count: 25, + extent_count: DEFAULT_EXTENT_COUNT as u32, extent_size: Block::new_512(10), - gen_numbers: vec![0u64; 25], - flush_numbers: vec![0u64; 25], - dirty_bits: vec![false; 25], + gen_numbers: vec![0u64; DEFAULT_EXTENT_COUNT], + flush_numbers: vec![0u64; DEFAULT_EXTENT_COUNT], + dirty_bits: vec![false; DEFAULT_EXTENT_COUNT], } } @@ -807,7 +810,7 @@ async fn run_live_repair(mut harness: TestHarness) { let mut ds2_buffered_messages = vec![]; let mut ds3_buffered_messages = vec![]; - for eid in 0..25 { + for eid in 0..DEFAULT_EXTENT_COUNT { // The Upstairs first sends the close and reopen jobs for _ in 0..2 { ds1_buffered_messages.push(harness.ds1().recv().await.unwrap()); @@ -865,7 +868,7 @@ async fn run_live_repair(mut harness: TestHarness) { let mut responses = vec![Vec::new(); 3]; - for io_eid in 0usize..25 { + for io_eid in 0usize..DEFAULT_EXTENT_COUNT { let mut dep_job_id = [reopen_job_id; 3]; // read harness.spawn(move |guest| async move { From 1e2b998e3c0774f99e50eef0e64c04b7d3f68fb5 Mon Sep 17 00:00:00 2001 From: Matt Keeter <matt@oxide.computer> Date: Wed, 17 Apr 2024 14:53:17 -0400 Subject: [PATCH 8/8] Tweak backpressure curve to match `master` --- upstairs/src/guest.rs | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/upstairs/src/guest.rs b/upstairs/src/guest.rs index 22dc1ff29..e7880826c 100644 --- a/upstairs/src/guest.rs +++ b/upstairs/src/guest.rs @@ -333,6 +333,19 @@ struct BackpressureConfig { } impl BackpressureConfig { + // Our chosen backpressure curve is quadratic for 1/2 of its range, then + // goes to infinity in the second half. This gives C0 + C1 continuity. 
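+    //
+    // For example, after the remapping below: a ratio of 0.25 gives 0.25x
+    // `scale`, 0.5 gives exactly 1x `scale`, 0.75 gives 4x `scale`, and the
+    // delay grows without bound as the ratio approaches 1.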
+ fn curve(frac: f64, scale: Duration) -> Duration { + // Remap from 0-1 to 0-1.5 for ease of calculation + let frac = frac * 2.0; + let v = if frac < 1.0 { + frac + } else { + 1.0 / (1.0 - (frac - 1.0)) + }; + scale.mul_f64(v.powi(2)) + } + fn get_backpressure_us(&self, bytes: u64, jobs: u64) -> u64 { // Saturate at 1 hour per job, which is basically infinite if bytes >= self.bytes_max || jobs >= self.queue_max { @@ -347,16 +360,10 @@ impl BackpressureConfig { / (self.bytes_max - self.bytes_start) as f64; // Delay should be 0 at frac = 0, and infinite at frac = 1 - let delay_bytes = self - .bytes_scale - .mul_f64(1.0 / (1.0 - bytes_frac).powi(2) - 1.0) - .as_micros() as u64; - - // Compute an alternate delay based on queue length - let delay_jobs = self - .queue_scale - .mul_f64(1.0 / (1.0 - jobs_frac).powi(2) - 1.0) - .as_micros() as u64; + let delay_bytes = + Self::curve(bytes_frac, self.bytes_scale).as_micros() as u64; + let delay_jobs = + Self::curve(jobs_frac, self.queue_scale).as_micros() as u64; delay_bytes.max(delay_jobs) } @@ -424,7 +431,7 @@ impl Guest { // Byte-based backpressure bytes_start: 50 * 1024u64.pow(2), // 50 MiB bytes_max: IO_OUTSTANDING_MAX_BYTES * 2, - bytes_scale: Duration::from_millis(25), + bytes_scale: Duration::from_millis(100), // Queue-based backpressure queue_start: 500, @@ -1509,9 +1516,9 @@ mod test { humantime::format_duration(timeout) ); assert!( - timeout < Duration::from_secs(120), + timeout < Duration::from_secs(180), "offline -> faulted transition happens too slowly \ - with job size {job_size}; expected < 2 mins, got {}", + with job size {job_size}; expected < 3 mins, got {}", humantime::format_duration(timeout) );
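
For a back-of-the-envelope feel for the retuned curve, the standalone sketch
below (not part of the patch) mirrors BackpressureConfig::curve using the
defaults from this series (backpressure starts at 50 MiB, saturates at
2 * IO_OUTSTANDING_MAX_BYTES = 2 GiB, with a 100 ms scale) and prints the
byte-based delay at a few queue depths; the sample depths themselves are
arbitrary:

    use std::time::Duration;

    // Mirror of `BackpressureConfig::curve` above: quadratic up to the
    // halfway point, then 1 / (1 - x) squared, diverging as the ratio
    // approaches 1.
    fn curve(frac: f64, scale: Duration) -> Duration {
        let frac = frac * 2.0;
        let v = if frac < 1.0 {
            frac
        } else {
            1.0 / (1.0 - (frac - 1.0))
        };
        scale.mul_f64(v.powi(2))
    }

    fn main() {
        let mib = 1024.0 * 1024.0_f64;
        // Defaults from `Guest::default_backpressure_config` in this series
        let start = 50.0 * mib; // bytes_start (50 MiB)
        let max = 2048.0 * mib; // bytes_max (2 * IO_OUTSTANDING_MAX_BYTES)
        let scale = Duration::from_millis(100); // bytes_scale

        for in_flight in [100.0, 512.0, 1024.0, 1536.0, 1900.0] {
            let frac = (in_flight * mib - start).max(0.0) / (max - start);
            println!("{in_flight:>6} MiB in flight -> {:?}", curve(frac, scale));
        }
    }

Running it shows sub-millisecond delays near the 50 MiB start point, roughly
100 ms with about 1 GiB in flight, and multi-second delays as the queue
approaches the 2 GiB ceiling.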