一个电商平台的风控系统需要在用户下单后 200 毫秒内判断这笔交易是否存在欺诈风险。风控规则依赖最近 5 分钟内该用户的下单频率、IP 地址变化、设备指纹匹配度。数据来自三个不同的 Kafka topic,需要做多流关联。
如果用批处理来做,最快也是分钟级延迟——攒一批数据、启动 Spark 作业、读取输入、计算、写出结果。等批处理跑完,欺诈交易早已完成。如果用流处理来做,每条交易事件到达的瞬间就触发计算,200 毫秒内返回结果。
但流处理不只是”把批处理的延迟降低”这么简单。批处理面对的是有界数据集(Bounded Dataset),输入有明确的开始和结束;流处理面对的是无界数据流(Unbounded Stream),数据永远不会”到齐”。这个根本差异导致了一系列工程难题:什么时候输出结果?怎么处理迟到数据?状态存在哪里?故障恢复时怎么保证不丢不重?
本文要回答的核心问题是:流处理的精确一次语义(Exactly-Once Semantics)在工程上到底有多难?窗口计算的语义陷阱是什么?
版本说明 本文涉及的 Flink 特性基于 Apache Flink 1.17/1.18,Kafka 特性基于 Apache Kafka 3.x,Kafka Streams 基于 3.5+。
在 上一篇 中我们讨论了数据湖与数据仓库的架构设计,流处理是实时数据管道的核心计算引擎。
一、批处理与流处理的本质区别
很多文章把批处理和流处理的区别简单归结为”延迟高低”。延迟确实是最直观的差异,但不是本质差异。本质差异在于计算模型对数据边界的假设不同。
批处理:有界数据上的完整计算
批处理的核心假设是:输入数据是有界的,计算开始前所有数据已经就绪。
MapReduce 是批处理的经典范式。一个 WordCount 作业的执行流程是:
- 输入:HDFS 上的一组文件,文件数量和大小在作业启动前已知
- Map 阶段:逐行读取,输出
(word, 1)键值对 - Shuffle 阶段:按 key 分区、排序、传输到 Reducer
- Reduce 阶段:对同一个 key 的所有 value 做聚合
- 输出:写入 HDFS
整个过程有明确的开始和结束。当 Reduce 阶段完成时,结果是精确的——因为所有输入数据都已经被处理过了。
Spark 用 RDD(Resilient Distributed Dataset)替代了 MapReduce 的两阶段模型,支持更丰富的算子组合和内存计算,但核心假设没变:输入是有界的,计算是一次性的。
// Spark 批处理示例:有界数据上的 WordCount
JavaRDD<String> lines = spark.read().textFile("hdfs:///input/logs/*.txt").javaRDD();
JavaPairRDD<String, Integer> counts = lines
.flatMap(line -> Arrays.asList(line.split(" ")).iterator())
.mapToPair(word -> new Tuple2<>(word, 1))
.reduceByKey(Integer::sum);
counts.saveAsTextFile("hdfs:///output/word-counts");
// 作业结束,结果完整且精确流处理:无界数据上的持续计算
流处理的核心假设完全相反:输入数据是无界的,数据持续到达,计算永远不会”完成”。
一个实时 WordCount 没有”最终结果”——每来一条数据,当前的计数就可能变化。这引出了批处理不需要面对的三个根本问题:
问题一:什么时候输出结果?
批处理在所有数据处理完毕后输出一次结果。流处理不能等,因为数据永远不会到齐。必须定义一个触发条件(Trigger),比如”每 10 秒输出一次当前结果”或”每收到 100 条数据输出一次”。
问题二:怎么定义”时间”?
批处理的时间语义很简单——文件的修改时间,或者日志里的时间戳。流处理必须区分两种时间:
- 事件时间(Event Time):事件实际发生的时间,嵌入在数据本身中
- 处理时间(Processing Time):事件到达流处理引擎的时间
这两个时间可能相差几秒、几分钟甚至几小时(比如移动端离线后重新上线批量上报数据)。选择哪种时间作为计算基准,直接影响结果的正确性。
问题三:故障恢复时怎么保证结果正确?
批处理的故障恢复相对简单:重新执行失败的 task 即可,输入数据在 HDFS 上不会丢失,输出是幂等的覆盖写入。流处理的状态是持续累积的——一个实时计数器已经处理了 100 万条数据,如果此时节点宕机,恢复后必须精确地从断点继续,既不能丢数据(导致少计),也不能重复处理(导致多计)。
一个对比表格
| 维度 | 批处理 | 流处理 |
|---|---|---|
| 数据边界 | 有界(Bounded) | 无界(Unbounded) |
| 计算模式 | 一次性执行,有明确结束 | 持续执行,无结束 |
| 延迟 | 分钟到小时级 | 毫秒到秒级 |
| 结果语义 | 精确(所有数据已处理) | 近似或最终精确(取决于窗口和水印) |
| 时间语义 | 通常不需要区分 | 必须区分事件时间和处理时间 |
| 状态管理 | 无状态或临时状态 | 必须管理长期运行的增量状态 |
| 故障恢复 | 重新执行 task | checkpoint + 精确一次语义 |
| 典型框架 | MapReduce、Spark Batch | Flink、Kafka Streams、Spark Structured Streaming |
Tyler Akidau 在 Google 的 Dataflow 论文和《Streaming Systems》一书中提出了一个关键洞见:批处理是流处理的特例。如果流处理引擎能够正确处理无界数据,那么处理有界数据只是一个退化场景——设定数据的边界,等所有数据到达后触发最终计算即可。Flink 正是基于这个理念设计的:批处理模式是流处理模式的一个特殊优化路径。
二、精确一次语义:工程上到底有多难
“精确一次”(Exactly-Once)可能是分布式系统中最容易被误解的术语。很多人以为它意味着”每条消息只被处理一次”。实际上,在分布式环境中,由于网络分区、节点故障、重启等原因,消息的重复投递几乎不可避免。精确一次语义真正的含义是:即使消息被重复投递,系统的最终状态和输出效果与每条消息只被处理一次完全相同。
三种投递语义
在讨论精确一次之前,先厘清三种投递语义(Delivery Semantics):
至多一次(At-Most-Once):消息可能丢失,但不会重复。实现最简单——发送后不管确认,不做重试。适合对丢失不敏感的场景,比如日志采集中的少量丢失通常可以接受。
至少一次(At-Least-Once):消息不会丢失,但可能重复。发送方在未收到确认时重试发送。大多数消息系统默认提供这个级别。问题是重复消息会导致计算结果偏差——比如一笔订单被重复计入销售额。
精确一次(Exactly-Once):消息既不丢失也不重复,或者更准确地说,重复消息的效果被消除。这是最难实现的级别。
为什么精确一次这么难
考虑一个简单的场景:流处理引擎从 Kafka 读取交易数据,计算实时销售总额,结果写入数据库。
Kafka Topic --> Flink 算子(累加销售额) --> MySQL
实现精确一次需要同时解决三个层面的问题:
第一层:Source 端的精确一次
Flink 从 Kafka 消费数据时,需要记录当前消费到的偏移量(offset)。如果 Flink 处理完一条消息后还没来得及提交 offset 就宕机了,重启后会从上次提交的 offset 重新消费,导致同一条消息被处理两次。
解决思路:不单独提交 Kafka offset,而是把 offset 和算子状态一起纳入 checkpoint。恢复时,从 checkpoint 中恢复 offset 和状态,保证两者一致。
第二层:内部处理的精确一次
Flink 的算子可能有多个并行实例,数据在算子之间通过网络传输。如果一个算子处理完数据并更新了本地状态,但发送给下游算子的消息在网络传输中丢失了,怎么办?如果重发,下游算子会重复处理。
解决思路:Flink 使用 checkpoint 机制,把所有算子的状态在逻辑上对齐到同一个时间点。恢复时,所有算子都回退到同一个 checkpoint,从那个一致的状态点重新处理。
第三层:Sink 端的精确一次
即使 Flink 内部实现了精确一次,如果写入外部系统(MySQL、Elasticsearch、Kafka)时发生了重复写入,最终结果还是不精确的。比如 Flink 已经把销售额 1000 写入了 MySQL,但在确认写入成功之前宕机了。恢复后,Flink 从 checkpoint 重新计算,又得到 1000,再次写入 MySQL。如果 MySQL 端没有去重机制,两次写入就导致了错误结果。
解决思路有两种:
- 幂等写入(Idempotent Write):写入操作天然幂等,重复执行不影响结果。比如用主键做 UPSERT,或者写入 KV 存储时用确定性的 key
- 两阶段提交(Two-Phase Commit):Flink
通过
TwoPhaseCommitSinkFunction实现。第一阶段(pre-commit)写入但不提交事务,第二阶段(commit)在 checkpoint 完成后提交事务。如果 checkpoint 失败,回滚未提交的事务
// Flink 精确一次写入 Kafka 的配置
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers("broker1:9092,broker2:9092")
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic("output-topic")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
// 关键配置:启用精确一次语义
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
// 事务前缀,用于 Kafka 事务 ID
.setTransactionalIdPrefix("flink-tx-")
.build();
// Kafka 消费端也需要配置
// isolation.level = read_committed
// 否则消费端会读到未提交的事务数据精确一次的性能代价
精确一次不是免费的。启用 Flink 的精确一次 Kafka Sink 后,以下性能影响是可预期的:
- 吞吐量下降:两阶段提交引入额外的协调开销。根据 Confluent 的基准测试,启用事务后 Kafka Producer 的吞吐量会下降 10%-30%,具体取决于消息大小和批量配置
- 延迟增加:事务数据在 commit
之前对下游消费者不可见(如果消费者配置了
read_committed),延迟至少增加一个 checkpoint 间隔(通常 1-10 分钟) - 资源占用增加:每个 Sink 并行实例需要维护独立的 Kafka 事务,增加 Kafka Broker 端的事务协调负担
我认为在大多数业务场景中,at-least-once 加上下游幂等去重是比 exactly-once 更务实的选择。端到端的精确一次语义对整条链路上每个组件都有要求,任何一个环节掉链子都会破坏整体保证。而幂等写入只需要在 Sink 端做好去重逻辑,架构更简单,故障排查也更容易。
三、Flink 的 Checkpoint 机制
Flink 的 checkpoint 是实现精确一次语义的核心机制。它的理论基础是 Chandy-Lamport 全局快照算法(1985),但 Flink 对这个算法做了大量工程化改造,使其适用于大规模流处理场景。
Chandy-Lamport 算法的核心思想
Chandy-Lamport 算法要解决的问题是:在一个分布式系统中,如何在不暂停系统运行的情况下,获取一个全局一致的快照?
算法的核心机制是标记消息(Marker):
- 某个进程发起快照,记录自己的本地状态,然后向所有出边发送 Marker
- 其他进程收到 Marker 后,记录自己的本地状态,并向自己的所有出边转发 Marker
- 在收到 Marker 之前,从该入边收到的所有消息都记录为”通道状态”(即传输中的消息)
- 当所有进程都完成了本地状态记录和通道状态记录,全局快照完成
关键性质:这个快照是一致的(Consistent Cut)——如果快照中包含了一个事件的效果,那么导致这个事件的所有因果前驱事件的效果也一定包含在快照中。
Flink 的 Checkpoint Barrier
Flink 把 Chandy-Lamport 的 Marker 具体化为 Checkpoint Barrier。Barrier 是一种特殊的控制消息,由 JobManager 注入到数据流的源头,随数据流向下游传播。
以下是 Flink checkpoint 的执行流程:
sequenceDiagram
participant JM as JobManager
participant S1 as Source 1
participant S2 as Source 2
participant OP as Operator(keyBy + sum)
participant SK as Sink
participant ST as State Backend(RocksDB)
JM->>S1: 触发 Checkpoint N
JM->>S2: 触发 Checkpoint N
S1->>S1: 记录 Kafka offset
S2->>S2: 记录 Kafka offset
S1->>OP: 发送 Barrier N(混在数据流中)
S2->>OP: 发送 Barrier N(混在数据流中)
Note over OP: 收到 S1 的 Barrier N<br/>但还没收到 S2 的 Barrier N<br/>开始 Barrier 对齐
OP->>OP: 缓存 S1 后续数据<br/>继续处理 S2 数据
Note over OP: 收到 S2 的 Barrier N<br/>对齐完成
OP->>ST: 异步快照算子状态
OP->>SK: 转发 Barrier N
SK->>SK: pre-commit 外部事务
SK->>JM: 确认 Checkpoint N 完成
OP->>JM: 确认 Checkpoint N 完成
S1->>JM: 确认 Checkpoint N 完成
S2->>JM: 确认 Checkpoint N 完成
JM->>JM: 所有确认收齐<br/>Checkpoint N 成功
JM->>SK: 通知 commit 事务
Barrier 对齐与非对齐 Checkpoint
上面流程中有一个关键步骤:Barrier 对齐(Barrier Alignment)。当一个算子有多个输入通道时,它可能先收到某个通道的 Barrier,而其他通道的 Barrier 还在路上。在对齐模式下,算子会:
- 收到第一个 Barrier 后,暂停处理该通道的后续数据(缓存到内存或磁盘)
- 继续正常处理其他通道的数据
- 等所有通道的 Barrier 都到齐后,执行状态快照
- 快照完成后,处理之前缓存的数据,恢复正常处理
对齐的目的是保证快照的一致性:快照中包含的状态,精确对应 Barrier 之前所有数据的处理结果,不包含 Barrier 之后任何数据的影响。
但对齐有一个严重的工程问题:反压放大(Backpressure Amplification)。
假设算子有两个输入通道 A 和 B,通道 A 的 Barrier 很快到达了,但通道 B 因为上游处理慢或网络延迟,Barrier 迟迟不来。在等待期间,通道 A 的数据被持续缓存,不能处理。如果等待时间很长,缓存会消耗大量内存,甚至引发 OOM(Out of Memory)。更糟的是,通道 A 的反压会传导到 A 的上游,导致整个流水线变慢。
Flink 1.11 引入了非对齐 Checkpoint(Unaligned Checkpoint)来解决这个问题:
// 启用非对齐 Checkpoint
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // 60 秒间隔
env.getCheckpointConfig().enableUnalignedCheckpoints();
// 非对齐 Checkpoint 在反压严重时特别有用
// 但会增加 checkpoint 的大小(因为需要保存传输中的数据)非对齐 Checkpoint 的做法是:算子收到任意一个通道的 Barrier 后,立即执行快照,同时把其他通道中尚未处理的数据(即 Barrier 之前的数据)也作为快照的一部分保存。恢复时,先恢复状态,再重放这些缓存的数据。
| 维度 | 对齐 Checkpoint | 非对齐 Checkpoint |
|---|---|---|
| 反压敏感度 | 高:反压导致对齐时间增长 | 低:不需要等待对齐 |
| Checkpoint 大小 | 只包含算子状态 | 包含算子状态 + 传输中的数据 |
| Checkpoint 耗时 | 反压时可能很长 | 相对稳定 |
| 恢复时间 | 较快(直接恢复状态) | 较慢(需要重放缓存数据) |
| 适用场景 | 正常负载、低反压 | 高反压、对 checkpoint 稳定性要求高 |
State Backend:状态存在哪里
Checkpoint 本质上是对算子状态的快照。算子状态存在哪里,直接决定了 checkpoint 的性能特征。Flink 提供两种主要的 State Backend:
HashMapStateBackend(原 MemoryStateBackend / FsStateBackend):状态存储在 JVM 堆内存中,checkpoint 时序列化写入持久化存储(通常是 HDFS 或 S3)。优点是读写速度快(内存操作),缺点是状态大小受 JVM 堆内存限制,大状态场景容易 OOM。
EmbeddedRocksDBStateBackend(原 RocksDBStateBackend):状态存储在 RocksDB 中(一个嵌入式 KV 存储引擎),RocksDB 的数据文件存放在本地磁盘。checkpoint 时通过增量快照(Incremental Checkpoint)只上传变化的 SST 文件。优点是状态大小不受内存限制(可以到 TB 级),缺点是每次状态访问都涉及序列化/反序列化和可能的磁盘 I/O。
// 配置 RocksDB State Backend 并启用增量 Checkpoint
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // true = 增量 checkpoint
env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");
// RocksDB 调优参数
Configuration config = new Configuration();
// 每个算子实例的 RocksDB 内存预算
config.set(RocksDBConfigurableOptions.FIX_PER_TKM_MEMORY_SIZE, MemorySize.ofMebiBytes(256));
// Block cache 大小,影响读性能
config.set(RocksDBConfigurableOptions.BLOCK_CACHE_SIZE, MemorySize.ofMebiBytes(128));实际工程中,状态大小是选择 State Backend 的核心考量。如果状态在几十 GB 以内且对延迟敏感,HashMapStateBackend 是更好的选择;如果状态可能增长到几百 GB 甚至 TB 级别,RocksDB 是唯一的选择。
四、事件时间与处理时间的工程影响
时间语义的选择是流处理中最容易被低估的决策。选错了时间语义,计算结果可能系统性地偏离预期,而且这种偏差在测试环境中很难暴露——因为测试环境通常不存在网络延迟、消费者积压等生产环境的常见状况。
两种时间的定义
事件时间(Event Time):事件在源头实际发生的时间。比如用户在 14:00:03 点击了一个按钮,这个时间戳被写入点击事件的数据中。无论这条数据经过多少中间系统、延迟多久到达流处理引擎,它的事件时间永远是 14:00:03。
处理时间(Processing Time):流处理引擎实际处理这条数据的系统时钟时间。如果这条数据在 14:00:08 到达 Flink 算子,处理时间就是 14:00:08。
// Flink 中提取事件时间的典型做法
DataStream<ClickEvent> clicks = env
.addSource(new FlinkKafkaConsumer<>("clicks", new ClickEventDeserializer(), kafkaProps))
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<ClickEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.getClickTime())
);一个具体的例子:为什么处理时间不靠谱
假设我们要统计每分钟的网站点击量。下面两个场景说明了处理时间的问题:
场景一:消费者积压
某天凌晨 3:00,Flink 作业因为一个 bug 宕机了,凌晨 3:30 修复后重启。在这 30 分钟内,Kafka 中积压了 30 分钟的数据。重启后,Flink 快速消费积压数据,原本属于 3:00-3:30 的数据在几秒内被全部处理。
如果用处理时间做窗口统计,3:30 这一分钟的点击量会出现一个巨大的峰值(包含了 30 分钟积压的数据),而 3:00-3:30 的点击量为零。这完全不反映真实的用户行为。
如果用事件时间做窗口统计,数据会被正确地分配到它们实际发生的那一分钟,即使这些数据是在 3:30 才被处理的。
场景二:跨时区数据源
数据来自全球多个数据中心。美国数据中心的数据到达北京的 Flink 集群需要 200 毫秒网络延迟,而北京本地数据中心的延迟只有 2 毫秒。如果用处理时间做窗口统计,美国用户的点击会系统性地被分配到比实际发生时间晚 200 毫秒的窗口中。大多数时候这 200 毫秒跨不了窗口边界,但在窗口边界附近(比如整分钟前后 200 毫秒),美国用户的点击会被错误地归入下一个窗口。
事件时间的代价
事件时间虽然语义上更正确,但引入了额外的工程复杂性:
- 数据必须携带时间戳:需要在数据生产端嵌入准确的时间戳,并且各数据源的时钟需要合理同步(通常通过 NTP)
- 需要处理乱序数据:网络延迟、多分区消费等原因导致数据到达顺序和事件时间顺序不一致
- 需要水印(Watermark)机制:告诉系统”某个时间点之前的数据应该都到齐了”,从而触发窗口计算
- 需要处理迟到数据(Late Data):水印之后仍然可能到达的数据需要有明确的处理策略
这些代价不小,但在绝大多数需要时间窗口统计的场景中,事件时间是唯一正确的选择。我见过太多团队一开始为了省事用处理时间,后来在生产环境中被积压回放、跨区延迟等问题反复教训,最终还是切换到了事件时间。
五、水印机制与迟到数据处理
水印(Watermark)是事件时间模型中最精巧也最容易出错的机制。它的本质是一个启发式的进度指标:当系统生成一个值为 T 的水印时,它声明的是”时间戳小于 T 的数据应该都已经到达了”。
注意”应该”二字。水印是一个估计,不是保证。如果估计得太保守(水印推进太慢),结果输出的延迟会增加;如果估计得太激进(水印推进太快),部分迟到数据会被遗漏。
水印的生成策略
Flink 提供了两种内置的水印生成策略:
单调递增水印(Monotonously Increasing):假设数据严格按事件时间顺序到达,水印直接取最新数据的时间戳。适用于单分区、单数据源、严格有序的场景。
WatermarkStrategy
.<Event>forMonotonousTimestamps()
.withTimestampAssigner((event, ts) -> event.getTimestamp());有界乱序水印(Bounded Out-of-Orderness):假设数据可能乱序到达,但乱序程度有一个上界。水印 = 当前最大事件时间 - 允许的最大乱序时间。
WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((event, ts) -> event.getTimestamp());
// 如果当前看到的最大事件时间是 14:00:30
// 水印 = 14:00:30 - 10s = 14:00:20
// 含义:14:00:20 之前的数据应该都到齐了在实际工程中,forBoundedOutOfOrderness
是最常用的策略,但参数的选择需要基于对数据特性的深入理解。这个参数设多大合适?没有标准答案,取决于数据源的延迟分布。一种做法是先采样一段时间的数据,统计事件时间和到达时间之差的分布,然后选取
99 百分位(P99)作为最大乱序时间。
多数据源的水印推进
当数据流有多个源时,水印推进变得更加复杂。Flink 的水印推进规则是:取所有源水印的最小值作为全局水印。
这意味着如果某个源暂时没有数据产出(比如一个低流量的 Kafka partition),它的水印不会推进,进而拖住全局水印。全局水印被拖住后,下游所有依赖事件时间的窗口计算都无法触发。
Flink 提供了空闲源检测机制来应对这个问题:
WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((event, ts) -> event.getTimestamp())
// 如果某个源超过 30 秒没有数据,标记为空闲
// 空闲源的水印不参与全局水印计算
.withIdleness(Duration.ofSeconds(30));迟到数据的处理策略
水印推过某个窗口的结束时间后,该窗口会被触发计算并输出结果。但之后仍然可能到达属于这个窗口的数据——这就是迟到数据(Late Data)。
Flink 提供三种处理策略:
策略一:直接丢弃(默认行为)
窗口触发后,迟到数据被直接丢弃。简单粗暴,适用于对少量数据丢失不敏感的场景。
策略二:允许迟到(Allowed Lateness)
窗口触发后,保留窗口状态一段时间。在允许迟到时间内到达的数据会触发窗口的重新计算和结果更新。
DataStream<AggResult> result = clicks
.keyBy(ClickEvent::getUserId)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
// 允许 5 分钟的迟到
.allowedLateness(Time.minutes(5))
.aggregate(new ClickCountAggregator());
// 窗口首次触发后,5 分钟内到达的迟到数据
// 会触发窗口重新计算并输出更新后的结果
// 5 分钟后窗口状态被清除,之后的迟到数据被丢弃策略三:旁路输出(Side Output)
把迟到数据发送到一个单独的数据流中,由业务逻辑自行处理——比如写入一个”迟到数据修正”的离线流程。
OutputTag<ClickEvent> lateDataTag = new OutputTag<ClickEvent>("late-data") {};
SingleOutputStreamOperator<AggResult> result = clicks
.keyBy(ClickEvent::getUserId)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.allowedLateness(Time.minutes(5))
.sideOutputLateData(lateDataTag)
.aggregate(new ClickCountAggregator());
// 获取迟到数据流,单独处理
DataStream<ClickEvent> lateData = result.getSideOutput(lateDataTag);
lateData.addSink(new LateDataSink()); // 写入离线修正流程实际工程中,我推荐的做法是组合使用策略二和策略三:设置一个合理的
allowedLateness(比如水印乱序时间的 2-3
倍),同时用旁路输出兜底超过允许迟到时间的数据。这样既能在线修正大部分迟到数据的影响,又不会丢失任何数据。
六、窗口计算的语义陷阱
窗口(Window)是流处理中把无界数据切分成有界块的核心抽象。窗口的类型选择和参数配置直接决定计算结果的语义。选错窗口类型或者误解窗口边界,会导致结果系统性偏差。
三种基本窗口类型
滚动窗口(Tumbling Window):固定长度、不重叠。每条数据恰好属于一个窗口。
时间轴:|--- 窗口1 ---|--- 窗口2 ---|--- 窗口3 ---|
[0, 5) [5, 10) [10, 15)
// 每 5 分钟统计一次各商品的销售额
DataStream<SalesResult> sales = orders
.keyBy(Order::getProductId)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.aggregate(new SalesAggregator());滚动窗口最常见的陷阱是窗口对齐问题。Flink 的滚动窗口默认对齐到 epoch(Unix 时间戳 0)。比如一个 1 小时的窗口,边界是 0:00-1:00、1:00-2:00,而不是作业启动时间开始计算。如果你的业务定义的”小时”是从 0:30 到 1:30,需要设置 offset:
// 窗口从每小时的第 30 分钟开始
TumblingEventTimeWindows.of(Time.hours(1), Time.minutes(30))
// 窗口边界:0:30-1:30, 1:30-2:30, ...滑动窗口(Sliding Window):固定长度、有重叠。滑动步长小于窗口长度时,一条数据可能属于多个窗口。
窗口长度 = 10,滑动步长 = 5
|--- 窗口1 ---|
|--- 窗口2 ---|
|--- 窗口3 ---|
[0, 10) [5, 15) [10, 20)
// "最近 10 分钟的移动平均",每 5 分钟更新一次
DataStream<AvgResult> movingAvg = metrics
.keyBy(Metric::getHostName)
.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(5)))
.aggregate(new MovingAverageAggregator());滑动窗口的陷阱是资源消耗。如果窗口长度是 1 小时、滑动步长是 1 秒,那么每条数据会被分配到 3600 个窗口中。这意味着 3600 倍的状态存储和计算量。在实际工程中,滑动窗口的窗口长度/滑动步长比值不应超过几十,否则应该考虑用增量聚合(如指数衰减平均)来替代。
会话窗口(Session Window):窗口的边界由数据驱动。如果两条相邻数据的时间间隔超过指定的 gap,就分割成不同的窗口。
gap = 5
数据到达时间:1, 3, 4, 12, 13, 25
窗口划分: [1, 9) [12, 18) [25, 30)
// 用户会话分析:超过 30 分钟无活动则视为会话结束
DataStream<SessionResult> sessions = userActions
.keyBy(UserAction::getUserId)
.window(EventTimeSessionWindows.withGap(Time.minutes(30)))
.aggregate(new SessionAggregator());会话窗口的陷阱是窗口合并(Window Merging)。一条新数据的到来可能导致两个原本独立的窗口合并成一个。比如用户在 14:00 和 14:40 分别有一次点击,形成了两个独立的会话窗口。如果这时收到一条迟到数据(事件时间 14:20),两个窗口之间的间隔不再超过 30 分钟,它们需要合并成一个大窗口。这个合并操作涉及状态的重新组织,在大规模场景下代价不小。
窗口语义的一个真实陷阱
我在实际项目中遇到过一个让人困惑的问题:“为什么每天的 PV(Page View)统计和 Hive 离线统计对不上?”
流处理作业使用事件时间的 1 天滚动窗口统计每日 PV。按照 Flink 的默认行为,窗口边界是 UTC 0:00 到 UTC 24:00。但业务方期望的”一天”是北京时间 0:00 到 24:00(即 UTC 16:00 到次日 UTC 16:00)。UTC 0:00 到 UTC 16:00 之间的数据被分配到了错误的日期窗口中。
修复方法是设置窗口的 offset:
// 北京时间 = UTC + 8
// 需要 offset = -8 小时 = +16 小时(等价)
TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))
// 窗口边界变为:UTC 16:00 到次日 UTC 16:00
// 即北京时间 0:00 到 24:00这个问题看起来简单,但在大量流处理作业中,时区偏移是出现频率最高的 bug 之一。Hive 离线作业通常按照分区日期(北京时间)组织数据,而 Flink 的默认窗口对齐是 UTC。如果不显式处理,两边的统计结果永远对不上。
七、Kafka Streams 与 Flink 的架构对比
Kafka Streams 和 Apache Flink 是当前最主流的两个流处理框架,但它们的设计哲学和架构取舍截然不同。理解这些差异对于技术选型至关重要。
部署模型
Flink:独立的分布式计算集群。Flink 有自己的 JobManager(负责调度和协调)和 TaskManager(负责执行计算任务)。部署方式包括 Standalone 集群、YARN、Kubernetes 等。Flink 作业提交到集群后,由 JobManager 分配资源和调度执行。
Kafka Streams:一个 Java 库(Library),不是一个独立的集群。Kafka Streams 应用就是一个普通的 Java 应用程序,直接嵌入到你的微服务中。没有 master 节点,没有集群管理器。多个实例之间通过 Kafka 的消费者组(Consumer Group)协议进行分区分配和协调。
// Kafka Streams:就是一个普通的 Java 应用
public class WordCountApp {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("input-topic");
KTable<String, Long> wordCounts = textLines
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word)
.count(Materialized.as("word-counts-store"));
wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// 应用运行在你自己的进程中
// 水平扩展 = 启动更多实例
}
}这个差异的工程影响是深远的:
- 运维复杂度:Flink 需要运维一个独立的计算集群(或者依赖 Kubernetes),Kafka Streams 只需要部署普通的应用实例
- 资源隔离:Flink 集群可以被多个作业共享,也可以独占;Kafka Streams 的资源就是应用进程本身的资源
- 弹性伸缩:Flink 可以通过 Reactive Mode 或 Kubernetes Operator 实现自动伸缩;Kafka Streams 的伸缩依赖于应用层的容器编排
状态管理
Flink:状态存储在 TaskManager 本地(内存或 RocksDB),通过 checkpoint 机制定期持久化到远端存储(HDFS/S3)。Flink 管理状态的整个生命周期:分配、备份、恢复、重分布(rescaling)。
Kafka Streams:状态存储在本地 RocksDB 中,同时通过 changelog topic 持久化到 Kafka。每次状态变更都会异步写入对应的 changelog topic。当应用实例宕机后在另一台机器上重启时,从 changelog topic 重建本地状态。
// Kafka Streams 的状态存储
KTable<String, Long> wordCounts = textLines
.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
.groupBy((key, word) -> word)
// Materialized.as() 创建一个有名字的状态存储
// 底层是本地 RocksDB + Kafka changelog topic
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("word-counts-store")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long()));
// 可以通过 Interactive Queries 直接查询本地状态
ReadOnlyKeyValueStore<String, Long> store =
streams.store(StoreQueryParameters.fromNameAndType("word-counts-store",
QueryableStoreTypes.<String, Long>keyValueStore()));
Long count = store.get("hello"); // 直接查询,不需要外部数据库Kafka Streams 的状态管理方式有一个独特的优势:Interactive Queries。应用可以直接暴露本地状态供外部查询,不需要把结果写入外部数据库。这在某些场景下可以简化架构——比如一个实时计数服务,Kafka Streams 应用本身就可以直接提供查询 API。
架构全面对比
| 维度 | Apache Flink | Kafka Streams |
|---|---|---|
| 定位 | 独立的分布式流处理引擎 | 嵌入式流处理库 |
| 部署模型 | 独立集群(Standalone/YARN/K8s) | 普通 Java 应用进程 |
| Master 节点 | JobManager | 无(通过 Consumer Group 协调) |
| 数据源 | Kafka、文件、Socket、自定义 Source | 仅 Kafka |
| 数据输出 | 任意 Sink(Kafka、DB、文件等) | 仅 Kafka(或通过 API 暴露) |
| 状态存储 | 内存 / RocksDB + 远端 checkpoint | RocksDB + Kafka changelog topic |
| 精确一次 | checkpoint + 两阶段提交 | Kafka 事务(依赖 Kafka 0.11+) |
| 窗口类型 | 滚动、滑动、会话、全局 | 滚动、滑动、会话 |
| 事件时间支持 | 完善(水印、迟到数据、旁路输出) | 支持(基于记录时间戳) |
| SQL 支持 | Flink SQL(功能丰富) | 不支持 |
| 吞吐量(单集群) | 极高(百万级 events/sec) | 中高(取决于实例数和分区数) |
| 延迟 | 毫秒级 | 毫秒级 |
| 运维复杂度 | 高(需要管理集群) | 低(普通应用部署) |
| 学习曲线 | 较陡(概念多、API 丰富) | 较平缓(API 简洁) |
| 批流一体 | 支持(统一 API) | 不支持批处理 |
| 适用场景 | 大规模、复杂逻辑、多数据源 | 中小规模、Kafka 生态内 |
怎么选
我的判断标准是:
- 如果你的数据已经在 Kafka 中,计算逻辑不太复杂(聚合、过滤、简单关联),团队没有 Flink 运维经验——用 Kafka Streams。它的部署和运维成本显著低于 Flink,而且和 Kafka 生态天然集成
- 如果需要多数据源关联、复杂 CEP(复杂事件处理)、大规模状态管理、SQL 支持,或者需要批流一体——用 Flink。它的计算能力和功能丰富度远超 Kafka Streams
- 如果你已经有 Kubernetes 基础设施并且团队对容器化运维有经验——Flink on Kubernetes(通过 Flink Kubernetes Operator)可以大幅降低 Flink 的运维成本,使得选择 Flink 的门槛降低
八、工程案例:某金融平台的实时风控系统
以下案例来自一个真实的金融平台风控系统改造项目。出于保密要求,具体数字和细节做了脱敏处理,但架构决策和遇到的问题是真实的。
背景
该平台原有的风控系统是基于批处理的:每隔 5 分钟运行一次 Spark 作业,从数据库中拉取最近 5 分钟的交易数据,执行风控规则,标记可疑交易。问题很明显——5 分钟的延迟意味着一个欺诈者可以在被发现之前完成多笔交易。业务方要求将风控判定延迟降低到 500 毫秒以内。
架构设计
改造后的架构基于 Flink:
交易系统 --> Kafka(交易事件 topic)--> Flink 风控作业 --> Kafka(风控结果 topic)--> 交易网关
|
用户画像 Kafka topic
设备指纹 Kafka topic
历史风控 RocksDB 状态
关键设计决策:
决策一:选择事件时间而非处理时间
风控规则中有一条:“5 分钟内同一用户在 3 个不同 IP 发起交易,标记为可疑”。这个”5 分钟”必须基于交易实际发生的时间(事件时间),而不是交易数据到达风控引擎的时间。原因和前面讨论的一样——如果 Kafka 消费出现积压,处理时间会严重失真。
决策二:使用 RocksDB State Backend
风控状态需要维护每个用户最近 24 小时的交易记录摘要(IP 列表、设备列表、交易金额分布等)。平台有千万级活跃用户,每个用户的状态约 2KB,总状态量约 20GB。这个规模超出了 JVM 堆内存的合理范围,必须使用 RocksDB。
决策三:选择 at-least-once + 幂等写入,而非端到端 exactly-once
风控结果写入 Kafka 后,由交易网关消费。交易网关根据交易 ID 做幂等处理——同一笔交易如果收到多次风控结果,只执行第一次。这样即使 Flink 重启后重复产出了一些风控结果,也不会导致一笔交易被重复拦截或重复放行。
放弃端到端 exactly-once 的原因是:启用 Kafka 事务后,风控结果的端到端延迟会增加一个 checkpoint 间隔(配置为 60 秒),这和 500 毫秒的延迟要求直接冲突。
遇到的问题
问题一:水印推进停滞
上线后发现,凌晨时段部分 Kafka partition
长时间没有数据,导致水印不推进,窗口计算卡住。修复方案是配置
withIdleness(Duration.ofSeconds(30))。
问题二:RocksDB compaction 导致延迟毛刺
RocksDB 后台的 compaction 操作会导致间歇性的延迟毛刺(P99 从 100ms 飙升到 2s)。优化方案:
// 限制 RocksDB compaction 的并发度和 I/O 带宽
RocksDBOptionsFactory customFactory = new RocksDBOptionsFactory() {
@Override
public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handles) {
return currentOptions
.setMaxBackgroundJobs(2) // 限制后台线程数
.setMaxOpenFiles(256) // 限制打开文件数
.setBytesPerSync(1048576); // 1MB,控制 sync 频率
}
@Override
public ColumnFamilyOptions createColumnFamilyOptions(
ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handles) {
return currentOptions
.setCompactionStyle(CompactionStyle.LEVEL)
.setLevelCompactionDynamicLevelBytes(true)
.setTargetFileSizeBase(67108864) // 64MB
.setMaxBytesForLevelBase(268435456); // 256MB
}
};问题三:Checkpoint 超时
状态量增长到 20GB 后,全量 checkpoint 超时频繁发生。切换到增量 checkpoint 后问题解决——增量 checkpoint 只上传自上次 checkpoint 以来变化的 SST 文件,每次传输的数据量从 20GB 降低到几百 MB。
最终效果
改造后的系统在日均 5000 万笔交易的负载下,P99 风控判定延迟为 180 毫秒,满足 500 毫秒的要求。风控规则命中率提升了 15%(因为实时计算能捕获到批处理遗漏的短时间窗口内的行为模式)。
九、流处理的高级模式
双流 Join
流处理中的 Join 和批处理中的 Join 有本质区别。批处理的 Join 操作在两个完整的数据集上执行——两边的数据在 Join 开始前就已经全部就绪。流处理的 Join 面对的是两个无界数据流——来自流 A 的一条数据到达时,流 B 中与之匹配的数据可能还没到(或者可能永远不会到)。
Flink 提供了基于时间窗口的 Interval Join:
// 订单流和支付流的 Join
// 订单创建后 5 分钟内应该收到支付事件
DataStream<Order> orders = /* 订单流,包含事件时间 */;
DataStream<Payment> payments = /* 支付流,包含事件时间 */;
DataStream<OrderPaymentResult> joined = orders
.keyBy(Order::getOrderId)
.intervalJoin(payments.keyBy(Payment::getOrderId))
// 订单事件时间之后 0 到 5 分钟的支付事件会被匹配
.between(Time.seconds(0), Time.minutes(5))
.process(new ProcessJoinFunction<Order, Payment, OrderPaymentResult>() {
@Override
public void processElement(Order order, Payment payment,
Context ctx, Collector<OrderPaymentResult> out) {
out.collect(new OrderPaymentResult(
order.getOrderId(),
order.getAmount(),
payment.getPaymentMethod(),
payment.getPayTime()
));
}
});Interval Join 的语义是:对于流 A 中时间戳为
t 的元素,它会和流 B 中时间戳在
[t + lowerBound, t + upperBound]
范围内的元素进行
Join。这个时间窗口限制了状态的大小——只需要缓存时间窗口范围内的数据,超过时间窗口的数据可以被清理。
复杂事件处理(CEP)
Flink CEP 库允许在数据流中检测符合特定模式的事件序列。比如检测”用户登录失败 3 次后,5 分钟内从新 IP 登录成功”这种欺诈模式:
// 定义事件模式
Pattern<LoginEvent, ?> fraudPattern = Pattern
.<LoginEvent>begin("failed-logins")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent event) {
return event.getStatus().equals("FAILED");
}
})
.times(3) // 连续 3 次失败
.consecutive() // 必须是连续的,中间不能有成功
.next("success-from-new-ip")
.where(new IterativeCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent event, Context<LoginEvent> ctx) {
// 成功登录,且 IP 与之前的失败登录不同
if (!event.getStatus().equals("SUCCESS")) return false;
Set<String> failedIps = new HashSet<>();
for (LoginEvent failed : ctx.getEventsForPattern("failed-logins")) {
failedIps.add(failed.getIp());
}
return !failedIps.contains(event.getIp());
}
})
.within(Time.minutes(5)); // 整个模式必须在 5 分钟内匹配
// 应用模式并提取匹配结果
PatternStream<LoginEvent> patternStream = CEP.pattern(
loginEvents.keyBy(LoginEvent::getUserId), fraudPattern);
DataStream<FraudAlert> alerts = patternStream.select(
new PatternSelectFunction<LoginEvent, FraudAlert>() {
@Override
public FraudAlert select(Map<String, List<LoginEvent>> pattern) {
List<LoginEvent> failures = pattern.get("failed-logins");
LoginEvent success = pattern.get("success-from-new-ip").get(0);
return new FraudAlert(
success.getUserId(),
failures.size(),
success.getIp(),
"SUSPICIOUS_LOGIN_PATTERN"
);
}
});CEP 的底层实现基于 NFA(Non-deterministic Finite
Automaton,非确定性有限自动机)。每个正在匹配中的模式实例都是
NFA
中的一个状态。在高吞吐场景下,如果模式中包含不够精确的匹配条件(比如
where 条件过于宽泛),NFA
的状态数量可能爆炸式增长,导致内存耗尽。因此,CEP
模式的设计需要尽可能精确,并且设置合理的 within
时间限制来控制状态生命周期。
Async I/O
流处理中经常需要关联外部系统的数据——比如根据用户 ID 查询用户画像数据库。如果用同步方式查询,每条消息的处理延迟都会增加一次数据库 RTT(通常几毫秒到几十毫秒),严重限制吞吐量。
Flink 提供了 Async I/O 算子来解决这个问题:
// 异步查询用户画像
DataStream<EnrichedEvent> enriched = AsyncDataStream.unorderedWait(
events,
new AsyncFunction<Event, EnrichedEvent>() {
private transient AsyncHttpClient httpClient;
@Override
public void open(Configuration parameters) {
httpClient = new AsyncHttpClient();
}
@Override
public void asyncInvoke(Event event, ResultFuture<EnrichedEvent> resultFuture) {
CompletableFuture<UserProfile> future = httpClient
.get("http://user-service/profiles/" + event.getUserId());
future.thenAccept(profile -> {
resultFuture.complete(Collections.singleton(
new EnrichedEvent(event, profile)
));
}).exceptionally(ex -> {
resultFuture.complete(Collections.singleton(
new EnrichedEvent(event, UserProfile.UNKNOWN)
));
return null;
});
}
},
30, TimeUnit.SECONDS, // 超时时间
100 // 最大并发请求数
);unorderedWait
允许异步查询的结果乱序返回(先完成的先输出),从而最大化吞吐量。如果业务要求保持输入顺序,可以使用
orderedWait,但吞吐量会受到最慢查询的限制。
十、Flink SQL 与流批一体
Flink 1.9 以后大力推进 Flink SQL,目标是用 SQL 统一批处理和流处理的开发体验。Flink SQL 的底层引擎会根据输入数据是有界还是无界,自动选择批处理或流处理的执行策略。
流式 SQL 的语义
流式 SQL 和传统 SQL 最大的区别在于:传统 SQL 查询一个静态的表,返回一个静态的结果;流式 SQL 查询一个持续变化的表(Dynamic Table),产出一个持续更新的结果流。
Flink SQL 用 Changelog Stream 来表达这种持续更新的语义:
-- 创建 Kafka Source 表
CREATE TABLE orders (
order_id STRING,
user_id STRING,
product_id STRING,
amount DECIMAL(10, 2),
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'broker1:9092',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
);
-- 实时统计每个用户的累计消费金额
-- 这是一个持续查询,结果随新订单到来而更新
SELECT user_id, SUM(amount) AS total_spent
FROM orders
GROUP BY user_id;
-- 输出是一个 changelog 流:
-- +I (user_001, 100.00) -- 插入
-- -U (user_001, 100.00) -- 更新前的旧值(撤回)
-- +U (user_001, 250.00) -- 更新后的新值时间窗口聚合
Flink SQL 支持在 SQL 语法中直接使用时间窗口:
-- 每 5 分钟统计各商品的销售额和订单数
SELECT
product_id,
TUMBLE_START(order_time, INTERVAL '5' MINUTE) AS window_start,
TUMBLE_END(order_time, INTERVAL '5' MINUTE) AS window_end,
COUNT(*) AS order_count,
SUM(amount) AS total_amount
FROM orders
GROUP BY
product_id,
TUMBLE(order_time, INTERVAL '5' MINUTE);Flink 1.13 引入了更灵活的窗口 TVF(Table-Valued Function)语法:
-- 使用窗口 TVF 语法(推荐)
SELECT
product_id,
window_start,
window_end,
COUNT(*) AS order_count,
SUM(amount) AS total_amount
FROM TABLE(
TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '5' MINUTE)
)
GROUP BY product_id, window_start, window_end;窗口 TVF 语法的优势是支持更多的窗口类型(包括累积窗口 CUMULATE),以及窗口的 Top-N 等高级分析操作。
Flink SQL 的局限
Flink SQL 的表达能力在快速进步,但目前仍有一些场景需要退回到 DataStream API:
- 复杂的状态管理逻辑:比如需要手动管理多个不同 TTL 的状态,或者需要自定义的状态清理策略
- 复杂的 CEP 模式:SQL 目前不支持 CEP 语法(MATCH_RECOGNIZE 有限支持)
- 精细的性能调优:比如需要控制 RocksDB 的 compaction 策略、内存分配等底层参数
- 自定义窗口触发逻辑:SQL 的窗口触发机制是固定的,不支持自定义 Trigger
在这些场景下,通常的做法是用 DataStream API
实现核心逻辑,再通过 Table.toChangelogStream()
和 StreamTableEnvironment.fromChangelogStream()
与 SQL 层互通。
十一、流处理的可观测性
流处理作业是长期运行的服务,可观测性至关重要。一个风控 Flink 作业 7x24 小时运行,任何异常都需要快速发现和定位。
关键监控指标
吞吐量相关: -
numRecordsInPerSecond /
numRecordsOutPerSecond:每秒处理的记录数 -
numBytesInPerSecond /
numBytesOutPerSecond:每秒处理的字节数
延迟相关: -
currentInputWatermark:当前水印值。如果水印长时间不推进,说明有数据源空闲或严重积压
- Kafka consumer
lag:消费者偏移量与最新偏移量的差值。持续增长说明处理速度跟不上数据产出速度
Checkpoint 相关: -
lastCheckpointDuration:最近一次 checkpoint
的耗时 - lastCheckpointSize:最近一次
checkpoint 的大小 -
numberOfFailedCheckpoints:checkpoint
失败次数。连续失败意味着系统无法建立恢复点
反压相关: -
isBackPressured:算子是否处于反压状态 -
busyTimeMsPerSecond:算子在 1
秒内的繁忙时间(毫秒)。接近 1000 说明算子是瓶颈
反压排查
反压是流处理中最常见的性能问题。表现为上游算子的输出缓冲区满,数据堆积,整条流水线的吞吐量下降。
Flink Web UI 提供了反压可视化:每个算子会显示绿色(正常)、黄色(轻微反压)或红色(严重反压)。排查反压的基本步骤:
- 找到第一个反压为红色的算子——那就是瓶颈所在
- 检查该算子的
busyTimeMsPerSecond——如果接近 1000,说明计算能力不足,需要增加并行度 - 检查该算子是否涉及外部系统调用——同步 I/O 是常见的反压原因,改用 Async I/O
- 检查该算子的状态访问模式——大量随机读 RocksDB 可能成为瓶颈,考虑增加 block cache 或使用 HashMapStateBackend
日志与链路追踪
Flink 作业的日志散落在多个 TaskManager 上,排查问题时需要登录不同的机器查看日志。建议把 Flink 日志统一接入到 ELK 或者 Loki 等日志平台,按 JobID 和 TaskName 建立索引。
对于需要追踪单条数据处理路径的场景,可以在数据中注入 traceId,并在每个算子的处理逻辑中记录日志。但要注意日志量——高吞吐场景下(百万 TPS),每条数据都记录日志是不现实的,通常采用采样日志(比如每 1000 条记录一次)。
十二、流处理架构的演进趋势
Lambda 架构与 Kappa 架构
Lambda 架构(Nathan Marz 提出):用批处理层保证结果的准确性,用速度层(流处理)提供低延迟的近似结果。查询时合并两层的结果。问题是需要维护两套计算逻辑——一套用 Spark 写批处理,一套用 Flink 写流处理——同一个业务逻辑实现两遍,维护成本高,而且两套逻辑很容易出现不一致。
Kappa 架构(Jay Kreps 提出):去掉批处理层,所有计算都通过流处理完成。如果需要重新计算历史数据(比如修正了一个 bug),通过回放 Kafka 中的历史消息来实现。前提是 Kafka 需要保留足够长的历史数据。
Lambda 架构:
数据 --> 批处理层(Spark)--> 批处理视图 --+
数据 --> 速度层(Flink)--> 实时视图 --+--> 合并查询
Kappa 架构:
数据 --> Kafka(长期保留)--> 流处理(Flink)--> 服务层
我认为 Kappa 架构在大多数场景下是更优的选择,但有一个前提:流处理引擎的精确一次语义和故障恢复机制足够成熟。Flink 目前已经满足这个条件。但在某些场景下,比如需要对 PB 级历史数据做全量重算,纯流处理的效率仍然不如批处理——因为批处理可以利用数据的局部性做更激进的优化(比如列式存储的向量化扫描)。
流式数据湖
Flink 与数据湖格式(Apache Iceberg、Apache Hudi、Delta Lake)的集成是近年来的热点方向。核心思路是用 Flink 作为实时写入引擎,将流数据以数据湖格式写入对象存储,同时支持批处理引擎(Spark、Trino)直接查询:
-- Flink SQL 写入 Iceberg 表
CREATE TABLE iceberg_orders (
order_id STRING,
user_id STRING,
amount DECIMAL(10, 2),
order_time TIMESTAMP(3)
) WITH (
'connector' = 'iceberg',
'catalog-name' = 'hive_catalog',
'catalog-type' = 'hive',
'warehouse' = 's3://data-lake/warehouse'
);
INSERT INTO iceberg_orders
SELECT order_id, user_id, amount, order_time
FROM kafka_orders; -- 实时消费 Kafka 并写入 Iceberg这种架构模糊了”流处理”和”数据仓库”的边界——数据通过流处理实时写入,通过批处理引擎事后分析,两者共享同一份存储。
更细粒度的容错
传统的 checkpoint 是全局一致的——所有算子的状态在逻辑上对齐到同一个时间点。这意味着恢复时,所有算子都必须回退到同一个 checkpoint。如果一个 Flink 作业有几百个算子,其中一个算子的 TaskManager 宕机了,整个作业都需要回退。
Flink 社区正在探索更细粒度的容错机制,比如基于 Region 的故障恢复——只回退受影响的算子子图,不影响其他部分。这对于大规模作业(几千个并行度)的可用性有显著提升。
十三、常见误区与实践建议
误区一:流处理可以完全替代批处理
流处理在延迟敏感的场景有明显优势,但不是万能的。以下场景批处理仍然是更好的选择:
- 全量历史数据重算:比如修改了一个指标的计算口径,需要重算过去一年的所有数据。虽然可以通过回放 Kafka 实现,但批处理引擎在大规模扫描上的效率更高
- 复杂的多表 Join:涉及多个大表的 Join,批处理可以利用 sort-merge join、broadcast join 等策略进行优化,而流处理的 Join 受限于状态大小
- 机器学习模型训练:模型训练通常需要多次迭代遍历全量数据,这是批处理的典型场景
误区二:Checkpoint 间隔越短越好
直觉上,checkpoint 间隔越短,故障后丢失的数据越少。但 checkpoint 本身有成本——状态快照、网络传输、磁盘写入。如果 checkpoint 间隔太短,上一次 checkpoint 还没完成下一次就开始了,系统大量资源被 checkpoint 占用,反而降低了正常处理的吞吐量。
经验法则:checkpoint 间隔至少应该是 checkpoint 耗时的 2-3 倍。如果一次 checkpoint 需要 30 秒,间隔至少设置为 60-90 秒。
误区三:并行度越高越好
增加并行度确实可以提升吞吐量,但也会引入更多的网络 shuffle、更多的状态碎片、更大的 checkpoint。特别是 keyBy 操作后,如果 key 的基数远小于并行度,会导致数据倾斜——部分并行实例处理大量数据,其他实例几乎空闲。
优化数据倾斜的常用手段:
// 两阶段聚合:先加盐打散,再聚合
DataStream<Tuple2<String, Long>> preAgg = events
.map(event -> {
// 在 key 后面附加随机后缀,打散到更多的并行实例
String saltedKey = event.getKey() + "-" + ThreadLocalRandom.current().nextInt(10);
return Tuple2.of(saltedKey, 1L);
})
.returns(Types.TUPLE(Types.STRING, Types.LONG))
.keyBy(t -> t.f0)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));
DataStream<Tuple2<String, Long>> finalAgg = preAgg
.map(t -> {
// 去掉盐值,恢复原始 key
String originalKey = t.f0.substring(0, t.f0.lastIndexOf("-"));
return Tuple2.of(originalKey, t.f1);
})
.returns(Types.TUPLE(Types.STRING, Types.LONG))
.keyBy(t -> t.f0)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));误区四:忽视状态 TTL
流处理作业的状态会持续增长。如果不设置状态的 TTL(Time-To-Live),状态会无限膨胀直到 OOM 或磁盘耗尽。
// 为 ValueState 设置 TTL
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(org.apache.flink.api.common.time.Time.hours(24))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupInRocksdbCompactFilter(1000) // 每处理 1000 条记录触发一次清理
.build();
ValueStateDescriptor<UserProfile> descriptor =
new ValueStateDescriptor<>("user-profile", UserProfile.class);
descriptor.enableTimeToLive(ttlConfig);在 上一篇 中我们讨论了数据湖与数据仓库的架构设计。在 下一篇 中,我们将探讨搜索引擎的架构设计——倒排索引的工程实现、相关性评分的算法,以及分布式搜索的协调机制。
参考资料
- Tyler Akidau, Slava Chernyak, Reuven Lax. Streaming Systems: The What, Where, When, and How of Large-Scale Data Processing. O’Reilly Media, 2018.
- Fabian Hueske, Vasiliki Kalavri. Stream Processing with Apache Flink. O’Reilly Media, 2019.
- Paris Carbone et al. “Apache Flink: Stream and Batch Processing in a Single Engine.” Bulletin of the IEEE Computer Society Technical Committee on Data Engineering, 2015.
- K. Mani Chandy, Leslie Lamport. “Distributed Snapshots: Determining Global States of Distributed Systems.” ACM Transactions on Computer Systems, 1985.
- Apache Flink 官方文档. “Stateful Stream Processing.” https://flink.apache.org/
- Jay Kreps. “Questioning the Lambda Architecture.” O’Reilly Radar, 2014.
- Confluent 官方文档. “Kafka Streams Architecture.” https://docs.confluent.io/
- Martin Kleppmann. Designing Data-Intensive Applications. O’Reilly Media, 2017. Chapter 11: Stream Processing.
- Guozhang Wang et al. “Kafka Streams: A Streaming Platform for Processing and Analyzing Data in Real-Time.” VLDB Endowment, 2021.
- Apache Flink 官方文档. “Checkpointing.” https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/fault-tolerance/checkpointing/
同主题继续阅读
把当前热点继续串成多页阅读,而不是停在单篇消费。
【系统架构设计百科】消息队列架构:异步解耦的设计与陷阱
在分布式系统中,服务之间的直接同步调用会导致强耦合、级联故障和性能瓶颈。消息队列(Message Queue)作为异步通信的核心基础设施,在现代架构中承担着解耦、削峰、容错等关键职责。然而,引入消息队列并非没有代价——投递语义的选择、顺序性保证、消费者组再平衡、幂等消费等问题,每一个都隐藏着工程陷阱。本文将从原理到实践…
【系统架构设计百科】数据密集型架构:批流一体与 Lakehouse
从 Lambda 架构的双轨困境出发,深入剖析 Kappa 架构与批流一体的演进逻辑,对比 Flink 与 Spark Structured Streaming 的核心差异,解读 Delta Lake、Apache Iceberg 等 Table Format 的技术之争,并给出实时数仓的落地架构方案。
【系统架构设计百科】架构质量属性:不只是"高可用高性能"
需求评审时写下的'高可用、高性能、高并发',到了架构设计阶段几乎无法落地——因为它们不是可执行的需求。本文从 SEI/CMU 的质量属性理论出发,用 stimulus-response 场景模型把模糊需求变成可量化、可验证的架构约束,并拆解属性之间的冲突与联动关系。
【系统架构设计百科】告警策略:如何避免"狼来了"
大多数团队的告警系统都在制造噪声而不是传递信号。阈值告警看似直观,实则产生大量误报和漏报,值班工程师在凌晨三点被叫醒,却发现只是一次无害的毛刺。本文从告警疲劳的工业数据出发,拆解基于 SLO 的多窗口燃烧率告警算法,深入 Alertmanager 的路由、抑制与分组机制,结合 PagerDuty 的告警疲劳研究和真实工程案例,给出一套可落地的告警策略设计方法。