在使用 Apache Flink 进行流处理开发时,我们经常会遇到各种 Function

  • MapFunction
  • RichMapFunction
  • ProcessFunction
  • KeyedProcessFunction

初学者常常会有这些疑问:

  • 它们到底有什么区别?
  • 什么场景该用哪个?
  • 为什么很多复杂业务都在用 KeyedProcessFunction

一、四种 Function 一句话理解

Function 一句话理解
MapFunction 最简单的逐条映射
RichMapFunction 带生命周期的 Map
ProcessFunction 底层流控制器(无 key)
KeyedProcessFunction 有 key + 状态 + 定时器的终极形态

二、核心能力对比(重点)

能力 MapFunction RichMapFunction ProcessFunction KeyedProcessFunction
逐条处理
open / close 生命周期
RuntimeContext
Keyed State
Timer 定时器
Side Output
需要 keyBy
控制能力 ⭐⭐ ⭐⭐⭐⭐ ⭐⭐⭐⭐⭐

总结一句话:能力越强,复杂度越高。


三、MapFunction —— 最轻量的算子

1. 定义

public interface MapFunction<T, R> {
    R map(T value) throws Exception;
}

2. 特点

  • 无状态
  • 无生命周期
  • 无时间、无定时器
  • 性能最好、最简单

3. 使用场景

  • 字段转换
  • 类型映射
  • 简单清洗

4. 示例

stream.map((MapFunction<String, String>) v -> v.toUpperCase());

📌 能用 MapFunction,就不要上复杂 Function。


四、RichMapFunction —— 带生命周期的 Map

1. 相比 MapFunction 多了什么?

  • open() / close() 生命周期
  • RuntimeContext
  • 适合初始化资源(DB / Redis / HTTP 客户端)

2. 示例

stream.map(new RichMapFunction<String, String>() {

    @Override
    public void open(Configuration parameters) {
        System.out.println("初始化资源");
    }

    @Override
    public String map(String value) {
        return "Rich: " + value;
    }

    @Override
    public void close() {
        System.out.println("释放资源");
    }
});

3. 注意点

  • ❌ 仍然不能用状态
  • ❌ 不能注册定时器

📌 适合“需要外部资源,但逻辑简单”的场景。


五、ProcessFunction —— 底层流控制器

1. 核心能力

  • 访问时间(Processing / Event Time)
  • 注册定时器
  • Side Output(侧输出)
  • 更灵活的流控制

2. 但它的限制是?

  • ❌ 不能使用 Keyed State
  • ❌ 不支持 per-key 逻辑

3. 示例(侧输出)

OutputTag<String> badTag = new OutputTag<>("bad"){};

stream.process(new ProcessFunction<String, String>() {
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        if (value.startsWith("bad")) {
            ctx.output(badTag, value);
        } else {
            out.collect(value);
        }
    }
});

4. 使用场景

  • 数据分流
  • 非 key 的定时任务
  • 全局规则控制

六、KeyedProcessFunction —— Flink 的“终极武器”

1. 定义

public abstract class KeyedProcessFunction<K, I, O>

2. 核心能力

  • 必须 keyBy
  • 支持 Keyed State
  • 支持 Timer
  • 每个 key 独立执行逻辑

📌 这是 Flink 里实现复杂业务的核心算子。


3. 示例:超时检测(Processing Time)

需求:某个 key 3 秒内没有新数据,输出告警。

public class TimeoutFunction
        extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<Long> timerState;

    @Override
    public void open(Configuration parameters) {
        timerState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("timer", Long.class)
        );
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        long now = ctx.timerService().currentProcessingTime();
        long fireAt = now + 3000;

        Long old = timerState.value();
        if (old != null) {
            ctx.timerService().deleteProcessingTimeTimer(old);
        }

        ctx.timerService().registerProcessingTimeTimer(fireAt);
        timerState.update(fireAt);

        out.collect("正常数据:" + value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        out.collect("⚠️ key=" + ctx.getCurrentKey() + " 超时未上报");
        timerState.clear();
    }
}

4. 使用场景

  • 会话超时
  • 状态机
  • 实时风控
  • 去重
  • 轨迹断流检测

七、如何选择合适的 Function?

决策流程(实战经验)

是否只是字段转换?
 └─ 是 → MapFunction
 └─ 否
     是否需要外部资源?
      └─ 是 → RichMapFunction
      └─ 否
          是否需要 keyBy?
           └─ 否 → ProcessFunction
           └─ 是 → KeyedProcessFunction

八、总结

  • MapFunction 是最轻量的算子
  • RichMapFunction 是带生命周期的 Map
  • ProcessFunction 提供底层流控制,但无 key 状态
  • KeyedProcessFunction 是 Flink 实现复杂实时业务的核心

Map → RichMap → Process → KeyedProcess
本质上是 能力逐级增强的过程

在真实生产环境中:

  • 80% 场景用 Map / RichMap
  • 一旦涉及 状态 + 时间 + 规则
    👉 直接使用 KeyedProcessFunction
Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