土法炼钢兴趣小组的算法知识备份

【系统架构设计百科】长连接与推送架构:WebSocket、SSE 与 MQTT

文章导航

分类入口
architecture
标签入口
#WebSocket#SSE#MQTT#push-architecture#long-connection#epoll

目录

一、实时推送:现代应用的基础需求

在线聊天消息要在 200ms 内送达对方;股票行情需要每秒推送数十次报价更新;物联网平台要同时维持数百万设备的心跳连接;协同文档需要将每一次光标移动实时同步给所有参与者。

这些场景有一个共同特征:服务端需要主动向客户端推送数据,而不是等客户端来轮询。

HTTP 协议的请求-响应(Request-Response)模型天然不支持服务端主动推送。客户端发一个请求,服务端回一个响应,连接就结束了。要实现”服务端主动推”,工程上有三条主流路径:

  1. WebSocket:基于 TCP 的全双工通信协议,客户端和服务端都可以随时发送数据。
  2. SSE(Server-Sent Events):基于 HTTP 的单向推送协议,服务端可以持续向客户端发送事件流。
  3. MQTT(Message Queuing Telemetry Transport):面向物联网的轻量级发布/订阅协议,支持三级服务质量保证。

还有一条退化路径:长轮询(Long Polling),它不是真正的推送协议,但在 WebSocket 不可用的环境下仍然有价值。

这篇文章不做协议科普——RFC 文档已经够详细了。我们要回答的是工程问题:推送系统的连接管理、心跳检测、断线重连,到底有多复杂?百万长连接的单机架构怎么设计?消息丢了怎么办?集群扩展时连接状态怎么迁移?


二、WebSocket 协议深度解析

2.1 升级握手

WebSocket 连接始于一个 HTTP 升级请求。客户端发送一个带有 Upgrade: websocket 头的 HTTP GET 请求,服务端返回 101 状态码表示协议切换成功。

GET /ws/chat HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
Sec-WebSocket-Protocol: chat

服务端响应:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Sec-WebSocket-Protocol: chat

Sec-WebSocket-Accept 的值是将客户端发送的 Sec-WebSocket-Key 拼接上固定 GUID 258EAFA5-E914-47DA-95CA-5AB5DC11CE56,做一次 SHA-1 哈希后 Base64 编码得到的。这个机制不是为了安全——它只是为了防止非 WebSocket 客户端误发升级请求。

握手完成后,TCP 连接不再关闭,双方通过帧(Frame)格式收发数据。

2.2 帧格式

WebSocket 的数据帧格式紧凑,最小开销仅 2 字节:

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len |    Extended payload length    |
|I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
|N|V|V|V|       |S|             |   (if payload len==126/127)   |
| |1|2|3|       |K|             |                               |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
|     Extended payload length continued, if payload len == 127  |
+ - - - - - - - - - - - - - - - +-------------------------------+
|                               |Masking-key, if MASK set to 1  |
+-------------------------------+-------------------------------+
| Masking-key (continued)       |          Payload Data         |
+-------------------------------- - - - - - - - - - - - - - - - +
:                     Payload Data continued ...                :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
|                     Payload Data (continued)                  |
+---------------------------------------------------------------+

关键字段说明:

2.3 Ping/Pong 心跳

WebSocket 协议内置了 Ping/Pong 机制。任何一方都可以发送 Ping 帧(opcode 0x9),对方必须尽快回复一个 Pong 帧(opcode 0xA),且 Pong 帧的 payload 必须与 Ping 帧完全一致。

// Go 语言 WebSocket 心跳示例(使用 gorilla/websocket)
func keepAlive(conn *websocket.Conn, timeout time.Duration) {
    ticker := time.NewTicker(timeout / 2)
    defer ticker.Stop()

    conn.SetPongHandler(func(appData string) error {
        // 收到 Pong,重置读超时
        conn.SetReadDeadline(time.Now().Add(timeout))
        return nil
    })

    for range ticker.C {
        if err := conn.WriteControl(
            websocket.PingMessage,
            []byte("ping"),
            time.Now().Add(5*time.Second),
        ); err != nil {
            return // 连接已断开
        }
    }
}

2.4 关闭握手

WebSocket 的关闭是一个四步握手过程:

  1. 发起方发送 Close 帧(opcode 0x8),携带关闭码和原因。
  2. 接收方回复 Close 帧。
  3. 接收方关闭 TCP 连接。
  4. 发起方确认 TCP 连接关闭。

常见关闭码包括:1000 正常关闭,1001 终端离开(如浏览器标签页关闭),1006 异常关闭(未发送 Close 帧),1011 服务端内部错误。

