目录

  1. Celeborn 简介与原理
  2. 核心架构深度解析
  3. 安装与集群部署
  4. Spark 集成使用
  5. Flink 集成使用
  6. Worker 存储与内存调优
  7. 监控与运维
  8. 常见问题排查
  9. 最佳实践
  10. 版本兼容性矩阵

1. Celeborn 简介与原理

1.1 什么是 Celeborn

Apache Celeborn(曾用名 Remote Shuffle Service,RSS)是由阿里云开源并捐赠给 Apache 软件基金会的分布式中间数据服务。它将大数据计算引擎(Spark、Flink、MapReduce 等)在 Shuffle 阶段产生的中间数据,从计算节点抽离出来,交由独立的存储服务集群统一管理,从根本上解决传统 Shuffle 的性能与稳定性瓶颈。

1.2 传统 Shuffle 的问题

要理解 Celeborn 解决了什么,先看传统 Shuffle 的工作方式:

传统 Spark Shuffle 流程:

Map Task                          Reduce Task
┌──────────┐                      ┌──────────┐
│ Executor │                      │ Executor │
│          │                      │          │
│  Map()   │──写本地磁盘──►  /tmp  │  Reduce()│
│          │                      │          │
│  本地磁盘 │◄──────────────────── │   Fetch  │
└──────────┘     网络拉取          └──────────┘

这种方式存在以下核心问题:

资源耦合严重:计算(CPU)、内存(JVM Heap)、磁盘 IO 全部集中在同一个 Executor 进程中。Shuffle 写入的随机 IO 会抢占 CPU 和内存,直接影响计算性能。

容错代价高昂:Executor 宕机后,其本地 Shuffle 数据随之消失。下游所有依赖该 Executor 数据的 Reduce Task 必须重新触发上游整个 Stage 重算,代价极大。

大量小文件问题:传统 Sort Shuffle 每个 Map Task 产生若干文件,N 个 Map × M 个 Reduce = N×M 个文件。百万级并发时,小文件数量使 OS 文件系统不堪重负。

数据倾斜放大:某个 Partition 数据量远大于其他 Partition 时,对应 Executor 成为热点,磁盘和网络都被打满,而其他节点却空闲,整体资源利用率极低。

Shuffle Fetch 风暴:Reduce 阶段大量 Task 同时向少数几个 Executor 拉取数据,形成网络和磁盘 IO 风暴。

1.3 Celeborn 的解决思路

Celeborn 的核心思想是存算分离

Celeborn Shuffle 流程:

Map Task                 Celeborn Cluster           Reduce Task
┌──────────┐             ┌─────────────┐            ┌──────────┐
│ Executor │             │   Worker1   │            │ Executor │
│          │──Push()────►│  /data/...  │◄──Fetch────│          │
│  Map()   │             │─────────────│            │ Reduce() │
│          │             │   Worker2   │            │          │
└──────────┘             │  /data/...  │            └──────────┘
                         └─────────────┘
                               ▲
                         ┌─────┴─────┐
                         │   Master  │
                         │ 元数据管理 │
                         └───────────┘

Map Task 将数据主动推送(Push) 到 Celeborn Worker,而非写本地磁盘。Reduce Task 从 Worker 拉取(Fetch) 数据。计算节点完全无状态,宕机重启后不影响已写出的 Shuffle 数据。

1.4 Celeborn 的核心创新

Push-based Shuffle:与传统 Pull-based 相反,Map 端主动推送数据。Worker 将来自不同 Map Task 的数据合并写入同一个大文件(per-partition),大幅减少文件数量,将随机写转变为顺序写。

分级存储:Worker 支持内存 → SSD → HDD 多级存储,热数据驻留内存和 SSD,冷数据自动降级到 HDD,充分利用存储层次结构。

数据副本:支持将同一份 Shuffle 数据写入两个 Worker,任意一个 Worker 宕机不影响数据完整性,无需重算。

动态分区:配合 Spark AQE,在运行时动态调整 Reduce 分区数量,避免过多空分区浪费资源。


2. 核心架构深度解析

2.1 组件总览

┌────────────────────────────────────────────────────────────┐
│                    Celeborn Cluster                        │
│                                                            │
│  ┌──────────────────────────────────────────────────────┐  │
│  │                  Master (HA)                         │  │
│  │  ┌──────────┐  ┌──────────┐  ┌──────────┐            │  │
│  │  │ Master-1 │  │ Master-2 │  │ Master-3 │  (Raft)    │  │
│  │  │ (Leader) │  │(Follower)│  │(Follower)│            │  │
│  │  └──────────┘  └──────────┘  └──────────┘            │  │
│  └──────────────────────────────────────────────────────┘  │
│                           ▲ 注册/心跳                       │
│  ┌────────────────────────┼───────────────────────────┐    │
│  │  Workers               │                           │    │
│  │  ┌──────────┐   ┌──────────┐   ┌──────────┐        │    │
│  │  │ Worker-1 │   │ Worker-2 │   │ Worker-3 │  ...   │    │
│  │  │ SSD/HDD  │   │ SSD/HDD  │   │ SSD/HDD  │        │    │
│  │  └──────────┘   └──────────┘   └──────────┘        │    │
│  └────────────────────────────────────────────────────┘    │
└────────────────────────────────────────────────────────────┘
           ▲ Push/Fetch                   ▲ Push/Fetch
┌──────────┴──────────┐        ┌──────────┴──────────┐
│   Spark Application │        │   Flink Application │
│  ┌────────────────┐ │        │  ┌────────────────┐ │
│  │ Executor (Map) │ │        │  │TaskManager(Map)│ │
│  │ Celeborn Client│ │        │  │ Celeborn Client│ │
│  └────────────────┘ │        │  └────────────────┘ │
└─────────────────────┘        └─────────────────────┘

2.2 Master 详解

Master 是 Celeborn 集群的大脑,承担以下职责:

Worker 生命周期管理:Worker 启动后向 Master 注册,汇报自身的磁盘信息、内存情况、端口等元数据。Master 维护全局 Worker 列表,通过心跳(默认 15 秒)检测 Worker 存活状态。

