IoT 长连接 发布订阅 QoS Broker

MQTT 长连接通信

从协议原理到实际应用,深度理解 MQTT 如何在物联网、移动推送、实时消息等场景下实现高效的长连接通信。

目录

什么是 MQTT?

MQTT(Message Queuing Telemetry Transport) 是一种基于 TCP/IP 的轻量级发布/订阅消息传输协议,由 IBM 于 1999 年设计,最初用于石油管道监控。它专为带宽有限、网络不稳定、计算资源受限的场景设计。

协议开销
2 字节
最小固定报头,极低开销
标准端口
1883
TLS 加密版本使用 8883
连接模型
长连接
TCP 持久保持,心跳维活
消息模式
Pub/Sub
发布订阅,解耦生产消费

核心设计哲学

MQTT 的设计目标是:用最少的网络带宽和设备资源,可靠地传递消息。协议本身不关心消息内容,客户端与 Broker 建立一次 TCP 连接后长期保持,通过心跳包保活,极大减少了重复握手开销。

为什么需要长连接?

理解长连接的价值,需要先理解短连接的代价。

HTTP 短连接(每次请求) Client Server SYN SYN-ACK ACK GET /data HTTP/1.1 200 OK + Body FIN/FIN-ACK (四次挥手) 每次请求:7+ 次 RTT 握手 下次请求重复全部流程 ↑ MQTT 长连接(一次握手多次复用) Client Broker TCP + CONNECT CONNACK ✓ 持久连接保持 PUBLISH msg1 PUBLISH msg2 PINGREQ → PINGRESP PUBLISH msg3 握手 1 次,消息发送无限次 心跳维持连接 → 随时收发 ↑

长连接的关键优势在于:客户端与服务端建立一次 TCP 连接后持续保持,无需每次通信都重新进行 TCP 三次握手(及 TLS 握手)。服务端可以随时主动推送消息给客户端,实现真正的双向实时通信

三大角色:Client、Broker、Topic

Broker 消息路由中心 订阅管理 · 会话存储 · QoS 温度传感器 Publisher 摄像头设备 Publisher 手机 App Publisher + Subscriber 数据中心 Subscriber 告警系统 Subscriber 手机 App Publisher + Subscriber sensor/temp/01 camera/event app/control sensor/# camera/+ app/notify PUBLISH 发布者 Topic 路由匹配 SUBSCRIBE 订阅者

Client(客户端)

任何能运行 MQTT 库的设备或应用。可以扮演 Publisher(发布者)、Subscriber(订阅者)或两者兼具。

Broker(代理/服务端)

核心中间件,接收所有消息、管理订阅关系、按 Topic 路由分发。常见实现:EMQX、Mosquitto、HiveMQ

Topic(主题)

消息路由的地址,用 / 分层,如 home/room1/temp。支持 +(单层通配符)和 #(多层通配符)。

Topic 通配符示例

订阅 Topic匹配示例说明
sensor/+/tempsensor/room1/tempsensor/lab/temp+ 匹配一个层级
sensor/#sensor/room1/tempsensor/room1/hum/detail# 匹配零个或多个层级,只能在末尾
home/+/+/statushome/floor1/room2/status多个 + 组合
$SYS/#Broker 系统内置 Topic$SYS 开头为系统保留

完整连接握手流程

MQTT 在 TCP 层之上增加了自己的应用层握手,点击下方步骤查看每一阶段详情:

1
TCP 三次握手

MQTT 基于 TCP,首先完成标准 TCP 三次握手建立可靠传输信道:

SYN → Client 发起连接请求(序列号 x)

← SYN-ACK Server 确认并同步(序列号 y,ACK=x+1)

ACK → Client 确认建立(ACK=y+1)

如果是 MQTTS,此后还需 TLS 握手(2 次额外 RTT)。

2
CONNECT 报文

TCP 连接建立后,Client 立即发送 MQTT CONNECT 报文,包含:

CONNECT Packet
Packet Type
0x10 (CONNECT)
Protocol Name
MQTT
Protocol Level
4 (v3.1.1) / 5
Client ID
device_001
Keepalive
60s
Clean Session
true/false
Username
(可选)
Password
(可选)
Will Topic
遗嘱主题(可选)

Clean Session = true:断线后清除所有会话状态(订阅、未投递消息)。
Clean Session = false:断线重连后恢复会话,离线消息补发。

