ROS2(四)-MultiThreadedExecutor多线程调用callback funcs

4180阅读 0评论2020-11-18 iibull
分类:其他平台

引用
https://blog.csdn.net/liuerin/article/details/108749596

如何多线程调用callback funcs


本想使用多线程executor执行node,没想到简单的创建Subscription之后callback funcs竟然是顺序执行的,所以仔细看了一下,原来node中还有callback_group这个东西。结论如下:


1.callback_group

为所有的callback分了组别

类型有

2.node默认的组别

Node::create_subscription(
  const std::string & topic_name,  //topic名称
  const rclcpp::QoS & qos,
  CallbackT && callback,
  const SubscriptionOptionsWithAllocator & options,  //选项
  typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
    typename rclcpp::subscription_traits::has_message_type::type, AllocatorT>::SharedPtr
  msg_mem_strat)
							
this->create_subscription("int1",qos,
			std::bind(&PathSearcherNode::int1Sub, this,
	        std::placeholders::_1),
			rclcpp::SubscriptionOptions());
							

这里的SubscriptionOptions()可以设置callback_group组别;如果不设置,默认为node初始化时的callback_group

3.使用MultiThreadedExecutor,想要多线程调用callback

3.1 可设置不同的MutuallyExclusive callback_group

#include  
#include 
using namespace std::chrono_literals;
std::string getTime(){
    time_t tick = (time_t)(rclcpp::Clock().now().seconds());
    struct tm tm;
    char s[100];
    tm = *localtime(&tick);
    strftime(s, sizeof(s), "%Y-%m-%d %H:%M:%S", &tm);
    return std::string(s);
}
std::string string_thread_id()
{
  auto hashed = std::hash()(std::this_thread::get_id());
  return std::to_string(hashed);
}
class MyNode: public rclcpp::Node{
public:
    MyNode(const rclcpp::NodeOptions & options);
    ~MyNode(){}
    void int1Sub(std_msgs::msg::Int32::SharedPtr msg);
    void int2Sub(std_msgs::msg::Int32::SharedPtr msg);

private:
    rclcpp::Subscription::SharedPtr int_sub1_;
    rclcpp::Subscription::SharedPtr int_sub2_;
    rclcpp::callback_group::CallbackGroup::SharedPtr callback_group_sub1_;
    rclcpp::callback_group::CallbackGroup::SharedPtr callback_group_sub2_;
}
MyNode::MyNode(const rclcpp::NodeOptions & options):
    rclcpp::Node("my_node",options){
//两个组别,设置组别类型为MutuallyExclusive
    callback_group_sub1_ = this->create_callback_group(rclcpp::callback_group::CallbackGroupType::MutuallyExclusive);
    callback_group_sub2_ = this->create_callback_group(rclcpp::callback_group::CallbackGroupType::MutuallyExclusive);

    auto sub1_opt = rclcpp::SubscriptionOptions();
    sub1_opt.callback_group = callback_group_sub1_;
    auto sub2_opt = rclcpp::SubscriptionOptions();
    sub2_opt.callback_group = callback_group_sub2_;
    
    int_sub1_ = nh_->create_subscription("int1",10,
        std::bind(&MyNode::int1Sub,this,std::placeholders::_1),
        sub1_opt);
    int_sub2_ = nh_->create_subscription("int2",10,
        std::bind(&MyNode::int2Sub,this,std::placeholders::_1),
        sub2_opt);
}
void MyNode::int1Sub(std_msgs::msg::Int32::SharedPtr msg){
    printf("int1 sub time:%s\n",getTime().c_str());
    printf("thread1 id:%s\n",string_thread_id().c_str());
    rclcpp::sleep_for(10s);
    printf("int1 sub time:%s\n",getTime().c_str());
}
void MyNode::int2Sub(std_msgs::msg::Int32::SharedPtr msg){
    printf("int2 sub time:%s\n",getTime().c_str());
    printf("thread2 id:%s\n",string_thread_id().c_str());
    rclcpp::sleep_for(10s);
    printf("int2 sub time:%s\n",getTime().c_str());
}
int main(int argc, char **argv) {
    rclcpp::init(argc, argv);
    rclcpp::executors::MultiThreadedExecutor executor(rclcpp::executor::ExecutorArgs(),5,true);

    auto node = std::make_shared(rclcpp::NodeOptions());
    executor.add_node(node);
    printf("threads   %d\n",executor.get_number_of_threads());
    executor.spin();
    rclcpp::shutdown();
    return EXIT_SUCCESS;
}
										

这时,如果同时收到了/int1和/int2 的消息,两个callback都能执行,但如果接连收到了两个/int1,只能调1个int1Sub;

3.2 设置同一个Reentrant callback_group

//同一个组别,类型设置为Reentrant
callback_group_sub_ = this->create_callback_group(rclcpp::callback_group::CallbackGroupType::Reentrant);
auto sub_opt = rclcpp::SubscriptionOptions();
sub_opt.callback_group = callback_group_sub_;
    
    
int_sub1_ = nh_->create_subscription("int1",10,
        std::bind(&MyNode::int1Sub,this,std::placeholders::_1),
        sub_opt);
int_sub2_ = nh_->create_subscription("int2",10,
        std::bind(&MyNode::int2Sub,this,std::placeholders::_1),
        sub_opt);
												

这时,如果同时收到多个/int1和/int2,开启的线程数会受电脑内核和初始化MultiThreadedExecutor时设置的线程数决

上一篇:ROS 的 cmd_vel 多路复用器 twist_mux
下一篇:C++ 使用 类名重命名 解决 共用代码绑定问题.