土法炼钢兴趣小组的算法知识备份

【系统架构设计百科】Uber 架构演进:从单体到领域导向微服务

文章导航

分类入口
architecture
标签入口
#Uber#DOMA#Ringpop#Schemaless#microservice-governance

目录

Uber 在 2010 年上线时只有一个 Python 单体应用,服务三个城市的出行需求。到 2020 年,这家公司运行着超过 4000 个微服务,覆盖出行、外卖、货运、金融等多条业务线,日均处理数千万次行程请求。这段十年的技术演进史,浓缩了单体拆分、微服务膨胀、治理回归三个阶段的完整教训。本文将从时间线出发,逐层拆解 Uber 在每个阶段面对的核心技术问题和对应的架构决策,重点分析 DOMA(Domain-Oriented Microservice Architecture,领域导向微服务架构)、Ringpop 一致性哈希库、Schemaless 追加写入存储引擎三个标志性技术方案。

一、Uber 技术栈演进时间线

Uber 的架构演进可以划分为四个阶段,每个阶段的驱动力不同,做出的架构决策也截然不同。

时间段 阶段 核心架构 驱动力 关键技术决策
2010-2012 单体起步 Python 单体(Dispatch) 快速验证产品 单一代码库,PostgreSQL
2012-2014 单体拆分 Python + Node.js 双单体 团队扩张,部署瓶颈 按业务线拆分为两个独立应用
2014-2018 微服务爆炸 数千个微服务 工程团队从百人到千人 服务自治,技术栈多样化
2018-2022 领域治理 DOMA 分层架构 微服务膨胀带来的复杂性危机 领域网关,平台化,服务归属

下面的时间线图展示了关键技术里程碑:

timeline
    title Uber 架构演进时间线
    2010 : 第一版 Dispatch 单体上线
         : Python + PostgreSQL
    2012 : 拆分为 Dispatch 和 API 两个服务
         : 引入 Node.js
    2014 : 微服务化启动
         : Schemaless 存储上线
         : Ringpop 开源
    2015 : 迁移至 Go 和 Java
         : TChannel RPC 框架发布
    2016 : M3 监控平台上线
         : Peloton 资源调度器
    2017 : 微服务数量突破 2000
         : Jaeger 分布式追踪开源
    2018 : 微服务数量突破 4000
         : DOMA 架构提出
    2020 : 领域网关全面铺开
         : Up 平台化框架
    2022 : DOMA 成熟
         : 服务数量回落至约 2200

二、单体架构阶段(2010-2014)

2.1 Dispatch:一个 Python 单体打天下

Uber 最初的核心系统名为 Dispatch(调度系统),使用 Python 编写,运行在单台服务器上。这个系统承担了所有职责:

数据库选用 PostgreSQL,所有数据存储在一个实例中。早期这个架构运转良好——团队只有不到 20 个工程师,产品迭代速度是第一优先级。

# 早期 Dispatch 系统的简化结构(概念性代码)
class DispatchService:
    """Uber 早期单体调度系统的核心逻辑"""

    def __init__(self, db: PostgresConnection):
        self.db = db
        self.geo_index = InMemoryGeoIndex()

    def request_ride(self, rider_id: str, pickup: LatLng, dropoff: LatLng) -> Trip:
        # 定价计算
        price = self._calculate_price(pickup, dropoff)

        # 创建行程记录
        trip = self.db.insert("trips", {
            "rider_id": rider_id,
            "pickup_lat": pickup.lat,
            "pickup_lng": pickup.lng,
            "dropoff_lat": dropoff.lat,
            "dropoff_lng": dropoff.lng,
            "price": price,
            "status": "REQUESTING"
        })

        # 查找最近的可用司机
        nearby_drivers = self.geo_index.find_nearest(
            pickup, radius_km=5.0, limit=10
        )

        # 逐一推送给司机
        for driver in nearby_drivers:
            if self._offer_trip_to_driver(driver, trip):
                trip.status = "MATCHED"
                trip.driver_id = driver.id
                self.db.update("trips", trip)
                return trip

        trip.status = "NO_DRIVERS"
        self.db.update("trips", trip)
        return trip

    def _calculate_price(self, pickup: LatLng, dropoff: LatLng) -> Decimal:
        distance = haversine(pickup, dropoff)
        base_fare = Decimal("2.50")
        per_km = Decimal("1.75")
        return base_fare + per_km * Decimal(str(distance))

    def _offer_trip_to_driver(self, driver: Driver, trip: Trip) -> bool:
        # 通过长轮询推送给司机端 App
        response = self._push_to_driver(driver.id, {
            "type": "TRIP_OFFER",
            "trip_id": trip.id,
            "pickup": trip.pickup,
            "timeout_seconds": 15
        })
        return response.accepted

2.2 单体的崩溃时刻

到 2013 年,Uber 已扩展到数十个城市,工程师团队也增长到 100 多人。单体架构的问题集中爆发:

部署耦合:任何一个功能改动都需要部署整个 Dispatch 应用。一次支付逻辑的 bug 修复导致了调度模块的回归——因为两个模块共享了同一个数据库事务上下文。

数据库瓶颈:所有读写都指向单个 PostgreSQL 实例。高峰期的行程请求和司机位置更新(每秒数万次写入)使数据库 CPU 持续处于 90% 以上。

技术栈锁定:Python 的 GIL(全局解释器锁)限制了 CPU 密集型任务(如定价计算、路径规划)的并发能力。团队想用 Go 或 Java 重写某些模块,但单体架构不允许混合技术栈。

组织摩擦:100 多个工程师在一个代码仓库里工作,合并冲突频繁,代码审查负担沉重,发布周期从每天一次拉长到每周一次。

2.3 第一次拆分:从一到二

2013 年底,Uber 进行了第一次架构拆分,将系统分为两个独立的应用:

这次拆分缓解了部署耦合问题,但本质上只是把一个大单体变成了两个较小的单体。数据库仍然共享,真正的解耦并未实现。

三、微服务爆炸增长(2014-2018)

3.1 微服务化的动因

2014 年,Uber 同时面对三个压力:

  1. 业务扩展:从出行扩展到外卖(UberEATS)、货运(Uber Freight),每条新业务线都需要独立的开发和部署节奏
  2. 全球化:进入中国、印度、东南亚市场,需要本地化的技术方案
  3. 工程团队暴涨:工程师数量从 200 人增长到 2000 人,团队自治成为组织效率的前提

在这些压力下,Uber 开始大规模推进微服务化。核心原则是:每个团队拥有自己的服务,独立部署,独立选择技术栈。

3.2 微服务拆分的策略

Uber 的微服务拆分并非按照教科书式的领域驱动设计(Domain-Driven Design, DDD)进行。实际的拆分更多是团队驱动的——哪个团队需要独立迭代某个功能,就把那个功能拆成一个服务。这导致了几个后果:

3.3 RPC 框架:从 HTTP 到 TChannel

早期微服务之间通过 HTTP/JSON 通信。随着服务数量增长,HTTP 的开销变得不可忽视——每次请求的 TCP 握手、文本序列化和反序列化都在消耗延迟预算。

2015 年,Uber 开发了 TChannel,一个基于 TCP 的多路复用 RPC 框架。TChannel 的核心特性:

// TChannel 服务端注册示例(Go)
package main

import (
    "context"
    "log"

    "github.com/uber/tchannel-go"
    "github.com/uber/tchannel-go/thrift"
)