3
CONNACK 响应

Broker 验证认证信息后回复 CONNACK 报文:

CONNACK Packet
Packet Type
0x20 (CONNACK)
Session Present
0/1
Return Code
0x00 = 连接成功
Return Code含义
0x00连接成功
0x01协议版本不支持
0x02Client ID 非法
0x04用户名或密码错误
0x05未授权
4
订阅 / 发布

连接建立后,Client 可以:

• 发送 SUBSCRIBE 报文订阅感兴趣的 Topic,Broker 回复 SUBACK
• 发送 PUBLISH 报文向某个 Topic 发布消息
• Broker 将消息转发给所有订阅该 Topic 的 Client

整个过程无需重新握手,连接全程保持。

5
DISCONNECT / 断线

正常断开:Client 主动发送 DISCONNECT 报文,Broker 清理会话(若 Clean Session=true)。

异常断开:网络中断,Broker 在 Keepalive 超时后检测到,触发遗嘱消息(LWT)发送。

重连:Client 重新发送 CONNECT,若 Clean Session=false,Broker 恢复之前的会话状态。

发布订阅模型(Pub/Sub)

Pub/Sub 模式的精髓是时间解耦、空间解耦、同步解耦:发布者和订阅者互不知晓对方存在,Broker 负责一切路由。

Publisher A 发布 home/temp Broker Topic: home/temp 订阅者:[B, C] Retain: 最新消息保留 Subscriber B 订阅 home/# Subscriber C 订阅 home/temp PUBLISH 转发给订阅者

Retained Message(保留消息)

发布时设置 retain=true,Broker 会保存该 Topic 的最新消息。新订阅者订阅该 Topic 时,立即收到最近一条保留消息,无需等待下一次发布。常用于设备状态上报。

QoS 消息质量等级

MQTT 提供三种服务质量级别,在可靠性与性能之间权衡:

0

QoS 0

最多投递一次
At most once

PUBLISH

发送即忘,不保证到达。适合高频传感器数据(丢一条无妨)。

1

QoS 1

至少投递一次
At least once

PUBLISH PUBACK

保证至少到达一次,可能重复。接收方需做幂等处理。

2

QoS 2

恰好投递一次
Exactly once

PUBLISH PUBREC PUBREL PUBCOMP

四次握手保证恰好一次。用于金融交易、控制指令等不可重复场景。

QoS 2 四次握手详解 Sender Receiver ① PUBLISH (QoS=2, DUP=0, PacketID=100) ② PUBREC (PacketID=100) — 消息已收到,暂不投递 ③ PUBREL (PacketID=100) — 确认释放,可以投递 ④ PUBCOMP (PacketID=100) — 完成,删除消息记录 Receiver 在收到 PUBREL 之后才真正将消息投递给上层应用,确保 exactly-once

MQTT 数据包(Packet)格式

每个 MQTT 报文由三部分组成:固定报头(2~5字节)+ 可变报头 + 有效载荷(Payload)

固定报头 Byte 1: 报文类型(4bit) + 标志位(4bit) Byte 2+: 剩余长度 (变长编码,1-4字节) 最小 2 字节 可变报头 内容因报文类型而异 PUBLISH: Topic Name + Packet ID (QoS>0) CONNECT: Protocol Info 可选,部分报文没有 有效载荷(Payload) 应用层实际数据,格式由应用定义 PUBLISH: 消息内容(JSON/Binary/Text) CONNECT: ClientID / Username / Password / Will SUBSCRIBE: Topic 列表 + QoS 最大 256MB(实际建议 < 256KB)

报文类型速查

报文类型方向用途
CONNECT1Client → Broker请求建立连接
CONNACK2Broker → Client连接确认
PUBLISH3双向发布消息
PUBACK4双向QoS 1 确认
PUBREC/REL/COMP5/6/7双向QoS 2 四次握手
SUBSCRIBE8Client → Broker订阅 Topic
SUBACK9Broker → Client订阅确认
UNSUBSCRIBE10Client → Broker取消订阅
PINGREQ/PINGRESP12/13双向心跳保活
DISCONNECT14Client → Broker断开连接

Keepalive 心跳保活

长连接最大的挑战是检测死连接。防火墙、NAT 设备会在一段时间无数据流量后悄悄关闭 TCP 连接,而两端感知不到。MQTT 通过 Keepalive 心跳机制解决这个问题。

