土法炼钢兴趣小组的算法知识备份

【系统架构设计百科】Slack 架构:实时协作的工程挑战

文章导航

分类入口
architecture
标签入口
#Slack#WebSocket#real-time#Flannel#Vitess#PHP-migration

目录

Slack 每天为超过一千万活跃用户提供实时消息服务,峰值时段同时维持数百万条 WebSocket(全双工通信协议)长连接。一条消息从发送到被同一频道所有成员看到,端到端延迟通常控制在 200 毫秒以内。这套系统并非一蹴而就:它从一个 PHP 单体应用起步,历经数次关键重构,逐步演变为以 Hack、Go、Java 为核心语言的微服务架构。本文将从连接管理、消息扇出、边缘缓存、语言迁移、数据库扩展等维度,深入剖析 Slack 的架构设计与工程实践。

一、Slack 技术架构演进概览

1.1 早期架构(2013-2016)

Slack 最初的技术栈非常典型:PHP + MySQL + Memcached + Apache。整个应用是一个巨大的单体(Monolith),所有业务逻辑——消息收发、频道管理、用户认证、文件上传——都运行在同一个代码库中。这种架构在产品早期验证阶段效率很高,但随着用户量的指数增长,问题逐渐暴露。

早期架构的核心组件:

# Slack 早期架构组件(2013-2015)
web_tier:
  language: PHP 5.x
  server: Apache + mod_php
  framework: 自研 MVC 框架
  session: Memcached

data_tier:
  primary_db: MySQL 5.6(单主多从)
  cache: Memcached
  search: Solr
  queue: Redis + 自研 Job Runner

realtime_tier:
  protocol: WebSocket
  server: Node.js(独立进程)
  pubsub: Redis Pub/Sub

1.2 中期重构(2016-2019)

2016 年是 Slack 架构的转折点。团队开始将 PHP 代码库迁移到 Hack(HHVM 运行时),同时引入 Go 语言来构建高性能基础设施服务。这一阶段的关键成果包括:

1.3 现代架构(2020 至今)

当前 Slack 的架构可以概括为一个分层体系:边缘层处理连接和缓存,应用层承载业务逻辑,数据层提供持久化和索引。以下是整体架构的高层视图:

graph TB
    subgraph 客户端
        A1[桌面客户端<br/>Electron]
        A2[移动客户端<br/>iOS / Android]
        A3[Web 客户端<br/>React]
    end

    subgraph 边缘层
        B1[负载均衡<br/>Envoy / HAProxy]
        B2[WebSocket 网关<br/>Go 实现]
        B3[Flannel<br/>边缘缓存层]
        B4[API Gateway<br/>速率限制 + 认证]
    end

    subgraph 应用层
        C1[消息服务<br/>Hack / PHP]
        C2[频道服务<br/>Hack / PHP]
        C3[用户服务<br/>Hack / PHP]
        C4[通知服务<br/>Go]
        C5[搜索服务<br/>Java]
        C6[文件服务<br/>Go]
    end

    subgraph 数据层
        D1[Vitess<br/>MySQL 分片集群]
        D2[Memcached<br/>分布式缓存]
        D3[Redis<br/>Pub/Sub + 队列]
        D4[Solr / Elasticsearch<br/>全文索引]
        D5[S3<br/>文件存储]
    end

    A1 & A2 & A3 --> B1
    B1 --> B2
    B1 --> B4
    B2 --> B3
    B4 --> C1 & C2 & C3
    B3 --> C1
    C1 --> D1 & D2 & D3
    C2 --> D1 & D2
    C4 --> D3
    C5 --> D4
    C6 --> D5

1.4 架构演进中的关键决策

时间节点 决策 驱动因素 影响
2014 引入 Redis Pub/Sub 做实时消息分发 原有轮询机制延迟过高 消息延迟从秒级降至百毫秒级
2016 PHP 迁移到 Hack(HHVM) PHP 性能瓶颈,类型安全需求 吞吐量提升 2-5 倍
2017 用 Go 重写 WebSocket 网关 Node.js 单线程模型内存效率低 单机连接数提升 10 倍
2018 引入 Flannel 边缘缓存 客户端启动时间过长 启动 API 调用减少 50% 以上
2019 MySQL 迁移到 Vitess 单实例 MySQL 容量上限 支持千亿级消息存储
2021 搜索引擎从 Solr 迁移到自研方案 索引延迟和扩展性问题 搜索延迟 P99 下降 40%

二、WebSocket 长连接架构

2.1 连接管理概述

Slack 的实时通信核心是 WebSocket 长连接。每当用户打开 Slack 客户端,客户端会与服务端建立一条 WebSocket 连接,后续所有实时事件(消息、状态变更、输入提示等)都通过这条连接下发。这种模型比传统的 HTTP 长轮询(Long Polling)在延迟和带宽效率上都有显著优势。

连接建立的完整流程:

1. 客户端发起 HTTPS 请求到 wss://wss-primary.slack.com
2. 负载均衡器(Envoy)将请求路由到 WebSocket 网关节点
3. 网关节点验证认证令牌(Token)
4. 网关节点为该连接分配内存,注册到连接注册表(Connection Registry)
5. 网关节点订阅该用户所属的所有频道(Channel)的 Pub/Sub 主题
6. 连接建立完成,开始双向通信

2.2 Go 语言网关实现

Slack 在 2017 年用 Go 重写了 WebSocket 网关层,取代了原先基于 Node.js 的实现。选择 Go 的核心原因是它的 goroutine 模型非常适合处理大量并发连接——每个连接对应两个 goroutine(一个读、一个写),内存开销极小。

以下是网关核心连接管理的简化实现:

package gateway

import (
    "context"
    "net/http"
    "sync"
    "time"

    "github.com/gorilla/websocket"
)

// ConnRegistry 管理所有活跃的 WebSocket 连接
type ConnRegistry struct {
    mu    sync.RWMutex
    conns map[string]*ClientConn // key: connectionID
    users map[string][]*ClientConn // key: userID
}

// ClientConn 表示一个客户端连接
type ClientConn struct {
    ID        string
    UserID    string
    TeamID    string
    Conn      *websocket.Conn
    Channels  []string
    SendCh    chan []byte
    CreatedAt time.Time
    ctx       context.Context
    cancel    context.CancelFunc
}

var upgrader = websocket.Upgrader{
    ReadBufferSize:  4096,
    WriteBufferSize: 4096,
    CheckOrigin: func(r *http.Request) bool {
        return validateOrigin(r)
    },
}

// HandleWebSocket 处理新的 WebSocket 连接请求
func (reg *ConnRegistry) HandleWebSocket(w http.ResponseWriter, r *http.Request) {
    // 认证检查
    token := r.URL.Query().Get("token")
    userInfo, err := authenticateToken(token)
    if err != nil {
        http.Error(w, "Unauthorized", http.StatusUnauthorized)
        return
    }

    // 升级 HTTP 连接为 WebSocket
    wsConn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        return
    }

    ctx, cancel := context.WithCancel(r.Context())
    client := &ClientConn{
        ID:        generateConnID(),
        UserID:    userInfo.UserID,
        TeamID:    userInfo.TeamID,
        Conn:      wsConn,
        Channels:  userInfo.Channels,
        SendCh:    make(chan []byte, 256),
        CreatedAt: time.Now(),
        ctx:       ctx,
        cancel:    cancel,
    }

    // 注册连接
    reg.register(client)

    // 订阅用户所属频道
    for _, ch := range client.Channels {
        subscribeToPubSub(client.TeamID, ch, client)
    }

    // 启动读写 goroutine
    go client.readPump(reg)
    go client.writePump()
}

// readPump 从 WebSocket 读取客户端消息
func (c *ClientConn) readPump(reg *ConnRegistry) {
    defer func() {
        reg.unregister(c)
        c.Conn.Close()
    }()

    c.Conn.SetReadLimit(maxMessageSize)
    c.Conn.SetReadDeadline(time.Now().Add(pongWait))
    c.Conn.SetPongHandler(func(string) error {
        c.Conn.SetReadDeadline(time.Now().Add(pongWait))
        return nil
    })

    for {
        _, message, err := c.Conn.ReadMessage()
        if err != nil {
            break
        }
        processIncomingMessage(c, message)
    }
}

// writePump 向 WebSocket 写入服务端消息
func (c *ClientConn) writePump() {
    ticker := time.NewTicker(pingPeriod)
    defer func() {
        ticker.Stop()
        c.Conn.Close()
    }()

    for {
        select {
        case message, ok := <-c.SendCh:
            c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
            if !ok {
                c.Conn.WriteMessage(websocket.CloseMessage, []byte{})
                return
            }
            if err := c.Conn.WriteMessage(websocket.TextMessage, message); err != nil {
                return
            }
        case <-ticker.C:
            c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
            if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil {
                return
            }
        case <-c.ctx.Done():
            return
        }
    }
}

const (
    maxMessageSize = 65536
    pongWait       = 60 * time.Second
    pingPeriod     = 54 * time.Second
    writeWait      = 10 * time.Second
)

2.3 连接的水平扩展

单台网关服务器能处理的连接数是有限的。Slack 通过以下策略实现水平扩展:

连接分布策略: 负载均衡器使用一致性哈希(Consistent Hashing)将同一团队(Team)的连接尽可能路由到同一组网关节点。这样做的好处是,当向某个团队的频道广播消息时,需要通知的网关节点数量最少。

连接注册表: 每个网关节点维护本地的连接注册表,同时将连接元数据写入 Redis。当需要向特定用户发送消息时,先查 Redis 确定用户连接在哪台网关上,再将消息路由过去。

// ConnectionLocator 负责定位用户连接所在的网关节点
type ConnectionLocator struct {
    redisClient *redis.Client
}

// LocateUser 查找用户的所有活跃连接
func (l *ConnectionLocator) LocateUser(userID string) ([]GatewayEndpoint, error) {
    key := fmt.Sprintf("user:conns:%s", userID)
    members, err := l.redisClient.SMembers(context.Background(), key).Result()
    if err != nil {
        return nil, fmt.Errorf("failed to locate user %s: %w", userID, err)
    }

    endpoints := make([]GatewayEndpoint, 0, len(members))
    for _, member := range members {
        ep, err := parseGatewayEndpoint(member)
        if err != nil {
            continue
        }
        endpoints = append(endpoints, ep)
    }
    return endpoints, nil
}

// RegisterConnection 注册新连接到 Redis
func (l *ConnectionLocator) RegisterConnection(conn *ClientConn, gatewayAddr string) error {
    key := fmt.Sprintf("user:conns:%s", conn.UserID)
    value := fmt.Sprintf("%s|%s|%d", gatewayAddr, conn.ID, conn.CreatedAt.Unix())

    pipe := l.redisClient.Pipeline()
    pipe.SAdd(context.Background(), key, value)
    pipe.Expire(context.Background(), key, 2*time.Hour)
    _, err := pipe.Exec(context.Background())
    return err
}

2.4 心跳与断线重连

WebSocket 连接可能因为网络波动、客户端休眠等原因断开。Slack 实现了多层机制来确保连接的可靠性:

// ReconnectManager 客户端重连管理
type ReconnectManager struct {
    maxRetries    int
    baseDelay     time.Duration
    maxDelay      time.Duration
    lastSeqNum    int64
}

// GetBackoffDuration 计算指数退避的等待时间
func (rm *ReconnectManager) GetBackoffDuration(attempt int) time.Duration {
    if attempt >= rm.maxRetries {
        return rm.maxDelay
    }

    delay := rm.baseDelay * time.Duration(1<<uint(attempt))
    if delay > rm.maxDelay {
        delay = rm.maxDelay
    }

    // 添加 0-25% 的随机抖动
    jitter := time.Duration(rand.Int63n(int64(delay) / 4))
    return delay + jitter
}

// HandleReconnect 处理客户端重连请求
func HandleReconnect(conn *ClientConn, lastSeqNum int64) error {
    // 查询该用户在断线期间错过的消息
    missedMessages, err := fetchMessagesSince(conn.UserID, conn.Channels, lastSeqNum)
    if err != nil {
        return fmt.Errorf("failed to fetch missed messages: %w", err)
    }

    // 按顺序补发消息
    for _, msg := range missedMessages {
        data, _ := json.Marshal(msg)
        select {
        case conn.SendCh <- data:
        default:
            return fmt.Errorf("send channel full for conn %s", conn.ID)
        }
    }
    return nil
}

三、Channel 消息扇出策略

3.1 扇出问题的本质

消息扇出(Fan-out)是 Slack 架构中最具挑战性的问题之一。当一个用户在频道中发送消息时,这条消息需要被推送到该频道所有在线成员的 WebSocket 连接上。对于大型企业工作区,一个频道可能有数万甚至数十万成员,如何高效完成这个扇出过程直接影响用户体验。

3.2 分层扇出架构

Slack 采用了分层扇出(Tiered Fan-out)策略,将扇出过程分解为多个阶段:

sequenceDiagram
    participant Client as 发送客户端
    participant API as API 服务
    participant MQ as 消息队列(Kafka)
    participant FO as 扇出服务(Fan-out Worker)
    participant Redis as Redis Pub/Sub
    participant GW1 as 网关节点 1
    participant GW2 as 网关节点 2
    participant R1 as 接收客户端 A
    participant R2 as 接收客户端 B

    Client->>API: 发送消息(HTTP POST)
    API->>API: 验证权限,写入数据库
    API->>MQ: 发布消息事件
    MQ->>FO: 消费消息事件
    FO->>FO: 查询频道成员列表
    FO->>FO: 按网关节点分组
    FO->>Redis: 发布到网关级 Pub/Sub 主题
    Redis->>GW1: 推送到网关 1
    Redis->>GW2: 推送到网关 2
    GW1->>GW1: 查找本地匹配连接
    GW2->>GW2: 查找本地匹配连接
    GW1->>R1: WebSocket 推送
    GW2->>R2: WebSocket 推送

3.3 大频道与小频道的不同策略

Slack 根据频道的成员规模采用不同的扇出策略:

特性 小频道(小于 500 人) 大频道(500 人以上)
扇出方式 写时扇出(Fan-out on Write) 混合扇出(Hybrid Fan-out)
成员列表缓存 直接从缓存读取完整成员列表 分批加载,增量更新
消息投递 直接投递到每个在线成员 先投递到网关级主题,再由网关本地分发
离线消息 记录未读计数,上线时拉取 记录未读标记,按需拉取
Pub/Sub 粒度 每个频道一个主题 按网关节点聚合主题
延迟目标 小于 100 毫秒 小于 300 毫秒

