1. 添加 Maven 依赖

<!-- Spark core, built for Scala 2.11. -->
<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>

        <!-- Spark Streaming; same Scala build and Spark version as spark-core above. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>

        <!-- Kafka 0.8 connector for Spark Streaming; the streaming job in this article
             imports org.apache.spark.streaming.kafka.KafkaUtils. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>

2. 订单生产者

package com.demo;


import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Random;

import com.demo.utils.ConstantUtils;
import com.demo.utils.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;


import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * 订单 kafka消息生产者
 */
/**
 * Kafka producer that publishes randomly generated order messages.
 *
 * <p>Each round builds between 1 and 19 random {@code Order} objects, serializes them to JSON,
 * sends them as one batch to {@link ConstantUtils#ORDER_TOPIC} and then sleeps for
 * {@link #SEND_INTERVAL_MILLIS}. The loop runs until the thread is interrupted or an error
 * occurs; the producer is closed in either case.
 */
public class OrderProducer {
    private static final Logger logger = LoggerFactory.getLogger(OrderProducer.class);

    /** Pause between two batches, in milliseconds. */
    private static final long SEND_INTERVAL_MILLIS = 5000L;

    /**
     * Starts the endless produce loop.
     *
     * @param args unused
     * @throws IOException kept for signature compatibility; failures are logged, not rethrown
     */
    public static void main(String[] args) throws IOException {
        Producer<String, String> producer = null;
        ObjectMapper mapper = new ObjectMapper();
        // One generator for the whole run instead of a new instance per order.
        Random rnd = new Random();

        try {
            Properties props = new Properties();
            // Kafka broker list
            props.put("metadata.broker.list", ConstantUtils.METADATA_BROKER_LIST_VALUE);
            // Serializer class for message values
            props.put("serializer.class", ConstantUtils.SERIALIZER_CLASS_VALUE);
            // Serializer class for message keys
            props.put("key.serializer.class", ConstantUtils.SERIALIZER_CLASS_VALUE);

            ProducerConfig config = new ProducerConfig(props);
            producer = new Producer<String, String>(config);

            // Reused buffer holding the batch of the current round
            List<KeyedMessage<String, String>> messages = new ArrayList<>();

            // Every SEND_INTERVAL_MILLIS, publish a random number of order messages
            while (true) {
                int random = RandomUtils.getRandomNum(20);
                if (random == 0) {
                    // An empty batch is skipped immediately, without sleeping.
                    continue;
                }
                messages.clear();
                for (int i = 0; i < random; i++) {
                    // random >= 1 here, so the bound passed below is always positive.
                    int orderRandom = RandomUtils.getRandomNum(random * 10);
                    Order order = new Order("name" + orderRandom,
                            Float.valueOf((float) orderRandom));

                    int rndvalue = rnd.nextInt(5);
                    order.setMerchantCode("10388100011" + rndvalue);
                    order.setDeptCode("1000" + rndvalue);

                    // Message = topic + JSON payload (no key)
                    KeyedMessage<String, String> message = new KeyedMessage<String, String>(
                            ConstantUtils.ORDER_TOPIC, mapper.writeValueAsString(order));
                    messages.add(message);
                }

                producer.send(messages);
                logger.warn("orderNum:{},message:{}", random, messages);
                Thread.sleep(SEND_INTERVAL_MILLIS);
            }

        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller/JVM can observe the shutdown request.
            Thread.currentThread().interrupt();
            logger.warn("order producer interrupted, shutting down", e);
        } catch (Exception e) {
            // Pass the throwable itself so the full stack trace is logged.
            logger.error("order producer failed", e);
        } finally {
            // producer stays null when configuration or construction failed.
            if (producer != null) {
                producer.close();
            }
        }
    }
}

3. 订单统计

package com.demo;


import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import com.demo.utils.ConstantUtils;
import com.demo.utils.SparkUtils;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.AtomicDouble;


import kafka.serializer.StringDecoder;
import scala.Tuple2;

import static jdk.nashorn.internal.objects.NativeArray.reduce;

/**
 * spark streaming
 * 1. 统计订单量和订单总值
 * 2. 按商户分组统计商户和分支机构
 *
 * @author liangming.deng
 *
 */
/**
 * Spark Streaming job over the order topic.
 *
 * <ol>
 *   <li>Keeps a running order count and a running total of order prices.</li>
 *   <li>Per batch, sums order prices grouped by merchant code and department code.</li>
 * </ol>
 *
 * @author liangming.deng
 */
public class OrderSparkStreaming {
    private static final Logger logger = LoggerFactory.getLogger(OrderSparkStreaming.class);

    /** Number of orders seen since the job started; updated once per batch. */
    private static final AtomicLong orderCount = new AtomicLong(0);

    /** Sum of all order prices seen since the job started; updated once per batch. */
    private static final AtomicDouble totalPrice = new AtomicDouble(0);