资源调度:当计算引擎的 Shuffle Client 需要为一个 Shuffle 分配存储位置时,向 Master 发起 RequestSlots 请求。Master 根据各 Worker 的负载情况(磁盘使用率、内存压力、网络带宽)进行加权轮询分配,返回每个 Partition 对应的 Worker 地址。

应用生命周期:Master 跟踪每个 Application 的 Shuffle 数据,Application 结束后负责通知相关 Worker 清理数据,防止数据泄漏。

HA 机制:Master 基于 Apache Ratis(Raft 协议实现)实现高可用。至少需要 3 个 Master 节点,Leader 负责处理所有请求,Follower 同步状态。Leader 宕机后,Follower 自动选举新 Leader,整个选主过程通常在 5-10 秒内完成。

2.3 Worker 详解

Worker 是实际存储和传输 Shuffle 数据的节点,对外暴露 4 个端口:

端口 默认值 用途
rpc.port 9096 接收控制命令(RPC)
push.port 9092 接收 Map 端推送的数据
fetch.port 9091 响应 Reduce 端的数据拉取
replicate.port 9093 Worker 间数据副本同步

数据写入流程(Push Path)

Map Task Client
    │
    │ 1. 数据积累到缓冲区(push.buffer.max.size)
    ▼
PushDataHandler(push.port)
    │
    │ 2. 写入 PartitionFileWriter
    │    同时异步向 Replica Worker 发送副本
    ▼
FileWriter(内存缓冲)
    │
    │ 3. 缓冲满或达到 flush 阈值,刷入磁盘
    ▼
磁盘文件(.shuffle)

数据读取流程(Fetch Path)

Reduce Task Client
    │
    │ 1. 发起 ChunkFetch 请求(携带 offset + length)
    ▼
FetchHandler(fetch.port)
    │
    │ 2. 从磁盘文件读取对应 Chunk
    │    优先从 OS Page Cache 读取(零拷贝)
    ▼
Reduce Task Client

存储引擎:Celeborn Worker 为每个 (ShuffleId, PartitionId) 维护一个 PartitionFile。所有写入该分区的 Map 数据都追加到同一个文件中,实现顺序写,极大提升磁盘 IO 效率。

2.4 Client(Shuffle Plugin)详解

Celeborn Client 作为 Shuffle Plugin 集成在计算引擎中,替换引擎原生的 Shuffle Manager。其核心工作流程如下:

Shuffle Write(Map 阶段)

  1. 调用 LifecycleManager.requestPartitionLocation() 向 Master 请求 Partition 的存储位置
  2. 将 Map 输出的各 Partition 数据推送到对应 Worker
  3. 内部维护推送队列、重试逻辑、背压控制
  4. 所有 Map Task 完成后,通知 Master 该 Shuffle 的 Map 阶段已完成

Shuffle Read(Reduce 阶段)

  1. 向 Master 查询目标 Partition 所在 Worker
  2. 从 Worker 拉取数据(支持并发多 Chunk 请求)
  3. 处理副本 Failover(主 Worker 不可达时自动切换到副本 Worker)

2.5 数据流全链路

完整 Shuffle 数据流:

① Map Task 计算输出
        ↓
② Celeborn Client 按 PartitionId 分组,写入本地缓冲区
        ↓
③ 缓冲区达到阈值,Push 到 Worker-A(Primary)
        ↓                ↓(同步/异步)
④ Worker-A 写磁盘    Worker-B 写磁盘(Replica)
        ↓
⑤ Map Stage 完成,Client 上报 MapFinish
        ↓
⑥ Reduce Task 启动,向 Master 查询 Partition 位置
        ↓
⑦ Master 返回 Worker-A(或 Worker-B)地址
        ↓
⑧ Reduce Task 从 Worker Fetch 数据,进行 Reduce 计算
        ↓
⑨ Job 完成,Master 通知 Worker 清理该 Application 数据

3. 安装与集群部署

3.1 环境要求

组件 最低要求 推荐配置
JDK JDK 8 JDK 11
操作系统 Linux CentOS 7+ / Ubuntu 20.04+
Master 内存 4GB 16GB+
Worker 内存 16GB 64GB+(堆外内存为主)
Worker 磁盘 HDD NVMe SSD,4块以上
网络 10GbE 25GbE / 100GbE

3.2 下载与目录结构

# 下载预编译包(以 0.5.1 为例)
wget https://downloads.apache.org/celeborn/celeborn-0.5.1/apache-celeborn-0.5.1-bin.tgz
tar -xzf apache-celeborn-0.5.1-bin.tgz
cd apache-celeborn-0.5.1-bin

目录说明:

apache-celeborn-0.5.1-bin/
├── bin/
│   ├── celeborn-master.sh       # Master 启停脚本
│   ├── celeborn-worker.sh       # Worker 启停脚本
│   └── start-all.sh             # 批量启动所有节点
├── conf/
│   ├── celeborn-defaults.conf   # 核心配置文件
│   ├── celeborn-env.sh          # JVM 参数配置
│   ├── masters                  # Master 节点列表
│   └── workers                  # Worker 节点列表
├── jars/                        # 公共依赖 JAR
├── master-jars/                 # Master 专用 JAR
├── worker-jars/                 # Worker 专用 JAR
├── spark/                       # Spark 各版本 Plugin JAR
└── flink/                       # Flink 各版本 Plugin JAR

3.3 单机模式配置(快速体验)

适用于本地开发测试,Master 和 Worker 运行在同一台机器:

# conf/celeborn-defaults.conf

# Master 配置
celeborn.master.host                    localhost
celeborn.master.port                    9097

# Worker 存储目录
celeborn.worker.storage.dirs            /tmp/celeborn-data

# 关闭副本(单机无需副本)
celeborn.client.push.replicate.enabled  false
# 启动 Master
./bin/celeborn-master.sh start

# 启动 Worker
./bin/celeborn-worker.sh start

# 检查日志
tail -f logs/celeborn-master.log
tail -f logs/celeborn-worker.log

3.4 生产集群配置