func main() {
    ch, err := tchannel.NewChannel("pricing-service", nil)
    if err != nil {
        log.Fatalf("创建 TChannel 失败: %v", err)
    }

    server := thrift.NewServer(ch)
    server.Register(newPricingHandler())

    if err := ch.ListenAndServe("0.0.0.0:7890"); err != nil {
        log.Fatalf("监听失败: %v", err)
    }

    log.Println("pricing-service 启动在端口 7890")
    select {} // 阻塞主协程
}

type pricingHandler struct{}

func newPricingHandler() *pricingHandler {
    return &pricingHandler{}
}

func (h *pricingHandler) CalculatePrice(
    ctx context.Context,
    request *PriceRequest,
) (*PriceResponse, error) {
    distance := haversine(
        request.PickupLat, request.PickupLng,
        request.DropoffLat, request.DropoffLng,
    )

    surgeMultiplier := getSurgeMultiplier(
        request.PickupLat, request.PickupLng,
    )

    baseFare := 2.50
    perKm := 1.75
    price := (baseFare + perKm*distance) * surgeMultiplier

    return &PriceResponse{
        PriceCents:      int64(price * 100),
        SurgeMultiplier: surgeMultiplier,
        Currency:        request.Currency,
    }, nil
}

后来 Uber 逐步从 TChannel 迁移到了 gRPC,但 TChannel 的设计思想——多路复用、内置追踪、二进制帧——在 Uber 的 RPC 基础设施中留下了深刻印记。

3.4 微服务膨胀的代价

到 2017-2018 年,4000 多个微服务带来的复杂性成本已经超过了它们带来的自治收益:

问题 具体表现 影响
调用链爆炸 一次行程请求触发 50+ 个服务调用 P99 延迟不可控,单点故障放大
服务归属不清 30% 的服务没有明确的负责团队 故障时无人响应
变更协调困难 一次 API 变更可能影响 20+ 个下游服务 发布周期反而变长
测试成本高 集成测试需要启动数十个依赖服务 开发体验恶化
基础设施成本 每个服务独立部署,资源利用率低 基础设施开支超支

这个阶段的核心教训是:微服务不是免费的午餐,服务数量超过组织管理能力后,分布式系统的复杂性成本会急剧上升

四、DOMA:领域导向微服务架构

4.1 DOMA 的提出背景

2018 年,Uber 工程团队提出了 DOMA(Domain-Oriented Microservice Architecture,领域导向微服务架构),目标是在保留微服务自治优势的同时,通过领域分层和接口管控来降低系统整体复杂性。

DOMA 不是要回退到单体,而是要在微服务之上建立一层治理结构。它的核心观点是:微服务的边界不应该由团队组织结构决定,而应该由业务领域决定

4.2 DOMA 的层次模型

DOMA 将 Uber 的微服务体系划分为四个层次:

graph TB
    subgraph "DOMA 层次模型"
        direction TB
        
        subgraph "Layer 1: 基础设施层(Infrastructure Layer)"
            L1A[存储服务]
            L1B[消息队列]
            L1C[监控平台]
            L1D[网络基础设施]
        end

        subgraph "Layer 2: 业务平台层(Business Platform Layer)"
            L2A[用户服务]
            L2B[支付平台]
            L2C[地图与路径]
            L2D[通知服务]
            L2E[定价引擎]
        end

        subgraph "Layer 3: 产品领域层(Product Domain Layer)"
            L3A[出行领域]
            L3B[外卖领域]
            L3C[货运领域]
            L3D[金融领域]
        end

        subgraph "Layer 4: 边缘层 / 网关层(Edge Layer)"
            L4A[乘客网关]
            L4B[司机网关]
            L4C[商户网关]
            L4D[内部工具网关]
        end

        L4A --> L3A
        L4A --> L3B
        L4B --> L3A
        L4C --> L3B
        L4D --> L3D

        L3A --> L2A
        L3A --> L2B
        L3A --> L2C
        L3A --> L2E
        L3B --> L2A
        L3B --> L2B
        L3B --> L2D
        L3C --> L2A
        L3C --> L2C

        L2A --> L1A
        L2B --> L1A
        L2B --> L1B
        L2C --> L1A
        L2D --> L1B
        L2E --> L1A
    end

每一层的职责和约束如下:

基础设施层:提供通用的技术能力,如存储、消息、监控。这一层的服务不包含业务逻辑,接口稳定,变更频率低。

业务平台层:封装跨产品线复用的业务能力,如用户管理、支付处理、地图服务。这一层的关键约束是:平台服务不能依赖产品层服务,依赖方向必须自上而下。

产品领域层:每条业务线(出行、外卖、货运)拥有自己的领域,内部可以自由组织微服务。领域之间只通过领域网关(Domain Gateway)通信,不允许直接调用对方的内部服务。

边缘层:面向不同客户端(乘客 App、司机 App、商户后台)的 API 网关,负责协议转换、认证鉴权、请求路由。

4.3 领域网关(Domain Gateway)

领域网关是 DOMA 中最关键的机制。它的作用类似于面向对象编程中的接口(Interface)——领域的内部实现对外不可见,所有跨领域调用必须通过网关暴露的标准接口。

// 出行领域网关的接口定义(Java + Protobuf)
// ride_domain_gateway.proto

syntax = "proto3";

package uber.ride.gateway;

option java_package = "com.uber.ride.gateway";
option java_outer_classname = "RideDomainGatewayProto";

// 出行领域对外暴露的标准接口
service RideDomainGateway {
    // 请求行程
    rpc RequestRide(RequestRideRequest) returns (RequestRideResponse);

    // 查询行程状态
    rpc GetRideStatus(GetRideStatusRequest) returns (GetRideStatusResponse);

    // 取消行程
    rpc CancelRide(CancelRideRequest) returns (CancelRideResponse);

    // 获取行程预估价格
    rpc EstimateRide(EstimateRideRequest) returns (EstimateRideResponse);
}

message RequestRideRequest {
    string rider_id = 1;
    Location pickup = 2;
    Location dropoff = 3;
    string product_id = 4;        // UberX, UberBlack 等
    PaymentMethod payment = 5;
}

message RequestRideResponse {
    string ride_id = 1;
    RideStatus status = 2;
    DriverInfo driver = 3;
    PriceEstimate price = 4;
    int32 eta_seconds = 5;
}

message Location {
    double latitude = 1;
    double longitude = 2;
    string address = 3;
}

enum RideStatus {
    RIDE_STATUS_UNKNOWN = 0;
    REQUESTING = 1;
    MATCHING = 2;
    MATCHED = 3;
    EN_ROUTE_TO_PICKUP = 4;
    ARRIVED_AT_PICKUP = 5;
    IN_TRIP = 6;
    COMPLETED = 7;
    CANCELLED = 8;
}
// 出行领域网关的实现(Java)
package com.uber.ride.gateway;

import com.uber.ride.dispatch.DispatchService;
import com.uber.ride.pricing.PricingService;
import com.uber.ride.matching.MatchingService;
import com.uber.ride.trip.TripService;

public class RideDomainGatewayImpl implements RideDomainGateway {

    private final DispatchService dispatchService;
    private final PricingService pricingService;
    private final MatchingService matchingService;
    private final TripService tripService;

    public RideDomainGatewayImpl(
            DispatchService dispatchService,
            PricingService pricingService,
            MatchingService matchingService,
            TripService tripService) {
        this.dispatchService = dispatchService;
        this.pricingService = pricingService;
        this.matchingService = matchingService;
        this.tripService = tripService;
    }

