
告别Zookeeper,两条命令容器化搭建Kafka
在大数据的实时数据处理中,不论是使用Spark、还是Flink,都需要与其他组件进行数据交互才有意义。在整个数据流处理中,交互组件的性能决定了数据处理的效率,例如在与缓存中间件Redis的交互,QPS过高就会导致响应过慢,进而表现为程序整体数据处理延时。如何保证组件性能就成为了重中之重,所以在选择组件的时候,我们会根据其测试的性能指标作为参考依据。在大数据实时流处理中,Kafka是用的比较多的数据
前言
在大数据的实时数据处理中,不论是使用Spark、还是Flink,都需要与其他组件进行数据交互才有意义。在整个数据流处理中,交互组件的性能决定了数据处理的效率,例如在与缓存中间件Redis的交互,QPS过高就会导致响应过慢,进而表现为程序整体数据处理延时。
如何保证组件性能就成为了重中之重,所以在选择组件的时候,我们会根据其测试的性能指标作为参考依据。在大数据实时流处理中,Kafka是用的比较多的数据源组件。其分区机制提高了并发、副本机制保障了数据的高可用。
除此之外,零拷贝、磁盘顺序读写以及数据文件的索引设计,都极大提高了Kafka的性能。随着Kafka版本的更迭,Kafka也已经成长到,不再依靠Zookeeper实现元数据的管理和节点控制。所以,今天就跟着官方文档,使用Kafka 3.7.0版本,在云服务器上利用docker来搭建一个Kafka。
Kafka镜像
使用docker搭建kafka,不用考虑平台和环境,使用docker pull直接拉取镜像就可以了。官方的文档也给出了命令。
1. 拉取镜像
执行命令,拉取kafka的镜像。
docker pull apache/kafka:3.7.0
拉取失败,提示”missing signature key“,刚开始以为是镜像仓库的问题,后来查找资料是云主机上的docker版本太老了。
使用docker --version查看现在docker的版本是1.13.1,所以将docker卸载了重装。
yum -y remove docker docker-client docker-client-latest docker-common docker-latest docker-latest-logrotate docker-logrotate docker-engine
执行上面命令卸载与docker相关的包和依赖。
卸载完成之后,我们重新安装新版本的docker。
yum install -y yum-utils
yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo
yum install -y docker-ce docker-ce-cli containerd.io
systemctl restart docker
添加docker-ce的yum源,安装docker-ce,然后重启docker服务。我们再查看docker的版本,就已经变成了26.1.4。
然后我们再次拉取镜像成功。
2. 启动容器
接下来我们就可以利用新拉取的镜像,来启动一个Kafka的容器。
docker run -d --name kafka -p 9092:9092 apache/kafka:3.7.0
查看启动日志:
这样就新建了一个kafka容器,我们也拥有了一个单节点的Kafka,从日志中不难看出Kafka容器中,没有启动Zookeeper,而是启动了一个KafkaRaftServer(简称:KRaft),KRaft代替了Zookeeper,而且是启动在Kafka的节点上的。
目前Kafka提供了两种启动方式,KRaft和Zookeeper二选一,而且不论是server.properties的配置,还是集群启动方式,都是有区别的,这个后面搭建集群的时候讲。这里我们可以看到kafka容器进程。
Kafka客户端
现在Kafka的broker服务运行在docker中了,如果我们想要在Linux中连接这个Kafka,就需要Kafka的一些命令。所以我们需要下载一个Kafka的安装包,解压之后在bin目录下我们会发现一系列的Kafka命令,来这些命令对Kafka进行操作。
其实最常用的也就kafka-topic.sh、kafka-console-consumer.sh、kafka-console.producer.sh这三个命令,分别对应topic管理、消费、生产三个操作。
1. 创建topic
对于Kafka,数据都是存放在topic中的,所以我们需要创建topic,在创建之前可以看一下Kafka集群是否有topic。
# 查看topic
kafka-topics.sh --bootstrap-server localhost:9092 --list
# 创建topic
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic aqi_test --partitions 10 --replication-factor 1
创建一个名为aqi_test的topic,分区设置为10,副本设置为1。
因为只有一个kafka broker节点,刚开始我将副本设置为2的时候,就会报错。
2. 生产数据
我们使用kafka-console-producer向aqi_test这个topic中生产数据。
kafka-console-producer.sh --topic aqi_test --bootstrap-server localhost:9092
这个命令行会启动一个交互会话,我们输入一行数据之后,通过回车换行写入topic中。
如图,我们一共写入了四条数据。
3. 消费数据
使用kafka-console-consume消费topic中生产者写入的数据。
kafka-console-consumer.sh --topic aqi_test --bootstrap-server localhost:9092 --from-beginning
在Kafka中的消费机制中,在消费时没有指定特殊的配置的话,consumer只能消费到最新的数据,换句话说,就是consumer只能消费consumer启动后的数据,如果想要消费历史数据,
就要使用from-beginning,或者在消费者配置中指定earliest消费策略。
如图,我们消费到了之前写入的四条数据。
结语
这就是我使用docker在云服务器上搭建单节点Kafka的过程。但是因为是云服务器,需要使用弹性公网IP进行访问,而官方镜像中的advertised.listeners对外广播的地址是
localhost,所以我在我的笔记本上是访问不到这个Kafka数据的。
按理说,通过docker exec进入Kafka容器,修改server.properties中的advertised.listeners就可以了,但是这个文件设置了只读权限,无法修改。所以,下一篇文章就主要解决这个外网无法访问的问题。
更多推荐
所有评论(0)