土法炼钢兴趣小组的算法知识备份

【系统架构设计百科】数据湖与数据仓库:分析架构的演进路线

文章导航

分类入口
architecture
标签入口
#data-lake#data-warehouse#Lambda#Kappa#Lakehouse#Delta-Lake#Iceberg

目录

某跨境电商平台的数据团队管理着一套运行了四年的分析系统。数据仓库用 Hive 搭建,T+1 的批处理任务每晚跑 6 个小时,凌晨 6 点出报表。实时看板用 Flink 从 Kafka 消费事件,写入 ClickHouse。两套系统各自维护一份数据处理逻辑——同一个”订单金额”指标,批处理管道里用的是 SQL 聚合加上汇率修正,实时管道里用的是 Java 代码做的近似计算。两边的口径不一致,导致每天早上运营团队打开报表时,昨天的实时数据和今天的批处理数据对不上,差异率在 3%-8% 之间。

这不是个例。凡是同时运行批处理和实时处理两条管道的团队,几乎都会遇到”数据不一致”的问题。Lambda 架构在理论上要求两条路径产出相同结果,但在工程实践中,两套代码、两套计算引擎、两套存储系统之间的语义差异几乎无法消除。

过去十年,数据分析架构经历了三次范式迁移:从数据仓库(Data Warehouse)到数据湖(Data Lake),再到 Lambda/Kappa 的流批分离与统一尝试,最终走向 Lakehouse 架构。每一次迁移都在解决上一代架构的核心缺陷,但也带来新的工程挑战。

本文从数据仓库的经典设计开始,逐步拆解每种架构的设计动机、核心机制和工程限制,最后给出架构选型的决策框架。


一、数据仓库的经典架构与局限

1.1 数据仓库的设计原则

数据仓库(Data Warehouse)的概念由 Bill Inmon 在 1990 年代系统化提出,核心定义是”面向主题的、集成的、非易失的、时变的数据集合”。它的设计目标很明确:把分散在各个业务系统中的数据抽取出来,经过清洗和转换,加载到一个统一的存储中,供分析和决策使用。

这个过程就是经典的 ETL(Extract-Transform-Load)流程:

业务数据库 A ──┐
业务数据库 B ──┤── Extract ── Transform ── Load ── 数据仓库
业务数据库 C ──┘

数据仓库内部通常采用星型模型(Star Schema)或雪花模型(Snowflake Schema)组织数据。星型模型的核心是一张事实表(Fact Table)围绕多张维度表(Dimension Table):

-- 星型模型示例:订单事实表
CREATE TABLE fact_orders (
    order_id        BIGINT PRIMARY KEY,
    customer_key    INT REFERENCES dim_customer(customer_key),
    product_key     INT REFERENCES dim_product(product_key),
    date_key        INT REFERENCES dim_date(date_key),
    store_key       INT REFERENCES dim_store(store_key),
    quantity        INT,
    unit_price      DECIMAL(10, 2),
    discount_amount DECIMAL(10, 2),
    total_amount    DECIMAL(12, 2)
);

-- 维度表:客户维度
CREATE TABLE dim_customer (
    customer_key    INT PRIMARY KEY,
    customer_id     VARCHAR(32),
    customer_name   VARCHAR(128),
    segment         VARCHAR(32),
    region          VARCHAR(64),
    registration_date DATE,
    -- 缓慢变化维度(SCD Type 2)
    effective_from  DATE,
    effective_to    DATE,
    is_current      BOOLEAN
);

-- 维度表:时间维度
CREATE TABLE dim_date (
    date_key        INT PRIMARY KEY,
    full_date       DATE,
    year            INT,
    quarter         INT,
    month           INT,
    week_of_year    INT,
    day_of_week     INT,
    is_holiday      BOOLEAN
);

这种设计有几个明确的优势:查询模式可预测(分析师总是围绕维度做聚合),数据质量有保证(ETL 过程中做了清洗和校验),查询性能可优化(预先知道查询模式,可以建索引、做预聚合)。

1.2 经典数据仓库的四个局限

但数据仓库在过去十年遇到了四个根本性的挑战:

第一,数据类型受限。 传统数据仓库只能存储结构化数据(Structured Data)。当企业需要分析日志文件、图片元数据、用户行为轨迹、传感器时序数据时,这些半结构化(Semi-structured)和非结构化(Unstructured)数据无法直接放进关系型数据仓库。强行转换会丢失信息或引入大量 NULL 列。

第二,扩展成本高昂。 传统数据仓库(Teradata、Oracle Exadata、Netezza)采用计算与存储耦合的架构,扩展意味着同时购买更多的计算节点和存储节点。当数据量从 TB 级增长到 PB 级时,硬件成本呈线性甚至超线性增长。

第三,Schema-on-Write 的刚性。 数据仓库要求数据在写入前就确定 schema。这意味着每当业务系统新增一个字段,数据仓库就要经历”修改 schema → 修改 ETL → 回填数据”的完整流程。在业务快速迭代的环境下,这个流程的周转时间经常以周计算。

第四,实时性不足。 ETL 批处理通常按小时或按天运行。在”T+1”的模式下,业务决策者看到的数据至少是昨天的。对于需要分钟级甚至秒级数据新鲜度的场景(实时风控、实时推荐、运营看板),传统数据仓库完全无法满足。

1.3 云数据仓库的部分改进

Snowflake、BigQuery、Redshift 等云数据仓库解决了部分问题——计算与存储分离降低了扩展成本,半结构化数据类型(VARIANT、JSON)提供了一定的灵活性——但它们的核心模型仍然是 Schema-on-Write,仍然以结构化分析为主,对非结构化数据和低延迟场景的支持有限。

-- Snowflake 中处理半结构化数据
CREATE TABLE raw_events (
    event_id    VARCHAR(64),
    event_time  TIMESTAMP,
    payload     VARIANT  -- 半结构化 JSON 数据
);

-- 查询 VARIANT 字段
SELECT
    event_id,
    payload:user_id::STRING AS user_id,
    payload:action::STRING AS action,
    payload:properties:page_url::STRING AS page_url
FROM raw_events
WHERE event_time >= DATEADD(hour, -1, CURRENT_TIMESTAMP());

这些改进重要但不够。企业需要的是一个能同时存储结构化、半结构化和非结构化数据,能支持批处理和实时处理,能灵活适应 schema 变化的统一平台。数据湖就是在这个需求背景下出现的。


二、数据湖的设计理念与”数据沼泽”问题

2.1 数据湖的核心思想

数据湖(Data Lake)的概念由 Pentaho 的 CTO James Dixon 在 2010 年左右提出。它的核心理念可以用两句话概括:

  1. 存储一切原始数据——不管格式、不管结构、不管是否有明确的使用场景,先存下来再说。
  2. Schema-on-Read——数据在写入时不需要预定义 schema,在读取时再根据具体需求解释数据结构。

技术上,数据湖通常建立在分布式文件系统(HDFS)或对象存储(S3、Azure Blob Storage、GCS)之上,数据以原始格式存储——CSV、JSON、Parquet、Avro、ORC、图片、视频、日志文件都可以直接写入。