    @Override
    public RequestRideResponse requestRide(RequestRideRequest request) {
        // 1. 计算价格
        PriceEstimate price = pricingService.calculate(
            request.getPickup(),
            request.getDropoff(),
            request.getProductId()
        );

        // 2. 创建行程记录
        Trip trip = tripService.create(
            request.getRiderId(),
            request.getPickup(),
            request.getDropoff(),
            price
        );

        // 3. 发起司机匹配(异步)
        matchingService.findDriverAsync(trip.getId(), request.getPickup());

        // 4. 返回响应
        return RequestRideResponse.newBuilder()
            .setRideId(trip.getId())
            .setStatus(RideStatus.REQUESTING)
            .setPrice(price)
            .setEtaSeconds(dispatchService.estimateEta(request.getPickup()))
            .build();
    }

    @Override
    public GetRideStatusResponse getRideStatus(GetRideStatusRequest request) {
        Trip trip = tripService.get(request.getRideId());
        return GetRideStatusResponse.newBuilder()
            .setRideId(trip.getId())
            .setStatus(trip.getStatus())
            .setDriver(trip.getDriverInfo())
            .setCurrentLocation(trip.getCurrentLocation())
            .build();
    }

    @Override
    public CancelRideResponse cancelRide(CancelRideRequest request) {
        Trip trip = tripService.get(request.getRideId());

        CancellationResult result = tripService.cancel(
            trip.getId(),
            request.getReason()
        );

        if (result.hasFee()) {
            // 领域网关内部协调取消费的计算
            pricingService.applyCancellationFee(
                trip.getRiderId(),
                result.getCancellationFee()
            );
        }

        return CancelRideResponse.newBuilder()
            .setRideId(trip.getId())
            .setStatus(RideStatus.CANCELLED)
            .setCancellationFee(result.getCancellationFee())
            .build();
    }

    @Override
    public EstimateRideResponse estimateRide(EstimateRideRequest request) {
        PriceEstimate price = pricingService.calculate(
            request.getPickup(),
            request.getDropoff(),
            request.getProductId()
        );

        int etaSeconds = dispatchService.estimateEta(request.getPickup());

        return EstimateRideResponse.newBuilder()
            .setPrice(price)
            .setEtaSeconds(etaSeconds)
            .build();
    }
}

4.4 DOMA 的核心约束

DOMA 能够发挥作用的前提是严格遵守以下约束:

约束 描述 违反后果
分层依赖 上层可以依赖下层,下层不能依赖上层 循环依赖,部署耦合
领域封装 领域内部服务不对外暴露,只通过网关 领域边界名存实亡
数据隔离 每个领域拥有自己的数据存储 数据耦合,无法独立演进
接口版本化 网关接口变更必须向后兼容 跨领域集成频繁中断
单一归属 每个服务必须归属于且仅归属于一个领域 治理失效,无人负责

4.5 DOMA 的落地效果

Uber 从 2018 年开始推行 DOMA,到 2022 年取得了显著效果:

五、Ringpop:一致性哈希库

5.1 为什么需要 Ringpop

在微服务架构中,许多场景需要将请求路由到特定的服务实例——例如,行程状态管理需要保证同一行程的所有操作都路由到同一台机器(会话亲和性),地理空间索引需要按区域分片。

传统方案是使用外部的一致性哈希服务(如 ZooKeeper),但这引入了额外的基础设施依赖和网络跳数。Uber 开发了 Ringpop,一个嵌入式的一致性哈希库,直接集成到应用进程中。

5.2 Ringpop 的核心机制

Ringpop 结合了两个协议:

  1. SWIM 协议(Scalable Weakly-consistent Infection-style Process Group Membership Protocol):用于节点发现和故障检测
  2. 一致性哈希环(Consistent Hash Ring):用于请求路由
graph LR
    subgraph "Ringpop 一致性哈希环"
        direction LR
        N1["节点 A<br/>哈希值: 0-90"]
        N2["节点 B<br/>哈希值: 91-180"]
        N3["节点 C<br/>哈希值: 181-270"]
        N4["节点 D<br/>哈希值: 271-360"]
    end

    REQ1["请求 Key=trip_123<br/>hash=142"] --> N2
    REQ2["请求 Key=trip_456<br/>hash=305"] --> N4
    REQ3["请求 Key=trip_789<br/>hash=55"] --> N1

    subgraph "SWIM 故障检测"
        PING["周期性 Ping"]
        INDIRECT["间接 Ping 验证"]
        SUSPECT["可疑标记"]
        FAULTY["故障确认"]
        PING --> INDIRECT
        INDIRECT --> SUSPECT
        SUSPECT --> FAULTY
    end

SWIM 协议的工作流程:

  1. 每个节点周期性地(默认 200ms)随机选择一个节点发送 Ping
  2. 如果 Ping 超时,通过 K 个随机节点发送间接 Ping(Ping-req)
  3. 如果间接 Ping 也超时,将目标节点标记为可疑(Suspect)
  4. 可疑状态持续超过阈值后,确认为故障(Faulty),从哈希环移除
  5. 成员变更信息通过 Gossip 协议在集群中传播

5.3 Ringpop 的实现细节

// Ringpop 核心使用示例(Go)
package main

import (
    "fmt"
    "log"

    "github.com/uber/ringpop-go"
    "github.com/uber/ringpop-go/swim"
    "github.com/uber/tchannel-go"
)

func main() {
    // 创建 TChannel 通道
    ch, err := tchannel.NewChannel("trip-state-service", nil)
    if err != nil {
        log.Fatal(err)
    }

    // 创建 Ringpop 实例
    rp, err := ringpop.New(
        "trip-state-ring",
        ringpop.Channel(ch),
        ringpop.Address("10.0.1.100:7000"),
    )
    if err != nil {
        log.Fatal(err)
    }

    // 启动监听
    if err := ch.ListenAndServe("0.0.0.0:7000"); err != nil {
        log.Fatal(err)
    }

    // 加入集群(通过引导节点列表)
    bootstrapNodes := []string{
        "10.0.1.100:7000",
        "10.0.1.101:7000",
        "10.0.1.102:7000",
    }
    opts := new(swim.BootstrapOptions)
    opts.Hosts = bootstrapNodes

    if _, err := rp.Bootstrap(opts); err != nil {
        log.Fatal(err)
    }

    // 根据行程 ID 查找负责的节点
    tripID := "trip_abc123"
    targetNode, err := rp.Lookup(tripID)
    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("行程 %s 由节点 %s 负责处理\n", tripID, targetNode)

    // 如果当前节点就是目标节点,直接处理
    if targetNode == rp.WhoAmI() {
        handleTripLocally(tripID)
    } else {
        forwardToNode(targetNode, tripID)
    }
}

func handleTripLocally(tripID string) {
    fmt.Printf("本地处理行程: %s\n", tripID)
}

func forwardToNode(node string, tripID string) {
    fmt.Printf("转发行程 %s 到节点 %s\n", tripID, node)
}

5.4 Ringpop 的虚拟节点策略

为了避免哈希环上节点分布不均导致的负载倾斜,Ringpop 使用了虚拟节点(Virtual Node)策略。每个物理节点在哈希环上映射多个虚拟位置:

// 虚拟节点配置示例(Go)
package hashring

import (
    "crypto/md5"
    "encoding/binary"
    "fmt"
    "sort"
    "sync"
)

type HashRing struct {
    mu             sync.RWMutex
    nodes          map[string]bool
    virtualNodes   int
    ring           []uint32
    ringToNode     map[uint32]string
}

func NewHashRing(virtualNodes int) *HashRing {
    return &HashRing{
        nodes:        make(map[string]bool),
        virtualNodes: virtualNodes,
        ringToNode:   make(map[uint32]string),
    }
}

func (h *HashRing) AddNode(node string) {
    h.mu.Lock()
    defer h.mu.Unlock()

    h.nodes[node] = true

    for i := 0; i < h.virtualNodes; i++ {
        virtualKey := fmt.Sprintf("%s#%d", node, i)
        hashValue := h.hash(virtualKey)
        h.ring = append(h.ring, hashValue)
        h.ringToNode[hashValue] = node
    }

    sort.Slice(h.ring, func(i, j int) bool {
        return h.ring[i] < h.ring[j]
    })
}

func (h *HashRing) RemoveNode(node string) {
    h.mu.Lock()
    defer h.mu.Unlock()

    delete(h.nodes, node)

    newRing := make([]uint32, 0)
    for _, hashValue := range h.ring {
        if h.ringToNode[hashValue] != node {
            newRing = append(newRing, hashValue)
        } else {
            delete(h.ringToNode, hashValue)
        }
    }
    h.ring = newRing
}

func (h *HashRing) Lookup(key string) string {
    h.mu.RLock()
    defer h.mu.RUnlock()

    if len(h.ring) == 0 {
        return ""
    }

    hashValue := h.hash(key)

    // 二分查找第一个大于等于 hashValue 的位置
    idx := sort.Search(len(h.ring), func(i int) bool {
        return h.ring[i] >= hashValue
    })

    // 环绕
    if idx >= len(h.ring) {
        idx = 0
    }

    return h.ringToNode[h.ring[idx]]
}

func (h *HashRing) hash(key string) uint32 {
    sum := md5.Sum([]byte(key))
    return binary.BigEndian.Uint32(sum[:4])
}

5.5 Ringpop 与同类方案的对比

特性 Ringpop ZooKeeper Consul etcd
部署方式 嵌入式(无额外进程) 独立集群(3-5 节点) 独立集群 独立集群
故障检测 SWIM Gossip Session + Ephemeral Node Gossip + Serf Raft 心跳
一致性级别 最终一致 强一致(ZAB) 最终一致(Gossip)/ 强一致(Raft) 强一致(Raft)
网络开销 O(1) 每周期 O(N) 心跳 O(log N) Gossip O(N) Raft
适用场景 请求路由、分片 元数据管理、选主 服务发现、KV 配置管理、选主
运维复杂度 低(无独立集群) 高(需要运维 ZK 集群)

六、Schemaless:追加写入存储引擎

6.1 PostgreSQL 的瓶颈

Uber 在 2014 年从 PostgreSQL 迁移到 MySQL 的过程中,开发了 Schemaless——一个构建在 MySQL 之上的追加写入(Append-Only)存储层。迁移的直接原因是 PostgreSQL 在 Uber 的高写入场景下暴露了严重的性能问题:

6.2 Schemaless 的数据模型

Schemaless 的核心设计思想是:将所有写操作转化为追加操作,永不原地更新

数据模型基于三个概念:

-- Schemaless 底层的 MySQL 表结构
CREATE TABLE schemaless_cells (
    row_key    VARCHAR(255) NOT NULL,
    column_name VARCHAR(255) NOT NULL,
    ref_key    BIGINT NOT NULL,
    body       MEDIUMBLOB NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    PRIMARY KEY (row_key, column_name, ref_key),
    INDEX idx_created_at (created_at)
) ENGINE=InnoDB
  DEFAULT CHARSET=utf8mb4;
# Schemaless 客户端使用示例(Python)
import json
import time
from typing import Optional

class SchemalessClient:
    """Schemaless 追加写入存储客户端"""

    def __init__(self, mysql_pool, shard_count: int = 4096):
        self.mysql_pool = mysql_pool
        self.shard_count = shard_count

    def put_cell(
        self, row_key: str, column_name: str, body: dict
    ) -> int:
        """写入一个 cell(追加操作,不覆盖旧版本)"""
        shard_id = self._get_shard(row_key)
        conn = self.mysql_pool.get_connection(shard_id)

        ref_key = int(time.time() * 1_000_000)  # 微秒级时间戳
        body_bytes = json.dumps(body).encode("utf-8")

        conn.execute(
            """
            INSERT INTO schemaless_cells
                (row_key, column_name, ref_key, body)
            VALUES (%s, %s, %s, %s)
            """,
            (row_key, column_name, ref_key, body_bytes),
        )
        conn.commit()
        return ref_key

    def get_cell_latest(
        self, row_key: str, column_name: str
    ) -> Optional[dict]:
        """读取指定 row_key + column_name 的最新版本"""
        shard_id = self._get_shard(row_key)
        conn = self.mysql_pool.get_connection(shard_id)

        result = conn.execute(
            """
            SELECT body FROM schemaless_cells
            WHERE row_key = %s AND column_name = %s
            ORDER BY ref_key DESC
            LIMIT 1
            """,
            (row_key, column_name),
        )

        row = result.fetchone()
        if row is None:
            return None
        return json.loads(row[0])

    def get_cell_history(
        self, row_key: str, column_name: str, limit: int = 10
    ) -> list:
        """读取指定 cell 的历史版本"""
        shard_id = self._get_shard(row_key)
        conn = self.mysql_pool.get_connection(shard_id)

        result = conn.execute(
            """
            SELECT ref_key, body, created_at
            FROM schemaless_cells
            WHERE row_key = %s AND column_name = %s
            ORDER BY ref_key DESC
            LIMIT %s
            """,
            (row_key, column_name, limit),
        )

        return [
            {
                "ref_key": row[0],
                "body": json.loads(row[1]),
                "created_at": row[2],
            }
            for row in result.fetchall()
        ]

    def _get_shard(self, row_key: str) -> int:
        """基于行键计算分片 ID"""
        hash_value = hash(row_key)
        return hash_value % self.shard_count


# 使用示例:行程状态管理
client = SchemalessClient(mysql_pool)

# 创建行程
trip_id = "trip_2024_abc123"
client.put_cell(trip_id, "base", {
    "rider_id": "rider_001",
    "pickup": {"lat": 37.7749, "lng": -122.4194},
    "dropoff": {"lat": 37.3382, "lng": -121.8863},
    "product": "UberX",
    "created_at": "2024-01-15T10:30:00Z",
})

# 行程状态更新(追加新版本,不覆盖旧版本)
client.put_cell(trip_id, "status", {
    "state": "MATCHING",
    "updated_at": "2024-01-15T10:30:01Z",
})

client.put_cell(trip_id, "status", {
    "state": "MATCHED",
    "driver_id": "driver_042",
    "updated_at": "2024-01-15T10:30:15Z",
})

client.put_cell(trip_id, "status", {
    "state": "IN_TRIP",
    "updated_at": "2024-01-15T10:35:00Z",
})

# 查询最新状态
latest_status = client.get_cell_latest(trip_id, "status")
# 返回: {"state": "IN_TRIP", "updated_at": "2024-01-15T10:35:00Z"}

# 查询状态变更历史
history = client.get_cell_history(trip_id, "status")
# 返回最近 10 条状态变更记录

6.3 Schemaless 的分片策略

Schemaless 将数据分布到 4096 个逻辑分片上,每个分片对应一个 MySQL 数据库实例中的一张表。分片策略的关键设计:

graph TB
    subgraph "Schemaless 分片架构"
        CLIENT[Schemaless 客户端] --> ROUTER[分片路由层]

        ROUTER --> |"hash(row_key) mod 4096"| SHARD_MAP[分片映射表]

        SHARD_MAP --> CLUSTER1["MySQL 集群 1<br/>分片 0-1023"]
        SHARD_MAP --> CLUSTER2["MySQL 集群 2<br/>分片 1024-2047"]
        SHARD_MAP --> CLUSTER3["MySQL 集群 3<br/>分片 2048-3071"]
        SHARD_MAP --> CLUSTER4["MySQL 集群 4<br/>分片 3072-4095"]

        CLUSTER1 --> PRIMARY1["主节点"]
        CLUSTER1 --> REPLICA1A["副本 1"]
        CLUSTER1 --> REPLICA1B["副本 2"]

        CLUSTER2 --> PRIMARY2["主节点"]
        CLUSTER2 --> REPLICA2A["副本 1"]
        CLUSTER2 --> REPLICA2B["副本 2"]
    end

    subgraph "变更通知"
        PRIMARY1 --> |"Binlog"| CDC["变更数据捕获<br/>(CDC)"]
        PRIMARY2 --> |"Binlog"| CDC
        CDC --> KAFKA["Kafka"]
        KAFKA --> CONSUMER["下游消费者"]
    end

6.4 追加写入的优势与代价

维度 追加写入(Schemaless) 原地更新(传统 RDBMS)
写入性能 顺序写入,性能稳定 随机 I/O,性能随碎片增加而下降
历史审计 天然保留所有版本 需要额外的审计表或 CDC
存储成本 高(保留所有版本) 低(仅最新版本)
读取性能 需要额外排序获取最新版本 直接读取
并发冲突 无冲突(每次写入都是新行) 需要行锁或乐观锁
模式变更 灵活(body 是 JSON/Blob) 需要 ALTER TABLE
空间回收 需要后台压缩任务 VACUUM / Purge

七、实时计算与地理空间索引

7.1 动态定价(Surge Pricing)的架构

Uber 的动态定价是其最具技术挑战性的系统之一。它需要在秒级时间窗口内完成以下计算:

  1. 统计每个六边形区域(H3 格子)内的实时供需比
  2. 基于供需比计算倍率(Surge Multiplier)
  3. 将倍率同步到定价引擎
  4. 在乘客请求行程时实时应用
// 动态定价计算核心逻辑(Go)
package surge

import (
    "math"
    "sync"
    "time"
)

// H3Cell 代表一个 H3 六边形区域
type H3Cell struct {
    Index      uint64
    Resolution int
}

// SupplyDemand 记录一个区域的供需数据
type SupplyDemand struct {
    ActiveDrivers   int     // 可用司机数
    OpenRequests    int     // 未匹配的行程请求数
    RecentPickups   int     // 最近 5 分钟内的接单数
    AvgWaitSeconds  float64 // 平均等待时间
}

// SurgeCalculator 动态定价计算器
type SurgeCalculator struct {
    mu              sync.RWMutex
    cellData        map[uint64]*SupplyDemand
    surgeMultiplier map[uint64]float64
    config          SurgeConfig
}

type SurgeConfig struct {
    MinMultiplier     float64       // 最低倍率(通常为 1.0)
    MaxMultiplier     float64       // 最高倍率(通常为 5.0 或 8.0)
    CalculateInterval time.Duration // 计算周期
    SmoothingFactor   float64       // 平滑因子,避免倍率剧烈波动
    DemandThreshold   float64       // 触发加价的供需比阈值
}

func NewSurgeCalculator(config SurgeConfig) *SurgeCalculator {
    return &SurgeCalculator{
        cellData:        make(map[uint64]*SupplyDemand),
        surgeMultiplier: make(map[uint64]float64),
        config:          config,
    }
}

// UpdateSupplyDemand 更新某个区域的供需数据
func (sc *SurgeCalculator) UpdateSupplyDemand(
    cellIndex uint64, data *SupplyDemand,
) {
    sc.mu.Lock()
    defer sc.mu.Unlock()
    sc.cellData[cellIndex] = data
}

// CalculateSurge 计算所有区域的倍率
func (sc *SurgeCalculator) CalculateSurge() {
    sc.mu.Lock()
    defer sc.mu.Unlock()

    for cellIndex, data := range sc.cellData {
        newMultiplier := sc.computeMultiplier(data)
        oldMultiplier, exists := sc.surgeMultiplier[cellIndex]

        if !exists {
            oldMultiplier = 1.0
        }

        // 指数移动平均平滑,避免倍率剧烈波动
        smoothed := oldMultiplier +
            sc.config.SmoothingFactor*(newMultiplier-oldMultiplier)

        // 四舍五入到 0.1
        smoothed = math.Round(smoothed*10) / 10

        // 限制在合法范围内
        if smoothed < sc.config.MinMultiplier {
            smoothed = sc.config.MinMultiplier
        }
        if smoothed > sc.config.MaxMultiplier {
            smoothed = sc.config.MaxMultiplier
        }

        sc.surgeMultiplier[cellIndex] = smoothed
    }
}

func (sc *SurgeCalculator) computeMultiplier(
    data *SupplyDemand,
) float64 {
    if data.ActiveDrivers == 0 {
        return sc.config.MaxMultiplier
    }

    // 供需比 = 需求 / 供给
    demandSupplyRatio := float64(data.OpenRequests) /
        float64(data.ActiveDrivers)

    if demandSupplyRatio <= sc.config.DemandThreshold {
        return 1.0
    }

    // 超过阈值的部分线性映射到倍率
    excessRatio := demandSupplyRatio - sc.config.DemandThreshold
    multiplier := 1.0 + excessRatio*0.5

    return multiplier
}

// GetSurge 获取指定区域的当前倍率
func (sc *SurgeCalculator) GetSurge(cellIndex uint64) float64 {
    sc.mu.RLock()
    defer sc.mu.RUnlock()

    if m, ok := sc.surgeMultiplier[cellIndex]; ok {
        return m
    }
    return 1.0
}

7.2 H3 地理空间索引

Uber 开源了 H3 地理空间索引系统,将地球表面划分为层级化的六边形网格。H3 相比传统的经纬度矩形网格有几个优势:

// H3 索引使用示例(Go)
package geo

import (
    "github.com/uber/h3-go/v4"
)

// FindNearbyDrivers 查找某个位置附近的可用司机
func FindNearbyDrivers(
    lat, lng float64,
    radiusKm float64,
    driverIndex map[h3.Cell][]DriverInfo,
) []DriverInfo {
    // 将经纬度转换为 H3 索引(分辨率 9,约 0.1 km^2)
    center := h3.NewLatLng(lat, lng)
    centerCell := h3.LatLngToCell(center, 9)

    // 计算覆盖指定半径的 K 环大小
    // 分辨率 9 的六边形边长约 174 米
    kRings := int(radiusKm / 0.174)
    if kRings < 1 {
        kRings = 1
    }

    // 获取中心点及周围的所有六边形
    cells := h3.GridDisk(centerCell, kRings)

    var drivers []DriverInfo
    for _, cell := range cells {
        if cellDrivers, ok := driverIndex[cell]; ok {
            drivers = append(drivers, cellDrivers...)
        }
    }

    return drivers
}

type DriverInfo struct {
    DriverID  string
    Lat       float64
    Lng       float64
    ProductID string  // UberX, UberBlack 等
    Available bool
}

7.3 实时数据流水线

Uber 的实时计算架构基于 Apache Kafka 和 Apache Flink 构建。司机位置更新、行程事件、定价变更等数据通过 Kafka 在服务间流转,Flink 负责窗口聚合和复杂事件处理。

# Flink 作业配置示例(YAML)
job:
  name: surge-pricing-aggregator
  parallelism: 128
  checkpoint:
    interval: 30s
    mode: EXACTLY_ONCE
    storage: s3://uber-flink-checkpoints/surge/