    /**
     * Builds the streaming pipeline, starts it and blocks until it terminates.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while waiting for termination
     */
    public static void main(String[] args) throws InterruptedException {

        // Create the context with a 10 second batch interval
        JavaStreamingContext jssc = SparkUtils.getJavaStreamingContext("JavaDirectKafkaWordCount",
                "local[2]", null, Durations.seconds(10));

        try {
            Set<String> topicsSet =
                    new HashSet<>(Arrays.asList(ConstantUtils.ORDER_TOPIC.split(",")));
            Map<String, String> kafkaParams = new HashMap<>();
            kafkaParams.put("metadata.broker.list", ConstantUtils.METADATA_BROKER_LIST_VALUE);
            kafkaParams.put("auto.offset.reset", ConstantUtils.AUTO_OFFSET_RESET_VALUE);

            // Create direct kafka stream with brokers and topics
            JavaPairInputDStream<String, String> orderMsgStream = KafkaUtils.createDirectStream(
                    jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
                    kafkaParams, topicsSet);

            JavaDStream<Order> orderDStream = parseOrders(orderMsgStream);

            // Statistic 1: running order count and running price total
            orderDStream.foreachRDD(new VoidFunction<JavaRDD<Order>>() {
                private static final long serialVersionUID = 1L;

                @Override
                public void call(JavaRDD<Order> orderJavaRDD) throws Exception {
                    updateRunningTotals(orderJavaRDD);
                }
            });

            // Statistic 2: per-batch price sum grouped by merchant and department
            orderDStream.foreachRDD(new VoidFunction<JavaRDD<Order>>() {
                private static final long serialVersionUID = 1L;

                @Override
                public void call(JavaRDD<Order> orderJavaRDD) throws Exception {
                    printPriceByMerchantAndDept(orderJavaRDD);
                }
            });

            orderDStream.print();

            jssc.start(); // Start the computation

            jssc.awaitTermination(); // Wait for the computation to terminate
        } finally {
            // Always release the context, also when setup or waiting fails.
            jssc.stop();
        }
    }

    /** Deserializes the JSON value of every Kafka record into an Order and caches the result. */
    private static JavaDStream<Order> parseOrders(
            JavaPairInputDStream<String, String> orderMsgStream) {
        // JSON <-> object mapper, captured by the map function below
        final ObjectMapper mapper = new ObjectMapper();
        return orderMsgStream
                .map(new Function<Tuple2<String, String>, Order>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Order call(Tuple2<String, String> t2) throws Exception {
                        return mapper.readValue(t2._2, Order.class);
                    }
                })
                // Cached because the stream feeds two foreachRDD actions plus print().
                .cache();
    }

    /** Adds one batch's order count and price sum to the running totals and prints them. */
    private static void updateRunningTotals(JavaRDD<Order> orderJavaRDD) {
        long count = orderJavaRDD.count();
        // Empty batches are skipped entirely, so reduce() is never run on an empty RDD.
        if (count <= 0) {
            return;
        }

        orderCount.addAndGet(count);

        // Map every order to its price, then reduce the prices of this batch to one sum.
        JavaRDD<Float> priceMap = orderJavaRDD.map(new Function<Order, Float>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Float call(Order order) throws Exception {
                return order.getPrice();
            }
        });

        Float sumPrice = priceMap.reduce(new Function2<Float, Float, Float>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Float call(Float a, Float b) throws Exception {
                return a + b;
            }
        });

        // Add this batch's sum to the total of all previous batches.
        totalPrice.getAndAdd(sumPrice);

        // Output the totals; a production job could write them to a database instead.
        System.out.println("-------Total order count : " + orderCount.get()
                + " with total price : " + totalPrice.get());
    }

    /** Sums one batch's prices per "merchantCode-deptCode" key and prints each key's sum. */
    private static void printPriceByMerchantAndDept(JavaRDD<Order> orderJavaRDD) {
        JavaPairRDD<String, Float> priceByKey = orderJavaRDD.mapToPair(
                new PairFunction<Order, String, Float>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Float> call(Order order) throws Exception {
                        String key = order.getMerchantCode() + "-" + order.getDeptCode();
                        Float value = order.getPrice();
                        return new Tuple2<String, Float>(key, value);
                    }
                });

        JavaPairRDD<String, Float> sumByKey = priceByKey.reduceByKey(
                new Function2<Float, Float, Float>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Float call(Float aFloat, Float aFloat2) throws Exception {
                        return aFloat + aFloat2;
                    }
                });

        sumByKey.foreach(new VoidFunction<Tuple2<String, Float>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Float> t2) throws Exception {
                System.out.println("key = " + t2._1 + " value=" + t2._2);
            }
        });
    }
}

4. 辅助类ConstantUtils

package com.demo.utils;

/**
 * Configuration constants shared by the order producer and the streaming job.
 */
