1
- use futures:: future:: Fuse ;
2
- use futures:: { pin_mut, select, FutureExt } ;
3
1
use rand:: Rng ;
4
- use std:: time:: { self , Duration , Instant } ;
2
+ use std:: time:: { self , Duration } ;
5
3
6
4
use crate :: get_worker_id;
7
5
use crate :: goose:: { GooseTaskFunction , GooseTaskSet , GooseUser , GooseUserCommand } ;
@@ -10,7 +8,7 @@ use crate::metrics::{GooseMetric, GooseTaskMetric};
10
8
11
9
pub ( crate ) async fn user_main (
12
10
thread_number : usize ,
13
- mut thread_task_set : GooseTaskSet ,
11
+ thread_task_set : GooseTaskSet ,
14
12
mut thread_user : GooseUser ,
15
13
thread_receiver : flume:: Receiver < GooseUserCommand > ,
16
14
worker : bool ,
@@ -52,64 +50,73 @@ pub(crate) async fn user_main(
52
50
53
51
// If normal tasks are defined, loop launching tasks until parent tells us to stop.
54
52
if !thread_task_set. weighted_tasks . is_empty ( ) {
55
- let mut task_iter = thread_task_set. weighted_tasks . iter ( ) . cycle ( ) ;
56
- let next_task_delay = Fuse :: terminated ( ) ;
57
- pin_mut ! ( next_task_delay) ;
58
-
59
- let task_wait = match thread_task_set. task_wait . take ( ) {
60
- Some ( ( min, max) ) if min == max => min,
61
- Some ( ( min, max) ) => Duration :: from_millis (
62
- rand:: thread_rng ( ) . gen_range ( min. as_millis ( ) ..max. as_millis ( ) ) as u64 ,
63
- ) ,
64
- None => Duration :: from_millis ( 0 ) ,
65
- } ;
66
-
67
- next_task_delay. set ( tokio:: time:: sleep ( Duration :: from_secs ( 0 ) ) . fuse ( ) ) ;
68
- loop {
69
- select ! {
70
- _ = next_task_delay => {
71
- let ( thread_task_index, thread_task_name) = task_iter. next( ) . unwrap( ) ;
72
- if * thread_task_index == 0 {
73
- // Tracks the time it takes to loop through all GooseTasks when Coordinated Omission
74
- // Mitigation is enabled.
75
- thread_user. update_request_cadence( thread_number) . await ;
76
- }
53
+ // When there is a delay between tasks, wake every second to check for messages.
54
+ let one_second = Duration :: from_secs ( 1 ) ;
55
+
56
+ ' launch_tasks: loop {
57
+ // Tracks the time it takes to loop through all GooseTasks when Coordinated Omission
58
+ // Mitigation is enabled.
59
+ thread_user. update_request_cadence ( thread_number) . await ;
60
+
61
+ for ( thread_task_index, thread_task_name) in & thread_task_set. weighted_tasks {
62
+ // Determine which task we're going to run next.
63
+ let function = & thread_task_set. tasks [ * thread_task_index] . function ;
64
+ debug ! (
65
+ "launching on_start {} task from {}" ,
66
+ thread_task_name, thread_task_set. name
67
+ ) ;
68
+ // Invoke the task function.
69
+ let _todo = invoke_task_function (
70
+ function,
71
+ & mut thread_user,
72
+ * thread_task_index,
73
+ thread_task_name,
74
+ )
75
+ . await ;
76
+
77
+ if received_exit ( & thread_receiver) {
78
+ break ' launch_tasks;
79
+ }
77
80
78
- // Get a reference to the task function we're going to invoke next.
79
- let function = & thread_task_set. tasks[ * thread_task_index] . function;
80
- debug!(
81
- "launching on_start {} task from {}" ,
82
- thread_task_name, thread_task_set. name
83
- ) ;
84
-
85
- let now = Instant :: now( ) ;
86
- // Invoke the task function.
87
- let _ = invoke_task_function(
88
- function,
89
- & mut thread_user,
90
- * thread_task_index,
91
- thread_task_name,
92
- )
93
- . await ;
94
-
95
- let elapsed = now. elapsed( ) ;
96
-
97
- if elapsed < task_wait {
98
- next_task_delay. set( tokio:: time:: sleep( task_wait - elapsed) . fuse( ) ) ;
99
- } else {
100
- next_task_delay. set( tokio:: time:: sleep( Duration :: from_millis( 0 ) ) . fuse( ) ) ;
101
- }
102
- } ,
103
- message = thread_receiver. recv_async( ) . fuse( ) => {
104
- match message {
105
- // Time to exit, break out of launch_tasks loop.
106
- Err ( _) | Ok ( GooseUserCommand :: Exit ) => {
107
- break ;
108
- }
109
- Ok ( command) => {
110
- debug!( "ignoring unexpected GooseUserCommand: {:?}" , command) ;
81
+ // If the task_wait is defined, wait for a random time between tasks.
82
+ if let Some ( ( min, max) ) = thread_task_set. task_wait {
83
+ let wait_time = rand:: thread_rng ( ) . gen_range ( min..max) . as_millis ( ) ;
84
+ // Counter to track how long we've slept, waking regularly to check for messages.
85
+ let mut slept: u128 = 0 ;
86
+ // Wake every second to check if the parent thread has told us to exit.
87
+ let mut in_sleep_loop = true ;
88
+ // Track the time slept for Coordinated Omission Mitigation.
89
+ let sleep_timer = time:: Instant :: now ( ) ;
90
+
91
+ while in_sleep_loop {
92
+ if received_exit ( & thread_receiver) {
93
+ break ' launch_tasks;
111
94
}
95
+
96
+ let sleep_duration = if wait_time - slept >= 1000 {
97
+ slept += 1000 ;
98
+ if slept >= wait_time {
99
+ // Break out of sleep loop after next sleep.
100
+ in_sleep_loop = false ;
101
+ }
102
+ one_second
103
+ } else {
104
+ slept += wait_time;
105
+ // Break out of sleep loop after next sleep.
106
+ in_sleep_loop = false ;
107
+ Duration :: from_millis ( ( wait_time - slept) as u64 )
108
+ } ;
109
+
110
+ debug ! (
111
+ "user {} from {} sleeping {:?} ..." ,
112
+ thread_number, thread_task_set. name, sleep_duration
113
+ ) ;
114
+
115
+ tokio:: time:: sleep ( sleep_duration) . await ;
112
116
}
117
+ // Track how much time the GooseUser sleeps during this loop through all GooseTasks,
118
+ // used by Coordinated Omission Mitigation.
119
+ thread_user. slept += ( time:: Instant :: now ( ) - sleep_timer) . as_millis ( ) as u64 ;
113
120
}
114
121
}
115
122
}
@@ -152,6 +159,25 @@ pub(crate) async fn user_main(
152
159
}
153
160
}
154
161
162
+ // Determine if the parent has sent a GooseUserCommand::Exit message.
163
+ fn received_exit ( thread_receiver : & flume:: Receiver < GooseUserCommand > ) -> bool {
164
+ let mut message = thread_receiver. try_recv ( ) ;
165
+ while message. is_ok ( ) {
166
+ match message. unwrap ( ) {
167
+ // GooseUserCommand::Exit received.
168
+ GooseUserCommand :: Exit => {
169
+ return true ;
170
+ }
171
+ command => {
172
+ debug ! ( "ignoring unexpected GooseUserCommand: {:?}" , command) ;
173
+ }
174
+ }
175
+ message = thread_receiver. try_recv ( ) ;
176
+ }
177
+ // GooseUserCommand::Exit not received.
178
+ false
179
+ }
180
+
155
181
// Invoke the task function, collecting task metrics.
156
182
async fn invoke_task_function (
157
183
function : & GooseTaskFunction ,
0 commit comments