想象你让机器人去抓取一个杯子。话题只能持续发布位置数据,服务只能发一次指令然后等回复——但抓取这个动作需要几秒钟:手臂要先移动到杯子附近,然后调整夹爪,最后夹紧收回。在这个过程中,你希望知道“手臂走到哪了”,而且如果位置偏了,你还想中途取消重新来一次。这就是动作(Action)通信诞生的场景。
| 组成部分 | 类比 | 技术含义 | 通信方式 |
|---|---|---|---|
| 目标(Goal) | 外卖订单内容 | 告诉服务器要执行什么任务 | 服务(一次性) |
| 反馈(Feedback) | 骑手实时位置 | 服务器在过程中持续推送进度 | 话题(持续) |
| 结果(Result) | 外卖送达或取消 | 任务完成后的最终输出 | 服务(一次、最终) |
一个动作定义对应一个.action文件,包含三个部分的分隔声明:#goal、#result、#feedback。我用一个实际抓取的动作定义来演示其结构。
# File: Grasp.action
# 目标(Goal)
string object_name # 要抓取的目标物体名称
---
# 结果(Result)
bool success # 是否成功
float32 grasp_force # 抓取力度
---
# 反馈(Feedback)
float32 approach_progress # 接近进度 0.0~1.0
float32 gripper_position # 当前夹爪开度
rclcpp和rclpy库把这两者封装成了统一的Action接口,你只需要关心业务逻辑。
实际项目中,动作服务器通常会创建一个后台线程来执行耗时操作,主线程继续监听新的目标请求。我在调试一个机械臂动作服务器时踩过一个坑:如果在反馈回调中执行了阻塞IO,整个动作通信会被卡住,因为反馈实际上是在DDS的回调线程中发送的。
下面是一个动作客户端的代码,它向动作服务器发送一个抓取目标,并实时打印反馈进度。
<span class="com">import rclpy
from rclpy.action import ActionClient
from custom_interfaces.action import Grasp
class GraspClient:
def __init__(self):
super().__init__('grasp_client')
self._client = ActionClient(self, Grasp, 'grasp')
def send_goal(self, object_name):
goal_msg = Grasp.Goal()
goal_msg.object_name = object_name
# 等待服务器就绪
self._client.wait_for_server()
# 发送目标,绑定反馈回调
send_goal_future = self._client.send_goal_async(
goal_msg,
feedback_callback=self.feedback_callback
)
send_goal_future.add_done_callback(self.goal_response_callback)
def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
print(f"接近进度: {feedback.approach_progress:.2f}")
send_goal_async()会收到异常,因为动作客户端内部的服务代理还没连上服务器。/grasp/_action/feedback,直接订阅虽然可以收到数据,但无法获得完整的生命周期管理(如目标ID匹配、超时取消等)。动作通信最典型的用法是“可抢占目标队列”。服务器端可以同时接收多个目标,但同一时间只执行一个。如果收到新目标,默认行为是抢占当前目标:服务器会取消当前任务,转而执行新目标。实际开发中,大部分团队会在动作服务器中实现一个目标队列,让非紧急任务排队等待,避免频繁抢占导致机械臂来回抖动。
uint8百分比(0~100)代替float32,可以显著减少DDS序列化的开销。我曾在一个Cortex-M4平台上把动作的目标结构体从4个字段精简到2个,反馈频率从5Hz提升到了20Hz。
| 特性 | 话题(Topic) | 服务(Service) | 动作(Action) |
|---|---|---|---|
| 通信模式 | 发布/订阅 | 请求/响应 | 目标+反馈+结果 |
| 是否持续 | 持续流 | 一次性 | 长期运行 |
| 是否有反馈 | 无 | 无 | 有 |
| 是否可取消 | 否 | 否 | 是 |
| 适用场景 | 传感器数据 | 瞬时查询 | 复杂任务 |
刚入门时容易困惑的一个问题是:既然动作底层用了话题和服务,为什么还要单独学一套API?原因在于动作封装了**生命周期管理**。当你取消一个动作时,服务器会自动清理正在执行的线程、释放资源,并通知客户端。这些逻辑如果用原生话题+服务来实现,需要自己写大量状态机代码。
# 动作客户端取消示例
async def cancel_goal(self):
cancel_future = await self._client.cancel_goal_async(self._goal_handle)
if cancel_future.returncode == CancelResponse.ACCEPT:
print("取消请求已接受")
从上图可以看到,一个动作通信背后涉及到4个通信端点:Goal服务、Cancel服务、Result服务、Feedback话题。ROS2的ActionClient和ActionServer把这4个端点封装成了一个简洁的接口。当你在客户端调用send_goal_async()时,底层自动完成了Goal服务的调用、反馈话题的订阅、Result服务的轮询以及Cancel服务的注册。
如果你看到类似Unable to find action server的报错,大概率是命名空间不匹配。动作服务器默认在/action_name命名空间下创建4个端点,客户端必须使用完全相同的名称。一个排查技巧是运行ros2 action list -t查看当前所有可用的动作。
MoveTo.action,包含目标坐标(x, y),反馈为剩余距离,结果为到达时间。/move_to发送目标,每秒打印一次剩余距离。.action文件声明,编译后自动生成目标、结果、反馈三个消息类型。