// 优雅关闭 WebSocket 连接
func gracefulClose(conn *websocket.Conn) error {
    msg := websocket.FormatCloseMessage(
        websocket.CloseNormalClosure,
        "server shutting down",
    )
    err := conn.WriteControl(
        websocket.CloseMessage,
        msg,
        time.Now().Add(5*time.Second),
    )
    if err != nil {
        return conn.Close()
    }
    // 等待对方回复 Close 帧,设置超时
    conn.SetReadDeadline(time.Now().Add(5 * time.Second))
    for {
        _, _, err := conn.ReadMessage()
        if err != nil {
            break
        }
    }
    return conn.Close()
}

下面这张图展示了 WebSocket 从握手到关闭的完整生命周期:

sequenceDiagram
    participant C as 客户端
    participant S as 服务端

    C->>S: HTTP GET /ws (Upgrade: websocket)
    S-->>C: 101 Switching Protocols

    Note over C,S: WebSocket 连接建立

    C->>S: 文本帧 (opcode=0x1)
    S->>C: 文本帧 (opcode=0x1)

    S->>C: Ping 帧 (opcode=0x9)
    C-->>S: Pong 帧 (opcode=0xA)

    C->>S: Close 帧 (opcode=0x8, code=1000)
    S-->>C: Close 帧 (opcode=0x8, code=1000)

    Note over C,S: TCP 连接关闭

三、SSE(Server-Sent Events)详解

3.1 协议机制

SSE 比 WebSocket 简单得多——它就是一个普通的 HTTP 响应,Content-Type 设为 text/event-stream,服务端持续往响应体里写数据,不关闭连接。

HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

data: {"price": 150.25, "symbol": "AAPL"}

event: trade
data: {"price": 150.30, "symbol": "AAPL", "volume": 100}
id: 1001

event: trade
data: {"price": 150.28, "symbol": "AAPL", "volume": 250}
id: 1002

每条消息由若干字段组成:

消息之间用空行分隔。

3.2 EventSource API

浏览器端使用 EventSource API 连接 SSE 端点:

const source = new EventSource('/api/stock/stream');

// 监听默认 message 事件
source.onmessage = (event) => {
    const data = JSON.parse(event.data);
    console.log(`${data.symbol}: ${data.price}`);
};

// 监听自定义 trade 事件
source.addEventListener('trade', (event) => {
    const trade = JSON.parse(event.data);
    updateTradeList(trade);
});

// 错误处理
source.onerror = (event) => {
    if (source.readyState === EventSource.CLOSED) {
        console.log('连接已关闭');
    } else {
        console.log('连接中断,浏览器将自动重连');
    }
};

3.3 自动重连与事件 ID

SSE 最有价值的特性之一是内置的断线重连机制。当连接中断时,浏览器会自动尝试重连,并在请求头中携带 Last-Event-ID,告诉服务端上次收到的最后一个事件 ID。

GET /api/stock/stream HTTP/1.1
Host: example.com
Last-Event-ID: 1002

服务端可以根据这个 ID 从断点处继续推送,避免消息丢失。

服务端 Go 实现示例:

func sseHandler(w http.ResponseWriter, r *http.Request) {
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "streaming not supported", http.StatusInternalServerError)
        return
    }

    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")

    // 获取断线重连的起始 ID
    lastID := r.Header.Get("Last-Event-ID")
    startSeq := parseSequence(lastID)

    eventCh := subscribe(r.Context(), startSeq)

    for {
        select {
        case event, ok := <-eventCh:
            if !ok {
                return
            }
            fmt.Fprintf(w, "id: %d\n", event.Seq)
            fmt.Fprintf(w, "event: %s\n", event.Type)
            fmt.Fprintf(w, "data: %s\n\n", event.Data)
            flusher.Flush()
        case <-r.Context().Done():
            return
        }
    }
}

3.4 SSE 的局限

SSE 是单向的——只有服务端可以向客户端发送数据。客户端要发送数据,需要另开一个 HTTP 请求。这在大多数推送场景下不是问题(股票行情、通知推送、日志流),但在需要双向通信的场景下(聊天、游戏)就显得力不从心。

另一个限制是浏览器对同一域名的 SSE 连接数有上限。HTTP/1.1 下,大多数浏览器限制 6 个连接;HTTP/2 下这个限制大幅放宽到 100 个。


四、MQTT 协议与 QoS 级别

4.1 协议概述

MQTT 是 IBM 在 1999 年为卫星链路设计的轻量级协议。它的设计目标是在低带宽、高延迟、不稳定的网络上实现可靠的消息传递。经过二十多年的演进,MQTT 已经成为物联网(IoT)领域的事实标准。

MQTT 采用发布/订阅(Publish/Subscribe)模型,所有消息通过一个中心化的代理(Broker)转发。客户端可以向主题(Topic)发布消息,也可以订阅主题接收消息。

                  +---------+
 传感器 A ------->|         |-------> 监控面板
                  |  MQTT   |
 传感器 B ------->|  Broker |-------> 数据存储
                  |         |
 传感器 C ------->|         |-------> 告警服务
                  +---------+

4.2 主题层级

MQTT 的主题是用斜杠分隔的层级结构,类似文件路径:

factory/floor1/machine42/temperature
factory/floor1/machine42/vibration
factory/floor2/+/temperature        # + 匹配单层
factory/#                           # # 匹配多层

通配符规则:

4.3 三级 QoS

MQTT 的核心特性是三级服务质量(Quality of Service)保证:

QoS 0:至多一次(At Most Once)

发布者发送消息后不等待确认,Broker 也不做存储。消息可能丢失,但开销最小。适用于高频传感器数据——丢一两个点不影响整体趋势。

Publisher ---PUBLISH---> Broker ---PUBLISH---> Subscriber

QoS 1:至少一次(At Least Once)

发布者发送消息后等待 Broker 的 PUBACK 确认。如果超时未收到 PUBACK,发布者会重发消息(带 DUP 标志)。消息不会丢,但可能重复。

Publisher ---PUBLISH---> Broker
Publisher <---PUBACK---- Broker

QoS 2:恰好一次(Exactly Once)

通过四步握手保证消息不丢不重。这是开销最大的级别,适用于计费、指令下发等不能容忍重复的场景。

sequenceDiagram
    participant P as 发布者
    participant B as Broker

    P->>B: PUBLISH (QoS 2, PacketID=42)
    B-->>P: PUBREC (PacketID=42)
    P->>B: PUBREL (PacketID=42)
    B-->>P: PUBCOMP (PacketID=42)

    Note over P,B: 四步握手完成,消息恰好投递一次

4.4 保留消息与遗嘱

保留消息(Retained Message):发布者可以将消息标记为”保留”。Broker 会为每个主题保存最后一条保留消息。当新的订阅者订阅该主题时,立即收到这条保留消息,不用等到下次发布。这对设备状态查询特别有用——新上线的监控面板可以立刻拿到所有设备的最新状态。

遗嘱消息(Last Will and Testament,LWT):客户端连接时可以预设一条遗嘱消息。如果客户端异常断开(未发送 DISCONNECT),Broker 会代替客户端发布这条遗嘱消息。其他订阅者收到遗嘱后就知道该设备离线了。

// Go 语言 MQTT 客户端配置遗嘱消息
opts := mqtt.NewClientOptions()
opts.AddBroker("tcp://broker.example.com:1883")
opts.SetClientID("sensor-42")
opts.SetWill(
    "factory/floor1/machine42/status",  // 遗嘱主题
    "offline",                           // 遗嘱内容
    1,                                   // QoS 1
    true,                                // 保留
)
opts.SetKeepAlive(30 * time.Second)

client := mqtt.NewClient(opts)
token := client.Connect()
token.Wait()

五、三种推送技术对比

在选择推送技术之前,需要从多个维度对比三种协议的特性和适用场景:

维度 WebSocket SSE MQTT
通信方向 全双工 服务端到客户端单向 全双工(发布/订阅)
底层协议 TCP(HTTP 升级) HTTP TCP
最小帧开销 2 字节 无二进制帧,纯文本 2 字节
浏览器支持 所有现代浏览器 除 IE 外所有现代浏览器 需要第三方库(MQTT.js)
自动重连 无,需自行实现 内置,浏览器自动处理 客户端库通常内置
消息可靠性 无内置保证 通过事件 ID 支持断点续传 QoS 0/1/2 三级保证
多路复用 HTTP/2 下不支持 HTTP/2 天然支持 单连接多主题
代理穿透 部分企业代理会阻断 HTTP 兼容,代理友好 需要专用端口或 WS 桥接
二进制数据 原生支持 需 Base64 编码 原生支持
适用场景 聊天、游戏、协同编辑 通知推送、实时仪表盘 物联网、传感器数据采集
连接管理复杂度 中(Broker 托管)
典型延迟 亚毫秒级(局域网) 毫秒级 毫秒到秒级(取决于 QoS)

选型建议:


六、百万长连接的单机架构

6.1 问题规模

一台 Linux 服务器(32 核、128 GB 内存)能撑多少 WebSocket 长连接?理论上限取决于三个资源:

一个经验数字:在消息频率不高(每秒每连接 1 条消息以下)的场景下,一台配置合理的服务器可以维持 100 万到 200 万个空闲长连接。

6.2 epoll 事件驱动模型

百万连接的基础是 Linux 的 epoll 机制。与传统的 select/poll 不同,epoll 不需要遍历所有连接来查找有事件的连接——它通过内核回调机制,只返回有事件的文件描述符。

#include <sys/epoll.h>

#define MAX_EVENTS 10000