sources:
  - name: driver-locations
    type: kafka
    topic: driver.location.updates
    group: surge-calculator
    deserializer: avro
    watermark:
      strategy: bounded-out-of-orderness
      max-delay: 5s

  - name: ride-requests
    type: kafka
    topic: ride.request.events
    group: surge-calculator
    deserializer: avro

sinks:
  - name: surge-output
    type: kafka
    topic: surge.multiplier.updates
    serializer: avro

windows:
  - name: supply-demand-window
    type: sliding
    size: 5m
    slide: 30s
    key: h3_cell_index
    aggregate:
      - field: active_drivers
        function: count_distinct(driver_id)
        filter: "status = 'AVAILABLE'"
      - field: open_requests
        function: count(request_id)
        filter: "status = 'UNMATCHED'"
      - field: avg_wait
        function: avg(wait_seconds)

八、Uber 的可观测性平台

8.1 M3:分布式时序数据库

Uber 在 2016 年开发并开源了 M3,一个高性能的分布式时序数据库,用于存储和查询来自 4000 多个微服务的指标数据。M3 的设计目标是在每秒数十亿数据点的写入规模下保持亚秒级查询延迟。

M3 的架构包含三个核心组件:

graph TB
    subgraph "M3 可观测性平台"
        direction TB

        SERVICES["4000+ 微服务"] --> |"指标数据"| AGGREGATOR["M3 Aggregator<br/>实时聚合与降采样"]

        AGGREGATOR --> |"原始精度(10s)"| M3DB_RAW["M3DB 集群<br/>原始数据<br/>保留 48 小时"]
        AGGREGATOR --> |"1 分钟聚合"| M3DB_1M["M3DB 集群<br/>1 分钟数据<br/>保留 30 天"]
        AGGREGATOR --> |"10 分钟聚合"| M3DB_10M["M3DB 集群<br/>10 分钟数据<br/>保留 1 年"]

        GRAFANA["Grafana 仪表盘"] --> COORD["M3 Coordinator<br/>查询路由"]
        ALERTING["告警引擎"] --> COORD

        COORD --> M3DB_RAW
        COORD --> M3DB_1M
        COORD --> M3DB_10M
    end

8.2 Jaeger:分布式追踪

Uber 开源的 Jaeger 是目前最广泛使用的分布式追踪系统之一。在 Uber 内部,Jaeger 每天采集数十亿个追踪 Span(跨度),帮助工程师定位跨服务调用链中的性能瓶颈和错误根因。

// Jaeger 追踪集成示例(Go)
package main

import (
    "context"
    "fmt"
    "net/http"
    "time"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/exporters/jaeger"
    "go.opentelemetry.io/otel/sdk/resource"
    sdktrace "go.opentelemetry.io/otel/sdk/trace"
    semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
    "go.opentelemetry.io/otel/trace"
)

func initTracer() (*sdktrace.TracerProvider, error) {
    exporter, err := jaeger.New(
        jaeger.WithAgentEndpoint(
            jaeger.WithAgentHost("jaeger-agent.uber.internal"),
            jaeger.WithAgentPort("6831"),
        ),
    )
    if err != nil {
        return nil, err
    }

    tp := sdktrace.NewTracerProvider(
        sdktrace.WithBatcher(exporter),
        sdktrace.WithResource(resource.NewWithAttributes(
            semconv.SchemaURL,
            semconv.ServiceNameKey.String("ride-pricing-service"),
            attribute.String("environment", "production"),
            attribute.String("domain", "ride"),
        )),
        sdktrace.WithSampler(
            sdktrace.ParentBased(sdktrace.TraceIDRatioBased(0.01)),
        ),
    )

    otel.SetTracerProvider(tp)
    return tp, nil
}

func handlePriceRequest(w http.ResponseWriter, r *http.Request) {
    ctx := r.Context()
    tracer := otel.Tracer("ride-pricing")

    ctx, span := tracer.Start(ctx, "CalculatePrice",
        trace.WithAttributes(
            attribute.String("product_id", r.URL.Query().Get("product")),
        ),
    )
    defer span.End()

    // 子 Span:路径计算
    routeCtx, routeSpan := tracer.Start(ctx, "CalculateRoute")
    route := calculateRoute(routeCtx)
    routeSpan.SetAttributes(
        attribute.Float64("distance_km", route.DistanceKm),
        attribute.Int("duration_minutes", route.DurationMinutes),
    )
    routeSpan.End()

    // 子 Span:获取动态定价倍率
    surgeCtx, surgeSpan := tracer.Start(ctx, "GetSurgeMultiplier")
    surge := getSurgeMultiplier(surgeCtx, route.PickupH3Cell)
    surgeSpan.SetAttributes(
        attribute.Float64("surge_multiplier", surge),
    )
    surgeSpan.End()

    // 子 Span:最终价格计算
    _, priceSpan := tracer.Start(ctx, "ComputeFinalPrice")
    price := computePrice(route, surge)
    priceSpan.SetAttributes(
        attribute.Int64("price_cents", price.Cents),
        attribute.String("currency", price.Currency),
    )
    priceSpan.End()

    span.SetAttributes(
        attribute.Int64("final_price_cents", price.Cents),
    )

    fmt.Fprintf(w, `{"price_cents": %d, "currency": "%s"}`,
        price.Cents, price.Currency)
}

func calculateRoute(ctx context.Context) *Route {
    time.Sleep(50 * time.Millisecond)
    return &Route{DistanceKm: 15.3, DurationMinutes: 22}
}

func getSurgeMultiplier(ctx context.Context, h3Cell uint64) float64 {
    time.Sleep(10 * time.Millisecond)
    return 1.5
}

func computePrice(route *Route, surge float64) *Price {
    baseFare := int64(250)
    perKm := int64(175)
    distanceFare := int64(route.DistanceKm * float64(perKm))
    total := int64(float64(baseFare+distanceFare) * surge)
    return &Price{Cents: total, Currency: "USD"}
}

type Route struct {
    DistanceKm      float64
    DurationMinutes int
    PickupH3Cell    uint64
}

type Price struct {
    Cents    int64
    Currency string
}

8.3 统一日志平台

Uber 的日志平台每天处理超过 100 PB 的日志数据。日志从各个服务通过 sidecar agent 收集,写入 Kafka,再经过 Flink 进行实时过滤和结构化处理后,分流到 Elasticsearch(近实时查询)和 HDFS(长期归档)。

日志平台的关键设计决策:

决策 选择 原因
日志收集 Sidecar Agent 与应用解耦,不影响服务 GC
传输通道 Kafka 削峰填谷,解耦生产与消费
实时索引 Elasticsearch 全文搜索能力强
长期存储 HDFS + Parquet 成本低,支持批量分析
采样策略 按服务优先级差异化采样 控制成本,关键服务全量

九、工程案例:行程定价服务的 DOMA 重构

9.1 重构前的混乱状态

2018 年之前,Uber 的行程定价涉及 12 个独立的微服务,分布在 4 个不同团队中。一次定价请求的调用链如下:

乘客 App
  -> API Gateway
    -> Fare Estimator(预估价格)
      -> Route Service(路径计算)
      -> Surge Service(动态定价)
      -> Product Service(产品配置)
        -> City Config Service(城市配置)
        -> Vehicle Type Service(车型配置)
      -> Promotion Service(优惠券)
        -> User Profile Service(用户画像)
        -> Loyalty Service(会员等级)
      -> Tax Service(税费计算)
        -> Region Config Service(区域配置)
      -> Payment Method Service(支付方式检查)

