土法炼钢兴趣小组的算法知识备份

【网络工程】SSE 与长轮询:服务端推送的轻量解法

文章导航

分类入口
network
标签入口
#sse#server-sent-events#long-polling#server-push#real-time

目录

不是所有实时通信都需要 WebSocket。当应用只需要服务端向客户端推送数据——股票行情、通知消息、日志流、构建状态更新——SSE(Server-Sent Events,服务端发送事件)和长轮询(Long Polling)是更轻量的选择。

它们建立在标准 HTTP 之上,不需要协议升级,天然兼容代理、CDN、负载均衡器。在很多场景下,它们比 WebSocket 更简单、更可靠。

一、SSE:服务端发送事件

SSE 是 HTML5 规范定义的服务端推送机制。客户端通过 EventSource API 建立持久 HTTP 连接,服务端通过这个连接持续推送文本事件。

1.1 协议基础

# 客户端请求
GET /events HTTP/1.1
Host: api.example.com
Accept: text/event-stream
Cache-Control: no-cache

# 服务端响应
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
Transfer-Encoding: chunked

data: {"type":"heartbeat","time":"2025-07-31T10:00:00Z"}

event: notification
data: {"title":"New message","body":"Hello!"}

id: 12345
event: stock_update
data: {"symbol":"AAPL","price":195.27}

关键 HTTP 头:

Content-Type: text/event-stream    ← 必须,标识 SSE 流
Cache-Control: no-cache            ← 防止缓存中间响应
Connection: keep-alive             ← 保持连接持久
X-Accel-Buffering: no              ← Nginx: 禁用代理缓冲

1.2 事件流格式

SSE 使用纯文本协议,每个事件由若干行组成,事件之间用空行分隔:

事件流格式(Event Stream Format):

字段       │ 说明                              │ 示例
───────────┼───────────────────────────────────┼────────────────────
data:      │ 事件数据(可多行)                 │ data: hello world
event:     │ 事件类型(默认 "message")          │ event: notification
id:        │ 事件 ID(用于断线重连)            │ id: 12345
retry:     │ 重连间隔(毫秒)                   │ retry: 3000
: comment  │ 注释(被忽略,可用作心跳)          │ : keepalive

规则:
1. 每行以 "字段: 值" 格式
2. 事件之间用空行(\n\n)分隔
3. 行结束符: \n 或 \r\n 或 \r
4. 未知字段被忽略
# 多行 data(每行一个 data: 前缀,接收端用 \n 拼接)
data: line 1
data: line 2
data: line 3

# 等价于接收端收到: "line 1\nline 2\nline 3"

# 自定义事件类型
event: user_login
data: {"userId":"u-123","name":"Alice"}

event: user_logout
data: {"userId":"u-456","name":"Bob"}

# 带 ID 的事件(用于断线重连定位)
id: 1001
event: chat_message
data: {"from":"Alice","text":"Hello!"}

id: 1002
event: chat_message
data: {"from":"Bob","text":"Hi there!"}

# 心跳注释(保持连接活跃,防止超时断开)
: heartbeat

# 设置重连间隔
retry: 5000

1.3 客户端 API:EventSource

// 基本用法
const eventSource = new EventSource('/events');

// 监听默认事件(没有 event: 字段的事件)
eventSource.onmessage = (event) => {
    console.log('Message:', event.data);
    console.log('ID:', event.lastEventId);
};

// 监听自定义事件类型
eventSource.addEventListener('notification', (event) => {
    const data = JSON.parse(event.data);
    showNotification(data.title, data.body);
});

eventSource.addEventListener('stock_update', (event) => {
    const data = JSON.parse(event.data);
    updateStockPrice(data.symbol, data.price);
});

// 连接状态
eventSource.onopen = () => {
    console.log('Connected');
    console.log('State:', eventSource.readyState);
    // 0 = CONNECTING, 1 = OPEN, 2 = CLOSED
};

// 错误处理
eventSource.onerror = (event) => {
    if (eventSource.readyState === EventSource.CLOSED) {
        console.log('Connection closed');
    } else {
        console.log('Error, reconnecting...');
        // 浏览器自动重连,不需要手动处理
    }
};

