前一篇分享了 ROS 的基礎通訊機制——釋出訂閱機制。本打算接下來分享 {\boldsymbol{\color{blue}{Service}}} 請求響應機制。但感覺在實際專案中, {\boldsymbol{\color{red}{多執行緒}}} 的問題經常困擾 ROS 開發人員。如果沒有完全弄明白 ROS 為訂閱回呼、請求回呼、行動回呼和定時器回呼提供的排程機制,可能埋雷。到了開發中後期就容易踩坑。上篇鏈結:
回呼( Callback )
ROS 中主要有四種類別的回呼,分別繫結在不同的 ROS 物件上:
Callback 的特點概括為: \color{red}{使用者實作,ROS排程} 。那麽,ROS是如何進行排程的呢?
ROS回呼機制
ROS預設有維護一個全域回呼佇列(名為: Global Callback Queue ),將已可用的回呼插入Callback佇列中。再透過Spinner執行緒獲取並執行當前可用的回呼。為了說明ROS回呼機制,我引入兩個ROS節點:一個使用定時器釋出多個topic訊息;另一個訂閱這些topic訊息。
釋出節點
一個使用ROS定時器超時回呼釋出多個topic訊息。
ros::Timer
來看一個ros::Timer的回呼佇列,程式碼如下:
//這段程式碼主要是實作定時向Topic釋出訊息
#include
"ros/ros.h"
#include
<boost/thread.hpp>
#include
"my_msg/weather_pub.h"
#include
"my_msg/air_quality_pub.h"
#include
<sstream>
int
main
(
int
argc
,
char
**
argv
){
ros
::
init
(
argc
,
argv
,
"multi_publisher"
);
ros
::
NodeHandle
n
;
/*通知ROS master,本node要釋出一個名為「Weather」的話題(topic),
訊息類別為my_msg::weather_pub,發送佇列長度為48*/
ros
::
Publisher
pub_weather
=
n
.
advertise
<
my_msg
::
weather_pub
>
(
"Weather"
,
48
,
true
);
/*通知ROS master,本node要釋出一個名為「WeatherA」的話題(topic),
訊息類別為my_msg::weather_pub,發送佇列長度為48*/
ros
::
Publisher
pub_weather_a
=
n
.
advertise
<
my_msg
::
weather_pub
>
(
"WeatherA"
,
48
,
true
);
/*通知ROS master,本node要釋出一個名為「AirQuality」的話題(topic),
訊息類別為my_msg::air_quality_pub,發送佇列長度為48*/
ros
::
Publisher
pub_air_quality
=
n
.
advertise
<
my_msg
::
air_quality_pub
>
(
"AirQuality"
,
48
,
true
);
int
count
=
0
;
//建立一個ros::Timer每0.2秒進行釋出,回呼函式采用lamda4方法的格式
ros
::
Timer
timer
=
n
.
createTimer
(
ros
::
Duration
(
0.2
),
[
&
](
const
ros
::
TimerEvent
&
)
{
my_msg
::
weather_pub
msg
;
std
::
stringstream
ss
;
ss
<<
"Sunny "
<<
count
;
msg
.
weather
=
ss
.
str
();
ROS_INFO_STREAM
(
"Thread["
<<
boost
::
this_thread
::
get_id
()
<<
"],weather:"
<<
msg
.
weather
.
c_str
());
pub_weather
.
publish
(
msg
);
std
::
stringstream
ssa
;
ssa
<<
"Sunny "
<<
20
+
count
;
msg
.
weather
=
ssa
.
str
();
ROS_INFO_STREAM
(
"Thread["
<<
boost
::
this_thread
::
get_id
()
<<
"],weather:"
<<
msg
.
weather
.
c_str
());
pub_weather_a
.
publish
(
msg
);
my_msg
::
air_quality_pub
msg_air
;
msg_air
.
air_quality_index
=
128
+
count
;
ROS_INFO_STREAM
(
"Thread["
<<
boost
::
this_thread
::
get_id
()
<<
"],air quality:"
<<
msg_air
.
air_quality_index
);
pub_air_quality
.
publish
(
msg_air
);
++
count
;
});
//確保定時器回呼被呼叫
ros
::
spin
();
return
0
;
}
定時器啟動後會生成一個Timer執行緒,根據定時器的參數,當定時器超時後將定時器的回呼函式加入Callback佇列中。然後再由使用者呼叫的Spinner執行緒(ros::spin)從Callback佇列中依次取出當前已可用的回呼並執行。
ros::Publisher
上面例子中,在定時器回呼函式中向topic進行釋出,ros::Publisher將要釋出的訊息加入到Publisher佇列中,再由專門的Publisher執行緒釋出出去。註意這其中並不涉及Callback佇列,這也解釋了上篇中提到的: 如果一個ROS節點僅進行topic釋出是不需要呼叫spinner的 。
訂閱節點
訂閱上面釋出的topic訊息。根據不同情況,進行程式碼修改。
ros::Subscriber
先讓ROS節點只訂閱一個topic來說明訂閱回呼的過程。下面是程式碼:
//訂閱一個topic的程式碼
#include
"ros/ros.h"
#include
"my_msg/weather_pub.h"
//回呼函式,註意參數是const類別的boost::shared_ptr指標
void
weatherCallback
(
const
my_msg
::
weather_pubConstPtr
&
msg
)
{
ROS_INFO
(
"The 24 hours Weather: [%s]"
,
msg
->
weather
.
c_str
());
}
int
main
(
int
argc
,
char
**
argv
){
ros
::
init
(
argc
,
argv
,
"subscriber"
);
ros
::
NodeHandle
n
;
/*通知ROS master,本node要訂閱名為「Weather」的話題(topic),
並指定回呼函式weatherCallback*/
ros
::
Subscriber
sub
=
n
.
subscribe
(
"Weather"
,
48
,
weatherCallback
);
ros
::
spin
();
}
訂閱建立後,涉及到兩個執行緒和兩個佇列:
在實際專案中,如果訂閱回呼中有耗時操作,那麽可以使用者可以啟用多個Spinner執行緒並行從Callback佇列中取出已可用的回呼並執行。這樣可以加快Callback佇列被執行的速度。
ROS Spinner
ROS提供兩種單執行緒回呼的spin方法和兩種多執行緒回呼的Spin類,分別是:
單執行緒回呼spin方法:
多執行緒回呼spin類:
\color{red}{簡單總結,如果程式簡單用ros::spin()就夠了;如果程式復雜推薦使用ros::AsyncSpinner類。} 他們的詳細用法和區別在ROS官方教程中已經寫得比較清楚,可參考:
下面這段程式碼展示如何使用ros::AsyncSpinner啟用多個Spinner執行緒。
//一個topic多個執行緒來執行的程式碼
#include
"ros/ros.h"
#include
"boost/thread.hpp"
#include
"my_msg/weather_pub.h"
//回呼函式,註意參數是const類別的boost::shared_ptr指標
void
weatherCallback
(
const
my_msg
::
weather_pubConstPtr
&
msg
)
{
ROS_INFO_STREAM
(
"Thread["
<<
boost
::
this_thread
::
get_id
()
<<
"],after looping 24 hours weather:"
<<
msg
->
weather
.
c_str
());
}
int
main
(
int
argc
,
char
**
argv
){
ros
::
init
(
argc
,
argv
,
"multi_subscriber"
);
ros
::
NodeHandle
n
;
/*通知ROS master,本node要訂閱名為「Weather」的話題(topic),
並指定回呼函式weatherCallback*/
ros
::
Subscriber
sub
=
n
.
subscribe
(
"Weather"
,
48
,
weatherCallback
);
ROS_INFO_STREAM
(
"Thread["
<<
boost
::
this_thread
::
get_id
()
<<
"]This is main thread."
);
//聲明spinner物件,參數2表示並行執行緒數,預設處理全域Callback佇列
ros
::
AsyncSpinner
spinner
(
2
);
//啟動兩個spinner執行緒並行執行可用回呼
spinner
.
start
();
ros
::
waitForShutdown
();
}
從執行結果中可以看到,行程中包括三個執行緒:主執行緒、Spinner執行緒1、Spinner執行緒2。
//這是執行結果,可以看到主執行緒
[
INFO
]
[
1637131602.089381910
]
:
Thread
[
7f
9
a1ad24780
]
This
is
main
thread
.
[
INFO
]
[
1637131602.375058712
]
:
Thread
[
7f
9
a11bb6700
],
after
looping
24
hours
weather
:
Sunny
679
[
INFO
]
[
1637131602.488504089
]
:
Thread
[
7f
9
a11bb6700
],
after
looping
24
hours
weather
:
Sunny
680
[
INFO
]
[
1637131602.688845441
]
:
Thread
[
7f
9
a123b7700
],
after
looping
24
hours
weather
:
Sunny
681
[
INFO
]
[
1637131602.888828136
]
:
Thread
[
7f
9
a123b7700
],
after
looping
24
hours
weather
:
Sunny
682
下圖展示了相關的執行緒和佇列處理過程 :
實際專案中一個節點往往要訂閱多個topic,在使用預設全域Callback佇列時,如果某些topic釋出頻率高回呼處理又耗時的話,容易影響其他topic訊息的處理。下圖中TopicB的訊息居多可能影響TopicA的處理。
這種情況下,ROS提供了機制,可以為每個ros::Subscriber指定Callback佇列,再分別指定Spinner執行緒僅處理指定Callback佇列的回呼。這樣確保每個訂閱回呼相互獨立不影響。下面的程式碼展示如何進行上述操作:
//為每個subscriber指定佇列
#include
"ros/ros.h"
#include
"boost/thread.hpp"
#include
"my_msg/weather_pub.h"
#include
"my_msg/air_quality_pub.h"
#include
<ros/callback_queue.h>
//回呼函式,註意參數是const類別的boost::shared_ptr指標
void
weatherCallback
(
const
my_msg
::
weather_pubConstPtr
&
msg
)
{
ROS_INFO_STREAM
(
"Thread["
<<
boost
::
this_thread
::
get_id
()
<<
"],before loop 24 hours weather:"
<<
msg
->
weather
.
c_str
());
//死迴圈
while
(
true
){}
ROS_INFO_STREAM
(
"Thread["
<<
boost
::
this_thread
::
get_id
()
<<
"],24 hours weather:"
<<
msg
->
weather
.
c_str
());
}
void
weatherCallback_A
(
const
my_msg
::
weather_pubConstPtr
&
msg
)
{
ROS_INFO_STREAM
(
"Thread["
<<
boost
::
this_thread
::
get_id
()
<<
"],A 24 hours weather:"
<<
msg
->
weather
.
c_str
());
}
//回呼函式,註意參數是const類別的boost::shared_ptr指標
void
airQualityCallback
(
const
my_msg
::
air_quality_pubConstPtr
&
msg
)
{
ROS_INFO_STREAM
(
"Thread["
<<
boost
::
this_thread
::
get_id
()
<<
"],24 hours air quality:"
<<
msg
->
air_quality_index
);
}
int
main
(
int
argc
,
char
**
argv
){
ros
::
init
(
argc
,
argv
,
"multi_subscriber"
);
ros
::
NodeHandle
n
;
/*通知ROS master,本node要訂閱名為「Weather」的話題(topic),
並指定回呼函式weatherCallback*/
ros
::
Subscriber
sub
=
n
.
subscribe
(
"Weather"
,
48
,
weatherCallback
);
ros
::
Subscriber
sub_a
=
n
.
subscribe
(
"WeatherA"
,
48
,
weatherCallback_A
);
//需要單獨聲明一個ros::NodeHandle
ros
::
NodeHandle
n_1
;
//為這個ros::Nodehandle指定單獨的Callback佇列
ros
::
CallbackQueue
my_queue
;
n_1
.
setCallbackQueue
(
&
my_queue
);
/*通知ROS master,本node要訂閱名為「AirQuality」的話題(topic),
並指定回呼函式airQualityCallback*/
ros
::
Subscriber
air_sub
=
n_1
.
subscribe
(
"AirQuality"
,
48
,
airQualityCallback
);
ROS_INFO_STREAM
(
"Thread["
<<
boost
::
this_thread
::
get_id
()
<<
"]This is main thread."
);
//啟動兩個執行緒處理全域Callback佇列
ros
::
AsyncSpinner
spinner
(
2
);
spinner
.
start
();
//啟動一個執行緒處理AirQuality單獨的佇列
ros
::
AsyncSpinner
spinner_1
(
1
,
&
my_queue
);
spinner_1
.
start
();
ros
::
waitForShutdown
();
}
從執行結果中可以看到,行程中包括四個執行緒:主執行緒、全域佇列Spinner執行緒1、全域佇列Spinner執行緒2,以及本地佇列Spinner執行緒3。盡管Spinner執行緒1被回呼函式中的死迴圈卡住,但並不影響其他topic的回呼處理。
[
INFO
]
[
1637132247.535142399
]
:
Thread
[
7f73e4384780
]
This
is
main
thread
.
[
INFO
]
[
1637132247.743935399
]
:
Thread
[
7f
73
d77fe700
],
A
24
hours
weather
:
Sunny
3926
[
INFO
]
[
1637132247.744032493
]
:
Thread
[
7f
73
d6ffd700
],
before
loop
24
hours
weather
:
Sunny
3906
[
INFO
]
[
1637132247.744203496
]
:
Thread
[
7f
73
d67fc700
],
24
hours
air
quality
:
4034
[
INFO
]
[
1637132247.888403207
]
:
Thread
[
7f
73
d77fe700
],
A
24
hours
weather
:
Sunny
3927
[
INFO
]
[
1637132247.888433359
]
:
Thread
[
7f
73
d67fc700
],
24
hours
air
quality
:
4035
[
INFO
]
[
1637132248.088418911
]
:
Thread
[
7f
73
d67fc700
],
24
hours
air
quality
:
4036
[
INFO
]
[
1637132248.088461907
]
:
Thread
[
7f
73
d77fe700
],
A
24
hours
weather
:
Sunny
3928
[
INFO
]
[
1637132248.288417795
]
:
Thread
[
7f
73
d67fc700
],
24
hours
air
quality
:
4037
[
INFO
]
[
1637132248.288448289
]
:
Thread
[
7f
73
d77fe700
],
A
24
hours
weather
:
Sunny
3929
下圖展示了相關的執行緒和佇列處理過程 :
小結
在理解ROS的回呼機制後,使用多個Callback佇列和多個Spinner執行緒可以滿足實際專案開發的需要。提醒大家在使用多執行緒時,記得對臨界區域適當加鎖,防止引入多執行緒問題。好了,ROS回呼的多執行緒就分享到這種,如果您喜歡請 \color{red}{點贊,關註,分享} 。您的鼓勵是我創作的最大動力,更多機械臂學習的分享,請關註本專欄 。
本文中所有程式碼可在gitee工程上查詢到:
如果還想要與作者交流可以評論留言或付費咨詢: