土法炼钢兴趣小组的算法知识备份

【系统架构设计百科】流处理架构:从批处理到实时的范式迁移

文章导航

分类入口
architecture
标签入口
#stream-processing#Flink#Kafka-Streams#exactly-once#windowing#event-time

目录

一个电商平台的风控系统需要在用户下单后 200 毫秒内判断这笔交易是否存在欺诈风险。风控规则依赖最近 5 分钟内该用户的下单频率、IP 地址变化、设备指纹匹配度。数据来自三个不同的 Kafka topic,需要做多流关联。

如果用批处理来做,最快也是分钟级延迟——攒一批数据、启动 Spark 作业、读取输入、计算、写出结果。等批处理跑完,欺诈交易早已完成。如果用流处理来做,每条交易事件到达的瞬间就触发计算,200 毫秒内返回结果。

但流处理不只是”把批处理的延迟降低”这么简单。批处理面对的是有界数据集(Bounded Dataset),输入有明确的开始和结束;流处理面对的是无界数据流(Unbounded Stream),数据永远不会”到齐”。这个根本差异导致了一系列工程难题:什么时候输出结果?怎么处理迟到数据?状态存在哪里?故障恢复时怎么保证不丢不重?

本文要回答的核心问题是:流处理的精确一次语义(Exactly-Once Semantics)在工程上到底有多难?窗口计算的语义陷阱是什么?

版本说明 本文涉及的 Flink 特性基于 Apache Flink 1.17/1.18,Kafka 特性基于 Apache Kafka 3.x,Kafka Streams 基于 3.5+。

上一篇 中我们讨论了数据湖与数据仓库的架构设计,流处理是实时数据管道的核心计算引擎。


一、批处理与流处理的本质区别

很多文章把批处理和流处理的区别简单归结为”延迟高低”。延迟确实是最直观的差异,但不是本质差异。本质差异在于计算模型对数据边界的假设不同

批处理:有界数据上的完整计算

批处理的核心假设是:输入数据是有界的,计算开始前所有数据已经就绪

MapReduce 是批处理的经典范式。一个 WordCount 作业的执行流程是:

  1. 输入:HDFS 上的一组文件,文件数量和大小在作业启动前已知
  2. Map 阶段:逐行读取,输出 (word, 1) 键值对
  3. Shuffle 阶段:按 key 分区、排序、传输到 Reducer
  4. Reduce 阶段:对同一个 key 的所有 value 做聚合
  5. 输出:写入 HDFS

整个过程有明确的开始和结束。当 Reduce 阶段完成时,结果是精确的——因为所有输入数据都已经被处理过了。

Spark 用 RDD(Resilient Distributed Dataset)替代了 MapReduce 的两阶段模型,支持更丰富的算子组合和内存计算,但核心假设没变:输入是有界的,计算是一次性的

// Spark 批处理示例:有界数据上的 WordCount
JavaRDD<String> lines = spark.read().textFile("hdfs:///input/logs/*.txt").javaRDD();
JavaPairRDD<String, Integer> counts = lines
    .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
    .mapToPair(word -> new Tuple2<>(word, 1))
    .reduceByKey(Integer::sum);
counts.saveAsTextFile("hdfs:///output/word-counts");
// 作业结束,结果完整且精确

流处理:无界数据上的持续计算

流处理的核心假设完全相反:输入数据是无界的,数据持续到达,计算永远不会”完成”

一个实时 WordCount 没有”最终结果”——每来一条数据,当前的计数就可能变化。这引出了批处理不需要面对的三个根本问题:

问题一:什么时候输出结果?

批处理在所有数据处理完毕后输出一次结果。流处理不能等,因为数据永远不会到齐。必须定义一个触发条件(Trigger),比如”每 10 秒输出一次当前结果”或”每收到 100 条数据输出一次”。

问题二:怎么定义”时间”?

批处理的时间语义很简单——文件的修改时间,或者日志里的时间戳。流处理必须区分两种时间:

这两个时间可能相差几秒、几分钟甚至几小时(比如移动端离线后重新上线批量上报数据)。选择哪种时间作为计算基准,直接影响结果的正确性。

问题三:故障恢复时怎么保证结果正确?

批处理的故障恢复相对简单:重新执行失败的 task 即可,输入数据在 HDFS 上不会丢失,输出是幂等的覆盖写入。流处理的状态是持续累积的——一个实时计数器已经处理了 100 万条数据,如果此时节点宕机,恢复后必须精确地从断点继续,既不能丢数据(导致少计),也不能重复处理(导致多计)。

一个对比表格

维度 批处理 流处理
数据边界 有界(Bounded) 无界(Unbounded)
计算模式 一次性执行,有明确结束 持续执行,无结束
延迟 分钟到小时级 毫秒到秒级
结果语义 精确(所有数据已处理) 近似或最终精确(取决于窗口和水印)
时间语义 通常不需要区分 必须区分事件时间和处理时间
状态管理 无状态或临时状态 必须管理长期运行的增量状态
故障恢复 重新执行 task checkpoint + 精确一次语义
典型框架 MapReduce、Spark Batch Flink、Kafka Streams、Spark Structured Streaming

Tyler Akidau 在 Google 的 Dataflow 论文和《Streaming Systems》一书中提出了一个关键洞见:批处理是流处理的特例。如果流处理引擎能够正确处理无界数据,那么处理有界数据只是一个退化场景——设定数据的边界,等所有数据到达后触发最终计算即可。Flink 正是基于这个理念设计的:批处理模式是流处理模式的一个特殊优化路径。


二、精确一次语义:工程上到底有多难

“精确一次”(Exactly-Once)可能是分布式系统中最容易被误解的术语。很多人以为它意味着”每条消息只被处理一次”。实际上,在分布式环境中,由于网络分区、节点故障、重启等原因,消息的重复投递几乎不可避免。精确一次语义真正的含义是:即使消息被重复投递,系统的最终状态和输出效果与每条消息只被处理一次完全相同

三种投递语义

在讨论精确一次之前,先厘清三种投递语义(Delivery Semantics):

至多一次(At-Most-Once):消息可能丢失,但不会重复。实现最简单——发送后不管确认,不做重试。适合对丢失不敏感的场景,比如日志采集中的少量丢失通常可以接受。

至少一次(At-Least-Once):消息不会丢失,但可能重复。发送方在未收到确认时重试发送。大多数消息系统默认提供这个级别。问题是重复消息会导致计算结果偏差——比如一笔订单被重复计入销售额。

精确一次(Exactly-Once):消息既不丢失也不重复,或者更准确地说,重复消息的效果被消除。这是最难实现的级别。

为什么精确一次这么难

考虑一个简单的场景:流处理引擎从 Kafka 读取交易数据,计算实时销售总额,结果写入数据库。

Kafka Topic --> Flink 算子(累加销售额) --> MySQL

实现精确一次需要同时解决三个层面的问题:

第一层:Source 端的精确一次

Flink 从 Kafka 消费数据时,需要记录当前消费到的偏移量(offset)。如果 Flink 处理完一条消息后还没来得及提交 offset 就宕机了,重启后会从上次提交的 offset 重新消费,导致同一条消息被处理两次。

