diff --git a/upstairs/src/dummy_downstairs_tests.rs b/upstairs/src/dummy_downstairs_tests.rs index 6c2d52293..bd11d4013 100644 --- a/upstairs/src/dummy_downstairs_tests.rs +++ b/upstairs/src/dummy_downstairs_tests.rs @@ -133,8 +133,9 @@ pub(crate) mod protocol_test { uuid: Uuid::new_v4(), read_only: false, - extent_count: 10, - extent_size: Block::new_512(10), + // 8M region + extent_count: 32, + extent_size: Block::new_512(512), gen_numbers: vec![0u64; 10], flush_numbers: vec![0u64; 10], @@ -632,6 +633,25 @@ pub(crate) mod protocol_test { } } + fn make_4m_blank_read_response() -> Vec<crucible_protocol::ReadResponse> { + (0..(7 * 1024 * 1024 / 512)) + .map(|i| { + let data = vec![0u8; 512]; + let hash = crucible_common::integrity_hash(&[&data]); + + crucible_protocol::ReadResponse { + eid: 0, + offset: Block::new_512(i), + data: BytesMut::from(&data[..]), + block_contexts: vec![BlockContext { + hash, + encryption_context: None, + }], + } + }) + .collect() + } + /// Filter the first element that matches some predicate out of a list pub fn filter_out<T, P>(l: &mut Vec<T>, pred: P) -> Option<T> where @@ -2964,4 +2984,185 @@ pub(crate) mod protocol_test { Ok(()) } + + /// Submit a huge amount of IO all at once to ensure that the Upstairs + /// doesn't stall by allowing the TCP buffers to fill up. + #[tokio::test(flavor = "multi_thread")] + async fn test_upstairs_stall() -> Result<()> { + let harness = Arc::new(TestHarness::new().await?); + + let (_jh1, mut ds1_messages) = + harness.ds1().await.spawn_message_receiver().await; + let (_jh2, mut ds2_messages) = + harness.ds2.spawn_message_receiver().await; + let (_jh3, mut ds3_messages) = + harness.ds3.spawn_message_receiver().await; + + // For each downstairs, spawn a task that will return read responses as + // fast as possible. + + let _ds1_task: tokio::task::JoinHandle<Result<()>> = { + let harness = harness.clone(); + tokio::spawn(async move { + let upstairs_id = harness.guest.get_uuid().await.unwrap(); + let session_id = harness + .ds1() + .await + .upstairs_session_id + .lock() + .await + .unwrap(); + + loop { + match ds1_messages.recv().await { + Some(m) => match m { + Message::ReadRequest { job_id, .. } => { + let ds1 = harness.ds1().await; + let mut fw = ds1.fw.lock().await; + + fw.send(Message::ReadResponse { + upstairs_id, + session_id, + job_id, + responses: Ok( + make_4m_blank_read_response(), + ), + }) + .await + .unwrap(); + } + + _ => { + panic!("unexpected message!"); + } + }, + + None => { + // channel closed, + break; + } + } + } + + Ok(()) + }) + }; + + let _ds2_task: tokio::task::JoinHandle<Result<()>> = { + let harness = harness.clone(); + tokio::spawn(async move { + let upstairs_id = harness.guest.get_uuid().await.unwrap(); + let session_id = harness + .ds1() + .await + .upstairs_session_id + .lock() + .await + .unwrap(); + + loop { + match ds2_messages.recv().await { + Some(m) => match m { + Message::ReadRequest { job_id, .. } => { + let mut fw = harness.ds2.fw.lock().await; + + fw.send(Message::ReadResponse { + upstairs_id, + session_id, + job_id, + responses: Ok( + make_4m_blank_read_response(), + ), + }) + .await + .unwrap(); + } + + _ => { + panic!("unexpected message!"); + } + }, + + None => { + // channel closed, + break; + } + } + } + + Ok(()) + }) + }; + + let _ds3_task: tokio::task::JoinHandle<Result<()>> = { + let harness = harness.clone(); + tokio::spawn(async move { + let upstairs_id = harness.guest.get_uuid().await.unwrap(); + let session_id = harness + .ds1() + .await + .upstairs_session_id + .lock() + .await + .unwrap(); + + loop { + match ds3_messages.recv().await { + Some(m) => match m { + Message::ReadRequest { job_id, .. } => { + let mut fw = harness.ds3.fw.lock().await; + + fw.send(Message::ReadResponse { + upstairs_id, + session_id, + job_id, + responses: Ok( + make_4m_blank_read_response(), + ), + }) + .await + .unwrap(); + } + + _ => { + panic!("unexpected message!"); + } + }, + + None => { + // channel closed, + break; + } + } + } + + Ok(()) + }) + }; + + // Then, submit a huge amount of IO all at the same time (by spawning + // tokio tasks) and await it finishing. + + const IO_DEPTH: usize = 200; + + let futures: Vec<_> = (0..IO_DEPTH) + .map(|_| { + let harness = harness.clone(); + tokio::spawn(async move { + let buffer = Buffer::new(7 * 1024 * 1024); + harness + .guest + .read(Block::new_512(0), buffer) + .await + .unwrap(); + }) + }) + .collect(); + + for future in futures { + future.await.unwrap(); + } + + Ok(()) + } }