Apache Celeborn 0.5.x 学习文档
Apache Celeborn(原RSS)是阿里云开源的分布式中间数据服务,采用存算分离架构解决传统Shuffle的性能瓶颈。它将Spark/Flink等计算引擎的Shuffle数据从计算节点抽离,交由独立集群管理,通过Push-based Shuffle实现数据主动推送、合并写入大文件,大幅减少小文件问题。核心创新包括多级存储(内存→SSD→HDD)、数据副本容错、动态分区等。架构包含Maste
目录
1. Celeborn 简介与原理
1.1 什么是 Celeborn
Apache Celeborn(曾用名 Remote Shuffle Service,RSS)是由阿里云开源并捐赠给 Apache 软件基金会的分布式中间数据服务。它将大数据计算引擎(Spark、Flink、MapReduce 等)在 Shuffle 阶段产生的中间数据,从计算节点抽离出来,交由独立的存储服务集群统一管理,从根本上解决传统 Shuffle 的性能与稳定性瓶颈。
1.2 传统 Shuffle 的问题
要理解 Celeborn 解决了什么,先看传统 Shuffle 的工作方式:
传统 Spark Shuffle 流程:
Map Task Reduce Task
┌──────────┐ ┌──────────┐
│ Executor │ │ Executor │
│ │ │ │
│ Map() │──写本地磁盘──► /tmp │ Reduce()│
│ │ │ │
│ 本地磁盘 │◄──────────────────── │ Fetch │
└──────────┘ 网络拉取 └──────────┘
这种方式存在以下核心问题:
资源耦合严重:计算(CPU)、内存(JVM Heap)、磁盘 IO 全部集中在同一个 Executor 进程中。Shuffle 写入的随机 IO 会抢占 CPU 和内存,直接影响计算性能。
容错代价高昂:Executor 宕机后,其本地 Shuffle 数据随之消失。下游所有依赖该 Executor 数据的 Reduce Task 必须重新触发上游整个 Stage 重算,代价极大。
大量小文件问题:传统 Sort Shuffle 每个 Map Task 产生若干文件,N 个 Map × M 个 Reduce = N×M 个文件。百万级并发时,小文件数量使 OS 文件系统不堪重负。
数据倾斜放大:某个 Partition 数据量远大于其他 Partition 时,对应 Executor 成为热点,磁盘和网络都被打满,而其他节点却空闲,整体资源利用率极低。
Shuffle Fetch 风暴:Reduce 阶段大量 Task 同时向少数几个 Executor 拉取数据,形成网络和磁盘 IO 风暴。
1.3 Celeborn 的解决思路
Celeborn 的核心思想是存算分离:
Celeborn Shuffle 流程:
Map Task Celeborn Cluster Reduce Task
┌──────────┐ ┌─────────────┐ ┌──────────┐
│ Executor │ │ Worker1 │ │ Executor │
│ │──Push()────►│ /data/... │◄──Fetch────│ │
│ Map() │ │─────────────│ │ Reduce() │
│ │ │ Worker2 │ │ │
└──────────┘ │ /data/... │ └──────────┘
└─────────────┘
▲
┌─────┴─────┐
│ Master │
│ 元数据管理 │
└───────────┘
Map Task 将数据主动推送(Push) 到 Celeborn Worker,而非写本地磁盘。Reduce Task 从 Worker 拉取(Fetch) 数据。计算节点完全无状态,宕机重启后不影响已写出的 Shuffle 数据。
1.4 Celeborn 的核心创新
Push-based Shuffle:与传统 Pull-based 相反,Map 端主动推送数据。Worker 将来自不同 Map Task 的数据合并写入同一个大文件(per-partition),大幅减少文件数量,将随机写转变为顺序写。
分级存储:Worker 支持内存 → SSD → HDD 多级存储,热数据驻留内存和 SSD,冷数据自动降级到 HDD,充分利用存储层次结构。
数据副本:支持将同一份 Shuffle 数据写入两个 Worker,任意一个 Worker 宕机不影响数据完整性,无需重算。
动态分区:配合 Spark AQE,在运行时动态调整 Reduce 分区数量,避免过多空分区浪费资源。
2. 核心架构深度解析
2.1 组件总览
┌────────────────────────────────────────────────────────────┐
│ Celeborn Cluster │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Master (HA) │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Master-1 │ │ Master-2 │ │ Master-3 │ (Raft) │ │
│ │ │ (Leader) │ │(Follower)│ │(Follower)│ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └──────────────────────────────────────────────────────┘ │
│ ▲ 注册/心跳 │
│ ┌────────────────────────┼───────────────────────────┐ │
│ │ Workers │ │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Worker-1 │ │ Worker-2 │ │ Worker-3 │ ... │ │
│ │ │ SSD/HDD │ │ SSD/HDD │ │ SSD/HDD │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ └────────────────────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────┘
▲ Push/Fetch ▲ Push/Fetch
┌──────────┴──────────┐ ┌──────────┴──────────┐
│ Spark Application │ │ Flink Application │
│ ┌────────────────┐ │ │ ┌────────────────┐ │
│ │ Executor (Map) │ │ │ │TaskManager(Map)│ │
│ │ Celeborn Client│ │ │ │ Celeborn Client│ │
│ └────────────────┘ │ │ └────────────────┘ │
└─────────────────────┘ └─────────────────────┘
2.2 Master 详解
Master 是 Celeborn 集群的大脑,承担以下职责:
Worker 生命周期管理:Worker 启动后向 Master 注册,汇报自身的磁盘信息、内存情况、端口等元数据。Master 维护全局 Worker 列表,通过心跳(默认 15 秒)检测 Worker 存活状态。
资源调度:当计算引擎的 Shuffle Client 需要为一个 Shuffle 分配存储位置时,向 Master 发起 RequestSlots 请求。Master 根据各 Worker 的负载情况(磁盘使用率、内存压力、网络带宽)进行加权轮询分配,返回每个 Partition 对应的 Worker 地址。
应用生命周期:Master 跟踪每个 Application 的 Shuffle 数据,Application 结束后负责通知相关 Worker 清理数据,防止数据泄漏。
HA 机制:Master 基于 Apache Ratis(Raft 协议实现)实现高可用。至少需要 3 个 Master 节点,Leader 负责处理所有请求,Follower 同步状态。Leader 宕机后,Follower 自动选举新 Leader,整个选主过程通常在 5-10 秒内完成。
2.3 Worker 详解
Worker 是实际存储和传输 Shuffle 数据的节点,对外暴露 4 个端口:
| 端口 | 默认值 | 用途 |
|---|---|---|
| rpc.port | 9096 | 接收控制命令(RPC) |
| push.port | 9092 | 接收 Map 端推送的数据 |
| fetch.port | 9091 | 响应 Reduce 端的数据拉取 |
| replicate.port | 9093 | Worker 间数据副本同步 |
数据写入流程(Push Path):
Map Task Client
│
│ 1. 数据积累到缓冲区(push.buffer.max.size)
▼
PushDataHandler(push.port)
│
│ 2. 写入 PartitionFileWriter
│ 同时异步向 Replica Worker 发送副本
▼
FileWriter(内存缓冲)
│
│ 3. 缓冲满或达到 flush 阈值,刷入磁盘
▼
磁盘文件(.shuffle)
数据读取流程(Fetch Path):
Reduce Task Client
│
│ 1. 发起 ChunkFetch 请求(携带 offset + length)
▼
FetchHandler(fetch.port)
│
│ 2. 从磁盘文件读取对应 Chunk
│ 优先从 OS Page Cache 读取(零拷贝)
▼
Reduce Task Client
存储引擎:Celeborn Worker 为每个 (ShuffleId, PartitionId) 维护一个 PartitionFile。所有写入该分区的 Map 数据都追加到同一个文件中,实现顺序写,极大提升磁盘 IO 效率。
2.4 Client(Shuffle Plugin)详解
Celeborn Client 作为 Shuffle Plugin 集成在计算引擎中,替换引擎原生的 Shuffle Manager。其核心工作流程如下:
Shuffle Write(Map 阶段):
- 调用
LifecycleManager.requestPartitionLocation()向 Master 请求 Partition 的存储位置 - 将 Map 输出的各 Partition 数据推送到对应 Worker
- 内部维护推送队列、重试逻辑、背压控制
- 所有 Map Task 完成后,通知 Master 该 Shuffle 的 Map 阶段已完成
Shuffle Read(Reduce 阶段):
- 向 Master 查询目标 Partition 所在 Worker
- 从 Worker 拉取数据(支持并发多 Chunk 请求)
- 处理副本 Failover(主 Worker 不可达时自动切换到副本 Worker)
2.5 数据流全链路
完整 Shuffle 数据流:
① Map Task 计算输出
↓
② Celeborn Client 按 PartitionId 分组,写入本地缓冲区
↓
③ 缓冲区达到阈值,Push 到 Worker-A(Primary)
↓ ↓(同步/异步)
④ Worker-A 写磁盘 Worker-B 写磁盘(Replica)
↓
⑤ Map Stage 完成,Client 上报 MapFinish
↓
⑥ Reduce Task 启动,向 Master 查询 Partition 位置
↓
⑦ Master 返回 Worker-A(或 Worker-B)地址
↓
⑧ Reduce Task 从 Worker Fetch 数据,进行 Reduce 计算
↓
⑨ Job 完成,Master 通知 Worker 清理该 Application 数据
3. 安装与集群部署
3.1 环境要求
| 组件 | 最低要求 | 推荐配置 |
|---|---|---|
| JDK | JDK 8 | JDK 11 |
| 操作系统 | Linux | CentOS 7+ / Ubuntu 20.04+ |
| Master 内存 | 4GB | 16GB+ |
| Worker 内存 | 16GB | 64GB+(堆外内存为主) |
| Worker 磁盘 | HDD | NVMe SSD,4块以上 |
| 网络 | 10GbE | 25GbE / 100GbE |
3.2 下载与目录结构
# 下载预编译包(以 0.5.1 为例)
wget https://downloads.apache.org/celeborn/celeborn-0.5.1/apache-celeborn-0.5.1-bin.tgz
tar -xzf apache-celeborn-0.5.1-bin.tgz
cd apache-celeborn-0.5.1-bin
目录说明:
apache-celeborn-0.5.1-bin/
├── bin/
│ ├── celeborn-master.sh # Master 启停脚本
│ ├── celeborn-worker.sh # Worker 启停脚本
│ └── start-all.sh # 批量启动所有节点
├── conf/
│ ├── celeborn-defaults.conf # 核心配置文件
│ ├── celeborn-env.sh # JVM 参数配置
│ ├── masters # Master 节点列表
│ └── workers # Worker 节点列表
├── jars/ # 公共依赖 JAR
├── master-jars/ # Master 专用 JAR
├── worker-jars/ # Worker 专用 JAR
├── spark/ # Spark 各版本 Plugin JAR
└── flink/ # Flink 各版本 Plugin JAR
3.3 单机模式配置(快速体验)
适用于本地开发测试,Master 和 Worker 运行在同一台机器:
# conf/celeborn-defaults.conf
# Master 配置
celeborn.master.host localhost
celeborn.master.port 9097
# Worker 存储目录
celeborn.worker.storage.dirs /tmp/celeborn-data
# 关闭副本(单机无需副本)
celeborn.client.push.replicate.enabled false
# 启动 Master
./bin/celeborn-master.sh start
# 启动 Worker
./bin/celeborn-worker.sh start
# 检查日志
tail -f logs/celeborn-master.log
tail -f logs/celeborn-worker.log
3.4 生产集群配置
规划集群节点(示例):
clb-master1, clb-master2, clb-master3 → Master 节点(HA)
clb-worker1 ~ clb-worker10 → Worker 节点
conf/masters 文件(仅列出 Master 主机名):
clb-master1
clb-master2
clb-master3
conf/workers 文件(列出所有 Worker 主机名):
clb-worker1
clb-worker2
...
clb-worker10
conf/celeborn-defaults.conf(所有节点统一配置):
# ─────────────────────────────────────
# Master 配置
# ─────────────────────────────────────
# Master HA 端点列表(rpcPort:ratisPort)
celeborn.master.endpoints clb-master1:9097,clb-master2:9097,clb-master3:9097
# 开启 HA
celeborn.master.ha.enabled true
# 当前节点 ID(每台 Master 节点不同,分别为 master1/master2/master3)
celeborn.master.ha.node.id master1
# HA 节点定义(格式:host:rpcPort:ratisPort)
celeborn.master.ha.nodes.master1 clb-master1:9097:9872
celeborn.master.ha.nodes.master2 clb-master2:9097:9872
celeborn.master.ha.nodes.master3 clb-master3:9097:9872
# Ratis 数据存储目录(建议独立 SSD)
celeborn.master.ha.storage.dir /data/celeborn-ratis
# ─────────────────────────────────────
# Worker 配置
# ─────────────────────────────────────
# Worker 端口(所有 Worker 相同)
celeborn.worker.rpc.port 9096
celeborn.worker.fetch.port 9091
celeborn.worker.push.port 9092
celeborn.worker.replicate.port 9093
# 多磁盘存储配置(disktype 和 capacity 可选)
celeborn.worker.storage.dirs /data1/celeborn:disktype=SSD:capacity=800G,\
/data2/celeborn:disktype=SSD:capacity=800G,\
/data3/celeborn:disktype=HDD:capacity=4T,\
/data4/celeborn:disktype=HDD:capacity=4T
# 磁盘保留空间(低于此值停止写入该磁盘)
celeborn.worker.disk.reserve.size 10G
# ─────────────────────────────────────
# 内存与背压控制
# ─────────────────────────────────────
# 堆外内存使用率达到此值时,暂停接收新的 Push 请求
celeborn.worker.directMemoryRatioToPauseReceive 0.85
# 堆外内存使用率降至此值时,恢复接收 Push 请求
celeborn.worker.directMemoryRatioToResume 0.80
# ─────────────────────────────────────
# 性能调优
# ─────────────────────────────────────
# 数据压缩算法(lz4 推荐,也支持 zstd/none)
celeborn.client.shuffle.compression.codec lz4
# SSD 磁盘 Flush 线程数(每块 SSD 独立线程池)
celeborn.worker.flusher.ssd.threads 8
# HDD 磁盘 Flush 线程数
celeborn.worker.flusher.hdd.threads 2
conf/celeborn-env.sh(Worker 节点 JVM 参数):
# Worker JVM 堆内存(不宜过大,Celeborn 主要用堆外内存)
export CELEBORN_WORKER_JAVA_OPTS="-Xmx8g -Xms8g \
-XX:MaxDirectMemorySize=32g \
-XX:+UseG1GC \
-XX:G1HeapRegionSize=8m \
-XX:InitiatingHeapOccupancyPercent=35 \
-XX:G1MixedGCLiveThresholdPercent=85 \
-XX:+PrintGCDetails \
-XX:+PrintGCDateStamps \
-Xloggc:/var/log/celeborn/gc-worker.log"
# Master JVM
export CELEBORN_MASTER_JAVA_OPTS="-Xmx8g -Xms8g \
-XX:+UseG1GC \
-XX:+PrintGCDetails"
3.5 批量启动集群
配置好 SSH 免密登录后,可使用批量脚本:
# 批量启动所有 Master 和 Worker
./bin/start-all.sh
# 或分别启动
./bin/start-masters.sh
./bin/start-workers.sh
# 检查 Master Web UI(默认 9098 端口)
curl http://clb-master1:9098/api/v1/workers
3.6 验证集群状态
访问 Master Web UI:http://clb-master1:9098
应当看到:
- 已注册的 Worker 列表及其存储容量
- 集群总可用磁盘空间
- HA 模式下的 Leader 标识
4. Spark 集成使用
4.1 集成原理
Spark 通过可插拔的 ShuffleManager 接口支持自定义 Shuffle 实现。Celeborn 提供了 SparkShuffleManager,完整替换 Spark 默认的 SortShuffleManager。
原生 Spark Shuffle 写入:
Executor → SortShuffleManager → 本地磁盘文件
Celeborn Shuffle 写入:
Executor → SparkShuffleManager → Celeborn Client → Worker
在 Spark 中,Celeborn 的核心组件有:
- ShuffleManager:入口点,拦截所有 Shuffle 操作
- LifecycleManager:运行在 Driver 端,管理 Shuffle 生命周期,与 Celeborn Master 通信
- ShuffleWriter:运行在 Executor 端,负责将 Map 输出 Push 到 Worker
- ShuffleReader:运行在 Executor 端,负责从 Worker Fetch 数据
4.2 添加 Celeborn Shuffle Plugin
首先确定所用 Spark 版本,选择对应的 Plugin JAR:
spark/
├── celeborn-client-spark-2-shaded_2.11-0.5.1.jar # Spark 2.x + Scala 2.11
├── celeborn-client-spark-2-shaded_2.12-0.5.1.jar # Spark 2.x + Scala 2.12
├── celeborn-client-spark-3-shaded_2.12-0.5.1.jar # Spark 3.x + Scala 2.12
└── celeborn-client-spark-3-shaded_2.13-0.5.1.jar # Spark 3.x + Scala 2.13
方式一:集群统一部署(推荐生产使用)
# 将 JAR 分发到所有 Spark 节点的 jars 目录
scp celeborn-client-spark-3-shaded_2.12-0.5.1.jar \
$SPARK_HOME/jars/
# 或者通过 ansible 批量分发
ansible all -m copy -a \
"src=celeborn-client-spark-3-shaded_2.12-0.5.1.jar \
dest=$SPARK_HOME/jars/"
方式二:按需加载(适合测试)
spark-submit \
--jars hdfs:///celeborn/jars/celeborn-client-spark-3-shaded_2.12-0.5.1.jar \
...
4.3 Spark 配置参数详解
必须配置项
# 替换 Spark 默认 Shuffle Manager(Spark 3.x)
spark.shuffle.manager org.apache.spark.shuffle.celeborn.SparkShuffleManager
# Celeborn Master 地址(与集群配置保持一致)
spark.celeborn.master.endpoints clb-master1:9097,clb-master2:9097,clb-master3:9097
Push 相关配置
# 单次 Push 请求的最大数据量(越大吞吐越高,但内存占用越多)
spark.celeborn.client.push.buffer.max.size 512k
# Push 请求队列深度(增大可提升并发推送能力)
spark.celeborn.client.push.queue.capacity 512
# 同时在途的最大 Push 请求数(控制推送并发度)
spark.celeborn.client.push.maxReqsInFlight 32
# 是否启用数据副本(生产环境强烈推荐)
spark.celeborn.client.push.replicate.enabled true
# 副本写入超时时间
spark.celeborn.client.push.replicateTimeout 120s
# Push 失败后重试次数
spark.celeborn.client.push.maxReviveTimes 5
Fetch 相关配置
# 同时在途的最大 Fetch 请求数
spark.celeborn.client.fetch.maxReqsInFlight 32
# 单个 Chunk 的大小(影响 Fetch 效率)
spark.celeborn.client.fetch.chunkSize 8m
# Fetch 超时时间
spark.celeborn.client.fetch.timeout 240s
性能相关配置
# 数据 IO 线程数(影响并发读写能力)
spark.celeborn.data.io.threads 8
# 是否启用分批处理 Partition 变更(配合 AQE)
spark.celeborn.shuffle.batchHandleChangePartition.enabled true
spark.celeborn.shuffle.batchHandleChangePartition.interval 100ms
# 压缩算法(lz4 性能最优,zstd 压缩率更高)
spark.celeborn.client.shuffle.compression.codec lz4
全参数汇总表
| 参数 | 默认值 | 说明 |
|---|---|---|
spark.shuffle.manager |
SortShuffleManager | 设为 SparkShuffleManager 启用 Celeborn |
spark.celeborn.master.endpoints |
无 | Master 地址列表 |
spark.celeborn.client.push.buffer.max.size |
64k | 最大 Push 缓冲区 |
spark.celeborn.client.push.maxReqsInFlight |
32 | 最大并发 Push 请求 |
spark.celeborn.client.push.replicate.enabled |
false | 开启双副本 |
spark.celeborn.client.fetch.maxReqsInFlight |
32 | 最大并发 Fetch 请求 |
spark.celeborn.client.fetch.chunkSize |
8m | 每次 Fetch 的 Chunk 大小 |
spark.celeborn.data.io.threads |
8 | 数据 IO 线程数 |
spark.celeborn.client.shuffle.compression.codec |
lz4 | 压缩算法 |
4.4 完整 spark-submit 示例
YARN Cluster 模式:
spark-submit \
--master yarn \
--deploy-mode cluster \
--num-executors 50 \
--executor-memory 16g \
--executor-cores 4 \
--driver-memory 8g \
\
# Celeborn 核心配置
--conf spark.shuffle.manager=org.apache.spark.shuffle.celeborn.SparkShuffleManager \
--conf spark.celeborn.master.endpoints=clb-master1:9097,clb-master2:9097,clb-master3:9097 \
\
# Push 优化配置
--conf spark.celeborn.client.push.replicate.enabled=true \
--conf spark.celeborn.client.push.buffer.max.size=512k \
--conf spark.celeborn.client.push.maxReqsInFlight=64 \
\
# Fetch 优化配置
--conf spark.celeborn.client.fetch.maxReqsInFlight=64 \
--conf spark.celeborn.client.fetch.chunkSize=16m \
\
# 压缩
--conf spark.celeborn.client.shuffle.compression.codec=lz4 \
\
# 推荐同时开启 AQE
--conf spark.sql.adaptive.enabled=true \
--conf spark.sql.adaptive.skewJoin.enabled=true \
--conf spark.sql.adaptive.coalescePartitions.enabled=true \
\
--class com.example.YourSparkJob \
your-application.jar
Kubernetes 模式:
spark-submit \
--master k8s://https://your-k8s-api:6443 \
--deploy-mode cluster \
--conf spark.kubernetes.container.image=your-spark-image:latest \
--conf spark.executor.instances=20 \
--conf spark.shuffle.manager=org.apache.spark.shuffle.celeborn.SparkShuffleManager \
--conf spark.celeborn.master.endpoints=clb-master1:9097,clb-master2:9097 \
--class com.example.YourSparkJob \
local:///opt/spark/jars/your-application.jar
4.5 Spark AQE 深度集成
Celeborn 与 Spark Adaptive Query Execution(AQE)深度集成,是使用 Celeborn 时最重要的优化组合。
AQE + Celeborn 的工作原理:
AQE 允许 Spark 在 Shuffle 完成后,根据实际数据分布动态调整 Reduce 阶段的分区数量。Celeborn 专门为此实现了 ShufflePartitionLocator 接口,支持:
- 动态合并空分区或小分区(Coalescing):原本 1000 个 Partition,AQE 发现 800 个数据量很小,合并为 200 个,减少 Reduce Task 数量
- 动态处理数据倾斜(Skew Join):将一个超大 Partition 拆分为多个小 Partition 并行处理
# 推荐 AQE 配置(与 Celeborn 配合使用)
spark.sql.adaptive.enabled true
spark.sql.adaptive.coalescePartitions.enabled true
spark.sql.adaptive.coalescePartitions.minPartitionSize 1m
spark.sql.adaptive.coalescePartitions.initialPartitionNum 2000
spark.sql.adaptive.skewJoin.enabled true
spark.sql.adaptive.skewJoin.skewedPartitionFactor 5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 256m
# Celeborn 配合 AQE 的批处理参数
spark.celeborn.shuffle.batchHandleChangePartition.enabled true
spark.celeborn.shuffle.batchHandleChangePartition.interval 100ms
4.6 Spark Structured Streaming
Celeborn 支持 Spark Structured Streaming 的微批(Micro-batch)模式。每个 micro-batch 触发独立的 Shuffle,Celeborn 会自动管理其生命周期:
# PySpark Structured Streaming 示例
spark = SparkSession.builder \
.config("spark.shuffle.manager",
"org.apache.spark.shuffle.celeborn.SparkShuffleManager") \
.config("spark.celeborn.master.endpoints", "clb-master1:9097") \
.getOrCreate()
# 流式聚合作业,每批次触发 Shuffle
query = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "events") \
.load() \
.groupBy("category") \
.agg({"amount": "sum"}) \
.writeStream \
.outputMode("update") \
.format("console") \
.trigger(processingTime="30 seconds") \
.start()
流式作业建议适当减小 Push 缓冲区以降低端到端延迟:
spark.celeborn.client.push.buffer.max.size 128k
spark.celeborn.client.push.queue.capacity 256
5. Flink 集成使用
5.1 集成原理
Flink 的批处理作业(BATCH 模式)在算子之间传递数据时需要 Shuffle。Flink 通过 ShuffleServiceFactory 接口支持自定义 Shuffle 服务。Celeborn 实现了 RemoteShuffleServiceFactory,接管 Flink 的所有批处理 Shuffle 操作。
重要:Celeborn 主要面向 Flink BATCH 模式作业。STREAMING 模式下算子间数据通过流水线(Pipeline)传递,不走 Shuffle,因此不适用 Celeborn。
Flink Shuffle 与 Spark Shuffle 的差异:
| 对比项 | Spark | Flink |
|---|---|---|
| Shuffle 触发时机 | Stage 边界 | 算子边界(BATCH 模式) |
| 数据格式 | Serialized Row | Flink StreamRecord |
| 缓冲机制 | JVM Heap Buffer | Network Buffer Pool |
| 背压机制 | Push 队列控制 | Network Buffer 信用(Credit) |
Celeborn 为 Flink 适配了这些差异,提供完全兼容的 ResultPartition 和 InputGate 实现。
5.2 添加 Flink Plugin JAR
选择与 Flink 版本对应的 Plugin JAR:
flink/
├── celeborn-client-flink-1.14-shaded_2.11-0.5.1.jar
├── celeborn-client-flink-1.15-shaded_2.12-0.5.1.jar
├── celeborn-client-flink-1.16-shaded_2.12-0.5.1.jar
├── celeborn-client-flink-1.17-shaded_2.12-0.5.1.jar
├── celeborn-client-flink-1.18-shaded_2.12-0.5.1.jar
├── celeborn-client-flink-1.19-shaded_2.12-0.5.1.jar
└── celeborn-client-flink-1.20-shaded_2.12-0.5.1.jar
推荐方式:放入 plugins 目录(避免类路径冲突)
# 以 Flink 1.17 为例
mkdir -p $FLINK_HOME/plugins/celeborn
cp celeborn-client-flink-1.17-shaded_2.12-0.5.1.jar \
$FLINK_HOME/plugins/celeborn/
备选方式:放入 lib 目录
cp celeborn-client-flink-1.17-shaded_2.12-0.5.1.jar \
$FLINK_HOME/lib/
使用
lib目录时,若 Celeborn 与 Flink 内部依赖版本冲突,可能导致 ClassLoader 问题,优先使用plugins目录。
5.3 Flink 配置参数详解
flink-conf.yaml 配置(Flink 1.18 及以下)或 config.yaml(Flink 1.19+):
必须配置项
# 替换 Flink 默认 Shuffle Service
shuffle-service-factory.class: org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory
# Celeborn Master 地址
celeborn.master.endpoints: clb-master1:9097,clb-master2:9097,clb-master3:9097
缓冲区配置
# ResultPartition 发送缓冲池大小(每个 Task 持有的发送 Buffer 数量)
# 增大可提升写入吞吐,但会增加 TaskManager 内存消耗
celeborn.client.flink.resultPartition.sendBufferPoolSize: 64
# InputGate 是否支持浮动缓冲区(推荐开启,提升读取吞吐)
celeborn.client.flink.inputGate.supportFloatingBuffer: true
# 最大浮动缓冲区数量
celeborn.client.flink.inputGate.maxBuffers: 1000
网络内存配置
# Flink 网络内存占 TM 内存的比例(默认 0.1,建议适当增大)
taskmanager.network.memory.fraction: 0.15
taskmanager.network.memory.min: 128mb
taskmanager.network.memory.max: 2gb
# 每个 Network Buffer 的大小
taskmanager.memory.segment-size: 32kb
副本与容错配置
# 开启双副本(生产环境推荐)
celeborn.client.push.replicate.enabled: true
# Push 失败重试
celeborn.client.push.maxReviveTimes: 5
完整生产配置示例
# ─── 启用 Celeborn ───
shuffle-service-factory.class: org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory
celeborn.master.endpoints: clb-master1:9097,clb-master2:9097,clb-master3:9097
# ─── 执行模式 ───
execution.runtime-mode: BATCH
# ─── 网络内存 ───
taskmanager.network.memory.fraction: 0.15
taskmanager.network.memory.min: 256mb
taskmanager.network.memory.max: 4gb
taskmanager.memory.segment-size: 32kb
# ─── Celeborn 缓冲区 ───
celeborn.client.flink.resultPartition.sendBufferPoolSize: 128
celeborn.client.flink.inputGate.supportFloatingBuffer: true
celeborn.client.flink.inputGate.maxBuffers: 2000
# ─── 容错 ───
celeborn.client.push.replicate.enabled: true
celeborn.client.push.maxReviveTimes: 5
# ─── 压缩 ───
celeborn.client.shuffle.compression.codec: lz4
5.4 Flink on YARN 作业提交示例
方式一:提交时动态覆盖配置
flink run-application \
-t yarn-application \
-p 40 \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dtaskmanager.memory.process.size=16g \
-Djobmanager.memory.process.size=4g \
\
# 启用 Celeborn
-Dshuffle-service-factory.class=org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory \
-Dceleborn.master.endpoints=clb-master1:9097,clb-master2:9097,clb-master3:9097 \
\
# 运行模式
-Dexecution.runtime-mode=BATCH \
\
# 网络内存
-Dtaskmanager.network.memory.fraction=0.15 \
-Dtaskmanager.network.memory.max=2g \
\
# Celeborn 参数
-Dceleborn.client.push.replicate.enabled=true \
-Dceleborn.client.flink.resultPartition.sendBufferPoolSize=128 \
-Dceleborn.client.flink.inputGate.supportFloatingBuffer=true \
\
-c com.example.YourFlinkBatchJob \
your-flink-job.jar
方式二:Per-Job Session 模式(Flink on YARN Session)
先启动包含 Celeborn 配置的 Session:
flink run \
-Dshuffle-service-factory.class=org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory \
-Dceleborn.master.endpoints=clb-master1:9097,clb-master2:9097 \
-c com.example.YourFlinkBatchJob \
your-flink-job.jar
方式三:Flink on Kubernetes
# flink-application.yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: flink-celeborn-job
spec:
image: your-flink-image:latest
flinkVersion: v1_17
flinkConfiguration:
shuffle-service-factory.class: "org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory"
celeborn.master.endpoints: "clb-master1:9097,clb-master2:9097"
execution.runtime-mode: "BATCH"
taskmanager.network.memory.fraction: "0.15"
celeborn.client.push.replicate.enabled: "true"
taskManager:
resource:
memory: "16g"
cpu: 4
job:
jarURI: local:///opt/flink/usrlib/your-flink-job.jar
entryClass: com.example.YourFlinkBatchJob
parallelism: 40
5.5 Flink SQL 集成
在 Flink SQL CLI 或 Table API 中使用 Celeborn:
Flink SQL CLI 方式:
-- 设置运行模式为 BATCH
SET 'execution.runtime-mode' = 'BATCH';
-- 启用 Celeborn
SET 'shuffle-service-factory.class' = 'org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory';
SET 'celeborn.master.endpoints' = 'clb-master1:9097,clb-master2:9097';
SET 'celeborn.client.push.replicate.enabled' = 'true';
-- 执行大规模聚合 SQL(将使用 Celeborn 作为 Shuffle)
INSERT INTO dws_user_order_summary
SELECT
user_id,
DATE_FORMAT(order_time, 'yyyy-MM-dd') AS order_date,
COUNT(1) AS order_cnt,
SUM(amount) AS total_amount
FROM ods_orders
GROUP BY user_id, DATE_FORMAT(order_time, 'yyyy-MM-dd');
Flink Table API(Java):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
Configuration config = new Configuration();
config.setString("shuffle-service-factory.class",
"org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory");
config.setString("celeborn.master.endpoints",
"clb-master1:9097,clb-master2:9097");
config.setBoolean("celeborn.client.push.replicate.enabled", true);
env.configure(config);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql(
"INSERT INTO target SELECT user_id, SUM(amount) FROM source GROUP BY user_id"
);
5.6 Flink Hybrid Shuffle(Flink 1.20+)
Flink 1.20 引入了 Tiered Storage 架构,Celeborn 基于此实现了 Hybrid Shuffle:当 TaskManager 本地磁盘空间不足时,自动将 Shuffle 数据溢出到 Celeborn 远程存储。
# 启用 Hybrid Shuffle
shuffle-service-factory.class: >
org.apache.celeborn.plugin.flink.tiered.CelebornTieredShuffleServiceFactory
# 配置分层策略
celeborn.client.flink.tieredStorage.localDiskEnabled: true
celeborn.client.flink.tieredStorage.localDiskReservedSpaceBytes: 10gb
# Celeborn Master 地址
celeborn.master.endpoints: clb-master1:9097,clb-master2:9097
6. Worker 存储与内存调优
6.1 存储配置精讲
多磁盘配置,支持为每块磁盘指定类型和容量限制:
celeborn.worker.storage.dirs \
/nvme0/celeborn:disktype=SSD:capacity=1T:flushthread=4,\
/nvme1/celeborn:disktype=SSD:capacity=1T:flushthread=4,\
/hdd0/celeborn:disktype=HDD:capacity=4T:flushthread=1,\
/hdd1/celeborn:disktype=HDD:capacity=4T:flushthread=1
各参数说明:
disktype=SSD/HDD:磁盘类型,影响 Flush 线程数默认值和写入优先级(SSD 优先写入)capacity=1T:该目录最大使用量,超过后停止写入flushthread=4:该目录独立 Flush 线程数(可覆盖全局配置)
存储分层写入策略:
# 优先写 SSD,SSD 满后溢出到 HDD
celeborn.worker.storage.storagePolicy SSD_FIRST
# 磁盘保留空间(低于此值停止写入)
celeborn.worker.disk.reserve.size 5G
# 定期检查磁盘健康状态的间隔
celeborn.worker.storage.storagePolicyCheckInterval 60s
6.2 内存管理详解
Worker 进程的内存分为两部分:JVM 堆内存(Heap)和堆外内存(Direct Memory)。Celeborn 大量使用堆外内存进行数据读写,减少 GC 压力。
Worker 内存分布:
┌─────────────────────────────────────────┐
│ JVM 进程总内存(例如 40GB) │
│ │
│ ┌────────────────┐ ┌───────────────┐ │
│ │ Heap(4-8GB) │ │ Direct(32GB)│ │
│ │ │ │ │ │
│ │ - 对象元数据 │ │ - 数据接收缓冲 │ │
│ │ - 索引结构 │ │ - 数据发送缓冲 │ │
│ │ - RPC 处理 │ │ - 文件读写缓冲 │ │
│ └────────────────┘ └───────────────┘ │
└─────────────────────────────────────────┘
关键内存参数:
# ─── JVM 参数(celeborn-env.sh)───
# 堆内存(保持相对小,4-8g 够用)
-Xmx8g -Xms8g
# 堆外内存上限(需要比实际使用量大一些)
-XX:MaxDirectMemorySize=32g
# ─── 背压控制 ───
# 堆外内存使用率超过此值时,Worker 拒绝新的 Push 请求
celeborn.worker.directMemoryRatioToPauseReceive 0.85
# 使用率下降到此值时,恢复接收
celeborn.worker.directMemoryRatioToResume 0.80
# 堆外内存中用于 Shuffle 数据存储的比例
celeborn.worker.directMemoryRatioForShuffleStorage 0.80
内存压力排查:
# 查看 Worker 堆外内存使用情况
curl http://clb-worker1:9095/api/v1/worker/info | python3 -m json.tool | grep -i memory
6.3 Flush 调优
# 单次 Flush 的数据缓冲大小(越大顺序写效率越高)
celeborn.worker.flusher.buffer.size 256k
# SSD 专用 Flush 线程数(SSD IOPS 高,可适当增大)
celeborn.worker.flusher.ssd.threads 8
# HDD 专用 Flush 线程数(HDD 顺序写,线程不宜过多)
celeborn.worker.flusher.hdd.threads 2
# Flush 平均耗时滑动窗口(用于动态调整)
celeborn.worker.flusher.avgFlushTime.slidingWindow 20
7. 监控与运维
7.1 Master Web UI
访问 http://<master-host>:9098(默认端口),提供:
- Overview:集群 Worker 总数、存储总量/已用量、活跃 Application 数
- Workers:每个 Worker 的状态(Active/Lost/Decommissioned)、磁盘用量、内存压力
- Applications:当前活跃和历史 Application 的 Shuffle 数据量统计
- HA Info(HA 模式):当前 Leader、Raft 日志 Index、各节点同步状态
7.2 Prometheus 监控集成
# conf/celeborn-defaults.conf 中启用 Metrics
celeborn.metrics.enabled true
# Master 和 Worker 分别暴露 Prometheus Endpoint
celeborn.master.metrics.prometheus.port 9098
celeborn.worker.metrics.prometheus.port 9095
# Metrics 采集间隔
celeborn.metrics.timer.slidingWindowSize 4096
Prometheus 抓取配置(prometheus.yml):
scrape_configs:
- job_name: 'celeborn-master'
scrape_interval: 15s
static_configs:
- targets:
- 'clb-master1:9098'
- 'clb-master2:9098'
- 'clb-master3:9098'
metrics_path: /metrics/prometheus
- job_name: 'celeborn-worker'
scrape_interval: 15s
static_configs:
- targets:
- 'clb-worker1:9095'
- 'clb-worker2:9095'
# ... 其他 Worker
metrics_path: /metrics/prometheus
关键 Metrics 指标:
| Metric 名称 | 含义 | 告警建议 |
|---|---|---|
DirectMemoryUsage |
Worker 堆外内存使用量 | 超过 80% 告警 |
DiskUsage |
磁盘使用率 | 超过 85% 告警 |
PushDataRate |
数据推送速率(bytes/s) | 用于容量规划 |
FetchDataRate |
数据拉取速率(bytes/s) | 用于容量规划 |
PushDataFailCount |
Push 失败次数 | 非零即告警 |
MasterSlotsAllocated |
已分配 Slots 数 | 评估集群负载 |
7.3 Grafana Dashboard
社区提供了开箱即用的 Grafana Dashboard(Dashboard ID 可从 GitHub celeborn-dashboard 项目获取),导入后自动展示:
- 集群整体吞吐(Push/Fetch 带宽趋势)
- 各 Worker 内存/磁盘热力图
- 请求延迟分布(P50/P95/P99)
- 错误率趋势
7.4 常用运维操作
# 查看 Worker 存储状态
curl http://clb-worker1:9095/api/v1/worker/storageInfo | python3 -m json.tool
# 手动触发 Worker 下线(Decommission,等待数据迁移完成)
curl -X POST http://clb-master1:9098/api/v1/workers/decommission \
-H "Content-Type: application/json" \
-d '{"hosts": ["clb-worker1"]}'
# 查看某 Application 的 Shuffle 数据信息
curl http://clb-master1:9098/api/v1/applications/{appId}
# 手动清理某 Application 的 Shuffle 数据(正常情况 Application 退出会自动清理)
curl -X DELETE http://clb-master1:9098/api/v1/applications/{appId}
# 动态调整日志级别(无需重启)
curl "http://clb-master1:9098/logLevel?level=DEBUG"
curl "http://clb-worker1:9095/logLevel?level=DEBUG"
8. 常见问题排查
8.1 Push 失败 / PushDataFailedException
现象:Spark Executor 或 Flink TaskManager 日志出现:
org.apache.celeborn.common.exception.CelebornIOException: PushDataFailNotRetry
排查步骤:
- 检查 Worker 磁盘使用率是否超过阈值:
curl http://clb-worker1:9095/api/v1/worker/storageInfo
# 关注 diskInfos 中各磁盘的 usedSlots 和 status
- 检查 Worker 堆外内存是否触发背压:
curl http://clb-worker1:9095/api/v1/worker/info | grep -i "directmemory"
# 若 directMemoryUsage > directMemoryRatioToPauseReceive,Worker 会拒绝 Push
- 检查 Worker 日志:
grep -i "PauseReceive\|OutOfMemory\|GC" $CELEBORN_HOME/logs/celeborn-worker.log
解决方案:
# 增大 MaxDirectMemorySize
-XX:MaxDirectMemorySize=48g # 在 celeborn-env.sh 中调整
# 调整背压阈值
celeborn.worker.directMemoryRatioToPauseReceive 0.90
# 增加 Worker 节点或减少并发 Shuffle 数量
8.2 Fetch 慢 / FetchChunkTimeoutException
现象:Reduce 阶段耗时异常,日志出现 FetchChunkTimeoutException。
排查步骤:
- 检查 Worker 磁盘 IO 是否饱和(通过
iostat -x 1查看%util) - 检查 Worker 到 Executor 的网络带宽是否打满(通过
iftop或监控) - 检查 Fetch 请求队列是否积压:
curl http://clb-worker1:9095/api/v1/worker/fetchQueueSize
解决方案:
# 增大 Fetch 超时时间
spark.celeborn.client.fetch.timeout 360s
celeborn.client.fetch.timeout 360s
# 增大 IO 线程数
celeborn.data.io.threads 16
# 减小 Chunk 大小降低单次请求延迟
spark.celeborn.client.fetch.chunkSize 4m
8.3 Master HA 选主失败
现象:Master 日志持续出现选举超时,Worker 无法注册到 Master。
排查步骤:
- 检查 Ratis 端口(9872)是否互通:
nc -zv clb-master2 9872
- 检查时间同步(Raft 对时钟偏差敏感):
ntpstat # 或 chronyc tracking
- 查看 Ratis 日志:
tail -f $CELEBORN_HOME/logs/celeborn-master.log | grep -i "raft\|election\|leader"
- 检查 Ratis 数据目录磁盘是否满:
df -h /data/celeborn-ratis
8.4 Flink 作业 OOM(Network Buffer 不足)
现象:Flink TaskManager 出现 Insufficient number of network buffers。
解决方案:
# 增大网络内存
taskmanager.network.memory.fraction: 0.2
taskmanager.network.memory.max: 4gb
# 减小 Buffer 大小(可用的 Buffer 数量 = 总网络内存 / segment-size)
taskmanager.memory.segment-size: 32kb
# 或减小 resultPartition.sendBufferPoolSize 降低每 Task 的 Buffer 持有量
celeborn.client.flink.resultPartition.sendBufferPoolSize: 32
8.5 数据倾斜导致 Worker 热点
现象:某个 Worker 的磁盘使用率远高于其他 Worker,对应 Partition 的 Fetch 成为瓶颈。
解决方案:
对于 Spark,开启 AQE Skew Join 处理:
spark.sql.adaptive.skewJoin.enabled true
spark.sql.adaptive.skewJoin.skewedPartitionFactor 3
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 128m
对于 Flink,在 SQL 中对倾斜 Key 进行两阶段聚合:
-- 第一阶段:局部聚合,分散热点
SELECT user_id, CONCAT(user_id, '_', CAST(RAND() * 10 AS INT)) AS salt_key,
SUM(amount) AS partial_sum
FROM orders
GROUP BY user_id, CONCAT(user_id, '_', CAST(RAND() * 10 AS INT));
-- 第二阶段:全局聚合
SELECT user_id, SUM(partial_sum) FROM ...
GROUP BY user_id;
9. 最佳实践
9.1 硬件规划建议
Worker 节点配置建议(单节点):
| 硬件 | 建议规格 | 说明 |
|---|---|---|
| CPU | 32 核 | 主要消耗在压缩/解压和网络 IO |
| 内存 | 64GB(8G 堆 + 48G 堆外) | 堆外内存越大,性能越稳定 |
| 磁盘 | 4 × NVMe SSD(800GB) | 多盘提升并行 IO |
| 网卡 | 25GbE | 高吞吐 Shuffle 场景必须 |
集群容量估算:
单次峰值 Shuffle 数据量估算:
- 总输入数据量 × Shuffle 放大系数(通常 1.5~3x)
- 考虑压缩后缩小(LZ4 约 50%~70% 压缩率)
集群总容量:
- 峰值并发作业数 × 单次峰值数据量 × 安全系数(1.5)
- 开启双副本 × 2
9.2 Spark 调优 Checklist
- 开启 Celeborn 副本:
spark.celeborn.client.push.replicate.enabled=true - 开启 AQE:
spark.sql.adaptive.enabled=true - 设置合理初始分区数:
spark.sql.shuffle.partitions=2000(大任务可设 4000+) - Push 缓冲区与集群带宽匹配:
spark.celeborn.client.push.buffer.max.size=512k - 生产环境使用 LZ4 压缩:
spark.celeborn.client.shuffle.compression.codec=lz4 - Executor 数量与 Worker 数量比例不超过 10:1
9.3 Flink 调优 Checklist
- 确认运行模式为 BATCH:
execution.runtime-mode=BATCH - 增大网络内存:
taskmanager.network.memory.fraction=0.15 - 开启浮动缓冲区:
celeborn.client.flink.inputGate.supportFloatingBuffer=true - 生产环境开启副本:
celeborn.client.push.replicate.enabled=true - 并行度与 Celeborn Worker 数量匹配(建议 Parallelism / Worker 数 ≤ 50)
9.4 安全配置
生产环境建议开启 SSL 加密和认证:
# 启用 SSL
celeborn.ssl.enabled true
celeborn.ssl.keyStore /etc/celeborn/keystore.jks
celeborn.ssl.keyStorePassword your-keystore-password
celeborn.ssl.trustStore /etc/celeborn/truststore.jks
celeborn.ssl.trustStorePassword your-truststore-password
# 启用 SASL 认证
celeborn.auth.enabled true
celeborn.auth.secret your-shared-secret-key
9.5 与 Apache Spark History Server 集成
Celeborn 作业的 Shuffle 指标会记录在 Spark Event Log 中,History Server 可直接展示,无需额外配置。但建议开启 Celeborn 侧的 Metrics 与 Grafana 集成,获得更细粒度的 Push/Fetch 性能视图。
10. 版本兼容性矩阵
| Celeborn 版本 | Spark 支持版本 | Flink 支持版本 | JDK 要求 |
|---|---|---|---|
| 0.5.x(最新) | 2.4, 3.0–3.5 | 1.14–1.20 | JDK 8 / 11 |
| 0.4.x | 2.4, 3.0–3.4 | 1.14–1.18 | JDK 8 / 11 |
| 0.3.x | 2.4, 3.0–3.3 | 1.14–1.17 | JDK 8 |
| 0.2.x | 2.4, 3.0–3.2 | 1.14–1.15 | JDK 8 |
升级建议:生产环境建议使用 0.5.x 最新 Patch 版本,社区活跃,BUG 修复最及时。
参考资料
更多推荐
所有评论(0)