写时扇出(Fan-out on Write): 消息发送时立即推送给所有在线成员。适用于小频道,因为扇出量可控。

读时扇出(Fan-out on Read): 消息只写入存储,成员打开频道时主动拉取。适用于公告类大频道。

混合扇出(Hybrid Fan-out): 对在线且活跃的成员做写时扇出,对非活跃成员做读时扇出。这是 Slack 对大频道采用的核心策略。

// FanoutStrategy 决定消息的扇出策略
type FanoutStrategy struct {
    smallChannelThreshold int
    activeWindowDuration  time.Duration
}

// DetermineFanoutPlan 根据频道规模和成员状态确定扇出计划
func (fs *FanoutStrategy) DetermineFanoutPlan(
    channelID string,
    members []Member,
    onlineUsers map[string]bool,
) *FanoutPlan {
    plan := &FanoutPlan{
        ChannelID:   channelID,
        DirectPush:  make([]string, 0),
        DeferredPull: make([]string, 0),
    }

    if len(members) <= fs.smallChannelThreshold {
        // 小频道:对所有在线成员直接推送
        for _, m := range members {
            if onlineUsers[m.UserID] {
                plan.DirectPush = append(plan.DirectPush, m.UserID)
            } else {
                plan.DeferredPull = append(plan.DeferredPull, m.UserID)
            }
        }
        plan.Strategy = "fan_out_on_write"
        return plan
    }

    // 大频道:混合策略
    now := time.Now()
    for _, m := range members {
        if onlineUsers[m.UserID] && now.Sub(m.LastActiveAt) < fs.activeWindowDuration {
            plan.DirectPush = append(plan.DirectPush, m.UserID)
        } else {
            plan.DeferredPull = append(plan.DeferredPull, m.UserID)
        }
    }
    plan.Strategy = "hybrid"
    return plan
}

// FanoutPlan 扇出执行计划
type FanoutPlan struct {
    ChannelID    string
    Strategy     string
    DirectPush   []string // 立即推送的用户
    DeferredPull []string // 延迟拉取的用户
}

3.4 扇出性能优化

为了在大规模扇出时保持低延迟,Slack 使用了以下优化手段:

批量投递: 将发往同一网关节点的消息合并为一个批次,减少网络往返次数。

并行扇出: 扇出工作由多个 Worker 并行执行,使用 Kafka(分布式消息队列)进行任务分发,保证扇出速度线性扩展。

成员列表缓存: 频道成员列表缓存在 Memcached 中,设置合理的 TTL(生存时间),避免每次扇出都查数据库。

连接亲和性: 同一团队的用户连接尽量落在同一组网关节点上,减少扇出需要通知的网关数量。

// BatchDispatcher 批量消息分发器
type BatchDispatcher struct {
    batchSize    int
    flushInterval time.Duration
    buffers      map[string][]Message // key: gatewayAddr
    mu           sync.Mutex
}

// Dispatch 将消息加入批量缓冲区
func (bd *BatchDispatcher) Dispatch(gatewayAddr string, msg Message) {
    bd.mu.Lock()
    defer bd.mu.Unlock()

    bd.buffers[gatewayAddr] = append(bd.buffers[gatewayAddr], msg)

    if len(bd.buffers[gatewayAddr]) >= bd.batchSize {
        bd.flush(gatewayAddr)
    }
}

// flush 将缓冲区中的消息批量发送到目标网关
func (bd *BatchDispatcher) flush(gatewayAddr string) {
    messages := bd.buffers[gatewayAddr]
    if len(messages) == 0 {
        return
    }

    bd.buffers[gatewayAddr] = make([]Message, 0, bd.batchSize)

    go func(addr string, msgs []Message) {
        batch := MessageBatch{
            GatewayAddr: addr,
            Messages:    msgs,
            Timestamp:   time.Now(),
        }
        sendBatchToGateway(batch)
    }(gatewayAddr, messages)
}

四、Flannel:边缘缓存层设计

4.1 Flannel 的设计背景

当用户打开 Slack 客户端时,需要加载大量数据:用户所属的所有频道信息、每个频道的最新消息、未读计数、团队成员列表、用户偏好设置等。在 Flannel 出现之前,这些数据全部通过调用后端 API 逐一获取,导致客户端启动时间随着用户加入的频道数量线性增长。对于加入了数百个频道的企业用户,启动时间可能长达 10 秒以上。

Flannel(法兰绒,取自其”覆盖在底层之上”的含义)是 Slack 在 2018 年引入的边缘缓存层(Edge Cache Layer)。它的核心职责是为客户端启动过程提供一个聚合的、预计算好的数据快照,将原先需要几十次 API 调用才能获取的数据合并为一次请求。

4.2 Flannel 的架构设计

graph LR
    subgraph 客户端
        CL[Slack Client]
    end

    subgraph Flannel 层
        FL[Flannel Server<br/>Go 实现]
        LC[本地缓存<br/>LRU + TTL]
        BQ[变更队列<br/>Change Queue]
    end

    subgraph 后端服务
        MS[消息服务]
        CS[频道服务]
        US[用户服务]
        PS[偏好设置服务]
    end

    subgraph 数据源
        DB[(Vitess)]
        MC[(Memcached)]
        RD[(Redis)]
    end

    CL -->|Boot 请求| FL
    FL --> LC
    LC -->|缓存未命中| MS & CS & US & PS
    MS & CS & US & PS --> DB & MC
    RD -->|变更事件| BQ
    BQ -->|增量更新| LC

4.3 数据模型与缓存策略

Flannel 为每个用户维护一份工作区快照(Workspace Snapshot),包含以下核心数据:

// WorkspaceSnapshot 用户工作区快照
type WorkspaceSnapshot struct {
    UserID       string                 `json:"user_id"`
    TeamID       string                 `json:"team_id"`
    Version      int64                  `json:"version"`
    GeneratedAt  time.Time              `json:"generated_at"`

    // 频道数据
    Channels     []ChannelSummary       `json:"channels"`
    DMs          []DirectMessageSummary `json:"dms"`

    // 用户数据
    Self         UserProfile            `json:"self"`
    TeamMembers  []UserBasicInfo        `json:"team_members"`

    // 未读状态
    UnreadCounts map[string]UnreadInfo  `json:"unread_counts"`

    // 偏好设置
    Preferences  UserPreferences        `json:"preferences"`
}

// ChannelSummary 频道摘要信息
type ChannelSummary struct {
    ID            string    `json:"id"`
    Name          string    `json:"name"`
    Topic         string    `json:"topic"`
    Purpose       string    `json:"purpose"`
    MemberCount   int       `json:"member_count"`
    LastMessageTS string    `json:"last_message_ts"`
    LastReadTS    string    `json:"last_read_ts"`
    IsMuted       bool      `json:"is_muted"`
    IsArchived    bool      `json:"is_archived"`
}

// UnreadInfo 未读信息
type UnreadInfo struct {
    ChannelID     string `json:"channel_id"`
    UnreadCount   int    `json:"unread_count"`
    MentionCount  int    `json:"mention_count"`
    LastReadTS    string `json:"last_read_ts"`
}

缓存策略采用 LRU(最近最少使用)淘汰与 TTL(生存时间)过期相结合的方式:

数据类型 TTL 更新方式 缓存命中率
频道列表 5 分钟 事件驱动增量更新 大于 95%
最新消息 30 秒 事件驱动增量更新 大于 90%
未读计数 10 秒 事件驱动增量更新 大于 85%
用户资料 15 分钟 定时刷新 大于 98%
团队成员列表 30 分钟 定时刷新 + 变更通知 大于 99%