// 关闭连接
eventSource.close();

1.4 带认证的 SSE

EventSource API 不支持设置自定义头(如 Authorization),这是它最大的限制之一。解决方案:

// 方案 1: URL 参数传递 Token
const eventSource = new EventSource('/events?token=eyJhbGci...');

// 方案 2: Cookie 认证(EventSource 自动携带同源 Cookie)
// 需要在 /login 时设置 Cookie

// 方案 3: 使用 fetch + ReadableStream 替代 EventSource
async function sseWithAuth(url, token) {
    const response = await fetch(url, {
        headers: {
            'Authorization': `Bearer ${token}`,
            'Accept': 'text/event-stream',
        },
    });

    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let buffer = '';

    while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });

        // 按空行分割事件
        const events = buffer.split('\n\n');
        buffer = events.pop(); // 最后一个可能是不完整的事件

        for (const eventStr of events) {
            if (!eventStr.trim()) continue;
            const event = parseSSEEvent(eventStr);
            handleEvent(event);
        }
    }
}

function parseSSEEvent(str) {
    const event = { data: '', type: 'message', id: '' };
    for (const line of str.split('\n')) {
        if (line.startsWith('data: ')) event.data += line.slice(6) + '\n';
        else if (line.startsWith('event: ')) event.type = line.slice(7);
        else if (line.startsWith('id: ')) event.id = line.slice(4);
    }
    event.data = event.data.trimEnd();
    return event;
}

1.5 自动重连机制

SSE 最大的工程优势之一是内置的自动重连:

自动重连流程:

  客户端                              服务端
    │  GET /events                      │
    │  Accept: text/event-stream        │
    │──────────────────────────────────→│
    │                                   │
    │  id: 100                          │
    │  data: event A                    │
    │←──────────────────────────────────│
    │                                   │
    │  id: 101                          │
    │  data: event B                    │
    │←──────────────────────────────────│
    │                                   │
    │  ╳ 连接断开(网络中断/服务端重启)  │
    │                                   │
    │  (等待 retry 间隔,默认 3 秒)     │
    │                                   │
    │  GET /events                      │
    │  Last-Event-ID: 101               │  ← 自动携带最后的 ID
    │──────────────────────────────────→│
    │                                   │
    │  id: 102                          │  ← 服务端从 102 开始推送
    │  data: event C                    │
    │←──────────────────────────────────│

服务端实现要点:
1. 每个事件必须设置 id: 字段
2. 服务端维护事件缓冲区(内存或 Redis)
3. 检查 Last-Event-ID 头,从断点续传
4. 如果 ID 过老(超出缓冲区),发送全量快照

1.6 Go 语言 SSE 服务端实现

package main

import (
    "fmt"
    "net/http"
    "time"
)

type SSEBroker struct {
    clients    map[chan string]struct{}
    register   chan chan string
    unregister chan chan string
    messages   chan string
}

func NewSSEBroker() *SSEBroker {
    b := &SSEBroker{
        clients:    make(map[chan string]struct{}),
        register:   make(chan chan string),
        unregister: make(chan chan string),
        messages:   make(chan string, 100),
    }
    go b.run()
    return b
}

func (b *SSEBroker) run() {
    for {
        select {
        case client := <-b.register:
            b.clients[client] = struct{}{}
        case client := <-b.unregister:
            delete(b.clients, client)
            close(client)
        case msg := <-b.messages:
            for client := range b.clients {
                select {
                case client <- msg:
                default:
                    // 客户端缓冲满,丢弃消息
                    delete(b.clients, client)
                    close(client)
                }
            }
        }
    }
}

func (b *SSEBroker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "Streaming not supported", http.StatusInternalServerError)
        return
    }

    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")
    w.Header().Set("X-Accel-Buffering", "no")

    // 检查断线重连
    lastID := r.Header.Get("Last-Event-ID")
    if lastID != "" {
        // 从 lastID 之后开始补发缺失的事件
        fmt.Fprintf(w, ": reconnected, last ID was %s\n\n", lastID)
        flusher.Flush()
    }

    messageChan := make(chan string, 32)
    b.register <- messageChan
    defer func() {
        b.unregister <- messageChan
    }()

    // 设置重连间隔
    fmt.Fprintf(w, "retry: 3000\n\n")
    flusher.Flush()

    ctx := r.Context()
    for {
        select {
        case <-ctx.Done():
            return
        case msg, ok := <-messageChan:
            if !ok {
                return
            }
            fmt.Fprintf(w, "%s\n\n", msg)
            flusher.Flush()
        }
    }
}

func main() {
    broker := NewSSEBroker()

    // 模拟事件生成
    go func() {
        id := 0
        for {
            id++
            broker.messages <- fmt.Sprintf(
                "id: %d\nevent: tick\ndata: {\"time\":\"%s\",\"seq\":%d}",
                id, time.Now().Format(time.RFC3339), id)
            time.Sleep(2 * time.Second)
        }
    }()

    http.Handle("/events", broker)
    http.ListenAndServe(":8080", nil)
}

二、长轮询(Long Polling)

长轮询是 WebSocket 和 SSE 出现之前的主流服务端推送方案。它的原理很简单:客户端发送请求,服务端持有连接不立即响应,等到有新数据或超时时才返回。

2.1 工作原理

短轮询(Short Polling)— 浪费资源:

  客户端                        服务端
    │  GET /updates              │
    │──────────────────────────→ │  (无新数据)
    │  200 {data: []}            │
    │←────────────────────────── │
    │  (等待 1 秒)               │
    │  GET /updates              │
    │──────────────────────────→ │  (无新数据)
    │  200 {data: []}            │
    │←────────────────────────── │
    │  (等待 1 秒)               │
    │  GET /updates              │
    │──────────────────────────→ │  (有新数据!)
    │  200 {data: [...]}         │
    │←────────────────────────── │

  问题: 大量空响应,浪费带宽和服务端资源

长轮询(Long Polling)— 按需响应:

  客户端                        服务端
    │  GET /updates              │
    │──────────────────────────→ │
    │  (连接保持,等待...)        │  ← 服务端不立即返回
    │                            │     等待新数据或超时
    │                            │  (有新数据!)
    │  200 {data: [...]}         │
    │←────────────────────────── │
    │                            │
    │  GET /updates              │  ← 收到响应后立即发新请求
    │──────────────────────────→ │
    │  (连接保持,等待...)        │
    │  ...                       │

2.2 长轮询实现

// Go 语言长轮询服务端
package main

import (
    "context"
    "encoding/json"
    "net/http"
    "sync"
    "time"
)

type LongPollServer struct {
    mu       sync.Mutex
    waiters  map[string][]chan []byte  // topic -> waiting channels
}

func NewLongPollServer() *LongPollServer {
    return &LongPollServer{
        waiters: make(map[string][]chan []byte),
    }
}

func (s *LongPollServer) Subscribe(w http.ResponseWriter, r *http.Request) {
    topic := r.URL.Query().Get("topic")
    timeout := 30 * time.Second

    ch := make(chan []byte, 1)

    // 注册等待者
    s.mu.Lock()
    s.waiters[topic] = append(s.waiters[topic], ch)
    s.mu.Unlock()

    ctx, cancel := context.WithTimeout(r.Context(), timeout)
    defer cancel()

    select {
    case data := <-ch:
        w.Header().Set("Content-Type", "application/json")
        w.Write(data)
    case <-ctx.Done():
        // 超时,返回空响应,客户端应立即重新发起请求
        w.Header().Set("Content-Type", "application/json")
        w.WriteHeader(http.StatusNoContent) // 204
    }

    // 清理等待者
    s.mu.Lock()
    chans := s.waiters[topic]
    for i, c := range chans {
        if c == ch {
            s.waiters[topic] = append(chans[:i], chans[i+1:]...)
            break
        }
    }
    s.mu.Unlock()
}

