土法炼钢兴趣小组的算法知识备份

【网络工程】MQTT 工程:IoT 协议的 QoS 与 MQTT 5.0

文章导航

分类入口
network
标签入口
#mqtt#iot#qos#messaging#protocol

目录

MQTT(Message Queuing Telemetry Transport)是一个轻量级的发布/订阅消息协议,专为受限设备和不稳定网络设计。它在物联网(IoT)、车联网、工业控制、移动推送等场景中被广泛使用。

MQTT 的设计哲学是”简单可靠”——协议报文紧凑(最小 2 字节),支持三种服务质量(QoS)级别,内置会话恢复和遗嘱消息。但”简单”不意味着”容易用好”——QoS 级别选择、会话管理、Broker 集群架构——每一个工程决策都影响系统的可靠性和性能。

一、MQTT 协议基础

1.1 发布/订阅模型

MQTT 通信模型:

  发布者 A ──→┐                    ┌──→ 订阅者 X
  发布者 B ──→├──→ Broker ──→──→──→├──→ 订阅者 Y
  发布者 C ──→┘    (中间人)         └──→ 订阅者 Z

  特点:
  1. 发布者和订阅者完全解耦(不需要知道对方的存在)
  2. Broker 负责消息路由(根据 Topic 匹配)
  3. 支持一对多广播(一个 Topic 多个订阅者)
  4. 支持多对一聚合(多个发布者同一个 Topic)

与请求/响应模型对比:
  HTTP:  客户端 → 服务端 → 响应
  MQTT:  发布者 → Broker → 订阅者(异步、解耦)

1.2 Topic 设计

Topic 是 MQTT 消息路由的核心。它是一个 UTF-8 字符串,
用 / 分隔层级。

Topic 示例:
  home/living-room/temperature     ← 家庭温度传感器
  factory/line-3/machine-7/status  ← 工厂设备状态
  vehicle/VIN001/gps/location      ← 车辆 GPS 位置

通配符(仅订阅时使用):
  + 单层通配: home/+/temperature
    匹配: home/living-room/temperature
    匹配: home/bedroom/temperature
    不匹配: home/floor1/room2/temperature

  # 多层通配: home/#
    匹配: home/living-room/temperature
    匹配: home/bedroom/humidity
    匹配: home/garage/door/status

Topic 设计最佳实践:
  ✅ 使用层级结构: region/building/floor/room/sensor
  ✅ 用小写字母和连字符: device-status (不是 DeviceStatus)
  ✅ 不以 / 开头(会产生空的第一层级)
  ✅ 具体到可以过滤: sensor/temp-01/value(不是 sensor/data)
  ❌ 避免空层级: a//b
  ❌ 避免过深的层级(>7 层影响性能)

1.3 Topic 设计反模式

反模式 1: 把 Payload 信息放进 Topic
  ❌ sensor/temp-01/value/23.5
  ✅ sensor/temp-01/value → Payload: 23.5
  原因: Topic 是路由路径,不是数据载体。
        动态 Topic 会导致通配符订阅匹配不到。

反模式 2: 每个消息用唯一 Topic
  ❌ events/{uuid}  → 每条消息一个 Topic
  ✅ events/order   → Payload 里带消息 ID
  原因: 路由表膨胀,Retained Message 无意义,
        订阅者无法用通配符高效订阅。

反模式 3: 扁平 Topic 命名
  ❌ factory_line3_machine7_temperature
  ✅ factory/line-3/machine-7/temperature
  原因: 无法利用通配符做层级过滤。

反模式 4: 以 $ 开头(保留前缀)
  ❌ $myapp/events
  ✅ myapp/events
  原因: $SYS/ 是 Broker 系统 Topic 前缀,
        $ 开头的 Topic 不会被 # 通配符匹配。

1.3 MQTT 报文格式

MQTT 控制报文结构:

┌─────────────────────┐
│ Fixed Header (2+ B) │  ← 所有报文都有
├─────────────────────┤
│ Variable Header     │  ← 部分报文有
├─────────────────────┤
│ Payload             │  ← 部分报文有
└─────────────────────┘

Fixed Header:
  Bit 7-4: 报文类型(4 bits)
  Bit 3-0: 标志位(4 bits,含 QoS、Retain 等)
  Remaining Length: 1-4 字节可变长度编码