这个架构存在以下问题:

  1. 延迟链过长:12 个服务的串行调用导致 P99 延迟超过 800ms
  2. 归属混乱:Fare Estimator 由出行团队维护,但 Promotion Service 由增长团队维护,两者的需求经常冲突
  3. 重复计算:预估价格和最终价格使用不同的服务路径,导致乘客看到的预估价和实际扣款不一致
  4. 测试困难:本地开发需要启动所有 12 个服务,环境搭建需要半天

9.2 DOMA 重构方案

按照 DOMA 的原则,团队将定价相关的 12 个服务重组为一个 Pricing Domain(定价领域),并设立了领域网关:

graph TB
    subgraph "重构后:Pricing Domain"
        GATEWAY["Pricing Domain Gateway<br/>定价领域网关"]

        subgraph "领域内部(对外不可见)"
            CORE["Pricing Core<br/>核心定价引擎"]
            SURGE["Surge Calculator<br/>动态定价"]
            PROMO["Promotion Engine<br/>优惠计算"]
            TAX["Tax Calculator<br/>税费计算"]
            CONFIG["Pricing Config<br/>统一配置"]
        end

        GATEWAY --> CORE
        CORE --> SURGE
        CORE --> PROMO
        CORE --> TAX
        CORE --> CONFIG
    end

    APP["乘客 App"] --> EDGE["API Gateway"]
    EDGE --> GATEWAY

    ROUTE["Route Service<br/>(地图领域)"] -.-> |"领域间调用"| GATEWAY
    PAYMENT["Payment Service<br/>(支付领域)"] -.-> |"领域间调用"| GATEWAY
    USER["User Service<br/>(用户领域)"] -.-> |"领域间调用"| GATEWAY

9.3 重构过程

重构分为三个阶段,总共耗时 8 个月:

第一阶段(2 个月):接口收拢

将所有外部对定价服务的直接调用收拢到 Pricing Domain Gateway。这一阶段不改变内部实现,只在入口处增加了一层代理。

// 第一阶段:Gateway 作为代理层(Java)
public class PricingGatewayPhase1 implements PricingDomainGateway {

    private final FareEstimatorClient fareEstimator;
    private final SurgeServiceClient surgeService;
    private final PromotionServiceClient promotionService;

    @Override
    public PriceEstimate estimatePrice(EstimateRequest request) {
        // 第一阶段:直接代理到原有服务
        // 没有改变任何内部逻辑
        FareEstimate fare = fareEstimator.estimate(
            request.getPickup(),
            request.getDropoff(),
            request.getProductId()
        );

        double surgeMultiplier = surgeService.getSurge(
            request.getPickup().getH3Cell()
        );

        PromotionDiscount discount = promotionService.calculate(
            request.getRiderId(),
            fare.getBaseFare()
        );

        return PriceEstimate.builder()
            .baseFare(fare.getBaseFare())
            .surgeMultiplier(surgeMultiplier)
            .discount(discount.getAmount())
            .finalPrice(
                fare.getBaseFare() * surgeMultiplier
                - discount.getAmount()
            )
            .build();
    }
}

第二阶段(4 个月):内部合并

将 12 个微服务合并为 5 个(Pricing Core、Surge Calculator、Promotion Engine、Tax Calculator、Pricing Config),消除冗余的中间服务和重复逻辑。

关键的合并决策:

原服务 合并到 原因
Fare Estimator Pricing Core 核心定价逻辑
Product Service Pricing Config 配置类服务统一管理
City Config Service Pricing Config 配置类服务统一管理
Vehicle Type Service Pricing Config 配置类服务统一管理
Region Config Service Pricing Config 配置类服务统一管理
Surge Service Surge Calculator 保持独立,计算密集型
Promotion Service Promotion Engine 保持独立,业务逻辑独立
Loyalty Service Promotion Engine 与优惠强相关
User Profile Service 移出定价领域 归属用户领域
Tax Service Tax Calculator 保持独立,合规要求
Payment Method Service 移出定价领域 归属支付领域
Route Service 保持独立 归属地图领域

第三阶段(2 个月):性能优化

在合并完成后,对定价领域的内部调用进行了并行化和缓存优化:

// 第三阶段:并行化与缓存优化后的定价核心(Java)
public class PricingCoreOptimized {

    private final SurgeCalculator surgeCalculator;
    private final PromotionEngine promotionEngine;
    private final TaxCalculator taxCalculator;
    private final PricingConfig pricingConfig;
    private final LoadingCache<String, ProductConfig> configCache;

    public PricingCoreOptimized(
            SurgeCalculator surgeCalculator,
            PromotionEngine promotionEngine,
            TaxCalculator taxCalculator,
            PricingConfig pricingConfig) {
        this.surgeCalculator = surgeCalculator;
        this.promotionEngine = promotionEngine;
        this.taxCalculator = taxCalculator;
        this.pricingConfig = pricingConfig;

        this.configCache = CacheBuilder.newBuilder()
            .maximumSize(1000)
            .expireAfterWrite(5, TimeUnit.MINUTES)
            .build(new CacheLoader<String, ProductConfig>() {
                @Override
                public ProductConfig load(String productId) {
                    return pricingConfig.getProductConfig(productId);
                }
            });
    }

    public PriceResult calculate(PriceRequest request) {
        // 从缓存获取产品配置
        ProductConfig config = configCache.getUnchecked(
            request.getProductId()
        );

        // 并行获取动态定价倍率和优惠信息
        CompletableFuture<Double> surgeFuture = CompletableFuture
            .supplyAsync(() -> surgeCalculator.getSurge(
                request.getPickupH3Cell()
            ));

        CompletableFuture<Discount> promoFuture = CompletableFuture
            .supplyAsync(() -> promotionEngine.calculate(
                request.getRiderId(),
                request.getProductId()
            ));

        // 等待并行结果
        double surgeMultiplier = surgeFuture.join();
        Discount discount = promoFuture.join();

        // 基础费用计算
        long baseFareCents = config.getBaseFareCents()
            + (long) (request.getDistanceKm()
                * config.getPerKmCents())
            + (long) (request.getDurationMinutes()
                * config.getPerMinuteCents());

        // 应用动态倍率
        long surgedFareCents = (long) (baseFareCents * surgeMultiplier);

        // 应用优惠
        long afterDiscountCents = Math.max(
            surgedFareCents - discount.getAmountCents(),
            config.getMinimumFareCents()
        );

        // 税费计算
        TaxResult tax = taxCalculator.calculate(
            afterDiscountCents,
            request.getPickupRegion()
        );

        long finalPriceCents = afterDiscountCents + tax.getTaxCents();

        return PriceResult.builder()
            .baseFareCents(baseFareCents)
            .surgeMultiplier(surgeMultiplier)
            .discountCents(discount.getAmountCents())
            .taxCents(tax.getTaxCents())
            .finalPriceCents(finalPriceCents)
            .currency(config.getCurrency())
            .breakdown(buildBreakdown(
                baseFareCents, surgeMultiplier,
                discount, tax, config
            ))
            .build();
    }

    private PriceBreakdown buildBreakdown(
            long baseFareCents,
            double surgeMultiplier,
            Discount discount,
            TaxResult tax,
            ProductConfig config) {
        return PriceBreakdown.builder()
            .baseFare(formatCents(baseFareCents, config.getCurrency()))
            .surgeAmount(formatCents(
                (long)(baseFareCents * (surgeMultiplier - 1)),
                config.getCurrency()
            ))
            .discount(formatCents(
                discount.getAmountCents(),
                config.getCurrency()
            ))
            .tax(formatCents(tax.getTaxCents(), config.getCurrency()))
            .build();
    }

