某跨境电商平台的数据团队管理着一套运行了四年的分析系统。数据仓库用 Hive 搭建,T+1 的批处理任务每晚跑 6 个小时,凌晨 6 点出报表。实时看板用 Flink 从 Kafka 消费事件,写入 ClickHouse。两套系统各自维护一份数据处理逻辑——同一个”订单金额”指标,批处理管道里用的是 SQL 聚合加上汇率修正,实时管道里用的是 Java 代码做的近似计算。两边的口径不一致,导致每天早上运营团队打开报表时,昨天的实时数据和今天的批处理数据对不上,差异率在 3%-8% 之间。
这不是个例。凡是同时运行批处理和实时处理两条管道的团队,几乎都会遇到”数据不一致”的问题。Lambda 架构在理论上要求两条路径产出相同结果,但在工程实践中,两套代码、两套计算引擎、两套存储系统之间的语义差异几乎无法消除。
过去十年,数据分析架构经历了三次范式迁移:从数据仓库(Data Warehouse)到数据湖(Data Lake),再到 Lambda/Kappa 的流批分离与统一尝试,最终走向 Lakehouse 架构。每一次迁移都在解决上一代架构的核心缺陷,但也带来新的工程挑战。
本文从数据仓库的经典设计开始,逐步拆解每种架构的设计动机、核心机制和工程限制,最后给出架构选型的决策框架。
一、数据仓库的经典架构与局限
1.1 数据仓库的设计原则
数据仓库(Data Warehouse)的概念由 Bill Inmon 在 1990 年代系统化提出,核心定义是”面向主题的、集成的、非易失的、时变的数据集合”。它的设计目标很明确:把分散在各个业务系统中的数据抽取出来,经过清洗和转换,加载到一个统一的存储中,供分析和决策使用。
这个过程就是经典的 ETL(Extract-Transform-Load)流程:
业务数据库 A ──┐
业务数据库 B ──┤── Extract ── Transform ── Load ── 数据仓库
业务数据库 C ──┘
数据仓库内部通常采用星型模型(Star Schema)或雪花模型(Snowflake Schema)组织数据。星型模型的核心是一张事实表(Fact Table)围绕多张维度表(Dimension Table):
-- 星型模型示例:订单事实表
CREATE TABLE fact_orders (
order_id BIGINT PRIMARY KEY,
customer_key INT REFERENCES dim_customer(customer_key),
product_key INT REFERENCES dim_product(product_key),
date_key INT REFERENCES dim_date(date_key),
store_key INT REFERENCES dim_store(store_key),
quantity INT,
unit_price DECIMAL(10, 2),
discount_amount DECIMAL(10, 2),
total_amount DECIMAL(12, 2)
);
-- 维度表:客户维度
CREATE TABLE dim_customer (
customer_key INT PRIMARY KEY,
customer_id VARCHAR(32),
customer_name VARCHAR(128),
segment VARCHAR(32),
region VARCHAR(64),
registration_date DATE,
-- 缓慢变化维度(SCD Type 2)
effective_from DATE,
effective_to DATE,
is_current BOOLEAN
);
-- 维度表:时间维度
CREATE TABLE dim_date (
date_key INT PRIMARY KEY,
full_date DATE,
year INT,
quarter INT,
month INT,
week_of_year INT,
day_of_week INT,
is_holiday BOOLEAN
);这种设计有几个明确的优势:查询模式可预测(分析师总是围绕维度做聚合),数据质量有保证(ETL 过程中做了清洗和校验),查询性能可优化(预先知道查询模式,可以建索引、做预聚合)。
1.2 经典数据仓库的四个局限
但数据仓库在过去十年遇到了四个根本性的挑战:
第一,数据类型受限。 传统数据仓库只能存储结构化数据(Structured Data)。当企业需要分析日志文件、图片元数据、用户行为轨迹、传感器时序数据时,这些半结构化(Semi-structured)和非结构化(Unstructured)数据无法直接放进关系型数据仓库。强行转换会丢失信息或引入大量 NULL 列。
第二,扩展成本高昂。 传统数据仓库(Teradata、Oracle Exadata、Netezza)采用计算与存储耦合的架构,扩展意味着同时购买更多的计算节点和存储节点。当数据量从 TB 级增长到 PB 级时,硬件成本呈线性甚至超线性增长。
第三,Schema-on-Write 的刚性。 数据仓库要求数据在写入前就确定 schema。这意味着每当业务系统新增一个字段,数据仓库就要经历”修改 schema → 修改 ETL → 回填数据”的完整流程。在业务快速迭代的环境下,这个流程的周转时间经常以周计算。
第四,实时性不足。 ETL 批处理通常按小时或按天运行。在”T+1”的模式下,业务决策者看到的数据至少是昨天的。对于需要分钟级甚至秒级数据新鲜度的场景(实时风控、实时推荐、运营看板),传统数据仓库完全无法满足。
1.3 云数据仓库的部分改进
Snowflake、BigQuery、Redshift 等云数据仓库解决了部分问题——计算与存储分离降低了扩展成本,半结构化数据类型(VARIANT、JSON)提供了一定的灵活性——但它们的核心模型仍然是 Schema-on-Write,仍然以结构化分析为主,对非结构化数据和低延迟场景的支持有限。
-- Snowflake 中处理半结构化数据
CREATE TABLE raw_events (
event_id VARCHAR(64),
event_time TIMESTAMP,
payload VARIANT -- 半结构化 JSON 数据
);
-- 查询 VARIANT 字段
SELECT
event_id,
payload:user_id::STRING AS user_id,
payload:action::STRING AS action,
payload:properties:page_url::STRING AS page_url
FROM raw_events
WHERE event_time >= DATEADD(hour, -1, CURRENT_TIMESTAMP());这些改进重要但不够。企业需要的是一个能同时存储结构化、半结构化和非结构化数据,能支持批处理和实时处理,能灵活适应 schema 变化的统一平台。数据湖就是在这个需求背景下出现的。
二、数据湖的设计理念与”数据沼泽”问题
2.1 数据湖的核心思想
数据湖(Data Lake)的概念由 Pentaho 的 CTO James Dixon 在 2010 年左右提出。它的核心理念可以用两句话概括:
- 存储一切原始数据——不管格式、不管结构、不管是否有明确的使用场景,先存下来再说。
- Schema-on-Read——数据在写入时不需要预定义 schema,在读取时再根据具体需求解释数据结构。
技术上,数据湖通常建立在分布式文件系统(HDFS)或对象存储(S3、Azure Blob Storage、GCS)之上,数据以原始格式存储——CSV、JSON、Parquet、Avro、ORC、图片、视频、日志文件都可以直接写入。
数据湖的典型存储布局:
s3://data-lake/
raw/ # 原始数据(Raw Zone)
events/2026/04/13/ # 按日期分区的事件日志
transactions/2026/04/13/ # 交易数据
images/product-catalog/ # 产品图片
logs/nginx/2026/04/13/ # Web 服务器日志
curated/ # 清洗后的数据(Curated Zone)
orders/ # 经过 ETL 清洗的订单数据
customers/ # 客户主数据
aggregated/ # 聚合数据(Aggregated Zone)
daily_sales/ # 日销售汇总
monthly_reports/ # 月度报表
2.2 数据湖的技术优势
相比数据仓库,数据湖有三个明确的技术优势:
成本优势。 对象存储的成本远低于数据仓库的专用存储。以 AWS 为例,S3 Standard 的存储成本约 0.023 USD/GB/月,而 Redshift 的存储成本约 0.024 USD/GB/月(ra3.xlplus 节点)。看起来差不多,但 S3 支持分层存储——Infrequent Access 降到 0.0125 USD/GB/月,Glacier 降到 0.004 USD/GB/月。对于 PB 级的历史数据,分层存储的成本优势非常显著。
灵活性优势。 Schema-on-Read 意味着数据湖可以接受任何格式的数据,不需要预先定义 schema。这对于探索性分析(Exploratory Analysis)特别有价值——数据科学家可以先把数据灌进来,然后在 Jupyter Notebook 里用 Pandas 或 Spark 自由探索。
计算与存储解耦。 数据存在 S3 里,计算用 EMR/Spark/Presto/Athena,两者独立扩展。不需要分析时可以关掉计算集群,只付存储费用。
2.3 数据沼泽问题
但数据湖在实践中暴露了一个严重的问题——“数据沼泽”(Data Swamp)。当大量数据被无组织地倾倒进数据湖后,没人知道里面有什么数据、数据的质量如何、数据之间有什么关系。数据湖退化成了一个谁都往里面扔数据、但谁都不敢用的”数据垃圾场”。
数据沼泽的根本原因有三个:
缺乏事务支持。 HDFS 和 S3 都不支持 ACID 事务。一个 Spark 任务正在写 Parquet 文件,另一个查询同时在读,读到的可能是半写的、不完整的数据。没有原子性保证,就无法保证数据一致性。
缺乏 schema 演进机制。 Schema-on-Read 的灵活性是以”没有 schema 约束”为代价的。上游系统改了字段名,下游的分析任务就会静默失败或者产出垃圾数据。没有 schema 校验,错误发现得越晚,修复成本越高。
缺乏数据管理能力。 没有元数据管理,不知道数据的来源(Lineage)、数据的拥有者(Ownership)、数据的更新频率、数据的质量指标。数据湖里的数据越多,找到”可信数据”的难度就越大。
# 数据沼泽的典型问题:读取到不一致的数据
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DataSwampDemo").getOrCreate()
# 写入任务正在往这个路径写 Parquet 文件
# 如果读取任务在写入过程中执行,可能读到部分数据
df = spark.read.parquet("s3://data-lake/raw/events/2026/04/13/")
# 没有 schema 校验,上游改了字段名不会报错
# 只是查询结果变成 NULL
df.select("user_id", "event_type", "timestamp").show()
# 如果上游把 "user_id" 改成了 "userId",
# 这里 user_id 列全是 NULL,但不会抛异常数据沼泽问题本质上说明了一件事:数据湖解决了”存什么”的问题,但没有解决”怎么管”的问题。后来出现的 Delta Lake、Apache Iceberg、Apache Hudi,正是为了给数据湖补上事务、schema 管理和数据质量这些缺失的能力。
三、Lambda 架构:双路径设计与维护成本
3.1 Lambda 架构的起源与设计
Lambda 架构由 Nathan Marz(Storm 的作者)在 2011 年提出,核心目标是同时满足批处理的完整性和实时处理的低延迟。它的设计思路很直接:既然批处理准确但慢,实时处理快但近似,那就同时跑两条路径,用两者的结果互补。
Lambda 架构包含三层:
- 批处理层(Batch Layer):对全量历史数据做批处理,产出完整且准确的视图。通常每天或每几小时运行一次。
- 速度层(Speed Layer):对实时数据流做增量处理,产出低延迟但可能不完全准确的视图。
- 服务层(Serving Layer):合并批处理层和速度层的结果,对外提供查询服务。当批处理结果追上来后,速度层的对应数据就可以丢弃。
graph LR
A[数据源] --> B[消息队列<br/>Kafka]
B --> C[批处理层<br/>Spark/MapReduce]
B --> D[速度层<br/>Storm/Flink]
C --> E[批处理视图<br/>HDFS/Hive]
D --> F[实时视图<br/>HBase/Redis]
E --> G[服务层<br/>合并查询]
F --> G
G --> H[查询接口]
3.2 批处理层的实现
批处理层通常用 Spark 或 MapReduce 实现,定期对全量数据重新计算:
# 批处理层:每日订单汇总(PySpark)
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder \
.appName("BatchLayer-DailyOrderSummary") \
.getOrCreate()
# 读取全量订单数据
orders = spark.read.parquet("s3://data-lake/raw/orders/")
# 计算每日、每地区的订单汇总
daily_summary = orders \
.filter(F.col("order_status") == "completed") \
.groupBy(
F.to_date("order_time").alias("order_date"),
F.col("region")
) \
.agg(
F.count("order_id").alias("order_count"),
F.sum("total_amount").alias("total_revenue"),
F.avg("total_amount").alias("avg_order_value"),
F.countDistinct("customer_id").alias("unique_customers")
)
# 写入批处理视图
daily_summary.write \
.mode("overwrite") \
.partitionBy("order_date") \
.parquet("s3://data-lake/batch-views/daily-order-summary/")3.3 速度层的实现
速度层用流处理引擎实现,对实时事件做增量计算:
// 速度层:实时订单计数(Flink Java)
public class RealtimeOrderCounter {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// 从 Kafka 消费订单事件
KafkaSource<OrderEvent> source = KafkaSource.<OrderEvent>builder()
.setBootstrapServers("kafka:9092")
.setTopics("order-events")
.setGroupId("speed-layer")
.setValueOnlyDeserializer(new OrderEventDeserializer())
.build();
DataStream<OrderEvent> orders = env.fromSource(
source,
WatermarkStrategy.<OrderEvent>forBoundedOutOfOrderness(
Duration.ofSeconds(10)
).withTimestampAssigner((event, ts) -> event.getEventTime()),
"OrderSource"
);
// 按地区和 1 分钟窗口聚合
DataStream<OrderSummary> realtimeSummary = orders
.filter(e -> "completed".equals(e.getStatus()))
.keyBy(OrderEvent::getRegion)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate(new OrderAggregator());
// 写入实时视图(HBase 或 Redis)
realtimeSummary.addSink(new HBaseSink("realtime_order_summary"));
env.execute("SpeedLayer-RealtimeOrderCounter");
}
}3.4 服务层的合并逻辑
服务层需要合并批处理视图和实时视图,这是 Lambda 架构中最复杂的部分:
# 服务层:合并批处理和实时视图
import datetime
from typing import Dict
class LambdaServingLayer:
def __init__(self, batch_store, realtime_store):
self.batch_store = batch_store # 批处理视图(如 Hive)
self.realtime_store = realtime_store # 实时视图(如 Redis)
def query_order_summary(self, region: str, date: str) -> Dict:
# 批处理视图的最新数据截止时间
batch_cutoff = self.batch_store.get_latest_batch_time()
# 从批处理视图获取基准数据
batch_result = self.batch_store.query(
region=region,
date=date,
up_to=batch_cutoff
)
# 从实时视图获取增量数据(批处理截止时间之后的数据)
realtime_result = self.realtime_store.query(
region=region,
date=date,
after=batch_cutoff
)
# 合并结果
return {
"region": region,
"date": date,
"order_count": batch_result["order_count"]
+ realtime_result["order_count"],
"total_revenue": batch_result["total_revenue"]
+ realtime_result["total_revenue"],
"data_source": "merged",
"batch_cutoff": str(batch_cutoff),
"freshness": str(datetime.datetime.now())
}3.5 Lambda 架构的核心问题
Lambda 架构在理论上很优雅,但在工程实践中有三个致命问题:
双倍代码维护。 批处理层和速度层使用不同的计算引擎(Spark vs. Flink),需要编写两套逻辑。即使业务逻辑完全相同,两种引擎的 API、语义、数据模型都不一样。“订单金额汇总”这么简单的逻辑,在 Spark SQL 和 Flink DataStream API 中的实现就有微妙的差异——窗口对齐方式不同、NULL 处理语义不同、浮点数精度不同。
数据一致性难保证。 两条路径独立运行,对同一份数据的处理结果可能不一致。批处理用的是 T+1 的完整数据,速度层用的是实时流数据。数据到达顺序、迟到数据处理策略、错误数据过滤规则——任何一个环节的差异都会导致最终结果不一致。
运维复杂度高。 两套计算引擎、两套存储系统、一个合并层——总共五个组件需要独立监控、独立调优、独立故障排查。当数据对不上时,要同时检查批处理任务是否正常完成、实时任务是否有延迟、合并逻辑的时间边界是否正确。
Jay Kreps(Kafka 的作者)在 2014 年发表了一篇文章”Questioning the Lambda Architecture”,直接指出了这个问题。他的替代方案就是 Kappa 架构。
四、Kappa 架构:简化思路与适用边界
4.1 Kappa 架构的核心设计
Kappa 架构由 Jay Kreps 在 2014 年提出,核心思想是:只保留速度层,用一套流处理引擎同时处理实时数据和历史数据重放。如果需要重新计算历史数据(比如修正了计算逻辑),不是跑一个批处理任务,而是从 Kafka 中重放历史事件,让流处理引擎重新处理一遍。
graph LR
A[数据源] --> B[消息队列<br/>Kafka<br/>长期保留]
B --> C[流处理引擎<br/>Flink/Kafka Streams]
C --> D[服务层<br/>数据库/缓存]
D --> E[查询接口]
style B fill:#e8f4f8,stroke:#2196F3
Kappa 架构的前提条件是消息队列能够长期保留历史数据。Kafka 通过分层存储(Tiered Storage)可以实现近乎无限期的数据保留——热数据在本地磁盘,冷数据在对象存储。
4.2 重放机制的实现
Kappa 架构的关键能力是”重放”——当计算逻辑发生变化时,启动一个新的流处理任务,从 Kafka 的起始偏移量开始重新消费所有历史事件:
# Kappa 架构的重放机制(Flink Python API)
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
env = StreamExecutionEnvironment.get_execution_environment()
# 重放模式:从最早的 offset 开始消费
replay_source = KafkaSource.builder() \
.set_bootstrap_servers("kafka:9092") \
.set_topics("order-events") \
.set_group_id("kappa-replay-v2") \
.set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
.set_value_only_deserializer(OrderEventDeserializer()) \
.build()
# 使用与实时处理完全相同的处理逻辑
orders = env.from_source(
replay_source,
WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(10)),
"ReplaySource"
)
# 相同的处理逻辑,输出到新的目标表
result = process_orders(orders) # 复用同一个函数
result.add_sink(create_sink("order_summary_v2"))
env.execute("Kappa-Replay-V2")4.3 Kappa 架构的适用边界
Kappa 架构在概念上比 Lambda 简单得多,但它不是万能的。它有三个明确的适用边界:
第一,重放时间。 当历史数据量达到 PB 级时,从 Kafka 重放所有事件可能需要几天甚至几周。在这段时间内,新旧两个版本的流处理任务同时运行,需要双倍的计算资源。如果业务不能容忍这个时间窗口和资源开销,Kappa 架构就不合适。
第二,计算类型。 流处理引擎擅长的是事件驱动的增量计算——计数、求和、窗口聚合。但某些分析场景需要全量扫描——比如计算 Top-K、做复杂的 JOIN、训练机器学习模型。这些计算在流处理引擎上要么效率极低,要么根本无法实现。
第三,Kafka 的存储成本。 虽然 Kafka 支持分层存储,但长期保留所有原始事件的成本不低。对于每天产生 TB 级数据的系统,保留一年的历史数据意味着 PB 级的存储开销。
下表对比了 Lambda 和 Kappa 架构在关键维度上的差异:
| 维度 | Lambda 架构 | Kappa 架构 |
|---|---|---|
| 代码维护 | 两套代码(批+流) | 一套代码(流) |
| 数据一致性 | 难保证(两套逻辑) | 天然一致(同一逻辑) |
| 历史重算 | 批处理任务(快) | 事件重放(慢) |
| 适用计算类型 | 批+流都擅长 | 增量计算为主 |
| 存储成本 | 中(批处理存储) | 高(Kafka 长期保留) |
| 运维复杂度 | 高(五个组件) | 中(三个组件) |
| 延迟 | 秒级(速度层) | 秒级 |
| 成熟度 | 高(广泛应用) | 中(场景受限) |
4.4 Lambda 与 Kappa 的共同瓶颈
不管是 Lambda 还是 Kappa,它们都有一个共同的架构瓶颈:存储层缺乏事务和管理能力。
Lambda 架构的批处理视图通常存在 HDFS/S3 上的 Parquet 文件中,没有 ACID 事务。速度层的数据存在 HBase 或 Redis 中,有事务但容量有限。两个存储系统之间没有统一的 schema 管理和元数据管理。
Kappa 架构的存储分两部分:Kafka 存原始事件,下游数据库存计算结果。Kafka 不支持随机读取和 SQL 查询,下游数据库不存原始数据。两者之间存在信息断层。
Lakehouse 架构的核心贡献,正是在存储层加上了事务、schema 管理和统一查询的能力。
五、Lakehouse 架构——Delta Lake 与 Apache Iceberg 的设计原理
5.1 Lakehouse 的设计动机
Lakehouse 架构由 Databricks 在 2020 年系统提出(论文”Lakehouse: A New Generation of Open Platforms that Unify Data Warehousing and Data Lakes”)。它的设计动机很直接:在数据湖的低成本存储之上,补上数据仓库的事务、schema 管理和查询优化能力。
Lakehouse 不是数据湖和数据仓库的简单叠加。它的关键创新是在对象存储(S3/ADLS/GCS)之上增加了一个事务元数据层(Transactional Metadata Layer),这个元数据层提供了:
- ACID 事务(原子性读写)
- Schema 约束和演进(Schema Enforcement & Evolution)
- 时间旅行(Time Travel)
- 数据版本管理
- 统一的批流读写接口
graph TB
subgraph "Lakehouse 架构"
A[BI 工具 / SQL 客户端] --> B[查询引擎<br/>Spark / Trino / Flink]
B --> C[事务元数据层<br/>Delta Lake / Iceberg / Hudi]
C --> D[对象存储<br/>S3 / ADLS / GCS]
end
subgraph "数据写入"
E[批处理 ETL] --> C
F[流处理<br/>Flink / Spark Streaming] --> C
G[CDC 管道<br/>Debezium] --> C
end
subgraph "数据消费"
A
H[数据科学<br/>Pandas / Notebook] --> B
I[机器学习<br/>训练管道] --> B
end
5.2 Delta Lake 的核心机制
Delta Lake 是 Databricks 开源的存储层,它的核心设计围绕一个事务日志(Transaction Log)展开。
事务日志(Delta Log)
Delta Lake 在数据目录下维护一个 _delta_log/
目录,里面的 JSON
文件记录了对数据的每一次操作——添加文件、删除文件、更新元数据、修改
schema:
s3://data-lake/orders/
_delta_log/
00000000000000000000.json # 第一次写入:添加了 3 个 Parquet 文件
00000000000000000001.json # 第二次写入:添加了 2 个文件,删除了 1 个
00000000000000000002.json # 第三次写入:schema 演进,新增列
00000000000000000010.checkpoint.parquet # 每 10 个版本的检查点
part-00000-xxxx.parquet
part-00001-xxxx.parquet
...
每个 JSON 日志文件包含一系列”动作”(Action):
{
"add": {
"path": "part-00000-abc123.parquet",
"size": 52428800,
"partitionValues": {"order_date": "2026-04-13"},
"modificationTime": 1744531200000,
"dataChange": true,
"stats": "{\"numRecords\":125000,\"minValues\":{\"order_id\":1},\"maxValues\":{\"order_id\":125000}}"
}
}
{
"remove": {
"path": "part-00000-old456.parquet",
"deletionTimestamp": 1744531200000,
"dataChange": true
}
}乐观并发控制(Optimistic Concurrency Control)
Delta Lake 使用乐观并发控制来处理多个写入者的冲突。每个写入者在提交时,检查自己读取的版本是否仍然是最新版本。如果不是,根据冲突检测规则决定是否需要重试:
# Delta Lake 的 ACID 事务示例
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("DeltaLakeDemo") \
.config("spark.sql.extensions",
"io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# 原子性写入:要么全部成功,要么全部回滚
new_orders = spark.read.json("s3://staging/orders/2026-04-13/")
new_orders.write \
.format("delta") \
.mode("append") \
.partitionBy("order_date") \
.save("s3://data-lake/orders/")
# MERGE 操作:upsert(更新+插入)
delta_table = DeltaTable.forPath(spark, "s3://data-lake/orders/")
delta_table.alias("target").merge(
new_orders.alias("source"),
"target.order_id = source.order_id"
).whenMatchedUpdate(set={
"order_status": "source.order_status",
"updated_at": "source.updated_at"
}).whenNotMatchedInsertAll().execute()
# 时间旅行:查询历史版本的数据
historical_orders = spark.read \
.format("delta") \
.option("versionAsOf", 5) \
.load("s3://data-lake/orders/")
# 或者按时间戳查询
yesterday_orders = spark.read \
.format("delta") \
.option("timestampAsOf", "2026-04-12T00:00:00") \
.load("s3://data-lake/orders/")Schema 约束与演进
Delta Lake 默认开启 schema 约束——如果写入的数据 schema 与表的 schema 不匹配,写入会被拒绝:
# Schema 约束:阻止不匹配的写入
# 假设表的 schema 是 (order_id: long, amount: double, status: string)
bad_data = spark.createDataFrame([
(1, "not_a_number", "completed") # amount 应该是 double,这里是 string
], ["order_id", "amount", "status"])
# 这行会抛出 AnalysisException:
# 数据 schema 和表 schema 不匹配
bad_data.write.format("delta").mode("append") \
.save("s3://data-lake/orders/")
# Schema 演进:显式允许新增列
new_data_with_extra_col = spark.createDataFrame([
(1001, 99.99, "completed", "CN")
], ["order_id", "amount", "status", "country"])
new_data_with_extra_col.write \
.format("delta") \
.mode("append") \
.option("mergeSchema", "true") \
.save("s3://data-lake/orders/")5.3 Apache Iceberg 的核心机制
Apache Iceberg 是 Netflix 开源的表格式(Table Format),与 Delta Lake 解决同一类问题,但设计思路有所不同。
三层元数据结构
Iceberg 使用三层元数据结构来跟踪数据:
- Catalog:存储表的当前元数据指针(指向最新的 metadata file)。
- Metadata File(元数据文件):JSON 格式,记录表的 schema、分区规范、快照列表。
- Manifest List / Manifest File:记录每个快照包含哪些数据文件及其统计信息。
Iceberg 元数据结构:
Catalog(如 Hive Metastore / Nessie / REST Catalog)
└── metadata pointer
└── metadata/v3.metadata.json
├── schema: {order_id: long, amount: double, ...}
├── partition-spec: [date(order_time)]
└── snapshots:
├── snap-001 → manifest-list-001.avro
│ ├── manifest-abc.avro → [file1.parquet, file2.parquet]
│ └── manifest-def.avro → [file3.parquet]
└── snap-002 → manifest-list-002.avro
├── manifest-abc.avro → [file1.parquet, file2.parquet]
└── manifest-ghi.avro → [file4.parquet] # file3 被替换
分区演进(Partition Evolution)
Iceberg 最突出的特性之一是分区演进——可以在不重写历史数据的情况下修改分区策略。这对于随着数据量增长需要调整分区粒度的场景非常有价值:
-- Iceberg 分区演进示例(Spark SQL)
-- 创建表,初始按月分区
CREATE TABLE catalog.db.orders (
order_id BIGINT,
order_time TIMESTAMP,
customer_id BIGINT,
amount DOUBLE,
status STRING
) USING iceberg
PARTITIONED BY (month(order_time));
-- 数据量增长后,改为按天分区
-- 历史数据不需要重写,新数据按天分区,旧数据仍按月分区
ALTER TABLE catalog.db.orders
SET PARTITION SPEC (day(order_time));
-- 进一步细化:按小时分区(用于高频写入场景)
ALTER TABLE catalog.db.orders
SET PARTITION SPEC (hour(order_time));
-- Iceberg 查询引擎会自动处理混合分区:
-- 扫描 2024 年数据时用月分区过滤
-- 扫描 2025 年数据时用天分区过滤
-- 扫描 2026 年数据时用小时分区过滤
SELECT * FROM catalog.db.orders
WHERE order_time BETWEEN '2026-04-01' AND '2026-04-13';隐式分区(Hidden Partitioning)
传统 Hive 分区要求用户显式指定分区列和分区值,查询时也要显式使用分区列过滤。Iceberg 的分区是”隐式”的——用户只需要用原始列过滤,Iceberg 自动利用分区信息跳过无关文件:
-- Hive 风格:用户必须知道分区结构
SELECT * FROM hive_orders
WHERE year = 2026 AND month = 4 AND day = 13; -- 必须按分区列查询
-- Iceberg 风格:用户用原始列查询,引擎自动做分区裁剪
SELECT * FROM iceberg_orders
WHERE order_time >= '2026-04-13 00:00:00'
AND order_time < '2026-04-14 00:00:00';
-- Iceberg 自动将时间条件映射到分区,跳过无关文件5.4 Delta Lake 与 Iceberg 对比
| 特性 | Delta Lake | Apache Iceberg |
|---|---|---|
| 发起方 | Databricks | Netflix / Apache |
| 事务日志格式 | JSON + Checkpoint Parquet | JSON Metadata + Avro Manifest |
| 分区演进 | 不支持(需重写数据) | 原生支持 |
| 隐式分区 | 不支持 | 原生支持 |
| Schema 演进 | 支持(列追加/重命名) | 支持(全面,含列提升) |
| 时间旅行 | 支持 | 支持 |
| 计算引擎绑定 | Spark 优先(对其他引擎支持逐步完善) | 引擎无关(Spark/Flink/Trino/Presto) |
| 行级操作 | MERGE/UPDATE/DELETE | MERGE/UPDATE/DELETE |
| 并发控制 | 乐观并发 | 乐观并发 + 分支隔离 |
| 社区生态 | Databricks 主导 | 开放社区(Apple/Netflix/LinkedIn) |
选择 Delta Lake 还是 Iceberg,关键看两个因素:如果团队深度使用 Databricks 平台,Delta Lake 的集成体验更好;如果需要多引擎支持(Spark + Flink + Trino)和分区演进能力,Iceberg 的开放设计更有优势。
六、流批一体的工程挑战
6.1 流批一体的定义
流批一体(Stream-Batch Unification)是 Lakehouse 架构的核心目标之一:用同一套代码、同一个计算引擎、读写同一个存储系统,同时处理实时流数据和历史批数据。它不只是”流和批用同一个引擎”,而是要求端到端的统一——从数据摄入到处理逻辑到输出目标,全链路使用同一套抽象。
Apache Flink 是目前实现流批一体最成熟的引擎。Flink 的设计哲学是”流是批的超集”——批处理是有限数据流的特例。这意味着用 Flink 写的流处理逻辑,可以不修改代码直接用于批处理:
-- Flink SQL:同一段 SQL 可以跑在流模式和批模式下
-- 定义 Kafka 数据源(流模式)
CREATE TABLE order_events (
order_id BIGINT,
customer_id BIGINT,
amount DOUBLE,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'order-events',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
);
-- 定义 Iceberg 输出表
CREATE TABLE order_summary (
window_start TIMESTAMP(3),
region STRING,
order_count BIGINT,
total_amount DOUBLE
) WITH (
'connector' = 'iceberg',
'catalog-name' = 'lakehouse',
'catalog-type' = 'hive',
'warehouse' = 's3://data-lake/warehouse'
);
-- 同一段聚合逻辑:
-- 流模式下,每分钟输出一次增量结果
-- 批模式下,一次性处理全部历史数据
INSERT INTO order_summary
SELECT
TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
region,
COUNT(order_id) AS order_count,
SUM(amount) AS total_amount
FROM order_events
GROUP BY
TUMBLE(event_time, INTERVAL '1' MINUTE),
region;6.2 工程挑战一:语义一致性
流模式和批模式的语义差异是流批一体最大的工程挑战。即使使用同一段代码,以下几个方面的行为在流和批之间仍然可能不同:
窗口语义。 在流模式下,窗口由时间驱动触发(水位线推进到窗口结束时间时触发计算)。在批模式下,窗口由数据驱动(数据扫描到窗口边界时触发)。如果数据存在乱序或延迟,两种模式的窗口归属可能不同。
状态管理。 流模式需要维护状态(窗口中间结果、计数器等),状态大小随时间增长,需要 TTL 和清理策略。批模式不需要持久状态——所有数据一次性可用,引擎可以用排序+聚合来替代状态维护。
错误处理。 流模式下一条错误数据可能导致流处理任务持续失败和重启。批模式下可以先跳过错误数据,处理完所有正常数据后再统一处理错误记录。
# 流批一体的语义对齐:显式处理迟到数据
from pyflink.table import EnvironmentSettings, TableEnvironment
# 批模式设置
batch_settings = EnvironmentSettings.new_instance() \
.in_batch_mode() \
.build()
batch_env = TableEnvironment.create(batch_settings)
# 流模式设置
stream_settings = EnvironmentSettings.new_instance() \
.in_streaming_mode() \
.build()
stream_env = TableEnvironment.create(stream_settings)
# 关键:在两种模式下使用完全相同的 SQL
# 但需要额外配置来保证语义一致
UNIFIED_SQL = """
SELECT
window_start,
region,
COUNT(*) AS order_count,
SUM(amount) AS total_amount
FROM TABLE(
TUMBLE(TABLE order_events, DESCRIPTOR(event_time), INTERVAL '1' HOUR)
)
GROUP BY window_start, region
"""
# 批模式执行
batch_env.execute_sql(UNIFIED_SQL)
# 流模式执行(需要额外配置水位线和允许延迟)
stream_env.get_config().set(
"table.exec.emit.early-fire.enabled", "true"
)
stream_env.get_config().set(
"table.exec.emit.early-fire.delay", "60s"
)
stream_env.execute_sql(UNIFIED_SQL)6.3 工程挑战二:Exactly-Once 保证
流批一体要求端到端的精确一次(Exactly-Once)语义。这意味着从数据源读取、处理、到写入目标存储,每条数据恰好被处理一次——不多不少。
在批处理中,Exactly-Once 相对简单——任务失败就重跑,输出采用”覆盖写”即可。在流处理中,Exactly-Once 需要分布式快照(Checkpoint)和两阶段提交(Two-Phase Commit)的配合:
// Flink + Iceberg 的 Exactly-Once 写入配置
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// 开启 Checkpoint,间隔 60 秒
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointingMode(
CheckpointingMode.EXACTLY_ONCE
);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
env.getCheckpointConfig().setCheckpointTimeout(300000);
// Iceberg Sink 配合 Checkpoint 实现 Exactly-Once
// 每次 Checkpoint 完成时,Iceberg Sink 将暂存的数据文件
// 原子性地提交到 Iceberg 表
FlinkSink.forRowData(dataStream)
.table(icebergTable)
.tableLoader(tableLoader)
.writeParallelism(4)
.upsert(true) // 开启 upsert 模式
.equalityFieldColumns(Arrays.asList("order_id"))
.append();
env.execute("ExactlyOnce-Iceberg-Sink");6.4 工程挑战三:小文件问题
流处理以高频率持续写入数据,每次 Checkpoint 都会生成新的数据文件。如果 Checkpoint 间隔是 1 分钟,每天就会生成 1440 个小文件——一年下来超过 50 万个。大量小文件会严重降低查询性能(文件元数据开销远大于数据扫描开销)并增加存储成本(对象存储按请求次数计费)。
解决方案是定期做文件合并(Compaction):
-- Delta Lake 的 OPTIMIZE 命令:合并小文件
OPTIMIZE delta.`s3://data-lake/orders/`
WHERE order_date >= '2026-04-01';
-- Z-ORDER 优化:按查询模式重新组织数据布局
OPTIMIZE delta.`s3://data-lake/orders/`
ZORDER BY (customer_id, order_date);
-- Iceberg 的 Compaction(通过 Spark 调用)
-- 重写小文件,合并为更大的文件
CALL catalog.system.rewrite_data_files(
table => 'db.orders',
strategy => 'sort',
sort_order => 'order_date ASC, customer_id ASC',
options => map(
'target-file-size-bytes', '134217728', -- 128MB
'min-file-size-bytes', '67108864', -- 64MB
'max-file-size-bytes', '201326592' -- 192MB
)
);
-- 清理过期快照,释放被删除文件的存储空间
CALL catalog.system.expire_snapshots(
table => 'db.orders',
older_than => TIMESTAMP '2026-04-06 00:00:00',
retain_last => 10
);6.5 三种架构的全面对比
下面这张表从十个维度对比 Lambda、Kappa 和 Lakehouse 三种架构:
| 维度 | Lambda | Kappa | Lakehouse |
|---|---|---|---|
| 代码维护 | 两套(批+流) | 一套(流) | 一套(流批一体) |
| 存储层 | HDFS + HBase/Redis | Kafka + DB | 对象存储 + 表格式 |
| 事务支持 | 无(HDFS)/ 有(HBase) | 有(Kafka 事务) | 有(Delta/Iceberg) |
| Schema 管理 | 弱 | 弱 | 强(约束+演进) |
| 时间旅行 | 不支持 | 通过 Kafka offset | 原生支持 |
| 查询引擎 | Hive/Presto(批)+ 自建(实时) | 受限于流引擎 | 多引擎(Spark/Trino/Flink) |
| 数据新鲜度 | 秒级(速度层) | 秒级 | 分钟级(微批)到秒级 |
| 历史重算 | 批处理(快) | 事件重放(慢) | 批处理(快) |
| 非结构化数据 | 支持(数据湖部分) | 不擅长 | 支持 |
| 运维复杂度 | 高 | 中 | 中低 |
七、数据质量保证架构
7.1 数据质量的三个层次
Lakehouse 解决了存储层的事务和 schema 问题,但数据质量是一个更广泛的问题。数据质量保证需要在三个层次上建设:
- 技术层:数据是否符合 schema?字段类型是否正确?是否有 NULL 值?
- 业务层:数据是否符合业务规则?订单金额是否为正数?用户年龄是否在合理范围内?
- 时效层:数据是否按时到达?数据新鲜度是否满足 SLA?
7.2 数据契约(Data Contract)
数据契约(Data Contract)是近年来兴起的数据治理理念:上游数据生产者和下游数据消费者之间签订一份”合同”,明确约定数据的 schema、质量标准、更新频率和 SLA。当上游违反契约时,系统自动告警或阻断。
# 数据契约定义示例(data-contract.yaml)
apiVersion: datacontract/v1.0
kind: DataContract
metadata:
name: order-events-contract
owner: order-service-team
consumers:
- analytics-team
- recommendation-team
schema:
type: object
properties:
order_id:
type: integer
description: "订单唯一标识"
constraints:
- not_null
- unique
customer_id:
type: integer
description: "客户 ID"
constraints:
- not_null
- "references: customer_master.customer_id"
amount:
type: number
description: "订单金额(单位:元)"
constraints:
- not_null
- "range: [0.01, 10000000]"
order_time:
type: timestamp
description: "下单时间"
constraints:
- not_null
- "freshness: <= 5 minutes"
status:
type: string
description: "订单状态"
constraints:
- not_null
- "enum: [created, paid, shipped, completed, cancelled]"
quality:
completeness:
threshold: 99.5% # 非 NULL 字段的完整率不低于 99.5%
uniqueness:
columns: [order_id]
threshold: 100% # 主键必须完全唯一
freshness:
max_delay: 5 minutes # 数据延迟不超过 5 分钟
volume:
min_records_per_hour: 1000 # 每小时至少 1000 条记录
max_records_per_hour: 500000 # 每小时不超过 50 万条
sla:
availability: 99.9%
notification:
channels: [slack, pagerduty]
on_violation: block_pipeline # 违反契约时阻断管道7.3 Schema 校验的自动化实现
在数据管道中嵌入自动化 schema 校验,可以在数据进入 Lakehouse 之前拦截不合规的数据:
# 基于 Great Expectations 的数据质量校验
import great_expectations as gx
# 创建数据上下文
context = gx.get_context()
# 定义数据源
datasource = context.data_sources.add_spark("lakehouse_source")
data_asset = datasource.add_dataframe_asset("order_events")
# 定义期望套件(Expectation Suite)
suite = context.suites.add(
gx.ExpectationSuite(name="order_events_quality")
)
# 添加数据质量规则
suite.add_expectation(
gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeBetween(
column="amount",
min_value=0.01,
max_value=10000000
)
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeInSet(
column="status",
value_set=["created", "paid", "shipped", "completed", "cancelled"]
)
)
suite.add_expectation(
gx.expectations.ExpectTableRowCountToBeBetween(
min_value=1000,
max_value=500000
)
)
# 运行校验
batch = data_asset.add_batch_definition_whole_dataframe(
"daily_batch"
).get_batch(batch_parameters={"dataframe": order_df})
validation_result = batch.validate(suite)
# 根据校验结果决定是否继续管道
if not validation_result.success:
failed_expectations = [
r for r in validation_result.results if not r.success
]
raise DataQualityException(
f"数据质量校验失败:{len(failed_expectations)} 项不合格",
details=failed_expectations
)7.4 数据血缘追踪(Data Lineage)
数据血缘(Data Lineage)回答一个关键问题:这条数据是从哪里来的,经过了哪些变换,最终到了哪里? 当数据出现异常时,血缘信息可以帮助快速定位问题源头。
# 基于 OpenLineage 的血缘追踪集成
# OpenLineage 是一个开放的血缘追踪标准
# 支持 Spark、Flink、Airflow 等引擎
# Spark 配置 OpenLineage
spark = SparkSession.builder \
.appName("OrderETL") \
.config("spark.extraListeners",
"io.openlineage.spark.agent.OpenLineageSparkListener") \
.config("spark.openlineage.transport.type", "http") \
.config("spark.openlineage.transport.url",
"http://lineage-server:5000") \
.config("spark.openlineage.namespace", "production") \
.getOrCreate()
# 正常编写 ETL 逻辑
# OpenLineage 自动捕获输入/输出/转换信息
raw_orders = spark.read.format("delta") \
.load("s3://data-lake/raw/orders/")
cleaned_orders = raw_orders \
.filter(F.col("amount") > 0) \
.filter(F.col("status").isNotNull()) \
.withColumn("amount_usd",
F.col("amount") * F.col("exchange_rate"))
cleaned_orders.write.format("delta") \
.mode("overwrite") \
.save("s3://data-lake/curated/orders/")
# OpenLineage 自动记录:
# 输入:s3://data-lake/raw/orders/
# 输出:s3://data-lake/curated/orders/
# 转换:filter(amount > 0), filter(status IS NOT NULL),
# withColumn(amount_usd = amount * exchange_rate)7.5 数据质量监控的分层架构
完整的数据质量保证体系通常分为四层:
第四层:业务指标监控
├── 指标异常检测(同比/环比偏差告警)
├── 跨数据源一致性校验
└── SLA 达成率追踪
第三层:管道级校验
├── 输入数据质量门禁(Great Expectations)
├── 输出数据质量校验
└── 管道运行时长/数据量异常检测
第二层:表级保护
├── Schema 约束(Delta Lake / Iceberg 内置)
├── CHECK 约束(Delta Lake 支持)
└── 分区完整性校验
第一层:基础设施层
├── 数据血缘(OpenLineage / Apache Atlas)
├── 数据目录(DataHub / Amundsen)
└── 元数据管理(Hive Metastore / Iceberg Catalog)
八、工程案例:某电商平台从 Lambda 到 Lakehouse 的迁移
8.1 背景
某中型跨境电商平台(日均订单量约 50 万,日均事件量约 2 亿条)运营着一套 Lambda 架构的分析系统,已经运行了三年。架构组成:
- 批处理层:Spark on EMR,每日 T+1 跑批,产出 Hive 表
- 速度层:Flink on Kubernetes,消费 Kafka,写入 ClickHouse
- 服务层:自建的 Java 服务,合并 Hive 和 ClickHouse 的查询结果
8.2 面临的核心问题
这套系统面临三个持续恶化的问题:
问题一:数据口径不一致。 批处理管道用 Spark SQL 计算”有效订单金额”,逻辑是”order_status = ‘completed’ AND refund_amount = 0”。速度层用 Flink Java 代码做同样的过滤,但 Flink 中 refund_amount 字段在事件流中是异步到达的——订单完成时 refund_amount 还没有最终值,导致实时管道多计了一部分退款订单。每天早上运营看板的实时数据和批处理数据的差异在 3%-8% 之间。
问题二:运维成本高。 三套系统(Spark、Flink、ClickHouse)各有自己的集群管理、监控告警、故障处理流程。团队 8 个人,其中 3 个全职做运维。任何一个组件的版本升级都可能影响其他组件。
问题三:数据湖已成沼泽。 三年来各团队往 S3 倾倒了大量原始数据,没有统一的 schema 管理和元数据管理。数据科学团队想做用户行为分析,花了两周时间才搞清楚事件数据的字段含义和数据质量。
8.3 迁移方案
团队选择迁移到 Lakehouse 架构,核心技术选型:
- 存储层:S3 + Apache Iceberg(选 Iceberg 而非 Delta Lake,因为需要 Flink 和 Trino 的多引擎支持)
- 计算层:Flink(流批一体)+ Trino(交互式查询)
- 数据质量:Great Expectations + OpenLineage
- 元数据管理:Nessie Catalog(提供 Git-like 的数据版本管理)
迁移分三个阶段:
阶段一(4 周):存储层升级。 将 Hive 表迁移到 Iceberg 表格式,保持计算层不变。Spark 批处理任务改为读写 Iceberg 表,验证数据一致性。
# 阶段一:Hive 表迁移到 Iceberg
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("MigrateToIceberg") \
.config("spark.sql.catalog.lakehouse", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.lakehouse.type", "hive") \
.config("spark.sql.catalog.lakehouse.uri", "thrift://metastore:9083") \
.getOrCreate()
# 从 Hive 表读取数据
hive_orders = spark.sql("SELECT * FROM hive_db.orders")
# 创建 Iceberg 表
spark.sql("""
CREATE TABLE lakehouse.analytics.orders (
order_id BIGINT,
customer_id BIGINT,
order_time TIMESTAMP,
amount DOUBLE,
refund_amount DOUBLE,
status STRING,
region STRING
) USING iceberg
PARTITIONED BY (days(order_time))
TBLPROPERTIES (
'write.format.default' = 'parquet',
'write.parquet.compression-codec' = 'zstd',
'write.target-file-size-bytes' = '134217728'
)
""")
# 数据迁移
hive_orders.writeTo("lakehouse.analytics.orders").append()
# 数据一致性校验
iceberg_count = spark.sql(
"SELECT COUNT(*) FROM lakehouse.analytics.orders"
).collect()[0][0]
hive_count = spark.sql(
"SELECT COUNT(*) FROM hive_db.orders"
).collect()[0][0]
assert iceberg_count == hive_count, \
f"数据量不一致:Iceberg={iceberg_count}, Hive={hive_count}"阶段二(6 周):计算层统一。 将 Spark 批处理任务和 Flink 实时任务统一为 Flink SQL,消除双路径的代码差异。
-- 阶段二:统一的 Flink SQL(流批共用)
-- 这段 SQL 既可以在流模式下运行(实时消费 Kafka)
-- 也可以在批模式下运行(全量扫描 Iceberg 表)
-- 流模式数据源
CREATE TABLE kafka_orders (
order_id BIGINT,
customer_id BIGINT,
order_time TIMESTAMP(3),
amount DOUBLE,
refund_amount DOUBLE,
status STRING,
region STRING,
WATERMARK FOR order_time AS order_time - INTERVAL '30' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'order-events',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json'
);
-- 批模式数据源(同一张 Iceberg 表)
CREATE TABLE iceberg_orders (
order_id BIGINT,
customer_id BIGINT,
order_time TIMESTAMP(3),
amount DOUBLE,
refund_amount DOUBLE,
status STRING,
region STRING
) WITH (
'connector' = 'iceberg',
'catalog-name' = 'lakehouse',
'catalog-type' = 'hive',
'warehouse' = 's3://data-lake/warehouse'
);
-- 统一的计算逻辑(核心:用同一段 SQL)
-- 关键修改:refund_amount 使用 COALESCE 处理异步到达
INSERT INTO iceberg_order_summary
SELECT
TUMBLE_START(order_time, INTERVAL '1' HOUR) AS window_start,
region,
COUNT(*) AS order_count,
SUM(CASE
WHEN status = 'completed'
AND COALESCE(refund_amount, 0) = 0
THEN amount
ELSE 0
END) AS effective_revenue,
SUM(amount) AS total_revenue
FROM kafka_orders -- 流模式用 kafka_orders,批模式换成 iceberg_orders
GROUP BY
TUMBLE(order_time, INTERVAL '1' HOUR),
region;阶段三(3 周):数据质量体系建设。 部署 Great Expectations 做管道级校验,OpenLineage 做血缘追踪,建立数据契约。
8.4 迁移效果
迁移完成后,关键指标的变化:
| 指标 | 迁移前(Lambda) | 迁移后(Lakehouse) |
|---|---|---|
| 数据口径差异率 | 3%-8% | < 0.1%(流批同源同逻辑) |
| 数据管道代码量 | 约 12000 行(Spark + Flink Java) | 约 3500 行(Flink SQL) |
| 运维人力 | 3 人全职 | 1 人兼职 |
| 数据新鲜度 | T+1(批)+ 秒级(实时) | 分钟级(统一) |
| 数据质量问题发现时间 | 平均 4 小时(人工发现) | 平均 5 分钟(自动告警) |
| 月度基础设施成本 | 约 $18,000 | 约 $12,000 |
需要注意的是,迁移过程本身的代价不低:三个阶段合计 13 周,团队投入了 5 个工程师。迁移期间两套系统并行运行,基础设施成本一度达到 $28,000/月。这个投入对于日均 50 万订单的中型平台是值得的,但对于更小规模的团队需要重新评估 ROI。
九、架构选型决策框架
9.1 决策树
面对 Lambda、Kappa 和 Lakehouse 三种架构,选型决策可以按以下路径进行:
数据量级是否超过 TB 级?
├── 否 → 考虑简单的批处理 + 缓存方案,不需要复杂架构
└── 是 → 是否需要秒级数据新鲜度?
├── 否 → 是否需要多引擎支持和复杂分析?
│ ├── 否 → 云数据仓库(Snowflake/BigQuery)可能就够了
│ └── 是 → Lakehouse(Delta Lake 或 Iceberg)
└── 是 → 是否能接受流批两套代码的维护成本?
├── 是 → Lambda 架构(成熟度高,生态完善)
└── 否 → 数据类型是否主要是事件/日志类?
├── 是 → Kappa 架构(流处理 + Kafka 长期存储)
└── 否 → Lakehouse + Flink(流批一体)
9.2 关键决策因素
在最终拍板之前,需要回答以下五个问题:
- 团队能力:团队是否有 Flink/Spark 的运维经验?如果没有,Lakehouse 的学习曲线需要计入成本。
- 数据类型:如果大量数据是非结构化的(图片、视频、日志),数据湖/Lakehouse 是必须的。如果全是结构化数据,云数据仓库可能更简单。
- 查询模式:如果查询模式固定且可预测(报表、仪表盘),预聚合 + 数据仓库的方案性价比最高。如果需要灵活的探索式分析,Lakehouse 的开放格式更有优势。
- 新鲜度要求:T+1 可接受就用批处理,分钟级用微批(Spark Structured Streaming),秒级用流处理(Flink)。不同新鲜度的工程复杂度和成本差异巨大。
- 合规要求:如果有 GDPR/CCPA 的”被遗忘权”(Right to Erasure)要求,需要行级删除能力——传统数据湖不支持,Delta Lake 和 Iceberg 支持。
9.3 避免过度架构
最后一个建议:不要为了”先进”而选择复杂架构。很多团队在日均数据量只有 GB 级的情况下就上 Lakehouse,在只需要 T+1 报表的场景下就搞流批一体。这种过度架构(Over-Engineering)带来的运维成本远超它解决的问题。
一个务实的原则是:从最简单的方案开始,当具体瓶颈出现时再演进。
- 数据量 < 100 GB,日更新频率 → PostgreSQL + dbt 就够了
- 数据量 100 GB - 10 TB,小时级更新 → 云数据仓库(BigQuery/Snowflake)
- 数据量 10 TB - 1 PB,分钟级更新 → Lakehouse(Iceberg + Spark/Flink)
- 数据量 > 1 PB,秒级更新,多团队协作 → Lakehouse + 流处理 + 完整数据治理体系
-- 最简方案示例:PostgreSQL + dbt
-- 对于 GB 级数据,这可能就是全部所需
-- dbt 模型文件:models/order_summary.sql
{{ config(materialized='table', schema='analytics') }}
WITH completed_orders AS (
SELECT
DATE_TRUNC('day', order_time) AS order_date,
region,
COUNT(*) AS order_count,
SUM(amount) AS total_revenue,
SUM(CASE WHEN refund_amount = 0 THEN amount ELSE 0 END)
AS effective_revenue,
COUNT(DISTINCT customer_id) AS unique_customers
FROM {{ source('raw', 'orders') }}
WHERE status = 'completed'
GROUP BY 1, 2
)
SELECT
order_date,
region,
order_count,
total_revenue,
effective_revenue,
unique_customers,
effective_revenue / NULLIF(order_count, 0) AS avg_effective_value
FROM completed_orders导航
上一篇: 多模数据库选型
下一篇: 流处理架构
参考资料
- Armbrust, M., et al. “Lakehouse: A New Generation of Open Platforms that Unify Data Warehousing and Data Lakes.” CIDR, 2021.
- Marz, N., Warren, J. “Big Data: Principles and Best Practices of Scalable Real-Time Data Systems.” Manning, 2015.
- Kreps, J. “Questioning the Lambda Architecture.” O’Reilly Radar, 2014.
- Apache Iceberg Documentation. “Table Spec.” https://iceberg.apache.org/spec/
- Delta Lake Documentation. “Delta Transaction Log Protocol.” https://github.com/delta-io/delta/blob/master/PROTOCOL.md
- Inmon, W. H. “Building the Data Warehouse.” Wiley, 2005.
- Dehghani, Z. “Data Mesh: Delivering Data-Driven Value at Scale.” O’Reilly, 2022.
- Apache Flink Documentation. “Streaming Concepts.” https://nightlies.apache.org/flink/flink-docs-stable/
- Andrew Jones. “Data Contracts.” https://datacontract.com/
- OpenLineage Project. “OpenLineage Spec.” https://openlineage.io/
同主题继续阅读
把当前热点继续串成多页阅读,而不是停在单篇消费。
【系统架构设计百科】数据密集型架构:批流一体与 Lakehouse
从 Lambda 架构的双轨困境出发,深入剖析 Kappa 架构与批流一体的演进逻辑,对比 Flink 与 Spark Structured Streaming 的核心差异,解读 Delta Lake、Apache Iceberg 等 Table Format 的技术之争,并给出实时数仓的落地架构方案。
【系统架构设计百科】Serverless 架构:冷启动、成本模型与适用场景
2023 年,Datadog 发布的年度 Serverless 报告显示,超过 70% 的 AWS 用户已在生产环境中使用 Lambda,平均每个组织部署了超过 1000 个 Lambda 函数。然而,同一份报告也指出,冷启动(Cold Start)仍然是开发者最关注的性能问题——在 Java 运行时中,P99 冷启动…
【系统架构设计百科】架构质量属性:不只是"高可用高性能"
需求评审时写下的'高可用、高性能、高并发',到了架构设计阶段几乎无法落地——因为它们不是可执行的需求。本文从 SEI/CMU 的质量属性理论出发,用 stimulus-response 场景模型把模糊需求变成可量化、可验证的架构约束,并拆解属性之间的冲突与联动关系。
【系统架构设计百科】告警策略:如何避免"狼来了"
大多数团队的告警系统都在制造噪声而不是传递信号。阈值告警看似直观,实则产生大量误报和漏报,值班工程师在凌晨三点被叫醒,却发现只是一次无害的毛刺。本文从告警疲劳的工业数据出发,拆解基于 SLO 的多窗口燃烧率告警算法,深入 Alertmanager 的路由、抑制与分组机制,结合 PagerDuty 的告警疲劳研究和真实工程案例,给出一套可落地的告警策略设计方法。