从协议原理到实际应用,深度理解 MQTT 如何在物联网、移动推送、实时消息等场景下实现高效的长连接通信。
MQTT(Message Queuing Telemetry Transport) 是一种基于 TCP/IP 的轻量级发布/订阅消息传输协议,由 IBM 于 1999 年设计,最初用于石油管道监控。它专为带宽有限、网络不稳定、计算资源受限的场景设计。
MQTT 的设计目标是:用最少的网络带宽和设备资源,可靠地传递消息。协议本身不关心消息内容,客户端与 Broker 建立一次 TCP 连接后长期保持,通过心跳包保活,极大减少了重复握手开销。
理解长连接的价值,需要先理解短连接的代价。
长连接的关键优势在于:客户端与服务端建立一次 TCP 连接后持续保持,无需每次通信都重新进行 TCP 三次握手(及 TLS 握手)。服务端可以随时主动推送消息给客户端,实现真正的双向实时通信。
任何能运行 MQTT 库的设备或应用。可以扮演 Publisher(发布者)、Subscriber(订阅者)或两者兼具。
核心中间件,接收所有消息、管理订阅关系、按 Topic 路由分发。常见实现:EMQX、Mosquitto、HiveMQ。
消息路由的地址,用 / 分层,如 home/room1/temp。支持 +(单层通配符)和 #(多层通配符)。
| 订阅 Topic | 匹配示例 | 说明 |
|---|---|---|
sensor/+/temp | sensor/room1/temp、sensor/lab/temp | + 匹配一个层级 |
sensor/# | sensor/room1/temp、sensor/room1/hum/detail | # 匹配零个或多个层级,只能在末尾 |
home/+/+/status | home/floor1/room2/status | 多个 + 组合 |
$SYS/# | Broker 系统内置 Topic | $SYS 开头为系统保留 |
MQTT 在 TCP 层之上增加了自己的应用层握手,点击下方步骤查看每一阶段详情:
MQTT 基于 TCP,首先完成标准 TCP 三次握手建立可靠传输信道:
SYN → Client 发起连接请求(序列号 x)
← SYN-ACK Server 确认并同步(序列号 y,ACK=x+1)
ACK → Client 确认建立(ACK=y+1)
如果是 MQTTS,此后还需 TLS 握手(2 次额外 RTT)。
TCP 连接建立后,Client 立即发送 MQTT CONNECT 报文,包含:
Clean Session = true:断线后清除所有会话状态(订阅、未投递消息)。
Clean Session = false:断线重连后恢复会话,离线消息补发。
Broker 验证认证信息后回复 CONNACK 报文:
| Return Code | 含义 |
|---|---|
0x00 | 连接成功 |
0x01 | 协议版本不支持 |
0x02 | Client ID 非法 |
0x04 | 用户名或密码错误 |
0x05 | 未授权 |
连接建立后,Client 可以:
• 发送 SUBSCRIBE 报文订阅感兴趣的 Topic,Broker 回复 SUBACK
• 发送 PUBLISH 报文向某个 Topic 发布消息
• Broker 将消息转发给所有订阅该 Topic 的 Client
整个过程无需重新握手,连接全程保持。
正常断开:Client 主动发送 DISCONNECT 报文,Broker 清理会话(若 Clean Session=true)。
异常断开:网络中断,Broker 在 Keepalive 超时后检测到,触发遗嘱消息(LWT)发送。
重连:Client 重新发送 CONNECT,若 Clean Session=false,Broker 恢复之前的会话状态。
Pub/Sub 模式的精髓是时间解耦、空间解耦、同步解耦:发布者和订阅者互不知晓对方存在,Broker 负责一切路由。
发布时设置 retain=true,Broker 会保存该 Topic 的最新消息。新订阅者订阅该 Topic 时,立即收到最近一条保留消息,无需等待下一次发布。常用于设备状态上报。
MQTT 提供三种服务质量级别,在可靠性与性能之间权衡:
最多投递一次
At most once
发送即忘,不保证到达。适合高频传感器数据(丢一条无妨)。
至少投递一次
At least once
保证至少到达一次,可能重复。接收方需做幂等处理。
恰好投递一次
Exactly once
四次握手保证恰好一次。用于金融交易、控制指令等不可重复场景。
每个 MQTT 报文由三部分组成:固定报头(2~5字节)+ 可变报头 + 有效载荷(Payload)。
| 报文类型 | 值 | 方向 | 用途 |
|---|---|---|---|
| CONNECT | 1 | Client → Broker | 请求建立连接 |
| CONNACK | 2 | Broker → Client | 连接确认 |
| PUBLISH | 3 | 双向 | 发布消息 |
| PUBACK | 4 | 双向 | QoS 1 确认 |
| PUBREC/REL/COMP | 5/6/7 | 双向 | QoS 2 四次握手 |
| SUBSCRIBE | 8 | Client → Broker | 订阅 Topic |
| SUBACK | 9 | Broker → Client | 订阅确认 |
| UNSUBSCRIBE | 10 | Client → Broker | 取消订阅 |
| PINGREQ/PINGRESP | 12/13 | 双向 | 心跳保活 |
| DISCONNECT | 14 | Client → Broker | 断开连接 |
长连接最大的挑战是检测死连接。防火墙、NAT 设备会在一段时间无数据流量后悄悄关闭 TCP 连接,而两端感知不到。MQTT 通过 Keepalive 心跳机制解决这个问题。
Client 在 CONNECT 报文中携带 keepalive 值(单位:秒)。约定:若 Client 在 keepalive 时间内没有发送任何报文,必须发送 PINGREQ。Broker 收到后回复 PINGRESP。
Broker 等待 1.5 × keepalive 时间内未收到任何报文,即判定连接断开,触发遗嘱消息。
LWT 是 MQTT 中一个优雅的设计:Client 在 CONNECT 时预先设定一条"遗嘱",若连接异常断开(非主动 DISCONNECT),Broker 会自动将遗嘱消息发布出去,通知其他订阅者该设备已下线。
• 设备在线状态监控:所有设备上线时发布 online,遗嘱设为 offline,监控系统实时感知设备掉线。
• 微服务健康检查:服务实例连接 MQTT Broker,下线时自动通知负载均衡器。
• 用户在线状态:IM 应用中,用户异常断线时自动更新好友在线状态。
| 特性 | MQTT | HTTP/1.1 | WebSocket | HTTP/2 SSE |
|---|---|---|---|---|
| 连接模型 | 长连接 | 短连接(可Keep-Alive) | 长连接 | 长连接(单向) |
| 双向通信 | ✓ 全双工 | ✗ 请求-响应 | ✓ 全双工 | → 服务端单向推 |
| 报头开销 | 2字节起 | 数百字节 | 2~10字节(帧) | HTTP头+数据行 |
| 消息路由 | Topic 发布订阅 | 点对点 URL | 点对点 | 点对点 |
| QoS 保障 | 0 / 1 / 2 三级 | 无(TCP层) | 无应用层QoS | 无 |
| 离线消息 | ✓ 持久会话 | ✗ | ✗ | ✗ |
| 设备断线感知 | ✓ LWT 遗嘱 | ✗ 需轮询 | 关闭事件 | 连接断开事件 |
| 适用场景 | IoT / 移动推送 / 实时监控 | Web API / 文件传输 | 在线游戏 / 协同编辑 | 股价推送 / 日志流 |
通过下方模拟器体验 MQTT 的发布订阅流程:
import paho.mqtt.client as mqtt
import json, time
# ---- 回调函数 ----
def on_connect(client, userdata, flags, rc):
print(f"Connected: rc={rc}")
# 连接成功后订阅
client.subscribe("sensor/#", qos=1)
def on_message(client, userdata, msg):
payload = json.loads(msg.payload.decode())
print(f"[{msg.topic}] {payload}")
def on_disconnect(client, userdata, rc):
print(f"Disconnected: rc={rc}")
if rc != 0:
client.reconnect() # 自动重连
# ---- 客户端配置 ----
client = mqtt.Client(client_id="device_001", clean_session=False)
client.username_pw_set("user", "pass")
# 遗嘱消息
client.will_set(
topic="devices/device_001/status",
payload=json.dumps({"status": "offline"}),
qos=1, retain=True
)
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
# 建立长连接(keepalive=60s)
client.connect("broker.emqx.io", port=1883, keepalive=60)
client.loop_start() # 后台线程维持连接
# 发布消息
while True:
msg = {"temp": 25.6, "ts": int(time.time())}
client.publish("sensor/temp/room1", json.dumps(msg), qos=1)
time.sleep(5)