数据湖的典型存储布局:

s3://data-lake/
  raw/                          # 原始数据(Raw Zone)
    events/2026/04/13/          # 按日期分区的事件日志
    transactions/2026/04/13/    # 交易数据
    images/product-catalog/     # 产品图片
    logs/nginx/2026/04/13/      # Web 服务器日志
  curated/                      # 清洗后的数据(Curated Zone)
    orders/                     # 经过 ETL 清洗的订单数据
    customers/                  # 客户主数据
  aggregated/                   # 聚合数据(Aggregated Zone)
    daily_sales/                # 日销售汇总
    monthly_reports/            # 月度报表

2.2 数据湖的技术优势

相比数据仓库,数据湖有三个明确的技术优势:

成本优势。 对象存储的成本远低于数据仓库的专用存储。以 AWS 为例,S3 Standard 的存储成本约 0.023 USD/GB/月,而 Redshift 的存储成本约 0.024 USD/GB/月(ra3.xlplus 节点)。看起来差不多,但 S3 支持分层存储——Infrequent Access 降到 0.0125 USD/GB/月,Glacier 降到 0.004 USD/GB/月。对于 PB 级的历史数据,分层存储的成本优势非常显著。

灵活性优势。 Schema-on-Read 意味着数据湖可以接受任何格式的数据,不需要预先定义 schema。这对于探索性分析(Exploratory Analysis)特别有价值——数据科学家可以先把数据灌进来,然后在 Jupyter Notebook 里用 Pandas 或 Spark 自由探索。

计算与存储解耦。 数据存在 S3 里,计算用 EMR/Spark/Presto/Athena,两者独立扩展。不需要分析时可以关掉计算集群,只付存储费用。

2.3 数据沼泽问题

但数据湖在实践中暴露了一个严重的问题——“数据沼泽”(Data Swamp)。当大量数据被无组织地倾倒进数据湖后,没人知道里面有什么数据、数据的质量如何、数据之间有什么关系。数据湖退化成了一个谁都往里面扔数据、但谁都不敢用的”数据垃圾场”。

数据沼泽的根本原因有三个:

缺乏事务支持。 HDFS 和 S3 都不支持 ACID 事务。一个 Spark 任务正在写 Parquet 文件,另一个查询同时在读,读到的可能是半写的、不完整的数据。没有原子性保证,就无法保证数据一致性。

缺乏 schema 演进机制。 Schema-on-Read 的灵活性是以”没有 schema 约束”为代价的。上游系统改了字段名,下游的分析任务就会静默失败或者产出垃圾数据。没有 schema 校验,错误发现得越晚,修复成本越高。

缺乏数据管理能力。 没有元数据管理,不知道数据的来源(Lineage)、数据的拥有者(Ownership)、数据的更新频率、数据的质量指标。数据湖里的数据越多,找到”可信数据”的难度就越大。

# 数据沼泽的典型问题:读取到不一致的数据
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DataSwampDemo").getOrCreate()

# 写入任务正在往这个路径写 Parquet 文件
# 如果读取任务在写入过程中执行,可能读到部分数据
df = spark.read.parquet("s3://data-lake/raw/events/2026/04/13/")

# 没有 schema 校验,上游改了字段名不会报错
# 只是查询结果变成 NULL
df.select("user_id", "event_type", "timestamp").show()
# 如果上游把 "user_id" 改成了 "userId",
# 这里 user_id 列全是 NULL,但不会抛异常

数据沼泽问题本质上说明了一件事:数据湖解决了”存什么”的问题,但没有解决”怎么管”的问题。后来出现的 Delta Lake、Apache Iceberg、Apache Hudi,正是为了给数据湖补上事务、schema 管理和数据质量这些缺失的能力。


三、Lambda 架构:双路径设计与维护成本

3.1 Lambda 架构的起源与设计

Lambda 架构由 Nathan Marz(Storm 的作者)在 2011 年提出,核心目标是同时满足批处理的完整性和实时处理的低延迟。它的设计思路很直接:既然批处理准确但慢,实时处理快但近似,那就同时跑两条路径,用两者的结果互补。

Lambda 架构包含三层:

  1. 批处理层(Batch Layer):对全量历史数据做批处理,产出完整且准确的视图。通常每天或每几小时运行一次。
  2. 速度层(Speed Layer):对实时数据流做增量处理,产出低延迟但可能不完全准确的视图。
  3. 服务层(Serving Layer):合并批处理层和速度层的结果,对外提供查询服务。当批处理结果追上来后,速度层的对应数据就可以丢弃。
graph LR
    A[数据源] --> B[消息队列<br/>Kafka]
    B --> C[批处理层<br/>Spark/MapReduce]
    B --> D[速度层<br/>Storm/Flink]
    C --> E[批处理视图<br/>HDFS/Hive]
    D --> F[实时视图<br/>HBase/Redis]
    E --> G[服务层<br/>合并查询]
    F --> G
    G --> H[查询接口]

3.2 批处理层的实现

批处理层通常用 Spark 或 MapReduce 实现,定期对全量数据重新计算:

# 批处理层:每日订单汇总(PySpark)
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("BatchLayer-DailyOrderSummary") \
    .getOrCreate()

# 读取全量订单数据
orders = spark.read.parquet("s3://data-lake/raw/orders/")

# 计算每日、每地区的订单汇总
daily_summary = orders \
    .filter(F.col("order_status") == "completed") \
    .groupBy(
        F.to_date("order_time").alias("order_date"),
        F.col("region")
    ) \
    .agg(
        F.count("order_id").alias("order_count"),
        F.sum("total_amount").alias("total_revenue"),
        F.avg("total_amount").alias("avg_order_value"),
        F.countDistinct("customer_id").alias("unique_customers")
    )

# 写入批处理视图
daily_summary.write \
    .mode("overwrite") \
    .partitionBy("order_date") \
    .parquet("s3://data-lake/batch-views/daily-order-summary/")

3.3 速度层的实现

速度层用流处理引擎实现,对实时事件做增量计算:

// 速度层:实时订单计数(Flink Java)
public class RealtimeOrderCounter {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // 从 Kafka 消费订单事件
        KafkaSource<OrderEvent> source = KafkaSource.<OrderEvent>builder()
            .setBootstrapServers("kafka:9092")
            .setTopics("order-events")
            .setGroupId("speed-layer")
            .setValueOnlyDeserializer(new OrderEventDeserializer())
            .build();

        DataStream<OrderEvent> orders = env.fromSource(
            source,
            WatermarkStrategy.<OrderEvent>forBoundedOutOfOrderness(
                Duration.ofSeconds(10)
            ).withTimestampAssigner((event, ts) -> event.getEventTime()),
            "OrderSource"
        );

        // 按地区和 1 分钟窗口聚合
        DataStream<OrderSummary> realtimeSummary = orders
            .filter(e -> "completed".equals(e.getStatus()))
            .keyBy(OrderEvent::getRegion)
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .aggregate(new OrderAggregator());