解决思路:不单独提交 Kafka offset,而是把 offset 和算子状态一起纳入 checkpoint。恢复时,从 checkpoint 中恢复 offset 和状态,保证两者一致。

第二层:内部处理的精确一次

Flink 的算子可能有多个并行实例,数据在算子之间通过网络传输。如果一个算子处理完数据并更新了本地状态,但发送给下游算子的消息在网络传输中丢失了,怎么办?如果重发,下游算子会重复处理。

解决思路:Flink 使用 checkpoint 机制,把所有算子的状态在逻辑上对齐到同一个时间点。恢复时,所有算子都回退到同一个 checkpoint,从那个一致的状态点重新处理。

第三层:Sink 端的精确一次

即使 Flink 内部实现了精确一次,如果写入外部系统(MySQL、Elasticsearch、Kafka)时发生了重复写入,最终结果还是不精确的。比如 Flink 已经把销售额 1000 写入了 MySQL,但在确认写入成功之前宕机了。恢复后,Flink 从 checkpoint 重新计算,又得到 1000,再次写入 MySQL。如果 MySQL 端没有去重机制,两次写入就导致了错误结果。

解决思路有两种:

  1. 幂等写入(Idempotent Write):写入操作天然幂等,重复执行不影响结果。比如用主键做 UPSERT,或者写入 KV 存储时用确定性的 key
  2. 两阶段提交(Two-Phase Commit):Flink 通过 TwoPhaseCommitSinkFunction 实现。第一阶段(pre-commit)写入但不提交事务,第二阶段(commit)在 checkpoint 完成后提交事务。如果 checkpoint 失败,回滚未提交的事务
// Flink 精确一次写入 Kafka 的配置
KafkaSink<String> sink = KafkaSink.<String>builder()
    .setBootstrapServers("broker1:9092,broker2:9092")
    .setRecordSerializer(
        KafkaRecordSerializationSchema.builder()
            .setTopic("output-topic")
            .setValueSerializationSchema(new SimpleStringSchema())
            .build()
    )
    // 关键配置:启用精确一次语义
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    // 事务前缀,用于 Kafka 事务 ID
    .setTransactionalIdPrefix("flink-tx-")
    .build();

// Kafka 消费端也需要配置
// isolation.level = read_committed
// 否则消费端会读到未提交的事务数据

精确一次的性能代价

精确一次不是免费的。启用 Flink 的精确一次 Kafka Sink 后,以下性能影响是可预期的:

  1. 吞吐量下降:两阶段提交引入额外的协调开销。根据 Confluent 的基准测试,启用事务后 Kafka Producer 的吞吐量会下降 10%-30%,具体取决于消息大小和批量配置
  2. 延迟增加:事务数据在 commit 之前对下游消费者不可见(如果消费者配置了 read_committed),延迟至少增加一个 checkpoint 间隔(通常 1-10 分钟)
  3. 资源占用增加:每个 Sink 并行实例需要维护独立的 Kafka 事务,增加 Kafka Broker 端的事务协调负担

我认为在大多数业务场景中,at-least-once 加上下游幂等去重是比 exactly-once 更务实的选择。端到端的精确一次语义对整条链路上每个组件都有要求,任何一个环节掉链子都会破坏整体保证。而幂等写入只需要在 Sink 端做好去重逻辑,架构更简单,故障排查也更容易。


Flink 的 checkpoint 是实现精确一次语义的核心机制。它的理论基础是 Chandy-Lamport 全局快照算法(1985),但 Flink 对这个算法做了大量工程化改造,使其适用于大规模流处理场景。

Chandy-Lamport 算法的核心思想

Chandy-Lamport 算法要解决的问题是:在一个分布式系统中,如何在不暂停系统运行的情况下,获取一个全局一致的快照?

算法的核心机制是标记消息(Marker)

  1. 某个进程发起快照,记录自己的本地状态,然后向所有出边发送 Marker
  2. 其他进程收到 Marker 后,记录自己的本地状态,并向自己的所有出边转发 Marker
  3. 在收到 Marker 之前,从该入边收到的所有消息都记录为”通道状态”(即传输中的消息)
  4. 当所有进程都完成了本地状态记录和通道状态记录,全局快照完成

关键性质:这个快照是一致的(Consistent Cut)——如果快照中包含了一个事件的效果,那么导致这个事件的所有因果前驱事件的效果也一定包含在快照中。

Flink 把 Chandy-Lamport 的 Marker 具体化为 Checkpoint Barrier。Barrier 是一种特殊的控制消息,由 JobManager 注入到数据流的源头,随数据流向下游传播。

以下是 Flink checkpoint 的执行流程:

sequenceDiagram
    participant JM as JobManager
    participant S1 as Source 1
    participant S2 as Source 2
    participant OP as Operator(keyBy + sum)
    participant SK as Sink
    participant ST as State Backend(RocksDB)

    JM->>S1: 触发 Checkpoint N
    JM->>S2: 触发 Checkpoint N
    S1->>S1: 记录 Kafka offset
    S2->>S2: 记录 Kafka offset
    S1->>OP: 发送 Barrier N(混在数据流中)
    S2->>OP: 发送 Barrier N(混在数据流中)
    Note over OP: 收到 S1 的 Barrier N<br/>但还没收到 S2 的 Barrier N<br/>开始 Barrier 对齐
    OP->>OP: 缓存 S1 后续数据<br/>继续处理 S2 数据
    Note over OP: 收到 S2 的 Barrier N<br/>对齐完成
    OP->>ST: 异步快照算子状态
    OP->>SK: 转发 Barrier N
    SK->>SK: pre-commit 外部事务
    SK->>JM: 确认 Checkpoint N 完成
    OP->>JM: 确认 Checkpoint N 完成
    S1->>JM: 确认 Checkpoint N 完成
    S2->>JM: 确认 Checkpoint N 完成
    JM->>JM: 所有确认收齐<br/>Checkpoint N 成功
    JM->>SK: 通知 commit 事务

Barrier 对齐与非对齐 Checkpoint

上面流程中有一个关键步骤:Barrier 对齐(Barrier Alignment)。当一个算子有多个输入通道时,它可能先收到某个通道的 Barrier,而其他通道的 Barrier 还在路上。在对齐模式下,算子会:

  1. 收到第一个 Barrier 后,暂停处理该通道的后续数据(缓存到内存或磁盘)
  2. 继续正常处理其他通道的数据
  3. 等所有通道的 Barrier 都到齐后,执行状态快照
  4. 快照完成后,处理之前缓存的数据,恢复正常处理

对齐的目的是保证快照的一致性:快照中包含的状态,精确对应 Barrier 之前所有数据的处理结果,不包含 Barrier 之后任何数据的影响。

但对齐有一个严重的工程问题:反压放大(Backpressure Amplification)

假设算子有两个输入通道 A 和 B,通道 A 的 Barrier 很快到达了,但通道 B 因为上游处理慢或网络延迟,Barrier 迟迟不来。在等待期间,通道 A 的数据被持续缓存,不能处理。如果等待时间很长,缓存会消耗大量内存,甚至引发 OOM(Out of Memory)。更糟的是,通道 A 的反压会传导到 A 的上游,导致整个流水线变慢。

