| 角色 | 类比 | 关键动作 | 注意事项 |
|---|---|---|---|
| 发布者 | 快递发件人 | 创建话题对象后调用 publish() | 必须在调用 publish() 之前确保话题已创建 |
| 订阅者 | 快递收件人 | 预先注册回调函数处理接收到的消息 | 回调函数执行期间会阻塞其他事件处理 |
| 话题 | 快递站点流转单 | 持有一个消息类型和名称 | 名称区分大小写且不支持中文 |
话题通信类似于快递系统:发布者是发件方,将包裹(消息)交给站点;站点(话题)根据标签(话题名)派发给对应的收件方(订阅者)。收件方不需要知道发件方是谁,发件方也不需要知道有多少收件方。
话题通信是异步的——发布者只管发,不需要等待订阅者处理完毕。订阅者收到消息后,会按照注册的回调函数顺序执行。实际开发中,回调函数的执行时间必须严格控制,我曾遇到过一个订阅者在回调里用同步方式输出日志到数据库,结果数据库延迟,直接卡住了整个节点的所有订阅回调。
#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"
class Talker : public rclcpp::Node {
public:
Talker() : Node("talker_node") {
publisher_ = create_publisher<std_msgs::msg::String>("chatter", 10);
timer_ = create_wall_timer(std::chrono::seconds(1), [this]() {
auto msg = std_msgs::msg::String();
msg.data = "Hello";
publisher_->publish(msg);
});
}
private:
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;
rclcpp::TimerBase::SharedPtr timer_;
};
这个发布者节点每秒钟向 chatter 话题发送一条字符串消息。注意队列深度参数 10 决定了消息缓冲区的大小——如果订阅者处理太慢,新消息会覆盖旧消息,而不是无限制堆积。
class Listener : public rclcpp::Node {
public:
Listener() : Node("listener_node") {
subscription_ = create_subscription<std_msgs::msg::String>(
"chatter", 10, [this](const std_msgs::msg::String::SharedPtr msg) {
RCLCPP_INFO(get_logger(), "收到: %s", msg->data.c_str());
});
}
private:
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription_;
};
订阅者通过回调函数处理每条消息。注意回调函数中不能做耗时操作——如果有需要,把数据放入队列,另外开一个线程或者定时器去处理。我在项目中踩过这个坑:回调里直接做特征计算,导致后续消息丢失,实际表现为订阅者经常漏掉关键数据。
节点创建了发布者和订阅者之后,必须调用 rclcpp::spin() 才能让事件循环持续运转。spin() 就像快递站点的分拣传送带——只有它运行着,包裹才能被送达订阅者。
话题匹配失败时,最常见的情况是订阅者收不到任何消息。排查思路:用 ros2 topic list 查看话题是否存在,再用 ros2 topic info 确认消息类型是否一致。如果你看到的报错是 no matching subscription 或 no matching publisher,大概率是名称或类型不匹配的问题。
编者提示: 话题通信的底层依赖DDS中间件,而DDS在发现对端节点时存在一个短暂的发现周期(通常在1-3秒内)。所以节点启动后,订阅者不会瞬间收到消息,需要等待建立连接。在实际调试中,如果需要确认连接是否成功,可以运行 ros2 topic echo /chatter 实时观察消息是否流通。另外,如果发布者和订阅者运行在不同的网络段,还需要配置DDS发现协议的通信端口(这个在嵌入式多机部署时特别重要,不过本章只讲同机通信)。
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class MinimalPublisher(Node):
def __init__(self):
super().__init__('minimal_publisher')
self.publisher_ = self.create_publisher(String, 'topic', 10)
self.timer = self.create_timer(0.5, self.timer_callback)
def timer_callback(self):
msg = String()
msg.data = 'Hello ROS2'
self.publisher_.publish(msg)
def main(args=None):
rclpy.init(args=args)
node = MinimalPublisher()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
Python版本的代码更加简洁直观,适合快速验证话题通信是否正常。实际开发中,大部分团队对于计算密集型的节点用C++实现,而行为逻辑简单的节点用Python。两种语言在话题层面的API结构一一对应,互换使用完全透明。
每个话题的背后都有一个消息队列,队列深度由创建发布者/订阅者时的QoS参数控制。队列深度决定了当订阅者处理速度跟不上时,系统可以缓冲多少条消息。深度为10表示最多缓存10条未处理的消息,超过时旧消息会被丢弃。如果订阅者处理非常慢,可以适当增大队列深度,但同时会消耗更多内存。
这里有个隐藏的坑:提高队列深度会增加内存开销,在嵌入式设备上可能引发OOM。我在一个ARM架构的板子上部署时,默认队列深度100导致内存不足,系统直接OOM Kill了节点进程。解决办法是把深度降到20,同时对重要数据做数据库落盘。
任务:创建一个发布者节点和一个订阅者节点,发布者每0.5秒发布一条带有计数的String消息,订阅者在回调里打印收到的字符串。
/demo/count,队列深度设为5。/demo/count 话题,打印每条消息。/demo/count2),观察订阅者是否还能收到消息并理解原因。rclcpp::spin() 使事件循环工作,否则所有回调都不会触发。