MQTT(Message Queuing Telemetry Transport)是一个轻量级的发布/订阅消息协议,专为受限设备和不稳定网络设计。它在物联网(IoT)、车联网、工业控制、移动推送等场景中被广泛使用。
MQTT 的设计哲学是”简单可靠”——协议报文紧凑(最小 2 字节),支持三种服务质量(QoS)级别,内置会话恢复和遗嘱消息。但”简单”不意味着”容易用好”——QoS 级别选择、会话管理、Broker 集群架构——每一个工程决策都影响系统的可靠性和性能。
一、MQTT 协议基础
1.1 发布/订阅模型
MQTT 通信模型:
发布者 A ──→┐ ┌──→ 订阅者 X
发布者 B ──→├──→ Broker ──→──→──→├──→ 订阅者 Y
发布者 C ──→┘ (中间人) └──→ 订阅者 Z
特点:
1. 发布者和订阅者完全解耦(不需要知道对方的存在)
2. Broker 负责消息路由(根据 Topic 匹配)
3. 支持一对多广播(一个 Topic 多个订阅者)
4. 支持多对一聚合(多个发布者同一个 Topic)
与请求/响应模型对比:
HTTP: 客户端 → 服务端 → 响应
MQTT: 发布者 → Broker → 订阅者(异步、解耦)
1.2 Topic 设计
Topic 是 MQTT 消息路由的核心。它是一个 UTF-8 字符串,
用 / 分隔层级。
Topic 示例:
home/living-room/temperature ← 家庭温度传感器
factory/line-3/machine-7/status ← 工厂设备状态
vehicle/VIN001/gps/location ← 车辆 GPS 位置
通配符(仅订阅时使用):
+ 单层通配: home/+/temperature
匹配: home/living-room/temperature
匹配: home/bedroom/temperature
不匹配: home/floor1/room2/temperature
# 多层通配: home/#
匹配: home/living-room/temperature
匹配: home/bedroom/humidity
匹配: home/garage/door/status
Topic 设计最佳实践:
✅ 使用层级结构: region/building/floor/room/sensor
✅ 用小写字母和连字符: device-status (不是 DeviceStatus)
✅ 不以 / 开头(会产生空的第一层级)
✅ 具体到可以过滤: sensor/temp-01/value(不是 sensor/data)
❌ 避免空层级: a//b
❌ 避免过深的层级(>7 层影响性能)
1.3 Topic 设计反模式
反模式 1: 把 Payload 信息放进 Topic
❌ sensor/temp-01/value/23.5
✅ sensor/temp-01/value → Payload: 23.5
原因: Topic 是路由路径,不是数据载体。
动态 Topic 会导致通配符订阅匹配不到。
反模式 2: 每个消息用唯一 Topic
❌ events/{uuid} → 每条消息一个 Topic
✅ events/order → Payload 里带消息 ID
原因: 路由表膨胀,Retained Message 无意义,
订阅者无法用通配符高效订阅。
反模式 3: 扁平 Topic 命名
❌ factory_line3_machine7_temperature
✅ factory/line-3/machine-7/temperature
原因: 无法利用通配符做层级过滤。
反模式 4: 以 $ 开头(保留前缀)
❌ $myapp/events
✅ myapp/events
原因: $SYS/ 是 Broker 系统 Topic 前缀,
$ 开头的 Topic 不会被 # 通配符匹配。
1.3 MQTT 报文格式
MQTT 控制报文结构:
┌─────────────────────┐
│ Fixed Header (2+ B) │ ← 所有报文都有
├─────────────────────┤
│ Variable Header │ ← 部分报文有
├─────────────────────┤
│ Payload │ ← 部分报文有
└─────────────────────┘
Fixed Header:
Bit 7-4: 报文类型(4 bits)
Bit 3-0: 标志位(4 bits,含 QoS、Retain 等)
Remaining Length: 1-4 字节可变长度编码
报文类型:
1 CONNECT ← 客户端连接请求
2 CONNACK ← 连接确认
3 PUBLISH ← 发布消息
4 PUBACK ← QoS 1 发布确认
5 PUBREC ← QoS 2 发布接收
6 PUBREL ← QoS 2 发布释放
7 PUBCOMP ← QoS 2 发布完成
8 SUBSCRIBE ← 订阅请求
9 SUBACK ← 订阅确认
10 UNSUBSCRIBE← 取消订阅
11 UNSUBACK ← 取消订阅确认
12 PINGREQ ← 心跳请求
13 PINGRESP ← 心跳响应
14 DISCONNECT ← 断开连接
最小报文大小: PINGREQ = 2 字节
二、连接管理
2.1 CONNECT 报文
CONNECT 报文的关键字段:
Protocol Name: MQTT
Protocol Level: 4 (MQTT 3.1.1) 或 5 (MQTT 5.0)
Client ID: 客户端唯一标识
Clean Session: 0 或 1(是否建立持久会话)
Keep Alive: 心跳间隔(秒)
Username/Password: 可选的认证信息
Will Flag: 是否设置遗嘱消息
Will Topic: 遗嘱消息的 Topic
Will Message: 遗嘱消息的内容
Will QoS: 遗嘱消息的 QoS
Will Retain: 遗嘱消息是否 Retain
连接流程:
客户端 → CONNECT → Broker
Broker → CONNACK → 客户端
CONNACK 包含: 返回码(0=成功)、Session Present 标志
2.2 Clean Session vs Persistent Session
Clean Session = 1 (非持久会话):
- 连接断开后,Broker 丢弃所有会话状态
- 包括: 订阅关系、未确认的消息、排队的消息
- 每次连接都是"全新开始"
- 适用于: 临时客户端、无状态传感器
Clean Session = 0 (持久会话):
- 连接断开后,Broker 保留会话状态
- 保留: 订阅关系、QoS 1/2 的待确认消息、离线消息队列
- 重连后: Broker 发送 Session Present=1,并补发离线消息
- 适用于: 需要消息不丢失的设备
CONNACK Session Present 标志:
Session Present=0: Broker 没有该 Client ID 的已有会话
Session Present=1: Broker 恢复了之前的会话(有离线消息待发送)
客户端应根据 Session Present 决定是否重新订阅:
if Session Present == 0:
重新订阅所有 Topic
else:
订阅关系已恢复,不需要重新订阅
2.3 Keep Alive 与心跳
Keep Alive 机制:
客户端在 CONNECT 中指定 Keep Alive 间隔(秒)
客户端必须在 1.5 × Keep Alive 内发送至少一个报文
如果没有业务数据,发送 PINGREQ
Broker 收到 PINGREQ 后回复 PINGRESP
如果 Broker 在 1.5 × Keep Alive 内未收到任何报文,断开连接
Keep Alive = 0:
禁用心跳,Broker 不会因为超时断开连接
不推荐: 无法检测死连接
Keep Alive 选择:
IoT 设备(移动网络): 30-60 秒
局域网设备: 60-120 秒
低功耗设备: 300-600 秒(省电优先)
与 WebSocket 心跳的区别:
MQTT: 协议层面的 PINGREQ/PINGRESP
WebSocket: 帧层面的 Ping/Pong
如果 MQTT 运行在 WebSocket 上,两者都存在
三、QoS:服务质量
QoS 是 MQTT 最核心的工程概念。它定义了消息传递的可靠性保证。
3.1 QoS 0:最多一次
QoS 0 消息流(At Most Once):
发布者 Broker 订阅者
│ │ │
│ PUBLISH │ │
│───────────────→│ │
│ │ PUBLISH │
│ │─────────────→│
│ │ │
特点:
- "发后即忘",不保证送达
- 没有确认机制
- 消息可能丢失(网络中断时)
- 不消耗存储(Broker 不持久化)
- 最低延迟、最高吞吐
适用场景:
- 高频传感器数据(温度每秒上报)
- 丢失几条数据可以接受
- 带宽和电量极度受限的设备
3.2 QoS 1:至少一次
QoS 1 消息流(At Least Once):
发布者 Broker 订阅者
│ │ │
│ PUBLISH │ │
│ (PacketID=1) │ │
│───────────────→│ │
│ │ PUBLISH │
│ │ (PacketID=7)│
│ │─────────────→│
│ │ │
│ PUBACK │ PUBACK │
│ (PacketID=1) │ (PacketID=7)│
│←───────────────│←─────────────│
│ │ │
特点:
- 保证至少送达一次
- 可能重复(网络波动时 PUBACK 丢失,触发重发)
- 需要 Packet ID 用于确认
- Broker 在收到 PUBACK 前会持久化消息
重复消息处理:
- 接收方必须能处理重复消息(幂等性)
- 或使用消息 ID 去重
适用场景:
- 告警通知(宁可重复,不能遗漏)
- 命令下发(用幂等操作处理重复)
3.3 QoS 2:恰好一次
QoS 2 消息流(Exactly Once)— 四步握手:
发布者 Broker 订阅者
│ │ │
│ PUBLISH │ │
│ (PacketID=1) │ │
│───────────────→│ │
│ │ │
│ PUBREC │ │
│ (PacketID=1) │ │
│←───────────────│ │
│ │ │
│ PUBREL │ │
│ (PacketID=1) │ │
│───────────────→│ │
│ │ PUBLISH │
│ │ (PacketID=7)│
│ │─────────────→│
│ │ │
│ PUBCOMP │ (同样 4 步) │
│ (PacketID=1) │ │
│←───────────────│ │
│ │ │
四步握手:
1. PUBLISH → 发送消息
2. PUBREC ← 确认收到(不转发)
3. PUBREL → 确认可以释放
4. PUBCOMP ← 确认完成
特点:
- 保证消息恰好送达一次,不重复
- 最高开销(4 次握手)
- 最高延迟
- Broker 需要存储中间状态
适用场景:
- 计费消息
- 金融交易指令
- 关键控制命令(不能重复执行)
3.4 QoS 对比
指标 │ QoS 0 │ QoS 1 │ QoS 2
─────────────┼────────────┼────────────┼────────────
保证 │ 最多一次 │ 至少一次 │ 恰好一次
可能丢失 │ ✅ │ ❌ │ ❌
可能重复 │ ❌ │ ✅ │ ❌
握手次数 │ 1 │ 2 │ 4
带宽开销 │ 最低 │ 中 │ 最高
延迟 │ 最低 │ 中 │ 最高
Broker 存储 │ 不需要 │ 需要 │ 需要
适用场景 │ 遥测数据 │ 告警/命令 │ 计费/交易
注意: 发布和订阅的 QoS 独立设置
发布者 QoS 2 + 订阅者 QoS 1 = 消息按 QoS 1 送达(取最小值)
这叫做 "QoS 降级"(QoS Downgrade)
四、Retained Message 与 Last Will
4.1 Retained Message(保留消息)
保留消息:
发布时设置 Retain=1
Broker 为每个 Topic 保存最后一条 Retain 消息
新订阅者连接后立即收到该 Topic 的最后一条 Retain 消息
用途:
设备上报当前状态(温度、在线/离线)
新订阅者不需要等待下一次上报就能获取最新状态
示例:
设备发布: PUBLISH topic=sensor/temp retain=1 payload="23.5°C"
→ Broker 保存 sensor/temp 的最新值为 "23.5°C"
→ 新订阅者订阅 sensor/temp → 立即收到 "23.5°C"
清除 Retain 消息:
发布一条 payload 为空的 Retain 消息
PUBLISH topic=sensor/temp retain=1 payload=""
4.2 Last Will and Testament(遗嘱消息)
遗嘱消息:
客户端在 CONNECT 时设置遗嘱(Will)
如果客户端异常断开(未发送 DISCONNECT),
Broker 自动发布遗嘱消息
典型用途 — 设备在线/离线状态:
连接时:
CONNECT
Will Topic: device/sensor-01/status
Will Message: {"status":"offline","time":"..."}
Will QoS: 1
Will Retain: 1
连接成功后,设备发布:
PUBLISH topic=device/sensor-01/status retain=1
payload={"status":"online","time":"..."}
效果:
设备在线: topic 保留值为 {"status":"online",...}
设备异常断开: Broker 自动发布 {"status":"offline",...}
其他客户端订阅 device/sensor-01/status 可以实时感知设备状态
注意:
正常断开(发送 DISCONNECT)不会触发遗嘱消息
只有异常断开(网络中断、Keep Alive 超时)才触发
五、MQTT 5.0 新特性
MQTT 5.0(2019 年发布)在 3.1.1 基础上增加了大量面向工程的新特性。
5.1 核心新特性
1. 用户属性(User Properties)
所有报文都可以携带键值对元数据
用途: 追踪 ID、消息来源、自定义路由信息
PUBLISH topic=events/order
User Property: trace-id=abc-123
User Property: source=payment-service
2. 原因码(Reason Code)
所有确认报文都包含原因码
替代 3.1.1 中简单的成功/失败
0x00 成功
0x04 断开连接(遗嘱消息)
0x10 没有匹配的订阅者
0x80 未指定错误
0x83 实现特有错误
0x87 未授权
0x97 配额超出
3. 共享订阅(Shared Subscriptions)
多个客户端共享一个订阅,消息负载均衡
Topic: $share/group-name/sensor/data
效果: 消息只发送给组内的一个客户端
用途: 消费者水平扩展,类似 Kafka Consumer Group
4. 消息过期(Message Expiry Interval)
为消息设置 TTL(秒)
超过 TTL 的消息被 Broker 丢弃
用途: 告警消息 10 分钟后不再有意义
5. 主题别名(Topic Alias)
用短整数代替长 Topic 字符串
减少重复发送长 Topic 的带宽开销
Topic Alias: 7 → 替代 "factory/line-3/machine-7/temperature"
6. 流控(Receive Maximum)
限制对端的未确认消息数
防止快发布者压垮慢消费者
7. Clean Start + Session Expiry Interval
替代 3.1.1 的 Clean Session
Clean Start=1: 新建会话
Clean Start=0: 尝试恢复会话
Session Expiry Interval: 会话过期时间(秒)
0: 断开即删除(等同于 Clean Session=1)
0xFFFFFFFF: 永不过期
3600: 断开后保留 1 小时
5.2 MQTT 3.1.1 vs 5.0
特性 │ MQTT 3.1.1 │ MQTT 5.0
────────────────────────┼──────────────┼──────────────
原因码 │ ❌ 简单返回码 │ ✅ 详细原因码
用户属性 │ ❌ │ ✅
共享订阅 │ ❌(非标准扩展)│ ✅ 标准支持
消息过期 │ ❌ │ ✅
Topic 别名 │ ❌ │ ✅
流控 │ ❌ │ ✅ Receive Maximum
会话过期 │ 二选一 │ 精确控制秒数
请求/响应 │ ❌(需自己实现)│ ✅ Response Topic
服务端断开原因 │ ❌ 直接断开 │ ✅ DISCONNECT + 原因
认证增强 │ 用户名/密码 │ ✅ AUTH 报文
迁移建议:
新项目直接使用 MQTT 5.0
旧项目评估 Broker 和客户端库的 5.0 支持后迁移
EMQX 5.x、Mosquitto 2.x、HiveMQ 4.x 均支持 5.0
六、MQTT Broker 架构
6.1 主流 Broker 对比
Broker │ 语言 │ 集群支持 │ 连接上限 │ 适用场景
──────────────┼──────────┼─────────┼──────────────┼───────────────
EMQX │ Erlang │ ✅ 原生 │ 1 亿+/集群 │ 大规模 IoT 平台
Mosquitto │ C │ ❌ 桥接 │ ~10 万/单节点 │ 边缘/小规模
HiveMQ │ Java │ ✅ │ 千万级 │ 企业级
VerneMQ │ Erlang │ ✅ │ 百万级 │ 开源替代
NanoMQ │ C │ ❌ │ ~10 万 │ 边缘轻量级
选型建议:
个人/小项目: Mosquitto(简单、资源占用小)
生产环境: EMQX(功能全、社区活跃、中文文档好)
企业合规: HiveMQ(商业支持、合规认证)
边缘计算: NanoMQ(轻量、低延迟)
6.2 EMQX 集群架构
EMQX 集群:
客户端 ──→ LB ──→ EMQX Node 1 ──┐
客户端 ──→ LB ──→ EMQX Node 2 ──├── 集群内消息路由
客户端 ──→ LB ──→ EMQX Node 3 ──┘
核心机制:
1. 路由表: 全集群共享订阅关系
Topic sensor/temp → [Node1-ClientA, Node3-ClientB]
2. 消息转发: 发布到 Node 2 的消息自动路由到 Node 1 和 Node 3
3. 会话迁移: 持久会话可以在节点间迁移
配置示例(emqx.conf):
node.name = emqx@10.0.1.10
cluster.discovery_strategy = static
cluster.static.seeds = [emqx@10.0.1.10, emqx@10.0.1.11, emqx@10.0.1.12]
# 连接限制
listeners.tcp.default.max_connections = 1024000
listeners.tcp.default.max_conn_rate = 1000
# 消息限制
mqtt.max_packet_size = 1MB
mqtt.max_topic_levels = 10
mqtt.max_qos_allowed = 2
6.3 安全配置
MQTT 安全层级:
1. 传输层安全(TLS/SSL)
listener.ssl.external.keyfile = /etc/emqx/certs/server.key
listener.ssl.external.certfile = /etc/emqx/certs/server.crt
listener.ssl.external.cacertfile = /etc/emqx/certs/ca.crt
listener.ssl.external.verify = verify_peer # mTLS
2. 认证(Authentication)
- 用户名/密码(内置、MySQL、PostgreSQL、Redis)
- JWT Token
- X.509 客户端证书
- HTTP 回调(外部认证服务)
3. 授权(Authorization)
- ACL 规则: 控制客户端对 Topic 的发布/订阅权限
- 示例:
{allow, {user, "sensor-01"}, publish, ["device/sensor-01/#"]}.
{allow, {user, "dashboard"}, subscribe, ["device/+/status"]}.
{deny, all}.
4. 速率限制
- 连接速率: 每秒最大新连接数
- 消息速率: 每客户端每秒最大消息数
- 带宽限制: 每客户端最大数据速率
6.4 大规模连接的工程挑战
百万连接级别的工程要点:
1. 操作系统调优
# 文件描述符限制
ulimit -n 2000000
# 或在 /etc/security/limits.conf 中设置:
emqx soft nofile 2000000
emqx hard nofile 2000000
# 内核参数
sysctl -w net.core.somaxconn=32768
sysctl -w net.ipv4.tcp_max_syn_backlog=16384
sysctl -w net.core.netdev_max_backlog=16384
sysctl -w fs.file-max=2000000
sysctl -w net.ipv4.ip_local_port_range="1024 65535"
2. Broker 内存估算
每个连接的内存消耗(EMQX 估算):
┌──────────────────────┬─────────────┐
│ 组件 │ 内存 │
├──────────────────────┼─────────────┤
│ TCP 连接 │ ~2 KB │
│ 会话状态 │ ~1 KB │
│ 订阅关系(每个) │ ~0.3 KB │
│ 消息队列(Clean=0) │ 可变 │
└──────────────────────┴─────────────┘
100 万连接 × 3 KB ≈ 3 GB 基础内存
加上消息缓冲、路由表 → 实际需要 8-16 GB
3. 集群扩展策略
单节点: Mosquitto ~10 万连接
EMQX 单节点: ~100 万连接(调优后)
EMQX 集群: 线性扩展(3 节点 → ~300 万连接)
超大规模: 按地域分集群 + 跨集群桥接
4. 连接风暴防护
大量设备同时重连(如断网恢复)会产生连接风暴
防护措施:
- 客户端随机延迟重连(jitter)
- Broker 设置连接速率限制(max_conn_rate)
- LB 层面限流保护
6.5 Go 语言 MQTT 客户端实践
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
func main() {
opts := mqtt.NewClientOptions().
AddBroker("tcp://broker.example.com:1883").
SetClientID("go-sensor-01").
SetUsername("sensor").
SetPassword("secret").
SetKeepAlive(60 * time.Second).
SetCleanSession(false). // 持久会话
SetAutoReconnect(true). // 自动重连
SetMaxReconnectInterval(30 * time.Second).
SetConnectionLostHandler(func(c mqtt.Client, err error) {
log.Printf("连接丢失: %v", err)
}).
SetOnConnectHandler(func(c mqtt.Client) {
log.Println("已连接,订阅 Topic...")
// 连接成功后订阅(处理重连场景)
if token := c.Subscribe("command/#", 1, handleCommand); token.Wait() && token.Error() != nil {
log.Printf("订阅失败: %v", token.Error())
}
}).
SetWill("device/go-sensor-01/status",
`{"status":"offline"}`, 1, true) // 遗嘱消息
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
log.Fatalf("连接失败: %v", token.Error())
}
// 上报在线状态(Retained)
client.Publish("device/go-sensor-01/status",
1, true, `{"status":"online"}`)
// 定时上报传感器数据
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
for {
select {
case <-ticker.C:
payload := fmt.Sprintf(`{"temp":%.1f,"ts":%d}`,
22.5+float64(time.Now().Unix()%10)/10,
time.Now().Unix())
token := client.Publish("sensor/go-sensor-01/temp",
0, false, payload)
token.Wait()
case <-sigCh:
log.Println("正常断开...")
client.Publish("device/go-sensor-01/status",
1, true, `{"status":"offline"}`)
client.Disconnect(1000) // 等 1 秒发完
return
}
}
}
func handleCommand(c mqtt.Client, msg mqtt.Message) {
log.Printf("收到命令 [%s]: %s", msg.Topic(), msg.Payload())
}七、MQTT over WebSocket
浏览器无法直接使用 TCP,因此 MQTT 支持通过 WebSocket 传输:
架构:
浏览器 ──WebSocket──→ Broker (ws://broker:8083/mqtt)
IoT 设备 ──TCP──→ Broker (tcp://broker:1883)
两种传输的客户端可以互相通信
Topic 和消息路由对传输层透明
// 浏览器端 MQTT 客户端(使用 mqtt.js)
import mqtt from 'mqtt';
const client = mqtt.connect('wss://broker.example.com:8084/mqtt', {
clientId: 'web-dashboard-' + Math.random().toString(16).substr(2, 8),
username: 'dashboard',
password: 'secret',
clean: true,
keepalive: 30,
reconnectPeriod: 3000, // 自动重连间隔
});
client.on('connect', () => {
console.log('Connected to MQTT broker');
client.subscribe('device/+/status', { qos: 1 });
client.subscribe('alerts/#', { qos: 1 });
});
client.on('message', (topic, message) => {
const data = JSON.parse(message.toString());
console.log(`[${topic}]`, data);
if (topic.startsWith('alerts/')) {
showAlert(data);
}
});
client.on('error', (error) => {
console.error('MQTT error:', error);
});
// 发布消息
client.publish('dashboard/command', JSON.stringify({
action: 'restart',
target: 'sensor-01',
}), { qos: 1 });八、MQTT vs 其他消息协议
协议 │ 传输层 │ 消息模型 │ QoS │ 适用场景
──────────┼─────────┼──────────────┼─────────┼──────────────────
MQTT │ TCP/WS │ Pub/Sub │ 0/1/2 │ IoT, 推送
AMQP │ TCP │ Queue + Topic│ 事务 │ 企业集成
Kafka │ TCP │ Log + Group │ 确认 │ 大数据流
NATS │ TCP │ Pub/Sub │ 0/1 │ 微服务通信
CoAP │ UDP │ REQ/RES │ CON/NON │ 极低功耗 IoT
HTTP SSE │ HTTP │ 单向推送 │ 无 │ Web 实时
MQTT vs AMQP:
MQTT: 更轻量、更适合受限设备、协议更简单
AMQP: 更丰富的路由(Exchange/Queue/Binding)、更适合企业集成
MQTT vs Kafka:
MQTT: 面向设备通信、低延迟、轻量客户端
Kafka: 面向数据管道、高吞吐、消息持久化
MQTT vs CoAP:
MQTT: TCP(可靠)、长连接、支持推送
CoAP: UDP(轻量)、请求/响应、更适合极低功耗场景
九、MQTT 监控与故障排查
9.1 关键监控指标
Broker 级指标:
- 连接数(当前/最大/历史峰值)
- 消息吞吐(收/发/丢弃,按 QoS 分)
- 订阅数(总数/每 Topic 数)
- 内存使用(会话/消息队列/保留消息)
- 集群状态(节点健康/路由表同步延迟)
客户端级指标:
- 连接时长
- 消息频率(发布/接收)
- 队列堆积(持久会话的离线消息数)
- 重连次数
告警规则:
- 连接数超过阈值的 80%
- 消息丢弃率 > 0.1%
- 持久会话的离线消息队列 > 10000 条
- 客户端重连频率 > 每分钟 5 次
9.2 常见故障
故障 1: 消息丢失
可能原因:
- QoS 0 + 网络不稳定 → 使用 QoS 1
- Clean Session=1 + 频繁断线 → 改为 Clean Session=0
- Broker 内存不足导致消息被丢弃 → 扩容
故障 2: 消息重复
可能原因:
- QoS 1 + 网络波动导致 PUBACK 延迟 → 应用层去重
- 客户端频繁重连 → 检查 Clean Session 设置
故障 3: 连接频繁断开
可能原因:
- Keep Alive 太短 → 增大 Keep Alive(60s+)
- Client ID 冲突(两个设备用同一个 ID)→ 确保唯一
- Broker 资源不足 → 检查内存/CPU/连接数
故障 4: 消息延迟高
可能原因:
- QoS 2 的四步握手 → 评估是否可以降到 QoS 1
- 订阅者处理太慢导致消息堆积 → 优化消费速度
- 集群间消息路由延迟 → 检查集群网络
排查工具:
mosquitto_sub -h broker -t '#' -v # 订阅所有消息
mosquitto_pub -h broker -t test -m "hello" # 发布测试
emqx_ctl clients list # EMQX 查看客户端列表
emqx_ctl topics list # EMQX 查看 Topic 列表
十、总结
MQTT 的工程核心是三个决策:QoS 级别选择、会话管理策略、Broker 架构设计。
QoS 选择要匹配场景。遥测数据用 QoS 0,告警命令用 QoS 1,计费交易用 QoS 2。不要无脑用 QoS 2——它的四步握手开销是 QoS 0 的 4 倍。
会话管理决定消息可靠性。Clean Session=0 配合 QoS 1/2 才能实现离线消息不丢失。但持久会话会消耗 Broker 内存,需要设置消息队列上限。
Topic 设计影响整个系统。层级化、可过滤、不过深——好的 Topic 设计让消息路由高效,坏的 Topic 设计让系统混乱。
MQTT 5.0 值得迁移。共享订阅、消息过期、用户属性、详细原因码——这些特性解决了 3.1.1 的大量工程痛点。
Retained Message + Last Will 实现设备状态感知。这是 MQTT 区别于其他消息协议的独特能力——新订阅者立即获取最新状态,设备异常断开自动通知。
Broker 选型看规模。个人项目用 Mosquitto,生产环境用 EMQX,企业合规用 HiveMQ。
参考文献
- MQTT v5.0 Specification (docs.oasis-open.org/mqtt/mqtt/v5.0)
- MQTT v3.1.1 Specification (docs.oasis-open.org/mqtt/mqtt/v3.1.1)
- EMQX Documentation (docs.emqx.com)
- Eclipse Mosquitto (mosquitto.org)
- HiveMQ MQTT Essentials (hivemq.com/mqtt-essentials)
下一篇:协议选型决策树:REST vs gRPC vs GraphQL vs WebSocket
同主题继续阅读
把当前热点继续串成多页阅读,而不是停在单篇消费。
【网络工程】带宽管理与流量整形:tc、QoS 与拥塞管理
带宽管理是网络工程的核心能力。本文从 Linux tc 的 qdisc/class/filter 三层模型出发,系统讲解 HTB 分层令牌桶、TBF 令牌桶过滤器、FQ/fq_codel 公平队列、tc-bpf 可编程流量控制、netem 网络模拟,以及 QoS 策略设计和容量规划的工程实践。
【网络工程】DNS 协议解剖:查询格式、记录类型与响应码
DNS 是互联网最基础的目录服务,也是最脆弱的单点之一。本文从 wire format 出发逐字段解析 DNS 报文结构,详解 A/AAAA/CNAME/MX/SRV/TXT/NS/SOA 等记录类型的工程用途,分析 EDNS0 扩展与 DNS over TCP 的触发条件,结合 dig +trace 完整实操展示 DNS 解析的真实链路。
【网络工程】WebSocket 工程:握手、帧格式与大规模运维
系统讲解 WebSocket 协议的工程实践:握手升级流程、二进制帧格式、Ping/Pong 心跳机制、断线重连策略、负载均衡器适配、单机百万连接的内核调优与架构设计。
【网络工程】协议选型决策树:REST vs gRPC vs GraphQL vs WebSocket
从延迟、吞吐、开发效率、生态成熟度四个维度对比 REST、gRPC、GraphQL、WebSocket,给出微服务内部与面向客户端的选型决策树,讨论混合架构模式与迁移路径。