Flink 1.11 引入了非对齐 Checkpoint(Unaligned Checkpoint)来解决这个问题:

// 启用非对齐 Checkpoint
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // 60 秒间隔
env.getCheckpointConfig().enableUnalignedCheckpoints();
// 非对齐 Checkpoint 在反压严重时特别有用
// 但会增加 checkpoint 的大小(因为需要保存传输中的数据)

非对齐 Checkpoint 的做法是:算子收到任意一个通道的 Barrier 后,立即执行快照,同时把其他通道中尚未处理的数据(即 Barrier 之前的数据)也作为快照的一部分保存。恢复时,先恢复状态,再重放这些缓存的数据。

维度 对齐 Checkpoint 非对齐 Checkpoint
反压敏感度 高:反压导致对齐时间增长 低:不需要等待对齐
Checkpoint 大小 只包含算子状态 包含算子状态 + 传输中的数据
Checkpoint 耗时 反压时可能很长 相对稳定
恢复时间 较快(直接恢复状态) 较慢(需要重放缓存数据)
适用场景 正常负载、低反压 高反压、对 checkpoint 稳定性要求高

State Backend:状态存在哪里

Checkpoint 本质上是对算子状态的快照。算子状态存在哪里,直接决定了 checkpoint 的性能特征。Flink 提供两种主要的 State Backend:

HashMapStateBackend(原 MemoryStateBackend / FsStateBackend):状态存储在 JVM 堆内存中,checkpoint 时序列化写入持久化存储(通常是 HDFS 或 S3)。优点是读写速度快(内存操作),缺点是状态大小受 JVM 堆内存限制,大状态场景容易 OOM。

EmbeddedRocksDBStateBackend(原 RocksDBStateBackend):状态存储在 RocksDB 中(一个嵌入式 KV 存储引擎),RocksDB 的数据文件存放在本地磁盘。checkpoint 时通过增量快照(Incremental Checkpoint)只上传变化的 SST 文件。优点是状态大小不受内存限制(可以到 TB 级),缺点是每次状态访问都涉及序列化/反序列化和可能的磁盘 I/O。

// 配置 RocksDB State Backend 并启用增量 Checkpoint
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // true = 增量 checkpoint
env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");

// RocksDB 调优参数
Configuration config = new Configuration();
// 每个算子实例的 RocksDB 内存预算
config.set(RocksDBConfigurableOptions.FIX_PER_TKM_MEMORY_SIZE, MemorySize.ofMebiBytes(256));
// Block cache 大小,影响读性能
config.set(RocksDBConfigurableOptions.BLOCK_CACHE_SIZE, MemorySize.ofMebiBytes(128));

实际工程中,状态大小是选择 State Backend 的核心考量。如果状态在几十 GB 以内且对延迟敏感,HashMapStateBackend 是更好的选择;如果状态可能增长到几百 GB 甚至 TB 级别,RocksDB 是唯一的选择。


四、事件时间与处理时间的工程影响

时间语义的选择是流处理中最容易被低估的决策。选错了时间语义,计算结果可能系统性地偏离预期,而且这种偏差在测试环境中很难暴露——因为测试环境通常不存在网络延迟、消费者积压等生产环境的常见状况。

两种时间的定义

事件时间(Event Time):事件在源头实际发生的时间。比如用户在 14:00:03 点击了一个按钮,这个时间戳被写入点击事件的数据中。无论这条数据经过多少中间系统、延迟多久到达流处理引擎,它的事件时间永远是 14:00:03。

处理时间(Processing Time):流处理引擎实际处理这条数据的系统时钟时间。如果这条数据在 14:00:08 到达 Flink 算子,处理时间就是 14:00:08。

// Flink 中提取事件时间的典型做法
DataStream<ClickEvent> clicks = env
    .addSource(new FlinkKafkaConsumer<>("clicks", new ClickEventDeserializer(), kafkaProps))
    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<ClickEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((event, timestamp) -> event.getClickTime())
    );

一个具体的例子:为什么处理时间不靠谱

假设我们要统计每分钟的网站点击量。下面两个场景说明了处理时间的问题:

场景一:消费者积压

某天凌晨 3:00,Flink 作业因为一个 bug 宕机了,凌晨 3:30 修复后重启。在这 30 分钟内,Kafka 中积压了 30 分钟的数据。重启后,Flink 快速消费积压数据,原本属于 3:00-3:30 的数据在几秒内被全部处理。

如果用处理时间做窗口统计,3:30 这一分钟的点击量会出现一个巨大的峰值(包含了 30 分钟积压的数据),而 3:00-3:30 的点击量为零。这完全不反映真实的用户行为。

如果用事件时间做窗口统计,数据会被正确地分配到它们实际发生的那一分钟,即使这些数据是在 3:30 才被处理的。

场景二:跨时区数据源

数据来自全球多个数据中心。美国数据中心的数据到达北京的 Flink 集群需要 200 毫秒网络延迟,而北京本地数据中心的延迟只有 2 毫秒。如果用处理时间做窗口统计,美国用户的点击会系统性地被分配到比实际发生时间晚 200 毫秒的窗口中。大多数时候这 200 毫秒跨不了窗口边界,但在窗口边界附近(比如整分钟前后 200 毫秒),美国用户的点击会被错误地归入下一个窗口。

事件时间的代价

事件时间虽然语义上更正确,但引入了额外的工程复杂性:

  1. 数据必须携带时间戳:需要在数据生产端嵌入准确的时间戳,并且各数据源的时钟需要合理同步(通常通过 NTP)
  2. 需要处理乱序数据:网络延迟、多分区消费等原因导致数据到达顺序和事件时间顺序不一致
  3. 需要水印(Watermark)机制:告诉系统”某个时间点之前的数据应该都到齐了”,从而触发窗口计算
  4. 需要处理迟到数据(Late Data):水印之后仍然可能到达的数据需要有明确的处理策略

这些代价不小,但在绝大多数需要时间窗口统计的场景中,事件时间是唯一正确的选择。我见过太多团队一开始为了省事用处理时间,后来在生产环境中被积压回放、跨区延迟等问题反复教训,最终还是切换到了事件时间。


五、水印机制与迟到数据处理

水印(Watermark)是事件时间模型中最精巧也最容易出错的机制。它的本质是一个启发式的进度指标:当系统生成一个值为 T 的水印时,它声明的是”时间戳小于 T 的数据应该都已经到达了”。

注意”应该”二字。水印是一个估计,不是保证。如果估计得太保守(水印推进太慢),结果输出的延迟会增加;如果估计得太激进(水印推进太快),部分迟到数据会被遗漏。

水印的生成策略

Flink 提供了两种内置的水印生成策略:

单调递增水印(Monotonously Increasing):假设数据严格按事件时间顺序到达,水印直接取最新数据的时间戳。适用于单分区、单数据源、严格有序的场景。

WatermarkStrategy
    .<Event>forMonotonousTimestamps()
    .withTimestampAssigner((event, ts) -> event.getTimestamp());

有界乱序水印(Bounded Out-of-Orderness):假设数据可能乱序到达,但乱序程度有一个上界。水印 = 当前最大事件时间 - 允许的最大乱序时间。

WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
    .withTimestampAssigner((event, ts) -> event.getTimestamp());