规划集群节点(示例)

clb-master1, clb-master2, clb-master3  →  Master 节点(HA)
clb-worker1 ~ clb-worker10             →  Worker 节点

conf/masters 文件(仅列出 Master 主机名):

clb-master1
clb-master2
clb-master3

conf/workers 文件(列出所有 Worker 主机名):

clb-worker1
clb-worker2
...
clb-worker10

conf/celeborn-defaults.conf(所有节点统一配置)

# ─────────────────────────────────────
# Master 配置
# ─────────────────────────────────────

# Master HA 端点列表(rpcPort:ratisPort)
celeborn.master.endpoints               clb-master1:9097,clb-master2:9097,clb-master3:9097

# 开启 HA
celeborn.master.ha.enabled              true
# 当前节点 ID(每台 Master 节点不同,分别为 master1/master2/master3)
celeborn.master.ha.node.id              master1
# HA 节点定义(格式:host:rpcPort:ratisPort)
celeborn.master.ha.nodes.master1        clb-master1:9097:9872
celeborn.master.ha.nodes.master2        clb-master2:9097:9872
celeborn.master.ha.nodes.master3        clb-master3:9097:9872
# Ratis 数据存储目录(建议独立 SSD)
celeborn.master.ha.storage.dir          /data/celeborn-ratis

# ─────────────────────────────────────
# Worker 配置
# ─────────────────────────────────────

# Worker 端口(所有 Worker 相同)
celeborn.worker.rpc.port                9096
celeborn.worker.fetch.port              9091
celeborn.worker.push.port               9092
celeborn.worker.replicate.port          9093

# 多磁盘存储配置(disktype 和 capacity 可选)
celeborn.worker.storage.dirs            /data1/celeborn:disktype=SSD:capacity=800G,\
                                        /data2/celeborn:disktype=SSD:capacity=800G,\
                                        /data3/celeborn:disktype=HDD:capacity=4T,\
                                        /data4/celeborn:disktype=HDD:capacity=4T

# 磁盘保留空间(低于此值停止写入该磁盘)
celeborn.worker.disk.reserve.size       10G

# ─────────────────────────────────────
# 内存与背压控制
# ─────────────────────────────────────

# 堆外内存使用率达到此值时,暂停接收新的 Push 请求
celeborn.worker.directMemoryRatioToPauseReceive  0.85
# 堆外内存使用率降至此值时,恢复接收 Push 请求
celeborn.worker.directMemoryRatioToResume        0.80

# ─────────────────────────────────────
# 性能调优
# ─────────────────────────────────────

# 数据压缩算法(lz4 推荐,也支持 zstd/none)
celeborn.client.shuffle.compression.codec        lz4

# SSD 磁盘 Flush 线程数(每块 SSD 独立线程池)
celeborn.worker.flusher.ssd.threads              8
# HDD 磁盘 Flush 线程数
celeborn.worker.flusher.hdd.threads              2

conf/celeborn-env.sh(Worker 节点 JVM 参数)

# Worker JVM 堆内存(不宜过大,Celeborn 主要用堆外内存)
export CELEBORN_WORKER_JAVA_OPTS="-Xmx8g -Xms8g \
  -XX:MaxDirectMemorySize=32g \
  -XX:+UseG1GC \
  -XX:G1HeapRegionSize=8m \
  -XX:InitiatingHeapOccupancyPercent=35 \
  -XX:G1MixedGCLiveThresholdPercent=85 \
  -XX:+PrintGCDetails \
  -XX:+PrintGCDateStamps \
  -Xloggc:/var/log/celeborn/gc-worker.log"

# Master JVM
export CELEBORN_MASTER_JAVA_OPTS="-Xmx8g -Xms8g \
  -XX:+UseG1GC \
  -XX:+PrintGCDetails"

3.5 批量启动集群

配置好 SSH 免密登录后,可使用批量脚本:

# 批量启动所有 Master 和 Worker
./bin/start-all.sh

# 或分别启动
./bin/start-masters.sh
./bin/start-workers.sh

# 检查 Master Web UI(默认 9098 端口)
curl http://clb-master1:9098/api/v1/workers

3.6 验证集群状态

访问 Master Web UI:http://clb-master1:9098

应当看到:

  • 已注册的 Worker 列表及其存储容量
  • 集群总可用磁盘空间
  • HA 模式下的 Leader 标识

4. Spark 集成使用

4.1 集成原理

Spark 通过可插拔的 ShuffleManager 接口支持自定义 Shuffle 实现。Celeborn 提供了 SparkShuffleManager,完整替换 Spark 默认的 SortShuffleManager

原生 Spark Shuffle 写入:
Executor → SortShuffleManager → 本地磁盘文件

Celeborn Shuffle 写入:
Executor → SparkShuffleManager → Celeborn Client → Worker

在 Spark 中,Celeborn 的核心组件有:

  • ShuffleManager:入口点,拦截所有 Shuffle 操作
  • LifecycleManager:运行在 Driver 端,管理 Shuffle 生命周期,与 Celeborn Master 通信
  • ShuffleWriter:运行在 Executor 端,负责将 Map 输出 Push 到 Worker
  • ShuffleReader:运行在 Executor 端,负责从 Worker Fetch 数据

4.2 添加 Celeborn Shuffle Plugin

首先确定所用 Spark 版本,选择对应的 Plugin JAR:

spark/
├── celeborn-client-spark-2-shaded_2.11-0.5.1.jar   # Spark 2.x + Scala 2.11
├── celeborn-client-spark-2-shaded_2.12-0.5.1.jar   # Spark 2.x + Scala 2.12
├── celeborn-client-spark-3-shaded_2.12-0.5.1.jar   # Spark 3.x + Scala 2.12
└── celeborn-client-spark-3-shaded_2.13-0.5.1.jar   # Spark 3.x + Scala 2.13

方式一:集群统一部署(推荐生产使用)

# 将 JAR 分发到所有 Spark 节点的 jars 目录
scp celeborn-client-spark-3-shaded_2.12-0.5.1.jar \
    $SPARK_HOME/jars/

