java版 flink aggregate函数应用小程序【纯干货】
·
package com.example.window;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
public class LateDataWindowExample {
public static void main(String[] args) throws Exception {
// 创建执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 模拟输入数据: 时间戳,数值
DataStream<String> text = env.fromElements(
"1000,a", // 第1秒
"2000,b", // 第2秒
"3000,c", // 第3秒
"5000,e", // 第5秒 (跳过第4秒)
"4000,d", // 迟到数据,第4秒的数据在第5秒之后到达
"6000,f", // 第6秒
"14000,n", // 第14秒
"15000,o", // 第15秒
"16000,p", // 第16秒
"17000,q", // 第17秒
"18000,r", // 第18秒
"19000,s", // 第19秒
"20000,t", // 第20秒
"21000,u", // 第21秒
"7000,g", // 第7秒
"8000,h", // 第8秒
"9000,i", // 第9秒
"10000,j", // 第10秒
"11000,k", // 第11秒
"12000,l", // 第12秒
"13000,m", // 第13秒
"22000,v", // 第22秒
"23000,w", // 第23秒
"24000,x", // 第24秒
"25000,y", // 第25秒
"26000,z", // 第26秒
"27000,aa", // 第27秒
"28000,bb", // 第28秒
"29000,cc", // 第29秒
"30000,dd" // 第30秒
);
// 解析字符串为Tuple2<Long, String>
DataStream<Tuple2<Long, String>> parsedStream = text.map(new MapFunction<String, Tuple2<Long, String>>() {
@Override
public Tuple2<Long, String> map(String value) {
String[] parts = value.split(",");
return new Tuple2<Long, String>(Long.parseLong(parts[0]), parts[1]);
}
});
// 定义水印策略,允许最大5秒的乱序
WatermarkStrategy<Tuple2<Long, String>> watermarkStrategy = WatermarkStrategy
.<Tuple2<Long, String>>forBoundedOutOfOrderness(Time.seconds(5).toDuration())
.withTimestampAssigner((event, timestamp) -> event.f0);
// 应用水印策略
DataStream<Tuple2<Long, String>> timedStream = parsedStream.assignTimestampsAndWatermarks(watermarkStrategy);
// 定义侧输出标签,用于收集迟到数据
final OutputTag<Tuple2<Long, String>> lateOutputTag = new OutputTag<Tuple2<Long, String>>("late-data"){};
// 定义聚合函数
AggregateFunction<Tuple2<Long, String>, Integer, Integer> aggregateFunction =
new AggregateFunction<Tuple2<Long, String>, Integer, Integer>() {
@Override
public Integer createAccumulator() {
return 0;
}
@Override
public Integer add(Tuple2<Long, String> value, Integer accumulator) {
return accumulator + 1;
}
@Override
public Integer getResult(Integer accumulator) {
return accumulator;
}
@Override
public Integer merge(Integer a, Integer b) {
return a + b;
}
};
// 开5秒的滚动窗口,允许2秒的迟到数据
SingleOutputStreamOperator<Integer> windowedStream = timedStream
.keyBy(value -> "global") // 全局key
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(1)) // 允许2秒的迟到数据
.sideOutputLateData(lateOutputTag) // 将迟到数据发送到侧输出
.aggregate(aggregateFunction); // 计算每个窗口的元素数量
// 获取迟到数据流
DataStream<Tuple2<Long, String>> lateDataStream = windowedStream.getSideOutput(lateOutputTag);
// 打印主输出结果(窗口计算结果)
System.out.println("=== 窗口计算结果 ===");
windowedStream.print("Window Result");
// 打印迟到数据
System.out.println("=== 迟到数据 ===");
lateDataStream.print("Late Data");
// 执行程序
env.execute("Flink Window Late Data Processing Example");
}
}
运行结果:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
=== 窗口计算结果 ===
=== 迟到数据 ===
SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
SLF4J: Defaulting to no-operation MDCAdapter implementation.
SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details.
Window Result> 4
Window Result> 5
Window Result> 5
Window Result> 5
Window Result> 5
Window Result> 5
Window Result> 1
Process finished with exit code 0
更多推荐

所有评论(0)