func (s *LongPollServer) Publish(w http.ResponseWriter, r *http.Request) {
    topic := r.URL.Query().Get("topic")
    var msg map[string]interface{}
    json.NewDecoder(r.Body).Decode(&msg)
    data, _ := json.Marshal(msg)

    s.mu.Lock()
    waiters := s.waiters[topic]
    s.waiters[topic] = nil  // 清空所有等待者
    s.mu.Unlock()

    for _, ch := range waiters {
        select {
        case ch <- data:
        default:
        }
    }

    w.WriteHeader(http.StatusOK)
}
// 客户端长轮询实现
async function longPoll(url, onMessage) {
    while (true) {
        try {
            const response = await fetch(url, {
                signal: AbortSignal.timeout(35000), // 比服务端超时长一点
            });

            if (response.status === 204) {
                // 超时,立即重试
                continue;
            }

            if (response.ok) {
                const data = await response.json();
                onMessage(data);
            }
        } catch (error) {
            if (error.name === 'TimeoutError' || error.name === 'AbortError') {
                continue; // 超时,重试
            }
            console.error('Long poll error:', error);
            // 网络错误,等待后重试
            await new Promise(resolve => setTimeout(resolve, 3000));
        }
    }
}

// 使用
longPoll('/api/updates?topic=orders', (data) => {
    console.log('New update:', data);
});

2.3 长轮询的工程注意事项

1. 超时设置:
   服务端持有时间: 20-30 秒(不要太长,防止代理超时)
   客户端请求超时: 服务端超时 + 5 秒(留余量)
   Nginx proxy_read_timeout: 必须大于长轮询超时

2. 服务端资源:
   每个等待的长轮询请求占用一个 HTTP 连接
   1000 个在线用户 = 1000 个并发连接
   必须使用异步/协程模型(Go/Node.js/异步 Python)
   传统的线程模型(PHP/Java Servlet)不适合

3. 消息丢失:
   响应返回到客户端发送新请求之间有一个时间窗口
   在这个窗口内的消息可能丢失
   解决方案: 每条消息带序号,客户端携带 lastSeq 参数

4. 负载均衡:
   长轮询是标准 HTTP,负载均衡器天然支持
   但要注意: 同一用户的请求可能分到不同后端
   如果使用内存存储 waiters,需要 sticky session 或 Redis Pub/Sub

三、短轮询的适用场景

短轮询不应该被完全否定——在某些场景下它是最简单有效的方案:

适用场景:
  - 低频更新(每分钟一次)的数据拉取
  - 客户端数量有限(<100)
  - 不需要实时性(延迟可接受 3-10 秒)
  - 后端无状态,容易水平扩展
  - 临时方案或 MVP 阶段

实现:
  setInterval(() => {
      fetch('/api/status')
          .then(r => r.json())
          .then(data => updateUI(data));
  }, 5000); // 每 5 秒轮询一次

优化:
  - 使用 HTTP 304(条件请求)减少传输
  - 指数退避: 无变化时逐渐增加间隔
  - 可见性 API: 页面不可见时停止轮询
    document.addEventListener('visibilitychange', () => {
        if (document.hidden) { clearInterval(pollTimer); }
        else { pollTimer = setInterval(poll, 5000); }
    });

四、推送方案对比

4.1 四种方案的技术对比

特性              │ 短轮询      │ 长轮询        │ SSE          │ WebSocket
──────────────────┼────────────┼──────────────┼─────────────┼────────────
方向              │ 客户端拉    │ 客户端拉      │ 服务端推     │ 双向
底层协议          │ HTTP       │ HTTP         │ HTTP        │ WebSocket
连接持久性        │ 短连接      │ 中等          │ 长连接       │ 长连接
实时性            │ 秒级        │ 接近实时      │ 实时         │ 实时
消息格式          │ 任意        │ 任意          │ 文本         │ 文本/二进制
自动重连          │ ✅(自定义) │ ✅(自定义)   │ ✅(内置)    │ ❌(需自己实现)
代理/CDN 兼容     │ ✅          │ ✅            │ ✅           │ ⚠️(需配置)
浏览器支持        │ ✅ 全部     │ ✅ 全部       │ ✅ 除 IE     │ ✅ 全部
多路复用          │ ❌          │ ❌            │ HTTP/2 下可  │ ❌
认证              │ ✅ 简单     │ ✅ 简单       │ ⚠️ 有限      │ ⚠️ 有限
服务端复杂度      │ 低          │ 中            │ 低           │ 中高