4.4 增量更新机制

Flannel 不是简单的只读缓存。它通过监听 Redis 发布的变更事件来实时更新缓存内容,保证客户端获取到的快照与后端数据的一致性。

// ChangeEventProcessor 变更事件处理器
type ChangeEventProcessor struct {
    cache        *SnapshotCache
    redisClient  *redis.Client
}

// ChangeEvent 数据变更事件
type ChangeEvent struct {
    Type      string          `json:"type"`
    TeamID    string          `json:"team_id"`
    ChannelID string          `json:"channel_id,omitempty"`
    UserID    string          `json:"user_id,omitempty"`
    Payload   json.RawMessage `json:"payload"`
    Timestamp int64           `json:"ts"`
}

// StartListening 开始监听变更事件
func (p *ChangeEventProcessor) StartListening(ctx context.Context) {
    pubsub := p.redisClient.Subscribe(ctx,
        "flannel:changes:messages",
        "flannel:changes:channels",
        "flannel:changes:users",
        "flannel:changes:unreads",
    )
    defer pubsub.Close()

    ch := pubsub.Channel()
    for {
        select {
        case msg := <-ch:
            var event ChangeEvent
            if err := json.Unmarshal([]byte(msg.Payload), &event); err != nil {
                log.Printf("failed to unmarshal change event: %v", err)
                continue
            }
            p.processEvent(event)
        case <-ctx.Done():
            return
        }
    }
}

// processEvent 处理单个变更事件
func (p *ChangeEventProcessor) processEvent(event ChangeEvent) {
    switch event.Type {
    case "new_message":
        p.cache.UpdateLatestMessage(event.TeamID, event.ChannelID, event.Payload)
    case "channel_created":
        p.cache.AddChannel(event.TeamID, event.Payload)
    case "channel_archived":
        p.cache.ArchiveChannel(event.TeamID, event.ChannelID)
    case "member_joined_channel":
        p.cache.AddChannelMember(event.TeamID, event.ChannelID, event.UserID)
    case "unread_updated":
        p.cache.UpdateUnreadCount(event.TeamID, event.UserID, event.Payload)
    case "user_profile_changed":
        p.cache.UpdateUserProfile(event.TeamID, event.UserID, event.Payload)
    }
}

4.5 Flannel 的效果

Flannel 上线后的实际效果:

五、从 PHP 到 Hack + 微服务的迁移

5.1 为什么要离开 PHP

Slack 的初始代码库完全用 PHP 编写,运行在 Apache + mod_php 上。随着业务增长,PHP 暴露出几个严峻问题:

性能瓶颈: PHP 的请求-响应模型意味着每次请求都需要重新初始化运行时环境。虽然 OPcache 缓解了字节码编译开销,但对象创建、依赖注入等开销依然显著。

类型安全缺失: PHP 是动态类型语言,大量类型错误只能在运行时发现。随着代码库膨胀到数百万行,缺乏静态类型检查导致重构风险极高。

并发模型局限: PHP 的进程模型不适合长连接和异步处理场景,每个请求独占一个进程,资源利用率低。

5.2 选择 Hack 和 HHVM

Hack 是 Facebook(现 Meta)开发的编程语言,它在 PHP 语法基础上增加了静态类型系统、泛型(Generics)、异步编程(async/await)等现代语言特性,运行在 HHVM(HipHop Virtual Machine)上。Slack 选择 Hack 而非完全重写为 Go 或 Java 的原因是:

迁移前后的性能对比:

指标 PHP(Zend) Hack(HHVM) 改善幅度
API 平均响应时间 120 毫秒 45 毫秒 62.5%
API P99 响应时间 800 毫秒 200 毫秒 75%
内存使用(每进程) 80 MB 35 MB 56%
请求吞吐量 1200 QPS/节点 4000 QPS/节点 233%
CPU 利用率 75% 40% 47%
类型错误检出 仅运行时 编译期 + 运行时 显著提升

5.3 渐进式迁移策略

迁移采用了”逐文件转换”的方式,而不是”大爆炸”式重写。团队开发了自动化转换工具来处理大部分机械性转换,人工只需处理类型标注和复杂逻辑。

// 迁移前:PHP 代码
// function getChannelMembers($channelId, $limit = 100) {
//     $members = [];
//     $result = $db->query("SELECT ...", [$channelId, $limit]);
//     while ($row = $result->fetch()) {
//         $members[] = $row;
//     }
//     return $members;
// }

// 迁移后:Hack 代码(添加了类型标注和异步支持)
async function getChannelMembers(
    string $channelId,
    int $limit = 100,
): Awaitable<vec<ChannelMember>> {
    $result = await $this->db->queryAsync(
        "SELECT user_id, role, joined_at FROM channel_members "
        ."WHERE channel_id = %s ORDER BY joined_at LIMIT %d",
        $channelId,
        $limit,
    );

    $members = vec[];
    foreach ($result->rows() as $row) {
        $members[] = new ChannelMember(
            userId: $row['user_id'] as string,
            role: ChannelRole::from($row['role'] as string),
            joinedAt: new DateTimeImmutable($row['joined_at'] as string),
        );
    }
    return $members;
}

5.4 微服务拆分原则

在语言迁移的同时,Slack 也在逐步将单体拆分为微服务。拆分遵循以下原则:

按领域边界拆分: 消息、频道、用户、搜索、通知——每个领域成为独立服务。

数据所有权明确: 每个服务拥有自己的数据库(或 Vitess 中的一组分片),不允许跨服务直接访问数据库。

API 优先: 服务间通过 gRPC(远程过程调用)通信,定义了严格的接口契约(Protocol Buffers)。

// 消息服务的 gRPC 接口定义
syntax = "proto3";

package slack.messaging.v1;

service MessageService {
    // 发送消息
    rpc SendMessage(SendMessageRequest) returns (SendMessageResponse);

    // 获取频道历史消息
    rpc GetChannelHistory(GetChannelHistoryRequest) returns (GetChannelHistoryResponse);

    // 编辑消息
    rpc EditMessage(EditMessageRequest) returns (EditMessageResponse);

    // 删除消息
    rpc DeleteMessage(DeleteMessageRequest) returns (DeleteMessageResponse);

    // 搜索消息
    rpc SearchMessages(SearchMessagesRequest) returns (SearchMessagesResponse);
}

message SendMessageRequest {
    string channel_id = 1;
    string user_id = 2;
    string text = 3;
    string thread_ts = 4; // 可选:回复某条消息
    repeated Attachment attachments = 5;
    repeated Block blocks = 6;
}

message SendMessageResponse {
    bool ok = 1;
    string message_ts = 2;
    string channel_id = 3;
    MessageError error = 4;
}

message GetChannelHistoryRequest {
    string channel_id = 1;
    string latest = 2; // 时间戳上界
    string oldest = 3; // 时间戳下界
    int32 limit = 4;   // 默认 100
    bool inclusive = 5;
}

message GetChannelHistoryResponse {
    bool ok = 1;
    repeated Message messages = 2;
    bool has_more = 3;
    ResponseMetadata response_metadata = 4;
}

5.5 迁移过程中的经验教训

双写验证: 对于关键路径,先让 PHP 和 Hack 版本并行运行,比对输出结果,确认无差异后再切流量。

