第3章 话题通信实现

码枢沄社 · 嵌入式体系化教程平台
入门 👤 刚接触ROS2节点通信的新手 🔧 构建传感器数据分发或控制指令广播系统
本章你将学到
  • 话题通信的完整机制:发布者→话题→订阅者
  • 用C++和Python编写发布者与订阅者节点
  • 话题匹配的条件:名称与消息类型必须一致
核心概念话题(Topic)发布者(Publisher)订阅者(Subscriber)

发布者与订阅者的角色分工

角色类比关键动作注意事项
发布者快递发件人创建话题对象后调用 publish()必须在调用 publish() 之前确保话题已创建
订阅者快递收件人预先注册回调函数处理接收到的消息回调函数执行期间会阻塞其他事件处理
话题快递站点流转单持有一个消息类型和名称名称区分大小写且不支持中文

话题通信类似于快递系统:发布者是发件方,将包裹(消息)交给站点;站点(话题)根据标签(话题名)派发给对应的收件方(订阅者)。收件方不需要知道发件方是谁,发件方也不需要知道有多少收件方。

发布者节点
话题 /sensor/imu
订阅者节点A
发布数据
消息队列
回调执行

话题通信是异步的——发布者只管发,不需要等待订阅者处理完毕。订阅者收到消息后,会按照注册的回调函数顺序执行。实际开发中,回调函数的执行时间必须严格控制,我曾遇到过一个订阅者在回调里用同步方式输出日志到数据库,结果数据库延迟,直接卡住了整个节点的所有订阅回调。

创建发布者的代码骨架

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"

class Talker : public rclcpp::Node {
public:
  Talker() : Node("talker_node") {
    publisher_ = create_publisher<std_msgs::msg::String>("chatter", 10);
    timer_ = create_wall_timer(std::chrono::seconds(1), [this]() {
      auto msg = std_msgs::msg::String();
      msg.data = "Hello";
      publisher_->publish(msg);
    });
  }
private:
  rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;
  rclcpp::TimerBase::SharedPtr timer_;
};

这个发布者节点每秒钟向 chatter 话题发送一条字符串消息。注意队列深度参数 10 决定了消息缓冲区的大小——如果订阅者处理太慢,新消息会覆盖旧消息,而不是无限制堆积。

订阅者节点的核心逻辑

class Listener : public rclcpp::Node {
public:
  Listener() : Node("listener_node") {
    subscription_ = create_subscription<std_msgs::msg::String>(
      "chatter", 10, [this](const std_msgs::msg::String::SharedPtr msg) {
        RCLCPP_INFO(get_logger(), "收到: %s", msg->data.c_str());
      });
  }
private:
  rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription_;
};

订阅者通过回调函数处理每条消息。注意回调函数中不能做耗时操作——如果有需要,把数据放入队列,另外开一个线程或者定时器去处理。我在项目中踩过这个坑:回调里直接做特征计算,导致后续消息丢失,实际表现为订阅者经常漏掉关键数据。

节点启动
创建发布者/订阅者
调用 spin()
等待事件(mac)
执行回调或发送

节点创建了发布者和订阅者之后,必须调用 rclcpp::spin() 才能让事件循环持续运转。spin() 就像快递站点的分拣传送带——只有它运行着,包裹才能被送达订阅者。

话题匹配的三个必要条件

  • 话题名完全一致:包括大小写和路径分隔符(如 /sensor/imu 与 /sensor/imu 匹配,但与 /sensor/IMU 不匹配)
  • 消息类型必须相同:std_msgs/msg/String 类型的发布者只能被相同类型的订阅者接收
  • DDS域ID必须一致:默认域ID为0,跨域无法通信(涉及DDS层,初学者通常无需修改)

话题匹配失败时,最常见的情况是订阅者收不到任何消息。排查思路:用 ros2 topic list 查看话题是否存在,再用 ros2 topic info 确认消息类型是否一致。如果你看到的报错是 no matching subscriptionno matching publisher,大概率是名称或类型不匹配的问题。

新手常见误区

  1. 忘记调用 spin():没有 spin(),节点就像快递站点没开分拣传送带,消息积压在缓冲区,订阅者永远无法收到。
  2. 回调函数做耗时操作:回调是串行执行的,一旦阻塞,后面所有回调都排队等待,容易引发消息丢失或延迟飙升。
  3. 话题名写错大小写:ROS2 的话题名对大小写敏感,/cmd_vel 与 /Cmd_vel 是两个截然不同的话题。