        // 写入实时视图(HBase 或 Redis)
        realtimeSummary.addSink(new HBaseSink("realtime_order_summary"));

        env.execute("SpeedLayer-RealtimeOrderCounter");
    }
}

3.4 服务层的合并逻辑

服务层需要合并批处理视图和实时视图,这是 Lambda 架构中最复杂的部分:

# 服务层:合并批处理和实时视图
import datetime
from typing import Dict

class LambdaServingLayer:
    def __init__(self, batch_store, realtime_store):
        self.batch_store = batch_store      # 批处理视图(如 Hive)
        self.realtime_store = realtime_store  # 实时视图(如 Redis)

    def query_order_summary(self, region: str, date: str) -> Dict:
        # 批处理视图的最新数据截止时间
        batch_cutoff = self.batch_store.get_latest_batch_time()

        # 从批处理视图获取基准数据
        batch_result = self.batch_store.query(
            region=region,
            date=date,
            up_to=batch_cutoff
        )

        # 从实时视图获取增量数据(批处理截止时间之后的数据)
        realtime_result = self.realtime_store.query(
            region=region,
            date=date,
            after=batch_cutoff
        )

        # 合并结果
        return {
            "region": region,
            "date": date,
            "order_count": batch_result["order_count"]
                         + realtime_result["order_count"],
            "total_revenue": batch_result["total_revenue"]
                           + realtime_result["total_revenue"],
            "data_source": "merged",
            "batch_cutoff": str(batch_cutoff),
            "freshness": str(datetime.datetime.now())
        }

3.5 Lambda 架构的核心问题

Lambda 架构在理论上很优雅,但在工程实践中有三个致命问题:

双倍代码维护。 批处理层和速度层使用不同的计算引擎(Spark vs. Flink),需要编写两套逻辑。即使业务逻辑完全相同,两种引擎的 API、语义、数据模型都不一样。“订单金额汇总”这么简单的逻辑,在 Spark SQL 和 Flink DataStream API 中的实现就有微妙的差异——窗口对齐方式不同、NULL 处理语义不同、浮点数精度不同。

数据一致性难保证。 两条路径独立运行,对同一份数据的处理结果可能不一致。批处理用的是 T+1 的完整数据,速度层用的是实时流数据。数据到达顺序、迟到数据处理策略、错误数据过滤规则——任何一个环节的差异都会导致最终结果不一致。

运维复杂度高。 两套计算引擎、两套存储系统、一个合并层——总共五个组件需要独立监控、独立调优、独立故障排查。当数据对不上时,要同时检查批处理任务是否正常完成、实时任务是否有延迟、合并逻辑的时间边界是否正确。

Jay Kreps(Kafka 的作者)在 2014 年发表了一篇文章”Questioning the Lambda Architecture”,直接指出了这个问题。他的替代方案就是 Kappa 架构。


四、Kappa 架构:简化思路与适用边界

4.1 Kappa 架构的核心设计

Kappa 架构由 Jay Kreps 在 2014 年提出,核心思想是:只保留速度层,用一套流处理引擎同时处理实时数据和历史数据重放。如果需要重新计算历史数据(比如修正了计算逻辑),不是跑一个批处理任务,而是从 Kafka 中重放历史事件,让流处理引擎重新处理一遍。

graph LR
    A[数据源] --> B[消息队列<br/>Kafka<br/>长期保留]
    B --> C[流处理引擎<br/>Flink/Kafka Streams]
    C --> D[服务层<br/>数据库/缓存]
    D --> E[查询接口]

    style B fill:#e8f4f8,stroke:#2196F3

Kappa 架构的前提条件是消息队列能够长期保留历史数据。Kafka 通过分层存储(Tiered Storage)可以实现近乎无限期的数据保留——热数据在本地磁盘,冷数据在对象存储。

4.2 重放机制的实现

Kappa 架构的关键能力是”重放”——当计算逻辑发生变化时,启动一个新的流处理任务,从 Kafka 的起始偏移量开始重新消费所有历史事件:

# Kappa 架构的重放机制(Flink Python API)
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer

env = StreamExecutionEnvironment.get_execution_environment()

# 重放模式:从最早的 offset 开始消费
replay_source = KafkaSource.builder() \
    .set_bootstrap_servers("kafka:9092") \
    .set_topics("order-events") \
    .set_group_id("kappa-replay-v2") \
    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
    .set_value_only_deserializer(OrderEventDeserializer()) \
    .build()

# 使用与实时处理完全相同的处理逻辑
orders = env.from_source(
    replay_source,
    WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(10)),
    "ReplaySource"
)

# 相同的处理逻辑,输出到新的目标表
result = process_orders(orders)  # 复用同一个函数
result.add_sink(create_sink("order_summary_v2"))

env.execute("Kappa-Replay-V2")

4.3 Kappa 架构的适用边界

Kappa 架构在概念上比 Lambda 简单得多,但它不是万能的。它有三个明确的适用边界:

第一,重放时间。 当历史数据量达到 PB 级时,从 Kafka 重放所有事件可能需要几天甚至几周。在这段时间内,新旧两个版本的流处理任务同时运行,需要双倍的计算资源。如果业务不能容忍这个时间窗口和资源开销,Kappa 架构就不合适。

第二,计算类型。 流处理引擎擅长的是事件驱动的增量计算——计数、求和、窗口聚合。但某些分析场景需要全量扫描——比如计算 Top-K、做复杂的 JOIN、训练机器学习模型。这些计算在流处理引擎上要么效率极低,要么根本无法实现。

第三,Kafka 的存储成本。 虽然 Kafka 支持分层存储,但长期保留所有原始事件的成本不低。对于每天产生 TB 级数据的系统,保留一年的历史数据意味着 PB 级的存储开销。

下表对比了 Lambda 和 Kappa 架构在关键维度上的差异:

维度 Lambda 架构 Kappa 架构
代码维护 两套代码(批+流) 一套代码(流)
数据一致性 难保证(两套逻辑) 天然一致(同一逻辑)
历史重算 批处理任务(快) 事件重放(慢)
适用计算类型 批+流都擅长 增量计算为主
存储成本 中(批处理存储) 高(Kafka 长期保留)
运维复杂度 高(五个组件) 中(三个组件)
延迟 秒级(速度层) 秒级
成熟度 高(广泛应用) 中(场景受限)

4.4 Lambda 与 Kappa 的共同瓶颈

不管是 Lambda 还是 Kappa,它们都有一个共同的架构瓶颈:存储层缺乏事务和管理能力

Lambda 架构的批处理视图通常存在 HDFS/S3 上的 Parquet 文件中,没有 ACID 事务。速度层的数据存在 HBase 或 Redis 中,有事务但容量有限。两个存储系统之间没有统一的 schema 管理和元数据管理。

Kappa 架构的存储分两部分:Kafka 存原始事件,下游数据库存计算结果。Kafka 不支持随机读取和 SQL 查询,下游数据库不存原始数据。两者之间存在信息断层。

Lakehouse 架构的核心贡献,正是在存储层加上了事务、schema 管理和统一查询的能力。


