土法炼钢兴趣小组的算法知识备份

【系统架构设计百科】特性开关架构:安全发布与实验的工程实践

文章导航

分类入口
architecture
标签入口
#feature-flags#LaunchDarkly#Unleash#release-management#experimentation

目录

2023 年,某大型 SaaS 公司的工程师在代码仓库中搜索 feature flag 相关的代码,发现了一个触目惊心的数字:系统中有超过 4200 个 flag,其中 60% 已经超过一年没有被修改过。更糟糕的是,没有人能确定哪些 flag 还在生效、哪些可以安全删除。一位新入职的工程师在排查线上故障时,花了三个小时才理解一段被五层 flag 嵌套包裹的业务逻辑——每一层 flag 都是不同时期、不同团队、为不同目的加的,没有文档,没有负责人。

这不是个案。Google 的工程团队在内部文档中承认,flag 管理是其代码库中最棘手的技术债务来源之一。GitHub 在 2021 年的工程博客中提到,他们曾经积累了超过 1500 个 flag,最终不得不启动专项清理。Netflix 的工程师在 QCon 演讲中透露,他们每季度要花费大量工程时间来审计和清理过期的 flag。

特性开关(Feature Flag)本身是一个简单的概念——在运行时通过配置决定代码路径。但当它从”一个临时开关”演变为”数千个 flag 的管理系统”时,架构问题就浮出水面了:

这篇文章要回答的核心问题是:Feature Flag 不只是 if-else——如何管理数千个 flag 的生命周期?


一、特性开关的本质与四种分类

1.1 最简模型

特性开关的最简形式就是一个运行时条件判断:

if (featureFlags.isEnabled("new-checkout-flow")) {
    return newCheckoutService.process(order);
} else {
    return legacyCheckoutService.process(order);
}

看起来和普通的 if-else 没有区别。区别在于:new-checkout-flow 这个判断条件不是写死在代码里的,而是来自外部配置——可以是配置文件、数据库、远程服务、甚至是一个实时推送的值。这意味着你可以在不发版、不重启的情况下改变系统行为。

1.2 Martin Fowler 的四象限分类

2017 年,Pete Hodgson 在 Martin Fowler 网站上发表了一篇经典文章 “Feature Toggles (aka Feature Flags)”,提出了特性开关的四象限分类。这个分类至今仍是业界最广泛使用的框架:

分类 英文名 生命周期 动态性 典型场景 所有者
发布开关 Release Toggle 短期(天到周) 功能灰度发布、主干开发 开发团队
实验开关 Experiment Toggle 中期(周到月) A/B 测试、多变量实验 产品/增长团队
运维开关 Ops Toggle 长期(月到年) 降级开关、熔断控制 SRE/运维团队
权限开关 Permission Toggle 长期(可能永久) 付费功能、内测资格 产品/商务团队

这四种 flag 的核心差异在于生命周期预期管理责任。混淆这四种类型是 flag 管理混乱的根源。

1.3 发布开关(Release Toggle)

发布开关是最常用也最容易管理的类型。它的唯一目的是让未完成的功能可以合入主干(Trunk-Based Development)而不影响生产用户。

典型场景:

# flag 配置
new-search-algorithm:
  type: release
  owner: search-team
  created: 2026-03-01
  expected-removal: 2026-04-01
  description: "新的搜索排序算法,目前仅对内部用户开放"
  default: false
  targeting:
    - segment: internal-employees
      value: true

发布开关的关键原则:

  1. 生命周期短。一个发布开关应该在功能完全上线后立即移除。超过 30 天未移除的发布开关就是技术债务。
  2. 求值逻辑简单。通常只是 true/false,最多加一个用户分段。不要给发布开关加复杂的定向规则。
  3. 代码中要保留”清除路径”。写 flag 的时候就应该想好怎么删——使用 @Deprecated 注解或 TODO(flag-cleanup) 注释标记。

1.4 实验开关(Experiment Toggle)

实验开关用于 A/B 测试和多变量实验。它和发布开关的关键区别是:实验开关需要统计学上的严格性——用户分组必须稳定、随机、可追溯。

# 实验开关示例:按钮颜色 A/B 测试
variant = feature_flags.get_variant(
    "checkout-button-color",
    user_id=user.id,
    default="blue"
)

if variant == "blue":
    render_blue_button()
elif variant == "green":
    render_green_button()
elif variant == "orange":
    render_orange_button()

# 上报实验事件
analytics.track("checkout-button-impression", {
    "variant": variant,
    "user_id": user.id,
    "session_id": session.id
})

实验开关的特殊要求:

  1. 一致性哈希分组。同一个用户在实验期间内必须始终看到同一个变体。不能用 Math.random(),要用 hash(user_id + experiment_key) % 100 这样的确定性分组。
  2. 互斥与分层。当多个实验同时运行时,需要避免交叉污染。常用的方案是分层实验(Layered Experiment)——把流量先按层(Layer)划分,同一层内的实验互斥,不同层的实验正交。
  3. 生命周期明确。实验有明确的开始和结束时间,结束后必须决定”全量上线”或”回滚”,不能让实验一直跑下去。

1.5 运维开关(Ops Toggle)

运维开关是系统的”断路器”。当线上出现问题时,可以通过运维开关快速降级或禁用某些功能,而不需要回滚代码。

// 运维开关示例:推荐服务降级
func GetRecommendations(ctx context.Context, userID string) ([]Product, error) {
    if !featureFlags.IsEnabled("recommendation-service-enabled") {
        // 降级到热门商品列表
        return getPopularProducts(ctx)
    }

    resp, err := recommendationClient.GetPersonalized(ctx, userID)
    if err != nil {
        return getPopularProducts(ctx)
    }
    return resp.Products, nil
}

运维开关的特点:

  1. 长期存在。和发布开关不同,运维开关可能需要永久保留在代码中。它们是系统弹性的一部分。
  2. 操作频率低。大多数时间运维开关都处于默认状态,只在故障时才会被翻转。
  3. 需要快速生效。运维开关的状态变更必须在秒级甚至毫秒级传播到所有实例。这对 flag 的传输机制有很高要求。

1.6 权限开关(Permission Toggle)

权限开关控制的是”谁能看到什么”。它和前三种开关的最大区别是:它可能永远不会被删除,因为它是业务逻辑的一部分。

// 权限开关示例:高级功能控制
async function renderDashboard(user: User): Promise<Dashboard> {
    const dashboard = new Dashboard();

    dashboard.addWidget(new BasicAnalytics());

    if (await featureFlags.isEnabled("advanced-analytics", { userId: user.id })) {
        dashboard.addWidget(new AdvancedAnalytics());
        dashboard.addWidget(new PredictiveInsights());
    }

    if (await featureFlags.isEnabled("custom-reports", { userId: user.id })) {
        dashboard.addWidget(new CustomReportBuilder());
    }

    return dashboard;
}

权限开关的管理挑战在于:它和用户的付费状态、角色、组织关系紧密耦合。简单的 true/false 不够用,需要复杂的定向规则(Targeting Rules)。

1.7 四种开关的生命周期对比

下面这张状态机图展示了四种 flag 的完整生命周期:

stateDiagram-v2
    [*] --> Created : 创建 Flag

    state "Release Toggle" as RT {
        Created --> DevTesting : 开发测试
        DevTesting --> InternalRollout : 内部灰度
        InternalRollout --> PercentageRollout : 百分比放量
        PercentageRollout --> FullRollout : 全量上线
        FullRollout --> CodeCleanup : 清理代码
        CodeCleanup --> [*] : 删除 Flag
    }

    state "Experiment Toggle" as ET {
        Created --> ExperimentDesign : 设计实验
        ExperimentDesign --> Running : 开始实验
        Running --> AnalyzingResults : 分析结果
        AnalyzingResults --> WinnerSelected : 选择优胜方案
        WinnerSelected --> FullRollout2 : 全量上线
        FullRollout2 --> CodeCleanup2 : 清理代码
        CodeCleanup2 --> [*] : 删除 Flag
    }

    state "Ops Toggle" as OT {
        Created --> Active : 激活
        Active --> Triggered : 触发降级
        Triggered --> Active : 恢复正常
        Active --> Retired : 功能下线
        Retired --> [*] : 删除 Flag
    }

    state "Permission Toggle" as PT {
        Created --> Configured : 配置规则
        Configured --> InUse : 生产使用
        InUse --> RuleUpdated : 规则更新
        RuleUpdated --> InUse : 继续使用
        InUse --> Deprecated : 功能下线
        Deprecated --> [*] : 删除 Flag
    }

关键观察:发布开关和实验开关有明确的终态——它们应该被删除。运维开关和权限开关可能长期存在,但也需要定期审计。


二、特性开关的求值架构

2.1 求值的核心流程

当应用代码调用 featureFlags.isEnabled("my-flag", context) 时,背后发生了什么?一个完整的求值流程如下:

flowchart LR
    A["应用代码调用\nSDK.isEnabled()"] --> B["本地缓存查找\nflag 定义"]
    B -->|命中| C["执行求值规则"]
    B -->|未命中| D["从远程获取\nflag 定义"]
    D --> C
    C --> E{"匹配用户定向规则?"}
    E -->|是| F["返回定向值"]
    E -->|否| G{"匹配分段规则?"}
    G -->|是| H["返回分段值"]
    G -->|否| I{"匹配百分比规则?"}
    I -->|是| J["一致性哈希分桶\n返回变体值"]
    I -->|否| K["返回默认值"]