渐进式流量切换: 使用功能开关(Feature Flag)控制新旧代码路径,先切 1% 流量观察,再逐步放大。

回滚机制: 每次切换都有自动回滚触发条件——如果错误率或延迟超过阈值,自动回退到旧代码路径。

六、Vitess 分库扩展实践

6.1 MySQL 的扩展瓶颈

Slack 的消息数据存储在 MySQL 中。早期每个团队的数据存储在独立的 MySQL 实例上(按团队分片),但这种粗粒度的分片方式在大客户场景下遇到了瓶颈:

6.2 为什么选择 Vitess

Vitess 是 YouTube 开源的 MySQL 分片中间件,它在 MySQL 之上提供了透明的水平分片能力。Slack 选择 Vitess 而非其他方案(如 CockroachDB、TiDB)的原因:

方案 优势 劣势(对 Slack 而言)
Vitess 兼容 MySQL 协议,迁移成本低;YouTube 大规模验证;灵活的分片策略 跨分片查询能力有限
CockroachDB 原生分布式事务;自动负载均衡 需要全面适配新的 SQL 方言;性能模型不同
TiDB MySQL 兼容;HTAP 能力 当时(2019)生态成熟度不足
手动分片 完全可控 开发和运维成本极高

6.3 分片策略设计

Slack 在 Vitess 上采用了两级分片键(Sharding Key)设计:

-- Vitess VSchema 定义(简化版)
-- 定义 messages 表的分片规则

-- 创建 vindexes(虚拟索引,用于路由查询到正确分片)
-- team_channel_hash: 基于 team_id 和 channel_id 的复合哈希