// 如果当前看到的最大事件时间是 14:00:30
// 水印 = 14:00:30 - 10s = 14:00:20
// 含义:14:00:20 之前的数据应该都到齐了

在实际工程中,forBoundedOutOfOrderness 是最常用的策略,但参数的选择需要基于对数据特性的深入理解。这个参数设多大合适?没有标准答案,取决于数据源的延迟分布。一种做法是先采样一段时间的数据,统计事件时间和到达时间之差的分布,然后选取 99 百分位(P99)作为最大乱序时间。

多数据源的水印推进

当数据流有多个源时,水印推进变得更加复杂。Flink 的水印推进规则是:取所有源水印的最小值作为全局水印

这意味着如果某个源暂时没有数据产出(比如一个低流量的 Kafka partition),它的水印不会推进,进而拖住全局水印。全局水印被拖住后,下游所有依赖事件时间的窗口计算都无法触发。

Flink 提供了空闲源检测机制来应对这个问题:

WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
    .withTimestampAssigner((event, ts) -> event.getTimestamp())
    // 如果某个源超过 30 秒没有数据,标记为空闲
    // 空闲源的水印不参与全局水印计算
    .withIdleness(Duration.ofSeconds(30));

迟到数据的处理策略

水印推过某个窗口的结束时间后,该窗口会被触发计算并输出结果。但之后仍然可能到达属于这个窗口的数据——这就是迟到数据(Late Data)。

Flink 提供三种处理策略:

策略一:直接丢弃(默认行为)

窗口触发后,迟到数据被直接丢弃。简单粗暴,适用于对少量数据丢失不敏感的场景。

策略二:允许迟到(Allowed Lateness)

窗口触发后,保留窗口状态一段时间。在允许迟到时间内到达的数据会触发窗口的重新计算和结果更新。

DataStream<AggResult> result = clicks
    .keyBy(ClickEvent::getUserId)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    // 允许 5 分钟的迟到
    .allowedLateness(Time.minutes(5))
    .aggregate(new ClickCountAggregator());
// 窗口首次触发后,5 分钟内到达的迟到数据
// 会触发窗口重新计算并输出更新后的结果
// 5 分钟后窗口状态被清除,之后的迟到数据被丢弃

策略三:旁路输出(Side Output)

把迟到数据发送到一个单独的数据流中,由业务逻辑自行处理——比如写入一个”迟到数据修正”的离线流程。

OutputTag<ClickEvent> lateDataTag = new OutputTag<ClickEvent>("late-data") {};

SingleOutputStreamOperator<AggResult> result = clicks
    .keyBy(ClickEvent::getUserId)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .allowedLateness(Time.minutes(5))
    .sideOutputLateData(lateDataTag)
    .aggregate(new ClickCountAggregator());

// 获取迟到数据流,单独处理
DataStream<ClickEvent> lateData = result.getSideOutput(lateDataTag);
lateData.addSink(new LateDataSink()); // 写入离线修正流程

实际工程中,我推荐的做法是组合使用策略二和策略三:设置一个合理的 allowedLateness(比如水印乱序时间的 2-3 倍),同时用旁路输出兜底超过允许迟到时间的数据。这样既能在线修正大部分迟到数据的影响,又不会丢失任何数据。


六、窗口计算的语义陷阱

窗口(Window)是流处理中把无界数据切分成有界块的核心抽象。窗口的类型选择和参数配置直接决定计算结果的语义。选错窗口类型或者误解窗口边界,会导致结果系统性偏差。

三种基本窗口类型

滚动窗口(Tumbling Window):固定长度、不重叠。每条数据恰好属于一个窗口。

时间轴:|--- 窗口1 ---|--- 窗口2 ---|--- 窗口3 ---|
         [0, 5)        [5, 10)       [10, 15)
// 每 5 分钟统计一次各商品的销售额
DataStream<SalesResult> sales = orders
    .keyBy(Order::getProductId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(new SalesAggregator());

滚动窗口最常见的陷阱是窗口对齐问题。Flink 的滚动窗口默认对齐到 epoch(Unix 时间戳 0)。比如一个 1 小时的窗口,边界是 0:00-1:00、1:00-2:00,而不是作业启动时间开始计算。如果你的业务定义的”小时”是从 0:30 到 1:30,需要设置 offset:

// 窗口从每小时的第 30 分钟开始
TumblingEventTimeWindows.of(Time.hours(1), Time.minutes(30))
// 窗口边界:0:30-1:30, 1:30-2:30, ...

滑动窗口(Sliding Window):固定长度、有重叠。滑动步长小于窗口长度时,一条数据可能属于多个窗口。

窗口长度 = 10,滑动步长 = 5
|--- 窗口1 ---|
         |--- 窗口2 ---|
                  |--- 窗口3 ---|
[0, 10)   [5, 15)   [10, 20)
// "最近 10 分钟的移动平均",每 5 分钟更新一次
DataStream<AvgResult> movingAvg = metrics
    .keyBy(Metric::getHostName)
    .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(5)))
    .aggregate(new MovingAverageAggregator());

滑动窗口的陷阱是资源消耗。如果窗口长度是 1 小时、滑动步长是 1 秒,那么每条数据会被分配到 3600 个窗口中。这意味着 3600 倍的状态存储和计算量。在实际工程中,滑动窗口的窗口长度/滑动步长比值不应超过几十,否则应该考虑用增量聚合(如指数衰减平均)来替代。

会话窗口(Session Window):窗口的边界由数据驱动。如果两条相邻数据的时间间隔超过指定的 gap,就分割成不同的窗口。

gap = 5
数据到达时间:1, 3, 4,    12, 13,    25
窗口划分:    [1, 9)       [12, 18)   [25, 30)
// 用户会话分析:超过 30 分钟无活动则视为会话结束
DataStream<SessionResult> sessions = userActions
    .keyBy(UserAction::getUserId)
    .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
    .aggregate(new SessionAggregator());

会话窗口的陷阱是窗口合并(Window Merging)。一条新数据的到来可能导致两个原本独立的窗口合并成一个。比如用户在 14:00 和 14:40 分别有一次点击,形成了两个独立的会话窗口。如果这时收到一条迟到数据(事件时间 14:20),两个窗口之间的间隔不再超过 30 分钟,它们需要合并成一个大窗口。这个合并操作涉及状态的重新组织,在大规模场景下代价不小。

窗口语义的一个真实陷阱

我在实际项目中遇到过一个让人困惑的问题:“为什么每天的 PV(Page View)统计和 Hive 离线统计对不上?”

流处理作业使用事件时间的 1 天滚动窗口统计每日 PV。按照 Flink 的默认行为,窗口边界是 UTC 0:00 到 UTC 24:00。但业务方期望的”一天”是北京时间 0:00 到 24:00(即 UTC 16:00 到次日 UTC 16:00)。UTC 0:00 到 UTC 16:00 之间的数据被分配到了错误的日期窗口中。

修复方法是设置窗口的 offset:

// 北京时间 = UTC + 8
// 需要 offset = -8 小时 = +16 小时(等价)
TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))
// 窗口边界变为:UTC 16:00 到次日 UTC 16:00
// 即北京时间 0:00 到 24:00

