@@ -30,24 +30,36 @@ class InterProcessPollingSubscriber
30
30
std::optional<T> data_;
31
31
32
32
public:
33
- explicit InterProcessPollingSubscriber (rclcpp::Node * node, const std::string & topic_name)
33
+ explicit InterProcessPollingSubscriber (
34
+ rclcpp::Node * node, const std::string & topic_name, const rclcpp::QoS & qos = rclcpp::QoS{1 })
34
35
{
35
36
auto noexec_callback_group =
36
37
node->create_callback_group (rclcpp::CallbackGroupType::MutuallyExclusive, false );
37
38
auto noexec_subscription_options = rclcpp::SubscriptionOptions ();
38
39
noexec_subscription_options.callback_group = noexec_callback_group;
39
40
40
41
subscriber_ = node->create_subscription <T>(
41
- topic_name, rclcpp::QoS{ 1 } ,
42
+ topic_name, qos ,
42
43
[node]([[maybe_unused]] const typename T::ConstSharedPtr msg) { assert (false ); },
43
44
noexec_subscription_options);
45
+ if (qos.get_rmw_qos_profile ().depth > 1 ) {
46
+ RCLCPP_WARN (
47
+ node->get_logger (),
48
+ " InterProcessPollingSubscriber will be used with depth > 1, which may cause inefficient "
49
+ " serialization while updateLatestData()" );
50
+ }
44
51
};
45
52
bool updateLatestData ()
46
53
{
47
54
rclcpp::MessageInfo message_info;
48
55
T tmp;
49
56
// The queue size (QoS) must be 1 to get the last message data.
50
- if (subscriber_->take (tmp, message_info)) {
57
+ bool is_latest_message_consumed = false ;
58
+ // pop the queue until latest data
59
+ while (subscriber_->take (tmp, message_info)) {
60
+ is_latest_message_consumed = true ;
61
+ }
62
+ if (is_latest_message_consumed) {
51
63
data_ = tmp;
52
64
}
53
65
return data_.has_value ();
0 commit comments