心跳机制原理

Client 在 CONNECT 报文中携带 keepalive 值(单位:秒)。约定:若 Client 在 keepalive 时间内没有发送任何报文,必须发送 PINGREQ。Broker 收到后回复 PINGRESP。

Broker 等待 1.5 × keepalive 时间内未收到任何报文,即判定连接断开,触发遗嘱消息。

Client Broker t=0s t=55s t=60s t=90s PUBLISH msg (有业务数据,不需要心跳) PINGREQ (55s 内无业务数据,发送心跳) PINGRESP (Broker 响应,连接保活成功) 若 1.5×60=90s 内无任何包 → Broker 判断超时,触发 LWT,关闭连接
推荐 Keepalive
60s
移动网络推荐 30-60s
Broker 超时判定
1.5×
1.5 倍 Keepalive 后判超时
设为 0
禁用
关闭心跳检测(不推荐)

Last Will & Testament(遗嘱消息)

LWT 是 MQTT 中一个优雅的设计:Client 在 CONNECT 时预先设定一条"遗嘱",若连接异常断开(非主动 DISCONNECT),Broker 会自动将遗嘱消息发布出去,通知其他订阅者该设备已下线。

设备 A 突然断电 Broker 检测到 A 超时 → 触发遗嘱消息 Topic: devices/A/status 监控系统 订阅 devices/# CONNECT (带遗嘱) 断线 / 超时 LWT: {"status":"offline"} 遗嘱消息参数(在 CONNECT 报文中预设) Will Topic: devices/A/status Will Payload: {"status":"offline"} Will QoS: 1 | Will Retain: true

典型应用场景

设备在线状态监控:所有设备上线时发布 online,遗嘱设为 offline,监控系统实时感知设备掉线。

微服务健康检查:服务实例连接 MQTT Broker,下线时自动通知负载均衡器。

用户在线状态:IM 应用中,用户异常断线时自动更新好友在线状态。

MQTT vs HTTP vs WebSocket

特性 MQTT HTTP/1.1 WebSocket HTTP/2 SSE
连接模型 长连接 短连接(可Keep-Alive) 长连接 长连接(单向)
双向通信 ✓ 全双工 ✗ 请求-响应 ✓ 全双工 → 服务端单向推
报头开销 2字节起 数百字节 2~10字节(帧) HTTP头+数据行
消息路由 Topic 发布订阅 点对点 URL 点对点 点对点
QoS 保障 0 / 1 / 2 三级 无(TCP层) 无应用层QoS
离线消息 ✓ 持久会话
设备断线感知 ✓ LWT 遗嘱 ✗ 需轮询 关闭事件 连接断开事件
适用场景 IoT / 移动推送 / 实时监控 Web API / 文件传输 在线游戏 / 协同编辑 股价推送 / 日志流

MQTT 通信模拟器

通过下方模拟器体验 MQTT 的发布订阅流程:

MQTT 消息流模拟

Publisher(发布者)

B
Broker
就绪

Subscriber(订阅者)

当前订阅:无

代码实现示例

import paho.mqtt.client as mqtt
import json, time

# ---- 回调函数 ----
def on_connect(client, userdata, flags, rc):
    print(f"Connected: rc={rc}")
    # 连接成功后订阅
    client.subscribe("sensor/#", qos=1)

def on_message(client, userdata, msg):
    payload = json.loads(msg.payload.decode())
    print(f"[{msg.topic}] {payload}")

def on_disconnect(client, userdata, rc):
    print(f"Disconnected: rc={rc}")
    if rc != 0:
        client.reconnect()  # 自动重连

# ---- 客户端配置 ----
client = mqtt.Client(client_id="device_001", clean_session=False)
client.username_pw_set("user", "pass")

# 遗嘱消息
client.will_set(
    topic="devices/device_001/status",
    payload=json.dumps({"status": "offline"}),
    qos=1, retain=True
)

client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect

# 建立长连接(keepalive=60s)
client.connect("broker.emqx.io", port=1883, keepalive=60)
client.loop_start()  # 后台线程维持连接

# 发布消息
while True:
    msg = {"temp": 25.6, "ts": int(time.time())}
    client.publish("sensor/temp/room1", json.dumps(msg), qos=1)
    time.sleep(5)