这个问题看起来简单,但在大量流处理作业中,时区偏移是出现频率最高的 bug 之一。Hive 离线作业通常按照分区日期(北京时间)组织数据,而 Flink 的默认窗口对齐是 UTC。如果不显式处理,两边的统计结果永远对不上。


Kafka Streams 和 Apache Flink 是当前最主流的两个流处理框架,但它们的设计哲学和架构取舍截然不同。理解这些差异对于技术选型至关重要。

部署模型

Flink:独立的分布式计算集群。Flink 有自己的 JobManager(负责调度和协调)和 TaskManager(负责执行计算任务)。部署方式包括 Standalone 集群、YARN、Kubernetes 等。Flink 作业提交到集群后,由 JobManager 分配资源和调度执行。

Kafka Streams:一个 Java 库(Library),不是一个独立的集群。Kafka Streams 应用就是一个普通的 Java 应用程序,直接嵌入到你的微服务中。没有 master 节点,没有集群管理器。多个实例之间通过 Kafka 的消费者组(Consumer Group)协议进行分区分配和协调。

// Kafka Streams:就是一个普通的 Java 应用
public class WordCountApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream("input-topic");
        KTable<String, Long> wordCounts = textLines
            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
            .groupBy((key, word) -> word)
            .count(Materialized.as("word-counts-store"));
        wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        // 应用运行在你自己的进程中
        // 水平扩展 = 启动更多实例
    }
}

这个差异的工程影响是深远的:

状态管理

Flink:状态存储在 TaskManager 本地(内存或 RocksDB),通过 checkpoint 机制定期持久化到远端存储(HDFS/S3)。Flink 管理状态的整个生命周期:分配、备份、恢复、重分布(rescaling)。

Kafka Streams:状态存储在本地 RocksDB 中,同时通过 changelog topic 持久化到 Kafka。每次状态变更都会异步写入对应的 changelog topic。当应用实例宕机后在另一台机器上重启时,从 changelog topic 重建本地状态。

// Kafka Streams 的状态存储
KTable<String, Long> wordCounts = textLines
    .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
    .groupBy((key, word) -> word)
    // Materialized.as() 创建一个有名字的状态存储
    // 底层是本地 RocksDB + Kafka changelog topic
    .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("word-counts-store")
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Long()));

// 可以通过 Interactive Queries 直接查询本地状态
ReadOnlyKeyValueStore<String, Long> store =
    streams.store(StoreQueryParameters.fromNameAndType("word-counts-store",
        QueryableStoreTypes.<String, Long>keyValueStore()));
Long count = store.get("hello"); // 直接查询,不需要外部数据库

Kafka Streams 的状态管理方式有一个独特的优势:Interactive Queries。应用可以直接暴露本地状态供外部查询,不需要把结果写入外部数据库。这在某些场景下可以简化架构——比如一个实时计数服务,Kafka Streams 应用本身就可以直接提供查询 API。

架构全面对比

维度 Apache Flink Kafka Streams
定位 独立的分布式流处理引擎 嵌入式流处理库
部署模型 独立集群(Standalone/YARN/K8s) 普通 Java 应用进程
Master 节点 JobManager 无(通过 Consumer Group 协调)
数据源 Kafka、文件、Socket、自定义 Source 仅 Kafka
数据输出 任意 Sink(Kafka、DB、文件等) 仅 Kafka(或通过 API 暴露)
状态存储 内存 / RocksDB + 远端 checkpoint RocksDB + Kafka changelog topic
精确一次 checkpoint + 两阶段提交 Kafka 事务(依赖 Kafka 0.11+)
窗口类型 滚动、滑动、会话、全局 滚动、滑动、会话
事件时间支持 完善(水印、迟到数据、旁路输出) 支持(基于记录时间戳)
SQL 支持 Flink SQL(功能丰富) 不支持
吞吐量(单集群) 极高(百万级 events/sec) 中高(取决于实例数和分区数)
延迟 毫秒级 毫秒级
运维复杂度 高(需要管理集群) 低(普通应用部署)
学习曲线 较陡(概念多、API 丰富) 较平缓(API 简洁)
批流一体 支持(统一 API) 不支持批处理
适用场景 大规模、复杂逻辑、多数据源 中小规模、Kafka 生态内

怎么选

我的判断标准是:

  1. 如果你的数据已经在 Kafka 中,计算逻辑不太复杂(聚合、过滤、简单关联),团队没有 Flink 运维经验——用 Kafka Streams。它的部署和运维成本显著低于 Flink,而且和 Kafka 生态天然集成
  2. 如果需要多数据源关联、复杂 CEP(复杂事件处理)、大规模状态管理、SQL 支持,或者需要批流一体——用 Flink。它的计算能力和功能丰富度远超 Kafka Streams
  3. 如果你已经有 Kubernetes 基础设施并且团队对容器化运维有经验——Flink on Kubernetes(通过 Flink Kubernetes Operator)可以大幅降低 Flink 的运维成本,使得选择 Flink 的门槛降低

八、工程案例:某金融平台的实时风控系统

以下案例来自一个真实的金融平台风控系统改造项目。出于保密要求,具体数字和细节做了脱敏处理,但架构决策和遇到的问题是真实的。

背景

该平台原有的风控系统是基于批处理的:每隔 5 分钟运行一次 Spark 作业,从数据库中拉取最近 5 分钟的交易数据,执行风控规则,标记可疑交易。问题很明显——5 分钟的延迟意味着一个欺诈者可以在被发现之前完成多笔交易。业务方要求将风控判定延迟降低到 500 毫秒以内。

架构设计

改造后的架构基于 Flink:

交易系统 --> Kafka(交易事件 topic)--> Flink 风控作业 --> Kafka(风控结果 topic)--> 交易网关
                                            |
                                     用户画像 Kafka topic
                                     设备指纹 Kafka topic
                                     历史风控 RocksDB 状态

关键设计决策:

决策一:选择事件时间而非处理时间

风控规则中有一条:“5 分钟内同一用户在 3 个不同 IP 发起交易,标记为可疑”。这个”5 分钟”必须基于交易实际发生的时间(事件时间),而不是交易数据到达风控引擎的时间。原因和前面讨论的一样——如果 Kafka 消费出现积压,处理时间会严重失真。

决策二:使用 RocksDB State Backend

风控状态需要维护每个用户最近 24 小时的交易记录摘要(IP 列表、设备列表、交易金额分布等)。平台有千万级活跃用户,每个用户的状态约 2KB,总状态量约 20GB。这个规模超出了 JVM 堆内存的合理范围,必须使用 RocksDB。

决策三:选择 at-least-once + 幂等写入,而非端到端 exactly-once

风控结果写入 Kafka 后,由交易网关消费。交易网关根据交易 ID 做幂等处理——同一笔交易如果收到多次风控结果,只执行第一次。这样即使 Flink 重启后重复产出了一些风控结果,也不会导致一笔交易被重复拦截或重复放行。

放弃端到端 exactly-once 的原因是:启用 Kafka 事务后,风控结果的端到端延迟会增加一个 checkpoint 间隔(配置为 60 秒),这和 500 毫秒的延迟要求直接冲突。

遇到的问题

