package com.example.window;

import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class LateDataWindowExample {

    public static void main(String[] args) throws Exception {
        // 创建执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 模拟输入数据: 时间戳,数值
        DataStream<String> text = env.fromElements(
                "1000,a",     // 第1秒
                "2000,b",     // 第2秒
                "3000,c",     // 第3秒
                "5000,e",     // 第5秒 (跳过第4秒)
                "4000,d",     // 迟到数据,第4秒的数据在第5秒之后到达
                "6000,f",     // 第6秒
                "14000,n",    // 第14秒
                "15000,o",    // 第15秒
                "16000,p",    // 第16秒
                "17000,q",    // 第17秒
                "18000,r",    // 第18秒
                "19000,s",    // 第19秒
                "20000,t",    // 第20秒
                "21000,u",    // 第21秒
                "7000,g",     // 第7秒
                "8000,h",     // 第8秒
                "9000,i",     // 第9秒
                "10000,j",    // 第10秒
                "11000,k",    // 第11秒
                "12000,l",    // 第12秒
                "13000,m",    // 第13秒
                "22000,v",    // 第22秒
                "23000,w",    // 第23秒
                "24000,x",    // 第24秒
                "25000,y",    // 第25秒
                "26000,z",    // 第26秒
                "27000,aa",   // 第27秒
                "28000,bb",   // 第28秒
                "29000,cc",   // 第29秒
                "30000,dd"    // 第30秒
        );

        // 解析字符串为Tuple2<Long, String>
        DataStream<Tuple2<Long, String>> parsedStream = text.map(new MapFunction<String, Tuple2<Long, String>>() {
            @Override
            public Tuple2<Long, String> map(String value) {
                String[] parts = value.split(",");
                return new Tuple2<Long, String>(Long.parseLong(parts[0]), parts[1]);
            }
        });

        // 定义水印策略,允许最大5秒的乱序
        WatermarkStrategy<Tuple2<Long, String>> watermarkStrategy = WatermarkStrategy
                .<Tuple2<Long, String>>forBoundedOutOfOrderness(Time.seconds(5).toDuration())
                .withTimestampAssigner((event, timestamp) -> event.f0);

        // 应用水印策略
        DataStream<Tuple2<Long, String>> timedStream = parsedStream.assignTimestampsAndWatermarks(watermarkStrategy);

        // 定义侧输出标签,用于收集迟到数据
        final OutputTag<Tuple2<Long, String>> lateOutputTag = new OutputTag<Tuple2<Long, String>>("late-data"){};

        // 定义聚合函数
        AggregateFunction<Tuple2<Long, String>, Integer, Integer> aggregateFunction =
                new AggregateFunction<Tuple2<Long, String>, Integer, Integer>() {
                    @Override
                    public Integer createAccumulator() {
                        return 0;
                    }

                    @Override
                    public Integer add(Tuple2<Long, String> value, Integer accumulator) {
                        return accumulator + 1;
                    }

                    @Override
                    public Integer getResult(Integer accumulator) {
                        return accumulator;
                    }

                    @Override
                    public Integer merge(Integer a, Integer b) {
                        return a + b;
                    }
                };

        // 开5秒的滚动窗口,允许2秒的迟到数据
        SingleOutputStreamOperator<Integer> windowedStream = timedStream
                .keyBy(value -> "global") // 全局key
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .allowedLateness(Time.seconds(1))  // 允许2秒的迟到数据
                .sideOutputLateData(lateOutputTag)  // 将迟到数据发送到侧输出
                .aggregate(aggregateFunction);      // 计算每个窗口的元素数量

        // 获取迟到数据流
        DataStream<Tuple2<Long, String>> lateDataStream = windowedStream.getSideOutput(lateOutputTag);

        // 打印主输出结果(窗口计算结果)
        System.out.println("=== 窗口计算结果 ===");
        windowedStream.print("Window Result");

        // 打印迟到数据
        System.out.println("=== 迟到数据 ===");
        lateDataStream.print("Late Data");

        // 执行程序
        env.execute("Flink Window Late Data Processing Example");
    }
}

运行结果:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
=== 窗口计算结果 ===
=== 迟到数据 ===
SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
SLF4J: Defaulting to no-operation MDCAdapter implementation.
SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details.
Window Result> 4
Window Result> 5
Window Result> 5
Window Result> 5
Window Result> 5
Window Result> 5
Window Result> 1

Process finished with exit code 0

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