这个流程的设计有两个关键约束:

  1. 求值必须在本地完成。每次 flag 求值都去远程服务查询是不可接受的——一个请求可能需要评估 20-30 个 flag,每个 flag 都走网络就会引入数百毫秒的延迟。所以 flag 定义必须缓存在应用进程内存中。
  2. 规则变更必须快速传播。当你在管理后台翻转一个运维开关时,不能等到下一次轮询才生效。所以需要实时推送机制。

2.2 SDK 架构的三种模式

业界的 flag SDK 架构主要有三种模式:

模式一:轮询模式(Polling)

SDK 每隔固定时间(通常 30 秒到 5 分钟)从 flag 服务拉取最新的 flag 定义,更新本地缓存。

# 轮询模式的伪代码
class PollingFlagClient:
    def __init__(self, api_url: str, poll_interval: int = 30):
        self._cache: dict[str, FlagDefinition] = {}
        self._api_url = api_url
        self._poll_interval = poll_interval
        self._start_polling()

    def _start_polling(self):
        """后台线程定期拉取 flag 定义"""
        def poll_loop():
            while True:
                try:
                    response = requests.get(
                        f"{self._api_url}/flags",
                        headers={"If-None-Match": self._etag}
                    )
                    if response.status_code == 200:
                        self._cache = response.json()
                        self._etag = response.headers.get("ETag")
                except Exception as e:
                    logger.warning(f"Flag polling failed: {e}")
                time.sleep(self._poll_interval)

        thread = threading.Thread(target=poll_loop, daemon=True)
        thread.start()

    def is_enabled(self, flag_key: str, context: dict = None) -> bool:
        flag_def = self._cache.get(flag_key)
        if flag_def is None:
            return False  # 未知 flag 返回 false(安全默认值)
        return self._evaluate(flag_def, context)

优点:实现简单,对网络要求低。缺点:flag 变更的传播延迟等于轮询间隔。

模式二:流式推送模式(Streaming / SSE)

SDK 和 flag 服务建立长连接(通常是 Server-Sent Events),flag 变更时服务端主动推送更新。

// 流式推送模式的伪代码(Java)
public class StreamingFlagClient implements FlagClient {
    private final ConcurrentHashMap<String, FlagDefinition> cache;
    private final EventSource eventSource;

    public StreamingFlagClient(String streamUrl, String sdkKey) {
        this.cache = new ConcurrentHashMap<>();

        EventHandler handler = new EventHandler() {
            @Override
            public void onMessage(String event, MessageEvent messageEvent) {
                if ("put".equals(event)) {
                    // 全量更新
                    Map<String, FlagDefinition> allFlags =
                        JsonParser.parseFlags(messageEvent.getData());
                    cache.clear();
                    cache.putAll(allFlags);
                } else if ("patch".equals(event)) {
                    // 增量更新单个 flag
                    FlagPatch patch = JsonParser.parsePatch(messageEvent.getData());
                    cache.put(patch.getKey(), patch.getDefinition());
                } else if ("delete".equals(event)) {
                    String key = JsonParser.parseDeleteKey(messageEvent.getData());
                    cache.remove(key);
                }
            }

            @Override
            public void onError(Throwable t) {
                logger.error("SSE connection error, will retry", t);
            }
        };

        this.eventSource = new EventSource.Builder(handler, URI.create(streamUrl))
            .header("Authorization", sdkKey)
            .reconnectTime(Duration.ofSeconds(1))
            .build();
        this.eventSource.start();
    }

    @Override
    public boolean isEnabled(String flagKey, EvaluationContext context) {
        FlagDefinition def = cache.get(flagKey);
        if (def == null) {
            return false;
        }
        return evaluate(def, context);
    }
}

优点:变更传播延迟低(毫秒级)。缺点:需要维护长连接,对服务端有连接数压力。

模式三:边缘求值模式(Edge Evaluation)

SDK 不直接连接中心 flag 服务,而是连接部署在边缘节点(CDN PoP 或本地代理)的 flag 代理。代理负责缓存 flag 定义并转发变更。

应用实例 --> 本地 Flag Relay(同一数据中心)--> 中心 Flag 服务

应用实例 --> 边缘 Flag Relay(CDN PoP)--> 中心 Flag 服务

这种模式的优势在于:

  1. 降低中心服务的连接数。1000 个应用实例不需要各自建立到中心服务的长连接,而是由每个数据中心的一个 Relay 代理。
  2. 提高可用性。即使中心 flag 服务宕机,边缘节点上的缓存仍然可以提供服务。
  3. 降低跨区域延迟。边缘节点和应用实例在同一网络区域内。

2.3 求值引擎的核心数据结构

一个 flag 的完整定义通常包含以下结构:

{
  "key": "new-checkout-flow",
  "version": 42,
  "on": true,
  "prerequisites": [
    { "key": "payment-service-v2", "variation": 0 }
  ],
  "targets": [
    {
      "variation": 0,
      "values": ["user-123", "user-456"]
    }
  ],
  "rules": [
    {
      "id": "rule-1",
      "clauses": [
        {
          "attribute": "country",
          "op": "in",
          "values": ["CN", "JP", "KR"]
        },
        {
          "attribute": "plan",
          "op": "in",
          "values": ["enterprise", "pro"]
        }
      ],
      "variation_or_rollout": {
        "rollout": {
          "variations": [
            { "variation": 0, "weight": 80000 },
            { "variation": 1, "weight": 20000 }
          ],
          "bucket_by": "user_id"
        }
      }
    }
  ],
  "fallthrough": {
    "variation": 1
  },
  "off_variation": 1,
  "variations": [true, false],
  "salt": "abc123"
}

求值引擎按以下顺序处理这个结构:

  1. 检查 on 字段。如果 flag 关闭,直接返回 off_variation
  2. 检查前置条件(Prerequisites)。如果依赖的其他 flag 不满足条件,返回 off_variation
  3. 检查用户定向(Targets)。如果当前用户在白名单中,返回对应的变体。
  4. 依次评估规则(Rules)。每条规则包含多个子句(Clauses),所有子句满足时规则命中。
  5. 如果没有规则命中,走 fallthrough 分支

2.4 一致性哈希与百分比发布

百分比发布是特性开关最核心的能力之一。它的实现依赖一致性哈希:

// 一致性哈希分桶实现
func bucketUser(flagKey, salt, userID, bucketBy string) float64 {
    key := fmt.Sprintf("%s.%s.%s", flagKey, salt, userID)
    hash := sha1.Sum([]byte(key))
    // 取前 15 个十六进制字符,转换为整数
    hashStr := hex.EncodeToString(hash[:])[:15]
    intVal, _ := strconv.ParseInt(hashStr, 16, 64)
    // 归一化到 [0, 1) 区间
    return float64(intVal) / float64(0x1000000000000000)
}

func evaluateRollout(rollout Rollout, flagKey, salt string, context EvalContext) int {
    bucketVal := bucketUser(flagKey, salt, context.GetAttribute(rollout.BucketBy), "")
    var sum float64
    for _, wv := range rollout.Variations {
        sum += float64(wv.Weight) / 100000.0
        if bucketVal < sum {
            return wv.Variation
        }
    }
    return rollout.Variations[len(rollout.Variations)-1].Variation
}

这个设计有几个重要的性质:

  1. 确定性。给定相同的 flagKeysaltuserID,哈希结果始终相同。所以同一个用户总是分到同一个桶里。
  2. 均匀分布。SHA-1 哈希的输出是均匀分布的,所以用户分桶也是均匀的。
  3. 独立性。不同 flag 使用不同的 salt,所以一个用户在 flag A 中分到的桶和在 flag B 中分到的桶是独立的。

2.5 求值性能

在高流量服务中,flag 求值的性能至关重要。以下是一些基准数据:

操作 典型延迟 说明
简单布尔 flag(无规则) 50-200 ns 直接查 HashMap,返回变体
带 3 条规则的 flag 500 ns - 2 us 需要遍历规则和子句
带百分比发布的 flag 1-3 us 需要计算 SHA-1 哈希
带 10 条规则 + 百分比的复杂 flag 5-15 us 规则数量是性能的主要瓶颈

对于一个每请求评估 30 个 flag 的服务,flag 求值的总开销大约在 10-100 微秒之间,相比网络延迟(通常毫秒级)可以忽略不计。但如果 flag 的规则数量失控(比如一个 flag 有上百条规则),求值性能就会成为问题。


三、LaunchDarkly 架构深度解析

3.1 整体架构

LaunchDarkly 是目前市场份额最大的商业特性开关服务。它的架构设计代表了业界的最佳实践。