# 或者通过 ansible 批量分发
ansible all -m copy -a \
  "src=celeborn-client-spark-3-shaded_2.12-0.5.1.jar \
   dest=$SPARK_HOME/jars/"

方式二:按需加载(适合测试)

spark-submit \
  --jars hdfs:///celeborn/jars/celeborn-client-spark-3-shaded_2.12-0.5.1.jar \
  ...

4.3 Spark 配置参数详解

必须配置项
# 替换 Spark 默认 Shuffle Manager(Spark 3.x)
spark.shuffle.manager                   org.apache.spark.shuffle.celeborn.SparkShuffleManager

# Celeborn Master 地址(与集群配置保持一致)
spark.celeborn.master.endpoints         clb-master1:9097,clb-master2:9097,clb-master3:9097
Push 相关配置
# 单次 Push 请求的最大数据量(越大吞吐越高,但内存占用越多)
spark.celeborn.client.push.buffer.max.size              512k

# Push 请求队列深度(增大可提升并发推送能力)
spark.celeborn.client.push.queue.capacity               512

# 同时在途的最大 Push 请求数(控制推送并发度)
spark.celeborn.client.push.maxReqsInFlight              32

# 是否启用数据副本(生产环境强烈推荐)
spark.celeborn.client.push.replicate.enabled            true

# 副本写入超时时间
spark.celeborn.client.push.replicateTimeout             120s

# Push 失败后重试次数
spark.celeborn.client.push.maxReviveTimes               5
Fetch 相关配置
# 同时在途的最大 Fetch 请求数
spark.celeborn.client.fetch.maxReqsInFlight             32

# 单个 Chunk 的大小(影响 Fetch 效率)
spark.celeborn.client.fetch.chunkSize                   8m

# Fetch 超时时间
spark.celeborn.client.fetch.timeout                     240s
性能相关配置
# 数据 IO 线程数(影响并发读写能力)
spark.celeborn.data.io.threads                          8

# 是否启用分批处理 Partition 变更(配合 AQE)
spark.celeborn.shuffle.batchHandleChangePartition.enabled  true
spark.celeborn.shuffle.batchHandleChangePartition.interval  100ms

# 压缩算法(lz4 性能最优,zstd 压缩率更高)
spark.celeborn.client.shuffle.compression.codec         lz4
全参数汇总表
参数 默认值 说明
spark.shuffle.manager SortShuffleManager 设为 SparkShuffleManager 启用 Celeborn
spark.celeborn.master.endpoints Master 地址列表
spark.celeborn.client.push.buffer.max.size 64k 最大 Push 缓冲区
spark.celeborn.client.push.maxReqsInFlight 32 最大并发 Push 请求
spark.celeborn.client.push.replicate.enabled false 开启双副本
spark.celeborn.client.fetch.maxReqsInFlight 32 最大并发 Fetch 请求
spark.celeborn.client.fetch.chunkSize 8m 每次 Fetch 的 Chunk 大小
spark.celeborn.data.io.threads 8 数据 IO 线程数
spark.celeborn.client.shuffle.compression.codec lz4 压缩算法

4.4 完整 spark-submit 示例

YARN Cluster 模式

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 50 \
  --executor-memory 16g \
  --executor-cores 4 \
  --driver-memory 8g \
  \
  # Celeborn 核心配置
  --conf spark.shuffle.manager=org.apache.spark.shuffle.celeborn.SparkShuffleManager \
  --conf spark.celeborn.master.endpoints=clb-master1:9097,clb-master2:9097,clb-master3:9097 \
  \
  # Push 优化配置
  --conf spark.celeborn.client.push.replicate.enabled=true \
  --conf spark.celeborn.client.push.buffer.max.size=512k \
  --conf spark.celeborn.client.push.maxReqsInFlight=64 \
  \
  # Fetch 优化配置
  --conf spark.celeborn.client.fetch.maxReqsInFlight=64 \
  --conf spark.celeborn.client.fetch.chunkSize=16m \
  \
  # 压缩
  --conf spark.celeborn.client.shuffle.compression.codec=lz4 \
  \
  # 推荐同时开启 AQE
  --conf spark.sql.adaptive.enabled=true \
  --conf spark.sql.adaptive.skewJoin.enabled=true \
  --conf spark.sql.adaptive.coalescePartitions.enabled=true \
  \
  --class com.example.YourSparkJob \
  your-application.jar

Kubernetes 模式

spark-submit \
  --master k8s://https://your-k8s-api:6443 \
  --deploy-mode cluster \
  --conf spark.kubernetes.container.image=your-spark-image:latest \
  --conf spark.executor.instances=20 \
  --conf spark.shuffle.manager=org.apache.spark.shuffle.celeborn.SparkShuffleManager \
  --conf spark.celeborn.master.endpoints=clb-master1:9097,clb-master2:9097 \
  --class com.example.YourSparkJob \
  local:///opt/spark/jars/your-application.jar

4.5 Spark AQE 深度集成

Celeborn 与 Spark Adaptive Query Execution(AQE)深度集成,是使用 Celeborn 时最重要的优化组合。

AQE + Celeborn 的工作原理

AQE 允许 Spark 在 Shuffle 完成后,根据实际数据分布动态调整 Reduce 阶段的分区数量。Celeborn 专门为此实现了 ShufflePartitionLocator 接口,支持:

  • 动态合并空分区或小分区(Coalescing):原本 1000 个 Partition,AQE 发现 800 个数据量很小,合并为 200 个,减少 Reduce Task 数量
  • 动态处理数据倾斜(Skew Join):将一个超大 Partition 拆分为多个小 Partition 并行处理
# 推荐 AQE 配置(与 Celeborn 配合使用)
spark.sql.adaptive.enabled                              true
spark.sql.adaptive.coalescePartitions.enabled           true
spark.sql.adaptive.coalescePartitions.minPartitionSize  1m
spark.sql.adaptive.coalescePartitions.initialPartitionNum  2000
spark.sql.adaptive.skewJoin.enabled                     true
spark.sql.adaptive.skewJoin.skewedPartitionFactor        5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes  256m

