sparkStreaming集成Kafka
·
这几天研究了 Spark Streaming 集成 Kafka:消费 Kafka 数据并向 Kafka 发送数据,仿照官方样例写了几个小例子,在此分享一下。
- 1.添加Kafka的repository
- 2.DirectKafkaWordCountDemo代码展示
- 3.kafkaProducer代码展示
- 4.从Kafka 集群中消费数据并处理后再存入Kafka代码展示
本案例中使用的Kafka为三个broker一个zookeeper的Kafka集群。
本案例中原始消息体为“message: 我是第 n 条信息”。
1.添加Kafka的repository
spark中集成Kafka需要引入Kafka的repository,在pom文件中添加如下依赖
<dependency>
<!-- Spark Streaming connector for Kafka 0.8; the Scala examples in this article import org.apache.spark.streaming.kafka.KafkaUtils. -->
<groupId>org.apache.spark</groupId>
<!-- NOTE(review): the artifact suffix is normally the Scala *binary* version (e.g. 2.11), not a full version such as 2.11.8. Confirm that the scala.version property in this pom holds the binary version. -->
<artifactId>spark-streaming-kafka-0-8_${scala.version}</artifactId>
<!-- Resolved from the spark.version property, which must be defined elsewhere in the pom. -->
<version>${spark.version}</version>
</dependency>
2.DirectKafkaWordCountDemo代码展示
该段代码从 Kafka 集群中消费原始消息“message: 我是第n条信息”,按空格切分、过滤掉“message:”之后,对剩余内容(如“我是第n条信息”)逐批进行词频统计并打印。
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Created by yangyibo on 16/12/1.
*/
object DirectKafkaWordCountDemo {

  /**
   * Reads the "abel" topic through a direct Kafka stream, splits every message
   * value on single spaces, discards the literal token "message:" and prints
   * the per-batch count of each remaining token.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val brokerList = "192.168.100.41:9092,192.168.100.42:9092,192.168.100.43:9092"
    val topicNames = "abel"

    // When launching from IDEA, keep the local[N] core count at 2 or more
    // (translated from the original author's note).
    val conf = new SparkConf()
      .setAppName("DirectKafkaWordCountDemo")
      .setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(3))

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc,
      Map[String, String]("metadata.broker.list" -> brokerList),
      topicNames.split(",").toSet)

    stream
      .map { case (_, value) => value }        // keep only the message value
      .flatMap(line => line.split(" "))
      .filter(token => token != "message:")    // drop the fixed prefix token
      .map(token => (token, 1L))
      .reduceByKey((left, right) => left + right)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
3.kafkaProducer代码展示
此段代码把 array 中的字符串当作数据发送到 Kafka 集群中。
import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer
/**
* Created by yangyibo on 16/11/29.
*/
object KafkaProducerDemo {

  /**
   * Creates a SparkContext named "KafkaProducer", sends three sample strings
   * to Kafka via [[kafkaProducer]], and stops the context afterwards.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("KafkaProducer")
    val sc = new SparkContext(conf)
    try {
      val array = ArrayBuffer("one", "tow", "three")
      kafkaProducer(array)
    } finally {
      // The context is not used for any job here; stop it so its resources
      // are released even when sending fails.
      sc.stop()
    }
  }

  /**
   * Sends every string in `args` as the value of a key-less record to the
   * "abel2" topic, printing one line per record.
   *
   * A producer is created for the call and always closed before returning.
   *
   * @param args values to send; `null` is tolerated and results in a no-op
   */
  def kafkaProducer(args: ArrayBuffer[String]): Unit = {
    Option(args).foreach { payloads =>
      val brokers = "192.168.100.41:9092,192.168.100.42:9092,192.168.100.43:9092"
      // Kafka producer settings: bootstrap brokers plus String serializers
      // for both key and value.
      val props = new HashMap[String, Object]()
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer")
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer")

      val producer = new KafkaProducer[String, String](props)
      val topic = "abel2"
      try {
        payloads.foreach { payload =>
          println(payload + "----------我已经发送")
          producer.send(new ProducerRecord[String, String](topic, null, payload))
        }
      } finally {
        // close() waits for previously sent records to complete, so the fixed
        // 500 ms sleep of the original is unnecessary; running it in finally
        // also releases the producer when a send throws.
        producer.close()
      }
    }
  }
}
4.从Kafka 集群中读取数据处理后再存入Kafka代码展示
此段代码是 Spark Streaming 集成 Kafka 的消费和发送数据示例:从 Kafka 集群中消费原始数据“message: 我是第n条信息”,将其处理为“message: 我是第n条信息--read”,并把处理后的数据发送到 Kafka 集群的另外一个 topic 中。
import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{SparkConf}
/**
* Created by yangyibo on 16/11/28.
*/
object KafkaWordCountDemo {
  private val brokers = "192.168.100.41:9092,192.168.100.42:9092,192.168.100.43:9092"

  // Kafka producer settings: bootstrap brokers plus String serializers for
  // both key and value.
  private val props = new HashMap[String, Object]()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")

  // One producer shared by every batch handled in run().
  private val producer = new KafkaProducer[String, String](this.props)

  /**
   * Runs the streaming job and closes the shared producer once it ends.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    try {
      run()
    } finally {
      // The original never closed the producer; release it when the job
      // terminates, whether normally or with an exception.
      producer.close()
    }
  }

  /**
   * Consumes the "abel" topic with the ZooKeeper-based `createStream` API,
   * appends "--read" to every message value and forwards the results to
   * [[kafkaProducerSend]] batch by batch. Blocks until the streaming context
   * terminates.
   */
  def run(): Unit = {
    val zkQuorum = "192.168.100.48:2181"
    val group = "spark-streaming-test"
    val topics = "abel"
    val numThreads = 1

    val sparkConf = new SparkConf().setAppName("KafkaWordCountDemo")
    sparkConf.setMaster("local[4]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    // Map every comma-separated topic name to its consumer thread count.
    val topicMap = topics.split(",").map((_, numThreads)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    lines.foreachRDD { rdd =>
      // A single collect() replaces the original count() followed by
      // take(count + 1).take(count): same elements in the same order, one job
      // instead of two, and no Long-to-Int truncation of the count.
      // A fresh buffer per batch avoids the shared, manually cleared buffer.
      val processed = ArrayBuffer[String]()
      processed ++= rdd.collect().map(_ + "--read")
      kafkaProducerSend(processed)
    }

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Sends every string in `args` as the value of a key-less record to the
   * "abel2" topic using the shared producer, printing one line per record.
   *
   * The original slept 500 ms after each call; that only delayed batch
   * processing, because the producer stays open across calls.
   *
   * @param args values to send; `null` is tolerated and results in a no-op
   */
  def kafkaProducerSend(args: ArrayBuffer[String]): Unit = {
    Option(args).foreach { payloads =>
      val topic = "abel2"
      payloads.foreach { payload =>
        println(payload + "----------我已经读取")
        producer.send(new ProducerRecord[String, String](topic, null, payload))
      }
    }
  }
}
所有评论(0)