CREATE TABLE messages (
    team_id       BIGINT NOT NULL,
    channel_id    VARCHAR(32) NOT NULL,
    message_ts    VARCHAR(32) NOT NULL,
    user_id       VARCHAR(32) NOT NULL,
    text          TEXT,
    thread_ts     VARCHAR(32) DEFAULT NULL,
    edited_ts     VARCHAR(32) DEFAULT NULL,
    is_deleted    TINYINT DEFAULT 0,
    attachments   JSON DEFAULT NULL,
    reactions     JSON DEFAULT NULL,
    created_at    DATETIME NOT NULL,
    updated_at    DATETIME NOT NULL,
    PRIMARY KEY (team_id, channel_id, message_ts),
    INDEX idx_channel_ts (channel_id, message_ts),
    INDEX idx_user_ts (user_id, message_ts),
    INDEX idx_thread (channel_id, thread_ts, message_ts)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

Vitess 的 VSchema(虚拟表结构)配置:

{
    "sharded": true,
    "vindexes": {
        "team_channel_hash": {
            "type": "xxhash",
            "params": {}
        },
        "team_id_lookup": {
            "type": "consistent_lookup",
            "params": {
                "table": "team_channel_lookup",
                "from": "team_id,channel_id",
                "to": "keyspace_id"
            }
        }
    },
    "tables": {
        "messages": {
            "column_vindexes": [
                {
                    "columns": ["team_id", "channel_id"],
                    "name": "team_channel_hash"
                }
            ]
        },
        "channel_members": {
            "column_vindexes": [
                {
                    "columns": ["team_id", "channel_id"],
                    "name": "team_channel_hash"
                }
            ]
        }
    }
}

6.4 迁移到 Vitess 的实施步骤

迁移过程分为四个阶段:

阶段一——影子流量: 将所有写入请求同时发送到原 MySQL 和 Vitess,只使用原 MySQL 的结果。比对两边结果确认一致性。

阶段二——读流量切换: 将部分读请求路由到 Vitess,监控延迟和正确性。

阶段三——写流量切换: 将写请求切换到 Vitess 作为主库,原 MySQL 降级为备份。

阶段四——完全切换: 停止向原 MySQL 写入,所有流量走 Vitess。

// VitessMigrationRouter 迁移期间的流量路由器
type VitessMigrationRouter struct {
    phase         MigrationPhase
    legacyDB      *sql.DB
    vitessDB      *sql.DB
    readPercent   int // Vitess 承担的读流量百分比
    featureFlags  *FeatureFlagService
}

type MigrationPhase int

const (
    PhaseShadow      MigrationPhase = iota // 阶段一:影子流量
    PhaseReadMigrate                        // 阶段二:读迁移
    PhaseWriteMigrate                       // 阶段三:写迁移
    PhaseComplete                           // 阶段四:完全切换
)

// RouteQuery 根据迁移阶段路由查询
func (r *VitessMigrationRouter) RouteQuery(
    ctx context.Context,
    query string,
    args []interface{},
    isWrite bool,
) (*sql.Rows, error) {
    switch r.phase {
    case PhaseShadow:
        // 所有操作使用旧库,同时写入 Vitess 做比对
        result, err := r.legacyDB.QueryContext(ctx, query, args...)
        if isWrite {
            go r.shadowWrite(ctx, query, args)
        }
        return result, err

    case PhaseReadMigrate:
        if isWrite {
            return r.legacyDB.QueryContext(ctx, query, args...)
        }
        // 按百分比分流读请求
        if rand.Intn(100) < r.readPercent {
            return r.vitessDB.QueryContext(ctx, query, args...)
        }
        return r.legacyDB.QueryContext(ctx, query, args...)

    case PhaseWriteMigrate:
        if isWrite {
            return r.vitessDB.QueryContext(ctx, query, args...)
        }
        return r.vitessDB.QueryContext(ctx, query, args...)

    case PhaseComplete:
        return r.vitessDB.QueryContext(ctx, query, args...)
    }

    return r.legacyDB.QueryContext(ctx, query, args...)
}

// shadowWrite 影子写入 Vitess 并比对结果
func (r *VitessMigrationRouter) shadowWrite(
    ctx context.Context,
    query string,
    args []interface{},
) {
    _, err := r.vitessDB.ExecContext(ctx, query, args...)
    if err != nil {
        metrics.IncrCounter("vitess.shadow.write.error", 1)
        log.Printf("shadow write failed: %v", err)
    }
}

6.5 Vitess 运行效果

迁移完成后的数据规模和性能指标:

七、搜索架构与索引优化

7.1 搜索的特殊挑战

Slack 的搜索面临独特的挑战:用户期望搜索能覆盖他们有权访问的所有消息,且结果要实时或准实时。这意味着索引系统需要在毫秒级别跟上消息的写入速度,同时在查询时严格执行访问控制(ACL,访问控制列表)。

7.2 索引架构

Slack 的搜索索引采用双层架构:

实时索引层: 最近几小时内的消息存储在内存中的倒排索引(Inverted Index)结构里,支持毫秒级写入和查询。

持久索引层: 历史消息存储在分布式搜索引擎中(基于 Lucene 构建),按团队和时间范围分段(Segment)。

// 搜索查询构建器(简化示例)
public class SlackSearchQueryBuilder {

    private final AclResolver aclResolver;
    private final QueryParser queryParser;

    public SlackSearchQueryBuilder(AclResolver aclResolver, QueryParser queryParser) {
        this.aclResolver = aclResolver;
        this.queryParser = queryParser;
    }

    /**
     * 构建带有访问控制的搜索查询
     */
    public SearchQuery buildQuery(SearchRequest request) {
        // 解析用户输入的搜索词
        ParsedQuery parsed = queryParser.parse(request.getQueryText());

        // 获取用户有权访问的频道列表
        Set<String> accessibleChannels = aclResolver.getAccessibleChannels(
            request.getUserId(),
            request.getTeamId()
        );

        // 构建带有 ACL 过滤的查询
        SearchQuery query = SearchQuery.builder()
            .teamId(request.getTeamId())
            .textQuery(parsed.getTextClause())
            .channelFilter(accessibleChannels)
            .dateRange(parsed.getDateRange())
            .fromUser(parsed.getFromUser())
            .hasAttachment(parsed.hasAttachment())
            .sortBy(request.getSortBy())
            .limit(request.getLimit())
            .offset(request.getOffset())
            .build();

        // 添加搜索结果的高亮配置
        query.setHighlightConfig(HighlightConfig.builder()
            .preTag("<mark>")
            .postTag("</mark>")
            .fragmentSize(200)
            .numberOfFragments(3)
            .build());

        return query;
    }
}

/**
 * 搜索结果排序器:综合考虑相关性、时效性和用户互动
 */
public class SlackSearchRanker {

    private static final double RECENCY_WEIGHT = 0.3;
    private static final double RELEVANCE_WEIGHT = 0.5;
    private static final double INTERACTION_WEIGHT = 0.2;

    public double computeScore(SearchHit hit, SearchContext context) {
        double relevanceScore = hit.getTextRelevanceScore();

        // 时效性得分:越近的消息得分越高
        long ageHours = Duration.between(
            hit.getTimestamp(), Instant.now()
        ).toHours();
        double recencyScore = 1.0 / (1.0 + Math.log1p(ageHours));

        // 互动得分:基于用户与该频道和发送者的互动频率
        double interactionScore = computeInteractionScore(
            context.getUserId(),
            hit.getChannelId(),
            hit.getSenderId()
        );

        return RELEVANCE_WEIGHT * relevanceScore
             + RECENCY_WEIGHT * recencyScore
             + INTERACTION_WEIGHT * interactionScore;
    }

    private double computeInteractionScore(
        String userId, String channelId, String senderId
    ) {
        int channelActivity = getRecentChannelActivity(userId, channelId);
        int senderInteraction = getRecentUserInteraction(userId, senderId);
        return Math.min(1.0, (channelActivity + senderInteraction) / 100.0);
    }
}

7.3 索引更新流水线

消息写入后需要尽快出现在搜索结果中。Slack 使用了基于 Kafka 的索引更新流水线(Indexing Pipeline):

graph LR
    A[消息写入] --> B[Kafka 消息主题]
    B --> C[索引消费者集群]
    C --> D{消息时效}
    D -->|最近数小时| E[实时索引<br/>内存倒排索引]
    D -->|历史消息| F[批量索引<br/>Lucene 段合并]
    E --> G[搜索查询路由器]
    F --> G
    G --> H[合并排序结果]
    H --> I[返回客户端]

八、Slack 的可靠性工程

8.1 可靠性目标

Slack 将可用性目标设定为 99.99%(四个九),这意味着每年允许的停机时间不超过 52 分钟。为了达到这个目标,Slack 在多个层面构建了可靠性保障体系。

8.2 故障隔离

团队级隔离: 每个团队的数据在逻辑上是隔离的,一个团队的故障不会影响其他团队。这种隔离从数据库分片层面就已经保证。

服务级隔离: 关键服务(消息、认证、连接)与非关键服务(搜索、文件预览、自定义表情)之间有严格的资源隔离。非关键服务的故障不会拖垮关键路径。

区域级隔离: Slack 在多个可用区(Availability Zone)部署服务,单个可用区故障时流量自动切换到其他可用区。

8.3 限流与背压

// RateLimiter 多维度限流器
type RateLimiter struct {
    teamLimits    map[string]*TokenBucket
    userLimits    map[string]*TokenBucket
    globalLimit   *TokenBucket
    mu            sync.RWMutex
}

// TokenBucket 令牌桶实现
type TokenBucket struct {
    tokens     float64
    capacity   float64
    refillRate float64 // 每秒补充的令牌数
    lastRefill time.Time
    mu         sync.Mutex
}

// Allow 检查是否允许请求通过
func (tb *TokenBucket) Allow() bool {
    tb.mu.Lock()
    defer tb.mu.Unlock()

    now := time.Now()
    elapsed := now.Sub(tb.lastRefill).Seconds()
    tb.tokens = math.Min(tb.capacity, tb.tokens+elapsed*tb.refillRate)
    tb.lastRefill = now

    if tb.tokens >= 1 {
        tb.tokens--
        return true
    }
    return false
}

// CheckRateLimit 执行多层限流检查
func (rl *RateLimiter) CheckRateLimit(teamID, userID string) error {
    // 全局限流
    if !rl.globalLimit.Allow() {
        return &RateLimitError{
            Level:   "global",
            Message: "Global rate limit exceeded",
            RetryAfter: 5 * time.Second,
        }
    }

    // 团队级限流
    rl.mu.RLock()
    teamBucket, exists := rl.teamLimits[teamID]
    rl.mu.RUnlock()
    if exists && !teamBucket.Allow() {
        return &RateLimitError{
            Level:   "team",
            Message: "Team rate limit exceeded",
            RetryAfter: 2 * time.Second,
        }
    }

    // 用户级限流
    rl.mu.RLock()
    userBucket, exists := rl.userLimits[userID]
    rl.mu.RUnlock()
    if exists && !userBucket.Allow() {
        return &RateLimitError{
            Level:   "user",
            Message: "User rate limit exceeded",
            RetryAfter: 1 * time.Second,
        }
    }

    return nil
}

8.4 熔断与降级

Slack 使用断路器模式(Circuit Breaker Pattern)来防止级联故障。当某个下游服务的错误率超过阈值时,断路器会”断开”,后续请求直接返回降级结果,避免继续向故障服务施压。

// CircuitBreaker 断路器实现
type CircuitBreaker struct {
    name          string
    state         CircuitState
    failCount     int64
    successCount  int64
    threshold     int64       // 触发断路的连续失败次数
    resetTimeout  time.Duration
    lastFailure   time.Time
    mu            sync.Mutex
}

type CircuitState int

const (
    StateClosed   CircuitState = iota // 正常状态
    StateOpen                         // 断开状态
    StateHalfOpen                     // 半开状态(试探)
)

// Execute 通过断路器执行操作
func (cb *CircuitBreaker) Execute(fn func() error) error {
    cb.mu.Lock()
    state := cb.state

    switch state {
    case StateOpen:
        // 检查是否到了重试时间
        if time.Since(cb.lastFailure) > cb.resetTimeout {
            cb.state = StateHalfOpen
            cb.mu.Unlock()
            return cb.tryHalfOpen(fn)
        }
        cb.mu.Unlock()
        return &CircuitOpenError{
            Service:    cb.name,
            RetryAfter: cb.resetTimeout - time.Since(cb.lastFailure),
        }

    case StateHalfOpen:
        cb.mu.Unlock()
        return cb.tryHalfOpen(fn)

    default: // StateClosed
        cb.mu.Unlock()
        return cb.tryClosed(fn)
    }
}

func (cb *CircuitBreaker) tryClosed(fn func() error) error {
    err := fn()
    cb.mu.Lock()
    defer cb.mu.Unlock()

    if err != nil {
        cb.failCount++
        cb.lastFailure = time.Now()
        if cb.failCount >= cb.threshold {
            cb.state = StateOpen
            log.Printf("circuit breaker [%s] opened after %d failures", cb.name, cb.failCount)
        }
        return err
    }

    cb.failCount = 0
    return nil
}

func (cb *CircuitBreaker) tryHalfOpen(fn func() error) error {
    err := fn()
    cb.mu.Lock()
    defer cb.mu.Unlock()

    if err != nil {
        cb.state = StateOpen
        cb.lastFailure = time.Now()
        return err
    }

    cb.state = StateClosed
    cb.failCount = 0
    cb.successCount++
    log.Printf("circuit breaker [%s] closed after successful probe", cb.name)
    return nil
}

8.5 可观测性体系

Slack 的可观测性建立在三大支柱上:

指标(Metrics): 使用内部指标系统采集所有服务的关键指标——延迟、吞吐量、错误率、饱和度。告警规则基于 SLO(服务等级目标)自动生成。

日志(Logging): 结构化日志统一发送到集中式日志平台,支持跨服务关联查询。

追踪(Tracing): 分布式追踪系统跟踪每个请求在不同服务间的完整路径,帮助定位性能瓶颈和故障根因。

// 统一的请求追踪中间件
type TracingMiddleware struct {
    tracer  opentracing.Tracer
    service string
}

func (tm *TracingMiddleware) Wrap(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        spanCtx, _ := tm.tracer.Extract(
            opentracing.HTTPHeaders,
            opentracing.HTTPHeadersCarrier(r.Header),
        )

        span := tm.tracer.StartSpan(
            r.URL.Path,
            ext.RPCServerOption(spanCtx),
        )
        defer span.Finish()

        ext.HTTPMethod.Set(span, r.Method)
        ext.HTTPUrl.Set(span, r.URL.String())
        span.SetTag("service", tm.service)
        span.SetTag("team_id", r.Header.Get("X-Slack-Team-ID"))

        ctx := opentracing.ContextWithSpan(r.Context(), span)
        rw := &responseWriter{ResponseWriter: w, statusCode: 200}

        next.ServeHTTP(rw, r.WithContext(ctx))

        ext.HTTPStatusCode.Set(span, uint16(rw.statusCode))
        if rw.statusCode >= 500 {
            ext.Error.Set(span, true)
        }
    })
}