int main() {
    int epfd = epoll_create1(0);

    // 将监听 socket 加入 epoll
    struct epoll_event ev;
    ev.events = EPOLLIN | EPOLLET;  // 边缘触发
    ev.data.fd = listen_fd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev);

    struct epoll_event events[MAX_EVENTS];

    for (;;) {
        int nfds = epoll_wait(epfd, events, MAX_EVENTS, -1);
        for (int i = 0; i < nfds; i++) {
            if (events[i].data.fd == listen_fd) {
                // 接受新连接
                int conn_fd = accept(listen_fd, NULL, NULL);
                set_nonblocking(conn_fd);
                ev.events = EPOLLIN | EPOLLET;
                ev.data.fd = conn_fd;
                epoll_ctl(epfd, EPOLL_CTL_ADD, conn_fd, &ev);
            } else {
                // 处理已有连接的数据
                handle_data(events[i].data.fd);
            }
        }
    }
}

关键配置参数:

6.3 Go 语言的 goroutine 模型

Go 语言的网络库在底层使用 epoll(Linux)/ kqueue(macOS),但暴露给开发者的是同步阻塞的编程模型。每个连接一个 goroutine,goroutine 在等待 I/O 时会被 Go 运行时挂起,不占用操作系统线程。

func main() {
    listener, _ := net.Listen("tcp", ":8080")
    upgrader := websocket.Upgrader{}

    http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
        conn, _ := upgrader.Upgrade(w, r, nil)
        go readLoop(conn)   // 每个连接一个读 goroutine
        go writeLoop(conn)  // 每个连接一个写 goroutine
    })

    http.Serve(listener, nil)
}

func readLoop(conn *websocket.Conn) {
    defer conn.Close()
    for {
        _, msg, err := conn.ReadMessage()
        if err != nil {
            return
        }
        processMessage(conn, msg)
    }
}

问题在于:100 万连接意味着 200 万个 goroutine(读 + 写各一个),每个 goroutine 默认栈大小 8 KB,光栈内存就消耗 16 GB。加上 WebSocket 读写缓冲区,总内存可能超过 40 GB。

6.4 优化:reactor 模式

对于真正的百万连接场景,更高效的做法是放弃”每连接一个 goroutine”的模型,回到 reactor 模式:少量 goroutine 通过 epoll 管理大量连接,只在有数据到达时才分配 goroutine 处理。

// 伪代码:基于 epoll 的 Go reactor
type Reactor struct {
    epfd    int
    conns   map[int]*Connection
    workers chan func()
}

func (r *Reactor) Run() {
    events := make([]syscall.EpollEvent, 1024)
    for {
        n, _ := syscall.EpollWait(r.epfd, events, -1)
        for i := 0; i < n; i++ {
            fd := int(events[i].Fd)
            conn := r.conns[fd]
            r.workers <- func() {
                conn.handleEvent()
            }
        }
    }
}

Go 生态中 gnetnbio 等库实现了这种模式,能在单机上以较低的内存开销维持数百万连接。

6.5 系统调优清单

百万连接需要调整的内核参数:

# 文件描述符限制
ulimit -n 2000000
echo "fs.file-max = 2000000" >> /etc/sysctl.conf

# TCP 连接回收
echo "net.ipv4.tcp_tw_reuse = 1" >> /etc/sysctl.conf
echo "net.ipv4.tcp_fin_timeout = 15" >> /etc/sysctl.conf

# TCP 缓冲区(减小默认值以节省内存)
echo "net.ipv4.tcp_rmem = 4096 4096 16384" >> /etc/sysctl.conf
echo "net.ipv4.tcp_wmem = 4096 4096 16384" >> /etc/sysctl.conf

# 本地端口范围(用于出站连接)
echo "net.ipv4.ip_local_port_range = 1024 65535" >> /etc/sysctl.conf

# somaxconn(监听队列长度)
echo "net.core.somaxconn = 65535" >> /etc/sysctl.conf

sysctl -p

七、心跳检测与断线重连

7.1 为什么需要应用层心跳

TCP 协议本身有 keepalive 机制,但它的默认参数不适合推送场景:

也就是说,一个已经断开的连接,TCP 层最长需要 2 小时 + 675 秒才能发现。这对推送系统是不可接受的。

更严重的是,很多中间设备(NAT 网关、防火墙、负载均衡器)会静默丢弃长时间空闲的连接。移动网络环境下,NAT 超时通常在 5-10 分钟。如果不发心跳,连接可能在客户端和服务端都没感知的情况下被中间设备切断。

7.2 心跳策略设计

type HeartbeatConfig struct {
    Interval    time.Duration // 心跳发送间隔
    Timeout     time.Duration // 等待回复的超时时间
    MaxMissed   int           // 最大允许连续丢失次数
}

// 推荐配置
var defaultConfig = HeartbeatConfig{
    Interval:  30 * time.Second,
    Timeout:   10 * time.Second,
    MaxMissed: 3,
}

type Connection struct {
    conn       net.Conn
    lastPong   time.Time
    missCount  int
    mu         sync.Mutex
}

