@@ -151,7 +151,7 @@ impl<'a, M> CanSchedule<&'a SharedStateRef<M>> {
151
151
}
152
152
153
153
/// Indicated that the protocol needs more messages from other parties
154
- pub fn protocol_needs_more_msgs ( self ) -> Poll < crate :: Incoming < M > > {
154
+ pub fn protocol_needs_one_more_msg ( self ) -> Poll < crate :: Incoming < M > > {
155
155
let mut s = self . borrow_mut ( ) ;
156
156
match s. incoming_msg . take ( ) {
157
157
Some ( msg) => Poll :: Ready ( msg) ,
@@ -173,7 +173,7 @@ impl<'a, M> CanSchedule<&'a SharedStateRef<M>> {
173
173
mod test {
174
174
use core:: task:: Poll ;
175
175
176
- use crate :: { MessageDestination , Outgoing } ;
176
+ use crate :: { Incoming , MessageDestination , Outgoing } ;
177
177
178
178
use super :: SharedStateRef ;
179
179
@@ -211,6 +211,75 @@ mod test {
211
211
} ;
212
212
}
213
213
214
+ #[ test]
215
+ fn recv_msg ( ) {
216
+ let shared_state = SharedStateRef :: < & ' static str > :: new ( ) ;
217
+
218
+ let incomings_state = shared_state. clone ( ) ;
219
+ let executor_state = shared_state;
220
+
221
+ // Request incoming msg
222
+ {
223
+ let Poll :: Ready ( scheduler) = incomings_state. can_schedule ( ) else {
224
+ panic ! ( "can't schedule" ) ;
225
+ } ;
226
+ let Poll :: Pending = scheduler. protocol_needs_one_more_msg ( ) else {
227
+ panic ! ( "unexpected incoming msg" ) ;
228
+ } ;
229
+ }
230
+
231
+ // Scheduling one more task is not possible until incoming msg is received
232
+ assert ! ( matches!( incomings_state. can_schedule( ) , Poll :: Pending ) ) ;
233
+
234
+ // Executor receives an incoming msg
235
+ let incoming_msg = Incoming {
236
+ id : 0 ,
237
+ sender : 1 ,
238
+ msg_type : crate :: MessageType :: Broadcast ,
239
+ msg : "hello" ,
240
+ } ;
241
+ executor_state. executor_received_msg ( incoming_msg) . unwrap ( ) ;
242
+
243
+ // Incoming msg becomes available to the protocol
244
+ {
245
+ let Poll :: Ready ( scheduler) = incomings_state. can_schedule ( ) else {
246
+ panic ! ( "can't schedule" ) ;
247
+ } ;
248
+ let Poll :: Ready ( msg) = scheduler. protocol_needs_one_more_msg ( ) else {
249
+ panic ! ( "no incoming msg" ) ;
250
+ } ;
251
+ assert_eq ! ( msg, incoming_msg)
252
+ }
253
+ }
254
+
255
+ #[ test]
256
+ fn yielding ( ) {
257
+ let shared_state = SharedStateRef :: < ( ) > :: new ( ) ;
258
+
259
+ let runtime_state = shared_state. clone ( ) ;
260
+ let executor_state = shared_state;
261
+
262
+ // Request yielding
263
+ {
264
+ let Poll :: Ready ( scheduler) = runtime_state. can_schedule ( ) else {
265
+ panic ! ( "can't schedule" ) ;
266
+ } ;
267
+ scheduler. protocol_yields ( ) ;
268
+ }
269
+
270
+ // Scheduling one more task is not possible until yield flag is reset
271
+ assert ! ( matches!( runtime_state. can_schedule( ) , Poll :: Pending ) ) ;
272
+
273
+ // Executor reads and resets the yielded flag
274
+ {
275
+ let yielded = executor_state. executor_reads_and_resets_yielded_flag ( ) ;
276
+ assert ! ( yielded) ;
277
+ }
278
+
279
+ // Now, work can be scheduled again...
280
+ assert ! ( matches!( executor_state. can_schedule( ) , Poll :: Ready ( _) ) ) ;
281
+ }
282
+
214
283
#[ test]
215
284
fn task_cannot_be_scheduled_when_another_task_is_scheduled ( ) {
216
285
let try_obtain_lock_and_fail = |shared_state : & SharedStateRef < u32 > | {
@@ -245,7 +314,7 @@ mod test {
245
314
let Poll :: Ready ( scheduler) = shared_state. can_schedule ( ) else {
246
315
panic ! ( "can't schedule" ) ;
247
316
} ;
248
- let Poll :: Pending = scheduler. protocol_needs_more_msgs ( ) else {
317
+ let Poll :: Pending = scheduler. protocol_needs_one_more_msg ( ) else {
249
318
panic ! ( "receiving resolved too early" )
250
319
} ;
251
320
0 commit comments