九、工程案例:大规模企业工作区的性能优化

9.1 问题背景

2020 年,一家全球性企业客户将其 15 万名员工迁移到 Slack。这个工作区的规模远超 Slack 之前服务过的任何单一客户:

这个规模暴露了 Slack 架构中几个此前未显现的瓶颈。

9.2 瓶颈一:频道成员列表查询

当用户打开一个大频道时,需要加载该频道的成员列表。对于 15 万人的频道,仅成员列表的数据量就达到数 MB。传统的做法是一次性加载完整列表,但在这个规模下查询延迟超过 2 秒。

解决方案:虚拟化成员列表

团队引入了分页(Pagination)加虚拟滚动(Virtual Scrolling)的方案。客户端只加载可视区域内的成员数据,随着用户滚动再按需加载更多。

// PaginatedMemberLoader 分页成员加载器
type PaginatedMemberLoader struct {
    vitessClient *VitessClient
    cache        *MemberCache
}

type MemberPage struct {
    Members    []MemberInfo `json:"members"`
    TotalCount int          `json:"total_count"`
    NextCursor string       `json:"next_cursor"`
    HasMore    bool         `json:"has_more"`
}

// LoadMemberPage 加载一页成员数据
func (ml *PaginatedMemberLoader) LoadMemberPage(
    ctx context.Context,
    teamID string,
    channelID string,
    cursor string,
    pageSize int,
) (*MemberPage, error) {
    // 优先从缓存获取
    cacheKey := fmt.Sprintf("members:%s:%s:%s:%d", teamID, channelID, cursor, pageSize)
    if cached, ok := ml.cache.Get(cacheKey); ok {
        return cached.(*MemberPage), nil
    }

    // 使用游标分页查询 Vitess
    query := `
        SELECT cm.user_id, u.display_name, u.avatar_hash, cm.role, cm.joined_at
        FROM channel_members cm
        JOIN users u ON cm.user_id = u.user_id AND cm.team_id = u.team_id
        WHERE cm.team_id = ? AND cm.channel_id = ?
    `
    args := []interface{}{teamID, channelID}

    if cursor != "" {
        query += " AND cm.user_id > ?"
        args = append(args, cursor)
    }

    query += " ORDER BY cm.user_id LIMIT ?"
    args = append(args, pageSize+1) // 多取一条以判断是否还有更多

    rows, err := ml.vitessClient.Query(ctx, query, args...)
    if err != nil {
        return nil, fmt.Errorf("failed to query members: %w", err)
    }
    defer rows.Close()

    members := make([]MemberInfo, 0, pageSize)
    for rows.Next() {
        var m MemberInfo
        if err := rows.Scan(&m.UserID, &m.DisplayName, &m.AvatarHash, &m.Role, &m.JoinedAt); err != nil {
            return nil, err
        }
        members = append(members, m)
    }

    hasMore := len(members) > pageSize
    if hasMore {
        members = members[:pageSize]
    }

    page := &MemberPage{
        Members:    members,
        TotalCount: ml.getCachedTotalCount(teamID, channelID),
        HasMore:    hasMore,
    }
    if len(members) > 0 {
        page.NextCursor = members[len(members)-1].UserID
    }

    ml.cache.Set(cacheKey, page, 30*time.Second)
    return page, nil
}

9.3 瓶颈二:消息扇出延迟

15 万人的全员频道发消息时,扇出延迟从正常的 100 毫秒飙升到 3 秒以上。根本原因是扇出 Worker 需要遍历全部 15 万个成员来确定谁在线。

解决方案:分级扇出 + 在线状态预索引

团队为大频道构建了在线状态的预索引。扇出 Worker 不再遍历全量成员列表,而是直接查询”这个频道中哪些成员当前在线”。

// PresenceIndex 在线状态预索引
type PresenceIndex struct {
    redisClient *redis.Client
}

// GetOnlineMembers 获取指定频道的在线成员
func (pi *PresenceIndex) GetOnlineMembers(
    ctx context.Context,
    teamID string,
    channelID string,
) ([]string, error) {
    // 使用 Redis 集合交集:频道成员集合 AND 在线用户集合
    channelKey := fmt.Sprintf("channel:members:%s:%s", teamID, channelID)
    onlineKey := fmt.Sprintf("team:online:%s", teamID)

    // SINTERSTORE 将交集结果存入临时集合
    resultKey := fmt.Sprintf("tmp:online_in_channel:%s:%s:%d",
        teamID, channelID, time.Now().UnixNano())

    pipe := pi.redisClient.Pipeline()
    pipe.SInterStore(ctx, resultKey, channelKey, onlineKey)
    pipe.Expire(ctx, resultKey, 10*time.Second)
    pipe.SMembers(ctx, resultKey)
    results, err := pipe.Exec(ctx)
    if err != nil {
        return nil, fmt.Errorf("failed to compute online members: %w", err)
    }

    // 获取交集结果
    members := results[2].(*redis.StringSliceCmd).Val()
    // 清理临时 key
    go pi.redisClient.Del(context.Background(), resultKey)

    return members, nil
}

9.4 瓶颈三:Flannel 缓存预热