# Celeborn 配合 AQE 的批处理参数
spark.celeborn.shuffle.batchHandleChangePartition.enabled  true
spark.celeborn.shuffle.batchHandleChangePartition.interval  100ms

4.6 Spark Structured Streaming

Celeborn 支持 Spark Structured Streaming 的微批(Micro-batch)模式。每个 micro-batch 触发独立的 Shuffle,Celeborn 会自动管理其生命周期:

# PySpark Structured Streaming 示例
spark = SparkSession.builder \
    .config("spark.shuffle.manager", 
            "org.apache.spark.shuffle.celeborn.SparkShuffleManager") \
    .config("spark.celeborn.master.endpoints", "clb-master1:9097") \
    .getOrCreate()

# 流式聚合作业,每批次触发 Shuffle
query = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "events") \
    .load() \
    .groupBy("category") \
    .agg({"amount": "sum"}) \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .trigger(processingTime="30 seconds") \
    .start()

流式作业建议适当减小 Push 缓冲区以降低端到端延迟:

spark.celeborn.client.push.buffer.max.size   128k
spark.celeborn.client.push.queue.capacity    256

5. Flink 集成使用

5.1 集成原理

Flink 的批处理作业(BATCH 模式)在算子之间传递数据时需要 Shuffle。Flink 通过 ShuffleServiceFactory 接口支持自定义 Shuffle 服务。Celeborn 实现了 RemoteShuffleServiceFactory,接管 Flink 的所有批处理 Shuffle 操作。

重要:Celeborn 主要面向 Flink BATCH 模式作业。STREAMING 模式下算子间数据通过流水线(Pipeline)传递,不走 Shuffle,因此不适用 Celeborn。

Flink Shuffle 与 Spark Shuffle 的差异

对比项 Spark Flink
Shuffle 触发时机 Stage 边界 算子边界(BATCH 模式)
数据格式 Serialized Row Flink StreamRecord
缓冲机制 JVM Heap Buffer Network Buffer Pool
背压机制 Push 队列控制 Network Buffer 信用(Credit)

Celeborn 为 Flink 适配了这些差异,提供完全兼容的 ResultPartitionInputGate 实现。

5.2 添加 Flink Plugin JAR

选择与 Flink 版本对应的 Plugin JAR:

flink/
├── celeborn-client-flink-1.14-shaded_2.11-0.5.1.jar
├── celeborn-client-flink-1.15-shaded_2.12-0.5.1.jar
├── celeborn-client-flink-1.16-shaded_2.12-0.5.1.jar
├── celeborn-client-flink-1.17-shaded_2.12-0.5.1.jar
├── celeborn-client-flink-1.18-shaded_2.12-0.5.1.jar
├── celeborn-client-flink-1.19-shaded_2.12-0.5.1.jar
└── celeborn-client-flink-1.20-shaded_2.12-0.5.1.jar

推荐方式:放入 plugins 目录(避免类路径冲突)

# 以 Flink 1.17 为例
mkdir -p $FLINK_HOME/plugins/celeborn
cp celeborn-client-flink-1.17-shaded_2.12-0.5.1.jar \
   $FLINK_HOME/plugins/celeborn/

备选方式:放入 lib 目录

cp celeborn-client-flink-1.17-shaded_2.12-0.5.1.jar \
   $FLINK_HOME/lib/

使用 lib 目录时,若 Celeborn 与 Flink 内部依赖版本冲突,可能导致 ClassLoader 问题,优先使用 plugins 目录。

5.3 Flink 配置参数详解

flink-conf.yaml 配置(Flink 1.18 及以下)或 config.yaml(Flink 1.19+):

必须配置项
# 替换 Flink 默认 Shuffle Service
shuffle-service-factory.class: org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory

# Celeborn Master 地址
celeborn.master.endpoints: clb-master1:9097,clb-master2:9097,clb-master3:9097
缓冲区配置
# ResultPartition 发送缓冲池大小(每个 Task 持有的发送 Buffer 数量)
# 增大可提升写入吞吐,但会增加 TaskManager 内存消耗
celeborn.client.flink.resultPartition.sendBufferPoolSize: 64

# InputGate 是否支持浮动缓冲区(推荐开启,提升读取吞吐)
celeborn.client.flink.inputGate.supportFloatingBuffer: true

# 最大浮动缓冲区数量
celeborn.client.flink.inputGate.maxBuffers: 1000
网络内存配置
# Flink 网络内存占 TM 内存的比例(默认 0.1,建议适当增大)
taskmanager.network.memory.fraction: 0.15
taskmanager.network.memory.min: 128mb
taskmanager.network.memory.max: 2gb

# 每个 Network Buffer 的大小
taskmanager.memory.segment-size: 32kb
副本与容错配置
# 开启双副本(生产环境推荐)
celeborn.client.push.replicate.enabled: true

# Push 失败重试
celeborn.client.push.maxReviveTimes: 5
完整生产配置示例
# ─── 启用 Celeborn ───
shuffle-service-factory.class: org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory
celeborn.master.endpoints: clb-master1:9097,clb-master2:9097,clb-master3:9097

# ─── 执行模式 ───
execution.runtime-mode: BATCH

# ─── 网络内存 ───
taskmanager.network.memory.fraction: 0.15
taskmanager.network.memory.min: 256mb
taskmanager.network.memory.max: 4gb
taskmanager.memory.segment-size: 32kb

# ─── Celeborn 缓冲区 ───
celeborn.client.flink.resultPartition.sendBufferPoolSize: 128
celeborn.client.flink.inputGate.supportFloatingBuffer: true
celeborn.client.flink.inputGate.maxBuffers: 2000

# ─── 容错 ───
celeborn.client.push.replicate.enabled: true
celeborn.client.push.maxReviveTimes: 5

# ─── 压缩 ───
celeborn.client.shuffle.compression.codec: lz4

5.4 Flink on YARN 作业提交示例

方式一:提交时动态覆盖配置