报文类型:
  1  CONNECT    ← 客户端连接请求
  2  CONNACK    ← 连接确认
  3  PUBLISH    ← 发布消息
  4  PUBACK     ← QoS 1 发布确认
  5  PUBREC     ← QoS 2 发布接收
  6  PUBREL     ← QoS 2 发布释放
  7  PUBCOMP    ← QoS 2 发布完成
  8  SUBSCRIBE  ← 订阅请求
  9  SUBACK     ← 订阅确认
  10 UNSUBSCRIBE← 取消订阅
  11 UNSUBACK   ← 取消订阅确认
  12 PINGREQ    ← 心跳请求
  13 PINGRESP   ← 心跳响应
  14 DISCONNECT ← 断开连接

最小报文大小: PINGREQ = 2 字节

二、连接管理

2.1 CONNECT 报文

CONNECT 报文的关键字段:

  Protocol Name:     MQTT
  Protocol Level:    4 (MQTT 3.1.1) 或 5 (MQTT 5.0)
  Client ID:         客户端唯一标识
  Clean Session:     0 或 1(是否建立持久会话)
  Keep Alive:        心跳间隔(秒)
  Username/Password: 可选的认证信息
  Will Flag:         是否设置遗嘱消息
  Will Topic:        遗嘱消息的 Topic
  Will Message:      遗嘱消息的内容
  Will QoS:          遗嘱消息的 QoS
  Will Retain:       遗嘱消息是否 Retain

连接流程:
  客户端 → CONNECT → Broker
  Broker → CONNACK → 客户端
  CONNACK 包含: 返回码(0=成功)、Session Present 标志

2.2 Clean Session vs Persistent Session

Clean Session = 1 (非持久会话):
  - 连接断开后,Broker 丢弃所有会话状态
  - 包括: 订阅关系、未确认的消息、排队的消息
  - 每次连接都是"全新开始"
  - 适用于: 临时客户端、无状态传感器

Clean Session = 0 (持久会话):
  - 连接断开后,Broker 保留会话状态
  - 保留: 订阅关系、QoS 1/2 的待确认消息、离线消息队列
  - 重连后: Broker 发送 Session Present=1,并补发离线消息
  - 适用于: 需要消息不丢失的设备

CONNACK Session Present 标志:
  Session Present=0: Broker 没有该 Client ID 的已有会话
  Session Present=1: Broker 恢复了之前的会话(有离线消息待发送)

  客户端应根据 Session Present 决定是否重新订阅:
    if Session Present == 0:
        重新订阅所有 Topic
    else:
        订阅关系已恢复,不需要重新订阅

2.3 Keep Alive 与心跳

Keep Alive 机制:
  客户端在 CONNECT 中指定 Keep Alive 间隔(秒)
  客户端必须在 1.5 × Keep Alive 内发送至少一个报文
  如果没有业务数据,发送 PINGREQ
  Broker 收到 PINGREQ 后回复 PINGRESP
  如果 Broker 在 1.5 × Keep Alive 内未收到任何报文,断开连接

Keep Alive = 0:
  禁用心跳,Broker 不会因为超时断开连接
  不推荐: 无法检测死连接

Keep Alive 选择:
  IoT 设备(移动网络):  30-60 秒
  局域网设备:            60-120 秒
  低功耗设备:            300-600 秒(省电优先)

与 WebSocket 心跳的区别:
  MQTT: 协议层面的 PINGREQ/PINGRESP
  WebSocket: 帧层面的 Ping/Pong
  如果 MQTT 运行在 WebSocket 上,两者都存在

三、QoS:服务质量

QoS 是 MQTT 最核心的工程概念。它定义了消息传递的可靠性保证。

3.1 QoS 0:最多一次

QoS 0 消息流(At Most Once):

  发布者         Broker         订阅者
    │                │              │
    │  PUBLISH       │              │
    │───────────────→│              │
    │                │  PUBLISH     │
    │                │─────────────→│
    │                │              │

  特点:
  - "发后即忘",不保证送达
  - 没有确认机制
  - 消息可能丢失(网络中断时)
  - 不消耗存储(Broker 不持久化)
  - 最低延迟、最高吞吐

  适用场景:
  - 高频传感器数据(温度每秒上报)
  - 丢失几条数据可以接受
  - 带宽和电量极度受限的设备

3.2 QoS 1:至少一次

QoS 1 消息流(At Least Once):

  发布者         Broker         订阅者
    │                │              │
    │  PUBLISH       │              │
    │  (PacketID=1)  │              │
    │───────────────→│              │
    │                │  PUBLISH     │
    │                │  (PacketID=7)│
    │                │─────────────→│
    │                │              │
    │  PUBACK        │  PUBACK      │
    │  (PacketID=1)  │  (PacketID=7)│
    │←───────────────│←─────────────│
    │                │              │

  特点:
  - 保证至少送达一次
  - 可能重复(网络波动时 PUBACK 丢失,触发重发)
  - 需要 Packet ID 用于确认
  - Broker 在收到 PUBACK 前会持久化消息

  重复消息处理:
  - 接收方必须能处理重复消息(幂等性)
  - 或使用消息 ID 去重

  适用场景:
  - 告警通知(宁可重复,不能遗漏)
  - 命令下发(用幂等操作处理重复)

3.3 QoS 2:恰好一次

QoS 2 消息流(Exactly Once)— 四步握手:

  发布者         Broker         订阅者
    │                │              │
    │  PUBLISH       │              │
    │  (PacketID=1)  │              │
    │───────────────→│              │
    │                │              │
    │  PUBREC        │              │
    │  (PacketID=1)  │              │
    │←───────────────│              │
    │                │              │
    │  PUBREL        │              │
    │  (PacketID=1)  │              │
    │───────────────→│              │
    │                │  PUBLISH     │
    │                │  (PacketID=7)│
    │                │─────────────→│
    │                │              │
    │  PUBCOMP       │  (同样 4 步)  │
    │  (PacketID=1)  │              │
    │←───────────────│              │
    │                │              │

  四步握手:
  1. PUBLISH  → 发送消息
  2. PUBREC   ← 确认收到(不转发)
  3. PUBREL   → 确认可以释放
  4. PUBCOMP  ← 确认完成

  特点:
  - 保证消息恰好送达一次,不重复
  - 最高开销(4 次握手)
  - 最高延迟
  - Broker 需要存储中间状态

  适用场景:
  - 计费消息
  - 金融交易指令
  - 关键控制命令(不能重复执行)

3.4 QoS 对比

指标         │ QoS 0       │ QoS 1       │ QoS 2
─────────────┼────────────┼────────────┼────────────
保证         │ 最多一次    │ 至少一次    │ 恰好一次
可能丢失     │ ✅          │ ❌          │ ❌
可能重复     │ ❌          │ ✅          │ ❌
握手次数     │ 1           │ 2           │ 4
带宽开销     │ 最低        │ 中          │ 最高
延迟         │ 最低        │ 中          │ 最高
Broker 存储  │ 不需要      │ 需要        │ 需要
适用场景     │ 遥测数据    │ 告警/命令    │ 计费/交易

注意: 发布和订阅的 QoS 独立设置
  发布者 QoS 2 + 订阅者 QoS 1 = 消息按 QoS 1 送达(取最小值)
  这叫做 "QoS 降级"(QoS Downgrade)

四、Retained Message 与 Last Will

4.1 Retained Message(保留消息)

保留消息:
  发布时设置 Retain=1
  Broker 为每个 Topic 保存最后一条 Retain 消息
  新订阅者连接后立即收到该 Topic 的最后一条 Retain 消息

用途:
  设备上报当前状态(温度、在线/离线)
  新订阅者不需要等待下一次上报就能获取最新状态

示例:
  设备发布: PUBLISH topic=sensor/temp retain=1 payload="23.5°C"
  → Broker 保存 sensor/temp 的最新值为 "23.5°C"
  → 新订阅者订阅 sensor/temp → 立即收到 "23.5°C"

清除 Retain 消息:
  发布一条 payload 为空的 Retain 消息
  PUBLISH topic=sensor/temp retain=1 payload=""

4.2 Last Will and Testament(遗嘱消息)

遗嘱消息:
  客户端在 CONNECT 时设置遗嘱(Will)
  如果客户端异常断开(未发送 DISCONNECT),
  Broker 自动发布遗嘱消息