LaunchDarkly 的核心架构分为三层:

  1. 管理平面(Management Plane)。Web 控制台、REST API、审计日志。负责 flag 的创建、修改、删除。
  2. 数据平面(Data Plane)。流式推送服务(基于 SSE),负责将 flag 变更实时推送给所有 SDK 实例。
  3. 客户端 SDK。嵌入在应用代码中,维护 flag 定义的本地缓存,在本地执行求值逻辑。
                    +-------------------+
                    |   管理控制台       |
                    |   REST API        |
                    +--------+----------+
                             |
                             v
                    +-------------------+
                    |   Flag 存储       |
                    |  (DynamoDB)      |
                    +--------+----------+
                             |
                    +--------v----------+
                    |   流式推送服务     |
                    |  (SSE Endpoints)|
                    +---+------+----+---+
                        |      |    |
              +---------+  +---+  +-+--------+
              v            v       v          v
         +--------+  +--------+  +--------+  +--------+
         | SDK    |  | SDK    |  | SDK    |  | SDK    |
         | 实例 1 |  | 实例 2 |  | 实例 3 |  | 实例 N |
         +--------+  +--------+  +--------+  +--------+
         |本地缓存|  |本地缓存|  |本地缓存|  |本地缓存|
         |求值引擎|  |求值引擎|  |求值引擎|  |求值引擎|
         +--------+  +--------+  +--------+  +--------+

3.2 服务端 SDK vs 客户端 SDK

LaunchDarkly 维护两种不同架构的 SDK:

服务端 SDK(Java、Go、Python、Node.js、Ruby、.NET 等):

// LaunchDarkly 服务端 SDK 使用示例(Java)
LDConfig config = new LDConfig.Builder()
    .events(
        Components.sendEvents()
            .capacity(10000)
            .flushInterval(Duration.ofSeconds(5))
    )
    .dataSource(
        Components.streamingDataSource()
            .initialReconnectDelay(Duration.ofSeconds(1))
    )
    .build();

LDClient client = new LDClient("sdk-key-server-side", config);

// 求值 flag——完全在本地执行,无网络调用
LDContext context = LDContext.builder("user-123")
    .set("plan", "enterprise")
    .set("country", "CN")
    .build();

boolean showNewFeature = client.boolVariation(
    "new-checkout-flow",
    context,
    false  // 默认值,当 flag 不存在或求值失败时使用
);

客户端 SDK(JavaScript、iOS、Android、React Native 等):

// LaunchDarkly 客户端 SDK 使用示例(JavaScript)
import * as LDClient from "launchdarkly-js-client-sdk";

const context = {
    kind: "user",
    key: "user-123",
    name: "Alice",
    email: "alice@example.com",
    custom: {
        plan: "enterprise",
        country: "CN"
    }
};

const client = LDClient.initialize("client-side-id", context);

client.on("ready", () => {
    const showNewFeature = client.variation("new-checkout-flow", false);
    if (showNewFeature) {
        renderNewCheckoutFlow();
    } else {
        renderLegacyCheckoutFlow();
    }
});

// 监听 flag 值变更
client.on("change:new-checkout-flow", (current, previous) => {
    console.log(`Flag changed from ${previous} to ${current}`);
    rerender();
});

3.3 LaunchDarkly Relay Proxy

对于大规模部署,LaunchDarkly 提供了 Relay Proxy——一个部署在客户数据中心内的代理服务:

# LaunchDarkly Relay Proxy 配置
main:
  port: 8030
  heartbeatInterval: 15s

environment:
  production:
    sdkKey: "sdk-key-production"
    mobileKey: "mob-key-production"
    envId: "client-side-id-production"
    prefix: "ld-prod"

  staging:
    sdkKey: "sdk-key-staging"

redis:
  host: "redis.internal"
  port: 6379
  localTtl: 30s

Relay Proxy 的价值:

  1. 降低出站连接数。数百个应用实例只需要 Relay Proxy 与 LaunchDarkly 建立一条连接。
  2. 内网求值。对于有合规要求的场景,用户上下文数据不需要离开数据中心。
  3. 离线容灾。如果与 LaunchDarkly 的连接中断,Relay Proxy 可以从 Redis 缓存中继续提供服务。

3.4 事件处理与分析

LaunchDarkly SDK 在每次 flag 求值后会生成事件数据,用于统计分析和调试:

{
  "kind": "feature",
  "creationDate": 1712000000000,
  "key": "new-checkout-flow",
  "version": 42,
  "context": {
    "kind": "user",
    "key": "user-123"
  },
  "variation": 0,
  "value": true,
  "default": false,
  "reason": {
    "kind": "RULE_MATCH",
    "ruleIndex": 0,
    "ruleId": "rule-1"
  }
}

事件不会实时发送,而是在 SDK 内部的事件缓冲区中积累,定期批量上报(默认每 5 秒一次)。为了控制事件量,SDK 还会做事件去重——同一个用户对同一个 flag 的重复求值,只记录第一次和值变更时的事件。


四、Unleash 开源方案深度解析

4.1 架构概览

Unleash 是目前最流行的开源特性开关平台。它的架构比 LaunchDarkly 简单,但核心思想相同:

+-------------------+     +-------------------+     +--------------+
|   Unleash Web UI  |     |   Unleash API     |     | PostgreSQL   |
|                   |---->|   (Node.js)       |---->|              |
+-------------------+     +--------+----------+     +--------------+
                                   |
                          +--------v----------+
                          |   /api/client      |
                          |   /api/frontend    |
                          +---+------+----+---+
                              |      |    |
                    +---------+  +---+  +-+--------+
                    v            v       v          v
               +--------+  +--------+  +--------+  +--------+
               | SDK    |  | SDK    |  | SDK    |  | SDK    |
               | 实例 1 |  | 实例 2 |  | 实例 3 |  | 实例 N |
               +--------+  +--------+  +--------+  +--------+

4.2 激活策略(Activation Strategies)

Unleash 的核心概念是”激活策略”——预定义的求值规则类型:

// Unleash 内置激活策略
interface ActivationStrategy {
    name: string;
    parameters: Record<string, string>;
    constraints?: Constraint[];
}

// 1. 标准策略(Standard):对所有人开放
const standardStrategy: ActivationStrategy = {
    name: "default",
    parameters: {}
};

// 2. 渐进发布策略(Gradual Rollout)
const gradualRollout: ActivationStrategy = {
    name: "flexibleRollout",
    parameters: {
        rollout: "30",           // 30% 的用户
        stickiness: "userId",    // 按 userId 做一致性分桶
        groupId: "new-checkout"  // 分桶标识
    }
};

// 3. 用户 ID 策略(UserIDs)
const userIdStrategy: ActivationStrategy = {
    name: "userWithId",
    parameters: {
        userIds: "user-123,user-456,user-789"
    }
};

// 4. IP 地址策略(IPs)
const ipStrategy: ActivationStrategy = {
    name: "remoteAddress",
    parameters: {
        IPs: "10.0.0.0/8,192.168.1.0/24"
    }
};

// 5. 自定义策略
const customStrategy: ActivationStrategy = {
    name: "betaUsers",
    parameters: {
        minAccountAge: "90",
        requiredPlan: "pro,enterprise"
    },
    constraints: [
        {
            contextName: "environment",
            operator: "IN",
            values: ["production", "staging"]
        }
    ]
};

4.3 Unleash SDK 使用示例

Unleash 为主流语言都提供了官方 SDK:

// Java SDK
UnleashConfig config = UnleashConfig.builder()
    .appName("my-service")
    .instanceId("instance-1")
    .unleashAPI("http://unleash.internal:4242/api")
    .apiKey("default:development.abc123")
    .synchronousFetchOnInitialisation(true)
    .fetchTogglesInterval(10)  // 每 10 秒轮询一次
    .sendMetricsInterval(60)   // 每 60 秒上报指标
    .build();

Unleash unleash = new DefaultUnleash(config);

// 简单布尔判断
if (unleash.isEnabled("new-checkout-flow")) {
    newCheckout();
}

// 带上下文的判断
UnleashContext context = UnleashContext.builder()
    .userId("user-123")
    .addProperty("plan", "enterprise")
    .addProperty("country", "CN")
    .build();

if (unleash.isEnabled("premium-feature", context)) {
    showPremiumFeature();
}

// 获取变体(用于 A/B 测试)
Variant variant = unleash.getVariant("checkout-button-color", context);
String buttonColor = variant.getPayload()
    .map(Payload::getValue)
    .orElse("blue");
# Python SDK
from UnleashClient import UnleashClient

client = UnleashClient(
    url="http://unleash.internal:4242/api",
    app_name="my-python-service",
    custom_headers={"Authorization": "default:development.abc123"}
)
client.initialize_client()

# 简单判断
if client.is_enabled("new-search-algorithm"):
    use_new_search()

# 带上下文
context = {
    "userId": "user-123",
    "properties": {
        "plan": "enterprise",
        "region": "asia-pacific"
    }
}

if client.is_enabled("regional-pricing", context):
    apply_regional_pricing()

# 变体
variant = client.get_variant("onboarding-flow", context)
if variant.get("name") == "simplified":
    show_simplified_onboarding()
elif variant.get("name") == "guided":
    show_guided_onboarding()

4.4 Unleash Edge

Unleash Edge 是 Unleash 的边缘代理组件(用 Rust 编写),用于在大规模部署中解决连接数和延迟问题:

# Unleash Edge 配置
upstream:
  url: "http://unleash.internal:4242/api"
  tokens:
    - "default:production.xyz789"
  refresh_interval_seconds: 10

server:
  port: 3063
  host: "0.0.0.0"

features:
  offline_mode: false
  token_validation: strict

Unleash Edge 的工作模式:

  1. 边缘模式(Edge Mode):连接上游 Unleash 实例,缓存 flag 定义,为本地 SDK 提供服务。
  2. 离线模式(Offline Mode):从本地文件加载 flag 定义,不需要网络连接。

五、LaunchDarkly vs Unleash vs 自建方案

5.1 综合对比