    private String formatCents(long cents, String currency) {
        return String.format("%s %.2f", currency, cents / 100.0);
    }
}

9.4 重构效果

指标 重构前 重构后 改善
服务数量 12 个 5 个 -58%
负责团队 4 个(归属模糊) 1 个(Pricing Team) 归属清晰
P99 延迟 800ms 280ms -65%
预估价格一致性 85%(预估 vs 实际) 99.2% 显著提升
本地开发启动时间 30 分钟 5 分钟 -83%
月度事故数 平均 3 次 平均 0.5 次 -83%

十、微服务治理的教训与启示

10.1 Uber 的核心教训

Uber 十年架构演进留下了几条深刻教训:

教训一:微服务数量不是越多越好。服务拆分的粒度应该与团队规模和组织能力匹配。Uber 在高峰期有 4000 多个微服务但只有约 2000 名工程师,平均每人维护 2 个服务,这已经超出了可持续的管理范围。

教训二:服务拆分应该由领域驱动,而非团队组织。按团队结构拆分微服务容易导致领域概念被分散到多个服务中(如定价的例子),增加跨团队协调成本。

教训三:自治需要有边界。微服务自治的前提是明确的接口契约和分层约束。没有治理的自治等于混乱。

教训四:基础设施先行。可观测性(Jaeger、M3)、服务发现(Ringpop)、数据存储(Schemaless)等基础设施必须在微服务爆发之前就位。Uber 在基础设施方面的早期投入使其在微服务膨胀阶段仍然能够保持系统的可运维性。

10.2 DOMA 的适用条件

DOMA 并非银弹,它的适用条件相当明确:

条件 适合 DOMA 不适合 DOMA
微服务数量 100+ < 50
工程团队规模 200+ 人 < 50 人
业务线数量 多条独立业务线 单一产品
组织结构 多个独立产品团队 单一扁平团队
技术栈 异构技术栈 统一技术栈

对于中小规模的微服务系统(50 个以下),DOMA 的分层和网关机制反而会增加不必要的复杂性。在这种场景下,更实际的做法是通过 API 规范、服务目录和变更审查来管理微服务间的依赖关系。

10.3 微服务治理的通用原则

从 Uber 的经验中可以提炼出几条微服务治理的通用原则:

原则一:服务必须有明确的归属者。每个服务必须有一个明确的团队负责其全生命周期——开发、部署、运维、下线。没有归属者的服务是定时炸弹。

原则二:依赖方向必须是单向的。分层架构的依赖只能从上到下,绝不允许反向依赖或跨层调用。违反这一原则的任何”临时方案”都应该被拒绝。

原则三:接口变更必须向后兼容。领域网关暴露的接口一旦发布,就成为契约。任何 Breaking Change 都必须通过版本化机制管理,并给下游消费者足够的迁移窗口。

原则四:可观测性是治理的前提。没有可观测性,就无法知道服务之间的实际调用关系、延迟分布和错误率。Uber 在 M3 和 Jaeger 上的投入使其能够准确识别出需要合并或拆分的服务。

原则五:定期审计服务健康度。Uber 每季度对所有服务进行健康度评估,指标包括:是否有活跃的维护者、最近一次部署时间、测试覆盖率、SLA 达标率、依赖服务数量。健康度低于阈值的服务会被标记为需要合并或下线的候选。

10.4 从 Uber 到你的系统

Uber 的架构规模对大多数团队来说是遥不可及的,但其教训具有普遍性。即使你的系统只有 10 个微服务,以下实践仍然值得借鉴:

  1. 画出服务依赖图:不是架构师脑中的理想图,而是基于实际调用数据的真实依赖图。工具如 Jaeger 的服务拓扑视图可以帮助做到这一点
  2. 定义服务归属:在 README 或服务目录中明确每个服务的负责团队和值班人员
  3. 设定服务拆分/合并的量化标准:例如,“如果一个服务超过 6 个月没有独立部署过,考虑合并到调用方”
  4. 投入可观测性:分布式追踪、结构化日志、指标监控三者缺一不可
  5. 限制跨服务的直接数据库访问:数据应该通过 API 访问,而非直接查询其他服务的数据库

最终,架构设计不是追求某个理想模式,而是在当前的业务规模、团队能力和技术约束下做出最合理的取舍。Uber 的经历证明了一点:好的架构不是一步到位的,而是在持续演进中逐步逼近合理的平衡点

参考资料

  1. Adam Rogal, et al. “Introducing Domain-Oriented Microservice Architecture.” Uber Engineering Blog, 2020. https://eng.uber.com/microservice-architecture/
  2. Matt Ranney. “What I Wish I Had Known Before Scaling Uber to 1000 Services.” QCon, 2016.
  3. Jeff Hodges, et al. “Ringpop: Cooperative, Decentralized Service Discovery and Membership.” Uber Engineering Blog, 2015. https://eng.uber.com/ringpop-open-source-nodejs-library/
  4. Jakob Holdgaard Thomsen, et al. “Designing Schemaless, Uber Engineering’s Scalable Datastore Using MySQL.” Uber Engineering Blog, 2016. https://eng.uber.com/schemaless-part-one-mysql-datastore/
  5. Yuri Shkuro. “Evolving Distributed Tracing at Uber Engineering.” Uber Engineering Blog, 2017. https://eng.uber.com/distributed-tracing/
  6. Martin Liu, et al. “M3: Uber’s Open Source, Large-scale Metrics Platform for Prometheus.” Uber Engineering Blog, 2018. https://eng.uber.com/m3/
  7. Isaac Brodsky, et al. “H3: Uber’s Hexagonal Hierarchical Spatial Index.” Uber Engineering Blog, 2018. https://eng.uber.com/h3/
  8. Evan Klitzke. “Why Uber Engineering Switched from Postgres to MySQL.” Uber Engineering Blog, 2016. https://eng.uber.com/postgres-to-mysql-migration/
  9. Yuri Shkuro. “Mastering Distributed Tracing.” Packt Publishing, 2019.
  10. Sam Newman. “Building Microservices: Designing Fine-Grained Systems.” 2nd Edition, O’Reilly Media, 2021.

上一篇:Netflix 架构全景 下一篇:微信架构

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2026-04-13 · architecture

【系统架构设计百科】架构质量属性:不只是"高可用高性能"

需求评审时写下的'高可用、高性能、高并发',到了架构设计阶段几乎无法落地——因为它们不是可执行的需求。本文从 SEI/CMU 的质量属性理论出发,用 stimulus-response 场景模型把模糊需求变成可量化、可验证的架构约束,并拆解属性之间的冲突与联动关系。

2026-04-13 · architecture

【系统架构设计百科】告警策略:如何避免"狼来了"

大多数团队的告警系统都在制造噪声而不是传递信号。阈值告警看似直观,实则产生大量误报和漏报,值班工程师在凌晨三点被叫醒,却发现只是一次无害的毛刺。本文从告警疲劳的工业数据出发,拆解基于 SLO 的多窗口燃烧率告警算法,深入 Alertmanager 的路由、抑制与分组机制,结合 PagerDuty 的告警疲劳研究和真实工程案例,给出一套可落地的告警策略设计方法。

2026-04-13 · architecture

【系统架构设计百科】复杂性管理:架构的核心战场

系统复杂性是架构腐化的根源——本文从 Brooks 的本质复杂性与偶然复杂性划分出发,结合认知负荷理论与 Parnas 的信息隐藏原则,系统阐述复杂性的来源、度量与控制手段,并给出可操作的架构策略


By .