func (c *Connection) heartbeatLoop(cfg HeartbeatConfig) {
    ticker := time.NewTicker(cfg.Interval)
    defer ticker.Stop()

    for range ticker.C {
        c.mu.Lock()
        since := time.Since(c.lastPong)
        c.mu.Unlock()

        if since > cfg.Interval*time.Duration(cfg.MaxMissed) {
            log.Printf("连接超时,最后一次 Pong 在 %v 前", since)
            c.conn.Close()
            return
        }

        // 发送 Ping
        c.conn.SetWriteDeadline(time.Now().Add(cfg.Timeout))
        if err := c.sendPing(); err != nil {
            log.Printf("发送 Ping 失败: %v", err)
            c.conn.Close()
            return
        }
    }
}

func (c *Connection) onPong() {
    c.mu.Lock()
    c.lastPong = time.Now()
    c.missCount = 0
    c.mu.Unlock()
}

7.3 智能心跳间隔

不同网络环境下 NAT 超时不同。移动网络通常 5-10 分钟,Wi-Fi 环境可能更长。一种优化策略是动态探测最优心跳间隔:

  1. 初始心跳间隔设为较大值(如 5 分钟)。
  2. 如果连接断开后重连成功,缩短心跳间隔。
  3. 如果连接长时间稳定,逐步延长心跳间隔。
  4. 记录每个网络环境下的最优间隔,下次连接时直接使用。

微信客户端在移动网络下的心跳间隔大约是 4.5 分钟,这是经过大规模测试得到的经验值——短到足以在大多数 NAT 超时之前保活,长到不会过度消耗电量和流量。

7.4 断线重连与指数退避

当连接断开后,客户端需要尝试重连。直接重连可能导致”惊群效应”(Thundering Herd)——如果服务端重启,上万个客户端同时重连,瞬间冲垮服务端。

指数退避(Exponential Backoff)加随机抖动(Jitter)是标准解决方案:

type ReconnectPolicy struct {
    BaseDelay  time.Duration
    MaxDelay   time.Duration
    Multiplier float64
    MaxRetries int
}

func (p *ReconnectPolicy) nextDelay(attempt int) time.Duration {
    if attempt >= p.MaxRetries {
        return p.MaxDelay
    }
    delay := float64(p.BaseDelay) * math.Pow(p.Multiplier, float64(attempt))
    if delay > float64(p.MaxDelay) {
        delay = float64(p.MaxDelay)
    }
    // 加入 [0, delay) 范围的随机抖动
    jitter := rand.Float64() * delay
    return time.Duration(jitter)
}

func reconnectLoop(addr string, policy ReconnectPolicy) net.Conn {
    for attempt := 0; ; attempt++ {
        conn, err := net.DialTimeout("tcp", addr, 5*time.Second)
        if err == nil {
            log.Println("重连成功")
            return conn
        }

        delay := policy.nextDelay(attempt)
        log.Printf("重连失败(第 %d 次),等待 %v 后重试: %v",
            attempt+1, delay, err)
        time.Sleep(delay)
    }
}

推荐参数:基础延迟 1 秒,倍率 2,最大延迟 60 秒,最大重试次数 20。


八、推送消息的可靠投递

8.1 消息丢失的三个环节

推送链路上有三个地方可能丢消息:

  1. 服务端到连接层:业务服务发送消息到连接网关,网关可能崩溃或过载。
  2. 连接层到客户端:TCP 连接可能已断开但服务端尚未感知。
  3. 客户端处理失败:客户端收到消息但处理过程中崩溃。
graph LR
    A[业务服务] -->|MQ| B[连接网关]
    B -->|WebSocket/TCP| C[客户端]

    style A fill:#f9f,stroke:#333
    style B fill:#bbf,stroke:#333
    style C fill:#bfb,stroke:#333

    D[丢失点1: MQ 到网关] -.-> B
    E[丢失点2: 网关到客户端] -.-> C
    F[丢失点3: 客户端处理失败] -.-> C

8.2 端到端 ACK 机制

要实现可靠投递,必须引入应用层的确认(ACK)机制:

type Message struct {
    ID        string    `json:"id"`
    Type      string    `json:"type"`
    Payload   []byte    `json:"payload"`
    Timestamp int64     `json:"ts"`
    NeedACK   bool      `json:"need_ack"`
}

type ACK struct {
    MessageID string `json:"msg_id"`
    ClientID  string `json:"client_id"`
    Timestamp int64  `json:"ts"`
}

// 服务端:发送消息并等待 ACK
type DeliveryManager struct {
    pending   map[string]*PendingMessage
    mu        sync.RWMutex
    retryPool chan *PendingMessage
}

type PendingMessage struct {
    msg       *Message
    conn      *Connection
    attempts  int
    nextRetry time.Time
}

func (dm *DeliveryManager) Send(conn *Connection, msg *Message) {
    if msg.NeedACK {
        pm := &PendingMessage{
            msg:       msg,
            conn:      conn,
            attempts:  0,
            nextRetry: time.Now().Add(3 * time.Second),
        }
        dm.mu.Lock()
        dm.pending[msg.ID] = pm
        dm.mu.Unlock()
    }
    conn.Write(msg)
}

