Java 版 Flink 关联查询小程序【纯干货】
·
Apache Flink 是一个开源流处理框架,用于在无界和有界数据流上进行有状态计算。在 Flink 中实现关联查询(Join Operation),尤其是在处理实时数据流时,是非常重要的功能。关联查询允许你将两个或多个数据流根据某些条件(例如相同的键)合并起来,这对于诸如事件关联、实时分析等场景非常有用。下面的例子使用 connect + keyBy + CoProcessFunction,按学号把学生信息流和成绩流关联起来。
代码示例
package com.example.connect;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Flink multi-stream connect demo.
 *
 * <p>Joins a stream of student records {@code (id, name, gender)} with a stream of score
 * records {@code (id, subject, score)} on the student id. Both inputs are combined with
 * {@link ConnectedStreams}, keyed on field {@code f0}, and processed by a
 * {@link CoProcessFunction}. For every matching (student, score) pair, one
 * {@code (id, subject, score)} tuple is printed.
 */
public class CustomConnectMapExample {

    /**
     * Builds the two demo input streams, joins them and runs the job.
     *
     * @param args ignored
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        // Set up the streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Simulated student stream: (id, name, gender).
        DataStreamSource<Tuple3<String, String, String>> students = env.fromElements(
                Tuple3.of("1001", "张三", "男"),
                Tuple3.of("1002", "李四", "女"),
                Tuple3.of("1003", "王五", "女"));

        // Simulated score stream: (id, subject, score).
        DataStreamSource<Tuple3<String, String, String>> scores = env.fromElements(
                Tuple3.of("1001", "语文", "100"),
                Tuple3.of("1001", "数学", "99"),
                Tuple3.of("1001", "体育", "99"),
                Tuple3.of("1002", "语文", "100"),
                Tuple3.of("1002", "数学", "99"),
                Tuple3.of("1003", "语文", "88"),
                Tuple3.of("1003", "数学", "80"));

        // Key both inputs by student id with key selectors. The deprecated positional keyBy(0, 0)
        // is avoided. Records with the same id from either stream reach the same parallel
        // instance and share the same keyed state.
        ConnectedStreams<Tuple3<String, String, String>, Tuple3<String, String, String>> connected =
                students.connect(scores).keyBy(student -> student.f0, score -> score.f0);

        SingleOutputStreamOperator<Tuple3<String, String, String>> joined =
                connected.process(new StudentScoreJoinFunction());
        joined.print();

        env.execute("CustomConnectMapExample");
    }

    /**
     * Inner-joins the two keyed inputs on the current key (the student id).
     *
     * <p>Each incoming record is first appended to the keyed list-state buffer for its own side.
     * It is then matched against everything already buffered for the other side under the same
     * key. Each (student, score) pair is therefore emitted once, whichever record arrives first.
     * The buffers are Flink managed keyed state, so they are scoped to the current key and
     * included in checkpoints, unlike plain {@code HashMap} fields. They are never cleared, so
     * state grows with the input; add TTL or timers for unbounded production streams.
     */
    private static final class StudentScoreJoinFunction extends CoProcessFunction<
            Tuple3<String, String, String>, Tuple3<String, String, String>, Tuple3<String, String, String>> {

        private static final long serialVersionUID = 1L;

        /** Buffer of student records (input 1) seen so far for the current key. */
        private final ListStateDescriptor<Tuple3<String, String, String>> studentsDescriptor =
                bufferDescriptor("students");

        /** Buffer of score records (input 2) seen so far for the current key. */
        private final ListStateDescriptor<Tuple3<String, String, String>> scoresDescriptor =
                bufferDescriptor("scores");

        /** Buffers a student record and emits one (id, subject, score) per buffered score. */
        @Override
        public void processElement1(Tuple3<String, String, String> student, Context ctx,
                Collector<Tuple3<String, String, String>> out) throws Exception {
            getRuntimeContext().getListState(studentsDescriptor).add(student);
            for (Tuple3<String, String, String> score : buffered(scoresDescriptor)) {
                out.collect(Tuple3.of(student.f0, score.f1, score.f2));
            }
        }

        /** Buffers a score record and emits one (id, subject, score) per buffered student. */
        @Override
        public void processElement2(Tuple3<String, String, String> score, Context ctx,
                Collector<Tuple3<String, String, String>> out) throws Exception {
            getRuntimeContext().getListState(scoresDescriptor).add(score);
            for (Tuple3<String, String, String> student : buffered(studentsDescriptor)) {
                out.collect(Tuple3.of(student.f0, score.f1, score.f2));
            }
        }

        /** Returns the records buffered under the current key, or an empty iterable if none. */
        private Iterable<Tuple3<String, String, String>> buffered(
                ListStateDescriptor<Tuple3<String, String, String>> descriptor) throws Exception {
            Iterable<Tuple3<String, String, String>> records =
                    getRuntimeContext().getListState(descriptor).get();
            // Defensive: some state backends document get() as possibly returning null when empty.
            return records != null ? records : Collections.<Tuple3<String, String, String>>emptyList();
        }

        /** Creates a list-state descriptor for buffering (String, String, String) tuples. */
        private static ListStateDescriptor<Tuple3<String, String, String>> bufferDescriptor(String name) {
            return new ListStateDescriptor<>(name, Types.TUPLE(Types.STRING, Types.STRING, Types.STRING));
        }
    }
}
测试结果(开头的 SLF4J 警告只表示 classpath 中缺少日志实现,不影响程序运行;每行前面的 `2>`、`3>` 是 print() 输出所在的并行子任务编号):
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
SLF4J: Defaulting to no-operation MDCAdapter implementation.
SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details.
2> (1001,语文,100)
3> (1003,语文,88)
2> (1001,数学,99)
3> (1003,数学,80)
2> (1001,体育,99)
2> (1002,语文,100)
2> (1002,数学,99)
更多推荐

所有评论(0)