维度 LaunchDarkly Unleash(开源版) Unleash(企业版) 自建方案
部署方式 SaaS 自托管 SaaS / 自托管 自托管
数据传输 SSE 流式推送 轮询(默认 15s) SSE 流式推送 取决于实现
传播延迟 毫秒级 秒级(轮询间隔) 毫秒级 取决于实现
SDK 语言支持 25+ 语言 15+ 语言 15+ 语言 需要自行开发
A/B 测试 原生支持,内置统计引擎 基础变体支持 高级实验功能 需要集成第三方
审计日志 完整审计追踪 基础事件日志 完整审计追踪 需要自行实现
权限控制 RBAC,项目级 基础 RBAC 高级 RBAC 需要自行实现
边缘代理 Relay Proxy Unleash Edge Unleash Edge 需要自行开发
成本(100 用户) ~$1000/月起 免费 ~$500/月起 开发 + 运维成本
适用规模 中大型团队 小中型团队 中大型团队 有特殊定制需求

5.2 选型决策树

选择哪种方案取决于几个关键因素:

  1. 团队规模和 flag 数量。50 个以下的 flag,环境变量 + 配置文件就够了。50-500 个 flag,Unleash 开源版是好的起点。500 个以上,需要 LaunchDarkly 或 Unleash 企业版的治理能力。
  2. 数据合规要求。如果用户上下文数据不能离开数据中心,需要自托管方案(Unleash 或 LaunchDarkly Relay Proxy)。
  3. 实时性要求。如果运维开关需要毫秒级生效,轮询模式不够用,需要流式推送。
  4. A/B 测试需求。如果有成熟的实验平台(如 Optimizely、Statsig),flag 系统只需要提供基础的变体能力。如果没有,LaunchDarkly 的内置实验功能可以省去集成成本。

5.3 自建方案的最小架构

如果决定自建,以下是一个最小可用的架构:

// 最小自建 flag 服务(Go)
package main

import (
    "encoding/json"
    "log"
    "net/http"
    "sync"
)

type Flag struct {
    Key          string            `json:"key"`
    Enabled      bool              `json:"enabled"`
    Percentage   float64           `json:"percentage,omitempty"`
    AllowList    []string          `json:"allow_list,omitempty"`
    Variants     map[string]int    `json:"variants,omitempty"`
    Metadata     map[string]string `json:"metadata,omitempty"`
}

type FlagStore struct {
    mu    sync.RWMutex
    flags map[string]*Flag
    // SSE 订阅者
    subscribers map[chan []byte]struct{}
    subMu       sync.RWMutex
}

func (s *FlagStore) GetAll() []*Flag {
    s.mu.RLock()
    defer s.mu.RUnlock()
    result := make([]*Flag, 0, len(s.flags))
    for _, f := range s.flags {
        result = append(result, f)
    }
    return result
}

func (s *FlagStore) Update(flag *Flag) {
    s.mu.Lock()
    s.flags[flag.Key] = flag
    s.mu.Unlock()
    // 通知所有 SSE 订阅者
    data, _ := json.Marshal(flag)
    s.subMu.RLock()
    for ch := range s.subscribers {
        select {
        case ch <- data:
        default:
            // 订阅者消费太慢,跳过
        }
    }
    s.subMu.RUnlock()
}

func (s *FlagStore) Subscribe() chan []byte {
    ch := make(chan []byte, 100)
    s.subMu.Lock()
    s.subscribers[ch] = struct{}{}
    s.subMu.Unlock()
    return ch
}

func (s *FlagStore) Unsubscribe(ch chan []byte) {
    s.subMu.Lock()
    delete(s.subscribers, ch)
    s.subMu.Unlock()
    close(ch)
}

// SSE 端点
func (s *FlagStore) StreamHandler(w http.ResponseWriter, r *http.Request) {
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "streaming not supported", http.StatusInternalServerError)
        return
    }

    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")

    // 先发送当前所有 flag 的全量数据
    allFlags := s.GetAll()
    data, _ := json.Marshal(allFlags)
    w.Write([]byte("event: put\ndata: "))
    w.Write(data)
    w.Write([]byte("\n\n"))
    flusher.Flush()

    // 订阅增量更新
    ch := s.Subscribe()
    defer s.Unsubscribe(ch)

    for {
        select {
        case msg := <-ch:
            w.Write([]byte("event: patch\ndata: "))
            w.Write(msg)
            w.Write([]byte("\n\n"))
            flusher.Flush()
        case <-r.Context().Done():
            return
        }
    }
}

func main() {
    store := &FlagStore{
        flags:       make(map[string]*Flag),
        subscribers: make(map[chan []byte]struct{}),
    }

    http.HandleFunc("/api/flags", func(w http.ResponseWriter, r *http.Request) {
        switch r.Method {
        case http.MethodGet:
            json.NewEncoder(w).Encode(store.GetAll())
        case http.MethodPut:
            var flag Flag
            json.NewDecoder(r.Body).Decode(&flag)
            store.Update(&flag)
            w.WriteHeader(http.StatusOK)
        }
    })
    http.HandleFunc("/api/stream", store.StreamHandler)

    log.Println("Flag service starting on :8080")
    log.Fatal(http.ListenAndServe(":8080", nil))
}

这是一个可以运行的最小方案,但距离生产可用还差很多:没有持久化、没有认证、没有审计日志、没有管理界面。这些都是自建方案的隐性成本。


六、渐进发布与灰度策略

6.1 金丝雀发布与 Flag 的关系

渐进发布(Progressive Delivery)是特性开关最核心的应用场景之一。它将传统的”部署 = 发布”解耦为两个独立的动作:

# 渐进发布配置示例
flag: new-payment-gateway
type: release
rollout_plan:
  - stage: canary
    percentage: 1
    duration: 2h
    success_criteria:
      error_rate: "< 0.1%"
      p99_latency: "< 200ms"
    rollback_trigger:
      error_rate: "> 1%"

  - stage: early_adopters
    percentage: 10
    duration: 24h
    success_criteria:
      error_rate: "< 0.05%"
      conversion_rate: "> 95% of baseline"

  - stage: broad_rollout
    percentage: 50
    duration: 48h
    success_criteria:
      error_rate: "< 0.05%"
      all_metrics_nominal: true

  - stage: full_rollout
    percentage: 100
    monitoring_period: 7d

6.2 基于指标的自动发布

成熟的特性开关系统可以和监控系统集成,实现自动化的渐进发布:

# 自动渐进发布控制器
import time
from dataclasses import dataclass
from typing import Optional

@dataclass
class RolloutStage:
    name: str
    percentage: int
    duration_hours: float
    max_error_rate: float
    max_p99_latency_ms: float

class ProgressiveRolloutController:
    def __init__(self, flag_client, metrics_client, flag_key: str):
        self.flag_client = flag_client
        self.metrics_client = metrics_client
        self.flag_key = flag_key

    def execute_rollout(self, stages: list[RolloutStage]) -> bool:
        for stage in stages:
            print(f"Starting stage: {stage.name} ({stage.percentage}%)")
            self.flag_client.set_percentage(self.flag_key, stage.percentage)

            # 等待指标稳定
            time.sleep(300)  # 5 分钟观察期

            # 持续监控
            check_interval = 60  # 每分钟检查一次
            total_checks = int(stage.duration_hours * 3600 / check_interval)

            for i in range(total_checks):
                metrics = self._get_current_metrics()

                if metrics["error_rate"] > stage.max_error_rate:
                    print(f"Error rate {metrics['error_rate']:.4f} exceeds "
                          f"threshold {stage.max_error_rate}. Rolling back.")
                    self._rollback()
                    return False

                if metrics["p99_latency_ms"] > stage.max_p99_latency_ms:
                    print(f"P99 latency {metrics['p99_latency_ms']:.1f}ms exceeds "
                          f"threshold {stage.max_p99_latency_ms}ms. Rolling back.")
                    self._rollback()
                    return False

                time.sleep(check_interval)

            print(f"Stage {stage.name} passed all checks.")

        print("Rollout complete. Flag is at 100%.")
        return True

    def _get_current_metrics(self) -> dict:
        return {
            "error_rate": self.metrics_client.query(
                f'rate(http_errors_total{{flag="{self.flag_key}"}}[5m])'
            ),
            "p99_latency_ms": self.metrics_client.query(
                f'histogram_quantile(0.99, '
                f'rate(http_duration_seconds_bucket{{flag="{self.flag_key}"}}[5m]))'
            ) * 1000
        }

    def _rollback(self):
        self.flag_client.set_percentage(self.flag_key, 0)
        print(f"Flag {self.flag_key} rolled back to 0%.")

6.3 用户分段策略

渐进发布不一定要按随机百分比——很多场景需要按用户属性定向发布:

{
  "key": "new-editor",
  "targeting_rules": [
    {
      "name": "内部员工优先",
      "priority": 1,
      "conditions": [
        { "attribute": "email", "op": "endsWith", "value": "@company.com" }
      ],
      "serve": { "variation": "enabled" }
    },
    {
      "name": "付费用户第二批",
      "priority": 2,
      "conditions": [
        { "attribute": "plan", "op": "in", "values": ["pro", "enterprise"] },
        { "attribute": "account_age_days", "op": ">=", "value": 90 }
      ],
      "serve": {
        "rollout": { "percentage": 30, "bucket_by": "org_id" }
      }
    },
    {
      "name": "免费用户最后",
      "priority": 3,
      "conditions": [
        { "attribute": "plan", "op": "eq", "value": "free" }
      ],
      "serve": {
        "rollout": { "percentage": 5, "bucket_by": "user_id" }
      }
    }
  ],
  "default": { "variation": "disabled" }
}