五、Lakehouse 架构——Delta Lake 与 Apache Iceberg 的设计原理

5.1 Lakehouse 的设计动机

Lakehouse 架构由 Databricks 在 2020 年系统提出(论文”Lakehouse: A New Generation of Open Platforms that Unify Data Warehousing and Data Lakes”)。它的设计动机很直接:在数据湖的低成本存储之上,补上数据仓库的事务、schema 管理和查询优化能力

Lakehouse 不是数据湖和数据仓库的简单叠加。它的关键创新是在对象存储(S3/ADLS/GCS)之上增加了一个事务元数据层(Transactional Metadata Layer),这个元数据层提供了:

graph TB
    subgraph "Lakehouse 架构"
        A[BI 工具 / SQL 客户端] --> B[查询引擎<br/>Spark / Trino / Flink]
        B --> C[事务元数据层<br/>Delta Lake / Iceberg / Hudi]
        C --> D[对象存储<br/>S3 / ADLS / GCS]
    end

    subgraph "数据写入"
        E[批处理 ETL] --> C
        F[流处理<br/>Flink / Spark Streaming] --> C
        G[CDC 管道<br/>Debezium] --> C
    end

    subgraph "数据消费"
        A
        H[数据科学<br/>Pandas / Notebook] --> B
        I[机器学习<br/>训练管道] --> B
    end

5.2 Delta Lake 的核心机制

Delta Lake 是 Databricks 开源的存储层,它的核心设计围绕一个事务日志(Transaction Log)展开。

事务日志(Delta Log)

Delta Lake 在数据目录下维护一个 _delta_log/ 目录,里面的 JSON 文件记录了对数据的每一次操作——添加文件、删除文件、更新元数据、修改 schema:

s3://data-lake/orders/
  _delta_log/
    00000000000000000000.json   # 第一次写入:添加了 3 个 Parquet 文件
    00000000000000000001.json   # 第二次写入:添加了 2 个文件,删除了 1 个
    00000000000000000002.json   # 第三次写入:schema 演进,新增列
    00000000000000000010.checkpoint.parquet  # 每 10 个版本的检查点
  part-00000-xxxx.parquet
  part-00001-xxxx.parquet
  ...

每个 JSON 日志文件包含一系列”动作”(Action):

{
  "add": {
    "path": "part-00000-abc123.parquet",
    "size": 52428800,
    "partitionValues": {"order_date": "2026-04-13"},
    "modificationTime": 1744531200000,
    "dataChange": true,
    "stats": "{\"numRecords\":125000,\"minValues\":{\"order_id\":1},\"maxValues\":{\"order_id\":125000}}"
  }
}
{
  "remove": {
    "path": "part-00000-old456.parquet",
    "deletionTimestamp": 1744531200000,
    "dataChange": true
  }
}

乐观并发控制(Optimistic Concurrency Control)

Delta Lake 使用乐观并发控制来处理多个写入者的冲突。每个写入者在提交时,检查自己读取的版本是否仍然是最新版本。如果不是,根据冲突检测规则决定是否需要重试:

# Delta Lake 的 ACID 事务示例
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DeltaLakeDemo") \
    .config("spark.sql.extensions",
            "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# 原子性写入:要么全部成功,要么全部回滚
new_orders = spark.read.json("s3://staging/orders/2026-04-13/")
new_orders.write \
    .format("delta") \
    .mode("append") \
    .partitionBy("order_date") \
    .save("s3://data-lake/orders/")

# MERGE 操作:upsert(更新+插入)
delta_table = DeltaTable.forPath(spark, "s3://data-lake/orders/")

delta_table.alias("target").merge(
    new_orders.alias("source"),
    "target.order_id = source.order_id"
).whenMatchedUpdate(set={
    "order_status": "source.order_status",
    "updated_at": "source.updated_at"
}).whenNotMatchedInsertAll().execute()

# 时间旅行:查询历史版本的数据
historical_orders = spark.read \
    .format("delta") \
    .option("versionAsOf", 5) \
    .load("s3://data-lake/orders/")

# 或者按时间戳查询
yesterday_orders = spark.read \
    .format("delta") \
    .option("timestampAsOf", "2026-04-12T00:00:00") \
    .load("s3://data-lake/orders/")

Schema 约束与演进

Delta Lake 默认开启 schema 约束——如果写入的数据 schema 与表的 schema 不匹配,写入会被拒绝:

# Schema 约束:阻止不匹配的写入
# 假设表的 schema 是 (order_id: long, amount: double, status: string)
bad_data = spark.createDataFrame([
    (1, "not_a_number", "completed")  # amount 应该是 double,这里是 string
], ["order_id", "amount", "status"])

# 这行会抛出 AnalysisException:
# 数据 schema 和表 schema 不匹配
bad_data.write.format("delta").mode("append") \
    .save("s3://data-lake/orders/")

# Schema 演进:显式允许新增列
new_data_with_extra_col = spark.createDataFrame([
    (1001, 99.99, "completed", "CN")
], ["order_id", "amount", "status", "country"])

new_data_with_extra_col.write \
    .format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("s3://data-lake/orders/")

5.3 Apache Iceberg 的核心机制

Apache Iceberg 是 Netflix 开源的表格式(Table Format),与 Delta Lake 解决同一类问题,但设计思路有所不同。

三层元数据结构

Iceberg 使用三层元数据结构来跟踪数据:

  1. Catalog:存储表的当前元数据指针(指向最新的 metadata file)。
  2. Metadata File(元数据文件):JSON 格式,记录表的 schema、分区规范、快照列表。
  3. Manifest List / Manifest File:记录每个快照包含哪些数据文件及其统计信息。
Iceberg 元数据结构:

Catalog(如 Hive Metastore / Nessie / REST Catalog)
  └── metadata pointer
        └── metadata/v3.metadata.json
              ├── schema: {order_id: long, amount: double, ...}
              ├── partition-spec: [date(order_time)]
              └── snapshots:
                    ├── snap-001 → manifest-list-001.avro
                    │     ├── manifest-abc.avro → [file1.parquet, file2.parquet]
                    │     └── manifest-def.avro → [file3.parquet]
                    └── snap-002 → manifest-list-002.avro
                          ├── manifest-abc.avro → [file1.parquet, file2.parquet]
                          └── manifest-ghi.avro → [file4.parquet]  # file3 被替换

分区演进(Partition Evolution)

Iceberg 最突出的特性之一是分区演进——可以在不重写历史数据的情况下修改分区策略。这对于随着数据量增长需要调整分区粒度的场景非常有价值:

-- Iceberg 分区演进示例(Spark SQL)

-- 创建表,初始按月分区
CREATE TABLE catalog.db.orders (
    order_id    BIGINT,
    order_time  TIMESTAMP,
    customer_id BIGINT,
    amount      DOUBLE,
    status      STRING
) USING iceberg
PARTITIONED BY (month(order_time));

-- 数据量增长后,改为按天分区
-- 历史数据不需要重写,新数据按天分区,旧数据仍按月分区
ALTER TABLE catalog.db.orders
    SET PARTITION SPEC (day(order_time));

