2
2
use std:: fs:: File ;
3
3
use std:: io:: Write ;
4
4
use std:: net:: { IpAddr , SocketAddr } ;
5
+ use std:: num:: NonZeroU64 ;
5
6
use std:: path:: { Path , PathBuf } ;
6
7
use std:: sync:: {
7
8
atomic:: { AtomicBool , AtomicUsize , Ordering } ,
@@ -308,8 +309,8 @@ struct RandReadWriteWorkload {
308
309
#[ clap( long, default_value_t = 1.0 ) ]
309
310
sample_time : f64 ,
310
311
/// Number of subsamples per sample
311
- #[ clap( long, default_value_t = 10 ) ]
312
- subsample_count : u64 ,
312
+ #[ clap( long, default_value_t = NonZeroU64 :: new ( 10 ) . unwrap ( ) ) ]
313
+ subsample_count : NonZeroU64 ,
313
314
}
314
315
315
316
/// Mode flags for `rand_write_read_workload`
@@ -334,7 +335,7 @@ struct RandReadWriteConfig {
334
335
/// Rate at which we should print samples
335
336
sample_time_secs : f64 ,
336
337
/// Number of subsamples for each `sample_time_secs`, for standard deviation
337
- subsample_count : u64 ,
338
+ subsample_count : NonZeroU64 ,
338
339
fill : bool ,
339
340
}
340
341
@@ -2493,43 +2494,34 @@ async fn rand_read_write_workload(
2493
2494
workers. push ( handle) ;
2494
2495
}
2495
2496
2496
- // Spawn a task which stops us after the given time
2497
- {
2498
- let stop = stop. clone ( ) ;
2499
- let time_secs = cfg. time_secs ;
2500
- tokio:: task:: spawn ( async move {
2501
- tokio:: time:: sleep ( Duration :: from_secs ( time_secs) ) . await ;
2502
- stop. store ( true , Ordering :: Release ) ;
2503
- } ) ;
2504
- }
2505
-
2506
2497
// Initial sleep
2507
2498
let mut prev = 0 ;
2508
2499
2509
2500
let subsample_delay = Duration :: from_secs_f64 (
2510
- cfg. sample_time_secs / cfg. subsample_count as f64 ,
2501
+ cfg. sample_time_secs / cfg. subsample_count . get ( ) as f64 ,
2511
2502
) ;
2512
2503
let mut samples = vec ! [ ] ;
2513
2504
let mut first = true ;
2514
- let mut next_time = std:: time:: Instant :: now ( ) + subsample_delay;
2515
- while !stop. load ( Ordering :: Relaxed ) {
2505
+ let mut next_time = Instant :: now ( ) + subsample_delay;
2506
+ let stop_time = Instant :: now ( ) + Duration :: from_secs ( cfg. time_secs ) ;
2507
+ ' outer: loop {
2516
2508
// Store speeds in bytes/sec, correcting for our sub-second sample time
2517
2509
let start = samples. len ( ) ;
2518
- for _ in 0 ..cfg. subsample_count {
2519
- tokio:: time:: sleep_until ( next_time. into ( ) ) . await ;
2510
+ for _ in 0 ..cfg. subsample_count . get ( ) {
2511
+ tokio:: time:: sleep_until ( next_time) . await ;
2520
2512
let bytes = byte_count. load ( Ordering :: Acquire ) ;
2521
2513
next_time += subsample_delay;
2522
2514
samples. push ( ( bytes - prev) as f64 / subsample_delay. as_secs_f64 ( ) ) ;
2523
2515
prev = bytes;
2516
+ if Instant :: now ( ) >= stop_time {
2517
+ break ' outer;
2518
+ }
2524
2519
}
2525
2520
// Ignore the first round of samples, to reduce startup effects
2526
2521
if std:: mem:: take ( & mut first) {
2527
2522
samples. clear ( ) ;
2528
2523
continue ;
2529
2524
}
2530
- if stop. load ( Ordering :: Relaxed ) {
2531
- break ;
2532
- }
2533
2525
let slice = & samples[ start..] ;
2534
2526
let mean = slice. iter ( ) . sum :: < f64 > ( ) / slice. len ( ) as f64 ;
2535
2527
let stdev = ( slice. iter ( ) . map ( |& d| ( d - mean) . powi ( 2 ) ) . sum :: < f64 > ( )
0 commit comments