注意 bucket_by 字段的选择:对于 B2B 产品,通常按 org_id 分桶而不是 user_id——同一个组织的所有用户应该看到一致的体验。


七、特性开关与 A/B 实验

7.1 实验架构

特性开关和 A/B 测试有天然的亲缘关系:两者都需要将用户分到不同的组,并追踪不同组的行为差异。很多公司直接用特性开关系统作为实验平台的基础设施。

一个完整的实验架构包含以下组件:

+---------------+     +------------------+     +------------------+
| Flag 系统     |     | 事件收集服务     |     | 实验分析引擎     |
| 用户分组      |---->| 行为数据收集     |---->| 统计显著性计算   |
| 变体分配      |     | 曝光事件记录     |     | 结果可视化       |
+---------------+     +------------------+     +------------------+

7.2 分层实验模型

当多个实验同时运行时,需要一个分层模型来避免实验之间的交叉污染:

# 分层实验模型
class ExperimentLayer:
    """
    每个层将 100% 的流量分成不重叠的区间。
    同一层内的多个实验占据不同的区间,互斥。
    不同层之间正交(用不同的哈希盐)。
    """
    def __init__(self, layer_id: str, salt: str):
        self.layer_id = layer_id
        self.salt = salt
        self.experiments: list[Experiment] = []

    def assign_experiment(self, user_id: str) -> tuple[str, str] | None:
        bucket = self._hash_to_bucket(user_id)
        for exp in self.experiments:
            if exp.start_bucket <= bucket < exp.end_bucket:
                variant = exp.assign_variant(user_id, self.salt)
                return exp.name, variant
        return None

    def _hash_to_bucket(self, user_id: str) -> int:
        import hashlib
        h = hashlib.sha256(f"{self.salt}:{user_id}".encode()).hexdigest()
        return int(h[:8], 16) % 10000  # 0-9999,精度到 0.01%


class Experiment:
    def __init__(self, name: str, start_bucket: int, end_bucket: int,
                 variants: dict[str, int]):
        self.name = name
        self.start_bucket = start_bucket
        self.end_bucket = end_bucket
        self.variants = variants  # {"control": 5000, "treatment_a": 3000, "treatment_b": 2000}

    def assign_variant(self, user_id: str, layer_salt: str) -> str:
        import hashlib
        h = hashlib.sha256(
            f"{layer_salt}:{self.name}:{user_id}".encode()
        ).hexdigest()
        bucket = int(h[:8], 16) % 10000

        cumulative = 0
        for variant_name, weight in self.variants.items():
            cumulative += weight
            if bucket < cumulative:
                return variant_name
        return list(self.variants.keys())[-1]

分层实验的配置示例:

experiment_layers:
  - id: "ui-layer"
    salt: "ui-2026q2"
    experiments:
      - name: "checkout-redesign"
        traffic: [0, 5000]      # 前 50% 的流量
        variants:
          control: 2500
          new_design: 2500
      - name: "search-ui-test"
        traffic: [5000, 8000]   # 50%-80% 的流量
        variants:
          control: 1500
          variant_a: 1500

  - id: "backend-layer"
    salt: "backend-2026q2"
    experiments:
      - name: "new-ranking-model"
        traffic: [0, 10000]     # 100% 的流量(和 UI 层正交)
        variants:
          control: 5000
          new_model: 5000

7.3 实验指标集成

特性开关系统需要和指标系统集成,才能评估实验效果:

// 实验指标追踪(TypeScript)
interface ExperimentEvent {
    experimentKey: string;
    variantKey: string;
    userId: string;
    eventType: "exposure" | "conversion" | "custom";
    metricKey?: string;
    metricValue?: number;
    timestamp: number;
    properties?: Record<string, string | number>;
}

class ExperimentTracker {
    private exposureCache: Set<string> = new Set();

    trackExposure(experimentKey: string, variantKey: string, userId: string): void {
        // 去重:每个用户每个实验只记录一次曝光
        const dedupKey = `${experimentKey}:${userId}`;
        if (this.exposureCache.has(dedupKey)) {
            return;
        }
        this.exposureCache.add(dedupKey);

        this.emit({
            experimentKey,
            variantKey,
            userId,
            eventType: "exposure",
            timestamp: Date.now()
        });
    }

    trackConversion(
        experimentKey: string,
        variantKey: string,
        userId: string,
        metricKey: string,
        metricValue: number = 1
    ): void {
        this.emit({
            experimentKey,
            variantKey,
            userId,
            eventType: "conversion",
            metricKey,
            metricValue,
            timestamp: Date.now()
        });
    }

    private emit(event: ExperimentEvent): void {
        // 发送到事件收集服务(如 Kafka、Segment)
        eventBus.publish("experiment-events", event);
    }
}

// 使用示例
const tracker = new ExperimentTracker();
const variant = featureFlags.getVariant("pricing-experiment", user.id);

tracker.trackExposure("pricing-experiment", variant, user.id);

// 用户完成购买时
tracker.trackConversion(
    "pricing-experiment",
    variant,
    user.id,
    "purchase_completed",
    order.totalAmount
);

7.4 统计显著性

实验结束后,需要用统计方法判断结果是否可信。最常用的方法是 Z 检验(Z-Test):

# 实验结果分析
import math

def z_test_proportions(
    control_conversions: int,
    control_total: int,
    treatment_conversions: int,
    treatment_total: int
) -> dict:
    """双比例 Z 检验"""
    p_control = control_conversions / control_total
    p_treatment = treatment_conversions / treatment_total
    p_pooled = (control_conversions + treatment_conversions) / (
        control_total + treatment_total
    )

    se = math.sqrt(
        p_pooled * (1 - p_pooled) * (1/control_total + 1/treatment_total)
    )

    if se == 0:
        return {"z_score": 0, "p_value": 1.0, "significant": False}

    z_score = (p_treatment - p_control) / se

    # 双尾检验 p 值(近似计算)
    from statistics import NormalDist
    p_value = 2 * (1 - NormalDist().cdf(abs(z_score)))

    return {
        "control_rate": p_control,
        "treatment_rate": p_treatment,
        "lift": (p_treatment - p_control) / p_control if p_control > 0 else 0,
        "z_score": z_score,
        "p_value": p_value,
        "significant": p_value < 0.05,  # 95% 置信水平
        "sample_size": control_total + treatment_total
    }

# 示例:购买按钮颜色 A/B 测试结果
result = z_test_proportions(
    control_conversions=1200,    # 蓝色按钮:1200 次转化
    control_total=50000,         # 蓝色按钮:50000 次曝光
    treatment_conversions=1350,  # 绿色按钮:1350 次转化
    treatment_total=50000        # 绿色按钮:50000 次曝光
)

# 输出:
# control_rate: 0.024 (2.4%)
# treatment_rate: 0.027 (2.7%)
# lift: 0.125 (12.5%)
# p_value: 0.003
# significant: True

八、测试策略与 Flag 组合爆炸

8.1 组合爆炸问题

N 个布尔 flag 意味着 2^N 种可能的系统状态。如果系统中有 20 个 flag,理论上有超过 100 万种组合。全覆盖测试不现实。

实际的应对策略有几种:

8.2 策略一:测试 flag 的默认路径和新路径

最基本的策略是:对每个 flag,测试两种状态——onoff。不测试 flag 之间的交叉组合。

// JUnit 5 参数化测试示例
@ParameterizedTest
@CsvSource({
    "true, 期望新流程结果",
    "false, 期望旧流程结果"
})
void testCheckoutFlow(boolean newCheckoutEnabled, String expectedBehavior) {
    // 设置 flag 状态
    flagClient.override("new-checkout-flow", newCheckoutEnabled);

    CheckoutResult result = checkoutService.process(testOrder);

    if (newCheckoutEnabled) {
        assertThat(result.getFlowType()).isEqualTo("new");
    } else {
        assertThat(result.getFlowType()).isEqualTo("legacy");
    }
}

8.3 策略二:Pairwise Testing

如果需要测试 flag 之间的交互,可以使用成对测试(Pairwise Testing)——一种组合测试技术,保证任意两个 flag 的所有值组合至少被覆盖一次:

# 使用 allpairspy 生成成对测试用例
from allpairspy import AllPairs

flags = {
    "new_checkout": [True, False],
    "new_payment":  [True, False],
    "new_search":   [True, False],
    "dark_mode":    [True, False],
    "new_pricing":  [True, False],
}

parameters = list(flags.values())
flag_names = list(flags.keys())

print(f"完全组合: {2**len(flags)} = {2**len(flags)} 个测试用例")

test_cases = list(AllPairs(parameters))
print(f"成对测试: {len(test_cases)} 个测试用例")

for i, case in enumerate(test_cases):
    config = dict(zip(flag_names, case))
    print(f"测试用例 {i+1}: {config}")

# 输出示例:
# 完全组合: 32 个测试用例
# 成对测试: 6 个测试用例
# 测试用例 1: {new_checkout: True, new_payment: True, new_search: True, ...}
# 测试用例 2: {new_checkout: True, new_payment: False, new_search: False, ...}
# ...