-- 进一步细化:按小时分区(用于高频写入场景)
ALTER TABLE catalog.db.orders
    SET PARTITION SPEC (hour(order_time));

-- Iceberg 查询引擎会自动处理混合分区:
-- 扫描 2024 年数据时用月分区过滤
-- 扫描 2025 年数据时用天分区过滤
-- 扫描 2026 年数据时用小时分区过滤
SELECT * FROM catalog.db.orders
WHERE order_time BETWEEN '2026-04-01' AND '2026-04-13';

隐式分区(Hidden Partitioning)

传统 Hive 分区要求用户显式指定分区列和分区值,查询时也要显式使用分区列过滤。Iceberg 的分区是”隐式”的——用户只需要用原始列过滤,Iceberg 自动利用分区信息跳过无关文件:

-- Hive 风格:用户必须知道分区结构
SELECT * FROM hive_orders
WHERE year = 2026 AND month = 4 AND day = 13;  -- 必须按分区列查询

-- Iceberg 风格:用户用原始列查询,引擎自动做分区裁剪
SELECT * FROM iceberg_orders
WHERE order_time >= '2026-04-13 00:00:00'
  AND order_time <  '2026-04-14 00:00:00';
-- Iceberg 自动将时间条件映射到分区,跳过无关文件

5.4 Delta Lake 与 Iceberg 对比

特性 Delta Lake Apache Iceberg
发起方 Databricks Netflix / Apache
事务日志格式 JSON + Checkpoint Parquet JSON Metadata + Avro Manifest
分区演进 不支持(需重写数据) 原生支持
隐式分区 不支持 原生支持
Schema 演进 支持(列追加/重命名) 支持(全面,含列提升)
时间旅行 支持 支持
计算引擎绑定 Spark 优先(对其他引擎支持逐步完善) 引擎无关(Spark/Flink/Trino/Presto)
行级操作 MERGE/UPDATE/DELETE MERGE/UPDATE/DELETE
并发控制 乐观并发 乐观并发 + 分支隔离
社区生态 Databricks 主导 开放社区(Apple/Netflix/LinkedIn)

选择 Delta Lake 还是 Iceberg,关键看两个因素:如果团队深度使用 Databricks 平台,Delta Lake 的集成体验更好;如果需要多引擎支持(Spark + Flink + Trino)和分区演进能力,Iceberg 的开放设计更有优势。


六、流批一体的工程挑战

6.1 流批一体的定义

流批一体(Stream-Batch Unification)是 Lakehouse 架构的核心目标之一:用同一套代码、同一个计算引擎、读写同一个存储系统,同时处理实时流数据和历史批数据。它不只是”流和批用同一个引擎”,而是要求端到端的统一——从数据摄入到处理逻辑到输出目标,全链路使用同一套抽象。

Apache Flink 是目前实现流批一体最成熟的引擎。Flink 的设计哲学是”流是批的超集”——批处理是有限数据流的特例。这意味着用 Flink 写的流处理逻辑,可以不修改代码直接用于批处理:

-- Flink SQL:同一段 SQL 可以跑在流模式和批模式下

-- 定义 Kafka 数据源(流模式)
CREATE TABLE order_events (
    order_id    BIGINT,
    customer_id BIGINT,
    amount      DOUBLE,
    event_time  TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'order-events',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset'
);

-- 定义 Iceberg 输出表
CREATE TABLE order_summary (
    window_start TIMESTAMP(3),
    region       STRING,
    order_count  BIGINT,
    total_amount DOUBLE
) WITH (
    'connector' = 'iceberg',
    'catalog-name' = 'lakehouse',
    'catalog-type' = 'hive',
    'warehouse' = 's3://data-lake/warehouse'
);

-- 同一段聚合逻辑:
-- 流模式下,每分钟输出一次增量结果
-- 批模式下,一次性处理全部历史数据
INSERT INTO order_summary
SELECT
    TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
    region,
    COUNT(order_id) AS order_count,
    SUM(amount) AS total_amount
FROM order_events
GROUP BY
    TUMBLE(event_time, INTERVAL '1' MINUTE),
    region;

6.2 工程挑战一:语义一致性

流模式和批模式的语义差异是流批一体最大的工程挑战。即使使用同一段代码,以下几个方面的行为在流和批之间仍然可能不同:

窗口语义。 在流模式下,窗口由时间驱动触发(水位线推进到窗口结束时间时触发计算)。在批模式下,窗口由数据驱动(数据扫描到窗口边界时触发)。如果数据存在乱序或延迟,两种模式的窗口归属可能不同。

状态管理。 流模式需要维护状态(窗口中间结果、计数器等),状态大小随时间增长,需要 TTL 和清理策略。批模式不需要持久状态——所有数据一次性可用,引擎可以用排序+聚合来替代状态维护。

错误处理。 流模式下一条错误数据可能导致流处理任务持续失败和重启。批模式下可以先跳过错误数据,处理完所有正常数据后再统一处理错误记录。

# 流批一体的语义对齐:显式处理迟到数据
from pyflink.table import EnvironmentSettings, TableEnvironment

# 批模式设置
batch_settings = EnvironmentSettings.new_instance() \
    .in_batch_mode() \
    .build()
batch_env = TableEnvironment.create(batch_settings)

# 流模式设置
stream_settings = EnvironmentSettings.new_instance() \
    .in_streaming_mode() \
    .build()
stream_env = TableEnvironment.create(stream_settings)

# 关键:在两种模式下使用完全相同的 SQL
# 但需要额外配置来保证语义一致
UNIFIED_SQL = """
    SELECT
        window_start,
        region,
        COUNT(*) AS order_count,
        SUM(amount) AS total_amount
    FROM TABLE(
        TUMBLE(TABLE order_events, DESCRIPTOR(event_time), INTERVAL '1' HOUR)
    )
    GROUP BY window_start, region
"""

# 批模式执行
batch_env.execute_sql(UNIFIED_SQL)

# 流模式执行(需要额外配置水位线和允许延迟)
stream_env.get_config().set(
    "table.exec.emit.early-fire.enabled", "true"
)
stream_env.get_config().set(
    "table.exec.emit.early-fire.delay", "60s"
)
stream_env.execute_sql(UNIFIED_SQL)

6.3 工程挑战二:Exactly-Once 保证

流批一体要求端到端的精确一次(Exactly-Once)语义。这意味着从数据源读取、处理、到写入目标存储,每条数据恰好被处理一次——不多不少。

在批处理中,Exactly-Once 相对简单——任务失败就重跑,输出采用”覆盖写”即可。在流处理中,Exactly-Once 需要分布式快照(Checkpoint)和两阶段提交(Two-Phase Commit)的配合:

// Flink + Iceberg 的 Exactly-Once 写入配置
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();

// 开启 Checkpoint,间隔 60 秒
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointingMode(
    CheckpointingMode.EXACTLY_ONCE
);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
env.getCheckpointConfig().setCheckpointTimeout(300000);

