假设你在 Hadoop MapReduce 上跑一个 PageRank 迭代:每轮迭代要从 HDFS 读全量数据,做一次 map-shuffle-reduce,再把结果写回 HDFS。20 轮迭代意味着 20 次全量磁盘读写。一个 100 GB 的图数据集,每轮迭代光磁盘 I/O 就要几分钟,整个作业跑下来几个小时。问题的核心不在于 MapReduce 的计算逻辑有多慢,而在于它的编程模型强制每个阶段的中间结果都要落盘——这对迭代式算法和交互式查询来说是致命的。
2010 年,Matei Zaharia 在 Berkeley AMPLab 提出了弹性分布式数据集(Resilient Distributed Dataset,RDD)的抽象。RDD 的核心想法很直接:把中间结果留在内存里,用血统(Lineage)图来保证容错,而不是靠复制或写磁盘。这个简单的改变让 Spark 在迭代式机器学习任务上比 MapReduce 快了几十倍。
本文拆解 Spark 内核的关键组件:RDD 抽象与依赖模型、DAG 调度器的 Stage 划分、Sort-based Shuffle 机制、Tungsten 堆外内存与代码生成、Catalyst 优化器的逻辑-物理计划转换,以及从 RDD 到 DataFrame/Dataset 的 API 演进。同时会讨论 Spark 架构中 Driver 单点和 JVM 垃圾回收(Garbage Collection,GC)带来的实际问题。
一、RDD 核心抽象
五个属性
RDD 在 Spark
源码中由五个方法定义(spark-core
模块,RDD.scala):
getPartitions:返回分区列表。RDD 的并行度由分区数决定,每个分区对应一个 Task。getDependencies:返回对父 RDD 的依赖列表。依赖分窄依赖(Narrow Dependency)和宽依赖(Wide Dependency)两种。compute(partition):给定一个分区,计算该分区的迭代器。这是 RDD 的核心计算逻辑。partitioner:可选的分区器(Partitioner),只有键值对 RDD 才需要。HashPartitioner和RangePartitioner是两个内置实现。preferredLocations(partition):返回分区的首选计算位置,用于数据本地性(Data Locality)调度。
这五个属性构成了 RDD 的完整契约。任何继承
RDD 的子类只需实现这五个方法,就可以接入 Spark
的调度和容错机制。
不可变性与惰性求值
RDD 一旦创建就不可修改。每次
map、filter、flatMap
操作都会生成一个新的 RDD,原始 RDD
不变。这种不可变性带来两个好处:第一,不需要加锁,多个 Task
可以安全地并行读同一份数据;第二,简化了容错——任何分区丢失都可以通过重新计算来恢复,不需要回滚。
RDD 的转换(Transformation)是惰性的。调用
map 或 filter 不会触发计算,只会在
RDD
的有向无环图(DAG)上添加一个节点。只有当用户调用行动(Action)操作——collect、count、saveAsTextFile
等——时,Spark 才会提交一个作业(Job),触发实际计算。
// 以下三行不会触发任何计算,只是构建 DAG
val lines = sc.textFile("hdfs:///logs/access.log") // HadoopRDD
val errors = lines.filter(_.contains("ERROR")) // FilteredRDD
val messages = errors.map(_.split("\t")(1)) // MappedRDD
// 调用 action 才触发执行
val result = messages.collect() // 此时 DAGScheduler 开始工作窄依赖与宽依赖
依赖类型是 Spark 调度和容错的核心分界线。
窄依赖(NarrowDependency):父 RDD
的每个分区最多被子 RDD 的一个分区使用。典型操作包括
map、filter、flatMap、union。窄依赖允许管道化执行(Pipelining)——多个窄依赖算子可以在同一个
Task
中连续执行,中间结果不需要物化到内存或磁盘。分区丢失时只需要重算对应的父分区,影响范围小。
宽依赖(ShuffleDependency):父 RDD
的每个分区会被子 RDD 的多个分区使用。典型操作包括
reduceByKey、groupByKey、join(无共同分区器时)、repartition。宽依赖意味着需要
Shuffle:上游所有分区的数据按 key
重新分配到下游分区。Shuffle 是 Spark
作业中最昂贵的操作——涉及磁盘写入、网络传输和排序。
// 窄依赖示例:map 和 filter 可以 pipeline
val rdd1 = sc.parallelize(1 to 1000000, 200)
val rdd2 = rdd1.map(_ * 2) // NarrowDependency: OneToOneDependency
val rdd3 = rdd2.filter(_ > 100) // NarrowDependency: OneToOneDependency
// 宽依赖示例:reduceByKey 需要 shuffle
val pairs = rdd3.map(x => (x % 10, x))
val reduced = pairs.reduceByKey(_ + _) // ShuffleDependency依赖类型直接决定了 Stage 的划分方式,这是下一节的主题。
Lineage 与容错
RDD 不依赖数据复制来保证容错。每个 RDD 记录了自己从哪些父 RDD 经过什么转换得来——这就是血统(Lineage)图。当某个分区丢失时(比如 Executor 宕机),Spark 可以根据 Lineage 从最近的物化点(检查点或 Shuffle 输出)开始重新计算丢失的分区。
这种设计在大多数情况下比复制更高效。对于窄依赖,重算只涉及单个分区链;对于宽依赖,Shuffle
输出会写入磁盘,所以不需要重算上游 Stage 的全部数据。但
Lineage 链过长时重算代价会很高。checkpoint()
可以截断 Lineage,把 RDD 持久化到可靠存储(如
HDFS),代价是一次额外的写入。
二、DAG 调度器
从 Action 到 Job
用户调用 collect() 或 save()
时,SparkContext 调用
DAGScheduler.submitJob()。DAGScheduler 从触发
Action 的最终 RDD 开始,沿着依赖关系向前回溯,构建出完整的
RDD DAG。
Stage 划分算法
DAGScheduler 把 DAG 切分为 Stage 的规则很简单:在每个宽依赖(ShuffleDependency)处切一刀。
具体流程(对应
DAGScheduler.getOrCreateShuffleMapStage() 和
getMissingAncestorShuffleDependencies()):
- 从最终 RDD 开始,递归遍历依赖。
- 遇到窄依赖,继续向父 RDD 回溯,纳入当前 Stage。
- 遇到宽依赖,创建一个新的
ShuffleMapStage作为当前 Stage 的父 Stage。 - 最终 RDD 所在的 Stage 是
ResultStage,负责产出最终结果。
Stage 之间的依赖关系构成一棵树(或 DAG):只有当一个 Stage 的所有父 Stage 都完成后,这个 Stage 才能开始执行。Stage 内部的所有算子在同一个 Task 中管道化执行,中间数据以迭代器形式在内存中流过,不需要物化。
一个具体的多阶段作业追踪
以一个典型的词频统计加排序为例,完整的算子链如下:
sc.textFile("hdfs://logs/*.txt") // 读取文本
.filter(_.nonEmpty) // 过滤空行
.flatMap(_.split("\\s+")) // 切分单词
.map(word => (word, 1)) // 映射为 (word, 1)
.reduceByKey(_ + _) // 按 key 聚合计数
.sortByKey() // 按 key 排序
.collect() // 收集结果到 DriverSpark 的 DAGScheduler 从最终 RDD
向前回溯,每遇到一个宽依赖(Shuffle 依赖)就切分出一个新的
Stage。上面这条链路会被切分为三个 Stage:
- Stage
0(ShuffleMapStage):
textFile→filter→flatMap→map。这四个算子之间全部是窄依赖,它们被管道化到同一个 Task 中执行,数据以迭代器形式在内存中流过,不产生中间物化。如果输入有 200 个分区,Stage 0 就会启动 200 个并行的ShuffleMapTask,每个 Task 处理一个分区,最终按reduceByKey的分区器将结果写入本地 Shuffle 文件。 - Shuffle
边界(reduceByKey):
reduceByKey需要把所有分区中相同 key 的数据汇聚到一起,这是一个宽依赖,产生第一次 Shuffle。 - Stage
1(ShuffleMapStage):
reduceByKey的 Shuffle Read 端。每个 Task 从上游所有 Mapper 拉取自己负责的分区数据,在本地做聚合。聚合完成后,结果再次按sortByKey的 Range 分区器写入 Shuffle 文件。 - Shuffle
边界(sortByKey):
sortByKey使用RangePartitioner进行重分区,产生第二次 Shuffle。 - Stage
2(ResultStage):
sortByKey的 Shuffle Read 端加上collect。每个 Task 拉取自己负责的 range 数据并排序,collect把结果发送回 Driver。这是最后一个 Stage,包含的是ResultTask。
关键观察:Stage 内部的并行度由分区数决定——200 个分区意味着 200 个 Task 可以同时在不同 Executor 上执行。Stage 之间则是串行的,Stage 1 必须等 Stage 0 的所有 Task 完成 Shuffle Write 后才能开始。
flowchart LR
subgraph Stage0["Stage 0(ShuffleMapStage)"]
direction TB
T0_1["Task 0: textFile → filter → flatMap → map"]
T0_2["Task 1: textFile → filter → flatMap → map"]
T0_N["Task N: ..."]
end
subgraph Shuffle1["Shuffle Write/Read"]
SW1["按 key hash 写入本地磁盘"]
end
subgraph Stage1["Stage 1(ShuffleMapStage)"]
direction TB
T1_1["Task 0: reduceByKey"]
T1_2["Task 1: reduceByKey"]
end
subgraph Shuffle2["Shuffle Write/Read"]
SW2["按 key range 写入本地磁盘"]
end
subgraph Stage2["Stage 2(ResultStage)"]
direction TB
T2_1["Task 0: sortByKey → collect"]
end
Stage0 --> Shuffle1 --> Stage1 --> Shuffle2 --> Stage2
上图展示了三个 Stage 的执行时序与它们之间的 Shuffle 边界。每个 Stage 内部的多个 Task 并行执行相同的算子管道,Stage 之间通过 Shuffle Write/Read 交换数据。Shuffle 环节既是并行度的分界点,也是容错恢复的检查点——如果下游 Stage 失败,只需要重跑下游 Stage 而不需要重算整个作业(前提是上游 Shuffle 数据仍然可用)。
Task 类型
Spark 有两种 Task:
- ShuffleMapTask:属于
ShuffleMapStage,负责读取输入分区、执行管道化的转换,然后把结果按 key 写入 Shuffle 文件(供下游 Stage 读取)。 - ResultTask:属于
ResultStage,负责计算最终结果并返回给 Driver。
每个 Stage 的 Task 数量等于该 Stage 最后一个 RDD
的分区数。Task 由 TaskScheduler 分配到 Executor
上执行,调度时会考虑数据本地性(PROCESS_LOCAL >
NODE_LOCAL > RACK_LOCAL > ANY)。
Stage 重试
如果某个 Stage 的部分 Task 失败(比如 Executor
丢失),DAGScheduler 会重新提交该 Stage 的失败
Task。如果 Shuffle 输出丢失(比如存储 Shuffle
数据的节点挂了),上游 Stage 需要重跑。这就是为什么 Shuffle
数据写入磁盘而不是只放内存——它是 Stage 之间的容错边界。
三、Sort-based Shuffle 机制
Shuffle 是 Spark 作业中最主要的性能瓶颈。每一次 Shuffle 都意味着三重开销的叠加:第一,大量的内存缓冲区用于排序和聚合,给 JVM 垃圾回收带来巨大压力,尤其在 Full GC 发生时会导致 Task 长时间停顿;第二,当数据量超过可用内存时,中间结果必须溢写到磁盘,引入额外的磁盘 I/O;第三,Shuffle 本质上是 all-to-all 的数据交换模式——每个 Mapper 的输出可能被每个 Reducer 读取,网络带宽和连接数都会成为瓶颈。理解这三个维度的开销,是理解后续所有 Shuffle 优化手段的前提。
从 Hash Shuffle 到 Sort Shuffle
Spark 0.x 和 1.0 使用 Hash-based Shuffle:每个 Map Task 为每个 Reduce 分区创建一个独立文件。如果有 M 个 Map Task 和 R 个 Reduce 分区,就会产生 M x R 个文件。这在大规模作业中带来严重的文件句柄消耗和随机 I/O 问题——当 R 达到上万甚至十万级别时,文件系统扛不住。
Spark 1.1 引入了 Sort-based
Shuffle(SortShuffleManager),到 Spark 2.0
移除了 Hash Shuffle,Sort Shuffle 成为唯一实现。
Shuffle Write 过程
一个 ShuffleMapTask 执行 Shuffle Write
的核心流程:
- 缓冲与排序:Task 把输出的 (key, value)
对写入一个内存缓冲区(
ShuffleExternalSorter)。缓冲区中的数据按(partitionId, key)排序——先按目标分区编号排,同一分区内再按 key 排。 - Spill:当缓冲区超过阈值(由
spark.shuffle.spill.initialMemoryThreshold和TaskMemoryManager控制),将排序好的数据溢写(Spill)到磁盘临时文件。 - Merge:Task 完成时,把所有 Spill 文件和内存中剩余数据做归并排序(Merge Sort),写入一个最终的数据文件和一个索引文件。索引文件记录每个分区在数据文件中的偏移量。
每个 ShuffleMapTask 最终只产生两个文件(一个数据文件 + 一个索引文件),把文件数量从 M x R 降到了 2M。
// SortShuffleWriter 的核心逻辑(简化版,基于 Spark 3.x 源码)
// 路径:core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
class SortShuffleWriter[K, V, C] extends ShuffleWriter[K, V] {
override def write(records: Iterator[Product2[K, V]]): Unit = {
// 1. 如果需要 map 端聚合(如 reduceByKey),使用 ExternalSorter 做聚合+排序
// 2. 如果不需要聚合(如 groupByKey),直接按 partitionId 排序
sorter = if (dep.mapSideCombine) {
new ExternalSorter[K, V, C](
context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering)
} else {
new ExternalSorter[K, V, C](
context, aggregator = None, Some(dep.partitioner), ordering = None)
}
sorter.insertAll(records)
// 写入最终输出文件
val mapOutputWriter = shuffleExecutorComponents
.createMapOutputWriter(dep.shuffleId, mapId, dep.partitioner.numPartitions)
sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter)
mapOutputWriter.commitAllPartitions()
}
}Shuffle Read 过程
下游 Stage 的 Task 执行 Shuffle Read 时:
- 通过
MapOutputTracker向 Driver 查询上游 Shuffle 输出的位置信息(哪些 Executor 上有哪些分区的数据)。 - 通过
BlockTransferService(底层基于 Netty)从各个 Executor 拉取自己需要的分区数据。 - 如果需要聚合或排序,再做一次
ExternalSorter处理。
Shuffle Read 的性能瓶颈通常在网络传输和磁盘读取。Spark
3.x 引入了 Push-based
Shuffle(spark.shuffle.push.enabled),让 Map
端主动把 Shuffle 数据推送到指定的 Shuffle 服务节点,减少
Reduce 端拉取时的随机 I/O。
sequenceDiagram
participant Mapper as Mapper(ShuffleMapTask)
participant Disk as 本地磁盘
participant Driver as Driver(MapOutputTracker)
participant Reducer as Reducer(下游 Task)
Mapper->>Disk: 将排序后的数据写入数据文件 + 索引文件
Mapper->>Driver: 汇报 Shuffle 输出位置(mapId, 文件路径)
Reducer->>Driver: 查询上游 Shuffle 输出位置
Driver-->>Reducer: 返回各 Mapper 的地址和分区偏移
loop 对每个 Mapper
Reducer->>Mapper: 通过 BlockTransferService(Netty)请求分区数据
Mapper->>Disk: 根据索引文件定位分区偏移,读取数据
Mapper-->>Reducer: 返回分区数据(网络传输)
end
Reducer->>Reducer: 对接收到的多个分区数据做归并排序和聚合
上图展示了 Shuffle Write 和 Shuffle Read
的完整交互流程。Mapper
在完成排序后将数据和索引写入本地磁盘,并向 Driver 的
MapOutputTracker 注册输出位置。Reducer 先从
Driver 获取所有上游 Mapper 的位置信息,然后通过 Netty
并行拉取各个 Mapper
上属于自己分区的数据,最后在本地做归并排序完成聚合。这个流程中,网络传输量和
Mapper 端的磁盘随机读取是主要的性能瓶颈。
Shuffle 调优参数
几个关键参数:
| 参数 | 默认值 | 作用 |
|---|---|---|
spark.shuffle.file.buffer |
32KB | Shuffle Write 的文件缓冲区大小 |
spark.reducer.maxSizeInFlight |
48MB | Shuffle Read 端同时拉取的最大数据量 |
spark.shuffle.sort.bypassMergeThreshold |
200 | 分区数低于此值时跳过排序,直接按分区写文件 |
spark.shuffle.compress |
true | 是否压缩 Shuffle 数据 |
spark.shuffle.spill.compress |
true | 是否压缩 Spill 文件 |
bypassMergeThreshold 对应
BypassMergeSortShuffleWriter:当 Reduce
分区数较少且不需要 Map
端聚合时,跳过排序,直接为每个分区建一个临时文件,最后合并。这样避免了排序开销,适合分区数很少的场景。
四、Tungsten 堆外内存管理
JVM 对象模型的代价
Java 对象模型对大数据处理来说开销很大。一个
java.lang.Integer 占 16 字节(对象头 12 字节 +
4 字节 int 值),而裸 int 只需要 4 字节。一个包含 1000 万个
Integer 的数组,光对象开销就超过 150
MB。更严重的是,大量小对象给 GC 带来巨大压力——Full GC
在大堆(如 30 GB 以上)情况下可能暂停数十秒,直接导致 Task
超时和推测执行(Speculative Execution)的误判。
Project Tungsten 的三个方向
Tungsten 项目(从 Spark 1.4 开始,持续到 Spark 2.x)从三个方向优化 Spark 的执行效率:
1. 手动内存管理(Off-Heap Memory)
Spark 通过 sun.misc.Unsafe 直接在 JVM
堆外分配内存,绕过 GC。TaskMemoryManager 为每个
Task 管理一块内存空间,用 Page
表来追踪分配。数据以紧凑的二进制格式存储,而不是 Java
对象:
// Tungsten 内存布局示例(概念性)
// 一行数据在堆外内存中的存储方式:
// [null bitmap (8 bytes)] [field1 (8 bytes)] [field2 (8 bytes)] [var-length data...]
//
// 对比 Java 对象:
// [object header (16 bytes)] [field1 ref (8 bytes)] [field2 ref (8 bytes)]
// [String object (40+ bytes)] [char[] object (16 + data bytes)]堆外内存的优势:GC 完全不感知这部分内存,不会扫描也不会回收;数据按行紧凑排列,CPU 缓存友好;可以用指针算术直接定位字段,省去对象解引用的开销。
2. 缓存感知计算(Cache-aware Computation)
Tungsten
的排序算法(UnsafeExternalSorter)不直接排序数据本身,而是排序一个由
(prefix, pointer)
对组成的数组。prefix 是 key 的编码前缀(通常 8
字节),pointer 指向实际数据。排序时先比较
prefix,大部分情况下不需要解引用 pointer 去读完整
key。这个数组足够紧凑,可以放进 CPU L2/L3
缓存,大幅减少缓存未命中(Cache Miss)。
3. 全阶段代码生成(Whole-Stage Code Generation)
传统的火山模型(Volcano Model)中,每个算子实现一个
next() 方法,数据一行一行通过算子链。每调用一次
next() 就有一次虚函数调用和一次 null
检查。当算子链很长、数据量很大时,虚函数调用的开销不可忽视。
Whole-Stage Code Generation 把一个 Stage 内的所有算子融合成一段紧凑的 Java 代码(通过 Janino 编译器即时编译),消除虚函数调用、去掉中间数据物化、直接操作堆外内存中的二进制数据。效果相当于手写的、针对特定查询的 C++ 级别代码。
// 概念对比:火山模型 vs. Whole-Stage CodeGen
// 火山模型(伪代码)
class FilterOperator(child: Operator, predicate: Row => Boolean) {
def next(): Row = {
var row = child.next() // 虚函数调用
while (row != null && !predicate(row)) { // 解释执行 predicate
row = child.next() // 又一次虚函数调用
}
row
}
}
// Whole-Stage CodeGen 生成的代码(伪代码)
// 所有算子融合成一个紧凑循环
while (inputIterator.hasNext) {
val row = inputIterator.next()
// 直接内联:filter 条件 + project 逻辑 + aggregate 逻辑
if (row.getInt(0) > 100) { // 直接字段访问,无虚函数
sum += row.getLong(1) // 直接累加,无中间对象
count += 1
}
}Spark UI 中可以看到 WholeStageCodegen
节点;可以通过
df.queryExecution.debug.codegen() 查看生成的
Java 代码。
五、Catalyst 优化器
四阶段优化流程
Spark SQL 的 Catalyst 优化器把一条 SQL 查询或 DataFrame 操作转换成物理执行计划,分四个阶段:
Analysis(分析):解析未绑定的属性引用(Unresolved Attribute),通过 Catalog 查找表和列的元数据,确定数据类型。例如,把
SELECT name FROM users中的name绑定到users表的name列(类型 StringType)。Logical Optimization(逻辑优化):在逻辑计划上应用一系列规则(Rule),每条规则是一个
TreeNode => TreeNode的模式匹配与重写。典型规则包括:- 谓词下推(Predicate Pushdown):把
Filter推到Join下方,减少 Join 输入量。 - 列裁剪(Column Pruning):只读取查询需要的列,对列式存储(Parquet、ORC)效果显著。
- 常量折叠(Constant Folding):编译期计算
1 + 2这类表达式。 BooleanSimplification:简化布尔条件,消除恒真/恒假分支。
- 谓词下推(Predicate Pushdown):把
Physical Planning(物理计划):把逻辑计划转成一个或多个候选物理计划,然后用基于代价的优化(Cost-Based Optimization,CBO)选择最优方案。例如,对于 Join 操作,根据两张表的大小决定用
BroadcastHashJoin(小表广播)还是SortMergeJoin(大表对等排序合并)。Code Generation(代码生成):前面讨论的 Whole-Stage Code Generation 在这个阶段执行。
// 用 DataFrame API 写一段查询,观察 Catalyst 优化
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("CatalystDemo")
.getOrCreate()
import spark.implicits._
val orders = spark.read.parquet("hdfs:///data/orders")
val items = spark.read.parquet("hdfs:///data/items")
val result = orders
.join(items, orders("item_id") === items("id"))
.filter($"price" > 100)
.select($"order_id", $"item_name", $"price")
// 查看逻辑计划和物理计划
result.explain(true)
// == Parsed Logical Plan ==
// Project [order_id, item_name, price]
// +- Filter (price > 100)
// +- Join Inner, (item_id = id)
// :- Relation[order_id, item_id, ...] parquet
// +- Relation[id, item_name, price, ...] parquet
//
// == Optimized Logical Plan ==
// Project [order_id, item_name, price]
// +- Join Inner, (item_id = id)
// :- Project [order_id, item_id] ← 列裁剪
// : +- Relation[order_id, item_id, ...] parquet
// +- Filter (price > 100) ← 谓词下推
// +- Project [id, item_name, price] ← 列裁剪
// +- Relation[id, item_name, price, ...] parquet规则执行机制
Catalyst 的规则应用不是只跑一遍。逻辑优化阶段使用
RuleExecutor,它在规则集合(Batch)上反复迭代,直到计划不再变化(达到不动点,Fixed
Point)或超过最大迭代次数。这种设计让规则之间可以有依赖关系——规则
A 的输出可能让规则 B 的条件成立。
CBO 与统计信息
CBO
依赖表和列的统计信息:行数、数据大小、列的不同值数(Number
of Distinct Values,NDV)、null
值比例、最大/最小值。这些信息通过 ANALYZE TABLE
命令收集:
ANALYZE TABLE orders COMPUTE STATISTICS;
ANALYZE TABLE orders COMPUTE STATISTICS FOR COLUMNS item_id, price;CBO 根据统计信息估算每个算子的输出行数和大小,进而选择
Join 策略。例如,如果 items 表只有 10 MB,CBO
会选择 BroadcastHashJoin,把整张表广播到每个
Executor 的内存中,避免 Shuffle。
六、为什么比 MapReduce 快
Spark 在许多场景下比 MapReduce 快 10 到 100 倍。这个数字来自 Zaharia 等人 2012 年 NSDI 论文中的实验——在逻辑回归(Logistic Regression)任务上,Spark 比 Hadoop MapReduce 快约 100 倍。但这个倍数在不同负载类型下差异很大,需要拆解具体原因。
内存计算
最直接的原因。MapReduce 的每个 Job 的中间结果(Map
输出)必须写入磁盘,下一个 Job 再从磁盘读取。Spark 把 RDD
缓存在内存中(rdd.persist(StorageLevel.MEMORY_ONLY)),后续迭代直接从内存读取。对于迭代式算法(PageRank、K-Means、梯度下降),每次迭代省掉了一次全量的
HDFS 读写,加速效果最显著。
但内存计算不是万能的。如果作业只有一轮 map-reduce(比如简单的 word count),中间结果不会被复用,那 Spark 和 MapReduce 的性能差距主要体现在其他方面。
Stage 内管道化执行
MapReduce 的 Map 阶段和 Reduce 阶段之间有一个硬性的物化边界:Map 输出必须全部写入磁盘,然后 Reduce 才能开始拉取。Spark 的 Stage 内部没有这个边界——多个窄依赖算子(map → filter → map)在同一个 Task 中以迭代器形式管道化执行,数据一行一行流过算子链,不需要物化中间结果。
这意味着如果一个 filter 过滤掉了 90% 的数据,后续的 map 只需处理 10% 的数据量,而 MapReduce 必须先把 100% 的 Map 输出写入磁盘。
优化的 Shuffle
Spark 的 Sort-based Shuffle 比 MapReduce
早期的实现更高效:按分区排序后合并成一个文件,减少了随机 I/O
和文件句柄消耗。Spark 还支持 Map
端聚合(reduceByKey 在 Map 端做 combine),减少
Shuffle 数据量。MapReduce 也有 Combiner,但 Spark
的实现更灵活,可以和 Aggregator 紧密集成。
Tungsten 与代码生成
Spark 2.x 之后,Tungsten 的堆外内存管理和 Whole-Stage Code Generation 进一步拉开了差距。MapReduce 始终运行在 JVM 的 Java 对象模型上,GC 压力大、CPU 缓存利用率低。Spark SQL 通过代码生成把查询编译成紧凑的循环,性能接近手写 C++ 代码。
比较的边界条件
需要注意的边界条件:
- 单轮批处理作业(只做一次 map-reduce),Spark 的优势没有迭代式场景那么大。
- 数据量远超内存容量时,Spark 也要溢写磁盘,优势缩小。
- MapReduce 的作业隔离性更强——每个 Task 是独立的 JVM 进程,而 Spark 的 Task 在 Executor 的线程池中运行,一个 Task 的 OOM 可能影响同 Executor 上的其他 Task。
- Spark on YARN 的部署复杂度和调优门槛比 MapReduce 更高。
七、Driver 单点与 GC 压力问题
Driver 的职责
Spark 的 Driver 进程负责:
- 运行用户编写的
main()函数,构建 RDD DAG。 - 运行
DAGScheduler和TaskScheduler,负责 Stage 划分和 Task 分发。 - 维护
MapOutputTracker——记录所有 Shuffle 输出的位置信息。 - 维护
BlockManager的元数据——跟踪每个 RDD 分区缓存在哪个 Executor 上。 - 收集
ResultTask的结果,汇总后返回给用户。 - 运行
SparkUI的 HTTP 服务器。
单点问题
Driver 是单点(Single Point of Failure)。如果 Driver
进程挂了,整个作业失败。在 YARN cluster 模式下,YARN
可以自动重启
Driver(spark.yarn.maxAppAttempts),但作业需要从头开始。
Driver 也是性能瓶颈。所有 Task 的调度决策、Shuffle
位置查询、心跳处理都经过 Driver。在大规模作业中(几万个
Task),Driver 的 CPU
和网络可能成为瓶颈。MapOutputTracker
的位置信息在大 Shuffle 作业中可能占用几百 MB 内存。
GC 问题的三个层面
Executor 层面的 GC:每个 Executor 是一个长期运行的 JVM 进程,同时执行多个 Task,堆内存中混杂着 RDD 缓存数据、Shuffle 缓冲区、用户代码创建的对象。堆越大,Full GC 暂停时间越长。典型症状是 Spark UI 上看到 Task 执行时间波动很大——某些 Task 被 GC 暂停拖慢了。
应对策略:
# 减小堆内存中的 RDD 缓存比例,给计算留更多空间
spark.memory.fraction=0.6
spark.memory.storageFraction=0.5
# 使用堆外内存存储 Shuffle 数据
spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=4g
# 使用 G1GC 并设置暂停时间目标
spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:MaxGCPauseMillis=200
Driver 层面的 GC:Driver 收集
collect() 结果时,大量数据涌入 Driver
JVM,可能触发长时间 GC。最典型的错误是在 Driver 上
collect() 一个大 RDD——所有数据从 Executor 拉到
Driver 的单个 JVM 中,堆内存直接爆炸。
// 反模式:在 Driver 上 collect 大量数据
val allData = hugeRDD.collect() // OOM 或长时间 GC
// 正确做法:在 Executor 端完成计算,只返回小结果
val count = hugeRDD.count()
val top10 = hugeRDD.top(10)
hugeRDD.saveAsTextFile("hdfs:///output")Serialization 层面的 GC:Spark 默认使用 Java Serialization,序列化产生大量临时对象。切换到 Kryo 序列化可以减少对象数量和序列化后的数据大小:
val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[MyClass]))统一内存管理
Spark 1.6 引入了统一内存管理(Unified Memory
Management,UnifiedMemoryManager),把执行内存(Execution
Memory,用于 Shuffle、排序、Join)和存储内存(Storage
Memory,用于 RDD
缓存)放在同一个池中,动态借用。之前的静态分配模型中,一方空闲而另一方不够用的情况很常见。统一管理减少了内存浪费,但也增加了调优的复杂度——缓存的
RDD 可能在执行压力大时被驱逐。
八、DataFrame/Dataset 演进
从 RDD 到 DataFrame
RDD API 灵活但有两个问题:
- 没有 Schema 信息:Spark 不知道 RDD
中的数据是什么结构,无法做列裁剪、谓词下推等优化。
rdd.filter(row => row._3 > 100)中,Spark 只看到一个 lambda 函数,不知道_3对应什么字段。 - 对象开销:RDD 中的每条记录是一个 JVM 对象(或元组),GC 压力大。
Spark 1.3 引入 DataFrame API。DataFrame 本质是
Dataset[Row]——带 Schema 的分布式表。Schema 让
Catalyst 可以做逻辑优化,数据以 Tungsten
的二进制格式存储,避免 JVM 对象开销。
// RDD API:Spark 对 lambda 内部一无所知
case class Order(orderId: Long, itemId: Long, price: Double)
val ordersRDD: RDD[Order] = sc.textFile("orders.csv").map(parseOrder)
val expensive = ordersRDD.filter(_.price > 100) // Spark 无法优化这个 filter
// DataFrame API:Spark 知道列名和类型,可以做谓词下推和列裁剪
val ordersDF = spark.read
.schema("orderId LONG, itemId LONG, price DOUBLE")
.csv("orders.csv")
val expensive = ordersDF.filter($"price" > 100) // Catalyst 可以下推到数据源Dataset API 与 Encoder
Spark 1.6 引入 Dataset API,结合了 RDD 的类型安全和
DataFrame 的优化能力。Dataset 的关键在于
Encoder:它知道如何在 JVM 对象和 Tungsten
二进制格式之间高效转换,不需要经过通用的序列化/反序列化。
// Dataset API:类型安全 + Tungsten 优化
case class Order(orderId: Long, itemId: Long, price: Double)
// Encoder 由 import spark.implicits._ 隐式提供
val ordersDS: Dataset[Order] = spark.read
.csv("orders.csv")
.as[Order] // Encoder 在编译时生成
// 类型安全的操作——编译器会检查字段名
val expensive = ordersDS.filter(_.price > 100)
// 混合使用 DataFrame 和 Dataset
val result = ordersDS
.filter(_.price > 100) // 类型安全,但 lambda 不透明
.select($"orderId", $"price") // 切换到 DataFrame 操作,Catalyst 可以优化三种 API 的对比
| 维度 | RDD | DataFrame | Dataset |
|---|---|---|---|
| 类型安全 | 编译时 | 运行时 | 编译时 |
| Catalyst 优化 | 不支持 | 完全支持 | 部分支持(lambda 不透明) |
| Tungsten 存储 | 不支持 | 完全支持 | 支持 |
| 序列化 | Java/Kryo | Tungsten 内部格式 | Encoder |
| 适用场景 | 非结构化数据、需要细粒度控制 | SQL 查询、ETL | 类型安全的结构化数据处理 |
一个常见的性能陷阱:在 Dataset 上使用 lambda 函数(如
.filter(_.price > 100))时,Catalyst
无法看透 lambda
的语义,不能做谓词下推。如果需要优化,应该使用列表达式(如
.filter($"price" > 100)),或者直接使用
DataFrame API。
Structured Streaming 的统一
DataFrame/Dataset API 的另一个好处是统一了批处理和流处理。Structured Streaming 把流数据视为一张不断追加行的无界表(Unbounded Table),用和批处理相同的 DataFrame/Dataset API 编写查询。底层通过微批次(Micro-batch)或连续处理(Continuous Processing)引擎执行。
// 批处理
val batchResult = spark.read.parquet("hdfs:///data/orders")
.filter($"price" > 100)
.groupBy($"item_id")
.agg(sum($"price").as("total"))
// 流处理——API 几乎一样
val streamResult = spark.readStream
.format("kafka")
.option("subscribe", "orders")
.load()
.selectExpr("CAST(value AS STRING)")
.filter($"price" > 100)
.groupBy($"item_id")
.agg(sum($"price").as("total"))
.writeStream
.format("console")
.outputMode("update")
.start()这种统一性降低了学习成本,也让同一套优化(Catalyst、Tungsten)同时服务于批处理和流处理。
九、Adaptive Query Execution
Spark 3.0 引入了自适应查询执行(Adaptive Query Execution,AQE),在运行时根据 Shuffle 的中间统计信息动态调整查询计划。AQE 解决了一个根本问题:静态优化器在编译时估算的行数可能和实际值差几个数量级,导致选择了错误的 Join 策略或不合理的并行度。
三个核心功能
1. 动态合并 Shuffle 分区(Coalescing Shuffle Partitions)
用户通常设置
spark.sql.shuffle.partitions=200(默认值),但不同查询的数据量差异很大。AQE
在 Shuffle Write 完成后,根据实际的 Shuffle
数据量动态合并小分区,避免大量小 Task 的调度开销。
# 启用 AQE
spark.sql.adaptive.enabled=true
# 合并后目标分区大小
spark.sql.adaptive.advisoryPartitionSizeInBytes=64MB
# 合并触发条件:分区大小小于此值
spark.sql.adaptive.coalescePartitions.minPartitionSize=1MB
2. 动态切换 Join 策略(Switching Join Strategies)
假设优化器估算 items 表有 500 MB,选择了
SortMergeJoin。但实际经过 Filter 后只剩 8
MB。AQE 在 Shuffle 阶段拿到真实数据量后,把
SortMergeJoin 切换成
BroadcastHashJoin,省掉了 Sort 和 Merge
的开销。
3. 动态优化倾斜 Join(Skew Join Optimization)
数据倾斜是 Shuffle 最头疼的问题:某些 key 的数据量远超其他 key,导致个别 Task 执行时间极长(长尾效应)。AQE 检测到倾斜分区后,自动把它拆分成多个小分区,并将另一侧对应的分区复制多份进行并行 Join。
flowchart TD
Start["Shuffle Write 完成,收集分区统计信息"] --> Check["检查各分区的实际数据量"]
Check --> Small{"存在过小分区?"}
Check --> Skew{"存在倾斜分区?"}
Check --> JoinSize{"Join 输入足够小?"}
Small -->|是| Coalesce["合并相邻小分区\n减少 Task 数量和调度开销"]
Small -->|否| Skip1["保持原分区"]
Skew -->|是| Split["拆分倾斜分区为多个子分区\n复制另一侧对应分区并行 Join"]
Skew -->|否| Skip2["保持原分区"]
JoinSize -->|是,小于广播阈值| Switch["将 SortMergeJoin\n切换为 BroadcastHashJoin"]
JoinSize -->|否| Skip3["保持 SortMergeJoin"]
Coalesce --> Execute["执行优化后的物理计划"]
Skip1 --> Execute
Split --> Execute
Skip2 --> Execute
Switch --> Execute
Skip3 --> Execute
上图展示了 AQE 在每个 Shuffle 边界处的决策流程。当 Shuffle Write 完成后,AQE 收集到各分区的真实数据量统计,据此在三个维度上做出优化判断:合并过小分区以减少调度开销、拆分倾斜分区以消除长尾效应、以及在数据量足够小时切换为更高效的 Join 策略。这三种优化可以同时作用于同一个查询的不同阶段,且整个决策过程对用户透明,无需手动干预。
十、Spark 架构的局限
Driver 单点的持续挑战
虽然 Spark 3.x 对 Driver 做了很多优化(比如
MapOutputTracker 的分片查询),但 Driver
仍然是中心化的协调者。在超大规模作业中(数百万
Task),Driver 的内存和 CPU 可能成为瓶颈。Spark
Connect(Spark 3.4+)通过将客户端和 Driver
解耦,缓解了部分问题——用户代码在本地进程运行,通过 gRPC
与远程 Spark Server 通信。但 Driver
本身的单点性质没有改变。
Executor 资源的静态分配
Spark 的 Executor 在作业启动时申请固定数量的 CPU 和内存,运行期间不变。动态资源分配(Dynamic Resource Allocation)可以在空闲时释放 Executor,在需要时申请新的,但粒度仍然是整个 Executor,不是单个 Task。这和 Flink 的 Slot 共享机制、以及 Kubernetes 原生的 Pod 弹性扩缩相比,灵活性较低。
Shuffle 是永恒的瓶颈
无论怎么优化,Shuffle 都涉及磁盘 I/O 和网络传输,是 Spark 作业中最慢的阶段。Disaggregated Shuffle(如 Apache Celeborn、Uber 的 RSS)试图把 Shuffle 数据存储从 Executor 本地磁盘抽出来,放到独立的 Shuffle 服务集群上,但这引入了新的架构复杂度和网络开销。
与流处理系统的差距
Structured Streaming 的微批次模型本质上是小间隔的批处理,端到端延迟通常在秒级到数十秒级。连续处理模式(Continuous Processing)可以降到毫秒级,但功能受限(不支持聚合),且在 Spark 3.x 中仍是实验性功能。对于真正的低延迟流处理需求,Flink 的事件驱动模型更合适。
参考文献
Zaharia, M., Chowdhury, M., Das, T., Dave, A., Ma, J., McCauley, M., Franklin, M. J., Shenker, S., Stoica, I. (2012). “Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing.” NSDI’12. https://www.usenix.org/conference/nsdi12/technical-sessions/presentation/zaharia
Zaharia, M., Xin, R. S., Wendell, P., Das, T., Armbrust, M., Dave, A., Meng, X., Rosen, J., Venkataraman, S., Franklin, M. J., Ghodsi, A., Gonzalez, J., Shenker, S., Stoica, I. (2016). “Apache Spark: A Unified Engine for Big Data Processing.” Communications of the ACM, 59(11). https://doi.org/10.1145/2934664
Armbrust, M., Xin, R. S., Lian, C., Huai, Y., Liu, D., Bradley, J. K., Meng, X., Kaftan, T., Franklin, M. J., Ghodsi, A., Zaharia, M. (2015). “Spark SQL: Relational Data Processing in Spark.” SIGMOD’15. https://doi.org/10.1145/2723372.2742797
Dean, J., Ghemawat, S. (2004). “MapReduce: Simplified Data Processing on Large Clusters.” OSDI’04. https://research.google/pubs/pub62/
Armbrust, M., Das, T., Torres, J., Yavuz, B., Zhu, S., Xin, R., Ghodsi, A., Stoica, I., Zaharia, M. (2018). “Structured Streaming: A Declarative API for Real-Time Applications in Apache Spark.” SIGMOD’18. https://doi.org/10.1145/3183713.3190664
Apache Spark 源码(3.5.x),
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala。 https://github.com/apache/sparkNeumann, T. (2011). “Efficiently Compiling Efficient Query Plans for Modern Hardware.” VLDB’11. https://doi.org/10.14778/2002938.2002940
上一篇:MapReduce 下一篇:Flink 深度拆解