Flink 的状态后端(State Backend)决定了 Checkpoint 如何存储 以及 状态是如何在 TaskManager 的内存和外部存储之间分布的

截止 Flink 1.13 版本之后,官方主要推荐并维护两种状态后端,以及一种历史遗留类型。以下是详细分类及其区别:

1. 三大类型概览

  1. HashMapStateBackend (旧称 MemoryStateBackend + FsStateBackend)
  2. EmbeddedRocksDBStateBackend (旧称 RocksDBStateBackend)
  3. (已废弃)JobManagerStateBackend (对应旧版 MemoryStateBackend 的某些情况,现已合并或不再推荐单独使用)

2. 核心区别对比

可以从以下几个维度对比这两种主要的状态后端:

比较维度 HashMapStateBackend EmbeddedRocksDBStateBackend
存储位置 JVM 堆内存 (TaskManager 的 Heap) 磁盘 (本地文件系统,如 SSD) + RocksDB 内存缓存 (Off-Heap)
状态大小上限 小/中 (受限于 TaskManager 内存大小,太大容易导致 GC 压力或 OOM) 非常大 (远超内存,仅受限于磁盘空间,RocksDB 会将热数据放在内存,冷数据刷盘)
读写速度 极快 (基于内存的 HashMap / 直接对象访问,无序列化/反序列化开销) 较慢 (需要序列化/反序列化,涉及磁盘 I/O)
Checkpoint 机制 同步或异步 (数据已存在于 Heap,Checkpoint 时需复制到文件系统) 始终异步 (基于 RocksDB 的 Snapshot 机制,利用硬链接或文件复制)
增量 Checkpoint 不支持 (只能做全量 Checkpoint) 支持 (只上传变化的 SST 文件,节省存储和网络)
TTL (状态存活时间) 支持 支持
适用场景 状态小、低延迟、高吞吐的作业 状态超大(例如 TB 级)、需要增量 Checkpoint、不想受 GC 影响的大状态作业

3. 深入理解两者区别

A. HashMapStateBackend
  • 工作原理:将状态作为对象存储在 TaskManager 的 JVM 堆上。Keyed State 本质上是堆中的一个巨大的 HashMap。
  • 优点
    • 性能极高:读写操作直接在堆内存中进行,没有序列化开销,延迟最低。
    • 适合小状态:如果状态比较小(比如几GB以内),且集群内存充足,这是最佳选择。
  • 缺点
    • GC 风险:大状态会导致 JVM GC 压力巨大,可能导致作业卡顿甚至失败(Full GC)。
    • 内存限制:状态大小受限于 TaskManager 的堆内存大小。
    • Checkpoint 慢:在做 Checkpoint 时,需要将堆上的数据序列化后写入文件系统(如 HDFS)。如果是大状态,这个过程会非常耗时且占用网络/CPU。
B. EmbeddedRocksDBStateBackend
  • 工作原理:Flink 在本地 TaskManager 目录下启动一个嵌入式 RocksDB 实例。状态数据首先写入 RocksDB 的内存缓存(MemTable),然后刷入磁盘(SST 文件)。读写操作都经过 RocksDB 的 LSM 树结构。
  • 优点
    • 支持超大状态:状态可以无限大(受限于磁盘容量),非常适合需要存储海量历史数据的场景(如电商的超长周期 CEP累积窗口计算)。
    • 稳定性高:状态存储在堆外内存和磁盘上,减轻了 JVM GC 的压力,GC 暂停时间短且可控。
    • 增量 Checkpoint:这是 RocksDB 的杀手级特性。每次 Checkpoint 只上传自上次 Checkpoint 以来发生变化的数据文件,极大地减少了 Checkpoint 的耗时和存储成本。
  • 缺点
    • 性能开销:每个状态的读写都需要进行序列化/反序列化(例如将 Object 转为 byte[] 存盘,读出来再转回 Object)。
    • CPU 开销高:序列化和 LSM 树的 Compaction 过程会消耗较多的 CPU 资源。
    • 依赖磁盘:作业性能很大程度上依赖于本地磁盘的 I/O 速度(强烈建议使用 SSD)。

4. 如何选择?

  • 选 HashMapStateBackend 的情况

    • 你的状态比较小,TaskManager 内存足够容纳。
    • 你对延迟极其敏感,追求极致的吞吐量。
    • 你不想处理 RocksDB 调优的复杂性。
  • 选 EmbeddedRocksDBStateBackend 的情况

    • 你的状态很大(几百GB甚至TB级)。
    • 你的算子需要运行很多天/月/年,状态会持续累积。
    • 你需要增量 Checkpoint 来加快备份速度。
    • 你的作业经常因为 GC 问题而卡顿或失败。

5. 配置方式(代码示例)

在 Flink 1.18 中,状态后端的配置方式与之前版本保持一致的思路,但在 API 上更清晰地分离了 运行时状态后端Checkpoint 存储。Flink 1.18 主要提供 HashMapStateBackendEmbeddedRocksDBStateBackend 两种内置状态后端。

下面为你提供 DataStream API (SDK)Flink SQL 两种使用方式的配置示例(Flink 1.18+)。


5.1 DataStream API (SDK) 方式

在 DataStream API 中,你需要做两件事:

  1. 通过 env.setStateBackend(...) 设置运行时状态后端(内存或 RocksDB)。
  2. 通过 env.getCheckpointConfig().setCheckpointStorage(...) 设置 Checkpoint 的持久化存储(如 HDFS)。
A. 配置 HashMapStateBackend (堆内存)

适用于状态较小、追求极致性能的场景。