func (dm *DeliveryManager) OnACK(ack *ACK) {
    dm.mu.Lock()
    delete(dm.pending, ack.MessageID)
    dm.mu.Unlock()
}

// 重试协程
func (dm *DeliveryManager) retryLoop() {
    ticker := time.NewTicker(1 * time.Second)
    for range ticker.C {
        now := time.Now()
        dm.mu.Lock()
        for id, pm := range dm.pending {
            if now.After(pm.nextRetry) {
                if pm.attempts >= 3 {
                    // 投递失败,放入离线队列
                    dm.moveToOffline(pm)
                    delete(dm.pending, id)
                } else {
                    pm.attempts++
                    pm.nextRetry = now.Add(
                        time.Duration(pm.attempts*3) * time.Second,
                    )
                    pm.conn.Write(pm.msg)
                }
            }
        }
        dm.mu.Unlock()
    }
}

8.3 离线消息队列

当客户端不在线时,消息需要存入离线队列,等客户端重连后再投递:

type OfflineStore interface {
    // 存入离线消息
    Push(userID string, msg *Message) error
    // 拉取离线消息(按时间顺序)
    Pull(userID string, limit int) ([]*Message, error)
    // 确认已收到,从队列删除
    Ack(userID string, msgIDs []string) error
    // 获取未读消息数量
    Count(userID string) (int, error)
}

存储选型:

8.4 消息去重

由于重试机制的存在,客户端可能收到重复消息。去重有两种策略:

  1. 服务端去重:发送前检查消息 ID 是否已经发送过。适用于 QoS 2 级别的场景。
  2. 客户端去重:客户端维护一个已处理消息 ID 的滑动窗口,收到重复消息直接丢弃。
type Deduplicator struct {
    seen    map[string]struct{}
    order   []string
    maxSize int
    mu      sync.Mutex
}

func (d *Deduplicator) IsDuplicate(msgID string) bool {
    d.mu.Lock()
    defer d.mu.Unlock()

    if _, exists := d.seen[msgID]; exists {
        return true
    }

    d.seen[msgID] = struct{}{}
    d.order = append(d.order, msgID)

    // 窗口满了,淘汰最旧的
    if len(d.order) > d.maxSize {
        oldest := d.order[0]
        d.order = d.order[1:]
        delete(d.seen, oldest)
    }
    return false
}

九、推送系统的水平扩展

9.1 连接层与业务层分离

单机能撑百万连接,但单机终究有上限。水平扩展的第一步是把连接层和业务层分离:

graph TB
    subgraph 连接层
        GW1[网关 1]
        GW2[网关 2]
        GW3[网关 3]
    end

    subgraph 业务层
        BIZ1[聊天服务]
        BIZ2[通知服务]
        BIZ3[行情服务]
    end

    LB[负载均衡] --> GW1
    LB --> GW2
    LB --> GW3

    GW1 <-->|gRPC| BIZ1
    GW2 <-->|gRPC| BIZ2
    GW3 <-->|gRPC| BIZ3

    MQ[消息队列] --> GW1
    MQ --> GW2
    MQ --> GW3

连接网关的职责:

业务层的职责:

9.2 连接状态路由

分离之后有一个核心问题:业务层要给用户 A 推送消息,但用户 A 连接在哪台网关上?

解决方案是维护一个全局的连接路由表:

type ConnRouter interface {
    // 用户上线时注册
    Register(userID string, gatewayID string) error
    // 用户下线时注销
    Unregister(userID string, gatewayID string) error
    // 查询用户连接在哪台网关
    Lookup(userID string) ([]string, error)
    // 一个用户可能有多个设备同时在线
}

存储选型:

9.3 跨网关消息转发

当业务层确定消息的目标用户后,需要将消息投递到对应的网关。两种方案:

方案一:消息队列广播

每台网关订阅一个独立的消息队列主题(如 push.gateway.{id})。业务层根据路由表查到目标网关,将消息投递到对应的队列。

优点:解耦,网关故障时消息不丢(堆积在队列中)。 缺点:增加了一跳延迟,需要维护队列基础设施。

方案二:直接 RPC

业务层通过 gRPC 直接调用目标网关的推送接口。

优点:延迟低。 缺点:需要处理网关故障时的重试和降级。

实际工程中,两种方案通常结合使用:在线消息走 RPC 追求低延迟,离线消息走消息队列保证不丢。

9.4 长轮询作为降级方案

当客户端无法建立 WebSocket 或 SSE 连接时(企业防火墙、老旧浏览器),长轮询(Long Polling)是最后的退路:

func longPollHandler(w http.ResponseWriter, r *http.Request) {
    userID := r.URL.Query().Get("user_id")
    lastSeq := parseSequence(r.URL.Query().Get("last_seq"))
    timeout := 30 * time.Second

    ctx, cancel := context.WithTimeout(r.Context(), timeout)
    defer cancel()

    messages, err := waitForMessages(ctx, userID, lastSeq)
    if err == context.DeadlineExceeded {
        // 超时无消息,返回空响应
        w.WriteHeader(http.StatusNoContent)
        return
    }
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }

    json.NewEncoder(w).Encode(messages)
}

