Flink状态后端类型与区别
Flink的状态后端决定了Checkpoint存储方式和状态在TaskManager内存与外部存储间的分布。Flink 1.13+主要支持两种状态后端:HashMapStateBackend(内存存储,适合小状态、低延迟场景)和EmbeddedRocksDBStateBackend(磁盘存储,支持大状态和增量Checkpoint)。前者基于JVM堆内存,读写快但受内存限制;后者通过RocksDB实
Flink 的状态后端(State Backend)决定了 Checkpoint 如何存储 以及 状态是如何在 TaskManager 的内存和外部存储之间分布的。
截止 Flink 1.13 版本之后,官方主要推荐并维护两种状态后端,以及一种历史遗留类型。以下是详细分类及其区别:
1. 三大类型概览
- HashMapStateBackend (旧称 MemoryStateBackend + FsStateBackend)
- EmbeddedRocksDBStateBackend (旧称 RocksDBStateBackend)
- (已废弃)JobManagerStateBackend (对应旧版 MemoryStateBackend 的某些情况,现已合并或不再推荐单独使用)
2. 核心区别对比
可以从以下几个维度对比这两种主要的状态后端:
| 比较维度 | HashMapStateBackend | EmbeddedRocksDBStateBackend |
|---|---|---|
| 存储位置 | JVM 堆内存 (TaskManager 的 Heap) | 磁盘 (本地文件系统,如 SSD) + RocksDB 内存缓存 (Off-Heap) |
| 状态大小上限 | 小/中 (受限于 TaskManager 内存大小,太大容易导致 GC 压力或 OOM) | 非常大 (远超内存,仅受限于磁盘空间,RocksDB 会将热数据放在内存,冷数据刷盘) |
| 读写速度 | 极快 (基于内存的 HashMap / 直接对象访问,无序列化/反序列化开销) | 较慢 (需要序列化/反序列化,涉及磁盘 I/O) |
| Checkpoint 机制 | 同步或异步 (数据已存在于 Heap,Checkpoint 时需复制到文件系统) | 始终异步 (基于 RocksDB 的 Snapshot 机制,利用硬链接或文件复制) |
| 增量 Checkpoint | 不支持 (只能做全量 Checkpoint) | 支持 (只上传变化的 SST 文件,节省存储和网络) |
| TTL (状态存活时间) | 支持 | 支持 |
| 适用场景 | 状态小、低延迟、高吞吐的作业 | 状态超大(例如 TB 级)、需要增量 Checkpoint、不想受 GC 影响的大状态作业 |
3. 深入理解两者区别
A. HashMapStateBackend
- 工作原理:将状态作为对象存储在 TaskManager 的 JVM 堆上。Keyed State 本质上是堆中的一个巨大的 HashMap。
- 优点:
- 性能极高:读写操作直接在堆内存中进行,没有序列化开销,延迟最低。
- 适合小状态:如果状态比较小(比如几GB以内),且集群内存充足,这是最佳选择。
- 缺点:
- GC 风险:大状态会导致 JVM GC 压力巨大,可能导致作业卡顿甚至失败(Full GC)。
- 内存限制:状态大小受限于 TaskManager 的堆内存大小。
- Checkpoint 慢:在做 Checkpoint 时,需要将堆上的数据序列化后写入文件系统(如 HDFS)。如果是大状态,这个过程会非常耗时且占用网络/CPU。
B. EmbeddedRocksDBStateBackend
- 工作原理:Flink 在本地 TaskManager 目录下启动一个嵌入式 RocksDB 实例。状态数据首先写入 RocksDB 的内存缓存(MemTable),然后刷入磁盘(SST 文件)。读写操作都经过 RocksDB 的 LSM 树结构。
- 优点:
- 支持超大状态:状态可以无限大(受限于磁盘容量),非常适合需要存储海量历史数据的场景(如电商的超长周期 CEP、累积窗口计算)。
- 稳定性高:状态存储在堆外内存和磁盘上,减轻了 JVM GC 的压力,GC 暂停时间短且可控。
- 增量 Checkpoint:这是 RocksDB 的杀手级特性。每次 Checkpoint 只上传自上次 Checkpoint 以来发生变化的数据文件,极大地减少了 Checkpoint 的耗时和存储成本。
- 缺点:
- 性能开销:每个状态的读写都需要进行序列化/反序列化(例如将 Object 转为 byte[] 存盘,读出来再转回 Object)。
- CPU 开销高:序列化和 LSM 树的 Compaction 过程会消耗较多的 CPU 资源。
- 依赖磁盘:作业性能很大程度上依赖于本地磁盘的 I/O 速度(强烈建议使用 SSD)。
4. 如何选择?
-
选 HashMapStateBackend 的情况:
- 你的状态比较小,TaskManager 内存足够容纳。
- 你对延迟极其敏感,追求极致的吞吐量。
- 你不想处理 RocksDB 调优的复杂性。
-
选 EmbeddedRocksDBStateBackend 的情况:
- 你的状态很大(几百GB甚至TB级)。
- 你的算子需要运行很多天/月/年,状态会持续累积。
- 你需要增量 Checkpoint 来加快备份速度。
- 你的作业经常因为 GC 问题而卡顿或失败。
5. 配置方式(代码示例)
在 Flink 1.18 中,状态后端的配置方式与之前版本保持一致的思路,但在 API 上更清晰地分离了 运行时状态后端 和 Checkpoint 存储。Flink 1.18 主要提供 HashMapStateBackend 和 EmbeddedRocksDBStateBackend 两种内置状态后端。
下面为你提供 DataStream API (SDK) 和 Flink SQL 两种使用方式的配置示例(Flink 1.18+)。
5.1 DataStream API (SDK) 方式
在 DataStream API 中,你需要做两件事:
- 通过
env.setStateBackend(...)设置运行时状态后端(内存或 RocksDB)。 - 通过
env.getCheckpointConfig().setCheckpointStorage(...)设置 Checkpoint 的持久化存储(如 HDFS)。
A. 配置 HashMapStateBackend (堆内存)
适用于状态较小、追求极致性能的场景。
Maven依赖 (如果使用RocksDB则需要,HashMap不需要额外依赖):
HashMapStateBackend 是内置默认的,无需额外引入依赖。
Java 代码示例:
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.CheckpointingMode;
public class FlinkStateBackendExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 1. 设置运行时状态后端为 HashMap (堆内存)
env.setStateBackend(new HashMapStateBackend());
// 2. 设置 Checkpoint 持久化存储 (例如 HDFS)
// - 如果未指定,默认使用 JobManagerCheckpointStorage (仅适合本地测试)
env.getCheckpointConfig().setCheckpointStorage("hdfs://namenode:8020/flink/checkpoints");
// 可选:开启 Checkpoint 相关配置
env.enableCheckpointing(5000); // 每5秒一次 Checkpoint
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// ... 你的业务逻辑
env.execute("Flink 1.18 HashMap StateBackend Job");
}
}
B. 配置 EmbeddedRocksDBStateBackend (磁盘)
适用于状态超大(TB级)、需要增量 Checkpoint 的场景。
Maven依赖:
RocksDB 虽然包含在 Flink 发行版中,但在 IDE 中开发或进行编程式配置时需要显式添加依赖。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb</artifactId>
<version>1.18.1</version>
<scope>provided</scope> <!-- 集群环境中已提供,所以用 provided -->
</dependency>
Java 代码示例:
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.CheckpointingMode;
public class FlinkRocksDBExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 1. 设置运行时状态后端为 RocksDB
// 构造函数传入 true 表示开启增量 Checkpoint (推荐大状态使用)
EmbeddedRocksDBStateBackend rocksDBBackend = new EmbeddedRocksDBStateBackend(true);
env.setStateBackend(rocksDBBackend);
// 2. 设置 Checkpoint 持久化存储 (必须配置,RocksDB 依赖外部存储做快照)
env.getCheckpointConfig().setCheckpointStorage("hdfs://namenode:8020/flink/checkpoints");
// 可选:开启 Checkpoint
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 可选:针对 RocksDB 的调优参数可以通过 Flink 配置或原生 RocksDB 选项设置
// 例如,通过配置文件 flink-conf.yaml 设置 state.backend.rocksdb.memory.managed: true
// ... 你的业务逻辑
env.execute("Flink 1.18 RocksDB StateBackend Job");
}
}
5.2 Flink SQL 方式
在 Flink SQL 中,状态后端无法在 SQL 语句中直接通过 WITH 参数动态切换,而是依赖于 集群的配置文件 (flink-conf.yaml) 或 SQL Client 的启动配置。
A. 通过配置文件全局设置 (推荐生产环境)
编辑 Flink 集群的 conf/flink-conf.yaml 文件:
# 1. 设置运行时状态后端类型
# - hashmap: HashMapStateBackend
# - rocksdb: EmbeddedRocksDBStateBackend
state.backend.type: rocksdb
# 2. 设置 Checkpoint 存储目录 (所有 Job 的默认目录)
state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints
# 3. (可选) RocksDB 开启增量 Checkpoint
state.backend.incremental: true
# 4. (可选) 其他 RocksDB 配置,如托管内存
state.backend.rocksdb.memory.managed: true
配置完成后,提交的任何 SQL 作业都会默认使用此状态后端。
B. 在 SQL Client 或会话中动态设置
在 sql-client 中,可以通过 SET 命令动态调整,但只能调整 Checkpoint 存储等参数,状态后端类型一旦集群启动通常固定。不过,你可以通过 Table API 的环境配置来覆盖。
SQL Client 示例:
-- 启动 sql-client 后,设置当前会话的 Checkpoint 目录和模式
SET execution.checkpointing.interval = 5s;
SET execution.checkpointing.mode = EXACTLY_ONCE;
SET state.checkpoints.dir = hdfs://namenode:8020/flink/checkpoints;
-- 对于 RocksDB 的增量模式,如果集群配置中未开启,可以在会话中开启
SET state.backend.incremental = true;
-- 然后执行你的 SQL
CREATE TABLE ...;
INSERT INTO ...;
说明:Flink 1.18 在 SQL 层面新增了算子级别的状态保留时间(TTL),可以在编译计划(Compiled Plan)中为不同流设置不同的 TTL,这对优化 regular join 的大状态非常有帮助。
5.3 两种状态后端的区别总结
| 比较维度 | HashMapStateBackend | EmbeddedRocksDBStateBackend |
|---|---|---|
| 存储位置 | TaskManager JVM 堆内存 | 本地磁盘 (RocksDB) + 堆外内存缓存 |
| 状态大小上限 | 受限于 TaskManager 总堆内存,中等规模 | 非常大,仅受本地磁盘空间限制 |
| 读写性能 | 极快(无序列化开销) | 相对较慢(需序列化/反序列化及磁盘 I/O) |
| Checkpoint 类型 | 全量 Checkpoint | 支持增量 Checkpoint(节省存储与网络) |
| 适用场景 | 状态小、低延迟、高吞吐的作业 | 状态超大(如天级窗口、长周期 Join)、需增量快照的作业 |
| 配置方式 (DataStream) | new HashMapStateBackend() |
new EmbeddedRocksDBStateBackend(true) |
| SQL 配置 (conf/yaml) | state.backend.type: hashmap |
state.backend.type: rocksdb |
- SQL 作业特别提示:
对于需要处理大量历史数据或进行 Regular Join 的 SQL 作业,建议使用 RocksDB 并配合算子级别 TTL 来控制状态大小,避免内存溢出。
总结:
简而言之,HashMap 是 内存党(快但有限),RocksDB 是 磁盘党(大但稍慢)。如果项目初期状态不大,可以从 HashMap 开始;一旦涉及到大状态、复杂事件处理或长窗口,建议切换到 RocksDB。
更多推荐
所有评论(0)