Java 版 Flink 模拟生成订单数据写入 Kafka 小程序【纯干货】
package com.example.kafka;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
 * Flink job that generates simulated order records and writes them to Kafka as JSON strings.
 *
 * <p>Pipeline: {@link MySource} emits one {@code (orderId, userId, amount, orderTime)} tuple per
 * second, a map step serializes part of each tuple to a JSON string, and a {@link KafkaSink}
 * publishes every JSON string to the {@code orders_topic} topic.
 */
public class SalesDataWriteKafka {

    /**
     * Builds the source -> JSON map -> Kafka sink pipeline and runs it.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        // Create the execution environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Attach the custom source that generates order tuples.
        DataStreamSource<Tuple4<String, String, String, Long>> orderStream =
                env.addSource(new MySource());

        // Serialize each tuple to JSON. Only f1 (user id), f2 (amount) and f3 (order time) are
        // written; f0 (order id) is not part of the emitted record.
        SingleOutputStreamOperator<String> outstream = orderStream.map(
                new MapFunction<Tuple4<String, String, String, Long>, String>() {
                    private final ObjectMapper objectMapper = new ObjectMapper();

                    @Override
                    public String map(Tuple4<String, String, String, Long> in) throws Exception {
                        Map<String, Object> record = new HashMap<>();
                        record.put("user_id", in.f1);
                        record.put("amount", in.f2);
                        record.put("order_time", in.f3);
                        String result = objectMapper.writeValueAsString(record);
                        // Echo every record to stdout so the output can be compared with the topic.
                        System.out.println(result);
                        return result;
                    }
                });

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("192.168.253.100:9092,192.168.253.101:9092,192.168.253.102:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("orders_topic")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
        outstream.sinkTo(sink);

        // Run the job.
        env.execute("Sales Data Sliding Window Processing");
    }

    /**
     * Custom source that emits one simulated order per second until {@link #cancel()} is called.
     *
     * <p>Each element is {@code (orderId, userId, amount, orderTime)}: a random UUID string, a
     * fixed user id, a fixed amount, and the current wall-clock time in milliseconds shifted back
     * by 0, 10, 20 or 30 seconds (chosen at random).
     */
    public static class MySource implements SourceFunction<Tuple4<String, String, String, Long>> {
        private static final long serialVersionUID = 1L;

        /** User id attached to every generated order. */
        private static final String USER_ID = "100002";
        /** Amount attached to every generated order. */
        private static final String ORDER_AMOUNT = "5";
        /** Exclusive upper bound for the random number of delay steps subtracted from "now". */
        private static final int MAX_DELAY_STEPS = 4;
        /** Size of one delay step in milliseconds. */
        private static final long DELAY_STEP_MILLIS = 10_000L;

        // volatile: written by cancel() and read by the loop in run().
        private volatile boolean isRunning = true;

        /**
         * Emits one order tuple per second while the source is running.
         *
         * @param ctx context used to emit the generated tuples
         * @throws Exception if the one-second sleep between orders is interrupted
         */
        @Override
        public void run(SourceContext<Tuple4<String, String, String, Long>> ctx) throws Exception {
            // One generator for the whole run instead of a new instance per iteration.
            final Random random = new Random();
            while (isRunning) {
                String orderId = UUID.randomUUID().toString();
                // Order timestamp: current time minus a random 0-30 s offset in 10 s steps.
                long orderTime = System.currentTimeMillis()
                        - random.nextInt(MAX_DELAY_STEPS) * DELAY_STEP_MILLIS;
                ctx.collect(Tuple4.of(orderId, USER_ID, ORDER_AMOUNT, orderTime));
                // Throttle to one order per second.
                TimeUnit.SECONDS.sleep(1);
            }
        }