长轮询的本质是:客户端发起一个 HTTP 请求,服务端挂起这个请求直到有新消息或超时。超时后客户端立刻发起下一个请求。与传统短轮询相比,长轮询减少了大量无效请求;与 WebSocket 相比,它的延迟更高(至少一个 RTT)且服务端需要为每个挂起的请求占用一个连接。


十、工程案例:即时通讯与物联网推送平台

10.1 案例一:千万级 IM 系统

某社交产品的即时通讯(IM)系统需要支持以下指标:

架构设计:

graph TB
    subgraph 客户端
        APP1[移动端]
        APP2[桌面端]
        APP3[Web 端]
    end

    subgraph 接入层
        LB[LVS + Nginx]
        GW1[WebSocket 网关]
        GW2[WebSocket 网关]
        GW3[WebSocket 网关]
    end

    subgraph 逻辑层
        MSG[消息服务]
        GROUP[群聊服务]
        ROUTE[路由服务]
        OFFLINE[离线服务]
    end

    subgraph 存储层
        REDIS[(Redis 集群)]
        KAFKA[(Kafka)]
        MYSQL[(MySQL)]
        MINIO[(MinIO)]
    end

    APP1 --> LB
    APP2 --> LB
    APP3 --> LB
    LB --> GW1
    LB --> GW2
    LB --> GW3

    GW1 <--> ROUTE
    GW2 <--> MSG
    GW3 <--> GROUP

    MSG --> KAFKA
    KAFKA --> GW1
    KAFKA --> GW2
    KAFKA --> GW3

    OFFLINE --> REDIS
    MSG --> MYSQL
    MSG --> MINIO

关键设计决策:

  1. 连接网关无状态化:网关不存储业务状态,连接路由表存在 Redis 中。网关故障时,客户端重连到其他网关,自动恢复。

  2. 消息存储模型:采用”写扩散”(Fan-out on Write)模型。单聊消息写入发送者和接收者的消息收件箱(inbox)。群聊消息写入群的消息时间线,成员读取时拉取。万人群不能做写扩散——写放大 10000 倍会压垮存储层。

  3. 消息有序性:每个会话维护一个单调递增的序列号(sequence)。服务端使用 Redis INCR 生成序列号,保证同一会话内的消息有序。

  4. ACK 与重投:消息下发后启动 3 秒计时器,未收到客户端 ACK 则重投。重投 3 次仍失败则放入离线队列。客户端每次上线后主动拉取离线消息。

10.2 案例二:工业物联网推送平台

某制造企业的物联网(IoT)平台需要接入 50 万台设备:

协议选型:MQTT

选择 MQTT 的理由:

主题设计:

# 数据上报
telemetry/{factory_id}/{floor}/{machine_id}/temperature
telemetry/{factory_id}/{floor}/{machine_id}/vibration

# 设备状态
status/{factory_id}/{floor}/{machine_id}/online
status/{factory_id}/{floor}/{machine_id}/alarm

# 下行指令
command/{factory_id}/{floor}/{machine_id}/config
command/{factory_id}/{floor}/{machine_id}/firmware
command/{factory_id}/{floor}/{machine_id}/emergency_stop

# 遗嘱主题
status/{factory_id}/{floor}/{machine_id}/online -> payload: "offline"

桥接模式(Bridge Pattern):

部分传感器不直接支持 MQTT(例如 Modbus RTU 协议的老设备),通过边缘网关(Edge Gateway)桥接:

// Rust 边缘网关:Modbus -> MQTT 桥接
use tokio_modbus::prelude::*;
use rumqttc::{MqttOptions, AsyncClient, QoS};

async fn bridge_loop(
    modbus_ctx: &mut tcp::Context,
    mqtt_client: &AsyncClient,
    device_id: &str,
) {
    let mut interval = tokio::time::interval(
        tokio::time::Duration::from_secs(5)
    );
    loop {
        interval.tick().await;

        // 从 Modbus 设备读取温度寄存器
        let registers = modbus_ctx
            .read_holding_registers(0x0001, 2)
            .await;

        match registers {
            Ok(data) => {
                let temperature = decode_temperature(&data);
                let topic = format!(
                    "telemetry/factory01/floor1/{}/temperature",
                    device_id
                );
                let payload = serde_json::json!({
                    "value": temperature,
                    "unit": "celsius",
                    "timestamp": chrono::Utc::now().timestamp()
                });
                mqtt_client
                    .publish(
                        topic,
                        QoS::AtLeastOnce,
                        false,
                        payload.to_string(),
                    )
                    .await
                    .ok();
            }
            Err(e) => {
                eprintln!("Modbus 读取失败: {}", e);
            }
        }
    }
}