Maven依赖 (如果使用RocksDB则需要,HashMap不需要额外依赖)
HashMapStateBackend 是内置默认的,无需额外引入依赖。

Java 代码示例

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.CheckpointingMode;

public class FlinkStateBackendExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // 1. 设置运行时状态后端为 HashMap (堆内存)
        env.setStateBackend(new HashMapStateBackend());
        
        // 2. 设置 Checkpoint 持久化存储 (例如 HDFS)
        //    - 如果未指定,默认使用 JobManagerCheckpointStorage (仅适合本地测试)
        env.getCheckpointConfig().setCheckpointStorage("hdfs://namenode:8020/flink/checkpoints");
        
        // 可选:开启 Checkpoint 相关配置
        env.enableCheckpointing(5000); // 每5秒一次 Checkpoint
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        
        // ... 你的业务逻辑
        
        env.execute("Flink 1.18 HashMap StateBackend Job");
    }
}
B. 配置 EmbeddedRocksDBStateBackend (磁盘)

适用于状态超大(TB级)、需要增量 Checkpoint 的场景。

Maven依赖
RocksDB 虽然包含在 Flink 发行版中,但在 IDE 中开发或进行编程式配置时需要显式添加依赖。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb</artifactId>
    <version>1.18.1</version>
    <scope>provided</scope> <!-- 集群环境中已提供,所以用 provided -->
</dependency>

Java 代码示例

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.CheckpointingMode;

public class FlinkRocksDBExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // 1. 设置运行时状态后端为 RocksDB
        //    构造函数传入 true 表示开启增量 Checkpoint (推荐大状态使用)
        EmbeddedRocksDBStateBackend rocksDBBackend = new EmbeddedRocksDBStateBackend(true);
        env.setStateBackend(rocksDBBackend);
        
        // 2. 设置 Checkpoint 持久化存储 (必须配置,RocksDB 依赖外部存储做快照)
        env.getCheckpointConfig().setCheckpointStorage("hdfs://namenode:8020/flink/checkpoints");
        
        // 可选:开启 Checkpoint
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        
        // 可选:针对 RocksDB 的调优参数可以通过 Flink 配置或原生 RocksDB 选项设置
        // 例如,通过配置文件 flink-conf.yaml 设置 state.backend.rocksdb.memory.managed: true
        
        // ... 你的业务逻辑
        
        env.execute("Flink 1.18 RocksDB StateBackend Job");
    }
}

5.2 Flink SQL 方式

在 Flink SQL 中,状态后端无法在 SQL 语句中直接通过 WITH 参数动态切换,而是依赖于 集群的配置文件 (flink-conf.yaml)SQL Client 的启动配置

A. 通过配置文件全局设置 (推荐生产环境)

编辑 Flink 集群的 conf/flink-conf.yaml 文件:

# 1. 设置运行时状态后端类型
#    - hashmap: HashMapStateBackend
#    - rocksdb: EmbeddedRocksDBStateBackend
state.backend.type: rocksdb

# 2. 设置 Checkpoint 存储目录 (所有 Job 的默认目录)
state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints

# 3. (可选) RocksDB 开启增量 Checkpoint
state.backend.incremental: true

# 4. (可选) 其他 RocksDB 配置,如托管内存
state.backend.rocksdb.memory.managed: true

配置完成后,提交的任何 SQL 作业都会默认使用此状态后端。

B. 在 SQL Client 或会话中动态设置

sql-client 中,可以通过 SET 命令动态调整,但只能调整 Checkpoint 存储等参数,状态后端类型一旦集群启动通常固定。不过,你可以通过 Table API 的环境配置来覆盖。

SQL Client 示例

-- 启动 sql-client 后,设置当前会话的 Checkpoint 目录和模式
SET execution.checkpointing.interval = 5s;
SET execution.checkpointing.mode = EXACTLY_ONCE;
SET state.checkpoints.dir = hdfs://namenode:8020/flink/checkpoints;

-- 对于 RocksDB 的增量模式,如果集群配置中未开启,可以在会话中开启
SET state.backend.incremental = true;

-- 然后执行你的 SQL
CREATE TABLE ...;
INSERT INTO ...;

说明:Flink 1.18 在 SQL 层面新增了算子级别的状态保留时间(TTL),可以在编译计划(Compiled Plan)中为不同流设置不同的 TTL,这对优化 regular join 的大状态非常有帮助。


5.3 两种状态后端的区别总结
比较维度 HashMapStateBackend EmbeddedRocksDBStateBackend
存储位置 TaskManager JVM 堆内存 本地磁盘 (RocksDB) + 堆外内存缓存
状态大小上限 受限于 TaskManager 总堆内存,中等规模 非常大,仅受本地磁盘空间限制
读写性能 极快(无序列化开销) 相对较慢(需序列化/反序列化及磁盘 I/O)
Checkpoint 类型 全量 Checkpoint 支持增量 Checkpoint(节省存储与网络)
适用场景 状态小、低延迟、高吞吐的作业 状态超大(如天级窗口、长周期 Join)、需增量快照的作业
配置方式 (DataStream) new HashMapStateBackend() new EmbeddedRocksDBStateBackend(true)
SQL 配置 (conf/yaml) state.backend.type: hashmap state.backend.type: rocksdb
  • SQL 作业特别提示
    对于需要处理大量历史数据或进行 Regular Join 的 SQL 作业,建议使用 RocksDB 并配合算子级别 TTL 来控制状态大小,避免内存溢出。

总结:
简而言之,HashMap内存党(快但有限),RocksDB磁盘党(大但稍慢)。如果项目初期状态不大,可以从 HashMap 开始;一旦涉及到大状态、复杂事件处理或长窗口,建议切换到 RocksDB。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