// Iceberg Sink 配合 Checkpoint 实现 Exactly-Once
// 每次 Checkpoint 完成时,Iceberg Sink 将暂存的数据文件
// 原子性地提交到 Iceberg 表
FlinkSink.forRowData(dataStream)
    .table(icebergTable)
    .tableLoader(tableLoader)
    .writeParallelism(4)
    .upsert(true)  // 开启 upsert 模式
    .equalityFieldColumns(Arrays.asList("order_id"))
    .append();

env.execute("ExactlyOnce-Iceberg-Sink");

6.4 工程挑战三:小文件问题

流处理以高频率持续写入数据,每次 Checkpoint 都会生成新的数据文件。如果 Checkpoint 间隔是 1 分钟,每天就会生成 1440 个小文件——一年下来超过 50 万个。大量小文件会严重降低查询性能(文件元数据开销远大于数据扫描开销)并增加存储成本(对象存储按请求次数计费)。

解决方案是定期做文件合并(Compaction):

-- Delta Lake 的 OPTIMIZE 命令:合并小文件
OPTIMIZE delta.`s3://data-lake/orders/`
WHERE order_date >= '2026-04-01';

-- Z-ORDER 优化:按查询模式重新组织数据布局
OPTIMIZE delta.`s3://data-lake/orders/`
ZORDER BY (customer_id, order_date);

-- Iceberg 的 Compaction(通过 Spark 调用)
-- 重写小文件,合并为更大的文件
CALL catalog.system.rewrite_data_files(
    table => 'db.orders',
    strategy => 'sort',
    sort_order => 'order_date ASC, customer_id ASC',
    options => map(
        'target-file-size-bytes', '134217728',  -- 128MB
        'min-file-size-bytes', '67108864',       -- 64MB
        'max-file-size-bytes', '201326592'       -- 192MB
    )
);

-- 清理过期快照,释放被删除文件的存储空间
CALL catalog.system.expire_snapshots(
    table => 'db.orders',
    older_than => TIMESTAMP '2026-04-06 00:00:00',
    retain_last => 10
);

6.5 三种架构的全面对比

下面这张表从十个维度对比 Lambda、Kappa 和 Lakehouse 三种架构:

维度 Lambda Kappa Lakehouse
代码维护 两套(批+流) 一套(流) 一套(流批一体)
存储层 HDFS + HBase/Redis Kafka + DB 对象存储 + 表格式
事务支持 无(HDFS)/ 有(HBase) 有(Kafka 事务) 有(Delta/Iceberg)
Schema 管理 强(约束+演进)
时间旅行 不支持 通过 Kafka offset 原生支持
查询引擎 Hive/Presto(批)+ 自建(实时) 受限于流引擎 多引擎(Spark/Trino/Flink)
数据新鲜度 秒级(速度层) 秒级 分钟级(微批)到秒级
历史重算 批处理(快) 事件重放(慢) 批处理(快)
非结构化数据 支持(数据湖部分) 不擅长 支持
运维复杂度 中低

七、数据质量保证架构

7.1 数据质量的三个层次

Lakehouse 解决了存储层的事务和 schema 问题,但数据质量是一个更广泛的问题。数据质量保证需要在三个层次上建设:

  1. 技术层:数据是否符合 schema?字段类型是否正确?是否有 NULL 值?
  2. 业务层:数据是否符合业务规则?订单金额是否为正数?用户年龄是否在合理范围内?
  3. 时效层:数据是否按时到达?数据新鲜度是否满足 SLA?

7.2 数据契约(Data Contract)

数据契约(Data Contract)是近年来兴起的数据治理理念:上游数据生产者和下游数据消费者之间签订一份”合同”,明确约定数据的 schema、质量标准、更新频率和 SLA。当上游违反契约时,系统自动告警或阻断。

# 数据契约定义示例(data-contract.yaml)
apiVersion: datacontract/v1.0
kind: DataContract
metadata:
  name: order-events-contract
  owner: order-service-team
  consumers:
    - analytics-team
    - recommendation-team

schema:
  type: object
  properties:
    order_id:
      type: integer
      description: "订单唯一标识"
      constraints:
        - not_null
        - unique
    customer_id:
      type: integer
      description: "客户 ID"
      constraints:
        - not_null
        - "references: customer_master.customer_id"
    amount:
      type: number
      description: "订单金额(单位:元)"
      constraints:
        - not_null
        - "range: [0.01, 10000000]"
    order_time:
      type: timestamp
      description: "下单时间"
      constraints:
        - not_null
        - "freshness: <= 5 minutes"
    status:
      type: string
      description: "订单状态"
      constraints:
        - not_null
        - "enum: [created, paid, shipped, completed, cancelled]"

quality:
  completeness:
    threshold: 99.5%      # 非 NULL 字段的完整率不低于 99.5%
  uniqueness:
    columns: [order_id]
    threshold: 100%        # 主键必须完全唯一
  freshness:
    max_delay: 5 minutes   # 数据延迟不超过 5 分钟
  volume:
    min_records_per_hour: 1000   # 每小时至少 1000 条记录
    max_records_per_hour: 500000 # 每小时不超过 50 万条

sla:
  availability: 99.9%
  notification:
    channels: [slack, pagerduty]
    on_violation: block_pipeline  # 违反契约时阻断管道

7.3 Schema 校验的自动化实现

在数据管道中嵌入自动化 schema 校验,可以在数据进入 Lakehouse 之前拦截不合规的数据:

# 基于 Great Expectations 的数据质量校验
import great_expectations as gx

# 创建数据上下文
context = gx.get_context()

# 定义数据源
datasource = context.data_sources.add_spark("lakehouse_source")
data_asset = datasource.add_dataframe_asset("order_events")

# 定义期望套件(Expectation Suite)
suite = context.suites.add(
    gx.ExpectationSuite(name="order_events_quality")
)

# 添加数据质量规则
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="amount",
        min_value=0.01,
        max_value=10000000
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeInSet(
        column="status",
        value_set=["created", "paid", "shipped", "completed", "cancelled"]
    )
)
suite.add_expectation(
    gx.expectations.ExpectTableRowCountToBeBetween(
        min_value=1000,
        max_value=500000
    )
)

# 运行校验
batch = data_asset.add_batch_definition_whole_dataframe(
    "daily_batch"
).get_batch(batch_parameters={"dataframe": order_df})

validation_result = batch.validate(suite)

# 根据校验结果决定是否继续管道
if not validation_result.success:
    failed_expectations = [
        r for r in validation_result.results if not r.success
    ]
    raise DataQualityException(
        f"数据质量校验失败:{len(failed_expectations)} 项不合格",
        details=failed_expectations
    )

7.4 数据血缘追踪(Data Lineage)

数据血缘(Data Lineage)回答一个关键问题:这条数据是从哪里来的,经过了哪些变换,最终到了哪里? 当数据出现异常时,血缘信息可以帮助快速定位问题源头。

# 基于 OpenLineage 的血缘追踪集成
# OpenLineage 是一个开放的血缘追踪标准
# 支持 Spark、Flink、Airflow 等引擎