典型用途 — 设备在线/离线状态:

  连接时:
    CONNECT
      Will Topic: device/sensor-01/status
      Will Message: {"status":"offline","time":"..."}
      Will QoS: 1
      Will Retain: 1

  连接成功后,设备发布:
    PUBLISH topic=device/sensor-01/status retain=1
      payload={"status":"online","time":"..."}

  效果:
    设备在线: topic 保留值为 {"status":"online",...}
    设备异常断开: Broker 自动发布 {"status":"offline",...}
    其他客户端订阅 device/sensor-01/status 可以实时感知设备状态

  注意:
    正常断开(发送 DISCONNECT)不会触发遗嘱消息
    只有异常断开(网络中断、Keep Alive 超时)才触发

五、MQTT 5.0 新特性

MQTT 5.0(2019 年发布)在 3.1.1 基础上增加了大量面向工程的新特性。

5.1 核心新特性

1. 用户属性(User Properties)
   所有报文都可以携带键值对元数据
   用途: 追踪 ID、消息来源、自定义路由信息
   PUBLISH topic=events/order
     User Property: trace-id=abc-123
     User Property: source=payment-service

2. 原因码(Reason Code)
   所有确认报文都包含原因码
   替代 3.1.1 中简单的成功/失败
   0x00 成功
   0x04 断开连接(遗嘱消息)
   0x10 没有匹配的订阅者
   0x80 未指定错误
   0x83 实现特有错误
   0x87 未授权
   0x97 配额超出

3. 共享订阅(Shared Subscriptions)
   多个客户端共享一个订阅,消息负载均衡
   Topic: $share/group-name/sensor/data
   效果: 消息只发送给组内的一个客户端
   用途: 消费者水平扩展,类似 Kafka Consumer Group

4. 消息过期(Message Expiry Interval)
   为消息设置 TTL(秒)
   超过 TTL 的消息被 Broker 丢弃
   用途: 告警消息 10 分钟后不再有意义

5. 主题别名(Topic Alias)
   用短整数代替长 Topic 字符串
   减少重复发送长 Topic 的带宽开销
   Topic Alias: 7 → 替代 "factory/line-3/machine-7/temperature"

6. 流控(Receive Maximum)
   限制对端的未确认消息数
   防止快发布者压垮慢消费者

7. Clean Start + Session Expiry Interval
   替代 3.1.1 的 Clean Session
   Clean Start=1: 新建会话
   Clean Start=0: 尝试恢复会话
   Session Expiry Interval: 会话过期时间(秒)
     0: 断开即删除(等同于 Clean Session=1)
     0xFFFFFFFF: 永不过期
     3600: 断开后保留 1 小时

5.2 MQTT 3.1.1 vs 5.0

特性                    │ MQTT 3.1.1    │ MQTT 5.0
────────────────────────┼──────────────┼──────────────
原因码                  │ ❌ 简单返回码  │ ✅ 详细原因码
用户属性                │ ❌             │ ✅
共享订阅                │ ❌(非标准扩展)│ ✅ 标准支持
消息过期                │ ❌             │ ✅
Topic 别名              │ ❌             │ ✅
流控                    │ ❌             │ ✅ Receive Maximum
会话过期                │ 二选一         │ 精确控制秒数
请求/响应               │ ❌(需自己实现)│ ✅ Response Topic
服务端断开原因          │ ❌ 直接断开    │ ✅ DISCONNECT + 原因
认证增强                │ 用户名/密码   │ ✅ AUTH 报文

迁移建议:
  新项目直接使用 MQTT 5.0
  旧项目评估 Broker 和客户端库的 5.0 支持后迁移
  EMQX 5.x、Mosquitto 2.x、HiveMQ 4.x 均支持 5.0

六、MQTT Broker 架构

6.1 主流 Broker 对比

Broker        │ 语言      │ 集群支持 │ 连接上限      │ 适用场景
──────────────┼──────────┼─────────┼──────────────┼───────────────
EMQX          │ Erlang   │ ✅ 原生  │ 1 亿+/集群    │ 大规模 IoT 平台
Mosquitto     │ C        │ ❌ 桥接  │ ~10 万/单节点 │ 边缘/小规模
HiveMQ        │ Java     │ ✅       │ 千万级        │ 企业级
VerneMQ       │ Erlang   │ ✅       │ 百万级        │ 开源替代
NanoMQ        │ C        │ ❌       │ ~10 万        │ 边缘轻量级

选型建议:
  个人/小项目: Mosquitto(简单、资源占用小)
  生产环境: EMQX(功能全、社区活跃、中文文档好)
  企业合规: HiveMQ(商业支持、合规认证)
  边缘计算: NanoMQ(轻量、低延迟)

6.2 EMQX 集群架构

