这几天看了spark集成Kafka,消费Kafka数据并向Kafka发送数据,仿照官方样例写了两个小例子。在此分享一下。


  • 1.添加Kafka的repository
  • 2.DirectKafkaWordCountDemo代码展示
  • 3.kafkaProducer代码展示
  • 4.从Kafka 集群中消费数据并处理后再存入Kafka代码展示

本案例中使用的Kafka为三个broker一个zookeeper的Kafka集群。
本案例中原始消息体为“message: 我是第 n 条信息”。


1.添加Kafka的repository

spark中集成Kafka需要引入Kafka的repository,在pom文件中添加如下依赖

  <!-- Spark Streaming connector for Kafka (0.8 client API). -->
  <!-- NOTE(review): the artifact suffix is normally the Scala binary version (e.g. 2.11); -->
  <!-- confirm that ${scala.version} is defined that way in this pom. -->
  <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

2.DirectKafkaWordCountDemo代码展示

该段代码从Kafka集群中消费原始消息“message: 我是第n条信息”,按空格切分并过滤掉“message:”前缀,然后对剩余的词(如“我是第n条信息”)进行计数并打印。

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Word-count demo that consumes a Kafka topic through the direct stream API.
  *
  * For every 3-second batch, each message value read from topic `abel` is split
  * on single spaces, the literal token `message:` is discarded, and the
  * remaining tokens are counted and printed.
  *
  * Created by yangyibo on 16/12/1.
  */
object DirectKafkaWordCountDemo {

  /**
    * Builds the streaming job, starts it and blocks until it terminates.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    // Author's note: when running from the IDE, keep the local core count at 2 or more.
    val sparkConf = new SparkConf()
      .setAppName("DirectKafkaWordCountDemo")
      .setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val brokers = "192.168.100.41:9092,192.168.100.42:9092,192.168.100.43:9092"
    val topics = "abel"
    // Comma-separated so more topics can be added to the string above.
    val topicSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicSet)

    // Each record is a (key, value) pair; only the value is used.
    val lines = messages.map(_._2)
    val words = lines.flatMap(_.split(" ")).filter(_ != "message:")
    val wordCounts = words.map(word => (word, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }

}

3.kafkaProducer代码展示

此段代码把array中的字符串当作数据发送到Kafka集群中。

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

/**
  * Sends a small, fixed list of strings to the Kafka topic `abel2`.
  *
  * Created by yangyibo on 16/11/29.
  */
object KafkaProducerDemo {

  /**
    * Creates the Spark context, publishes the sample messages and stops the
    * context again.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("KafkaProducer")
    // Fall back to a local master only when none was supplied (e.g. when run
    // from the IDE); a master passed in from outside is left untouched.
    conf.setIfMissing("spark.master", "local[2]")
    val sc = new SparkContext(conf)
    try {
      val array = ArrayBuffer("one", "tow", "three")
      kafkaProducer(array)
    } finally {
      // Release the context even when sending fails.
      sc.stop()
    }
  }

  /**
    * Publishes every element of `args` as a key-less record to topic `abel2`
    * and prints a confirmation line per element.
    *
    * @param args messages to send; `null` is treated as "nothing to send"
    */
  def kafkaProducer(args: ArrayBuffer[String]): Unit = {
    Option(args).foreach { messages =>
      val brokers = "192.168.100.41:9092,192.168.100.42:9092,192.168.100.43:9092"
      // Kafka producer properties: broker list plus key/value serializers.
      val props = new HashMap[String, Object]()
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer")
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer")
      val producer = new KafkaProducer[String, String](props)
      val topic = "abel2"
      try {
        // Send some messages
        for (message <- messages) {
          println(message + "----------我已经发送")
          producer.send(new ProducerRecord[String, String](topic, null, message))
        }
      } finally {
        // close() waits for in-flight sends to complete, so no fixed sleep is
        // needed, and the producer is released even if a send throws.
        producer.close()
      }
    }
  }
}

4.从Kafka 集群中读取数据处理后再存入Kafka代码展示

此段代码是Spark Streaming结合Kafka进行消费和发送数据的示例:从Kafka集群中消费原始数据“message: 我是第n条信息”,将其处理为“message: 我是第n条信息--read”,再把处理后的数据发送到Kafka集群的另外一个topic中。

import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{SparkConf}

/**
  * Consumes messages from Kafka topic `abel`, appends `--read` to each one and
  * publishes the result to Kafka topic `abel2`.
  *
  * Created by yangyibo on 16/11/28.
  */
object KafkaWordCountDemo {
  private val brokers = "192.168.100.41:9092,192.168.100.42:9092,192.168.100.43:9092"
  // Kafka producer properties: broker list plus key/value serializers.
  private val props = new HashMap[String, Object]()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")
  // One producer shared by every batch.
  private val producer = new KafkaProducer[String, String](this.props)

  /**
    * Entry point; delegates to [[run]].
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    run()
  }

  /**
    * Builds the streaming job (2-second batches), starts it and blocks until
    * it terminates. Every batch is collected, each message gets the suffix
    * `--read`, and the results are handed to [[kafkaProducerSend]].
    */
  def run(): Unit = {
    val zkQuorum = "192.168.100.48:2181"
    val group = "spark-streaming-test"
    val topics = "abel"
    val numThreads = 1

    val sparkConf = new SparkConf()
      .setAppName("KafkaWordCountDemo")
      .setMaster("local[4]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    // topic name -> number of consumer threads
    val topicMap = topics.split(",").map((_, numThreads)).toMap
    // Each record is a (key, value) pair; only the value is used.
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    lines.foreachRDD { rdd =>
      // A single collect() fetches the whole batch; the previous
      // count()/take(count + 1) pair ran two jobs to obtain the same elements
      // and truncated the Long count to an Int.
      val processed = ArrayBuffer(rdd.collect().map(_ + "--read"): _*)
      kafkaProducerSend(processed)
    }
    ssc.start()
    ssc.awaitTermination()
  }

  /**
    * Publishes every element of `args` as a key-less record to topic `abel2`,
    * prints a confirmation line per element and waits until each send has
    * completed. A failed send is reported on stderr and does not abort the
    * remaining ones.
    *
    * @param args messages to send; `null` is treated as "nothing to send"
    */
  def kafkaProducerSend(args: ArrayBuffer[String]): Unit = {
    if (args != null) {
      val topic = "abel2"
      // Send some messages
      val pending = args.map { arg =>
        println(arg + "----------我已经读取")
        producer.send(new ProducerRecord[String, String](topic, null, arg))
      }
      // Wait on the returned futures instead of sleeping a fixed 500 ms.
      pending.foreach { ack =>
        try {
          ack.get()
          ()
        } catch {
          case scala.util.control.NonFatal(e) =>
            System.err.println(s"Failed to send record to $topic: $e")
        }
      }
    }
  }

}
Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