# Spark 配置 OpenLineage
spark = SparkSession.builder \
    .appName("OrderETL") \
    .config("spark.extraListeners",
            "io.openlineage.spark.agent.OpenLineageSparkListener") \
    .config("spark.openlineage.transport.type", "http") \
    .config("spark.openlineage.transport.url",
            "http://lineage-server:5000") \
    .config("spark.openlineage.namespace", "production") \
    .getOrCreate()

# 正常编写 ETL 逻辑
# OpenLineage 自动捕获输入/输出/转换信息
raw_orders = spark.read.format("delta") \
    .load("s3://data-lake/raw/orders/")

cleaned_orders = raw_orders \
    .filter(F.col("amount") > 0) \
    .filter(F.col("status").isNotNull()) \
    .withColumn("amount_usd",
                F.col("amount") * F.col("exchange_rate"))

cleaned_orders.write.format("delta") \
    .mode("overwrite") \
    .save("s3://data-lake/curated/orders/")

# OpenLineage 自动记录:
# 输入:s3://data-lake/raw/orders/
# 输出:s3://data-lake/curated/orders/
# 转换:filter(amount > 0), filter(status IS NOT NULL),
#        withColumn(amount_usd = amount * exchange_rate)

7.5 数据质量监控的分层架构

完整的数据质量保证体系通常分为四层:

第四层:业务指标监控
    ├── 指标异常检测(同比/环比偏差告警)
    ├── 跨数据源一致性校验
    └── SLA 达成率追踪

第三层:管道级校验
    ├── 输入数据质量门禁(Great Expectations)
    ├── 输出数据质量校验
    └── 管道运行时长/数据量异常检测

第二层:表级保护
    ├── Schema 约束(Delta Lake / Iceberg 内置)
    ├── CHECK 约束(Delta Lake 支持)
    └── 分区完整性校验

第一层:基础设施层
    ├── 数据血缘(OpenLineage / Apache Atlas)
    ├── 数据目录(DataHub / Amundsen)
    └── 元数据管理(Hive Metastore / Iceberg Catalog)

八、工程案例:某电商平台从 Lambda 到 Lakehouse 的迁移

8.1 背景

某中型跨境电商平台(日均订单量约 50 万,日均事件量约 2 亿条)运营着一套 Lambda 架构的分析系统,已经运行了三年。架构组成:

8.2 面临的核心问题

这套系统面临三个持续恶化的问题:

问题一:数据口径不一致。 批处理管道用 Spark SQL 计算”有效订单金额”,逻辑是”order_status = ‘completed’ AND refund_amount = 0”。速度层用 Flink Java 代码做同样的过滤,但 Flink 中 refund_amount 字段在事件流中是异步到达的——订单完成时 refund_amount 还没有最终值,导致实时管道多计了一部分退款订单。每天早上运营看板的实时数据和批处理数据的差异在 3%-8% 之间。

问题二:运维成本高。 三套系统(Spark、Flink、ClickHouse)各有自己的集群管理、监控告警、故障处理流程。团队 8 个人,其中 3 个全职做运维。任何一个组件的版本升级都可能影响其他组件。

问题三:数据湖已成沼泽。 三年来各团队往 S3 倾倒了大量原始数据,没有统一的 schema 管理和元数据管理。数据科学团队想做用户行为分析,花了两周时间才搞清楚事件数据的字段含义和数据质量。

8.3 迁移方案

团队选择迁移到 Lakehouse 架构,核心技术选型:

迁移分三个阶段:

阶段一(4 周):存储层升级。 将 Hive 表迁移到 Iceberg 表格式,保持计算层不变。Spark 批处理任务改为读写 Iceberg 表,验证数据一致性。

# 阶段一:Hive 表迁移到 Iceberg
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MigrateToIceberg") \
    .config("spark.sql.catalog.lakehouse", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.lakehouse.type", "hive") \
    .config("spark.sql.catalog.lakehouse.uri", "thrift://metastore:9083") \
    .getOrCreate()

# 从 Hive 表读取数据
hive_orders = spark.sql("SELECT * FROM hive_db.orders")

# 创建 Iceberg 表
spark.sql("""
    CREATE TABLE lakehouse.analytics.orders (
        order_id        BIGINT,
        customer_id     BIGINT,
        order_time      TIMESTAMP,
        amount          DOUBLE,
        refund_amount   DOUBLE,
        status          STRING,
        region          STRING
    ) USING iceberg
    PARTITIONED BY (days(order_time))
    TBLPROPERTIES (
        'write.format.default' = 'parquet',
        'write.parquet.compression-codec' = 'zstd',
        'write.target-file-size-bytes' = '134217728'
    )
""")

# 数据迁移
hive_orders.writeTo("lakehouse.analytics.orders").append()

# 数据一致性校验
iceberg_count = spark.sql(
    "SELECT COUNT(*) FROM lakehouse.analytics.orders"
).collect()[0][0]
hive_count = spark.sql(
    "SELECT COUNT(*) FROM hive_db.orders"
).collect()[0][0]

assert iceberg_count == hive_count, \
    f"数据量不一致:Iceberg={iceberg_count}, Hive={hive_count}"

阶段二(6 周):计算层统一。 将 Spark 批处理任务和 Flink 实时任务统一为 Flink SQL,消除双路径的代码差异。

-- 阶段二:统一的 Flink SQL(流批共用)
-- 这段 SQL 既可以在流模式下运行(实时消费 Kafka)
-- 也可以在批模式下运行(全量扫描 Iceberg 表)

-- 流模式数据源
CREATE TABLE kafka_orders (
    order_id        BIGINT,
    customer_id     BIGINT,
    order_time      TIMESTAMP(3),
    amount          DOUBLE,
    refund_amount   DOUBLE,
    status          STRING,
    region          STRING,
    WATERMARK FOR order_time AS order_time - INTERVAL '30' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'order-events',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json'
);

-- 批模式数据源(同一张 Iceberg 表)
CREATE TABLE iceberg_orders (
    order_id        BIGINT,
    customer_id     BIGINT,
    order_time      TIMESTAMP(3),
    amount          DOUBLE,
    refund_amount   DOUBLE,
    status          STRING,
    region          STRING
) WITH (
    'connector' = 'iceberg',
    'catalog-name' = 'lakehouse',
    'catalog-type' = 'hive',
    'warehouse' = 's3://data-lake/warehouse'
);

-- 统一的计算逻辑(核心:用同一段 SQL)
-- 关键修改:refund_amount 使用 COALESCE 处理异步到达
INSERT INTO iceberg_order_summary
SELECT
    TUMBLE_START(order_time, INTERVAL '1' HOUR) AS window_start,
    region,
    COUNT(*) AS order_count,
    SUM(CASE
        WHEN status = 'completed'
             AND COALESCE(refund_amount, 0) = 0
        THEN amount
        ELSE 0
    END) AS effective_revenue,
    SUM(amount) AS total_revenue
FROM kafka_orders  -- 流模式用 kafka_orders,批模式换成 iceberg_orders
GROUP BY
    TUMBLE(order_time, INTERVAL '1' HOUR),
    region;