EMQX 集群:

  客户端 ──→ LB ──→ EMQX Node 1 ──┐
  客户端 ──→ LB ──→ EMQX Node 2 ──├── 集群内消息路由
  客户端 ──→ LB ──→ EMQX Node 3 ──┘

  核心机制:
  1. 路由表: 全集群共享订阅关系
     Topic sensor/temp → [Node1-ClientA, Node3-ClientB]
  2. 消息转发: 发布到 Node 2 的消息自动路由到 Node 1 和 Node 3
  3. 会话迁移: 持久会话可以在节点间迁移

配置示例(emqx.conf):
  node.name = emqx@10.0.1.10
  cluster.discovery_strategy = static
  cluster.static.seeds = [emqx@10.0.1.10, emqx@10.0.1.11, emqx@10.0.1.12]

  # 连接限制
  listeners.tcp.default.max_connections = 1024000
  listeners.tcp.default.max_conn_rate = 1000

  # 消息限制
  mqtt.max_packet_size = 1MB
  mqtt.max_topic_levels = 10
  mqtt.max_qos_allowed = 2

6.3 安全配置

MQTT 安全层级:

1. 传输层安全(TLS/SSL)
   listener.ssl.external.keyfile = /etc/emqx/certs/server.key
   listener.ssl.external.certfile = /etc/emqx/certs/server.crt
   listener.ssl.external.cacertfile = /etc/emqx/certs/ca.crt
   listener.ssl.external.verify = verify_peer  # mTLS

2. 认证(Authentication)
   - 用户名/密码(内置、MySQL、PostgreSQL、Redis)
   - JWT Token
   - X.509 客户端证书
   - HTTP 回调(外部认证服务)

3. 授权(Authorization)
   - ACL 规则: 控制客户端对 Topic 的发布/订阅权限
   - 示例:
     {allow, {user, "sensor-01"}, publish, ["device/sensor-01/#"]}.
     {allow, {user, "dashboard"}, subscribe, ["device/+/status"]}.
     {deny, all}.

4. 速率限制
   - 连接速率: 每秒最大新连接数
   - 消息速率: 每客户端每秒最大消息数
   - 带宽限制: 每客户端最大数据速率

6.4 大规模连接的工程挑战

百万连接级别的工程要点:

1. 操作系统调优
   # 文件描述符限制
   ulimit -n 2000000
   # 或在 /etc/security/limits.conf 中设置:
   emqx  soft  nofile  2000000
   emqx  hard  nofile  2000000

   # 内核参数
   sysctl -w net.core.somaxconn=32768
   sysctl -w net.ipv4.tcp_max_syn_backlog=16384
   sysctl -w net.core.netdev_max_backlog=16384
   sysctl -w fs.file-max=2000000
   sysctl -w net.ipv4.ip_local_port_range="1024 65535"

2. Broker 内存估算
   每个连接的内存消耗(EMQX 估算):
   ┌──────────────────────┬─────────────┐
   │ 组件                 │ 内存         │
   ├──────────────────────┼─────────────┤
   │ TCP 连接             │ ~2 KB       │
   │ 会话状态             │ ~1 KB       │
   │ 订阅关系(每个)     │ ~0.3 KB     │
   │ 消息队列(Clean=0)  │ 可变        │
   └──────────────────────┴─────────────┘
   100 万连接 × 3 KB ≈ 3 GB 基础内存
   加上消息缓冲、路由表 → 实际需要 8-16 GB

3. 集群扩展策略
   单节点: Mosquitto ~10 万连接
   EMQX 单节点: ~100 万连接(调优后)
   EMQX 集群: 线性扩展(3 节点 → ~300 万连接)
   超大规模: 按地域分集群 + 跨集群桥接

4. 连接风暴防护
   大量设备同时重连(如断网恢复)会产生连接风暴
   防护措施:
   - 客户端随机延迟重连(jitter)
   - Broker 设置连接速率限制(max_conn_rate)
   - LB 层面限流保护

6.5 Go 语言 MQTT 客户端实践

package main