问题一:水印推进停滞

上线后发现,凌晨时段部分 Kafka partition 长时间没有数据,导致水印不推进,窗口计算卡住。修复方案是配置 withIdleness(Duration.ofSeconds(30))

问题二:RocksDB compaction 导致延迟毛刺

RocksDB 后台的 compaction 操作会导致间歇性的延迟毛刺(P99 从 100ms 飙升到 2s)。优化方案:

// 限制 RocksDB compaction 的并发度和 I/O 带宽
RocksDBOptionsFactory customFactory = new RocksDBOptionsFactory() {
    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handles) {
        return currentOptions
            .setMaxBackgroundJobs(2)        // 限制后台线程数
            .setMaxOpenFiles(256)           // 限制打开文件数
            .setBytesPerSync(1048576);      // 1MB,控制 sync 频率
    }

    @Override
    public ColumnFamilyOptions createColumnFamilyOptions(
            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handles) {
        return currentOptions
            .setCompactionStyle(CompactionStyle.LEVEL)
            .setLevelCompactionDynamicLevelBytes(true)
            .setTargetFileSizeBase(67108864)  // 64MB
            .setMaxBytesForLevelBase(268435456); // 256MB
    }
};

问题三:Checkpoint 超时

状态量增长到 20GB 后,全量 checkpoint 超时频繁发生。切换到增量 checkpoint 后问题解决——增量 checkpoint 只上传自上次 checkpoint 以来变化的 SST 文件,每次传输的数据量从 20GB 降低到几百 MB。

最终效果

改造后的系统在日均 5000 万笔交易的负载下,P99 风控判定延迟为 180 毫秒,满足 500 毫秒的要求。风控规则命中率提升了 15%(因为实时计算能捕获到批处理遗漏的短时间窗口内的行为模式)。


九、流处理的高级模式

双流 Join

流处理中的 Join 和批处理中的 Join 有本质区别。批处理的 Join 操作在两个完整的数据集上执行——两边的数据在 Join 开始前就已经全部就绪。流处理的 Join 面对的是两个无界数据流——来自流 A 的一条数据到达时,流 B 中与之匹配的数据可能还没到(或者可能永远不会到)。

Flink 提供了基于时间窗口的 Interval Join:

// 订单流和支付流的 Join
// 订单创建后 5 分钟内应该收到支付事件
DataStream<Order> orders = /* 订单流,包含事件时间 */;
DataStream<Payment> payments = /* 支付流,包含事件时间 */;

DataStream<OrderPaymentResult> joined = orders
    .keyBy(Order::getOrderId)
    .intervalJoin(payments.keyBy(Payment::getOrderId))
    // 订单事件时间之后 0 到 5 分钟的支付事件会被匹配
    .between(Time.seconds(0), Time.minutes(5))
    .process(new ProcessJoinFunction<Order, Payment, OrderPaymentResult>() {
        @Override
        public void processElement(Order order, Payment payment,
                Context ctx, Collector<OrderPaymentResult> out) {
            out.collect(new OrderPaymentResult(
                order.getOrderId(),
                order.getAmount(),
                payment.getPaymentMethod(),
                payment.getPayTime()
            ));
        }
    });

Interval Join 的语义是:对于流 A 中时间戳为 t 的元素,它会和流 B 中时间戳在 [t + lowerBound, t + upperBound] 范围内的元素进行 Join。这个时间窗口限制了状态的大小——只需要缓存时间窗口范围内的数据,超过时间窗口的数据可以被清理。

复杂事件处理(CEP)

Flink CEP 库允许在数据流中检测符合特定模式的事件序列。比如检测”用户登录失败 3 次后,5 分钟内从新 IP 登录成功”这种欺诈模式:

// 定义事件模式
Pattern<LoginEvent, ?> fraudPattern = Pattern
    .<LoginEvent>begin("failed-logins")
    .where(new SimpleCondition<LoginEvent>() {
        @Override
        public boolean filter(LoginEvent event) {
            return event.getStatus().equals("FAILED");
        }
    })
    .times(3)  // 连续 3 次失败
    .consecutive()  // 必须是连续的,中间不能有成功
    .next("success-from-new-ip")
    .where(new IterativeCondition<LoginEvent>() {
        @Override
        public boolean filter(LoginEvent event, Context<LoginEvent> ctx) {
            // 成功登录,且 IP 与之前的失败登录不同
            if (!event.getStatus().equals("SUCCESS")) return false;
            Set<String> failedIps = new HashSet<>();
            for (LoginEvent failed : ctx.getEventsForPattern("failed-logins")) {
                failedIps.add(failed.getIp());
            }
            return !failedIps.contains(event.getIp());
        }
    })
    .within(Time.minutes(5));  // 整个模式必须在 5 分钟内匹配

// 应用模式并提取匹配结果
PatternStream<LoginEvent> patternStream = CEP.pattern(
    loginEvents.keyBy(LoginEvent::getUserId), fraudPattern);

DataStream<FraudAlert> alerts = patternStream.select(
    new PatternSelectFunction<LoginEvent, FraudAlert>() {
        @Override
        public FraudAlert select(Map<String, List<LoginEvent>> pattern) {
            List<LoginEvent> failures = pattern.get("failed-logins");
            LoginEvent success = pattern.get("success-from-new-ip").get(0);
            return new FraudAlert(
                success.getUserId(),
                failures.size(),
                success.getIp(),
                "SUSPICIOUS_LOGIN_PATTERN"
            );
        }
    });

CEP 的底层实现基于 NFA(Non-deterministic Finite Automaton,非确定性有限自动机)。每个正在匹配中的模式实例都是 NFA 中的一个状态。在高吞吐场景下,如果模式中包含不够精确的匹配条件(比如 where 条件过于宽泛),NFA 的状态数量可能爆炸式增长,导致内存耗尽。因此,CEP 模式的设计需要尽可能精确,并且设置合理的 within 时间限制来控制状态生命周期。

Async I/O

流处理中经常需要关联外部系统的数据——比如根据用户 ID 查询用户画像数据库。如果用同步方式查询,每条消息的处理延迟都会增加一次数据库 RTT(通常几毫秒到几十毫秒),严重限制吞吐量。

Flink 提供了 Async I/O 算子来解决这个问题:

// 异步查询用户画像
DataStream<EnrichedEvent> enriched = AsyncDataStream.unorderedWait(
    events,
    new AsyncFunction<Event, EnrichedEvent>() {
        private transient AsyncHttpClient httpClient;

        @Override
        public void open(Configuration parameters) {
            httpClient = new AsyncHttpClient();
        }

        @Override
        public void asyncInvoke(Event event, ResultFuture<EnrichedEvent> resultFuture) {
            CompletableFuture<UserProfile> future = httpClient
                .get("http://user-service/profiles/" + event.getUserId());

            future.thenAccept(profile -> {
                resultFuture.complete(Collections.singleton(
                    new EnrichedEvent(event, profile)
                ));
            }).exceptionally(ex -> {
                resultFuture.complete(Collections.singleton(
                    new EnrichedEvent(event, UserProfile.UNKNOWN)
                ));
                return null;
            });
        }
    },
    30, TimeUnit.SECONDS,  // 超时时间
    100  // 最大并发请求数
);