5 个 flag 从 32 个测试用例减少到 6 个。对于 20 个 flag,成对测试通常只需要 20-30 个用例就能覆盖所有两两组合。

8.4 策略三:测试环境中固定 Flag 状态

在 CI/CD 管道中,为每个测试阶段定义明确的 flag 状态:

# 测试环境 flag 配置
environments:
  unit_test:
    description: "单元测试:所有新 flag 开启,测试新代码路径"
    flags:
      new-checkout-flow: true
      new-payment-gateway: true
      new-search-algorithm: true

  integration_test:
    description: "集成测试:混合状态,模拟真实灰度场景"
    flags:
      new-checkout-flow: true
      new-payment-gateway: false  # 旧支付通道
      new-search-algorithm: true

  regression_test:
    description: "回归测试:所有新 flag 关闭,确保旧路径不被破坏"
    flags:
      new-checkout-flow: false
      new-payment-gateway: false
      new-search-algorithm: false

  production_mirror:
    description: "生产镜像:使用与生产完全相同的 flag 状态"
    source: production  # 从生产环境同步 flag 状态

8.5 策略四:Flag 感知的测试框架

更高级的做法是在测试框架中内建 flag 感知能力:

// Go 测试中的 flag 覆盖
type FlagOverride struct {
    t       *testing.T
    client  *FlagClient
    overrides map[string]interface{}
    originals map[string]interface{}
}

func WithFlags(t *testing.T, client *FlagClient) *FlagOverride {
    return &FlagOverride{
        t:         t,
        client:    client,
        overrides: make(map[string]interface{}),
        originals: make(map[string]interface{}),
    }
}

func (fo *FlagOverride) Set(key string, value interface{}) *FlagOverride {
    fo.originals[key] = fo.client.GetRawValue(key)
    fo.client.SetOverride(key, value)
    fo.overrides[key] = value
    return fo
}

func (fo *FlagOverride) Cleanup() {
    for key, original := range fo.originals {
        fo.client.SetOverride(key, original)
    }
}

// 使用方式
func TestNewCheckout(t *testing.T) {
    flags := WithFlags(t, flagClient).
        Set("new-checkout-flow", true).
        Set("new-payment-gateway", true)
    defer flags.Cleanup()

    result := checkoutService.Process(testOrder)
    assert.Equal(t, "new", result.FlowType)
}

九、Flag 治理与组织实践

9.1 Flag 的元数据管理

每个 flag 都应该有完整的元数据,而不仅仅是一个键值对:

# Flag 元数据规范
flag:
  key: "new-checkout-flow"
  name: "新结账流程"
  description: "使用重构后的三步结账流程替代当前的五步流程"

  # 分类与生命周期
  type: release          # release | experiment | ops | permission
  status: active         # draft | active | complete | deprecated
  created_at: "2026-03-01"
  expected_removal: "2026-04-15"

  # 所有权
  owner_team: "checkout-team"
  owner_person: "alice@company.com"
  jira_ticket: "CHECKOUT-1234"

  # 技术信息
  code_references:
    - file: "src/checkout/service.go"
      line: 42
    - file: "src/checkout/handler.go"
      line: 78
  sdk_languages: ["go", "typescript"]

  # 风险评估
  risk_level: medium     # low | medium | high | critical
  rollback_plan: "将 flag 设为 false 即可回退到旧流程"
  dependencies:
    - "payment-service-v2"

  # 审计
  last_modified_by: "bob@company.com"
  last_modified_at: "2026-03-20"
  change_history:
    - date: "2026-03-01"
      action: "created"
      by: "alice@company.com"
    - date: "2026-03-10"
      action: "enabled for internal"
      by: "alice@company.com"
    - date: "2026-03-20"
      action: "rollout to 10%"
      by: "bob@company.com"

9.2 Flag 命名规范

一个好的命名规范可以从名字本身传达 flag 的类型和意图:

命名规范:{类型前缀}.{功能域}.{描述}

类型前缀:
  release.*   — 发布开关(短期)
  exp.*       — 实验开关(中期)
  ops.*       — 运维开关(长期)
  perm.*      — 权限开关(长期/永久)

示例:
  release.checkout.new-three-step-flow
  exp.search.ranking-model-v3
  ops.recommendation.circuit-breaker
  perm.analytics.advanced-dashboard

反例(不要这样命名):
  flag1
  test123
  new_feature
  johns-experiment

9.3 Flag 审计与合规

在金融、医疗等受监管行业,flag 的变更需要完整的审计追踪:

{
  "audit_event": {
    "id": "evt-2026-0401-001",
    "timestamp": "2026-04-01T10:30:00Z",
    "actor": {
      "user": "alice@company.com",
      "ip": "10.0.1.42",
      "role": "release-manager"
    },
    "action": "flag.targeting.updated",
    "resource": {
      "flag_key": "new-payment-gateway",
      "environment": "production"
    },
    "changes": {
      "before": { "percentage": 10 },
      "after": { "percentage": 50 }
    },
    "approval": {
      "required": true,
      "approved_by": "bob@company.com",
      "approval_ticket": "CHANGE-5678"
    },
    "context": {
      "reason": "Phase 2 rollout after 48h observation",
      "related_incidents": []
    }
  }
}

9.4 GitHub 的 Flag 治理实践

GitHub 在其工程博客中详细分享了他们的特性开关管理实践。以下是关键要点:

规模:GitHub 在任意时间点维护着 800-1500 个活跃 flag。每月创建约 200 个新 flag,同时清理约 150 个旧 flag。

工具链:GitHub 使用自研的特性开关系统 Flipper,它深度集成到 GitHub 的 Rails 单体应用中:

# GitHub Flipper 使用模式(简化示例)
class CheckoutController < ApplicationController
  def create
    if feature_enabled?(, current_user)
      new_checkout_flow
    else
      legacy_checkout_flow
    end
  end

  private

  def new_checkout_flow
    # ...
  end

  def legacy_checkout_flow
    # ...
  end
end

清理流程

  1. 每个 flag 创建时必须填写”预期移除日期”。
  2. 超过预期日期 14 天未移除的 flag,系统自动创建清理 Issue 并分配给 flag 的所有者。
  3. 超过 30 天未响应的 flag,会升级到团队负责人。
  4. 每季度举行一次”Flag 清理日”(Flag Cleanup Day),集中处理积压的过期 flag。

数据驱动:GitHub 追踪以下 flag 健康指标:


十、技术债务:Flag 清理策略

10.1 过期 Flag 的危害

过期的 flag 不仅仅是”代码不整洁”——它们有实际的技术和业务风险:

  1. 认知负荷增加。开发者阅读代码时需要理解 flag 的含义和当前状态,每一个未清理的 flag 都在增加代码的理解成本。
  2. 测试复杂度上升。每个 flag 都会让测试矩阵翻倍(理论上)。未清理的 flag 持续推高测试成本。
  3. 配置事故风险。有人可能误触一个早已应该删除的 flag,导致系统行为异常。2019 年 Knight Capital 的 4.4 亿美元损失事故中,一个被遗忘的旧 flag 被错误激活是根因之一。
  4. 性能开销。虽然单个 flag 的求值开销微乎其微,但数千个 flag 的定义数据占用内存,同步这些数据占用带宽和 CPU。

10.2 过期 Flag 检测

检测过期 flag 的方法有几种:

方法一:基于元数据的静态检测

# 扫描超期未清理的 flag
import datetime

def find_stale_flags(flag_store, grace_period_days: int = 14) -> list[dict]:
    stale_flags = []
    now = datetime.datetime.now()

    for flag in flag_store.get_all_flags():
        if flag.type not in ("release", "experiment"):
            continue  # 运维和权限 flag 不适用自动过期

        if flag.expected_removal is None:
            stale_flags.append({
                "key": flag.key,
                "reason": "没有设置预期移除日期",
                "owner": flag.owner,
                "created": flag.created_at,
                "age_days": (now - flag.created_at).days
            })
            continue

        deadline = flag.expected_removal + datetime.timedelta(days=grace_period_days)
        if now > deadline:
            overdue_days = (now - flag.expected_removal).days
            stale_flags.append({
                "key": flag.key,
                "reason": f"超过预期移除日期 {overdue_days} 天",
                "owner": flag.owner,
                "expected_removal": flag.expected_removal,
                "age_days": (now - flag.created_at).days
            })

    return sorted(stale_flags, key=lambda x: x["age_days"], reverse=True)

方法二:基于运行时数据的动态检测

-- 查找过去 30 天内求值结果从未变化的 flag(始终返回同一个值)
SELECT
    flag_key,
    MIN(variation) AS min_variation,
    MAX(variation) AS max_variation,
    COUNT(DISTINCT variation) AS distinct_variations,
    COUNT(*) AS total_evaluations,
    MIN(timestamp) AS first_seen,
    MAX(timestamp) AS last_seen
FROM flag_evaluations
WHERE timestamp > NOW() - INTERVAL '30 days'
GROUP BY flag_key
HAVING COUNT(DISTINCT variation) = 1
ORDER BY total_evaluations DESC;

如果一个 flag 在过去 30 天里每次求值都返回同一个值(比如始终是 true),那它很可能已经全量上线了,应该清理代码中的 flag 判断逻辑。

方法三:代码引用扫描

#!/bin/bash
# 扫描代码中引用的 flag,与 flag 服务中的定义做交叉对比

