當前位置: 華文問答 > 科學

機械臂學習(15)

2021-11-17科學

前一篇分享了 ROS 的基礎通訊機制——釋出訂閱機制。本打算接下來分享 {\boldsymbol{\color{blue}{Service}}} 請求響應機制。但感覺在實際專案中, {\boldsymbol{\color{red}{多執行緒}}} 的問題經常困擾 ROS 開發人員。如果沒有完全弄明白 ROS 為訂閱回呼、請求回呼、行動回呼和定時器回呼提供的排程機制,可能埋雷。到了開發中後期就容易踩坑。上篇鏈結:

回呼( Callback )

ROS 中主要有四種類別的回呼,分別繫結在不同的 ROS 物件上:

  • {\boldsymbol{\color{black}{訂閱回呼}}} —— ros::Subscriber
  • {\boldsymbol{\color{black}{請求回呼}}} —— ros::ServiceServer
  • {\boldsymbol{\color{black}{行動回呼}}} —— actionlib::SimpleActionServer
  • {\boldsymbol{\color{black}{定時器回呼}}} —— ros::Timer
  • Callback 的特點概括為: \color{red}{使用者實作,ROS排程} 。那麽,ROS是如何進行排程的呢?

    ROS回呼機制

    ROS預設有維護一個全域回呼佇列(名為: Global Callback Queue ),將已可用的回呼插入Callback佇列中。再透過Spinner執行緒獲取並執行當前可用的回呼。為了說明ROS回呼機制,我引入兩個ROS節點:一個使用定時器釋出多個topic訊息;另一個訂閱這些topic訊息。

    釋出節點

    一個使用ROS定時器超時回呼釋出多個topic訊息。

    ros::Timer

    來看一個ros::Timer的回呼佇列,程式碼如下:

    //這段程式碼主要是實作定時向Topic釋出訊息 #include "ros/ros.h" #include <boost/thread.hpp> #include "my_msg/weather_pub.h" #include "my_msg/air_quality_pub.h" #include <sstream> int main ( int argc , char ** argv ){ ros :: init ( argc , argv , "multi_publisher" ); ros :: NodeHandle n ; /*通知ROS master,本node要釋出一個名為「Weather」的話題(topic), 訊息類別為my_msg::weather_pub,發送佇列長度為48*/ ros :: Publisher pub_weather = n . advertise < my_msg :: weather_pub > ( "Weather" , 48 , true ); /*通知ROS master,本node要釋出一個名為「WeatherA」的話題(topic), 訊息類別為my_msg::weather_pub,發送佇列長度為48*/ ros :: Publisher pub_weather_a = n . advertise < my_msg :: weather_pub > ( "WeatherA" , 48 , true ); /*通知ROS master,本node要釋出一個名為「AirQuality」的話題(topic), 訊息類別為my_msg::air_quality_pub,發送佇列長度為48*/ ros :: Publisher pub_air_quality = n . advertise < my_msg :: air_quality_pub > ( "AirQuality" , 48 , true ); int count = 0 ; //建立一個ros::Timer每0.2秒進行釋出,回呼函式采用lamda4方法的格式 ros :: Timer timer = n . createTimer ( ros :: Duration ( 0.2 ), [ & ]( const ros :: TimerEvent & ) { my_msg :: weather_pub msg ; std :: stringstream ss ; ss << "Sunny " << count ; msg . weather = ss . str (); ROS_INFO_STREAM ( "Thread[" << boost :: this_thread :: get_id () << "],weather:" << msg . weather . c_str ()); pub_weather . publish ( msg ); std :: stringstream ssa ; ssa << "Sunny " << 20 + count ; msg . weather = ssa . str (); ROS_INFO_STREAM ( "Thread[" << boost :: this_thread :: get_id () << "],weather:" << msg . weather . c_str ()); pub_weather_a . publish ( msg ); my_msg :: air_quality_pub msg_air ; msg_air . air_quality_index = 128 + count ; ROS_INFO_STREAM ( "Thread[" << boost :: this_thread :: get_id () << "],air quality:" << msg_air . air_quality_index ); pub_air_quality . publish ( msg_air ); ++ count ; }); //確保定時器回呼被呼叫 ros :: spin (); return 0 ; }

    定時器啟動後會生成一個Timer執行緒,根據定時器的參數,當定時器超時後將定時器的回呼函式加入Callback佇列中。然後再由使用者呼叫的Spinner執行緒(ros::spin)從Callback佇列中依次取出當前已可用的回呼並執行。

    ros::Timer的佇列和回呼過程

    ros::Publisher

    上面例子中,在定時器回呼函式中向topic進行釋出,ros::Publisher將要釋出的訊息加入到Publisher佇列中,再由專門的Publisher執行緒釋出出去。註意這其中並不涉及Callback佇列,這也解釋了上篇中提到的: 如果一個ROS節點僅進行topic釋出是不需要呼叫spinner的

    ros::Pubblisher的佇列和發送過程

    訂閱節點

    訂閱上面釋出的topic訊息。根據不同情況,進行程式碼修改。

    ros::Subscriber

    先讓ROS節點只訂閱一個topic來說明訂閱回呼的過程。下面是程式碼:

    //訂閱一個topic的程式碼 #include "ros/ros.h" #include "my_msg/weather_pub.h" //回呼函式,註意參數是const類別的boost::shared_ptr指標 void weatherCallback ( const my_msg :: weather_pubConstPtr & msg ) { ROS_INFO ( "The 24 hours Weather: [%s]" , msg -> weather . c_str ()); } int main ( int argc , char ** argv ){ ros :: init ( argc , argv , "subscriber" ); ros :: NodeHandle n ; /*通知ROS master,本node要訂閱名為「Weather」的話題(topic), 並指定回呼函式weatherCallback*/ ros :: Subscriber sub = n . subscribe ( "Weather" , 48 , weatherCallback ); ros :: spin (); }

    訂閱建立後,涉及到兩個執行緒和兩個佇列:

  • ROS提供的Receiver執行緒將收到的訊息加入到Subscriber佇列,並觸發使訂閱回呼函式加入Callback佇列。 註意:每個ros::Subscriber有自己的Subscriber佇列,而Callback佇列預設是全域的。
  • 使用者呼叫的Spinner執行緒(也就是ros::spin)從Callback佇列中取出已可用的回呼並執行。
  • ros::Subscriber的佇列和回呼過程

    在實際專案中,如果訂閱回呼中有耗時操作,那麽可以使用者可以啟用多個Spinner執行緒並行從Callback佇列中取出已可用的回呼並執行。這樣可以加快Callback佇列被執行的速度。

    ROS Spinner

    ROS提供兩種單執行緒回呼的spin方法和兩種多執行緒回呼的Spin類,分別是:

    單執行緒回呼spin方法:

  • ros::spin()—— 相當於while(true)的大迴圈,不斷遍歷執行Callback佇列中的可用回呼
  • ros::spinOne()—— 相當於馬上執行一次Callback佇列中的可用回呼
  • 多執行緒回呼spin類:

  • ros::MultiThreadedSpinner—— 相當於while(true)大迴圈,啟動指定數量的Spinner執行緒並行執行Callback佇列中的可用回呼。可指定Callback佇列。
  • ros::AsyncSpinner—— 異步啟停,開啟指定數量的Spinner執行緒並行執行Callback佇列中的可用回呼。可指定Callback佇列。
  • \color{red}{簡單總結,如果程式簡單用ros::spin()就夠了;如果程式復雜推薦使用ros::AsyncSpinner類。} 他們的詳細用法和區別在ROS官方教程中已經寫得比較清楚,可參考:

    下面這段程式碼展示如何使用ros::AsyncSpinner啟用多個Spinner執行緒。

    //一個topic多個執行緒來執行的程式碼 #include "ros/ros.h" #include "boost/thread.hpp" #include "my_msg/weather_pub.h" //回呼函式,註意參數是const類別的boost::shared_ptr指標 void weatherCallback ( const my_msg :: weather_pubConstPtr & msg ) { ROS_INFO_STREAM ( "Thread[" << boost :: this_thread :: get_id () << "],after looping 24 hours weather:" << msg -> weather . c_str ()); } int main ( int argc , char ** argv ){ ros :: init ( argc , argv , "multi_subscriber" ); ros :: NodeHandle n ; /*通知ROS master,本node要訂閱名為「Weather」的話題(topic), 並指定回呼函式weatherCallback*/ ros :: Subscriber sub = n . subscribe ( "Weather" , 48 , weatherCallback ); ROS_INFO_STREAM ( "Thread[" << boost :: this_thread :: get_id () << "]This is main thread." ); //聲明spinner物件,參數2表示並行執行緒數,預設處理全域Callback佇列 ros :: AsyncSpinner spinner ( 2 ); //啟動兩個spinner執行緒並行執行可用回呼 spinner . start (); ros :: waitForShutdown (); }

    從執行結果中可以看到,行程中包括三個執行緒:主執行緒、Spinner執行緒1、Spinner執行緒2。

    //這是執行結果,可以看到主執行緒 [ INFO ] [ 1637131602.089381910 ] : Thread [ 7f 9 a1ad24780 ] This is main thread . [ INFO ] [ 1637131602.375058712 ] : Thread [ 7f 9 a11bb6700 ], after looping 24 hours weather : Sunny 679 [ INFO ] [ 1637131602.488504089 ] : Thread [ 7f 9 a11bb6700 ], after looping 24 hours weather : Sunny 680 [ INFO ] [ 1637131602.688845441 ] : Thread [ 7f 9 a123b7700 ], after looping 24 hours weather : Sunny 681 [ INFO ] [ 1637131602.888828136 ] : Thread [ 7f 9 a123b7700 ], after looping 24 hours weather : Sunny 682

    下圖展示了相關的執行緒和佇列處理過程 :

    使用多執行緒Spinner同時並行處理回呼過程

    實際專案中一個節點往往要訂閱多個topic,在使用預設全域Callback佇列時,如果某些topic釋出頻率高回呼處理又耗時的話,容易影響其他topic訊息的處理。下圖中TopicB的訊息居多可能影響TopicA的處理。

    多個ros::Subscriber在全域回呼佇列中排隊的過程

    這種情況下,ROS提供了機制,可以為每個ros::Subscriber指定Callback佇列,再分別指定Spinner執行緒僅處理指定Callback佇列的回呼。這樣確保每個訂閱回呼相互獨立不影響。下面的程式碼展示如何進行上述操作:

    //為每個subscriber指定佇列 #include "ros/ros.h" #include "boost/thread.hpp" #include "my_msg/weather_pub.h" #include "my_msg/air_quality_pub.h" #include <ros/callback_queue.h> //回呼函式,註意參數是const類別的boost::shared_ptr指標 void weatherCallback ( const my_msg :: weather_pubConstPtr & msg ) { ROS_INFO_STREAM ( "Thread[" << boost :: this_thread :: get_id () << "],before loop 24 hours weather:" << msg -> weather . c_str ()); //死迴圈 while ( true ){} ROS_INFO_STREAM ( "Thread[" << boost :: this_thread :: get_id () << "],24 hours weather:" << msg -> weather . c_str ()); } void weatherCallback_A ( const my_msg :: weather_pubConstPtr & msg ) { ROS_INFO_STREAM ( "Thread[" << boost :: this_thread :: get_id () << "],A 24 hours weather:" << msg -> weather . c_str ()); } //回呼函式,註意參數是const類別的boost::shared_ptr指標 void airQualityCallback ( const my_msg :: air_quality_pubConstPtr & msg ) { ROS_INFO_STREAM ( "Thread[" << boost :: this_thread :: get_id () << "],24 hours air quality:" << msg -> air_quality_index ); } int main ( int argc , char ** argv ){ ros :: init ( argc , argv , "multi_subscriber" ); ros :: NodeHandle n ; /*通知ROS master,本node要訂閱名為「Weather」的話題(topic), 並指定回呼函式weatherCallback*/ ros :: Subscriber sub = n . subscribe ( "Weather" , 48 , weatherCallback ); ros :: Subscriber sub_a = n . subscribe ( "WeatherA" , 48 , weatherCallback_A ); //需要單獨聲明一個ros::NodeHandle ros :: NodeHandle n_1 ; //為這個ros::Nodehandle指定單獨的Callback佇列 ros :: CallbackQueue my_queue ; n_1 . setCallbackQueue ( & my_queue ); /*通知ROS master,本node要訂閱名為「AirQuality」的話題(topic), 並指定回呼函式airQualityCallback*/ ros :: Subscriber air_sub = n_1 . subscribe ( "AirQuality" , 48 , airQualityCallback ); ROS_INFO_STREAM ( "Thread[" << boost :: this_thread :: get_id () << "]This is main thread." ); //啟動兩個執行緒處理全域Callback佇列 ros :: AsyncSpinner spinner ( 2 ); spinner . start (); //啟動一個執行緒處理AirQuality單獨的佇列 ros :: AsyncSpinner spinner_1 ( 1 , & my_queue ); spinner_1 . start (); ros :: waitForShutdown (); }

    從執行結果中可以看到,行程中包括四個執行緒:主執行緒、全域佇列Spinner執行緒1、全域佇列Spinner執行緒2,以及本地佇列Spinner執行緒3。盡管Spinner執行緒1被回呼函式中的死迴圈卡住,但並不影響其他topic的回呼處理。

    [ INFO ] [ 1637132247.535142399 ] : Thread [ 7f73e4384780 ] This is main thread . [ INFO ] [ 1637132247.743935399 ] : Thread [ 7f 73 d77fe700 ], A 24 hours weather : Sunny 3926 [ INFO ] [ 1637132247.744032493 ] : Thread [ 7f 73 d6ffd700 ], before loop 24 hours weather : Sunny 3906 [ INFO ] [ 1637132247.744203496 ] : Thread [ 7f 73 d67fc700 ], 24 hours air quality : 4034 [ INFO ] [ 1637132247.888403207 ] : Thread [ 7f 73 d77fe700 ], A 24 hours weather : Sunny 3927 [ INFO ] [ 1637132247.888433359 ] : Thread [ 7f 73 d67fc700 ], 24 hours air quality : 4035 [ INFO ] [ 1637132248.088418911 ] : Thread [ 7f 73 d67fc700 ], 24 hours air quality : 4036 [ INFO ] [ 1637132248.088461907 ] : Thread [ 7f 73 d77fe700 ], A 24 hours weather : Sunny 3928 [ INFO ] [ 1637132248.288417795 ] : Thread [ 7f 73 d67fc700 ], 24 hours air quality : 4037 [ INFO ] [ 1637132248.288448289 ] : Thread [ 7f 73 d77fe700 ], A 24 hours weather : Sunny 3929

    下圖展示了相關的執行緒和佇列處理過程 :

    多個ros::Subscriber各自獨立使用回呼佇列的過程

    小結

    在理解ROS的回呼機制後,使用多個Callback佇列和多個Spinner執行緒可以滿足實際專案開發的需要。提醒大家在使用多執行緒時,記得對臨界區域適當加鎖,防止引入多執行緒問題。好了,ROS回呼的多執行緒就分享到這種,如果您喜歡請 \color{red}{點贊,關註,分享} 。您的鼓勵是我創作的最大動力,更多機械臂學習的分享,請關註本專欄 。

    本文中所有程式碼可在gitee工程上查詢到:

    如果還想要與作者交流可以評論留言或付費咨詢: