@@ -475,44 +475,38 @@ pub struct TestHarness {
 
 impl TestHarness {
     pub async fn new() -> TestHarness {
-        Self::new_(false, true).await
+        Self::new_(false).await
     }
 
     pub async fn new_ro() -> TestHarness {
-        Self::new_(true, true).await
-    }
-
-    /// Build a new TestHarness where DS1 doesn't reply to pings
-    pub async fn new_no_ping() -> TestHarness {
-        Self::new_(false, false).await
+        Self::new_(true).await
     }
 
     pub fn ds1(&mut self) -> &mut DownstairsHandle {
         self.ds1.as_mut().unwrap()
     }
 
-    fn default_config(
-        read_only: bool,
-        reply_to_ping: bool,
-    ) -> DownstairsConfig {
+    fn default_config(read_only: bool) -> DownstairsConfig {
         DownstairsConfig {
             read_only,
-            reply_to_ping,
+            reply_to_ping: true,
 
-            // Slightly over 1 MiB, so we can do max-size writes
+            // Extent count is picked so that we can hit
+            // IO_OUTSTANDING_MAX_BYTES in less than IO_OUTSTANDING_MAX_JOBS,
+            // i.e. letting us test both byte and job fault conditions.
             extent_count: 25,
-            extent_size: Block::new_512(100),
+            extent_size: Block::new_512(10),
 
-            gen_numbers: vec![0u64; 10],
-            flush_numbers: vec![0u64; 10],
-            dirty_bits: vec![false; 10],
+            gen_numbers: vec![0u64; 25],
+            flush_numbers: vec![0u64; 25],
+            dirty_bits: vec![false; 25],
         }
     }
 
-    async fn new_(read_only: bool, reply_to_ping: bool) -> TestHarness {
+    async fn new_(read_only: bool) -> TestHarness {
         let log = csl();
 
-        let cfg = Self::default_config(read_only, reply_to_ping);
+        let cfg = Self::default_config(read_only);
 
         let ds1 = cfg.clone().start(log.new(o!("downstairs" => 1))).await;
         let ds2 = cfg.clone().start(log.new(o!("downstairs" => 2))).await;
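A note on the new sizing: 25 extents of 10 blocks at 512 bytes each gives a 128,000-byte (125 KiB) region, so the 105 KiB writes used by the fault tests below fit in a single job while still exhausting the byte limit long before the job limit. Here is a minimal sketch of that invariant; the two constants are assumptions for illustration only (the real values are defined elsewhere in crucible and may differ):

```rust
// Assumed values, for illustration only; the real constants live in crucible.
const IO_OUTSTANDING_MAX_BYTES: u64 = 150 * 1024 * 1024;
const IO_OUTSTANDING_MAX_JOBS: usize = 57_000;

fn main() {
    const WRITE_SIZE: usize = 105 * 1024; // matches WRITE_SIZE in the tests

    // Same computation the tests use to size their workload:
    let num_jobs = IO_OUTSTANDING_MAX_BYTES as usize / WRITE_SIZE + 10;

    // The invariant the new comment describes: the byte limit trips
    // using fewer than IO_OUTSTANDING_MAX_JOBS jobs.
    assert!(num_jobs < IO_OUTSTANDING_MAX_JOBS);
    println!("byte limit exceeded after ~{num_jobs} writes");
}
```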
@@ -813,7 +807,7 @@ async fn run_live_repair(mut harness: TestHarness) {
     let mut ds2_buffered_messages = vec![];
     let mut ds3_buffered_messages = vec![];
 
-    for eid in 0..10 {
+    for eid in 0..25 {
         // The Upstairs first sends the close and reopen jobs
         for _ in 0..2 {
             ds1_buffered_messages.push(harness.ds1().recv().await.unwrap());
@@ -871,7 +865,7 @@ async fn run_live_repair(mut harness: TestHarness) {
 
     let mut responses = vec![Vec::new(); 3];
 
-    for io_eid in 0usize..10 {
+    for io_eid in 0usize..25 {
         let mut dep_job_id = [reopen_job_id; 3];
         // read
         harness.spawn(move |guest| async move {
@@ -1425,11 +1419,11 @@ async fn run_live_repair(mut harness: TestHarness) {
     // Expect the live repair to send a final flush
     {
         let flush_number = harness.ds1().ack_flush().await;
-        assert_eq!(flush_number, 12);
+        assert_eq!(flush_number, 27);
         let flush_number = harness.ds2.ack_flush().await;
-        assert_eq!(flush_number, 12);
+        assert_eq!(flush_number, 27);
         let flush_number = harness.ds3.ack_flush().await;
-        assert_eq!(flush_number, 12);
+        assert_eq!(flush_number, 27);
     }
 
     // Try another read
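The updated flush numbers track the larger extent count: the final flush number moved in lockstep with it (10 extents gave 12 before, 25 give 27 now), which reads as one flush per repaired extent plus the two flushes issued earlier in the test. A small sketch of that arithmetic; the formula is inferred from the diff, not taken from the source:

```rust
// Inferred from the diff: final flush number = extent_count + prior flushes.
fn expected_final_flush(extent_count: u64, prior_flushes: u64) -> u64 {
    extent_count + prior_flushes
}

fn main() {
    assert_eq!(expected_final_flush(10, 2), 12); // old config
    assert_eq!(expected_final_flush(25, 2), 27); // new config
}
```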
@@ -1447,34 +1441,24 @@ async fn run_live_repair(mut harness: TestHarness) {
 /// Test that we will mark a Downstairs as failed if we hit the byte limit
 #[tokio::test]
 async fn test_byte_fault_condition() {
-    let mut harness = TestHarness::new_no_ping().await;
-
     // Send enough bytes such that when we hit the client timeout, we are above
     // our bytes-in-flight limit (so the downstairs gets marked as faulted
     // instead of offline).
-    const MARGIN_SECS: f32 = 4.0;
-    const SEND_JOBS_TIME: f32 = CLIENT_TIMEOUT_SECS - MARGIN_SECS;
-    let start_time = tokio::time::Instant::now();
+    //
+    // Notice that we keep DS1 replying to pings through this process, so it
+    // doesn't get set to offline early.
+    let mut harness = TestHarness::new().await;
 
     // `num_jobs` sends enough bytes to hit the IO_OUTSTANDING_MAX_BYTES
     // condition on downstairs 1, which should mark it as faulted and kick it
     // out.
-    const WRITE_SIZE: usize = 1024usize.pow(2); // 1 MiB
+    const WRITE_SIZE: usize = 105 * 1024; // 105 KiB
     let write_buf = BytesMut::from(vec![1; WRITE_SIZE].as_slice()); // 50 KiB
     let num_jobs = IO_OUTSTANDING_MAX_BYTES as usize / write_buf.len() + 10;
     assert!(num_jobs < IO_OUTSTANDING_MAX_JOBS);
 
     // First, we'll send jobs until the timeout
-    for i in 0..num_jobs {
-        // Delay so that we hit SEND_JOBS_TIME at the end of this loop
-        tokio::time::sleep_until(
-            start_time
-                + Duration::from_secs_f32(
-                    SEND_JOBS_TIME * i as f32 / (num_jobs / 2) as f32,
-                ),
-        )
-        .await;
-
+    for _ in 0..num_jobs {
         // We must `spawn` here because `write` will wait for the response
         // to come back before returning
         let write_buf = write_buf.clone();
@@ -1490,7 +1474,7 @@ async fn test_byte_fault_condition() {
         harness.ds2.ack_write().await;
         harness.ds3.ack_write().await;
 
-        // With 2x responses, we can now await the read job (which ensures that
+        // With 2x responses, we can now await the write job (which ensures that
         // the Upstairs has finished updating its state).
         h.await.unwrap();
 
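The spawn-then-ack dance above would deadlock if done inline: `write` blocks until enough downstairs acknowledge it, so the test issues it on its own task, drives the acks from the fake downstairs, and only then joins the handle. A self-contained sketch of that pattern, with a oneshot channel standing in for the downstairs ack (names are illustrative, not crucible's):

```rust
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (ack_tx, ack_rx) = oneshot::channel::<()>();

    // Stand-in for `guest.write(...)`: it blocks until acknowledged,
    // so it must run on its own task or we would deadlock here.
    let h = tokio::spawn(async move {
        ack_rx.await.unwrap();
    });

    // Stand-in for `harness.ds2.ack_write().await` and friends.
    ack_tx.send(()).unwrap();

    // Only now can we await the job, like `h.await.unwrap()` in the test.
    h.await.unwrap();
}
```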
@@ -1501,8 +1485,23 @@ async fn test_byte_fault_condition() {
     }
 
     // Sleep until we're confident that the Downstairs is kicked out
-    info!(harness.log, "waiting for Upstairs to kick out DS1");
-    tokio::time::sleep(Duration::from_secs_f32(2.0 * MARGIN_SECS)).await;
+    let sleep_time = CLIENT_TIMEOUT_SECS + 5.0;
+    info!(
+        harness.log,
+        "waiting {sleep_time} secs for Upstairs to kick out DS1"
+    );
+    tokio::select! {
+        _ = tokio::time::sleep(Duration::from_secs_f32(sleep_time)) => {
+            // we're done!
+        }
+        // we don't listen to ds1 here, so we won't acknowledge any pings!
+        v = harness.ds2.recv() => {
+            panic!("received unexpected message on ds2: {v:?}")
+        }
+        v = harness.ds3.recv() => {
+            panic!("received unexpected message on ds3: {v:?}")
+        }
+    }
 
     // Check to make sure that happened
     let ds = harness.guest.downstairs_state().await.unwrap();
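The `tokio::select!` above replaces a bare sleep: it waits out the timeout while also failing fast if ds2 or ds3 sees unexpected traffic during the wait. A stripped-down sketch of the same pattern, with an mpsc channel in place of a downstairs handle (the names here are illustrative):

```rust
use std::time::Duration;
use tokio::sync::mpsc;

/// Sleep for `secs`, panicking if anything arrives on `side` meanwhile.
async fn sleep_while_quiet(side: &mut mpsc::Receiver<String>, secs: f32) {
    tokio::select! {
        _ = tokio::time::sleep(Duration::from_secs_f32(secs)) => {
            // Timeout elapsed with no unexpected traffic: success.
        }
        v = side.recv() => {
            panic!("received unexpected message: {v:?}");
        }
    }
}

#[tokio::main]
async fn main() {
    // Keep `_tx` alive so `recv` stays pending rather than returning None.
    let (_tx, mut rx) = mpsc::channel::<String>(1);
    sleep_while_quiet(&mut rx, 0.1).await; // nothing arrives: returns cleanly
}
```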
@@ -1517,8 +1516,9 @@ async fn test_byte_fault_condition() {
 /// Test that we will transition a downstairs from offline -> faulted if we hit
 /// the byte limit after it's already offline
 #[tokio::test]
-async fn test_bbyte_fault_condition_offline() {
-    let mut harness = TestHarness::new_no_ping().await;
+async fn test_byte_fault_condition_offline() {
+    let mut harness = TestHarness::new().await;
+    harness.ds1().cfg.reply_to_ping = false;
 
     // Two different transitions occur during this test:
     // - We're not replying to pings, so DS1 will eventually transition from
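This `reply_to_ping` flip is the pattern that replaces `new_no_ping()`: rather than one constructor per behavior flag, the harness keeps a single constructor and exposes each downstairs' config through its handle, so tests mutate flags after construction. A minimal sketch of that shape; the types here are illustrative stand-ins, not crucible's real `DownstairsHandle`/`DownstairsConfig`:

```rust
// Illustrative types only; the real structs carry many more fields.
struct DownstairsConfig {
    reply_to_ping: bool,
}

struct DownstairsHandle {
    cfg: DownstairsConfig,
}

fn main() {
    let mut ds1 = DownstairsHandle {
        cfg: DownstairsConfig { reply_to_ping: true },
    };
    // Same move as `harness.ds1().cfg.reply_to_ping = false;` above:
    // one constructor, per-test mutation instead of a constructor variant.
    ds1.cfg.reply_to_ping = false;
    assert!(!ds1.cfg.reply_to_ping);
}
```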
@@ -1532,7 +1532,7 @@ async fn test_bbyte_fault_condition_offline() {
     // `num_jobs` sends enough bytes to hit the IO_OUTSTANDING_MAX_BYTES
     // condition on downstairs 1, which should mark it as faulted and kick it
     // out.
-    const WRITE_SIZE: usize = 1024usize.pow(2); // 1 MiB
+    const WRITE_SIZE: usize = 105 * 1024; // 105 KiB
     let write_buf = BytesMut::from(vec![1; WRITE_SIZE].as_slice()); // 50 KiB
     let num_jobs = IO_OUTSTANDING_MAX_BYTES as usize / write_buf.len() + 10;
     assert!(num_jobs < IO_OUTSTANDING_MAX_JOBS);
@@ -1625,26 +1625,13 @@ async fn test_bbyte_fault_condition_offline() {
 /// Test that we will mark a Downstairs as failed if we hit the job limit
 #[tokio::test]
 async fn test_job_fault_condition() {
-    let mut harness = TestHarness::new_no_ping().await;
+    let mut harness = TestHarness::new().await;
 
-    // We're going to queue up > IO_OUTSTANDING_MAX_JOBS in less than our
-    // timeout time, so that when timeout hits, the downstairs will become
-    // Faulted instead of Offline.
-    const MARGIN_SECS: f32 = 4.0;
-    const SEND_JOBS_TIME: f32 = CLIENT_TIMEOUT_SECS - MARGIN_SECS;
+    // We're going to queue up > IO_OUTSTANDING_MAX_JOBS, then wait for a
+    // timeout, so that when timeout hits, the downstairs will become Faulted
+    // instead of Offline.
     let num_jobs = IO_OUTSTANDING_MAX_JOBS + 200;
-    let start_time = tokio::time::Instant::now();
-
-    for i in 0..num_jobs {
-        // Delay so that we hit SEND_JOBS_TIME at the end of this loop
-        tokio::time::sleep_until(
-            start_time
-                + Duration::from_secs_f32(
-                    SEND_JOBS_TIME * i as f32 / num_jobs as f32,
-                ),
-        )
-        .await;
-
+    for _ in 0..num_jobs {
         // We must `spawn` here because `write` will wait for the response to
         // come back before returning
         let h = harness.spawn(|guest| async move {
@@ -1677,8 +1664,24 @@ async fn test_job_fault_condition() {
     // Because it has so many pending jobs, it will become Faulted instead of
     // Offline (or rather, will transition Active -> Offline -> Faulted
     // immediately).
+    let sleep_time = CLIENT_TIMEOUT_SECS + 5.0;
     info!(harness.log, "waiting for Upstairs to kick out DS1");
-    tokio::time::sleep(Duration::from_secs_f32(2.0 * MARGIN_SECS)).await;
+    info!(
+        harness.log,
+        "waiting {sleep_time} secs for Upstairs to kick out DS1"
+    );
+    tokio::select! {
+        _ = tokio::time::sleep(Duration::from_secs_f32(sleep_time)) => {
+            // we're done!
+        }
+        // we don't listen to ds1 here, so we won't acknowledge any pings!
+        v = harness.ds2.recv() => {
+            panic!("received unexpected message on ds2: {v:?}")
+        }
+        v = harness.ds3.recv() => {
+            panic!("received unexpected message on ds3: {v:?}")
+        }
+    }
 
     // Check to make sure that happened
     let ds = harness.guest.downstairs_state().await.unwrap();
@@ -1693,8 +1696,9 @@ async fn test_job_fault_condition() {
 /// Test that we will transition a downstairs from offline -> faulted if we hit
 /// the job limit after it's already offline
 #[tokio::test]
-async fn test_jjob_fault_condition_offline() {
-    let mut harness = TestHarness::new_no_ping().await;
+async fn test_job_fault_condition_offline() {
+    let mut harness = TestHarness::new().await;
+    harness.ds1().cfg.reply_to_ping = false;
 
     // Two different transitions occur during this test:
     // - We're not replying to pings, so DS1 will eventually transition from