用户首次登录时,Flannel 需要为其构建工作区快照。15 万成员的团队中,仅团队成员列表就非常庞大,缓存预热时间过长。

解决方案:分层快照 + 懒加载

团队将快照分为”核心层”和”扩展层”。核心层只包含用户最常用的频道和必要的元数据,客户端可以在 1 秒内完成启动。扩展层(如完整的团队成员列表)在后台异步加载。

9.5 优化成果

经过上述优化后的性能指标:

指标 优化前 优化后 改善
全员频道消息扇出延迟(P99) 3200 毫秒 280 毫秒 91%
客户端启动时间(冷启动) 12 秒 1.8 秒 85%
大频道成员列表加载 2400 毫秒 180 毫秒(首页) 92%
Flannel 缓存命中率 78% 96% 18 个百分点
峰值扇出 Worker CPU 使用率 92% 45% 51%

9.6 案例总结

这个案例揭示了实时协作系统在超大规模场景下的核心挑战:

  1. 数据量不是线性增长的问题,而是组合爆炸的问题。 频道成员数从 1 万增长到 15 万,扇出代价增长的倍数远超 15 倍,因为涉及在线状态查询、网关路由等多个维度。

  2. 缓存策略需要根据数据规模动态调整。 小团队可以缓存完整快照,大团队必须分层缓存、按需加载。

  3. 预索引是应对查询扇出的关键武器。 通过维护在线成员的预索引,将 O(N) 的遍历操作降为 O(1) 的集合运算。

十、实时协作系统的设计经验

10.1 连接管理的核心原则

最小化连接数: 每个客户端只维持一条 WebSocket 连接,所有实时事件通过这条连接多路复用(Multiplexing)。避免为不同功能建立多条连接。

优雅降级: 当 WebSocket 不可用时(某些企业网络环境),自动回退到 HTTP 长轮询。虽然延迟会增加,但功能不受影响。

连接质量检测: 不仅依赖 Ping/Pong 心跳,还在应用层追踪消息的往返延迟(RTT,Round-Trip Time)。当 RTT 持续偏高时,客户端主动重连到更优的网关节点。

10.2 消息可靠性保证

Slack 对消息可靠性的承诺是”至少一次投递”(At-Least-Once Delivery)。这意味着消息可能被重复投递,客户端负责去重。去重的依据是每条消息的唯一时间戳(message_ts)。

消息可靠性的多层保障:

10.3 一致性模型选择

Slack 在不同场景下采用不同的一致性模型:

场景 一致性模型 原因
消息发送 强一致性(写后读保证) 用户发送消息后必须立即在自己的视图中看到
消息扇出 最终一致性(通常亚秒级) 其他成员可以容忍百毫秒级的延迟
未读计数 最终一致性 计数偶尔不准确是可以接受的
频道元数据 读写一致性 修改频道名称等操作需要立即生效
用户状态 最终一致性 在线/离线状态允许短暂延迟
搜索结果 最终一致性(通常秒级) 新消息可以在几秒后出现在搜索结果中

10.4 技术选型的权衡

在构建实时协作系统时,Slack 团队在多个关键技术选型上做出了取舍:

WebSocket 还是 SSE(服务器推送事件): WebSocket 支持全双工通信,客户端可以通过同一连接发送”正在输入”等状态。SSE 只支持单向推送,虽然实现更简单但功能受限。Slack 选择了 WebSocket。

自建消息队列还是使用开源方案: Slack 最终选择了 Kafka,因为它在高吞吐量和持久化方面表现优异,且社区生态成熟。

嵌入式搜索还是独立搜索服务: 独立搜索服务虽然增加了运维复杂度,但允许独立扩展搜索容量,不影响核心消息链路的稳定性。

10.5 面向未来的架构思考

Slack 的架构演进反映了一条普遍规律:系统的复杂性随着用户规模的增长而非线性增长,每个数量级的跨越都可能需要架构层面的根本性调整。

当前 Slack 面临的新挑战包括:

参考资料

  1. Slack Engineering Blog, “Scaling Slack’s Job Queue,” 2016.
  2. Slack Engineering Blog, “Flannel: An Application-Level Edge Cache to Make Slack Scale,” 2018.
  3. Slack Engineering Blog, “A Terrible, Horrible, No-Good, Very Bad Day at Slack,” 2020.
  4. Slack Engineering Blog, “Migrating Millions of Concurrent Websockets to Envoy,” 2021.
  5. Slack Engineering Blog, “Real-time Messaging,” 2016.
  6. Sugu Sougoumarane, “Vitess: MySQL Sharding at Scale,” PlanetScale, 2019.
  7. Keith Adams, “Hack: A New Programming Language for HHVM,” Facebook Engineering, 2014.
  8. Slack Engineering Blog, “Reducing Slack’s Memory Footprint,” 2019.
  9. Slack Engineering Blog, “How Slack Built Shared Channels,” 2020.
  10. Slack Engineering Blog, “Slack’s Migration to a Cellular Architecture,” 2022.
  11. Jamie Gaskins, “How Slack Uses Vitess,” Vitess Blog, 2020.
  12. Slack Engineering Blog, “Rebuilding Slack’s Search Infrastructure,” 2021.
  13. Slack Status Page, “Incident Reports Archive,” https://status.slack.com.
  14. Martin Kleppmann, “Designing Data-Intensive Applications,” O’Reilly Media, 2017.

上一篇:Google 基础设施 下一篇:Shopify 架构

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2026-04-13 · architecture

【系统架构设计百科】数据库扩展:分库分表的工程实践与替代方案

当单表数据量突破千万行、查询延迟从毫秒级劣化到秒级时,分库分表往往是团队面临的第一个选项。本文从分片时机判断、三种分片策略的工程实现、跨分片查询的六种解法讲起,再拆解 Vitess、TiDB、CockroachDB 三套工业级方案的架构差异,回答一个核心问题:NewSQL 能否让我们彻底告别分库分表?

2026-04-13 · architecture

【系统架构设计百科】长连接与推送架构:WebSocket、SSE 与 MQTT

推送系统的核心难度不在协议选型,而在连接管理、心跳检测、断线重连、消息可靠投递这些工程细节。本文从 WebSocket 帧格式、SSE 重连机制、MQTT QoS 三级语义讲起,拆解百万长连接的 epoll 单机架构,深入分析心跳探活、指数退避重连、离线消息队列的设计取舍,结合即时通讯和物联网两个工程案例,讨论推送系统从单机到集群的水平扩展路径。

2026-04-13 · architecture

【系统架构设计百科】架构质量属性:不只是"高可用高性能"

需求评审时写下的'高可用、高性能、高并发',到了架构设计阶段几乎无法落地——因为它们不是可执行的需求。本文从 SEI/CMU 的质量属性理论出发,用 stimulus-response 场景模型把模糊需求变成可量化、可验证的架构约束,并拆解属性之间的冲突与联动关系。

2026-04-13 · architecture

【系统架构设计百科】告警策略:如何避免"狼来了"

大多数团队的告警系统都在制造噪声而不是传递信号。阈值告警看似直观,实则产生大量误报和漏报,值班工程师在凌晨三点被叫醒,却发现只是一次无害的毛刺。本文从告警疲劳的工业数据出发,拆解基于 SLO 的多窗口燃烧率告警算法,深入 Alertmanager 的路由、抑制与分组机制,结合 PagerDuty 的告警疲劳研究和真实工程案例,给出一套可落地的告警策略设计方法。


By .