Flink 中 MapFunction、RichMapFunction、ProcessFunction、KeyedProcessFunction 的区别
·
文章目录
在使用 Apache Flink 进行流处理开发时,我们经常会遇到各种 Function:
MapFunctionRichMapFunctionProcessFunctionKeyedProcessFunction
初学者常常会有这些疑问:
- 它们到底有什么区别?
- 什么场景该用哪个?
- 为什么很多复杂业务都在用
KeyedProcessFunction?
一、四种 Function 一句话理解
| Function | 一句话理解 |
|---|---|
| MapFunction | 最简单的逐条映射 |
| RichMapFunction | 带生命周期的 Map |
| ProcessFunction | 底层流控制器(无 key) |
| KeyedProcessFunction | 有 key + 状态 + 定时器的终极形态 |
二、核心能力对比(重点)
| 能力 | MapFunction | RichMapFunction | ProcessFunction | KeyedProcessFunction |
|---|---|---|---|---|
| 逐条处理 | ✅ | ✅ | ✅ | ✅ |
| open / close 生命周期 | ❌ | ✅ | ✅ | ✅ |
| RuntimeContext | ❌ | ✅ | ✅ | ✅ |
| Keyed State | ❌ | ❌ | ❌ | ✅ |
| Timer 定时器 | ❌ | ❌ | ✅ | ✅ |
| Side Output | ❌ | ❌ | ✅ | ✅ |
| 需要 keyBy | ❌ | ❌ | ❌ | ✅ |
| 控制能力 | ⭐ | ⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
总结一句话:能力越强,复杂度越高。
三、MapFunction —— 最轻量的算子
1. 定义
public interface MapFunction<T, R> {
R map(T value) throws Exception;
}
2. 特点
- 无状态
- 无生命周期
- 无时间、无定时器
- 性能最好、最简单
3. 使用场景
- 字段转换
- 类型映射
- 简单清洗
4. 示例
stream.map((MapFunction<String, String>) v -> v.toUpperCase());
📌 能用 MapFunction,就不要上复杂 Function。
四、RichMapFunction —— 带生命周期的 Map
1. 相比 MapFunction 多了什么?
open()/close()生命周期RuntimeContext- 适合初始化资源(DB / Redis / HTTP 客户端)
2. 示例
stream.map(new RichMapFunction<String, String>() {
@Override
public void open(Configuration parameters) {
System.out.println("初始化资源");
}
@Override
public String map(String value) {
return "Rich: " + value;
}
@Override
public void close() {
System.out.println("释放资源");
}
});
3. 注意点
- ❌ 仍然不能用状态
- ❌ 不能注册定时器
📌 适合“需要外部资源,但逻辑简单”的场景。
五、ProcessFunction —— 底层流控制器
1. 核心能力
- 访问时间(Processing / Event Time)
- 注册定时器
- Side Output(侧输出)
- 更灵活的流控制
2. 但它的限制是?
- ❌ 不能使用 Keyed State
- ❌ 不支持 per-key 逻辑
3. 示例(侧输出)
OutputTag<String> badTag = new OutputTag<>("bad"){};
stream.process(new ProcessFunction<String, String>() {
@Override
public void processElement(String value, Context ctx, Collector<String> out) {
if (value.startsWith("bad")) {
ctx.output(badTag, value);
} else {
out.collect(value);
}
}
});
4. 使用场景
- 数据分流
- 非 key 的定时任务
- 全局规则控制
六、KeyedProcessFunction —— Flink 的“终极武器”
1. 定义
public abstract class KeyedProcessFunction<K, I, O>
2. 核心能力
- 必须
keyBy - 支持 Keyed State
- 支持 Timer
- 每个 key 独立执行逻辑
📌 这是 Flink 里实现复杂业务的核心算子。
3. 示例:超时检测(Processing Time)
需求:某个 key 3 秒内没有新数据,输出告警。
public class TimeoutFunction
extends KeyedProcessFunction<String, String, String> {
private transient ValueState<Long> timerState;
@Override
public void open(Configuration parameters) {
timerState = getRuntimeContext().getState(
new ValueStateDescriptor<>("timer", Long.class)
);
}
@Override
public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
long now = ctx.timerService().currentProcessingTime();
long fireAt = now + 3000;
Long old = timerState.value();
if (old != null) {
ctx.timerService().deleteProcessingTimeTimer(old);
}
ctx.timerService().registerProcessingTimeTimer(fireAt);
timerState.update(fireAt);
out.collect("正常数据:" + value);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
out.collect("⚠️ key=" + ctx.getCurrentKey() + " 超时未上报");
timerState.clear();
}
}
4. 使用场景
- 会话超时
- 状态机
- 实时风控
- 去重
- 轨迹断流检测
七、如何选择合适的 Function?
决策流程(实战经验)
是否只是字段转换?
└─ 是 → MapFunction
└─ 否
是否需要外部资源?
└─ 是 → RichMapFunction
└─ 否
是否需要 keyBy?
└─ 否 → ProcessFunction
└─ 是 → KeyedProcessFunction
八、总结
- MapFunction 是最轻量的算子
- RichMapFunction 是带生命周期的 Map
- ProcessFunction 提供底层流控制,但无 key 状态
- KeyedProcessFunction 是 Flink 实现复杂实时业务的核心
Map → RichMap → Process → KeyedProcess
本质上是 能力逐级增强的过程。
在真实生产环境中:
- 80% 场景用
Map / RichMap - 一旦涉及 状态 + 时间 + 规则
👉 直接使用KeyedProcessFunction
更多推荐
所有评论(0)