flink run-application \
  -t yarn-application \
  -p 40 \
  -Dtaskmanager.numberOfTaskSlots=4 \
  -Dtaskmanager.memory.process.size=16g \
  -Djobmanager.memory.process.size=4g \
  \
  # 启用 Celeborn
  -Dshuffle-service-factory.class=org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory \
  -Dceleborn.master.endpoints=clb-master1:9097,clb-master2:9097,clb-master3:9097 \
  \
  # 运行模式
  -Dexecution.runtime-mode=BATCH \
  \
  # 网络内存
  -Dtaskmanager.network.memory.fraction=0.15 \
  -Dtaskmanager.network.memory.max=2g \
  \
  # Celeborn 参数
  -Dceleborn.client.push.replicate.enabled=true \
  -Dceleborn.client.flink.resultPartition.sendBufferPoolSize=128 \
  -Dceleborn.client.flink.inputGate.supportFloatingBuffer=true \
  \
  -c com.example.YourFlinkBatchJob \
  your-flink-job.jar

方式二:Per-Job Session 模式(Flink on YARN Session)

先启动包含 Celeborn 配置的 Session:

flink run \
  -Dshuffle-service-factory.class=org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory \
  -Dceleborn.master.endpoints=clb-master1:9097,clb-master2:9097 \
  -c com.example.YourFlinkBatchJob \
  your-flink-job.jar

方式三:Flink on Kubernetes

# flink-application.yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-celeborn-job
spec:
  image: your-flink-image:latest
  flinkVersion: v1_17
  flinkConfiguration:
    shuffle-service-factory.class: "org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory"
    celeborn.master.endpoints: "clb-master1:9097,clb-master2:9097"
    execution.runtime-mode: "BATCH"
    taskmanager.network.memory.fraction: "0.15"
    celeborn.client.push.replicate.enabled: "true"
  taskManager:
    resource:
      memory: "16g"
      cpu: 4
  job:
    jarURI: local:///opt/flink/usrlib/your-flink-job.jar
    entryClass: com.example.YourFlinkBatchJob
    parallelism: 40

5.5 Flink SQL 集成

在 Flink SQL CLI 或 Table API 中使用 Celeborn:

Flink SQL CLI 方式

-- 设置运行模式为 BATCH
SET 'execution.runtime-mode' = 'BATCH';

-- 启用 Celeborn
SET 'shuffle-service-factory.class' = 'org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory';
SET 'celeborn.master.endpoints' = 'clb-master1:9097,clb-master2:9097';
SET 'celeborn.client.push.replicate.enabled' = 'true';

-- 执行大规模聚合 SQL(将使用 Celeborn 作为 Shuffle)
INSERT INTO dws_user_order_summary
SELECT 
  user_id,
  DATE_FORMAT(order_time, 'yyyy-MM-dd') AS order_date,
  COUNT(1) AS order_cnt,
  SUM(amount) AS total_amount
FROM ods_orders
GROUP BY user_id, DATE_FORMAT(order_time, 'yyyy-MM-dd');

Flink Table API(Java)

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

Configuration config = new Configuration();
config.setString("shuffle-service-factory.class",
    "org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory");
config.setString("celeborn.master.endpoints",
    "clb-master1:9097,clb-master2:9097");
config.setBoolean("celeborn.client.push.replicate.enabled", true);
env.configure(config);

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

tableEnv.executeSql(
    "INSERT INTO target SELECT user_id, SUM(amount) FROM source GROUP BY user_id"
);

5.6 Flink Hybrid Shuffle(Flink 1.20+)

Flink 1.20 引入了 Tiered Storage 架构,Celeborn 基于此实现了 Hybrid Shuffle:当 TaskManager 本地磁盘空间不足时,自动将 Shuffle 数据溢出到 Celeborn 远程存储。

# 启用 Hybrid Shuffle
shuffle-service-factory.class: >
  org.apache.celeborn.plugin.flink.tiered.CelebornTieredShuffleServiceFactory

# 配置分层策略
celeborn.client.flink.tieredStorage.localDiskEnabled: true
celeborn.client.flink.tieredStorage.localDiskReservedSpaceBytes: 10gb

# Celeborn Master 地址
celeborn.master.endpoints: clb-master1:9097,clb-master2:9097

6. Worker 存储与内存调优

6.1 存储配置精讲

多磁盘配置,支持为每块磁盘指定类型和容量限制:

celeborn.worker.storage.dirs  \
  /nvme0/celeborn:disktype=SSD:capacity=1T:flushthread=4,\
  /nvme1/celeborn:disktype=SSD:capacity=1T:flushthread=4,\
  /hdd0/celeborn:disktype=HDD:capacity=4T:flushthread=1,\
  /hdd1/celeborn:disktype=HDD:capacity=4T:flushthread=1

各参数说明:

  • disktype=SSD/HDD:磁盘类型,影响 Flush 线程数默认值和写入优先级(SSD 优先写入)
  • capacity=1T:该目录最大使用量,超过后停止写入
  • flushthread=4:该目录独立 Flush 线程数(可覆盖全局配置)

存储分层写入策略

# 优先写 SSD,SSD 满后溢出到 HDD
celeborn.worker.storage.storagePolicy                   SSD_FIRST

# 磁盘保留空间(低于此值停止写入)
celeborn.worker.disk.reserve.size                       5G

# 定期检查磁盘健康状态的间隔
celeborn.worker.storage.storagePolicyCheckInterval      60s

6.2 内存管理详解

Worker 进程的内存分为两部分:JVM 堆内存(Heap)和堆外内存(Direct Memory)。Celeborn 大量使用堆外内存进行数据读写,减少 GC 压力。

Worker 内存分布:
┌─────────────────────────────────────────┐
│ JVM 进程总内存(例如 40GB)              │
│                                         │
│  ┌────────────────┐  ┌───────────────┐  │
│  │  Heap(4-8GB) │  │ Direct(32GB)│  │
│  │                │  │               │  │
│  │ - 对象元数据   │  │ - 数据接收缓冲 │  │
│  │ - 索引结构     │  │ - 数据发送缓冲 │  │
│  │ - RPC 处理     │  │ - 文件读写缓冲 │  │
│  └────────────────┘  └───────────────┘  │
└─────────────────────────────────────────┘