        /** Requests the emit loop in {@link #run} to stop after its current iteration. */
        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}
测试验证:
[root@master bin]# ./kafka-console-consumer.sh --bootstrap-server master:9092 --topic orders_topic --from-beginning
{"amount":"5","user_id":"10001","order_time":1775629814608}
{"amount":"5","user_id":"10001","order_time":1775629818613}
{"amount":"5","user_id":"10001","order_time":1775629991298}
{"amount":"5","user_id":"10001","order_time":1775629993561}
{"amount":"5","user_id":"10001","order_time":1775629994570}
{"amount":"5","user_id":"10001","order_time":1775630005574}
{"amount":"5","user_id":"10001","order_time":1775630007598}
{"amount":"5","user_id":"10001","order_time":1775629988603}
{"amount":"5","user_id":"10001","order_time":1775629999638}
{"amount":"5","user_id":"10001","order_time":1775630021658}
{"amount":"5","user_id":"10001","order_time":1775630240818}
{"amount":"5","user_id":"10001","order_time":1775630264951}
{"amount":"5","user_id":"10001","order_time":1775630290959}
{"amount":"5","user_id":"10001","order_time":1775630294005}
{"amount":"5","user_id":"10001","order_time":1775630295007}
{"amount":"5","user_id":"10001","order_time":1775630288011}
{"amount":"5","user_id":"10001","order_time":1775630299012}
{"amount":"5","user_id":"100002","order_time":1775630947658}
{"amount":"5","user_id":"100002","order_time":1775630949659}
{"amount":"5","user_id":"100002","order_time":1775630971661}
{"amount":"5","user_id":"100002","order_time":1775630953662}
{"amount":"5","user_id":"100002","order_time":1775630955665}
{"amount":"5","user_id":"100002","order_time":1775630967666}
{"amount":"5","user_id":"100002","order_time":1775630959668}
{"amount":"5","user_id":"100002","order_time":1775630981671}
{"amount":"5","user_id":"10001","order_time":1775629822601}
{"amount":"5","user_id":"10001","order_time":1775629826610}
{"amount":"5","user_id":"10001","order_time":1775629820615}
{"amount":"5","user_id":"10001","order_time":1775630002559}
{"amount":"5","user_id":"10001","order_time":1775630016590}
{"amount":"5","user_id":"10001","order_time":1775630020644}
{"amount":"5","user_id":"10001","order_time":1775630228803}
{"amount":"5","user_id":"10001","order_time":1775630229806}
{"amount":"5","user_id":"10001","order_time":1775630232921}
{"amount":"5","user_id":"10001","order_time":1775630243937}
{"amount":"5","user_id":"10001","order_time":1775630266961}
{"amount":"5","user_id":"10001","order_time":1775630267965}
{"amount":"5","user_id":"100002","order_time":1775630437215}
{"amount":"5","user_id":"100002","order_time":1775630948659}
{"amount":"5","user_id":"100002","order_time":1775630972661}
{"amount":"5","user_id":"100002","order_time":1775630986665}
{"amount":"5","user_id":"100002","order_time":1775630980669}
{"amount":"5","user_id":"10001","order_time":1775629831550}
{"amount":"5","user_id":"10001","order_time":1775629813607}
{"amount":"5","user_id":"10001","order_time":1775629835609}
{"amount":"5","user_id":"10001","order_time":1775629817612}
{"amount":"5","user_id":"10001","order_time":1775629809614}
{"amount":"5","user_id":"10001","order_time":1775629831617}
{"amount":"5","user_id":"10001","order_time":1775630257731}
{"amount":"5","user_id":"10001","order_time":1775630261834}
{"amount":"5","user_id":"10001","order_time":1775630265958}
{"amount":"5","user_id":"10001","order_time":1775630302002}
{"amount":"5","user_id":"10001","order_time":1775630283004}
{"amount":"5","user_id":"10001","order_time":1775630286008}
{"amount":"5","user_id":"10001","order_time":1775630287009}
{"amount":"5","user_id":"10001","order_time":1775630310014}
{"amount":"5","user_id":"10001","order_time":1775630291015}
{"amount":"5","user_id":"100002","order_time":1775630966650}
{"amount":"5","user_id":"100002","order_time":1775630980661}
{"amount":"5","user_id":"100002","order_time":1775630984663}
{"amount":"5","user_id":"100002","order_time":1775630978667}
{"amount":"5","user_id":"100002","order_time":1775630962671}
{"amount":"5","user_id":"100002","order_time":1775631206960}
{"amount":"5","user_id":"100002","order_time":1775631225950}
{"amount":"5","user_id":"100002","order_time":1775631217962}
{"amount":"5","user_id":"100002","order_time":1775631198962}
{"amount":"5","user_id":"100002","order_time":1775631229980}
更多推荐

所有评论(0)