unorderedWait 允许异步查询的结果乱序返回(先完成的先输出),从而最大化吞吐量。如果业务要求保持输入顺序,可以使用 orderedWait,但吞吐量会受到最慢查询的限制。


Flink 1.9 以后大力推进 Flink SQL,目标是用 SQL 统一批处理和流处理的开发体验。Flink SQL 的底层引擎会根据输入数据是有界还是无界,自动选择批处理或流处理的执行策略。

流式 SQL 的语义

流式 SQL 和传统 SQL 最大的区别在于:传统 SQL 查询一个静态的表,返回一个静态的结果;流式 SQL 查询一个持续变化的表(Dynamic Table),产出一个持续更新的结果流。

Flink SQL 用 Changelog Stream 来表达这种持续更新的语义:

-- 创建 Kafka Source 表
CREATE TABLE orders (
    order_id STRING,
    user_id STRING,
    product_id STRING,
    amount DECIMAL(10, 2),
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'broker1:9092',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset'
);

-- 实时统计每个用户的累计消费金额
-- 这是一个持续查询,结果随新订单到来而更新
SELECT user_id, SUM(amount) AS total_spent
FROM orders
GROUP BY user_id;
-- 输出是一个 changelog 流:
-- +I (user_001, 100.00)    -- 插入
-- -U (user_001, 100.00)    -- 更新前的旧值(撤回)
-- +U (user_001, 250.00)    -- 更新后的新值

时间窗口聚合

Flink SQL 支持在 SQL 语法中直接使用时间窗口:

-- 每 5 分钟统计各商品的销售额和订单数
SELECT
    product_id,
    TUMBLE_START(order_time, INTERVAL '5' MINUTE) AS window_start,
    TUMBLE_END(order_time, INTERVAL '5' MINUTE) AS window_end,
    COUNT(*) AS order_count,
    SUM(amount) AS total_amount
FROM orders
GROUP BY
    product_id,
    TUMBLE(order_time, INTERVAL '5' MINUTE);

Flink 1.13 引入了更灵活的窗口 TVF(Table-Valued Function)语法:

-- 使用窗口 TVF 语法(推荐)
SELECT
    product_id,
    window_start,
    window_end,
    COUNT(*) AS order_count,
    SUM(amount) AS total_amount
FROM TABLE(
    TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '5' MINUTE)
)
GROUP BY product_id, window_start, window_end;

窗口 TVF 语法的优势是支持更多的窗口类型(包括累积窗口 CUMULATE),以及窗口的 Top-N 等高级分析操作。

Flink SQL 的表达能力在快速进步,但目前仍有一些场景需要退回到 DataStream API:

  1. 复杂的状态管理逻辑:比如需要手动管理多个不同 TTL 的状态,或者需要自定义的状态清理策略
  2. 复杂的 CEP 模式:SQL 目前不支持 CEP 语法(MATCH_RECOGNIZE 有限支持)
  3. 精细的性能调优:比如需要控制 RocksDB 的 compaction 策略、内存分配等底层参数
  4. 自定义窗口触发逻辑:SQL 的窗口触发机制是固定的,不支持自定义 Trigger

在这些场景下,通常的做法是用 DataStream API 实现核心逻辑,再通过 Table.toChangelogStream()StreamTableEnvironment.fromChangelogStream() 与 SQL 层互通。


十一、流处理的可观测性

流处理作业是长期运行的服务,可观测性至关重要。一个风控 Flink 作业 7x24 小时运行,任何异常都需要快速发现和定位。

关键监控指标

吞吐量相关: - numRecordsInPerSecond / numRecordsOutPerSecond:每秒处理的记录数 - numBytesInPerSecond / numBytesOutPerSecond:每秒处理的字节数

延迟相关: - currentInputWatermark:当前水印值。如果水印长时间不推进,说明有数据源空闲或严重积压 - Kafka consumer lag:消费者偏移量与最新偏移量的差值。持续增长说明处理速度跟不上数据产出速度

Checkpoint 相关: - lastCheckpointDuration:最近一次 checkpoint 的耗时 - lastCheckpointSize:最近一次 checkpoint 的大小 - numberOfFailedCheckpoints:checkpoint 失败次数。连续失败意味着系统无法建立恢复点

反压相关: - isBackPressured:算子是否处于反压状态 - busyTimeMsPerSecond:算子在 1 秒内的繁忙时间(毫秒)。接近 1000 说明算子是瓶颈

反压排查

反压是流处理中最常见的性能问题。表现为上游算子的输出缓冲区满,数据堆积,整条流水线的吞吐量下降。

Flink Web UI 提供了反压可视化:每个算子会显示绿色(正常)、黄色(轻微反压)或红色(严重反压)。排查反压的基本步骤:

  1. 找到第一个反压为红色的算子——那就是瓶颈所在
  2. 检查该算子的 busyTimeMsPerSecond——如果接近 1000,说明计算能力不足,需要增加并行度
  3. 检查该算子是否涉及外部系统调用——同步 I/O 是常见的反压原因,改用 Async I/O
  4. 检查该算子的状态访问模式——大量随机读 RocksDB 可能成为瓶颈,考虑增加 block cache 或使用 HashMapStateBackend

日志与链路追踪

Flink 作业的日志散落在多个 TaskManager 上,排查问题时需要登录不同的机器查看日志。建议把 Flink 日志统一接入到 ELK 或者 Loki 等日志平台,按 JobID 和 TaskName 建立索引。

对于需要追踪单条数据处理路径的场景,可以在数据中注入 traceId,并在每个算子的处理逻辑中记录日志。但要注意日志量——高吞吐场景下(百万 TPS),每条数据都记录日志是不现实的,通常采用采样日志(比如每 1000 条记录一次)。


十二、流处理架构的演进趋势

Lambda 架构与 Kappa 架构

Lambda 架构(Nathan Marz 提出):用批处理层保证结果的准确性,用速度层(流处理)提供低延迟的近似结果。查询时合并两层的结果。问题是需要维护两套计算逻辑——一套用 Spark 写批处理,一套用 Flink 写流处理——同一个业务逻辑实现两遍,维护成本高,而且两套逻辑很容易出现不一致。

Kappa 架构(Jay Kreps 提出):去掉批处理层,所有计算都通过流处理完成。如果需要重新计算历史数据(比如修正了一个 bug),通过回放 Kafka 中的历史消息来实现。前提是 Kafka 需要保留足够长的历史数据。

Lambda 架构:
    数据 --> 批处理层(Spark)--> 批处理视图 --+
    数据 --> 速度层(Flink)--> 实时视图     --+--> 合并查询
                                                   
Kappa 架构:
    数据 --> Kafka(长期保留)--> 流处理(Flink)--> 服务层

我认为 Kappa 架构在大多数场景下是更优的选择,但有一个前提:流处理引擎的精确一次语义和故障恢复机制足够成熟。Flink 目前已经满足这个条件。但在某些场景下,比如需要对 PB 级历史数据做全量重算,纯流处理的效率仍然不如批处理——因为批处理可以利用数据的局部性做更激进的优化(比如列式存储的向量化扫描)。

流式数据湖

