@@ -110,6 +110,7 @@ template <typename M> class MessageQueue {
110
110
bool running;
111
111
bool new_message;
112
112
boost::uuids::random_generator uuid_generator;
113
+ std::recursive_mutex next_message_mutex;
113
114
std::optional<MessageId> next_message_to_send;
114
115
115
116
Everest::SteadyTimer in_flight_timeout_timer;
@@ -368,12 +369,15 @@ template <typename M> class MessageQueue {
368
369
continue ;
369
370
}
370
371
371
- if (next_message_to_send.has_value ()) {
372
- if (next_message_to_send.value () != message->uniqueId ()) {
373
- EVLOG_debug << " Message with id " << message->uniqueId ()
374
- << " held back because message with id " << next_message_to_send.value ()
375
- << " should be sent first" ;
376
- continue ;
372
+ {
373
+ std::lock_guard<std::recursive_mutex> lk (this ->next_message_mutex );
374
+ if (next_message_to_send.has_value ()) {
375
+ if (next_message_to_send.value () != message->uniqueId ()) {
376
+ EVLOG_debug << " Message with id " << message->uniqueId ()
377
+ << " held back because message with id " << next_message_to_send.value ()
378
+ << " should be sent first" ;
379
+ continue ;
380
+ }
377
381
}
378
382
}
379
383
@@ -507,9 +511,12 @@ template <typename M> class MessageQueue {
507
511
}
508
512
509
513
this ->send_callback (call_result);
510
- if (next_message_to_send.has_value ()) {
511
- if (next_message_to_send.value () == call_result.uniqueId ) {
512
- next_message_to_send.reset ();
514
+ {
515
+ std::lock_guard<std::recursive_mutex> lk (this ->next_message_mutex );
516
+ if (next_message_to_send.has_value ()) {
517
+ if (next_message_to_send.value () == call_result.uniqueId ) {
518
+ next_message_to_send.reset ();
519
+ }
513
520
}
514
521
}
515
522
@@ -523,9 +530,12 @@ template <typename M> class MessageQueue {
523
530
}
524
531
525
532
this ->send_callback (call_error);
526
- if (next_message_to_send.has_value ()) {
527
- if (next_message_to_send.value () == call_error.uniqueId ) {
528
- next_message_to_send.reset ();
533
+ {
534
+ std::lock_guard<std::recursive_mutex> lk (this ->next_message_mutex );
535
+ if (next_message_to_send.has_value ()) {
536
+ if (next_message_to_send.value () == call_error.uniqueId ) {
537
+ next_message_to_send.reset ();
538
+ }
529
539
}
530
540
}
531
541
@@ -575,15 +585,21 @@ template <typename M> class MessageQueue {
575
585
enhanced_message.messageType = this ->string_to_messagetype (enhanced_message.message .at (CALL_ACTION));
576
586
enhanced_message.call_message = enhanced_message.message ;
577
587
578
- // save the uid of the message we just received to ensure the next message we send is a response to this
579
- // message
580
- next_message_to_send.emplace (enhanced_message.uniqueId );
588
+ {
589
+ std::lock_guard<std::recursive_mutex> lk (this ->next_message_mutex );
590
+ // save the uid of the message we just received to ensure the next message we send is a response to
591
+ // this message
592
+ next_message_to_send.emplace (enhanced_message.uniqueId );
593
+ }
581
594
}
582
595
583
596
// TODO(kai): what happens if we receive a CallResult or CallError out of order?
584
597
if (enhanced_message.messageTypeId == MessageTypeId::CALLRESULT ||
585
598
enhanced_message.messageTypeId == MessageTypeId::CALLERROR) {
586
- next_message_to_send.reset ();
599
+ {
600
+ std::lock_guard<std::recursive_mutex> lk (this ->next_message_mutex );
601
+ next_message_to_send.reset ();
602
+ }
587
603
// we need to remove Call messages from in_flight if we receive a CallResult OR a CallError
588
604
589
605
// TODO(kai): we need to do some error handling in the CallError case
0 commit comments