package com.example.kafka;

import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.*;
import java.util.concurrent.TimeUnit;

/**
 * flink 模拟生成订单数据,发生至kafka中,并验证
 */
public class SalesDataWriteKafka {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 创建自定义Source生成订单数据
        DataStreamSource<Tuple4<String, String, String, Long>> orderStream = env.addSource(new MySource());

        SingleOutputStreamOperator<String> outstream = orderStream.map(new MapFunction<Tuple4<String, String, String, Long>, String>() {
            private final ObjectMapper objectMapper = new ObjectMapper();

            @Override
            public String map(Tuple4<String, String, String, Long> in) throws Exception {
                Map map = new HashMap<>();
                map.put("user_id", in.f1);
                map.put("amount", in.f2);
                map.put("order_time", in.f3);
                String result = objectMapper.writeValueAsString(map);
                System.out.println(result);
                return result;
            }
        });


        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("192.168.253.100:9092,192.168.253.101:9092,192.168.253.102:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("orders_topic")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        outstream.sinkTo(sink);
        // 执行程序
        env.execute("Sales Data Sliding Window Processing");
    }

    // 自定义Source生成订单数据
    public static class MySource implements SourceFunction<Tuple4<String, String, String, Long>> {
        private volatile boolean isRunning = true;

        @Override
        public void run(SourceContext<Tuple4<String, String, String, Long>> ctx) throws Exception {
            int n = 0;
            while (isRunning) {
                n++;
                // 生成订单ID
                String orderId = UUID.randomUUID().toString();
                Random random = new Random();
                // 订单下单时间戳(当前时间)
                long orderTime = System.currentTimeMillis() - random.nextInt(4) * 10000;
                // 生成一个随机整数,范围从0(包含)到100(不包含)
                //int randomInt1 = random.nextInt(3);
                int randomInt1 = 0;
                String name[] = {"100001", "100002"};
                String orderMoney[] = {"5"};
                // 获取第二个时间点
                long sendtime = System.currentTimeMillis();
                // 计算时间差(以毫秒为单位)
                long diffe = orderTime - sendtime;
                // 将时间差转换为秒
                double diffetime = (double) diffe / 1000.0;
                // 发送订单数据
                ctx.collect(Tuple4.of(orderId, name[1], orderMoney[0], orderTime));
                // 每秒生成一个订单
                TimeUnit.SECONDS.sleep(1);
//                if (n > 10) {
//                    isRunning = false;
//                }
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }

}

测试验证:

[root@master bin]# ./kafka-console-consumer.sh --bootstrap-server master:9092 --topic orders_topic  --from-beginning
{"amount":"5","user_id":"10001","order_time":1775629814608}
{"amount":"5","user_id":"10001","order_time":1775629818613}
{"amount":"5","user_id":"10001","order_time":1775629991298}
{"amount":"5","user_id":"10001","order_time":1775629993561}
{"amount":"5","user_id":"10001","order_time":1775629994570}
{"amount":"5","user_id":"10001","order_time":1775630005574}
{"amount":"5","user_id":"10001","order_time":1775630007598}
{"amount":"5","user_id":"10001","order_time":1775629988603}
{"amount":"5","user_id":"10001","order_time":1775629999638}
{"amount":"5","user_id":"10001","order_time":1775630021658}
{"amount":"5","user_id":"10001","order_time":1775630240818}
{"amount":"5","user_id":"10001","order_time":1775630264951}
{"amount":"5","user_id":"10001","order_time":1775630290959}
{"amount":"5","user_id":"10001","order_time":1775630294005}
{"amount":"5","user_id":"10001","order_time":1775630295007}
{"amount":"5","user_id":"10001","order_time":1775630288011}
{"amount":"5","user_id":"10001","order_time":1775630299012}
{"amount":"5","user_id":"100002","order_time":1775630947658}
{"amount":"5","user_id":"100002","order_time":1775630949659}
{"amount":"5","user_id":"100002","order_time":1775630971661}
{"amount":"5","user_id":"100002","order_time":1775630953662}
{"amount":"5","user_id":"100002","order_time":1775630955665}
{"amount":"5","user_id":"100002","order_time":1775630967666}
{"amount":"5","user_id":"100002","order_time":1775630959668}
{"amount":"5","user_id":"100002","order_time":1775630981671}
{"amount":"5","user_id":"10001","order_time":1775629822601}
{"amount":"5","user_id":"10001","order_time":1775629826610}
{"amount":"5","user_id":"10001","order_time":1775629820615}
{"amount":"5","user_id":"10001","order_time":1775630002559}
{"amount":"5","user_id":"10001","order_time":1775630016590}
{"amount":"5","user_id":"10001","order_time":1775630020644}
{"amount":"5","user_id":"10001","order_time":1775630228803}
{"amount":"5","user_id":"10001","order_time":1775630229806}
{"amount":"5","user_id":"10001","order_time":1775630232921}
{"amount":"5","user_id":"10001","order_time":1775630243937}
{"amount":"5","user_id":"10001","order_time":1775630266961}
{"amount":"5","user_id":"10001","order_time":1775630267965}
{"amount":"5","user_id":"100002","order_time":1775630437215}
{"amount":"5","user_id":"100002","order_time":1775630948659}
{"amount":"5","user_id":"100002","order_time":1775630972661}
{"amount":"5","user_id":"100002","order_time":1775630986665}
{"amount":"5","user_id":"100002","order_time":1775630980669}
{"amount":"5","user_id":"10001","order_time":1775629831550}
{"amount":"5","user_id":"10001","order_time":1775629813607}
{"amount":"5","user_id":"10001","order_time":1775629835609}
{"amount":"5","user_id":"10001","order_time":1775629817612}
{"amount":"5","user_id":"10001","order_time":1775629809614}
{"amount":"5","user_id":"10001","order_time":1775629831617}
{"amount":"5","user_id":"10001","order_time":1775630257731}
{"amount":"5","user_id":"10001","order_time":1775630261834}
{"amount":"5","user_id":"10001","order_time":1775630265958}
{"amount":"5","user_id":"10001","order_time":1775630302002}
{"amount":"5","user_id":"10001","order_time":1775630283004}
{"amount":"5","user_id":"10001","order_time":1775630286008}
{"amount":"5","user_id":"10001","order_time":1775630287009}
{"amount":"5","user_id":"10001","order_time":1775630310014}
{"amount":"5","user_id":"10001","order_time":1775630291015}
{"amount":"5","user_id":"100002","order_time":1775630966650}
{"amount":"5","user_id":"100002","order_time":1775630980661}
{"amount":"5","user_id":"100002","order_time":1775630984663}
{"amount":"5","user_id":"100002","order_time":1775630978667}
{"amount":"5","user_id":"100002","order_time":1775630962671}
{"amount":"5","user_id":"100002","order_time":1775631206960}
{"amount":"5","user_id":"100002","order_time":1775631225950}
{"amount":"5","user_id":"100002","order_time":1775631217962}
{"amount":"5","user_id":"100002","order_time":1775631198962}
{"amount":"5","user_id":"100002","order_time":1775631229980}

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