编者提示: 话题通信的底层依赖DDS中间件,而DDS在发现对端节点时存在一个短暂的发现周期(通常在1-3秒内)。所以节点启动后,订阅者不会瞬间收到消息,需要等待建立连接。在实际调试中,如果需要确认连接是否成功,可以运行 ros2 topic echo /chatter 实时观察消息是否流通。另外,如果发布者和订阅者运行在不同的网络段,还需要配置DDS发现协议的通信端口(这个在嵌入式多机部署时特别重要,不过本章只讲同机通信)。

使用Python快速验证话题通信

import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class MinimalPublisher(Node):
    def __init__(self):
        super().__init__('minimal_publisher')
        self.publisher_ = self.create_publisher(String, 'topic', 10)
        self.timer = self.create_timer(0.5, self.timer_callback)

    def timer_callback(self):
        msg = String()
        msg.data = 'Hello ROS2'
        self.publisher_.publish(msg)

def main(args=None):
    rclpy.init(args=args)
    node = MinimalPublisher()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

Python版本的代码更加简洁直观,适合快速验证话题通信是否正常。实际开发中,大部分团队对于计算密集型的节点用C++实现,而行为逻辑简单的节点用Python。两种语言在话题层面的API结构一一对应,互换使用完全透明。

话题通信中的消息队列管理

每个话题的背后都有一个消息队列,队列深度由创建发布者/订阅者时的QoS参数控制。队列深度决定了当订阅者处理速度跟不上时,系统可以缓冲多少条消息。深度为10表示最多缓存10条未处理的消息,超过时旧消息会被丢弃。如果订阅者处理非常慢,可以适当增大队列深度,但同时会消耗更多内存。

实时控制

  • 队列深度建议1~2
  • 保证最新指令被接收
  • 丢旧消息可接受

传感器数据

  • 队列深度10~50
  • 允许短时缓存
  • 避免丢失关键帧

日志消息

  • 队列深度100+
  • 允许较大延迟
  • 尽量不丢数据

这里有个隐藏的坑:提高队列深度会增加内存开销,在嵌入式设备上可能引发OOM。我在一个ARM架构的板子上部署时,默认队列深度100导致内存不足,系统直接OOM Kill了节点进程。解决办法是把深度降到20,同时对重要数据做数据库落盘。

动手试一试

任务:创建一个发布者节点和一个订阅者节点,发布者每0.5秒发布一条带有计数的String消息,订阅者在回调里打印收到的字符串。

  1. 编写C++发布者节点(参考本章Talker类),话题名为 /demo/count,队列深度设为5。
  2. 编写Python订阅者节点,订阅 /demo/count 话题,打印每条消息。
  3. 运行两个节点,观察终端输出是否每0.5秒出现一条递增的消息。
  4. 尝试故意改错话题名称(如将订阅者话题名改为 /demo/count2),观察订阅者是否还能收到消息并理解原因。

检验你的理解

  1. 判断题:两个节点使用不同名称的同一个消息类型,它们可以通过话题通信。 ( )
  2. 判断题:订阅者的回调函数中可以执行任何耗时的计算任务。 ( )
  3. 选择题:如果发布者以10Hz的频率发布消息,而订阅者处理一条消息需要0.3秒,下列哪种设置最合适?
    A. 队列深度1    B. 队列深度10    C. 队列深度100

本章小结

  • 话题通信是ROS2中最核心的异步数据交换方式,由发布者、话题和订阅者三部分组成。
  • 创建节点后必须调用 rclcpp::spin() 使事件循环工作,否则所有回调都不会触发。
  • 订阅者的回调函数应保持轻量,避免阻塞影响其他消息的处理。
  • 话题匹配由名称和消息类型共同决定,大小写敏感,跨域通信受DDS域ID影响。
  • 队列深度影响消息缓存的容量,应根据应用场景(实时控制/传感器/日志)合理设置。
码枢沄社 · 嵌入式体系化教程平台

3000+ 实战教程 · STM32 / Linux / RTOS / 汽车电子 / 物联网…
查看全部课程
← 上一章 返回目录 下一章 →