一、实时推送:现代应用的基础需求
在线聊天消息要在 200ms 内送达对方;股票行情需要每秒推送数十次报价更新;物联网平台要同时维持数百万设备的心跳连接;协同文档需要将每一次光标移动实时同步给所有参与者。
这些场景有一个共同特征:服务端需要主动向客户端推送数据,而不是等客户端来轮询。
HTTP 协议的请求-响应(Request-Response)模型天然不支持服务端主动推送。客户端发一个请求,服务端回一个响应,连接就结束了。要实现”服务端主动推”,工程上有三条主流路径:
- WebSocket:基于 TCP 的全双工通信协议,客户端和服务端都可以随时发送数据。
- SSE(Server-Sent Events):基于 HTTP 的单向推送协议,服务端可以持续向客户端发送事件流。
- MQTT(Message Queuing Telemetry Transport):面向物联网的轻量级发布/订阅协议,支持三级服务质量保证。
还有一条退化路径:长轮询(Long Polling),它不是真正的推送协议,但在 WebSocket 不可用的环境下仍然有价值。
这篇文章不做协议科普——RFC 文档已经够详细了。我们要回答的是工程问题:推送系统的连接管理、心跳检测、断线重连,到底有多复杂?百万长连接的单机架构怎么设计?消息丢了怎么办?集群扩展时连接状态怎么迁移?
二、WebSocket 协议深度解析
2.1 升级握手
WebSocket 连接始于一个 HTTP 升级请求。客户端发送一个带有
Upgrade: websocket 头的 HTTP GET
请求,服务端返回 101 状态码表示协议切换成功。
GET /ws/chat HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
Sec-WebSocket-Protocol: chat
服务端响应:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Sec-WebSocket-Protocol: chat
Sec-WebSocket-Accept 的值是将客户端发送的
Sec-WebSocket-Key 拼接上固定 GUID
258EAFA5-E914-47DA-95CA-5AB5DC11CE56,做一次
SHA-1 哈希后 Base64
编码得到的。这个机制不是为了安全——它只是为了防止非 WebSocket
客户端误发升级请求。
握手完成后,TCP 连接不再关闭,双方通过帧(Frame)格式收发数据。
2.2 帧格式
WebSocket 的数据帧格式紧凑,最小开销仅 2 字节:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| |Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
| Payload Data (continued) |
+---------------------------------------------------------------+
关键字段说明:
- FIN(1 bit):标识是否是消息的最后一帧。大消息可以拆成多帧发送。
- opcode(4 bits):
0x1文本帧,0x2二进制帧,0x8关闭帧,0x9Ping,0xAPong。 - MASK(1 bit):客户端发送的帧必须掩码,服务端发送的帧不掩码。这是为了防止缓存投毒攻击。
- Payload length:7 位表示 0-125 字节,126 表示后续 2 字节是实际长度,127 表示后续 8 字节是实际长度。
2.3 Ping/Pong 心跳
WebSocket 协议内置了 Ping/Pong 机制。任何一方都可以发送
Ping 帧(opcode 0x9),对方必须尽快回复一个
Pong 帧(opcode 0xA),且 Pong 帧的 payload
必须与 Ping 帧完全一致。
// Go 语言 WebSocket 心跳示例(使用 gorilla/websocket)
func keepAlive(conn *websocket.Conn, timeout time.Duration) {
ticker := time.NewTicker(timeout / 2)
defer ticker.Stop()
conn.SetPongHandler(func(appData string) error {
// 收到 Pong,重置读超时
conn.SetReadDeadline(time.Now().Add(timeout))
return nil
})
for range ticker.C {
if err := conn.WriteControl(
websocket.PingMessage,
[]byte("ping"),
time.Now().Add(5*time.Second),
); err != nil {
return // 连接已断开
}
}
}2.4 关闭握手
WebSocket 的关闭是一个四步握手过程:
- 发起方发送 Close 帧(opcode
0x8),携带关闭码和原因。 - 接收方回复 Close 帧。
- 接收方关闭 TCP 连接。
- 发起方确认 TCP 连接关闭。
常见关闭码包括:1000
正常关闭,1001
终端离开(如浏览器标签页关闭),1006
异常关闭(未发送 Close 帧),1011
服务端内部错误。
// 优雅关闭 WebSocket 连接
func gracefulClose(conn *websocket.Conn) error {
msg := websocket.FormatCloseMessage(
websocket.CloseNormalClosure,
"server shutting down",
)
err := conn.WriteControl(
websocket.CloseMessage,
msg,
time.Now().Add(5*time.Second),
)
if err != nil {
return conn.Close()
}
// 等待对方回复 Close 帧,设置超时
conn.SetReadDeadline(time.Now().Add(5 * time.Second))
for {
_, _, err := conn.ReadMessage()
if err != nil {
break
}
}
return conn.Close()
}下面这张图展示了 WebSocket 从握手到关闭的完整生命周期:
sequenceDiagram
participant C as 客户端
participant S as 服务端
C->>S: HTTP GET /ws (Upgrade: websocket)
S-->>C: 101 Switching Protocols
Note over C,S: WebSocket 连接建立
C->>S: 文本帧 (opcode=0x1)
S->>C: 文本帧 (opcode=0x1)
S->>C: Ping 帧 (opcode=0x9)
C-->>S: Pong 帧 (opcode=0xA)
C->>S: Close 帧 (opcode=0x8, code=1000)
S-->>C: Close 帧 (opcode=0x8, code=1000)
Note over C,S: TCP 连接关闭
三、SSE(Server-Sent Events)详解
3.1 协议机制
SSE 比 WebSocket 简单得多——它就是一个普通的 HTTP
响应,Content-Type 设为
text/event-stream,服务端持续往响应体里写数据,不关闭连接。
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
data: {"price": 150.25, "symbol": "AAPL"}
event: trade
data: {"price": 150.30, "symbol": "AAPL", "volume": 100}
id: 1001
event: trade
data: {"price": 150.28, "symbol": "AAPL", "volume": 250}
id: 1002
每条消息由若干字段组成:
- data:消息内容,可以多行(每行以
data:开头)。 - event:事件类型,默认是
message。 - id:事件 ID,用于断线重连时恢复位置。
- retry:建议的重连间隔(毫秒)。
消息之间用空行分隔。
3.2 EventSource API
浏览器端使用 EventSource API 连接 SSE
端点:
const source = new EventSource('/api/stock/stream');
// 监听默认 message 事件
source.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log(`${data.symbol}: ${data.price}`);
};
// 监听自定义 trade 事件
source.addEventListener('trade', (event) => {
const trade = JSON.parse(event.data);
updateTradeList(trade);
});
// 错误处理
source.onerror = (event) => {
if (source.readyState === EventSource.CLOSED) {
console.log('连接已关闭');
} else {
console.log('连接中断,浏览器将自动重连');
}
};3.3 自动重连与事件 ID
SSE
最有价值的特性之一是内置的断线重连机制。当连接中断时,浏览器会自动尝试重连,并在请求头中携带
Last-Event-ID,告诉服务端上次收到的最后一个事件
ID。
GET /api/stock/stream HTTP/1.1
Host: example.com
Last-Event-ID: 1002
服务端可以根据这个 ID 从断点处继续推送,避免消息丢失。
服务端 Go 实现示例:
func sseHandler(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "streaming not supported", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
// 获取断线重连的起始 ID
lastID := r.Header.Get("Last-Event-ID")
startSeq := parseSequence(lastID)
eventCh := subscribe(r.Context(), startSeq)
for {
select {
case event, ok := <-eventCh:
if !ok {
return
}
fmt.Fprintf(w, "id: %d\n", event.Seq)
fmt.Fprintf(w, "event: %s\n", event.Type)
fmt.Fprintf(w, "data: %s\n\n", event.Data)
flusher.Flush()
case <-r.Context().Done():
return
}
}
}3.4 SSE 的局限
SSE 是单向的——只有服务端可以向客户端发送数据。客户端要发送数据,需要另开一个 HTTP 请求。这在大多数推送场景下不是问题(股票行情、通知推送、日志流),但在需要双向通信的场景下(聊天、游戏)就显得力不从心。
另一个限制是浏览器对同一域名的 SSE 连接数有上限。HTTP/1.1 下,大多数浏览器限制 6 个连接;HTTP/2 下这个限制大幅放宽到 100 个。
四、MQTT 协议与 QoS 级别
4.1 协议概述
MQTT 是 IBM 在 1999 年为卫星链路设计的轻量级协议。它的设计目标是在低带宽、高延迟、不稳定的网络上实现可靠的消息传递。经过二十多年的演进,MQTT 已经成为物联网(IoT)领域的事实标准。
MQTT 采用发布/订阅(Publish/Subscribe)模型,所有消息通过一个中心化的代理(Broker)转发。客户端可以向主题(Topic)发布消息,也可以订阅主题接收消息。
+---------+
传感器 A ------->| |-------> 监控面板
| MQTT |
传感器 B ------->| Broker |-------> 数据存储
| |
传感器 C ------->| |-------> 告警服务
+---------+
4.2 主题层级
MQTT 的主题是用斜杠分隔的层级结构,类似文件路径:
factory/floor1/machine42/temperature
factory/floor1/machine42/vibration
factory/floor2/+/temperature # + 匹配单层
factory/# # # 匹配多层
通配符规则:
+:匹配单个层级。sensor/+/temperature匹配sensor/room1/temperature和sensor/room2/temperature,但不匹配sensor/room1/sub/temperature。#:匹配零个或多个层级,只能出现在主题末尾。sensor/#匹配sensor、sensor/room1、sensor/room1/temperature。
4.3 三级 QoS
MQTT 的核心特性是三级服务质量(Quality of Service)保证:
QoS 0:至多一次(At Most Once)
发布者发送消息后不等待确认,Broker 也不做存储。消息可能丢失,但开销最小。适用于高频传感器数据——丢一两个点不影响整体趋势。
Publisher ---PUBLISH---> Broker ---PUBLISH---> Subscriber
QoS 1:至少一次(At Least Once)
发布者发送消息后等待 Broker 的 PUBACK 确认。如果超时未收到 PUBACK,发布者会重发消息(带 DUP 标志)。消息不会丢,但可能重复。
Publisher ---PUBLISH---> Broker
Publisher <---PUBACK---- Broker
QoS 2:恰好一次(Exactly Once)
通过四步握手保证消息不丢不重。这是开销最大的级别,适用于计费、指令下发等不能容忍重复的场景。
sequenceDiagram
participant P as 发布者
participant B as Broker
P->>B: PUBLISH (QoS 2, PacketID=42)
B-->>P: PUBREC (PacketID=42)
P->>B: PUBREL (PacketID=42)
B-->>P: PUBCOMP (PacketID=42)
Note over P,B: 四步握手完成,消息恰好投递一次
4.4 保留消息与遗嘱
保留消息(Retained Message):发布者可以将消息标记为”保留”。Broker 会为每个主题保存最后一条保留消息。当新的订阅者订阅该主题时,立即收到这条保留消息,不用等到下次发布。这对设备状态查询特别有用——新上线的监控面板可以立刻拿到所有设备的最新状态。
遗嘱消息(Last Will and Testament,LWT):客户端连接时可以预设一条遗嘱消息。如果客户端异常断开(未发送 DISCONNECT),Broker 会代替客户端发布这条遗嘱消息。其他订阅者收到遗嘱后就知道该设备离线了。
// Go 语言 MQTT 客户端配置遗嘱消息
opts := mqtt.NewClientOptions()
opts.AddBroker("tcp://broker.example.com:1883")
opts.SetClientID("sensor-42")
opts.SetWill(
"factory/floor1/machine42/status", // 遗嘱主题
"offline", // 遗嘱内容
1, // QoS 1
true, // 保留
)
opts.SetKeepAlive(30 * time.Second)
client := mqtt.NewClient(opts)
token := client.Connect()
token.Wait()五、三种推送技术对比
在选择推送技术之前,需要从多个维度对比三种协议的特性和适用场景:
| 维度 | WebSocket | SSE | MQTT |
|---|---|---|---|
| 通信方向 | 全双工 | 服务端到客户端单向 | 全双工(发布/订阅) |
| 底层协议 | TCP(HTTP 升级) | HTTP | TCP |
| 最小帧开销 | 2 字节 | 无二进制帧,纯文本 | 2 字节 |
| 浏览器支持 | 所有现代浏览器 | 除 IE 外所有现代浏览器 | 需要第三方库(MQTT.js) |
| 自动重连 | 无,需自行实现 | 内置,浏览器自动处理 | 客户端库通常内置 |
| 消息可靠性 | 无内置保证 | 通过事件 ID 支持断点续传 | QoS 0/1/2 三级保证 |
| 多路复用 | HTTP/2 下不支持 | HTTP/2 天然支持 | 单连接多主题 |
| 代理穿透 | 部分企业代理会阻断 | HTTP 兼容,代理友好 | 需要专用端口或 WS 桥接 |
| 二进制数据 | 原生支持 | 需 Base64 编码 | 原生支持 |
| 适用场景 | 聊天、游戏、协同编辑 | 通知推送、实时仪表盘 | 物联网、传感器数据采集 |
| 连接管理复杂度 | 中 | 低 | 中(Broker 托管) |
| 典型延迟 | 亚毫秒级(局域网) | 毫秒级 | 毫秒到秒级(取决于 QoS) |
选型建议:
- 需要双向通信(聊天、游戏):WebSocket。
- 服务端单向推送(通知、行情、日志流):SSE 优先,它更简单、代理友好、自带重连。
- 物联网/受限设备:MQTT,它专为低带宽和不稳定网络设计。
- 需要可靠投递:MQTT QoS 1/2,或在 WebSocket/SSE 之上自建 ACK 机制。
六、百万长连接的单机架构
6.1 问题规模
一台 Linux 服务器(32 核、128 GB 内存)能撑多少 WebSocket 长连接?理论上限取决于三个资源:
- 文件描述符:每个 TCP 连接消耗一个文件描述符。默认限制 1024,但可以调到百万级。
- 内存:每个连接的内核 TCP 缓冲区约 4-8 KB,应用层如果为每个连接分配一个 goroutine 或线程,内存开销更大。
- CPU:主要消耗在事件分发和消息序列化/反序列化上。
一个经验数字:在消息频率不高(每秒每连接 1 条消息以下)的场景下,一台配置合理的服务器可以维持 100 万到 200 万个空闲长连接。
6.2 epoll 事件驱动模型
百万连接的基础是 Linux 的 epoll 机制。与传统的 select/poll 不同,epoll 不需要遍历所有连接来查找有事件的连接——它通过内核回调机制,只返回有事件的文件描述符。
#include <sys/epoll.h>
#define MAX_EVENTS 10000
int main() {
int epfd = epoll_create1(0);
// 将监听 socket 加入 epoll
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET; // 边缘触发
ev.data.fd = listen_fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev);
struct epoll_event events[MAX_EVENTS];
for (;;) {
int nfds = epoll_wait(epfd, events, MAX_EVENTS, -1);
for (int i = 0; i < nfds; i++) {
if (events[i].data.fd == listen_fd) {
// 接受新连接
int conn_fd = accept(listen_fd, NULL, NULL);
set_nonblocking(conn_fd);
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = conn_fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, conn_fd, &ev);
} else {
// 处理已有连接的数据
handle_data(events[i].data.fd);
}
}
}
}关键配置参数:
- 边缘触发(EPOLLET):只在状态变化时通知,减少系统调用次数。但要求应用层循环读取直到
EAGAIN。 - EPOLLONESHOT:事件触发后自动从 epoll 中移除,避免多个线程同时处理同一个连接。
6.3 Go 语言的 goroutine 模型
Go 语言的网络库在底层使用 epoll(Linux)/ kqueue(macOS),但暴露给开发者的是同步阻塞的编程模型。每个连接一个 goroutine,goroutine 在等待 I/O 时会被 Go 运行时挂起,不占用操作系统线程。
func main() {
listener, _ := net.Listen("tcp", ":8080")
upgrader := websocket.Upgrader{}
http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
conn, _ := upgrader.Upgrade(w, r, nil)
go readLoop(conn) // 每个连接一个读 goroutine
go writeLoop(conn) // 每个连接一个写 goroutine
})
http.Serve(listener, nil)
}
func readLoop(conn *websocket.Conn) {
defer conn.Close()
for {
_, msg, err := conn.ReadMessage()
if err != nil {
return
}
processMessage(conn, msg)
}
}问题在于:100 万连接意味着 200 万个 goroutine(读 + 写各一个),每个 goroutine 默认栈大小 8 KB,光栈内存就消耗 16 GB。加上 WebSocket 读写缓冲区,总内存可能超过 40 GB。
6.4 优化:reactor 模式
对于真正的百万连接场景,更高效的做法是放弃”每连接一个 goroutine”的模型,回到 reactor 模式:少量 goroutine 通过 epoll 管理大量连接,只在有数据到达时才分配 goroutine 处理。
// 伪代码:基于 epoll 的 Go reactor
type Reactor struct {
epfd int
conns map[int]*Connection
workers chan func()
}
func (r *Reactor) Run() {
events := make([]syscall.EpollEvent, 1024)
for {
n, _ := syscall.EpollWait(r.epfd, events, -1)
for i := 0; i < n; i++ {
fd := int(events[i].Fd)
conn := r.conns[fd]
r.workers <- func() {
conn.handleEvent()
}
}
}
}Go 生态中 gnet、nbio
等库实现了这种模式,能在单机上以较低的内存开销维持数百万连接。
6.5 系统调优清单
百万连接需要调整的内核参数:
# 文件描述符限制
ulimit -n 2000000
echo "fs.file-max = 2000000" >> /etc/sysctl.conf
# TCP 连接回收
echo "net.ipv4.tcp_tw_reuse = 1" >> /etc/sysctl.conf
echo "net.ipv4.tcp_fin_timeout = 15" >> /etc/sysctl.conf
# TCP 缓冲区(减小默认值以节省内存)
echo "net.ipv4.tcp_rmem = 4096 4096 16384" >> /etc/sysctl.conf
echo "net.ipv4.tcp_wmem = 4096 4096 16384" >> /etc/sysctl.conf
# 本地端口范围(用于出站连接)
echo "net.ipv4.ip_local_port_range = 1024 65535" >> /etc/sysctl.conf
# somaxconn(监听队列长度)
echo "net.core.somaxconn = 65535" >> /etc/sysctl.conf
sysctl -p七、心跳检测与断线重连
7.1 为什么需要应用层心跳
TCP 协议本身有 keepalive 机制,但它的默认参数不适合推送场景:
tcp_keepalive_time:连接空闲 7200 秒(2 小时)后才开始探测。tcp_keepalive_intvl:探测间隔 75 秒。tcp_keepalive_probes:连续 9 次探测失败后断开。
也就是说,一个已经断开的连接,TCP 层最长需要 2 小时 + 675 秒才能发现。这对推送系统是不可接受的。
更严重的是,很多中间设备(NAT 网关、防火墙、负载均衡器)会静默丢弃长时间空闲的连接。移动网络环境下,NAT 超时通常在 5-10 分钟。如果不发心跳,连接可能在客户端和服务端都没感知的情况下被中间设备切断。
7.2 心跳策略设计
type HeartbeatConfig struct {
Interval time.Duration // 心跳发送间隔
Timeout time.Duration // 等待回复的超时时间
MaxMissed int // 最大允许连续丢失次数
}
// 推荐配置
var defaultConfig = HeartbeatConfig{
Interval: 30 * time.Second,
Timeout: 10 * time.Second,
MaxMissed: 3,
}
type Connection struct {
conn net.Conn
lastPong time.Time
missCount int
mu sync.Mutex
}
func (c *Connection) heartbeatLoop(cfg HeartbeatConfig) {
ticker := time.NewTicker(cfg.Interval)
defer ticker.Stop()
for range ticker.C {
c.mu.Lock()
since := time.Since(c.lastPong)
c.mu.Unlock()
if since > cfg.Interval*time.Duration(cfg.MaxMissed) {
log.Printf("连接超时,最后一次 Pong 在 %v 前", since)
c.conn.Close()
return
}
// 发送 Ping
c.conn.SetWriteDeadline(time.Now().Add(cfg.Timeout))
if err := c.sendPing(); err != nil {
log.Printf("发送 Ping 失败: %v", err)
c.conn.Close()
return
}
}
}
func (c *Connection) onPong() {
c.mu.Lock()
c.lastPong = time.Now()
c.missCount = 0
c.mu.Unlock()
}7.3 智能心跳间隔
不同网络环境下 NAT 超时不同。移动网络通常 5-10 分钟,Wi-Fi 环境可能更长。一种优化策略是动态探测最优心跳间隔:
- 初始心跳间隔设为较大值(如 5 分钟)。
- 如果连接断开后重连成功,缩短心跳间隔。
- 如果连接长时间稳定,逐步延长心跳间隔。
- 记录每个网络环境下的最优间隔,下次连接时直接使用。
微信客户端在移动网络下的心跳间隔大约是 4.5 分钟,这是经过大规模测试得到的经验值——短到足以在大多数 NAT 超时之前保活,长到不会过度消耗电量和流量。
7.4 断线重连与指数退避
当连接断开后,客户端需要尝试重连。直接重连可能导致”惊群效应”(Thundering Herd)——如果服务端重启,上万个客户端同时重连,瞬间冲垮服务端。
指数退避(Exponential Backoff)加随机抖动(Jitter)是标准解决方案:
type ReconnectPolicy struct {
BaseDelay time.Duration
MaxDelay time.Duration
Multiplier float64
MaxRetries int
}
func (p *ReconnectPolicy) nextDelay(attempt int) time.Duration {
if attempt >= p.MaxRetries {
return p.MaxDelay
}
delay := float64(p.BaseDelay) * math.Pow(p.Multiplier, float64(attempt))
if delay > float64(p.MaxDelay) {
delay = float64(p.MaxDelay)
}
// 加入 [0, delay) 范围的随机抖动
jitter := rand.Float64() * delay
return time.Duration(jitter)
}
func reconnectLoop(addr string, policy ReconnectPolicy) net.Conn {
for attempt := 0; ; attempt++ {
conn, err := net.DialTimeout("tcp", addr, 5*time.Second)
if err == nil {
log.Println("重连成功")
return conn
}
delay := policy.nextDelay(attempt)
log.Printf("重连失败(第 %d 次),等待 %v 后重试: %v",
attempt+1, delay, err)
time.Sleep(delay)
}
}推荐参数:基础延迟 1 秒,倍率 2,最大延迟 60 秒,最大重试次数 20。
八、推送消息的可靠投递
8.1 消息丢失的三个环节
推送链路上有三个地方可能丢消息:
- 服务端到连接层:业务服务发送消息到连接网关,网关可能崩溃或过载。
- 连接层到客户端:TCP 连接可能已断开但服务端尚未感知。
- 客户端处理失败:客户端收到消息但处理过程中崩溃。
graph LR
A[业务服务] -->|MQ| B[连接网关]
B -->|WebSocket/TCP| C[客户端]
style A fill:#f9f,stroke:#333
style B fill:#bbf,stroke:#333
style C fill:#bfb,stroke:#333
D[丢失点1: MQ 到网关] -.-> B
E[丢失点2: 网关到客户端] -.-> C
F[丢失点3: 客户端处理失败] -.-> C
8.2 端到端 ACK 机制
要实现可靠投递,必须引入应用层的确认(ACK)机制:
type Message struct {
ID string `json:"id"`
Type string `json:"type"`
Payload []byte `json:"payload"`
Timestamp int64 `json:"ts"`
NeedACK bool `json:"need_ack"`
}
type ACK struct {
MessageID string `json:"msg_id"`
ClientID string `json:"client_id"`
Timestamp int64 `json:"ts"`
}
// 服务端:发送消息并等待 ACK
type DeliveryManager struct {
pending map[string]*PendingMessage
mu sync.RWMutex
retryPool chan *PendingMessage
}
type PendingMessage struct {
msg *Message
conn *Connection
attempts int
nextRetry time.Time
}
func (dm *DeliveryManager) Send(conn *Connection, msg *Message) {
if msg.NeedACK {
pm := &PendingMessage{
msg: msg,
conn: conn,
attempts: 0,
nextRetry: time.Now().Add(3 * time.Second),
}
dm.mu.Lock()
dm.pending[msg.ID] = pm
dm.mu.Unlock()
}
conn.Write(msg)
}
func (dm *DeliveryManager) OnACK(ack *ACK) {
dm.mu.Lock()
delete(dm.pending, ack.MessageID)
dm.mu.Unlock()
}
// 重试协程
func (dm *DeliveryManager) retryLoop() {
ticker := time.NewTicker(1 * time.Second)
for range ticker.C {
now := time.Now()
dm.mu.Lock()
for id, pm := range dm.pending {
if now.After(pm.nextRetry) {
if pm.attempts >= 3 {
// 投递失败,放入离线队列
dm.moveToOffline(pm)
delete(dm.pending, id)
} else {
pm.attempts++
pm.nextRetry = now.Add(
time.Duration(pm.attempts*3) * time.Second,
)
pm.conn.Write(pm.msg)
}
}
}
dm.mu.Unlock()
}
}8.3 离线消息队列
当客户端不在线时,消息需要存入离线队列,等客户端重连后再投递:
type OfflineStore interface {
// 存入离线消息
Push(userID string, msg *Message) error
// 拉取离线消息(按时间顺序)
Pull(userID string, limit int) ([]*Message, error)
// 确认已收到,从队列删除
Ack(userID string, msgIDs []string) error
// 获取未读消息数量
Count(userID string) (int, error)
}存储选型:
- Redis List/Stream:适合消息量不大、不需要持久化保证的场景。Redis Stream 还支持消费组和 ACK。
- Kafka:适合高吞吐场景,消息可以保留较长时间。每个用户一个分区不现实,通常按用户 ID 哈希分区。
- 数据库(MySQL/PostgreSQL):适合消息量不大但需要强持久化的场景。注意索引设计——按用户 ID + 时间戳索引。
8.4 消息去重
由于重试机制的存在,客户端可能收到重复消息。去重有两种策略:
- 服务端去重:发送前检查消息 ID 是否已经发送过。适用于 QoS 2 级别的场景。
- 客户端去重:客户端维护一个已处理消息 ID 的滑动窗口,收到重复消息直接丢弃。
type Deduplicator struct {
seen map[string]struct{}
order []string
maxSize int
mu sync.Mutex
}
func (d *Deduplicator) IsDuplicate(msgID string) bool {
d.mu.Lock()
defer d.mu.Unlock()
if _, exists := d.seen[msgID]; exists {
return true
}
d.seen[msgID] = struct{}{}
d.order = append(d.order, msgID)
// 窗口满了,淘汰最旧的
if len(d.order) > d.maxSize {
oldest := d.order[0]
d.order = d.order[1:]
delete(d.seen, oldest)
}
return false
}九、推送系统的水平扩展
9.1 连接层与业务层分离
单机能撑百万连接,但单机终究有上限。水平扩展的第一步是把连接层和业务层分离:
graph TB
subgraph 连接层
GW1[网关 1]
GW2[网关 2]
GW3[网关 3]
end
subgraph 业务层
BIZ1[聊天服务]
BIZ2[通知服务]
BIZ3[行情服务]
end
LB[负载均衡] --> GW1
LB --> GW2
LB --> GW3
GW1 <-->|gRPC| BIZ1
GW2 <-->|gRPC| BIZ2
GW3 <-->|gRPC| BIZ3
MQ[消息队列] --> GW1
MQ --> GW2
MQ --> GW3
连接网关的职责:
- 维持与客户端的长连接。
- 协议解析(WebSocket 帧解码、MQTT 报文解析)。
- 心跳管理和连接保活。
- 将业务消息转发到业务层。
- 将业务层的推送消息发送给客户端。
业务层的职责:
- 处理业务逻辑(消息路由、群聊扩散、权限校验)。
- 决定消息推送的目标用户。
- 将推送消息发送到消息队列,由网关消费并投递。
9.2 连接状态路由
分离之后有一个核心问题:业务层要给用户 A 推送消息,但用户 A 连接在哪台网关上?
解决方案是维护一个全局的连接路由表:
type ConnRouter interface {
// 用户上线时注册
Register(userID string, gatewayID string) error
// 用户下线时注销
Unregister(userID string, gatewayID string) error
// 查询用户连接在哪台网关
Lookup(userID string) ([]string, error)
// 一个用户可能有多个设备同时在线
}存储选型:
- Redis Hash:
user:{userID}->{deviceID: gatewayID}。查询 O(1),但需要处理 Redis 故障和数据一致性。 - 一致性哈希(Consistent Hashing):将用户 ID 哈希到固定的网关。优点是不需要中心化路由表;缺点是网关扩缩容时需要迁移连接。
9.3 跨网关消息转发
当业务层确定消息的目标用户后,需要将消息投递到对应的网关。两种方案:
方案一:消息队列广播
每台网关订阅一个独立的消息队列主题(如
push.gateway.{id})。业务层根据路由表查到目标网关,将消息投递到对应的队列。
优点:解耦,网关故障时消息不丢(堆积在队列中)。 缺点:增加了一跳延迟,需要维护队列基础设施。
方案二:直接 RPC
业务层通过 gRPC 直接调用目标网关的推送接口。
优点:延迟低。 缺点:需要处理网关故障时的重试和降级。
实际工程中,两种方案通常结合使用:在线消息走 RPC 追求低延迟,离线消息走消息队列保证不丢。
9.4 长轮询作为降级方案
当客户端无法建立 WebSocket 或 SSE 连接时(企业防火墙、老旧浏览器),长轮询(Long Polling)是最后的退路:
func longPollHandler(w http.ResponseWriter, r *http.Request) {
userID := r.URL.Query().Get("user_id")
lastSeq := parseSequence(r.URL.Query().Get("last_seq"))
timeout := 30 * time.Second
ctx, cancel := context.WithTimeout(r.Context(), timeout)
defer cancel()
messages, err := waitForMessages(ctx, userID, lastSeq)
if err == context.DeadlineExceeded {
// 超时无消息,返回空响应
w.WriteHeader(http.StatusNoContent)
return
}
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
json.NewEncoder(w).Encode(messages)
}长轮询的本质是:客户端发起一个 HTTP 请求,服务端挂起这个请求直到有新消息或超时。超时后客户端立刻发起下一个请求。与传统短轮询相比,长轮询减少了大量无效请求;与 WebSocket 相比,它的延迟更高(至少一个 RTT)且服务端需要为每个挂起的请求占用一个连接。
十、工程案例:即时通讯与物联网推送平台
10.1 案例一:千万级 IM 系统
某社交产品的即时通讯(IM)系统需要支持以下指标:
- 日活用户 1000 万,峰值同时在线 300 万。
- 单聊消息端到端延迟 P99 < 500ms。
- 群聊支持万人群,消息扩散延迟 P99 < 2 秒。
- 消息不丢、不重复、有序。
架构设计:
graph TB
subgraph 客户端
APP1[移动端]
APP2[桌面端]
APP3[Web 端]
end
subgraph 接入层
LB[LVS + Nginx]
GW1[WebSocket 网关]
GW2[WebSocket 网关]
GW3[WebSocket 网关]
end
subgraph 逻辑层
MSG[消息服务]
GROUP[群聊服务]
ROUTE[路由服务]
OFFLINE[离线服务]
end
subgraph 存储层
REDIS[(Redis 集群)]
KAFKA[(Kafka)]
MYSQL[(MySQL)]
MINIO[(MinIO)]
end
APP1 --> LB
APP2 --> LB
APP3 --> LB
LB --> GW1
LB --> GW2
LB --> GW3
GW1 <--> ROUTE
GW2 <--> MSG
GW3 <--> GROUP
MSG --> KAFKA
KAFKA --> GW1
KAFKA --> GW2
KAFKA --> GW3
OFFLINE --> REDIS
MSG --> MYSQL
MSG --> MINIO
关键设计决策:
连接网关无状态化:网关不存储业务状态,连接路由表存在 Redis 中。网关故障时,客户端重连到其他网关,自动恢复。
消息存储模型:采用”写扩散”(Fan-out on Write)模型。单聊消息写入发送者和接收者的消息收件箱(inbox)。群聊消息写入群的消息时间线,成员读取时拉取。万人群不能做写扩散——写放大 10000 倍会压垮存储层。
消息有序性:每个会话维护一个单调递增的序列号(sequence)。服务端使用 Redis INCR 生成序列号,保证同一会话内的消息有序。
ACK 与重投:消息下发后启动 3 秒计时器,未收到客户端 ACK 则重投。重投 3 次仍失败则放入离线队列。客户端每次上线后主动拉取离线消息。
10.2 案例二:工业物联网推送平台
某制造企业的物联网(IoT)平台需要接入 50 万台设备:
- 设备类型:温度传感器、振动传感器、PLC 控制器。
- 数据上报频率:传感器每 5 秒一次,PLC 每 100ms 一次。
- 下行指令:远程参数调整、固件升级、紧急停机。
- 网络环境:工厂内网 + 4G 蜂窝网络,部分设备通过网关桥接。
协议选型:MQTT
选择 MQTT 的理由:
- 传感器设备计算资源有限(ARM Cortex-M4,256 KB RAM),MQTT 客户端库(如 Eclipse Paho Embedded)的内存占用仅几 KB。
- 4G 网络不稳定,MQTT 的 QoS 1 保证消息至少送达一次。
- 遗嘱消息天然支持设备离线检测。
- 主题层级结构与工厂的物理层级(工厂/车间/产线/设备)完美匹配。
主题设计:
# 数据上报
telemetry/{factory_id}/{floor}/{machine_id}/temperature
telemetry/{factory_id}/{floor}/{machine_id}/vibration
# 设备状态
status/{factory_id}/{floor}/{machine_id}/online
status/{factory_id}/{floor}/{machine_id}/alarm
# 下行指令
command/{factory_id}/{floor}/{machine_id}/config
command/{factory_id}/{floor}/{machine_id}/firmware
command/{factory_id}/{floor}/{machine_id}/emergency_stop
# 遗嘱主题
status/{factory_id}/{floor}/{machine_id}/online -> payload: "offline"
桥接模式(Bridge Pattern):
部分传感器不直接支持 MQTT(例如 Modbus RTU 协议的老设备),通过边缘网关(Edge Gateway)桥接:
// Rust 边缘网关:Modbus -> MQTT 桥接
use tokio_modbus::prelude::*;
use rumqttc::{MqttOptions, AsyncClient, QoS};
async fn bridge_loop(
modbus_ctx: &mut tcp::Context,
mqtt_client: &AsyncClient,
device_id: &str,
) {
let mut interval = tokio::time::interval(
tokio::time::Duration::from_secs(5)
);
loop {
interval.tick().await;
// 从 Modbus 设备读取温度寄存器
let registers = modbus_ctx
.read_holding_registers(0x0001, 2)
.await;
match registers {
Ok(data) => {
let temperature = decode_temperature(&data);
let topic = format!(
"telemetry/factory01/floor1/{}/temperature",
device_id
);
let payload = serde_json::json!({
"value": temperature,
"unit": "celsius",
"timestamp": chrono::Utc::now().timestamp()
});
mqtt_client
.publish(
topic,
QoS::AtLeastOnce,
false,
payload.to_string(),
)
.await
.ok();
}
Err(e) => {
eprintln!("Modbus 读取失败: {}", e);
}
}
}
}QoS 级别分配:
| 消息类型 | QoS | 理由 |
|---|---|---|
| 温度数据 | 0 | 高频上报,丢一个点不影响趋势 |
| 振动告警 | 1 | 告警不能丢,但重复可以在业务层去重 |
| 紧急停机指令 | 2 | 不能丢也不能重复执行 |
| 固件升级 | 1 | 升级流程本身有校验,重复指令不会导致问题 |
| 设备状态 | 1 + 保留 | 新订阅者需要立即获取设备当前状态 |
Broker 集群架构:
单台 EMQX Broker 可以承载约 200 万连接。50 万设备只需一个小型集群(3 节点)。但考虑到数据上报频率(传感器 5 秒一次,50 万台 = 每秒 10 万条消息;PLC 100ms 一次,假设 1000 台 = 每秒 1 万条消息),消息吞吐是更大的挑战。
EMQX 的集群方案基于 Erlang/OTP 的分布式能力,节点间通过 Erlang 分布式协议通信,订阅表在集群内全量复制。客户端连接到任意节点,发布的消息会被路由到所有持有匹配订阅的节点。
10.3 工程经验总结
从这两个案例中可以提炼出推送系统设计的几个通用原则:
- 协议选型跟着场景走。浏览器端优先 WebSocket/SSE,物联网用 MQTT,不要反过来。
- 连接层做薄,业务层做厚。连接网关的职责越少,稳定性越高。
- 可靠投递不是免费的。ACK、重试、离线队列每一个都有成本——延迟、存储、复杂度。只在需要的消息上开启。
- 心跳间隔是一个权衡。太短浪费带宽和电量,太长导致连接静默断开。没有银弹,需要针对网络环境实测。
- 水平扩展的关键是状态外置。连接路由表、离线消息、会话状态全部放到外部存储,网关本身无状态。
参考资料
- RFC 6455 - The WebSocket Protocol. IETF, 2011.
- Server-Sent Events. W3C Recommendation, 2015.
- MQTT Version 5.0. OASIS Standard, 2019.
- MQTT Version 3.1.1. OASIS Standard, 2014.
- Libenzi, D. “epoll - I/O event notification facility”. Linux Programmer’s Manual.
- 陶辉.《深入理解 Nginx:模块开发与架构解析》. 机械工业出版社, 2013.
- EMQX Documentation. “Cluster Architecture”. https://docs.emqx.com/en/emqx/latest/deploy/cluster/introduction.html
- Gorilla WebSocket. “Chat Example”. https://github.com/gorilla/websocket/tree/main/examples/chat
- Eclipse Paho. “MQTT Embedded C/C++ Client Libraries”. https://github.com/eclipse/paho.mqtt.embedded-c
- Grigorik, I. “High Performance Browser Networking”. O’Reilly Media, 2013. Chapter 17: WebSocket.
- Fielding, R. & Reschke, J. RFC 7230 - HTTP/1.1 Message Syntax and Routing. IETF, 2014.
- 许式伟.《Go 语言编程》. 人民邮电出版社, 2012.
同主题继续阅读
把当前热点继续串成多页阅读,而不是停在单篇消费。
【系统架构设计百科】Slack 架构:实时协作的工程挑战
Slack 每天为超过一千万活跃用户提供实时消息服务,峰值时段同时维持数百万条 WebSocket(全双工通信协议)长连接。一条消息从发送到被同一频道所有成员看到,端到端延迟通常控制在 200 毫秒以内。这套系统并非一蹴而就:它从一个 PHP 单体应用起步,历经数次关键重构,逐步演变为以 Hack、Go、Java 为核…
【系统架构设计百科】架构质量属性:不只是"高可用高性能"
需求评审时写下的'高可用、高性能、高并发',到了架构设计阶段几乎无法落地——因为它们不是可执行的需求。本文从 SEI/CMU 的质量属性理论出发,用 stimulus-response 场景模型把模糊需求变成可量化、可验证的架构约束,并拆解属性之间的冲突与联动关系。
【系统架构设计百科】告警策略:如何避免"狼来了"
大多数团队的告警系统都在制造噪声而不是传递信号。阈值告警看似直观,实则产生大量误报和漏报,值班工程师在凌晨三点被叫醒,却发现只是一次无害的毛刺。本文从告警疲劳的工业数据出发,拆解基于 SLO 的多窗口燃烧率告警算法,深入 Alertmanager 的路由、抑制与分组机制,结合 PagerDuty 的告警疲劳研究和真实工程案例,给出一套可落地的告警策略设计方法。
【系统架构设计百科】复杂性管理:架构的核心战场
系统复杂性是架构腐化的根源——本文从 Brooks 的本质复杂性与偶然复杂性划分出发,结合认知负荷理论与 Parnas 的信息隐藏原则,系统阐述复杂性的来源、度量与控制手段,并给出可操作的架构策略