4.2 选型决策树

你的应用需要什么?

1. 只需要服务端 → 客户端推送?
   → SSE(首选)
   → 长轮询(如果需要兼容 IE 或跨域困难)

2. 需要客户端 → 服务端 实时发送?
   → WebSocket

3. 更新频率低(>30 秒一次),不需要实时?
   → 短轮询

4. 需要浏览器和非浏览器客户端统一?
   → WebSocket 或 gRPC Stream

具体场景推荐:

场景                 │ 推荐方案        │ 原因
─────────────────────┼────────────────┼──────────────────────
通知/提醒            │ SSE            │ 单向推送,自动重连
实时聊天             │ WebSocket      │ 双向实时通信
股票行情             │ SSE            │ 高频单向数据流
在线协作编辑         │ WebSocket      │ 双向低延迟
CI/CD 构建日志       │ SSE            │ 日志流推送
API 数据轮询         │ 短轮询          │ 低频,简单
IoT 设备状态         │ MQTT/WebSocket │ 需要 QoS 保证
GPT 流式输出         │ SSE            │ 服务端流式推送文本

4.3 性能与资源对比

1000 在线用户的资源开销估算:

方案         │ 连接数      │ 请求/秒     │ 带宽开销/秒
─────────────┼────────────┼────────────┼──────────────
短轮询(5s)   │ ~200 并发   │ 200 req/s   │ ~200KB(含空响应)
长轮询(30s)  │ 1000 并发   │ ~33 req/s   │ ~3KB(仅有效数据)
SSE          │ 1000 并发   │ 0 req/s     │ ~1KB(心跳+数据)
WebSocket    │ 1000 并发   │ 0 req/s     │ ~0.5KB(帧开销更小)

分析:
- 短轮询请求最多,但每个请求很轻量
- 长轮询连接多,但请求数最少
- SSE 连接最多,带宽最省(HTTP 头只发送一次)
- WebSocket 帧开销最小(2-10 字节头 vs SSE 的文本前缀)

五、SSE 的 Nginx 代理配置

SSE 通过 Nginx 代理时需要特殊配置,否则 Nginx 的缓冲机制会导致事件延迟到达客户端:

# /etc/nginx/conf.d/sse.conf

upstream sse_backend {
    server 10.0.1.10:8080;
    server 10.0.1.11:8080;
    # 不要用 ip_hash — SSE 是长连接,
    # 新的请求(断线重连)应该能分配到任意后端
}

server {
    listen 443 ssl;
    server_name events.example.com;

    location /events {
        proxy_pass http://sse_backend;
        proxy_http_version 1.1;
        proxy_set_header Connection '';  # 清除 Connection: close

        # 关键: 禁用缓冲
        proxy_buffering off;
        proxy_cache off;

        # 禁用 gzip 压缩(会缓冲数据)
        proxy_set_header Accept-Encoding '';

        # 设置足够长的超时
        proxy_read_timeout 86400s;   # 24 小时
        proxy_send_timeout 86400s;

        # 传递客户端信息
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

        # chunked transfer encoding
        chunked_transfer_encoding on;
    }
}
Nginx 配置要点:

1. proxy_buffering off
   Nginx 默认缓冲后端响应,积攒够一定量才发给客户端
   SSE 需要每个事件立即到达,必须关闭缓冲

2. proxy_read_timeout
   默认 60 秒,SSE 的长连接会被超时断开
   设置为足够长的值(24 小时或更长)

3. Accept-Encoding 头
   如果后端开启了 gzip,Nginx 可能缓冲压缩数据
   清除 Accept-Encoding 确保后端不压缩 SSE 流

4. Connection 头
   确保 Nginx 到后端的连接是 keep-alive
   清除默认的 Connection: close

六、SSE 与 HTTP/2 的协同

HTTP/1.1 下的 SSE 限制:
  浏览器对同一域名的并发连接数限制(通常 6 个)
  每个 SSE 连接占用 1 个连接槽
  如果页面打开了 6 个 SSE 连接,其他 HTTP 请求会被阻塞