import (
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
    opts := mqtt.NewClientOptions().
        AddBroker("tcp://broker.example.com:1883").
        SetClientID("go-sensor-01").
        SetUsername("sensor").
        SetPassword("secret").
        SetKeepAlive(60 * time.Second).
        SetCleanSession(false).          // 持久会话
        SetAutoReconnect(true).          // 自动重连
        SetMaxReconnectInterval(30 * time.Second).
        SetConnectionLostHandler(func(c mqtt.Client, err error) {
            log.Printf("连接丢失: %v", err)
        }).
        SetOnConnectHandler(func(c mqtt.Client) {
            log.Println("已连接,订阅 Topic...")
            // 连接成功后订阅(处理重连场景)
            if token := c.Subscribe("command/#", 1, handleCommand); token.Wait() && token.Error() != nil {
                log.Printf("订阅失败: %v", token.Error())
            }
        }).
        SetWill("device/go-sensor-01/status",
            `{"status":"offline"}`, 1, true) // 遗嘱消息

    client := mqtt.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        log.Fatalf("连接失败: %v", token.Error())
    }

    // 上报在线状态(Retained)
    client.Publish("device/go-sensor-01/status",
        1, true, `{"status":"online"}`)

    // 定时上报传感器数据
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()

    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

    for {
        select {
        case <-ticker.C:
            payload := fmt.Sprintf(`{"temp":%.1f,"ts":%d}`,
                22.5+float64(time.Now().Unix()%10)/10,
                time.Now().Unix())
            token := client.Publish("sensor/go-sensor-01/temp",
                0, false, payload)
            token.Wait()
        case <-sigCh:
            log.Println("正常断开...")
            client.Publish("device/go-sensor-01/status",
                1, true, `{"status":"offline"}`)
            client.Disconnect(1000) // 等 1 秒发完
            return
        }
    }
}

func handleCommand(c mqtt.Client, msg mqtt.Message) {
    log.Printf("收到命令 [%s]: %s", msg.Topic(), msg.Payload())
}

七、MQTT over WebSocket

浏览器无法直接使用 TCP,因此 MQTT 支持通过 WebSocket 传输:

架构:

  浏览器 ──WebSocket──→ Broker (ws://broker:8083/mqtt)
  IoT 设备 ──TCP──→ Broker (tcp://broker:1883)

  两种传输的客户端可以互相通信
  Topic 和消息路由对传输层透明
// 浏览器端 MQTT 客户端(使用 mqtt.js)
import mqtt from 'mqtt';

const client = mqtt.connect('wss://broker.example.com:8084/mqtt', {
    clientId: 'web-dashboard-' + Math.random().toString(16).substr(2, 8),
    username: 'dashboard',
    password: 'secret',
    clean: true,
    keepalive: 30,
    reconnectPeriod: 3000,  // 自动重连间隔
});

client.on('connect', () => {
    console.log('Connected to MQTT broker');
    client.subscribe('device/+/status', { qos: 1 });
    client.subscribe('alerts/#', { qos: 1 });
});

client.on('message', (topic, message) => {
    const data = JSON.parse(message.toString());
    console.log(`[${topic}]`, data);

    if (topic.startsWith('alerts/')) {
        showAlert(data);
    }
});

client.on('error', (error) => {
    console.error('MQTT error:', error);
});

// 发布消息
client.publish('dashboard/command', JSON.stringify({
    action: 'restart',
    target: 'sensor-01',
}), { qos: 1 });

八、MQTT vs 其他消息协议

协议      │ 传输层   │ 消息模型      │ QoS     │ 适用场景
──────────┼─────────┼──────────────┼─────────┼──────────────────
MQTT      │ TCP/WS  │ Pub/Sub      │ 0/1/2   │ IoT, 推送
AMQP      │ TCP     │ Queue + Topic│ 事务    │ 企业集成
Kafka     │ TCP     │ Log + Group  │ 确认    │ 大数据流
NATS      │ TCP     │ Pub/Sub      │ 0/1     │ 微服务通信
CoAP      │ UDP     │ REQ/RES      │ CON/NON │ 极低功耗 IoT
HTTP SSE  │ HTTP    │ 单向推送     │ 无      │ Web 实时

MQTT vs AMQP:
  MQTT: 更轻量、更适合受限设备、协议更简单
  AMQP: 更丰富的路由(Exchange/Queue/Binding)、更适合企业集成

MQTT vs Kafka:
  MQTT: 面向设备通信、低延迟、轻量客户端
  Kafka: 面向数据管道、高吞吐、消息持久化

MQTT vs CoAP:
  MQTT: TCP(可靠)、长连接、支持推送
  CoAP: UDP(轻量)、请求/响应、更适合极低功耗场景

九、MQTT 监控与故障排查

9.1 关键监控指标

Broker 级指标:
  - 连接数(当前/最大/历史峰值)
  - 消息吞吐(收/发/丢弃,按 QoS 分)
  - 订阅数(总数/每 Topic 数)
  - 内存使用(会话/消息队列/保留消息)
  - 集群状态(节点健康/路由表同步延迟)

客户端级指标:
  - 连接时长
  - 消息频率(发布/接收)
  - 队列堆积(持久会话的离线消息数)
  - 重连次数

告警规则:
  - 连接数超过阈值的 80%
  - 消息丢弃率 > 0.1%
  - 持久会话的离线消息队列 > 10000 条
  - 客户端重连频率 > 每分钟 5 次

9.2 常见故障

故障 1: 消息丢失
  可能原因:
  - QoS 0 + 网络不稳定 → 使用 QoS 1
  - Clean Session=1 + 频繁断线 → 改为 Clean Session=0
  - Broker 内存不足导致消息被丢弃 → 扩容

故障 2: 消息重复
  可能原因:
  - QoS 1 + 网络波动导致 PUBACK 延迟 → 应用层去重
  - 客户端频繁重连 → 检查 Clean Session 设置

故障 3: 连接频繁断开
  可能原因:
  - Keep Alive 太短 → 增大 Keep Alive(60s+)
  - Client ID 冲突(两个设备用同一个 ID)→ 确保唯一
  - Broker 资源不足 → 检查内存/CPU/连接数

故障 4: 消息延迟高
  可能原因:
  - QoS 2 的四步握手 → 评估是否可以降到 QoS 1
  - 订阅者处理太慢导致消息堆积 → 优化消费速度
  - 集群间消息路由延迟 → 检查集群网络

排查工具:
  mosquitto_sub -h broker -t '#' -v  # 订阅所有消息
  mosquitto_pub -h broker -t test -m "hello"  # 发布测试
  emqx_ctl clients list  # EMQX 查看客户端列表
  emqx_ctl topics list   # EMQX 查看 Topic 列表

十、总结

MQTT 的工程核心是三个决策:QoS 级别选择、会话管理策略、Broker 架构设计。

  1. QoS 选择要匹配场景。遥测数据用 QoS 0,告警命令用 QoS 1,计费交易用 QoS 2。不要无脑用 QoS 2——它的四步握手开销是 QoS 0 的 4 倍。

  2. 会话管理决定消息可靠性。Clean Session=0 配合 QoS 1/2 才能实现离线消息不丢失。但持久会话会消耗 Broker 内存,需要设置消息队列上限。

  3. Topic 设计影响整个系统。层级化、可过滤、不过深——好的 Topic 设计让消息路由高效,坏的 Topic 设计让系统混乱。

  4. MQTT 5.0 值得迁移。共享订阅、消息过期、用户属性、详细原因码——这些特性解决了 3.1.1 的大量工程痛点。

  5. Retained Message + Last Will 实现设备状态感知。这是 MQTT 区别于其他消息协议的独特能力——新订阅者立即获取最新状态,设备异常断开自动通知。

  6. Broker 选型看规模。个人项目用 Mosquitto,生产环境用 EMQX,企业合规用 HiveMQ。


参考文献


上一篇:SSE 与长轮询:服务端推送的轻量解法

下一篇:协议选型决策树:REST vs gRPC vs GraphQL vs WebSocket

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2025-08-02 · network

【网络工程】带宽管理与流量整形:tc、QoS 与拥塞管理

带宽管理是网络工程的核心能力。本文从 Linux tc 的 qdisc/class/filter 三层模型出发,系统讲解 HTB 分层令牌桶、TBF 令牌桶过滤器、FQ/fq_codel 公平队列、tc-bpf 可编程流量控制、netem 网络模拟,以及 QoS 策略设计和容量规划的工程实践。

2025-07-28 · network

【网络工程】DNS 协议解剖:查询格式、记录类型与响应码

DNS 是互联网最基础的目录服务,也是最脆弱的单点之一。本文从 wire format 出发逐字段解析 DNS 报文结构,详解 A/AAAA/CNAME/MX/SRV/TXT/NS/SOA 等记录类型的工程用途,分析 EDNS0 扩展与 DNS over TCP 的触发条件,结合 dig +trace 完整实操展示 DNS 解析的真实链路。


By .