关键内存参数

# ─── JVM 参数(celeborn-env.sh)───
# 堆内存(保持相对小,4-8g 够用)
-Xmx8g -Xms8g
# 堆外内存上限(需要比实际使用量大一些)
-XX:MaxDirectMemorySize=32g

# ─── 背压控制 ───
# 堆外内存使用率超过此值时,Worker 拒绝新的 Push 请求
celeborn.worker.directMemoryRatioToPauseReceive    0.85
# 使用率下降到此值时,恢复接收
celeborn.worker.directMemoryRatioToResume          0.80
# 堆外内存中用于 Shuffle 数据存储的比例
celeborn.worker.directMemoryRatioForShuffleStorage 0.80

内存压力排查

# 查看 Worker 堆外内存使用情况
curl http://clb-worker1:9095/api/v1/worker/info | python3 -m json.tool | grep -i memory

6.3 Flush 调优

# 单次 Flush 的数据缓冲大小(越大顺序写效率越高)
celeborn.worker.flusher.buffer.size                256k

# SSD 专用 Flush 线程数(SSD IOPS 高,可适当增大)
celeborn.worker.flusher.ssd.threads                8

# HDD 专用 Flush 线程数(HDD 顺序写,线程不宜过多)
celeborn.worker.flusher.hdd.threads                2

# Flush 平均耗时滑动窗口(用于动态调整)
celeborn.worker.flusher.avgFlushTime.slidingWindow 20

7. 监控与运维

7.1 Master Web UI

访问 http://<master-host>:9098(默认端口),提供:

  • Overview:集群 Worker 总数、存储总量/已用量、活跃 Application 数
  • Workers:每个 Worker 的状态(Active/Lost/Decommissioned)、磁盘用量、内存压力
  • Applications:当前活跃和历史 Application 的 Shuffle 数据量统计
  • HA Info(HA 模式):当前 Leader、Raft 日志 Index、各节点同步状态

7.2 Prometheus 监控集成

# conf/celeborn-defaults.conf 中启用 Metrics
celeborn.metrics.enabled                    true

# Master 和 Worker 分别暴露 Prometheus Endpoint
celeborn.master.metrics.prometheus.port     9098
celeborn.worker.metrics.prometheus.port     9095

# Metrics 采集间隔
celeborn.metrics.timer.slidingWindowSize    4096

Prometheus 抓取配置(prometheus.yml):

scrape_configs:
  - job_name: 'celeborn-master'
    scrape_interval: 15s
    static_configs:
      - targets:
          - 'clb-master1:9098'
          - 'clb-master2:9098'
          - 'clb-master3:9098'
    metrics_path: /metrics/prometheus

  - job_name: 'celeborn-worker'
    scrape_interval: 15s
    static_configs:
      - targets:
          - 'clb-worker1:9095'
          - 'clb-worker2:9095'
          # ... 其他 Worker
    metrics_path: /metrics/prometheus

关键 Metrics 指标

Metric 名称 含义 告警建议
DirectMemoryUsage Worker 堆外内存使用量 超过 80% 告警
DiskUsage 磁盘使用率 超过 85% 告警
PushDataRate 数据推送速率(bytes/s) 用于容量规划
FetchDataRate 数据拉取速率(bytes/s) 用于容量规划
PushDataFailCount Push 失败次数 非零即告警
MasterSlotsAllocated 已分配 Slots 数 评估集群负载

7.3 Grafana Dashboard

社区提供了开箱即用的 Grafana Dashboard(Dashboard ID 可从 GitHub celeborn-dashboard 项目获取),导入后自动展示:

  • 集群整体吞吐(Push/Fetch 带宽趋势)
  • 各 Worker 内存/磁盘热力图
  • 请求延迟分布(P50/P95/P99)
  • 错误率趋势

7.4 常用运维操作

# 查看 Worker 存储状态
curl http://clb-worker1:9095/api/v1/worker/storageInfo | python3 -m json.tool

# 手动触发 Worker 下线(Decommission,等待数据迁移完成)
curl -X POST http://clb-master1:9098/api/v1/workers/decommission \
  -H "Content-Type: application/json" \
  -d '{"hosts": ["clb-worker1"]}'

# 查看某 Application 的 Shuffle 数据信息
curl http://clb-master1:9098/api/v1/applications/{appId}

# 手动清理某 Application 的 Shuffle 数据(正常情况 Application 退出会自动清理)
curl -X DELETE http://clb-master1:9098/api/v1/applications/{appId}

# 动态调整日志级别(无需重启)
curl "http://clb-master1:9098/logLevel?level=DEBUG"
curl "http://clb-worker1:9095/logLevel?level=DEBUG"

8. 常见问题排查

8.1 Push 失败 / PushDataFailedException

现象:Spark Executor 或 Flink TaskManager 日志出现:

org.apache.celeborn.common.exception.CelebornIOException: PushDataFailNotRetry

排查步骤

  1. 检查 Worker 磁盘使用率是否超过阈值:
curl http://clb-worker1:9095/api/v1/worker/storageInfo
# 关注 diskInfos 中各磁盘的 usedSlots 和 status
  1. 检查 Worker 堆外内存是否触发背压:
curl http://clb-worker1:9095/api/v1/worker/info | grep -i "directmemory"
# 若 directMemoryUsage > directMemoryRatioToPauseReceive,Worker 会拒绝 Push
  1. 检查 Worker 日志:
grep -i "PauseReceive\|OutOfMemory\|GC" $CELEBORN_HOME/logs/celeborn-worker.log

解决方案

# 增大 MaxDirectMemorySize
-XX:MaxDirectMemorySize=48g  # 在 celeborn-env.sh 中调整

# 调整背压阈值
celeborn.worker.directMemoryRatioToPauseReceive  0.90

# 增加 Worker 节点或减少并发 Shuffle 数量

8.2 Fetch 慢 / FetchChunkTimeoutException

现象:Reduce 阶段耗时异常,日志出现 FetchChunkTimeoutException