HTTP/2 下的 SSE:
  多路复用: SSE 和普通请求共享同一个 TCP 连接
  不受并发连接数限制
  SSE 使用一个 HTTP/2 stream,不影响其他请求

实际影响:
  HTTP/1.1: 最多 6 个 SSE 连接(同域名)
  HTTP/2:   理论上可以开上百个 SSE stream(受 SETTINGS_MAX_CONCURRENT_STREAMS 限制)

建议:
  确保服务端支持 HTTP/2
  如果受限于 HTTP/1.1,考虑用一个 SSE 连接 + 事件类型区分

七、大规模 SSE 的工程挑战

7.1 连接管理

挑战: 每个 SSE 客户端占用一个持久连接
      10 万在线用户 = 10 万 HTTP 连接

解决方案:

1. 连接数监控与限制
   - 监控每个后端实例的 SSE 连接数
   - 设置连接上限(如单实例 5 万)
   - 达到上限后拒绝新连接(返回 503)

2. 优雅关闭
   - 服务端重启前发送 retry: 5000 + 关闭连接
   - 客户端自动在 5 秒后重连到新实例

3. 资源回收
   - 检测死连接(客户端已断开但服务端未感知)
   - 定期发送心跳注释(: heartbeat),写入失败则清理
   - 设置连接最大存活时间(如 4 小时),强制重连

7.2 跨节点广播

问题: 事件产生在一个节点,SSE 连接在另一个节点

  事件源 → 后端 A(有 SSE 连接 1, 2, 3)
            后端 B(有 SSE 连接 4, 5, 6)
            后端 C(有 SSE 连接 7, 8, 9)

解决方案: 发布/订阅

  事件源 → Redis Pub/Sub → 后端 A → SSE 连接 1, 2, 3
                          → 后端 B → SSE 连接 4, 5, 6
                          → 后端 C → SSE 连接 7, 8, 9

  替代方案:
  - NATS(更高吞吐,更适合大规模)
  - Kafka(需要持久化事件历史时)
  - gRPC Stream(节点间直接流式通信)

八、总结

SSE 和长轮询是 WebSocket 之外的重要服务端推送方案。选择哪个取决于你的实际需求。

  1. 优先考虑 SSE。如果只需要服务端向客户端推送数据,SSE 比 WebSocket 更简单——标准 HTTP、自动重连、代理友好、浏览器原生支持。

  2. 长轮询是 SSE 的降级方案。在不支持 SSE 的环境中(极少),或需要对请求/响应有更精细控制时使用。注意消息间隙丢失问题。

  3. 短轮询别轻易否定。低频更新、少量客户端、无状态后端——短轮询可能是最简单有效的方案。

  4. SSE 的自动重连是核心优势。id 字段 + Last-Event-ID 头 + retry 指令,三者配合实现了完整的断线续传机制。这是 WebSocket 需要手动实现的功能。

  5. Nginx 代理 SSE 必须禁用缓冲proxy_buffering off 和足够长的 proxy_read_timeout 是 SSE 能正常工作的前提。

  6. HTTP/2 解锁 SSE 的并发限制。HTTP/1.1 下同域名 6 个连接的限制让 SSE 很受限。升级到 HTTP/2 后,多个 SSE stream 可以共享一个 TCP 连接。


参考文献


上一篇:gRPC 深度剖析:HTTP/2 上的 RPC 框架

下一篇:MQTT 工程:IoT 协议的 QoS 与 MQTT 5.0

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2025-08-04 · network

【网络工程】QUIC 生态与工程部署:从实验到生产

QUIC 已经不是实验性协议——HTTP/3 标准化后,CDN、浏览器和主流服务端框架都在推进 QUIC 支持。本文从工程视角对比主流 QUIC 库的成熟度和性能特征,讲解 CDN/负载均衡器的 QUIC 适配方案、从 TCP 迁移到 QUIC 的渐进路径、QUIC 调试工具链,以及生产环境的部署陷阱和性能调优实践。


By .