阶段三(3 周):数据质量体系建设。 部署 Great Expectations 做管道级校验,OpenLineage 做血缘追踪,建立数据契约。

8.4 迁移效果

迁移完成后,关键指标的变化:

指标 迁移前(Lambda) 迁移后(Lakehouse)
数据口径差异率 3%-8% < 0.1%(流批同源同逻辑)
数据管道代码量 约 12000 行(Spark + Flink Java) 约 3500 行(Flink SQL)
运维人力 3 人全职 1 人兼职
数据新鲜度 T+1(批)+ 秒级(实时) 分钟级(统一)
数据质量问题发现时间 平均 4 小时(人工发现) 平均 5 分钟(自动告警)
月度基础设施成本 约 $18,000 约 $12,000

需要注意的是,迁移过程本身的代价不低:三个阶段合计 13 周,团队投入了 5 个工程师。迁移期间两套系统并行运行,基础设施成本一度达到 $28,000/月。这个投入对于日均 50 万订单的中型平台是值得的,但对于更小规模的团队需要重新评估 ROI。


九、架构选型决策框架

9.1 决策树

面对 Lambda、Kappa 和 Lakehouse 三种架构,选型决策可以按以下路径进行:

数据量级是否超过 TB 级?
├── 否 → 考虑简单的批处理 + 缓存方案,不需要复杂架构
└── 是 → 是否需要秒级数据新鲜度?
    ├── 否 → 是否需要多引擎支持和复杂分析?
    │   ├── 否 → 云数据仓库(Snowflake/BigQuery)可能就够了
    │   └── 是 → Lakehouse(Delta Lake 或 Iceberg)
    └── 是 → 是否能接受流批两套代码的维护成本?
        ├── 是 → Lambda 架构(成熟度高,生态完善)
        └── 否 → 数据类型是否主要是事件/日志类?
            ├── 是 → Kappa 架构(流处理 + Kafka 长期存储)
            └── 否 → Lakehouse + Flink(流批一体)

9.2 关键决策因素

在最终拍板之前,需要回答以下五个问题:

  1. 团队能力:团队是否有 Flink/Spark 的运维经验?如果没有,Lakehouse 的学习曲线需要计入成本。
  2. 数据类型:如果大量数据是非结构化的(图片、视频、日志),数据湖/Lakehouse 是必须的。如果全是结构化数据,云数据仓库可能更简单。
  3. 查询模式:如果查询模式固定且可预测(报表、仪表盘),预聚合 + 数据仓库的方案性价比最高。如果需要灵活的探索式分析,Lakehouse 的开放格式更有优势。
  4. 新鲜度要求:T+1 可接受就用批处理,分钟级用微批(Spark Structured Streaming),秒级用流处理(Flink)。不同新鲜度的工程复杂度和成本差异巨大。
  5. 合规要求:如果有 GDPR/CCPA 的”被遗忘权”(Right to Erasure)要求,需要行级删除能力——传统数据湖不支持,Delta Lake 和 Iceberg 支持。

9.3 避免过度架构

最后一个建议:不要为了”先进”而选择复杂架构。很多团队在日均数据量只有 GB 级的情况下就上 Lakehouse,在只需要 T+1 报表的场景下就搞流批一体。这种过度架构(Over-Engineering)带来的运维成本远超它解决的问题。

一个务实的原则是:从最简单的方案开始,当具体瓶颈出现时再演进

-- 最简方案示例:PostgreSQL + dbt
-- 对于 GB 级数据,这可能就是全部所需

-- dbt 模型文件:models/order_summary.sql
{{ config(materialized='table', schema='analytics') }}

WITH completed_orders AS (
    SELECT
        DATE_TRUNC('day', order_time) AS order_date,
        region,
        COUNT(*) AS order_count,
        SUM(amount) AS total_revenue,
        SUM(CASE WHEN refund_amount = 0 THEN amount ELSE 0 END)
            AS effective_revenue,
        COUNT(DISTINCT customer_id) AS unique_customers
    FROM {{ source('raw', 'orders') }}
    WHERE status = 'completed'
    GROUP BY 1, 2
)

SELECT
    order_date,
    region,
    order_count,
    total_revenue,
    effective_revenue,
    unique_customers,
    effective_revenue / NULLIF(order_count, 0) AS avg_effective_value
FROM completed_orders

导航

上一篇: 多模数据库选型

下一篇: 流处理架构


参考资料

  1. Armbrust, M., et al. “Lakehouse: A New Generation of Open Platforms that Unify Data Warehousing and Data Lakes.” CIDR, 2021.
  2. Marz, N., Warren, J. “Big Data: Principles and Best Practices of Scalable Real-Time Data Systems.” Manning, 2015.
  3. Kreps, J. “Questioning the Lambda Architecture.” O’Reilly Radar, 2014.
  4. Apache Iceberg Documentation. “Table Spec.” https://iceberg.apache.org/spec/
  5. Delta Lake Documentation. “Delta Transaction Log Protocol.” https://github.com/delta-io/delta/blob/master/PROTOCOL.md
  6. Inmon, W. H. “Building the Data Warehouse.” Wiley, 2005.
  7. Dehghani, Z. “Data Mesh: Delivering Data-Driven Value at Scale.” O’Reilly, 2022.
  8. Apache Flink Documentation. “Streaming Concepts.” https://nightlies.apache.org/flink/flink-docs-stable/
  9. Andrew Jones. “Data Contracts.” https://datacontract.com/
  10. OpenLineage Project. “OpenLineage Spec.” https://openlineage.io/

同主题继续阅读

把当前热点继续串成多页阅读,而不是停在单篇消费。

2026-04-13 · architecture

【系统架构设计百科】Serverless 架构:冷启动、成本模型与适用场景

2023 年,Datadog 发布的年度 Serverless 报告显示,超过 70% 的 AWS 用户已在生产环境中使用 Lambda,平均每个组织部署了超过 1000 个 Lambda 函数。然而,同一份报告也指出,冷启动(Cold Start)仍然是开发者最关注的性能问题——在 Java 运行时中,P99 冷启动…

2026-04-13 · architecture

【系统架构设计百科】架构质量属性:不只是"高可用高性能"

需求评审时写下的'高可用、高性能、高并发',到了架构设计阶段几乎无法落地——因为它们不是可执行的需求。本文从 SEI/CMU 的质量属性理论出发,用 stimulus-response 场景模型把模糊需求变成可量化、可验证的架构约束,并拆解属性之间的冲突与联动关系。

2026-04-13 · architecture

【系统架构设计百科】告警策略:如何避免"狼来了"

大多数团队的告警系统都在制造噪声而不是传递信号。阈值告警看似直观,实则产生大量误报和漏报,值班工程师在凌晨三点被叫醒,却发现只是一次无害的毛刺。本文从告警疲劳的工业数据出发,拆解基于 SLO 的多窗口燃烧率告警算法,深入 Alertmanager 的路由、抑制与分组机制,结合 PagerDuty 的告警疲劳研究和真实工程案例,给出一套可落地的告警策略设计方法。


By .