QoS 级别分配:

消息类型 QoS 理由
温度数据 0 高频上报,丢一个点不影响趋势
振动告警 1 告警不能丢,但重复可以在业务层去重
紧急停机指令 2 不能丢也不能重复执行
固件升级 1 升级流程本身有校验,重复指令不会导致问题
设备状态 1 + 保留 新订阅者需要立即获取设备当前状态

Broker 集群架构:

单台 EMQX Broker 可以承载约 200 万连接。50 万设备只需一个小型集群(3 节点)。但考虑到数据上报频率(传感器 5 秒一次,50 万台 = 每秒 10 万条消息;PLC 100ms 一次,假设 1000 台 = 每秒 1 万条消息),消息吞吐是更大的挑战。

EMQX 的集群方案基于 Erlang/OTP 的分布式能力,节点间通过 Erlang 分布式协议通信,订阅表在集群内全量复制。客户端连接到任意节点,发布的消息会被路由到所有持有匹配订阅的节点。

10.3 工程经验总结

从这两个案例中可以提炼出推送系统设计的几个通用原则:

  1. 协议选型跟着场景走。浏览器端优先 WebSocket/SSE,物联网用 MQTT,不要反过来。
  2. 连接层做薄,业务层做厚。连接网关的职责越少,稳定性越高。
  3. 可靠投递不是免费的。ACK、重试、离线队列每一个都有成本——延迟、存储、复杂度。只在需要的消息上开启。
  4. 心跳间隔是一个权衡。太短浪费带宽和电量,太长导致连接静默断开。没有银弹,需要针对网络环境实测。
  5. 水平扩展的关键是状态外置。连接路由表、离线消息、会话状态全部放到外部存储,网关本身无状态。

参考资料

  1. RFC 6455 - The WebSocket Protocol. IETF, 2011.
  2. Server-Sent Events. W3C Recommendation, 2015.
  3. MQTT Version 5.0. OASIS Standard, 2019.
  4. MQTT Version 3.1.1. OASIS Standard, 2014.
  5. Libenzi, D. “epoll - I/O event notification facility”. Linux Programmer’s Manual.
  6. 陶辉.《深入理解 Nginx:模块开发与架构解析》. 机械工业出版社, 2013.
  7. EMQX Documentation. “Cluster Architecture”. https://docs.emqx.com/en/emqx/latest/deploy/cluster/introduction.html
  8. Gorilla WebSocket. “Chat Example”. https://github.com/gorilla/websocket/tree/main/examples/chat
  9. Eclipse Paho. “MQTT Embedded C/C++ Client Libraries”. https://github.com/eclipse/paho.mqtt.embedded-c
  10. Grigorik, I. “High Performance Browser Networking”. O’Reilly Media, 2013. Chapter 17: WebSocket.
  11. Fielding, R. & Reschke, J. RFC 7230 - HTTP/1.1 Message Syntax and Routing. IETF, 2014.
  12. 许式伟.《Go 语言编程》. 人民邮电出版社, 2012.

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2026-04-13 · architecture

【系统架构设计百科】Slack 架构:实时协作的工程挑战

Slack 每天为超过一千万活跃用户提供实时消息服务,峰值时段同时维持数百万条 WebSocket(全双工通信协议)长连接。一条消息从发送到被同一频道所有成员看到,端到端延迟通常控制在 200 毫秒以内。这套系统并非一蹴而就:它从一个 PHP 单体应用起步,历经数次关键重构,逐步演变为以 Hack、Go、Java 为核…

2026-04-13 · architecture

【系统架构设计百科】架构质量属性:不只是"高可用高性能"

需求评审时写下的'高可用、高性能、高并发',到了架构设计阶段几乎无法落地——因为它们不是可执行的需求。本文从 SEI/CMU 的质量属性理论出发,用 stimulus-response 场景模型把模糊需求变成可量化、可验证的架构约束,并拆解属性之间的冲突与联动关系。

2026-04-13 · architecture

【系统架构设计百科】告警策略:如何避免"狼来了"

大多数团队的告警系统都在制造噪声而不是传递信号。阈值告警看似直观,实则产生大量误报和漏报,值班工程师在凌晨三点被叫醒,却发现只是一次无害的毛刺。本文从告警疲劳的工业数据出发,拆解基于 SLO 的多窗口燃烧率告警算法,深入 Alertmanager 的路由、抑制与分组机制,结合 PagerDuty 的告警疲劳研究和真实工程案例,给出一套可落地的告警策略设计方法。

2026-04-13 · architecture

【系统架构设计百科】复杂性管理:架构的核心战场

系统复杂性是架构腐化的根源——本文从 Brooks 的本质复杂性与偶然复杂性划分出发,结合认知负荷理论与 Parnas 的信息隐藏原则,系统阐述复杂性的来源、度量与控制手段,并给出可操作的架构策略


By .