当前位置: 华文问答 > 科学

机械臂学习(15)

2021-11-17科学

前一篇分享了 ROS 的基础通信机制——发布订阅机制。本打算接下来分享 {\boldsymbol{\color{blue}{Service}}} 请求响应机制。但感觉在实际项目中, {\boldsymbol{\color{red}{多线程}}} 的问题经常困扰 ROS 开发人员。如果没有完全弄明白 ROS 为订阅回调、请求回调、行动回调和定时器回调提供的调度机制,可能埋雷。到了开发中后期就容易踩坑。上篇链结:

回调( Callback )

ROS 中主要有四种类型的回调,分别绑定在不同的 ROS 对象上:

  • {\boldsymbol{\color{black}{订阅回调}}} —— ros::Subscriber
  • {\boldsymbol{\color{black}{请求回调}}} —— ros::ServiceServer
  • {\boldsymbol{\color{black}{行动回调}}} —— actionlib::SimpleActionServer
  • {\boldsymbol{\color{black}{定时器回调}}} —— ros::Timer
  • Callback 的特点概括为: \color{red}{用户实现,ROS调度} 。那么,ROS是如何进行调度的呢?

    ROS回调机制

    ROS默认有维护一个全局回调队列(名为: Global Callback Queue ),将已可用的回调插入Callback队列中。再通过Spinner线程获取并执行当前可用的回调。为了说明ROS回调机制,我引入两个ROS节点:一个使用定时器发布多个topic消息;另一个订阅这些topic消息。

    发布节点

    一个使用ROS定时器超时回调发布多个topic消息。

    ros::Timer

    来看一个ros::Timer的回调队列,代码如下:

    //这段代码主要是实现定时向Topic发布消息 #include "ros/ros.h" #include <boost/thread.hpp> #include "my_msg/weather_pub.h" #include "my_msg/air_quality_pub.h" #include <sstream> int main ( int argc , char ** argv ){ ros :: init ( argc , argv , "multi_publisher" ); ros :: NodeHandle n ; /*通知ROS master,本node要发布一个名为「Weather」的话题(topic), 消息类型为my_msg::weather_pub,发送队列长度为48*/ ros :: Publisher pub_weather = n . advertise < my_msg :: weather_pub > ( "Weather" , 48 , true ); /*通知ROS master,本node要发布一个名为「WeatherA」的话题(topic), 消息类型为my_msg::weather_pub,发送队列长度为48*/ ros :: Publisher pub_weather_a = n . advertise < my_msg :: weather_pub > ( "WeatherA" , 48 , true ); /*通知ROS master,本node要发布一个名为「AirQuality」的话题(topic), 消息类型为my_msg::air_quality_pub,发送队列长度为48*/ ros :: Publisher pub_air_quality = n . advertise < my_msg :: air_quality_pub > ( "AirQuality" , 48 , true ); int count = 0 ; //创建一个ros::Timer每0.2秒进行发布,回调函数采用lamda4方法的格式 ros :: Timer timer = n . createTimer ( ros :: Duration ( 0.2 ), [ & ]( const ros :: TimerEvent & ) { my_msg :: weather_pub msg ; std :: stringstream ss ; ss << "Sunny " << count ; msg . weather = ss . str (); ROS_INFO_STREAM ( "Thread[" << boost :: this_thread :: get_id () << "],weather:" << msg . weather . c_str ()); pub_weather . publish ( msg ); std :: stringstream ssa ; ssa << "Sunny " << 20 + count ; msg . weather = ssa . str (); ROS_INFO_STREAM ( "Thread[" << boost :: this_thread :: get_id () << "],weather:" << msg . weather . c_str ()); pub_weather_a . publish ( msg ); my_msg :: air_quality_pub msg_air ; msg_air . air_quality_index = 128 + count ; ROS_INFO_STREAM ( "Thread[" << boost :: this_thread :: get_id () << "],air quality:" << msg_air . air_quality_index ); pub_air_quality . publish ( msg_air ); ++ count ; }); //确保定时器回调被调用 ros :: spin (); return 0 ; }

    定时器启动后会生成一个Timer线程,根据定时器的参数,当定时器超时后将定时器的回调函数加入Callback队列中。然后再由用户调用的Spinner线程(ros::spin)从Callback队列中依次取出当前已可用的回调并执行。

    ros::Timer的队列和回调过程

    ros::Publisher

    上面例子中,在定时器回调函数中向topic进行发布,ros::Publisher将要发布的消息加入到Publisher队列中,再由专门的Publisher线程发布出去。注意这其中并不涉及Callback队列,这也解释了上篇中提到的: 如果一个ROS节点仅进行topic发布是不需要调用spinner的

    ros::Pubblisher的队列和发送过程

    订阅节点

    订阅上面发布的topic消息。根据不同情况,进行代码修改。

    ros::Subscriber

    先让ROS节点只订阅一个topic来说明订阅回调的过程。下面是代码:

    //订阅一个topic的代码 #include "ros/ros.h" #include "my_msg/weather_pub.h" //回调函数,注意参数是const类型的boost::shared_ptr指针 void weatherCallback ( const my_msg :: weather_pubConstPtr & msg ) { ROS_INFO ( "The 24 hours Weather: [%s]" , msg -> weather . c_str ()); } int main ( int argc , char ** argv ){ ros :: init ( argc , argv , "subscriber" ); ros :: NodeHandle n ; /*通知ROS master,本node要订阅名为「Weather」的话题(topic), 并指定回调函数weatherCallback*/ ros :: Subscriber sub = n . subscribe ( "Weather" , 48 , weatherCallback ); ros :: spin (); }

    订阅创建后,涉及到两个线程和两个队列:

  • ROS提供的Receiver线程将收到的消息加入到Subscriber队列,并触发使订阅回调函数加入Callback队列。 注意:每个ros::Subscriber有自己的Subscriber队列,而Callback队列默认是全局的。
  • 用户调用的Spinner线程(也就是ros::spin)从Callback队列中取出已可用的回调并执行。
  • ros::Subscriber的队列和回调过程

    在实际项目中,如果订阅回调中有耗时操作,那么可以用户可以启用多个Spinner线程并发从Callback队列中取出已可用的回调并执行。这样可以加快Callback队列被执行的速度。

    ROS Spinner

    ROS提供两种单线程回调的spin方法和两种多线程回调的Spin类,分别是:

    单线程回调spin方法:

  • ros::spin()—— 相当于while(true)的大循环,不断遍历执行Callback队列中的可用回调
  • ros::spinOne()—— 相当于马上执行一次Callback队列中的可用回调
  • 多线程回调spin类:

  • ros::MultiThreadedSpinner—— 相当于while(true)大循环,启动指定数量的Spinner线程并发执行Callback队列中的可用回调。可指定Callback队列。
  • ros::AsyncSpinner—— 异步启停,开启指定数量的Spinner线程并发执行Callback队列中的可用回调。可指定Callback队列。
  • \color{red}{简单总结,如果程序简单用ros::spin()就够了;如果程序复杂推荐使用ros::AsyncSpinner类。} 他们的详细用法和区别在ROS官方教程中已经写得比较清楚,可参考:

    下面这段代码展示如何使用ros::AsyncSpinner启用多个Spinner线程。

    //一个topic多个线程来执行的代码 #include "ros/ros.h" #include "boost/thread.hpp" #include "my_msg/weather_pub.h" //回调函数,注意参数是const类型的boost::shared_ptr指针 void weatherCallback ( const my_msg :: weather_pubConstPtr & msg ) { ROS_INFO_STREAM ( "Thread[" << boost :: this_thread :: get_id () << "],after looping 24 hours weather:" << msg -> weather . c_str ()); } int main ( int argc , char ** argv ){ ros :: init ( argc , argv , "multi_subscriber" ); ros :: NodeHandle n ; /*通知ROS master,本node要订阅名为「Weather」的话题(topic), 并指定回调函数weatherCallback*/ ros :: Subscriber sub = n . subscribe ( "Weather" , 48 , weatherCallback ); ROS_INFO_STREAM ( "Thread[" << boost :: this_thread :: get_id () << "]This is main thread." ); //声明spinner对象,参数2表示并发线程数,默认处理全局Callback队列 ros :: AsyncSpinner spinner ( 2 ); //启动两个spinner线程并发执行可用回调 spinner . start (); ros :: waitForShutdown (); }

    从执行结果中可以看到,进程中包括三个线程:主线程、Spinner线程1、Spinner线程2。

    //这是执行结果,可以看到主线程 [ INFO ] [ 1637131602.089381910 ] : Thread [ 7f 9 a1ad24780 ] This is main thread . [ INFO ] [ 1637131602.375058712 ] : Thread [ 7f 9 a11bb6700 ], after looping 24 hours weather : Sunny 679 [ INFO ] [ 1637131602.488504089 ] : Thread [ 7f 9 a11bb6700 ], after looping 24 hours weather : Sunny 680 [ INFO ] [ 1637131602.688845441 ] : Thread [ 7f 9 a123b7700 ], after looping 24 hours weather : Sunny 681 [ INFO ] [ 1637131602.888828136 ] : Thread [ 7f 9 a123b7700 ], after looping 24 hours weather : Sunny 682

    下图展示了相关的线程和队列处理过程 :

    使用多线程Spinner同时并发处理回调过程

    实际项目中一个节点往往要订阅多个topic,在使用默认全局Callback队列时,如果某些topic发布频率高回调处理又耗时的话,容易影响其他topic消息的处理。下图中TopicB的消息居多可能影响TopicA的处理。

    多个ros::Subscriber在全局回调队列中排队的过程

    这种情况下,ROS提供了机制,可以为每个ros::Subscriber指定Callback队列,再分别指定Spinner线程仅处理指定Callback队列的回调。这样确保每个订阅回调相互独立不影响。下面的代码展示如何进行上述操作:

    //为每个subscriber指定队列 #include "ros/ros.h" #include "boost/thread.hpp" #include "my_msg/weather_pub.h" #include "my_msg/air_quality_pub.h" #include <ros/callback_queue.h> //回调函数,注意参数是const类型的boost::shared_ptr指针 void weatherCallback ( const my_msg :: weather_pubConstPtr & msg ) { ROS_INFO_STREAM ( "Thread[" << boost :: this_thread :: get_id () << "],before loop 24 hours weather:" << msg -> weather . c_str ()); //死循环 while ( true ){} ROS_INFO_STREAM ( "Thread[" << boost :: this_thread :: get_id () << "],24 hours weather:" << msg -> weather . c_str ()); } void weatherCallback_A ( const my_msg :: weather_pubConstPtr & msg ) { ROS_INFO_STREAM ( "Thread[" << boost :: this_thread :: get_id () << "],A 24 hours weather:" << msg -> weather . c_str ()); } //回调函数,注意参数是const类型的boost::shared_ptr指针 void airQualityCallback ( const my_msg :: air_quality_pubConstPtr & msg ) { ROS_INFO_STREAM ( "Thread[" << boost :: this_thread :: get_id () << "],24 hours air quality:" << msg -> air_quality_index ); } int main ( int argc , char ** argv ){ ros :: init ( argc , argv , "multi_subscriber" ); ros :: NodeHandle n ; /*通知ROS master,本node要订阅名为「Weather」的话题(topic), 并指定回调函数weatherCallback*/ ros :: Subscriber sub = n . subscribe ( "Weather" , 48 , weatherCallback ); ros :: Subscriber sub_a = n . subscribe ( "WeatherA" , 48 , weatherCallback_A ); //需要单独声明一个ros::NodeHandle ros :: NodeHandle n_1 ; //为这个ros::Nodehandle指定单独的Callback队列 ros :: CallbackQueue my_queue ; n_1 . setCallbackQueue ( & my_queue ); /*通知ROS master,本node要订阅名为「AirQuality」的话题(topic), 并指定回调函数airQualityCallback*/ ros :: Subscriber air_sub = n_1 . subscribe ( "AirQuality" , 48 , airQualityCallback ); ROS_INFO_STREAM ( "Thread[" << boost :: this_thread :: get_id () << "]This is main thread." ); //启动两个线程处理全局Callback队列 ros :: AsyncSpinner spinner ( 2 ); spinner . start (); //启动一个线程处理AirQuality单独的队列 ros :: AsyncSpinner spinner_1 ( 1 , & my_queue ); spinner_1 . start (); ros :: waitForShutdown (); }

    从执行结果中可以看到,进程中包括四个线程:主线程、全局队列Spinner线程1、全局队列Spinner线程2,以及本地队列Spinner线程3。尽管Spinner线程1被回调函数中的死循环卡住,但并不影响其他topic的回调处理。

    [ INFO ] [ 1637132247.535142399 ] : Thread [ 7f73e4384780 ] This is main thread . [ INFO ] [ 1637132247.743935399 ] : Thread [ 7f 73 d77fe700 ], A 24 hours weather : Sunny 3926 [ INFO ] [ 1637132247.744032493 ] : Thread [ 7f 73 d6ffd700 ], before loop 24 hours weather : Sunny 3906 [ INFO ] [ 1637132247.744203496 ] : Thread [ 7f 73 d67fc700 ], 24 hours air quality : 4034 [ INFO ] [ 1637132247.888403207 ] : Thread [ 7f 73 d77fe700 ], A 24 hours weather : Sunny 3927 [ INFO ] [ 1637132247.888433359 ] : Thread [ 7f 73 d67fc700 ], 24 hours air quality : 4035 [ INFO ] [ 1637132248.088418911 ] : Thread [ 7f 73 d67fc700 ], 24 hours air quality : 4036 [ INFO ] [ 1637132248.088461907 ] : Thread [ 7f 73 d77fe700 ], A 24 hours weather : Sunny 3928 [ INFO ] [ 1637132248.288417795 ] : Thread [ 7f 73 d67fc700 ], 24 hours air quality : 4037 [ INFO ] [ 1637132248.288448289 ] : Thread [ 7f 73 d77fe700 ], A 24 hours weather : Sunny 3929

    下图展示了相关的线程和队列处理过程 :

    多个ros::Subscriber各自独立使用回调队列的过程

    小结

    在理解ROS的回调机制后,使用多个Callback队列和多个Spinner线程可以满足实际项目开发的需要。提醒大家在使用多线程时,记得对临界区域适当加锁,防止引入多线程问题。好了,ROS回调的多线程就分享到这种,如果您喜欢请 \color{red}{点赞,关注,分享} 。您的鼓励是我创作的最大动力,更多机械臂学习的分享,请关注本专栏 。

    本文中所有代码可在gitee工程上查询到:

    如果还想要与作者交流可以评论留言或付费咨询: