前一篇分享了 ROS 的基础通信机制——发布订阅机制。本打算接下来分享 {\boldsymbol{\color{blue}{Service}}} 请求响应机制。但感觉在实际项目中, {\boldsymbol{\color{red}{多线程}}} 的问题经常困扰 ROS 开发人员。如果没有完全弄明白 ROS 为订阅回调、请求回调、行动回调和定时器回调提供的调度机制,可能埋雷。到了开发中后期就容易踩坑。上篇链结:
回调( Callback )
ROS 中主要有四种类型的回调,分别绑定在不同的 ROS 对象上:
Callback 的特点概括为: \color{red}{用户实现,ROS调度} 。那么,ROS是如何进行调度的呢?
ROS回调机制
ROS默认有维护一个全局回调队列(名为: Global Callback Queue ),将已可用的回调插入Callback队列中。再通过Spinner线程获取并执行当前可用的回调。为了说明ROS回调机制,我引入两个ROS节点:一个使用定时器发布多个topic消息;另一个订阅这些topic消息。
发布节点
一个使用ROS定时器超时回调发布多个topic消息。
ros::Timer
来看一个ros::Timer的回调队列,代码如下:
//这段代码主要是实现定时向Topic发布消息
#include
"ros/ros.h"
#include
<boost/thread.hpp>
#include
"my_msg/weather_pub.h"
#include
"my_msg/air_quality_pub.h"
#include
<sstream>
int
main
(
int
argc
,
char
**
argv
){
ros
::
init
(
argc
,
argv
,
"multi_publisher"
);
ros
::
NodeHandle
n
;
/*通知ROS master,本node要发布一个名为「Weather」的话题(topic),
消息类型为my_msg::weather_pub,发送队列长度为48*/
ros
::
Publisher
pub_weather
=
n
.
advertise
<
my_msg
::
weather_pub
>
(
"Weather"
,
48
,
true
);
/*通知ROS master,本node要发布一个名为「WeatherA」的话题(topic),
消息类型为my_msg::weather_pub,发送队列长度为48*/
ros
::
Publisher
pub_weather_a
=
n
.
advertise
<
my_msg
::
weather_pub
>
(
"WeatherA"
,
48
,
true
);
/*通知ROS master,本node要发布一个名为「AirQuality」的话题(topic),
消息类型为my_msg::air_quality_pub,发送队列长度为48*/
ros
::
Publisher
pub_air_quality
=
n
.
advertise
<
my_msg
::
air_quality_pub
>
(
"AirQuality"
,
48
,
true
);
int
count
=
0
;
//创建一个ros::Timer每0.2秒进行发布,回调函数采用lamda4方法的格式
ros
::
Timer
timer
=
n
.
createTimer
(
ros
::
Duration
(
0.2
),
[
&
](
const
ros
::
TimerEvent
&
)
{
my_msg
::
weather_pub
msg
;
std
::
stringstream
ss
;
ss
<<
"Sunny "
<<
count
;
msg
.
weather
=
ss
.
str
();
ROS_INFO_STREAM
(
"Thread["
<<
boost
::
this_thread
::
get_id
()
<<
"],weather:"
<<
msg
.
weather
.
c_str
());
pub_weather
.
publish
(
msg
);
std
::
stringstream
ssa
;
ssa
<<
"Sunny "
<<
20
+
count
;
msg
.
weather
=
ssa
.
str
();
ROS_INFO_STREAM
(
"Thread["
<<
boost
::
this_thread
::
get_id
()
<<
"],weather:"
<<
msg
.
weather
.
c_str
());
pub_weather_a
.
publish
(
msg
);
my_msg
::
air_quality_pub
msg_air
;
msg_air
.
air_quality_index
=
128
+
count
;
ROS_INFO_STREAM
(
"Thread["
<<
boost
::
this_thread
::
get_id
()
<<
"],air quality:"
<<
msg_air
.
air_quality_index
);
pub_air_quality
.
publish
(
msg_air
);
++
count
;
});
//确保定时器回调被调用
ros
::
spin
();
return
0
;
}
定时器启动后会生成一个Timer线程,根据定时器的参数,当定时器超时后将定时器的回调函数加入Callback队列中。然后再由用户调用的Spinner线程(ros::spin)从Callback队列中依次取出当前已可用的回调并执行。
ros::Publisher
上面例子中,在定时器回调函数中向topic进行发布,ros::Publisher将要发布的消息加入到Publisher队列中,再由专门的Publisher线程发布出去。注意这其中并不涉及Callback队列,这也解释了上篇中提到的: 如果一个ROS节点仅进行topic发布是不需要调用spinner的 。
订阅节点
订阅上面发布的topic消息。根据不同情况,进行代码修改。
ros::Subscriber
先让ROS节点只订阅一个topic来说明订阅回调的过程。下面是代码:
//订阅一个topic的代码
#include
"ros/ros.h"
#include
"my_msg/weather_pub.h"
//回调函数,注意参数是const类型的boost::shared_ptr指针
void
weatherCallback
(
const
my_msg
::
weather_pubConstPtr
&
msg
)
{
ROS_INFO
(
"The 24 hours Weather: [%s]"
,
msg
->
weather
.
c_str
());
}
int
main
(
int
argc
,
char
**
argv
){
ros
::
init
(
argc
,
argv
,
"subscriber"
);
ros
::
NodeHandle
n
;
/*通知ROS master,本node要订阅名为「Weather」的话题(topic),
并指定回调函数weatherCallback*/
ros
::
Subscriber
sub
=
n
.
subscribe
(
"Weather"
,
48
,
weatherCallback
);
ros
::
spin
();
}
订阅创建后,涉及到两个线程和两个队列:
在实际项目中,如果订阅回调中有耗时操作,那么可以用户可以启用多个Spinner线程并发从Callback队列中取出已可用的回调并执行。这样可以加快Callback队列被执行的速度。
ROS Spinner
ROS提供两种单线程回调的spin方法和两种多线程回调的Spin类,分别是:
单线程回调spin方法:
多线程回调spin类:
\color{red}{简单总结,如果程序简单用ros::spin()就够了;如果程序复杂推荐使用ros::AsyncSpinner类。} 他们的详细用法和区别在ROS官方教程中已经写得比较清楚,可参考:
下面这段代码展示如何使用ros::AsyncSpinner启用多个Spinner线程。
//一个topic多个线程来执行的代码
#include
"ros/ros.h"
#include
"boost/thread.hpp"
#include
"my_msg/weather_pub.h"
//回调函数,注意参数是const类型的boost::shared_ptr指针
void
weatherCallback
(
const
my_msg
::
weather_pubConstPtr
&
msg
)
{
ROS_INFO_STREAM
(
"Thread["
<<
boost
::
this_thread
::
get_id
()
<<
"],after looping 24 hours weather:"
<<
msg
->
weather
.
c_str
());
}
int
main
(
int
argc
,
char
**
argv
){
ros
::
init
(
argc
,
argv
,
"multi_subscriber"
);
ros
::
NodeHandle
n
;
/*通知ROS master,本node要订阅名为「Weather」的话题(topic),
并指定回调函数weatherCallback*/
ros
::
Subscriber
sub
=
n
.
subscribe
(
"Weather"
,
48
,
weatherCallback
);
ROS_INFO_STREAM
(
"Thread["
<<
boost
::
this_thread
::
get_id
()
<<
"]This is main thread."
);
//声明spinner对象,参数2表示并发线程数,默认处理全局Callback队列
ros
::
AsyncSpinner
spinner
(
2
);
//启动两个spinner线程并发执行可用回调
spinner
.
start
();
ros
::
waitForShutdown
();
}
从执行结果中可以看到,进程中包括三个线程:主线程、Spinner线程1、Spinner线程2。
//这是执行结果,可以看到主线程
[
INFO
]
[
1637131602.089381910
]
:
Thread
[
7f
9
a1ad24780
]
This
is
main
thread
.
[
INFO
]
[
1637131602.375058712
]
:
Thread
[
7f
9
a11bb6700
],
after
looping
24
hours
weather
:
Sunny
679
[
INFO
]
[
1637131602.488504089
]
:
Thread
[
7f
9
a11bb6700
],
after
looping
24
hours
weather
:
Sunny
680
[
INFO
]
[
1637131602.688845441
]
:
Thread
[
7f
9
a123b7700
],
after
looping
24
hours
weather
:
Sunny
681
[
INFO
]
[
1637131602.888828136
]
:
Thread
[
7f
9
a123b7700
],
after
looping
24
hours
weather
:
Sunny
682
下图展示了相关的线程和队列处理过程 :
实际项目中一个节点往往要订阅多个topic,在使用默认全局Callback队列时,如果某些topic发布频率高回调处理又耗时的话,容易影响其他topic消息的处理。下图中TopicB的消息居多可能影响TopicA的处理。
这种情况下,ROS提供了机制,可以为每个ros::Subscriber指定Callback队列,再分别指定Spinner线程仅处理指定Callback队列的回调。这样确保每个订阅回调相互独立不影响。下面的代码展示如何进行上述操作:
//为每个subscriber指定队列
#include
"ros/ros.h"
#include
"boost/thread.hpp"
#include
"my_msg/weather_pub.h"
#include
"my_msg/air_quality_pub.h"
#include
<ros/callback_queue.h>
//回调函数,注意参数是const类型的boost::shared_ptr指针
void
weatherCallback
(
const
my_msg
::
weather_pubConstPtr
&
msg
)
{
ROS_INFO_STREAM
(
"Thread["
<<
boost
::
this_thread
::
get_id
()
<<
"],before loop 24 hours weather:"
<<
msg
->
weather
.
c_str
());
//死循环
while
(
true
){}
ROS_INFO_STREAM
(
"Thread["
<<
boost
::
this_thread
::
get_id
()
<<
"],24 hours weather:"
<<
msg
->
weather
.
c_str
());
}
void
weatherCallback_A
(
const
my_msg
::
weather_pubConstPtr
&
msg
)
{
ROS_INFO_STREAM
(
"Thread["
<<
boost
::
this_thread
::
get_id
()
<<
"],A 24 hours weather:"
<<
msg
->
weather
.
c_str
());
}
//回调函数,注意参数是const类型的boost::shared_ptr指针
void
airQualityCallback
(
const
my_msg
::
air_quality_pubConstPtr
&
msg
)
{
ROS_INFO_STREAM
(
"Thread["
<<
boost
::
this_thread
::
get_id
()
<<
"],24 hours air quality:"
<<
msg
->
air_quality_index
);
}
int
main
(
int
argc
,
char
**
argv
){
ros
::
init
(
argc
,
argv
,
"multi_subscriber"
);
ros
::
NodeHandle
n
;
/*通知ROS master,本node要订阅名为「Weather」的话题(topic),
并指定回调函数weatherCallback*/
ros
::
Subscriber
sub
=
n
.
subscribe
(
"Weather"
,
48
,
weatherCallback
);
ros
::
Subscriber
sub_a
=
n
.
subscribe
(
"WeatherA"
,
48
,
weatherCallback_A
);
//需要单独声明一个ros::NodeHandle
ros
::
NodeHandle
n_1
;
//为这个ros::Nodehandle指定单独的Callback队列
ros
::
CallbackQueue
my_queue
;
n_1
.
setCallbackQueue
(
&
my_queue
);
/*通知ROS master,本node要订阅名为「AirQuality」的话题(topic),
并指定回调函数airQualityCallback*/
ros
::
Subscriber
air_sub
=
n_1
.
subscribe
(
"AirQuality"
,
48
,
airQualityCallback
);
ROS_INFO_STREAM
(
"Thread["
<<
boost
::
this_thread
::
get_id
()
<<
"]This is main thread."
);
//启动两个线程处理全局Callback队列
ros
::
AsyncSpinner
spinner
(
2
);
spinner
.
start
();
//启动一个线程处理AirQuality单独的队列
ros
::
AsyncSpinner
spinner_1
(
1
,
&
my_queue
);
spinner_1
.
start
();
ros
::
waitForShutdown
();
}
从执行结果中可以看到,进程中包括四个线程:主线程、全局队列Spinner线程1、全局队列Spinner线程2,以及本地队列Spinner线程3。尽管Spinner线程1被回调函数中的死循环卡住,但并不影响其他topic的回调处理。
[
INFO
]
[
1637132247.535142399
]
:
Thread
[
7f73e4384780
]
This
is
main
thread
.
[
INFO
]
[
1637132247.743935399
]
:
Thread
[
7f
73
d77fe700
],
A
24
hours
weather
:
Sunny
3926
[
INFO
]
[
1637132247.744032493
]
:
Thread
[
7f
73
d6ffd700
],
before
loop
24
hours
weather
:
Sunny
3906
[
INFO
]
[
1637132247.744203496
]
:
Thread
[
7f
73
d67fc700
],
24
hours
air
quality
:
4034
[
INFO
]
[
1637132247.888403207
]
:
Thread
[
7f
73
d77fe700
],
A
24
hours
weather
:
Sunny
3927
[
INFO
]
[
1637132247.888433359
]
:
Thread
[
7f
73
d67fc700
],
24
hours
air
quality
:
4035
[
INFO
]
[
1637132248.088418911
]
:
Thread
[
7f
73
d67fc700
],
24
hours
air
quality
:
4036
[
INFO
]
[
1637132248.088461907
]
:
Thread
[
7f
73
d77fe700
],
A
24
hours
weather
:
Sunny
3928
[
INFO
]
[
1637132248.288417795
]
:
Thread
[
7f
73
d67fc700
],
24
hours
air
quality
:
4037
[
INFO
]
[
1637132248.288448289
]
:
Thread
[
7f
73
d77fe700
],
A
24
hours
weather
:
Sunny
3929
下图展示了相关的线程和队列处理过程 :
小结
在理解ROS的回调机制后,使用多个Callback队列和多个Spinner线程可以满足实际项目开发的需要。提醒大家在使用多线程时,记得对临界区域适当加锁,防止引入多线程问题。好了,ROS回调的多线程就分享到这种,如果您喜欢请 \color{red}{点赞,关注,分享} 。您的鼓励是我创作的最大动力,更多机械臂学习的分享,请关注本专栏 。
本文中所有代码可在gitee工程上查询到:
如果还想要与作者交流可以评论留言或付费咨询: