Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: auto disconnect for the compatible subscriber #1

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions include/cuda_blackboard/cuda_blackboard_subscriber.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,24 @@ template <typename T>
class CudaBlackboardSubscriber
{
public:
CudaBlackboardSubscriber(
[[deprecated]] CudaBlackboardSubscriber(
rclcpp::Node & node, const std::string & topic_name, bool add_compatible_sub,
std::function<void(std::shared_ptr<const T>)> callback);

CudaBlackboardSubscriber(
rclcpp::Node & node, const std::string & topic_name,
std::function<void(std::shared_ptr<const T>)> callback);

private:
void instanceIdCallback(const std_msgs::msg::UInt64 & instance_id_msg);

void compatibleCallback(const std::shared_ptr<const typename T::ros_type> & ros_msg_ptr);

std::function<void(std::shared_ptr<const T> cuda_msg)> callback_{};

rclcpp::Node & node_;
std::shared_ptr<negotiated::NegotiatedSubscription> negotiated_sub_;
typename rclcpp::Subscription<T>::SharedPtr compatible_sub_;
typename rclcpp::Subscription<typename T::ros_type>::SharedPtr compatible_sub_;
};

} // namespace cuda_blackboard
82 changes: 77 additions & 5 deletions src/cuda_blackboard_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace cuda_blackboard

template <typename T>
CudaBlackboardSubscriber<T>::CudaBlackboardSubscriber(
rclcpp::Node & node, const std::string & topic_name, bool add_compatible_sub,
rclcpp::Node & node, const std::string & topic_name, [[maybe_unused]] bool add_compatible_sub,
std::function<void(std::shared_ptr<const T>)> callback)
: node_(node)
{
Expand All @@ -31,17 +31,64 @@ CudaBlackboardSubscriber<T>::CudaBlackboardSubscriber(
1.0, rclcpp::QoS(1), std::bind(&CudaBlackboardSubscriber<T>::instanceIdCallback, this, _1),
sub_options);

if (add_compatible_sub) {
compatible_sub_ =
node.create_subscription<T>(topic_name, rclcpp::SensorDataQoS(), callback_, sub_options);
}
std::string ros_type_name = NegotiationStruct<typename T::ros_type>::supported_type_name;

compatible_sub_ = node.create_subscription<typename T::ros_type>(
topic_name, rclcpp::SensorDataQoS(),
std::bind(&CudaBlackboardSubscriber<T>::compatibleCallback, this, _1), sub_options);

negotiated_sub_->add_compatible_subscription(compatible_sub_, ros_type_name, 0.1);

negotiated_sub_->start();
}

template <typename T>
CudaBlackboardSubscriber<T>::CudaBlackboardSubscriber(
rclcpp::Node & node, const std::string & topic_name,
std::function<void(std::shared_ptr<const T>)> callback)
: node_(node)
{
using std::placeholders::_1;

negotiated::NegotiatedSubscriptionOptions negotiation_options;
negotiation_options.disconnect_on_negotiation_failure = false;

callback_ = callback;
negotiated_sub_ = std::make_shared<negotiated::NegotiatedSubscription>(
node, topic_name + "/cuda", negotiation_options);

rclcpp::SubscriptionOptions sub_options;
sub_options.use_intra_process_comm = rclcpp::IntraProcessSetting::Enable;

negotiated_sub_->add_supported_callback<NegotiationStruct<T>>(
1.0, rclcpp::QoS(1), std::bind(&CudaBlackboardSubscriber<T>::instanceIdCallback, this, _1),
sub_options);

std::string ros_type_name = NegotiationStruct<typename T::ros_type>::supported_type_name;

compatible_sub_ = node.create_subscription<typename T::ros_type>(
topic_name, rclcpp::SensorDataQoS(),
std::bind(&CudaBlackboardSubscriber<T>::compatibleCallback, this, _1), sub_options);

negotiated_sub_->add_compatible_subscription(compatible_sub_, ros_type_name, 0.1);

negotiated_sub_->start();
}

template <typename T>
void CudaBlackboardSubscriber<T>::instanceIdCallback(const std_msgs::msg::UInt64 & instance_id_msg)
{
if (compatible_sub_ && negotiated_sub_->get_negotiated_topic_publisher_count() > 0) {
const std::string ros_type_name = NegotiationStruct<typename T::ros_type>::supported_type_name;
negotiated_sub_->remove_compatible_subscription<typename T::ros_type>(
compatible_sub_, ros_type_name);
compatible_sub_ = nullptr;

RCLCPP_INFO(
node_.get_logger(),
"A negotiated message has been received, so the compatible callback will be disabled");
}

auto & blackboard = CudaBlackboard<T>::getInstance();
auto data = blackboard.queryData(instance_id_msg.data);
if (data) {
Expand All @@ -53,6 +100,31 @@ void CudaBlackboardSubscriber<T>::instanceIdCallback(const std_msgs::msg::UInt64
}
}

template <typename T>
void CudaBlackboardSubscriber<T>::compatibleCallback(
const std::shared_ptr<const typename T::ros_type> & ros_msg_ptr)
{
const std::string ros_type_name = NegotiationStruct<typename T::ros_type>::supported_type_name;

if (compatible_sub_ && negotiated_sub_->get_negotiated_topic_publisher_count() > 0) {
negotiated_sub_->remove_compatible_subscription<typename T::ros_type>(
compatible_sub_, ros_type_name);
compatible_sub_ = nullptr;

RCLCPP_INFO(
node_.get_logger(),
"A negotiation type succeeded, so the compatible callback will be disabled");

return;
}

RCLCPP_WARN_ONCE(
node_.get_logger(),
"The compatible callback was called. This results in a performance loss. This behavior is "
"probably not intended or a temporal measure");
callback_(std::make_shared<T>(*ros_msg_ptr));
}

} // namespace cuda_blackboard

template class cuda_blackboard::CudaBlackboardSubscriber<cuda_blackboard::CudaPointCloud2>;
Expand Down