# 从代码中提取所有 flag 引用
grep -rn 'isEnabled\|getVariant\|feature_enabled' src/ \
    | grep -oP '"[a-z][a-z0-9-]+"' \
    | sort -u > flags_in_code.txt

# 从 flag 服务获取所有已定义的 flag
curl -s http://flag-service/api/flags \
    | jq -r '.[].key' \
    | sort -u > flags_in_service.txt

# 找出代码中引用但服务中不存在的 flag(幽灵引用)
comm -23 flags_in_code.txt flags_in_service.txt > orphan_references.txt

# 找出服务中存在但代码中未引用的 flag(僵尸 flag)
comm -13 flags_in_code.txt flags_in_service.txt > zombie_flags.txt

echo "幽灵引用(代码引用了不存在的 flag):"
cat orphan_references.txt

echo "僵尸 Flag(服务中存在但代码未引用):"
cat zombie_flags.txt

10.3 自动化清理流程

成熟的 flag 清理流程应该尽量自动化:

flowchart TD
    A["定时扫描任务\n每天执行"] --> B{"Flag 类型?"}
    B -->|Release / Experiment| C{"超过预期\n移除日期?"}
    B -->|Ops / Permission| D{"超过审计\n周期(90天)?"}

    C -->|否| E["正常,不处理"]
    C -->|超期 7 天| F["发送 Slack 提醒\n给 Flag 所有者"]
    C -->|超期 14 天| G["自动创建\nJira Ticket"]
    C -->|超期 30 天| H["升级到\n团队负责人"]
    C -->|超期 60 天| I["自动生成\n清理 PR"]

    D -->|否| E
    D -->|是| J["发送审计提醒\n要求确认仍需保留"]

    I --> K["PR 包含:\n1. 删除 flag 判断代码\n2. 保留胜出分支\n3. 更新测试"]

10.4 自动生成清理代码

对于简单的布尔 flag,可以用工具自动生成清理 PR:

# 自动 flag 清理工具(简化版)
import ast
import sys

class FlagCleanupTransformer(ast.NodeTransformer):
    """
    将 flag 判断代码替换为确定的分支。
    例如:if flag.is_enabled("x"):
              do_new()
          else:
              do_old()
    当 flag "x" 的最终值为 True 时,替换为:
          do_new()
    """

    def __init__(self, flag_key: str, final_value: bool):
        self.flag_key = flag_key
        self.final_value = final_value
        self.changes = []

    def visit_If(self, node: ast.If) -> ast.AST:
        self.generic_visit(node)

        if self._is_flag_check(node.test):
            self.changes.append({
                "line": node.lineno,
                "action": "removed flag check",
                "flag": self.flag_key
            })

            if self.final_value:
                # 保留 if 分支的代码
                return node.body
            else:
                # 保留 else 分支的代码
                return node.orelse if node.orelse else []

        return node

    def _is_flag_check(self, node: ast.expr) -> bool:
        if isinstance(node, ast.Call):
            if isinstance(node.func, ast.Attribute):
                if node.func.attr in ("is_enabled", "isEnabled"):
                    if node.args and isinstance(node.args[0], ast.Constant):
                        return node.args[0].value == self.flag_key
        return False

实际的清理工具需要处理更多的边界情况:flag 在条件表达式中与其他条件组合、flag 结果赋值给变量后使用、flag 在多个文件中引用等。像 Piranha(Uber 开源的 flag 清理工具)就是专门解决这个问题的。

10.5 Uber Piranha:工业级 Flag 清理

Uber 在 2020 年开源了 Piranha,一个跨语言的 flag 清理工具。它支持 Java、Swift、Objective-C 等语言,能够:

  1. 识别代码中的 flag 判断模式(if、三元表达式、switch 等)。
  2. 根据 flag 的最终值,自动删除不需要的分支。
  3. 删除由此产生的死代码(未使用的导入、空方法等)。
  4. 生成清理 PR。

Uber 的工程博客报告说,Piranha 在一年内自动清理了超过 1800 个过期的 flag,节省了大量工程时间。

配置示例:

{
  "piranha": [
    {
      "methodName": "isEnabled",
      "flagType": "treated",
      "argumentIndex": 0
    },
    {
      "methodName": "getVariant",
      "flagType": "treated",
      "argumentIndex": 0
    }
  ],
  "flags_to_clean": [
    {
      "flag_name": "new-checkout-flow",
      "treated_value": true
    },
    {
      "flag_name": "old-search-algorithm",
      "treated_value": false
    }
  ]
}

十一、工程案例:Netflix 的特性开关实践

11.1 规模与挑战

Netflix 是特性开关的重度用户。根据其工程团队在多次技术会议上的分享,Netflix 的特性开关系统具有以下特征:

11.2 架构要点

Netflix 的特性开关系统基于其自研的配置管理平台 Archaius,核心设计要点如下:

  1. 分层配置。Flag 定义分为多个层:全局默认值 -> 环境级覆盖 -> 集群级覆盖 -> 实例级覆盖。查询时按层级从下到上合并。

  2. 动态属性。Flag 的值不仅仅是布尔值——Netflix 广泛使用动态属性(Dynamic Property),包括数值、字符串、JSON 对象等。例如,推荐算法的超参数就是通过 flag 系统动态调整的。

  3. 与实验平台深度集成。Netflix 的 A/B 测试平台每天运行上百个并行实验,所有实验的用户分组都通过特性开关系统完成。实验结束后,系统会自动通知 flag 所有者进行清理。

  4. 故障隔离。Flag 系统本身的故障不能影响业务服务。如果 flag 服务不可用,SDK 使用本地缓存的最后一个已知良好状态。如果缓存也不可用,使用代码中硬编码的默认值。

11.3 经验教训

Netflix 在实践中总结的关键经验:

  1. Flag 不是免费的。每个 flag 都有成本——代码复杂度、测试矩阵、认知负荷。团队需要把 flag 的创建和清理看作同等重要的工程活动。

  2. 所有权必须明确。每个 flag 必须有一个明确的所有者(个人而非团队)。当所有者离职时,flag 的所有权必须显式转移。

  3. 默认值要安全。Flag 的默认值(当求值失败时使用的值)必须是”安全”的——通常意味着回退到旧行为。永远不要把默认值设为新功能开启状态。

  4. 监控 flag 的健康。Netflix 监控以下 flag 级别的指标:

    • 求值延迟(P50/P99)
    • 求值错误率
    • 变体分布是否符合预期
    • Flag 的年龄和活跃度

十二、Flag 系统的运维考量

12.1 高可用设计

Flag 系统是所有服务的依赖——它的不可用会影响所有使用 flag 的服务。高可用设计需要考虑以下层次:

可用性保障层次:

1. SDK 本地缓存(最后一道防线)
   - 即使 flag 服务完全不可用,SDK 使用内存中的最后已知值
   - 对于冷启动场景,SDK 可以从本地文件加载上一次的快照

2. 边缘代理 / Relay Proxy
   - 每个数据中心部署独立的代理
   - 代理有自己的持久化缓存(Redis / 本地磁盘)
   - 中心服务不可用时,代理继续提供服务

3. 中心 Flag 服务多区域部署
   - 跨可用区 / 跨区域部署
   - 数据库使用多副本
   - SSE 推送服务水平扩展

12.2 缓存一致性与最终一致

Flag 系统是一个典型的最终一致性(Eventual Consistency)系统。当你在管理后台修改一个 flag 时,变更需要经过以下路径才能到达所有应用实例:

管理后台 -> 数据库 -> 推送服务 -> 边缘代理 -> SDK 缓存 -> 应用代码

端到端延迟:
  SSE 模式:100ms - 1s
  轮询模式:取决于轮询间隔(通常 10-60s)

这个延迟在大多数场景下是可以接受的。但对于运维开关这种需要紧急生效的场景,SSE 模式的秒级延迟是必要的。

12.3 安全考量

Flag 系统需要特别注意安全:

  1. SDK 密钥管理。服务端 SDK 密钥必须作为密钥管理,不能硬编码在代码中。客户端 SDK 的 ID 可以暴露(它只能读取该环境的 flag 求值结果)。
  2. 权限控制。生产环境的 flag 修改应该需要额外的审批流程。可以按环境(开发/预发/生产)、按 flag 类型、按风险级别设置不同的权限策略。
  3. 敏感 flag 保护。运维开关(如服务降级开关)应该设置为”受保护”状态,修改时需要二次确认或多人审批。

十三、Flag 与 CI/CD 管道集成

13.1 部署与发布解耦

特性开关的核心价值之一是将”部署”和”发布”解耦。在 CI/CD 管道中,这意味着:

# CI/CD 管道示例(GitHub Actions)
name: Deploy and Release
on:
  push:
    branches: [main]

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - name: Build
        run: make build

      - name: Run tests (all flags on)
        run: make test FLAG_ENV=all_on

      - name: Run tests (all flags off)
        run: make test FLAG_ENV=all_off

      - name: Deploy to production
        run: make deploy
        # 新功能已部署,但 flag 关闭,用户看不到

  release:
    needs: deploy
    runs-on: ubuntu-latest
    steps:
      - name: Enable flag for internal users
        run: |
          curl -X PATCH \
            -H "Authorization: Bearer ${{ secrets.FLAG_API_TOKEN }}" \
            -H "Content-Type: application/json" \
            -d '{"targets": [{"variation": 0, "values": ["internal-segment"]}]}' \
            "https://flag-service.internal/api/flags/new-feature/targeting"

      - name: Smoke test with internal users
        run: make smoke-test USER_SEGMENT=internal

      - name: Gradual rollout to 10%
        run: |
          curl -X PATCH \
            -H "Authorization: Bearer ${{ secrets.FLAG_API_TOKEN }}" \
            -H "Content-Type: application/json" \
            -d '{"fallthrough": {"rollout": {"variations": [{"variation": 0, "weight": 10000}, {"variation": 1, "weight": 90000}]}}}' \
            "https://flag-service.internal/api/flags/new-feature/targeting"