public class ConstantUtils {
    /** Kafka topic that carries the order messages. */
    public static final String ORDER_TOPIC = "orderTopic";

    /** Value for the "metadata.broker.list" Kafka setting. */
    public static final String METADATA_BROKER_LIST_VALUE = "10.18.0.34:9092";

    /** Value for the "serializer.class" and "key.serializer.class" producer settings. */
    public static final String SERIALIZER_CLASS_VALUE = "kafka.serializer.StringEncoder";

    /** Value for the "auto.offset.reset" consumer setting. */
    public static final String AUTO_OFFSET_RESET_VALUE = "smallest";
}

5. 辅助类RandomUtils

package com.demo.utils;

import java.util.Random;

/**
 * 随机数获取
 *
 */
/**
 * Helper for drawing random integers.
 */
public class RandomUtils {

    /** Single shared generator; avoids allocating and seeding a new Random on every call. */
    private static final Random RANDOM = new Random();

    /**
     * Returns a random integer in the range {@code [0, bound)}.
     *
     * @param bound exclusive upper bound; must be positive
     * @return a value between 0 (inclusive) and {@code bound} (exclusive)
     * @throws IllegalArgumentException if {@code bound} is not positive
     */
    public static int getRandomNum(int bound) {
        return RANDOM.nextInt(bound);
    }

    /**
     * Manual demo: every 30 seconds prints a random count below 20, followed by that many
     * random numbers.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        while (true) {
            int randomNum = getRandomNum(20);
            System.out.println(randomNum);

            // The loop body only runs for randomNum >= 1, so the bound below is positive.
            for (int i = 0; i < randomNum; i++) {
                System.out.println("random:" + getRandomNum(randomNum * 10));
            }

            Thread.sleep(30000);
        }
    }
}

6. 辅助类SparkUtils

package com.demo.utils;

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

/**
 * spark工具类
 *
 */
/**
 * Factory helpers for Spark contexts and simple RDDs.
 */
public class SparkUtils {

    /**
     * Creates a {@link JavaSparkContext}.
     *
     * @param appName  Spark application name
     * @param master   Spark master URL, e.g. {@code "local[2]"}
     * @param logLevel log level to set on the context; {@code null} leaves it unchanged
     * @return the new context
     */
    public static JavaSparkContext getJavaSparkContext(String appName, String master,
                                                       String logLevel) {
        SparkConf sparkConf = new SparkConf().setAppName(appName).setMaster(master);

        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        if (logLevel != null) {
            jsc.setLogLevel(logLevel);
        }

        return jsc;
    }

    /**
     * Creates a {@link JavaStreamingContext}.
     *
     * @param appName       Spark application name
     * @param master        Spark master URL, e.g. {@code "local[2]"}
     * @param logLevel      log level to set on the underlying Spark context; {@code null}
     *                      leaves it unchanged
     * @param batchDuration batch interval of the streaming context
     * @return the new streaming context
     */
    public static JavaStreamingContext getJavaStreamingContext(String appName, String master,
                                                               String logLevel,
                                                               Duration batchDuration) {
        SparkConf sparkConf = new SparkConf().setAppName(appName).setMaster(master);

        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, batchDuration);

        // Previously this argument was accepted but never applied.
        if (logLevel != null) {
            jssc.sparkContext().setLogLevel(logLevel);
        }

        return jssc;
    }


    /**
     * Creates an RDD of text lines by reading a file.
     *
     * @param jsc      context used to read the file
     * @param filePath path passed to {@code textFile}
     * @return the RDD, or {@code null} when {@code jsc} is {@code null}
     */
    public static JavaRDD<String> createRddExternal(JavaSparkContext jsc, String filePath) {
        if (jsc == null) {
            return null;
        }

        return jsc.textFile(filePath);
    }

    /**
     * Creates an RDD from an in-memory list.
     *
     * @param jsc  context used to parallelize the list
     * @param list elements of the RDD
     * @return the RDD, or {@code null} when {@code jsc} is {@code null}
     */
    public static JavaRDD<Integer> createRddCollect(JavaSparkContext jsc, List<Integer> list) {
        if (jsc == null) {
            return null;
        }

        return jsc.parallelize(list);
    }

    /**
     * Creates a pair RDD from an in-memory list of tuples.
     *
     * @param jsc  context used to parallelize the list
     * @param list key/value tuples of the RDD
     * @return the pair RDD, or {@code null} when {@code jsc} is {@code null}
     */
    public static JavaPairRDD<Integer, Integer> createPairRddCollect(JavaSparkContext jsc,
                                                                     List<Tuple2<Integer, Integer>> list) {
        if (jsc == null) {
            return null;
        }

        return jsc.parallelizePairs(list);
    }
}

7. 统计输出

8. 监控查看执行情况

通过监控可以查看任务执行相关信息,以便在任务多的时候进行性能优化调整。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