Flink 与数据湖格式(Apache Iceberg、Apache Hudi、Delta Lake)的集成是近年来的热点方向。核心思路是用 Flink 作为实时写入引擎,将流数据以数据湖格式写入对象存储,同时支持批处理引擎(Spark、Trino)直接查询:

-- Flink SQL 写入 Iceberg 表
CREATE TABLE iceberg_orders (
    order_id STRING,
    user_id STRING,
    amount DECIMAL(10, 2),
    order_time TIMESTAMP(3)
) WITH (
    'connector' = 'iceberg',
    'catalog-name' = 'hive_catalog',
    'catalog-type' = 'hive',
    'warehouse' = 's3://data-lake/warehouse'
);

INSERT INTO iceberg_orders
SELECT order_id, user_id, amount, order_time
FROM kafka_orders; -- 实时消费 Kafka 并写入 Iceberg

这种架构模糊了”流处理”和”数据仓库”的边界——数据通过流处理实时写入,通过批处理引擎事后分析,两者共享同一份存储。

更细粒度的容错

传统的 checkpoint 是全局一致的——所有算子的状态在逻辑上对齐到同一个时间点。这意味着恢复时,所有算子都必须回退到同一个 checkpoint。如果一个 Flink 作业有几百个算子,其中一个算子的 TaskManager 宕机了,整个作业都需要回退。

Flink 社区正在探索更细粒度的容错机制,比如基于 Region 的故障恢复——只回退受影响的算子子图,不影响其他部分。这对于大规模作业(几千个并行度)的可用性有显著提升。


十三、常见误区与实践建议

误区一:流处理可以完全替代批处理

流处理在延迟敏感的场景有明显优势,但不是万能的。以下场景批处理仍然是更好的选择:

误区二:Checkpoint 间隔越短越好

直觉上,checkpoint 间隔越短,故障后丢失的数据越少。但 checkpoint 本身有成本——状态快照、网络传输、磁盘写入。如果 checkpoint 间隔太短,上一次 checkpoint 还没完成下一次就开始了,系统大量资源被 checkpoint 占用,反而降低了正常处理的吞吐量。

经验法则:checkpoint 间隔至少应该是 checkpoint 耗时的 2-3 倍。如果一次 checkpoint 需要 30 秒,间隔至少设置为 60-90 秒。

误区三:并行度越高越好

增加并行度确实可以提升吞吐量,但也会引入更多的网络 shuffle、更多的状态碎片、更大的 checkpoint。特别是 keyBy 操作后,如果 key 的基数远小于并行度,会导致数据倾斜——部分并行实例处理大量数据,其他实例几乎空闲。

优化数据倾斜的常用手段:

// 两阶段聚合:先加盐打散,再聚合
DataStream<Tuple2<String, Long>> preAgg = events
    .map(event -> {
        // 在 key 后面附加随机后缀,打散到更多的并行实例
        String saltedKey = event.getKey() + "-" + ThreadLocalRandom.current().nextInt(10);
        return Tuple2.of(saltedKey, 1L);
    })
    .returns(Types.TUPLE(Types.STRING, Types.LONG))
    .keyBy(t -> t.f0)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));

DataStream<Tuple2<String, Long>> finalAgg = preAgg
    .map(t -> {
        // 去掉盐值,恢复原始 key
        String originalKey = t.f0.substring(0, t.f0.lastIndexOf("-"));
        return Tuple2.of(originalKey, t.f1);
    })
    .returns(Types.TUPLE(Types.STRING, Types.LONG))
    .keyBy(t -> t.f0)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));

误区四:忽视状态 TTL

流处理作业的状态会持续增长。如果不设置状态的 TTL(Time-To-Live),状态会无限膨胀直到 OOM 或磁盘耗尽。

// 为 ValueState 设置 TTL
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(org.apache.flink.api.common.time.Time.hours(24))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .cleanupInRocksdbCompactFilter(1000) // 每处理 1000 条记录触发一次清理
    .build();

ValueStateDescriptor<UserProfile> descriptor =
    new ValueStateDescriptor<>("user-profile", UserProfile.class);
descriptor.enableTimeToLive(ttlConfig);

上一篇 中我们讨论了数据湖与数据仓库的架构设计。在 下一篇 中,我们将探讨搜索引擎的架构设计——倒排索引的工程实现、相关性评分的算法,以及分布式搜索的协调机制。


参考资料

  1. Tyler Akidau, Slava Chernyak, Reuven Lax. Streaming Systems: The What, Where, When, and How of Large-Scale Data Processing. O’Reilly Media, 2018.
  2. Fabian Hueske, Vasiliki Kalavri. Stream Processing with Apache Flink. O’Reilly Media, 2019.
  3. Paris Carbone et al. “Apache Flink: Stream and Batch Processing in a Single Engine.” Bulletin of the IEEE Computer Society Technical Committee on Data Engineering, 2015.
  4. K. Mani Chandy, Leslie Lamport. “Distributed Snapshots: Determining Global States of Distributed Systems.” ACM Transactions on Computer Systems, 1985.
  5. Apache Flink 官方文档. “Stateful Stream Processing.” https://flink.apache.org/
  6. Jay Kreps. “Questioning the Lambda Architecture.” O’Reilly Radar, 2014.
  7. Confluent 官方文档. “Kafka Streams Architecture.” https://docs.confluent.io/
  8. Martin Kleppmann. Designing Data-Intensive Applications. O’Reilly Media, 2017. Chapter 11: Stream Processing.
  9. Guozhang Wang et al. “Kafka Streams: A Streaming Platform for Processing and Analyzing Data in Real-Time.” VLDB Endowment, 2021.
  10. Apache Flink 官方文档. “Checkpointing.” https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/fault-tolerance/checkpointing/

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2026-04-13 · architecture

【系统架构设计百科】消息队列架构:异步解耦的设计与陷阱

在分布式系统中,服务之间的直接同步调用会导致强耦合、级联故障和性能瓶颈。消息队列(Message Queue)作为异步通信的核心基础设施,在现代架构中承担着解耦、削峰、容错等关键职责。然而,引入消息队列并非没有代价——投递语义的选择、顺序性保证、消费者组再平衡、幂等消费等问题,每一个都隐藏着工程陷阱。本文将从原理到实践…

2026-04-13 · architecture

【系统架构设计百科】架构质量属性:不只是"高可用高性能"

需求评审时写下的'高可用、高性能、高并发',到了架构设计阶段几乎无法落地——因为它们不是可执行的需求。本文从 SEI/CMU 的质量属性理论出发,用 stimulus-response 场景模型把模糊需求变成可量化、可验证的架构约束,并拆解属性之间的冲突与联动关系。

2026-04-13 · architecture

【系统架构设计百科】告警策略:如何避免"狼来了"

大多数团队的告警系统都在制造噪声而不是传递信号。阈值告警看似直观,实则产生大量误报和漏报,值班工程师在凌晨三点被叫醒,却发现只是一次无害的毛刺。本文从告警疲劳的工业数据出发,拆解基于 SLO 的多窗口燃烧率告警算法,深入 Alertmanager 的路由、抑制与分组机制,结合 PagerDuty 的告警疲劳研究和真实工程案例,给出一套可落地的告警策略设计方法。


By .