13.2 Flag 状态作为部署前置条件

可以在部署管道中检查 flag 状态,确保关键前置条件满足:

# 部署前置条件检查
def pre_deploy_flag_check(flag_client, environment: str) -> bool:
    checks = [
        # 确保降级开关处于正常状态
        ("ops.recommendation.circuit-breaker", True, "推荐服务降级开关应该开启"),
        # 确保上一个版本的发布 flag 已经全量
        ("release.checkout.v2", True, "Checkout V2 应该已经全量上线"),
    ]

    all_passed = True
    for flag_key, expected_value, message in checks:
        actual = flag_client.is_enabled(flag_key)
        if actual != expected_value:
            print(f"FAIL: {message} (expected={expected_value}, actual={actual})")
            all_passed = False
        else:
            print(f"PASS: {flag_key} = {actual}")

    return all_passed

十四、OpenFeature 标准化

14.1 标准化的必要性

特性开关领域长期缺乏统一标准。每个供应商的 SDK 有不同的 API、不同的概念模型、不同的配置方式。如果团队从 LaunchDarkly 迁移到 Unleash,所有调用 flag 的代码都需要重写。

OpenFeature 是 CNCF(云原生计算基金会)的一个沙箱项目,目标是为特性开关提供统一的 API 标准:

// OpenFeature 统一 API(Java)
import dev.openfeature.sdk.OpenFeatureAPI;
import dev.openfeature.sdk.Client;
import dev.openfeature.sdk.EvaluationContext;

// 注册 Provider(可以是 LaunchDarkly、Unleash、或自建系统)
OpenFeatureAPI api = OpenFeatureAPI.getInstance();
api.setProvider(new LaunchDarklyProvider("sdk-key"));
// 或者:api.setProvider(new UnleashProvider(unleashConfig));
// 或者:api.setProvider(new CustomProvider(myFlagService));

// 获取客户端
Client client = api.getClient();

// 使用统一 API 求值——不依赖任何特定供应商
EvaluationContext ctx = new ImmutableContext("user-123",
    Map.of("plan", new Value("enterprise"),
           "country", new Value("CN")));

boolean showNewFeature = client.getBooleanValue(
    "new-checkout-flow",
    false,  // 默认值
    ctx
);

String buttonColor = client.getStringValue(
    "checkout-button-color",
    "blue",
    ctx
);

14.2 OpenFeature 的核心概念

OpenFeature 架构:

+-------------------+
|   应用代码        |
|   使用统一 API    |
+--------+----------+
         |
+--------v----------+
|   OpenFeature SDK  |
|   (标准化接口层)  |
+--------+----------+
         |
+--------v----------+
|   Provider         |
|   (供应商适配器)  |
+--------+----------+
         |
+--------v----------+
|   Flag 后端       |
|   (LD/Unleash/...)  |
+-------------------+

OpenFeature 定义了以下核心概念:

// OpenFeature Hook 示例(TypeScript)
import { Hook, HookContext, EvaluationDetails } from "@openfeature/server-sdk";

const loggingHook: Hook = {
    before(hookContext: HookContext): void {
        console.log(`Evaluating flag: ${hookContext.flagKey}`);
    },

    after(
        hookContext: HookContext,
        evaluationDetails: EvaluationDetails<any>
    ): void {
        console.log(
            `Flag ${hookContext.flagKey} evaluated to ${evaluationDetails.value} ` +
            `(reason: ${evaluationDetails.reason})`
        );
    },

    error(hookContext: HookContext, error: Error): void {
        console.error(
            `Flag ${hookContext.flagKey} evaluation failed: ${error.message}`
        );
    }
};

// 注册全局 Hook
OpenFeature.addHooks(loggingHook);

OpenFeature 的价值不仅在于避免供应商锁定——它还建立了一个生态系统,让 flag 相关的工具(审计、测试、监控)可以围绕统一接口构建,而不需要为每个供应商单独适配。


十五、反模式与常见陷阱

15.1 反模式一:Flag 嵌套

// 反模式:flag 嵌套
if (flags.isEnabled("feature-a")) {
    if (flags.isEnabled("feature-b")) {
        if (flags.isEnabled("feature-c")) {
            // 这三个 flag 的组合有 8 种状态
            // 你确定都测试过了?
            doSomethingComplicated();
        }
    }
}

解决方案:如果多个 flag 之间有依赖关系,使用前置条件(Prerequisites)机制在 flag 定义层面声明依赖,而不是在代码中嵌套。

15.2 反模式二:Flag 作为配置

# 反模式:用 flag 传递配置值
timeout = flag_client.get_integer("service-timeout-ms", 5000)
max_retries = flag_client.get_integer("max-retries", 3)
batch_size = flag_client.get_integer("batch-size", 100)

这些不是 flag,是配置。Flag 系统不应该变成通用的配置管理系统。配置值应该放在专门的配置管理工具(如 Consul、etcd、Spring Cloud Config)中。Flag 系统的设计重点是用户分组和渐进发布,不是键值存储。

15.3 反模式三:永生的发布 Flag

// 这个 flag 三年前就应该删除了
if flags.IsEnabled("new-user-onboarding-v2") {
    // "v2" 已经是唯一的版本了
    // 没有人知道 "v1" 长什么样
    // 没有人敢删除这个判断
    showOnboarding()
}

解决方案:在 flag 创建时就设置过期日期,并用自动化工具在过期后强制清理。

15.4 反模式四:在热路径中做远程求值

// 反模式:每次求值都调用远程服务
async function handleRequest(req) {
    // 每个请求都发一次 HTTP 调用,延迟 10-50ms
    const enabled = await fetch("https://flag-service/api/evaluate", {
        method: "POST",
        body: JSON.stringify({ key: "my-flag", context: req.user })
    }).then(r => r.json());

    if (enabled) {
        return newHandler(req);
    }
    return legacyHandler(req);
}

解决方案:使用带本地缓存的 SDK。求值应该在本地完成(微秒级),而不是通过网络调用(毫秒级)。


十六、总结

特性开关是现代软件交付的核心基础设施之一。它的价值不在于 if-else 本身——任何工程师都能写一个条件判断。它的价值在于围绕这个简单概念构建的治理体系

  1. 分类管理。不同类型的 flag 有不同的生命周期预期和管理策略。把发布开关和权限开关混在一起管理,是混乱的开始。

  2. 本地求值。flag 的求值必须在应用进程本地完成,通过 SSE 或轮询保持本地缓存与中心服务同步。这是性能和可用性的基本保障。

  3. 渐进发布。特性开关将”部署”和”发布”解耦,让团队可以安全地、渐进地、可回退地发布新功能。

  4. 实验驱动。特性开关是 A/B 测试的基础设施。分层实验模型让多个实验可以并行运行而不互相干扰。

  5. 技术债务控制。flag 的创建和清理必须被视为同等重要的工程活动。自动化的过期检测和清理工具是必需品,不是锦上添花。

  6. 标准化。OpenFeature 正在建立行业标准,减少供应商锁定,让团队可以在不同的 flag 后端之间平滑迁移。

回到开头的问题:如何管理数千个 flag 的生命周期?答案不是更好的工具——虽然工具很重要——而是更好的纪律。每个 flag 都有明确的所有者、明确的类型、明确的预期寿命。创建 flag 的那一刻,就应该规划好它的终结。

上一篇:部署架构

下一篇:事故响应


参考资料

书籍

论文与技术报告

官方文档

工程博客

工具与框架

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2026-04-13 · architecture

【系统架构设计百科】架构质量属性:不只是"高可用高性能"

需求评审时写下的'高可用、高性能、高并发',到了架构设计阶段几乎无法落地——因为它们不是可执行的需求。本文从 SEI/CMU 的质量属性理论出发,用 stimulus-response 场景模型把模糊需求变成可量化、可验证的架构约束,并拆解属性之间的冲突与联动关系。

2026-04-13 · architecture

【系统架构设计百科】告警策略:如何避免"狼来了"

大多数团队的告警系统都在制造噪声而不是传递信号。阈值告警看似直观,实则产生大量误报和漏报,值班工程师在凌晨三点被叫醒,却发现只是一次无害的毛刺。本文从告警疲劳的工业数据出发,拆解基于 SLO 的多窗口燃烧率告警算法,深入 Alertmanager 的路由、抑制与分组机制,结合 PagerDuty 的告警疲劳研究和真实工程案例,给出一套可落地的告警策略设计方法。

2026-04-13 · architecture

【系统架构设计百科】复杂性管理:架构的核心战场

系统复杂性是架构腐化的根源——本文从 Brooks 的本质复杂性与偶然复杂性划分出发,结合认知负荷理论与 Parnas 的信息隐藏原则,系统阐述复杂性的来源、度量与控制手段,并给出可操作的架构策略


By .