排查步骤

  1. 检查 Worker 磁盘 IO 是否饱和(通过 iostat -x 1 查看 %util
  2. 检查 Worker 到 Executor 的网络带宽是否打满(通过 iftop 或监控)
  3. 检查 Fetch 请求队列是否积压:
curl http://clb-worker1:9095/api/v1/worker/fetchQueueSize

解决方案

# 增大 Fetch 超时时间
spark.celeborn.client.fetch.timeout                360s
celeborn.client.fetch.timeout                      360s

# 增大 IO 线程数
celeborn.data.io.threads                           16

# 减小 Chunk 大小降低单次请求延迟
spark.celeborn.client.fetch.chunkSize              4m

8.3 Master HA 选主失败

现象:Master 日志持续出现选举超时,Worker 无法注册到 Master。

排查步骤

  1. 检查 Ratis 端口(9872)是否互通:
nc -zv clb-master2 9872
  1. 检查时间同步(Raft 对时钟偏差敏感):
ntpstat  # 或 chronyc tracking
  1. 查看 Ratis 日志:
tail -f $CELEBORN_HOME/logs/celeborn-master.log | grep -i "raft\|election\|leader"
  1. 检查 Ratis 数据目录磁盘是否满:
df -h /data/celeborn-ratis

8.4 Flink 作业 OOM(Network Buffer 不足)

现象:Flink TaskManager 出现 Insufficient number of network buffers

解决方案

# 增大网络内存
taskmanager.network.memory.fraction: 0.2
taskmanager.network.memory.max: 4gb

# 减小 Buffer 大小(可用的 Buffer 数量 = 总网络内存 / segment-size)
taskmanager.memory.segment-size: 32kb

# 或减小 resultPartition.sendBufferPoolSize 降低每 Task 的 Buffer 持有量
celeborn.client.flink.resultPartition.sendBufferPoolSize: 32

8.5 数据倾斜导致 Worker 热点

现象:某个 Worker 的磁盘使用率远高于其他 Worker,对应 Partition 的 Fetch 成为瓶颈。

解决方案

对于 Spark,开启 AQE Skew Join 处理:

spark.sql.adaptive.skewJoin.enabled                     true
spark.sql.adaptive.skewJoin.skewedPartitionFactor       3
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes  128m

对于 Flink,在 SQL 中对倾斜 Key 进行两阶段聚合:

-- 第一阶段:局部聚合,分散热点
SELECT user_id, CONCAT(user_id, '_', CAST(RAND() * 10 AS INT)) AS salt_key,
       SUM(amount) AS partial_sum
FROM orders
GROUP BY user_id, CONCAT(user_id, '_', CAST(RAND() * 10 AS INT));

-- 第二阶段:全局聚合
SELECT user_id, SUM(partial_sum) FROM ...
GROUP BY user_id;

9. 最佳实践

9.1 硬件规划建议

Worker 节点配置建议(单节点):

硬件 建议规格 说明
CPU 32 核 主要消耗在压缩/解压和网络 IO
内存 64GB(8G 堆 + 48G 堆外) 堆外内存越大,性能越稳定
磁盘 4 × NVMe SSD(800GB) 多盘提升并行 IO
网卡 25GbE 高吞吐 Shuffle 场景必须

集群容量估算

单次峰值 Shuffle 数据量估算:
- 总输入数据量 × Shuffle 放大系数(通常 1.5~3x)
- 考虑压缩后缩小(LZ4 约 50%~70% 压缩率)

集群总容量:
- 峰值并发作业数 × 单次峰值数据量 × 安全系数(1.5)
- 开启双副本 × 2

9.2 Spark 调优 Checklist

  • 开启 Celeborn 副本:spark.celeborn.client.push.replicate.enabled=true
  • 开启 AQE:spark.sql.adaptive.enabled=true
  • 设置合理初始分区数:spark.sql.shuffle.partitions=2000(大任务可设 4000+)
  • Push 缓冲区与集群带宽匹配:spark.celeborn.client.push.buffer.max.size=512k
  • 生产环境使用 LZ4 压缩:spark.celeborn.client.shuffle.compression.codec=lz4
  • Executor 数量与 Worker 数量比例不超过 10:1

9.3 Flink 调优 Checklist

  • 确认运行模式为 BATCH:execution.runtime-mode=BATCH
  • 增大网络内存:taskmanager.network.memory.fraction=0.15
  • 开启浮动缓冲区:celeborn.client.flink.inputGate.supportFloatingBuffer=true
  • 生产环境开启副本:celeborn.client.push.replicate.enabled=true
  • 并行度与 Celeborn Worker 数量匹配(建议 Parallelism / Worker 数 ≤ 50)

9.4 安全配置

生产环境建议开启 SSL 加密和认证:

# 启用 SSL
celeborn.ssl.enabled                       true
celeborn.ssl.keyStore                      /etc/celeborn/keystore.jks
celeborn.ssl.keyStorePassword              your-keystore-password
celeborn.ssl.trustStore                    /etc/celeborn/truststore.jks
celeborn.ssl.trustStorePassword            your-truststore-password

# 启用 SASL 认证
celeborn.auth.enabled                      true
celeborn.auth.secret                       your-shared-secret-key

9.5 与 Apache Spark History Server 集成

Celeborn 作业的 Shuffle 指标会记录在 Spark Event Log 中,History Server 可直接展示,无需额外配置。但建议开启 Celeborn 侧的 Metrics 与 Grafana 集成,获得更细粒度的 Push/Fetch 性能视图。


10. 版本兼容性矩阵

Celeborn 版本 Spark 支持版本 Flink 支持版本 JDK 要求
0.5.x(最新) 2.4, 3.0–3.5 1.14–1.20 JDK 8 / 11
0.4.x 2.4, 3.0–3.4 1.14–1.18 JDK 8 / 11
0.3.x 2.4, 3.0–3.3 1.14–1.17 JDK 8
0.2.x 2.4, 3.0–3.2 1.14–1.15 JDK 8

升级建议:生产环境建议使用 0.5.x 最新 Patch 版本,社区活跃,BUG 修复最及时。


参考资料

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