Kafka concepts

Kafka is a message queue, responsible for temporarily storing data. For example, A sends a message to B, but B is offline; the data is first placed in the message queue, and when B comes online it fetches the data from the queue.
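The queue-in-the-middle idea can be sketched in a few lines of Python (a toy in-memory stand-in, not Kafka itself):

```python
from collections import deque

# A toy stand-in for a message queue: the producer appends while the
# consumer is "offline"; the consumer drains the backlog once it returns.
queue = deque()

def produce(msg):
    queue.append(msg)                # A -> queue (B may be offline)

def consume_all():
    out = []
    while queue:
        out.append(queue.popleft())  # queue -> B, in FIFO order
    return out

produce("hello")
produce("world")                     # B was offline for both sends
print(consume_all())                 # -> ['hello', 'world']
```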

Kafka single-node deployment

1. Start the ZooKeeper cluster

[root@elk91 ~]# zkServer.sh start
[root@elk92 ~]# zkServer.sh start
[root@elk93 ~]# zkServer.sh start

2. Download the Kafka package

wget https://dlcdn.apache.org/kafka/3.9.1/kafka_2.13-3.9.1.tgz

3. Extract the package

[root@elk91 ~]# tar xf kafka_2.13-3.9.1.tgz -C /usr/local/

4. Configure environment variables

[root@elk91 ~]# cat /etc/profile.d/kafka.sh
#!/bin/bash
export KAFKA_HOME=/usr/local/kafka_2.13-3.9.1
export PATH=$PATH:$KAFKA_HOME/bin

[root@elk91 ~]# source /etc/profile.d/kafka.sh

5. Edit the Kafka configuration file

[root@elk91 ~]# vim /usr/local/kafka_2.13-3.9.1/config/server.properties 
...
# Broker ID; in a cluster each broker's ID must be a unique integer.
broker.id=91
# Kafka data directory.
log.dirs=/var/lib/kafka
# ZooKeeper connection string; the trailing path is the znode under which Kafka stores its metadata.
zookeeper.connect=10.0.0.91:2181,10.0.0.92:2181,10.0.0.93:2181/oldboyedu-linux99-kafka39

6. Start Kafka

[root@elk91 ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
[root@elk91 ~]# 
[root@elk91 ~]# ss -ntl | grep 9092
LISTEN 0      50                      *:9092             *:*          
[root@elk91 ~]# 

7. Verify in ZooKeeper

[root@elk92 ~]# zkCli.sh 
...
[zk: localhost:2181(CONNECTED) 5] ls /
[oldboyedu-linux99-kafka39, zookeeper]
[zk: localhost:2181(CONNECTED) 6] ls /oldboyedu-linux99-kafka39
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification]
[zk: localhost:2181(CONNECTED) 7] 
[zk: localhost:2181(CONNECTED) 7] ls /oldboyedu-linux99-kafka39/brokers 
[ids, seqid, topics]
[zk: localhost:2181(CONNECTED) 8] 
[zk: localhost:2181(CONNECTED) 8] 
[zk: localhost:2181(CONNECTED) 8] ls /oldboyedu-linux99-kafka39/brokers/ids 
[91]
[zk: localhost:2181(CONNECTED) 9] get /oldboyedu-linux99-kafka39/brokers/ids/91 
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://elk91:9092"],"jmx_port":-1,"port":9092,"host":"elk91","version":5,"timestamp":"1756948482637"}
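The broker registration above is plain JSON; a quick sketch of pulling it apart (note the advertised endpoint carries the hostname elk91, not an IP, which is exactly what the hostname-resolution pitfall later in this section trips over):

```python
import json

# The znode payload returned by `get .../brokers/ids/91` above, verbatim.
payload = '{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://elk91:9092"],"jmx_port":-1,"port":9092,"host":"elk91","version":5,"timestamp":"1756948482637"}'

broker = json.loads(payload)
# The advertised endpoint is what clients (and the controller) try to
# connect to -- here it is the hostname elk91, not an IP address.
proto, addr = broker["endpoints"][0].split("://")
host, port = addr.split(":")
print(proto, host, port)   # PLAINTEXT elk91 9092
```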

Kafka cluster deployment

1. Copy the installation to the other nodes

[root@elk91 ~]# scp -r /usr/local/kafka_2.13-3.9.1/ 10.0.0.92:/usr/local/
[root@elk91 ~]# scp -r /usr/local/kafka_2.13-3.9.1/ 10.0.0.93:/usr/local/

2. Copy the environment variable script

[root@elk91 ~]# scp /etc/profile.d/kafka.sh 10.0.0.92:/etc/profile.d/
[root@elk91 ~]# scp /etc/profile.d/kafka.sh 10.0.0.93:/etc/profile.d/

3. Edit the Kafka configuration on elk92

[root@elk92 ~]# sed -i '/^broker.id/s#91#92#' /usr/local/kafka_2.13-3.9.1/config/server.properties 

[root@elk92 ~]# grep ^broker.id /usr/local/kafka_2.13-3.9.1/config/server.properties 
broker.id=92

4. Start Kafka on elk92

[root@elk92 ~]# source /etc/profile.d/kafka.sh 
[root@elk92 ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties 
[root@elk92 ~]# ss -ntl | grep 9092
LISTEN 0      50                      *:9092             *:*          

5. Edit the Kafka configuration on elk93

[root@elk93 ~]# sed -i '/^broker.id/s#91#93#' /usr/local/kafka_2.13-3.9.1/config/server.properties 
[root@elk93 ~]# grep ^broker.id /usr/local/kafka_2.13-3.9.1/config/server.properties
broker.id=93

6. Start Kafka on elk93

[root@elk93 ~]# source /etc/profile.d/kafka.sh 
[root@elk93 ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties 
[root@elk93 ~]# ss -ntl | grep 9092
LISTEN 0      50                      *:9092             *:*          

7. Verify in ZooKeeper

[root@elk93 ~]# zkCli.sh -server 10.0.0.91:2181 ls /oldboyedu-linux99-kafka39/brokers/ids | egrep "^\["
[91, 92, 93]

Troubleshooting 1: a broker started with a stale cluster.id (the copied data directory's meta.properties does not match)

[root@elk91 ~]# kafka-server-start.sh /usr/local/kafka_2.13-3.9.1/config/server.properties  # Drop "-daemon" to run in the foreground and watch the logs.
...
[2025-09-04 02:04:45,536] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
[2025-09-04 02:04:45,735] INFO Cluster ID = yNEkP2FsQCOfJcET-nZPGw (kafka.server.KafkaServer)
[2025-09-04 02:04:45,742] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.lang.RuntimeException: Invalid cluster.id in: /var/lib/kafka/meta.properties. Expected yNEkP2FsQCOfJcET-nZPGw, but read klgG1THDRPeMmFhZh9Jx5Q
    at org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.verify(MetaPropertiesEnsemble.java:503)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:258)
    at kafka.Kafka$.main(Kafka.scala:112)
    at kafka.Kafka.main(Kafka.scala)
[2025-09-04 02:04:45,744] INFO shutting down (kafka.server.KafkaServer)
[2025-09-04 02:04:45,748] INFO [ZooKeeperClient Kafka server] Closing. (kafka.zookeeper.ZooKeeperClient)
[2025-09-04 02:04:45,859] INFO EventThread shut down for session: 0xb00003683e8000d (org.apache.zookeeper.ClientCnxn)
[2025-09-04 02:04:45,859] INFO Session: 0xb00003683e8000d closed (org.apache.zookeeper.ZooKeeper)
[2025-09-04 02:04:45,860] INFO [ZooKeeperClient Kafka server] Closed. (kafka.zookeeper.ZooKeeperClient)
[2025-09-04 02:04:45,872] INFO App info kafka.server for 91 unregistered (org.apache.kafka.common.utils.AppInfoParser)
[2025-09-04 02:04:45,874] INFO shut down completed (kafka.server.KafkaServer)
[2025-09-04 02:04:45,874] ERROR Exiting Kafka due to fatal exception during startup. (kafka.Kafka$)
java.lang.RuntimeException: Invalid cluster.id in: /var/lib/kafka/meta.properties. Expected yNEkP2FsQCOfJcET-nZPGw, but read klgG1THDRPeMmFhZh9Jx5Q
    at org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.verify(MetaPropertiesEnsemble.java:503)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:258)
    at kafka.Kafka$.main(Kafka.scala:112)
    at kafka.Kafka.main(Kafka.scala)
[2025-09-04 02:04:45,878] INFO shutting down (kafka.server.KafkaServer)
[root@elk91-10.0.0.91 ~]#

Root cause:

[root@elk91-10.0.0.91 ~]# cat /var/lib/kafka/meta.properties 
#
#Thu Sep 04 02:07:44 UTC 2025
broker.id=91
cluster.id=yNEkP2FsQCOfJcET-nZPGw
version=0

Solution:
Delete the local Kafka data directory (/var/lib/kafka) and start the broker again.
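The check Kafka performs at startup can be mimicked in a short Python sketch (a simplified reconstruction, not Kafka's actual code): the cluster.id recorded in meta.properties under log.dirs must match the cluster ID read from ZooKeeper.

```python
# Simplified sketch of Kafka's startup verification: compare the cluster.id
# persisted in log.dirs/meta.properties with the expected cluster ID.
def verify_cluster_id(meta_properties: str, expected: str) -> None:
    props = dict(
        line.split("=", 1)
        for line in meta_properties.splitlines()
        if line and not line.startswith("#")
    )
    found = props.get("cluster.id")
    if found != expected:
        raise RuntimeError(
            f"Invalid cluster.id: expected {expected}, but read {found}"
        )

# A meta.properties left over from another cluster, as in the failure above.
meta = """#
#Thu Sep 04 02:07:44 UTC 2025
broker.id=91
cluster.id=klgG1THDRPeMmFhZh9Jx5Q
version=0"""

try:
    verify_cluster_id(meta, "yNEkP2FsQCOfJcET-nZPGw")
except RuntimeError as e:
    print(e)   # reproduces the fatal startup error shown above
```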

Troubleshooting 2: the error shows the Kafka controller cannot resolve the hostname elk92 (a pitfall the instructor left on purpose; everyone hits it)

[root@elk91 ~]# tail -100f /usr/local/kafka_2.13-3.9.1/logs/server.log  # If Kafka runs in the background, check the logs here.
...
[2025-09-04 10:11:01,834] WARN [Controller id=91, targetBrokerId=92] Error connecting to node elk92:9092 (id: 92 rack: null) (org.apache.kafka.clients.NetworkClient)
java.net.UnknownHostException: elk92
    at java.base/java.net.InetAddress$CachedLookup.get(InetAddress.java:998)
    at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1806)
    at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1676)
    at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27)
    at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:125)
    at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.resolveAddresses(ClusterConnectionStates.java:536)
    at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:511)
    at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:466)
    at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:173)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:1075)
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:321)
    at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:65)
    at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:299)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:252)
    at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:136)

Solutions:
Option 1:
Add hosts entries for all Kafka nodes.

Note that clients accessing the cluster later will also need the same name resolution.

Option 2:
Fix it in the configuration file.

  1. Edit the configuration on all nodes
[root@elk91 ~]# sed -i "/^#listeners=/s@#listeners=PLAINTEXT://:9092@listeners=PLAINTEXT://10.0.0.91:9092@" /usr/local/kafka_2.13-3.9.1/config/server.properties

[root@elk91 ~]# grep "^listeners=" /usr/local/kafka_2.13-3.9.1/config/server.properties 
listeners=PLAINTEXT://10.0.0.91:9092

[root@elk92 ~]# sed -i "/^#listeners=/s@#listeners=PLAINTEXT://:9092@listeners=PLAINTEXT://10.0.0.92:9092@" /usr/local/kafka_2.13-3.9.1/config/server.properties

[root@elk92 ~]# grep "^listeners=" /usr/local/kafka_2.13-3.9.1/config/server.properties 
listeners=PLAINTEXT://10.0.0.92:9092

[root@elk93 ~]# sed -i "/^#listeners=/s@#listeners=PLAINTEXT://:9092@listeners=PLAINTEXT://10.0.0.93:9092@" /usr/local/kafka_2.13-3.9.1/config/server.properties

[root@elk93 ~]# grep "^listeners=" /usr/local/kafka_2.13-3.9.1/config/server.properties 
listeners=PLAINTEXT://10.0.0.93:9092
  2. Stop the service on all nodes
[root@elk91 ~]# kafka-server-stop.sh 
[root@elk91 ~]# 

[root@elk92 ~]# kafka-server-stop.sh 
[root@elk92 ~]# 

[root@elk93 ~]# kafka-server-stop.sh 
[root@elk93 ~]# 
  3. Start the service
[root@elk91 ~]# kafka-server-start.sh -daemon /usr/local/kafka_2.13-3.9.1/config/server.properties 
[root@elk91 ~]#
[root@elk91 ~]# ss -ntl | grep 9092
LISTEN 0      50     [::ffff:10.0.0.91]:9092             *:*          
[root@elk91 ~]# 

[root@elk92 ~]# kafka-server-start.sh -daemon /usr/local/kafka_2.13-3.9.1/config/server.properties 
[root@elk92 ~]# 
[root@elk92 ~]# ss -ntl | grep 9092
LISTEN 0      50     [::ffff:10.0.0.92]:9092             *:*          
[root@elk92 ~]# 

[root@elk93 ~]# kafka-server-start.sh -daemon /usr/local/kafka_2.13-3.9.1/config/server.properties 
[root@elk93 ~]# ss -ntl | grep 9092
LISTEN 0      50     [::ffff:10.0.0.93]:9092             *:*          
[root@elk93 ~]# 

Kafka terminology

topics
Topic: the unit through which clients (producers and consumers) read and write data.

producer
Producer: writes data into the Kafka cluster.

consumer
Consumer: reads data from the Kafka cluster.

partition
Partition: a logical subdivision of a topic that spreads its data across multiple brokers.

replica
Replica: every partition has at least one replica; replicas are the actual carriers of the data.
In other words, a replica is where the data is physically stored.

Common Kafka topic management scripts

1. List topics

[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.93:9092 --list   # empty at this point

2. Create a topic

[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.93:9092 --topic oldboyedu-linux99 --create
Created topic oldboyedu-linux99.

[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.93:9092 --list
oldboyedu-linux99

3. Create a topic with explicit partition and replica counts

[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.93:9092 --topic oldboyedu-xixi --create --partitions 3 --replication-factor 2  # 3 partitions, 2 replicas each
Created topic oldboyedu-xixi.

[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.93:9092 --list  
oldboyedu-linux99
oldboyedu-xixi

4. View topic details

[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.93:9092 --describe
Topic: oldboyedu-linux99	TopicId: mWxH0S20RzSk8FYCta_7wQ	PartitionCount: 1	ReplicationFactor: 1	Configs: 
	Topic: oldboyedu-linux99	Partition: 0	Leader: 91	Replicas: 91	Isr: 91	Elr: N/A	LastKnownElr: N/A
Topic: oldboyedu-xixi	TopicId: pnGyqO4GSK-i2VtJBrEl8w	PartitionCount: 3	ReplicationFactor: 2	Configs: 
	Topic: oldboyedu-xixi	Partition: 0	Leader: 93	Replicas: 93,91	Isr: 93,91	Elr: N/A	LastKnownElr: N/A  # 93 is the leader replica, 91 the follower
	Topic: oldboyedu-xixi	Partition: 1	Leader: 91	Replicas: 91,92	Isr: 91,92	Elr: N/A	LastKnownElr: N/A
	Topic: oldboyedu-xixi	Partition: 2	Leader: 92	Replicas: 92,93	Isr: 92,93	Elr: N/A	LastKnownElr: N/A

[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.93:9092 --describe --topic oldboyedu-xixi
Topic: oldboyedu-xixi	TopicId: pnGyqO4GSK-i2VtJBrEl8w	PartitionCount: 3	ReplicationFactor: 2	Configs: 
	Topic: oldboyedu-xixi	Partition: 0	Leader: 93	Replicas: 93,91	Isr: 93,91	Elr: N/A	LastKnownElr: N/A
	Topic: oldboyedu-xixi	Partition: 1	Leader: 91	Replicas: 91,92	Isr: 91,92	Elr: N/A	LastKnownElr: N/A
	Topic: oldboyedu-xixi	Partition: 2	Leader: 92	Replicas: 92,93	Isr: 92,93	Elr: N/A	LastKnownElr: N/A

[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.93:9092 --describe --topic oldboyedu-linux99
Topic: oldboyedu-linux99	TopicId: mWxH0S20RzSk8FYCta_7wQ	PartitionCount: 1	ReplicationFactor: 1	Configs: 
	Topic: oldboyedu-linux99	Partition: 0	Leader: 91	Replicas: 91	Isr: 91	Elr: N/A	LastKnownElr: N/A
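The replica placement in the describe output can be reproduced with a simplified sketch of Kafka's rack-unaware round-robin assignment. Kafka picks the starting broker index at random; start=2 happens to reproduce the oldboyedu-xixi layout above (the real assignor also adds an extra shift between followers on larger clusters).

```python
# Simplified sketch of Kafka's replica spreading: the first replica of
# partition p goes to brokers[(start + p) % n], and followers continue
# in ring order. Kafka chooses `start` at random at creation time.
def assign_replicas(brokers, partitions, rf, start=0):
    n = len(brokers)
    return {
        p: [brokers[(start + p + j) % n] for j in range(rf)]
        for p in range(partitions)
    }

assignment = assign_replicas([91, 92, 93], partitions=3, rf=2, start=2)
for p, replicas in sorted(assignment.items()):
    # replicas[0] is the leader; the rest are followers
    print(f"Partition {p}: Leader {replicas[0]}, Replicas {replicas}")
```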

5. Change a topic's partition count

[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.93:9092 --describe --topic oldboyedu-linux99
Topic: oldboyedu-linux99	TopicId: mWxH0S20RzSk8FYCta_7wQ	PartitionCount: 1	ReplicationFactor: 1	Configs: 
	Topic: oldboyedu-linux99	Partition: 0	Leader: 91	Replicas: 91	Isr: 91	Elr: N/A	LastKnownElr: N/A

[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.93:9092 --topic oldboyedu-linux99 --partitions 5 --alter  # grow to 5 partitions; the count can only be increased

[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.93:9092 --describe --topic oldboyedu-linux99
Topic: oldboyedu-linux99	TopicId: mWxH0S20RzSk8FYCta_7wQ	PartitionCount: 5	ReplicationFactor: 1	Configs: 
	Topic: oldboyedu-linux99	Partition: 0	Leader: 91	Replicas: 91	Isr: 91	Elr: N/A	LastKnownElr: N/A
	Topic: oldboyedu-linux99	Partition: 1	Leader: 92	Replicas: 92	Isr: 92	Elr: N/A	LastKnownElr: N/A
	Topic: oldboyedu-linux99	Partition: 2	Leader: 93	Replicas: 93	Isr: 93	Elr: N/A	LastKnownElr: N/A
	Topic: oldboyedu-linux99	Partition: 3	Leader: 91	Replicas: 91	Isr: 91	Elr: N/A	LastKnownElr: N/A
	Topic: oldboyedu-linux99	Partition: 4	Leader: 92	Replicas: 92	Isr: 92	Elr: N/A	LastKnownElr: N/A

[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.93:9092 --topic oldboyedu-linux99 --partitions 3 --alter
Error while executing topic command : Topic currently has 5 partitions, which is higher than the requested 3.
[2025-09-04 10:47:19,059] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 5 partitions, which is higher than the requested 3.
 (org.apache.kafka.tools.TopicCommand)
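One reason the partition count can only grow: keyed records are routed by hashing the key modulo the partition count, so changing the count remaps keys, and shrinking would strand data already written to the dropped partitions. A simplified sketch (Kafka actually uses murmur2 for keyed records; crc32 here only keeps the demo deterministic):

```python
import zlib

# Simplified keyed partitioner: a record's home partition is
# hash(key) % num_partitions. Changing num_partitions can move the same
# key to a different partition, which is why Kafka tolerates growing the
# count (new data spreads out) but rejects shrinking it outright.
def partition_for(key: str, num_partitions: int) -> int:
    return zlib.crc32(key.encode()) % num_partitions

before = partition_for("user-42", 5)
after = partition_for("user-42", 3)
print(before, after)   # the same key may land elsewhere once the count changes
```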

6. Delete a topic

[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.93:9092 --list
oldboyedu-linux99
oldboyedu-xixi

[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.93:9092 --topic oldboyedu-linux99 --delete

[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.93:9092 --list
oldboyedu-xixi

Kafka producer and consumer management scripts (remember: producers write, consumers read)

1. Start a producer

[root@elk91 ~]# kafka-console-producer.sh --bootstrap-server 10.0.0.93:9092 --topic oldboyedu-xixi 
>www.oldboyedu.com
>学IT来老男孩,月薪过万不是梦~
>周进喜欢洗脚按摩

2. Start a consumer

[root@elk92 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.93:9092 --topic oldboyedu-xixi
周进喜欢洗脚按摩

3. Consume from the beginning

[root@elk93 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.93:9092 --topic oldboyedu-xixi --from-beginning 
www.oldboyedu.com
学IT来老男孩,月薪过万不是梦~
周进喜欢洗脚按摩

4. List topics again

[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.93:9092 --list
__consumer_offsets
oldboyedu-xixi

Note:
"__consumer_offsets" is a topic built into the Kafka cluster that stores the offsets at which consumers have read.
Do not delete "__consumer_offsets".

A question arises: what if producers write faster than a consumer can read? Add more consumers, and put them in the same consumer group; the group records each partition's read position (offset), so if a consumer dies another can pick up where it left off.
Consumers in the same group cannot read the same partition at the same time; each must read different partitions.

Kafka consumer group management

1. List consumer groups

[root@elk91 ~]# kafka-consumer-groups.sh --bootstrap-server 10.0.0.93:9092 --list
console-consumer-97820
console-consumer-8471

2. Start a consumer with a named group

[root@elk93 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.93:9092 --topic oldboyedu-xixi --from-beginning --group linux99
www.oldboyedu.com
学IT来老男孩,月薪过万不是梦~
周进喜欢洗脚按摩

3. View consumer group details

[root@elk91 ~]# kafka-consumer-groups.sh --bootstrap-server 10.0.0.93:9092 --describe --group linux99 ; echo
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
linux99         oldboyedu-xixi  2          3               3               0               console-consumer-b5ea00f0-7ab5-4744-9b60-6a51b3cd3e02 /10.0.0.93      console-consumer
linux99         oldboyedu-xixi  0          0               0               0               console-consumer-b5ea00f0-7ab5-4744-9b60-6a51b3cd3e02 /10.0.0.93      console-consumer
linux99         oldboyedu-xixi  1          0               0               0               console-consumer-b5ea00f0-7ab5-4744-9b60-6a51b3cd3e02 /10.0.0.93      console-consumer
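The LAG column is simply LOG-END-OFFSET minus CURRENT-OFFSET: how far the group's committed position trails the newest record. Checking the table above in Python:

```python
# (partition, current_offset, log_end_offset) rows copied from the
# describe output above; LAG = log_end_offset - current_offset.
rows = [
    (2, 3, 3),
    (0, 0, 0),
    (1, 0, 0),
]
for partition, current, log_end in rows:
    lag = log_end - current
    print(f"partition {partition}: lag={lag}")   # all 0: fully caught up
```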

4. Run the producer several more times to test (a single run may not show the effect)

[root@elk91 ~]# kafka-console-producer.sh --bootstrap-server 10.0.0.93:9092 --topic oldboyedu-xixi 
>11111111111111111111
>22222222222222222222
>33333333333333333
>444444444444444444
>5555555555555555555555

5. View the group details again (clearly, after running the producer several times, every partition has received data): records are distributed across the partitions

[root@elk91 ~]# kafka-consumer-groups.sh --bootstrap-server 10.0.0.93:9092 --describe --group linux99 ; echo
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
linux99         oldboyedu-xixi  2          10              10              0               console-consumer-b5ea00f0-7ab5-4744-9b60-6a51b3cd3e02 /10.0.0.93      console-consumer
linux99         oldboyedu-xixi  0          2               2               0               console-consumer-b5ea00f0-7ab5-4744-9b60-6a51b3cd3e02 /10.0.0.93      console-consumer
linux99         oldboyedu-xixi  1          5               5               0               console-consumer-b5ea00f0-7ab5-4744-9b60-6a51b3cd3e02 /10.0.0.93      console-consumer
[root@elk91 ~]# 

6. Start another consumer in the same group (it receives no data at first): the group now has one more consumer; it can run on any node, each run registers a new group member, and the partitions get redistributed

[root@elk92 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.91:9092 --topic oldboyedu-xixi --from-beginning --group linux99

7. Checking the group details shows the partitions were reassigned (this redistribution of partitions among a group's consumers is called a rebalance)

Partitions 0 and 1 were assigned to the new consumer (running on elk92; note the changed CONSUMER-ID), while partition 2 stayed with the original consumer (on elk93). From now on, written data is distributed to the partitions and consumed according to this assignment.

[root@elk91 ~]# kafka-consumer-groups.sh --bootstrap-server 10.0.0.93:9092 --describe --group linux99 ; echo
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
linux99         oldboyedu-xixi  0          2               2               0               console-consumer-895cd90a-724a-41a7-b093-636aaa25c19f /10.0.0.92      console-consumer
linux99         oldboyedu-xixi  1          5               5               0               console-consumer-895cd90a-724a-41a7-b093-636aaa25c19f /10.0.0.92      console-consumer
linux99         oldboyedu-xixi  2          10              10              0               console-consumer-b5ea00f0-7ab5-4744-9b60-6a51b3cd3e02 /10.0.0.93      console-consumer
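The 2/1 split seen above matches what a range-style assignor produces: sort the partitions and the group members, then hand out near-equal chunks front to back (member names below are hypothetical stand-ins for the console consumer IDs):

```python
# Minimal sketch of a range-style partition assignor: partitions and
# members are sorted, then each member takes a near-equal contiguous slice.
def range_assign(partitions, members):
    partitions, members = sorted(partitions), sorted(members)
    per, extra = divmod(len(partitions), len(members))
    out, i = {}, 0
    for idx, member in enumerate(members):
        take = per + (1 if idx < extra else 0)
        out[member] = partitions[i:i + take]
        i += take
    return out

# One consumer owns everything; after a second joins, a rebalance gives
# the first sorted member partitions 0 and 1, and the other keeps 2.
print(range_assign([0, 1, 2], ["c-elk93"]))
print(range_assign([0, 1, 2], ["c-elk92", "c-elk93"]))
```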

That is the concrete flow; at this point the group has two consumers.

Integrating ELFK with Kafka

Filebeat to the Kafka cluster: Filebeat writes data into Kafka, which buffers it as a message queue

1. Create a topic

[root@elk92 ~]# kafka-topics.sh --bootstrap-server 10.0.0.93:9092 --topic oldboyedu-elasticstack --partitions 3 --replication-factor 2 --create
Created topic oldboyedu-elasticstack.

2. Generate test data

[root@elk92 ~]# python3 generate_log.py /tmp/apps.log
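The generate_log.py script itself is not shown; judging from the Logstash filter later in this section, each line appears to be pipe-separated, with a "level date time" prefix followed by userId, action, svip flag, and price. A hypothetical reconstruction under that assumption:

```python
import random
import time

# Hypothetical reconstruction of generate_log.py's line format, inferred
# from the Logstash filter: "LEVEL yyyy-MM-dd HH:mm:ss|userId|action|svip|price".
def make_line(now=None):
    dt = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(now))
    user_id = random.randint(1, 10000)
    action = random.choice(["view", "cart", "buy", "logout"])
    svip = random.choice([0, 1])
    price = round(random.uniform(1, 1000), 2)
    return f"INFO {dt}|{user_id}|{action}|{svip}|{price}"

print(make_line())
```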

3. Filebeat writes data to the Kafka cluster

[root@elk92 ~]# cat /etc/filebeat/config/14-filestream-to-kafka.yaml
filebeat.inputs:
- type: filestream
  paths:
    - /tmp/apps.log
output.kafka:
  hosts: 
  - 10.0.0.91:9092
  - 10.0.0.92:9092
  - 10.0.0.93:9092
  topic: "oldboyedu-elasticstack"

[root@elk92 ~]# filebeat -e -c /etc/filebeat/config/14-filestream-to-kafka.yaml

4. Inspect the data in Kafka

[root@elk93 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.92:9092 --topic oldboyedu-elasticstack --group linux98-001 --from-beginning

Logstash to the Kafka cluster: pull data from Kafka, process it, ship it to the ES cluster, then chart it

1. Create an API key in Kibana

Suppose the API key obtained is: JmNgF5kBubfXesjsCGYt:2vNBd_HvQq-5PnFjVUoHhA

2. Logstash pulls data from Kafka and writes it into the ES cluster

[root@elk93 ~]# cat /etc/logstash/conf.d/10-kafka-to-es.conf 
input { 
  kafka {
     bootstrap_servers => "10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092"
     group_id => "logstash-linux99-001"
     topics => ["oldboyedu-elasticstack"]
     auto_offset_reset => "earliest"
  }
}  
filter {
  json {
    source => "message"
  }
  mutate {
     split => { "message" => "|" }
     add_field => { 
       "other" => "%{[message][0]}"
       "userId" => "%{[message][1]}"
       "action" => "%{[message][2]}"
       "svip" => "%{[message][3]}"
       "price" => "%{[message][4]}"
     }
  }
  mutate {
     split => { "other" => " " }
     add_field => { 
       "dt" => "%{[other][1]} %{[other][2]}"
     }
     convert => {
          "price" => "float"
          "userId" => "integer"
        }
     remove_field => [ "@version", "input","ecs","log","tags","agent","host","message","other"]
  }
  date {
    match => [ "dt", "yyyy-MM-dd HH:mm:ss" ]
  }
}
output { 
  elasticsearch {
    hosts => ["https://10.0.0.91:9200","https://10.0.0.92:9200","https://10.0.0.93:9200"]
    index => "oldboyedu-logstash-kafka-apps-%{+YYYY.MM.dd}"
    api_key => "GdnTE5kB1hiWm8KzQUOJ:e71bcej0QumGGMtnD1b4DA"
    ssl => true
    ssl_certificate_verification => false
  }
}

[root@elk93 ~]# logstash -rf /etc/logstash/conf.d/10-kafka-to-es.conf 
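The filter chain above can be traced in plain Python: split the message on "|", split the first field on spaces to recover the timestamp, then convert types (the sample line follows the same assumed apps.log format as earlier in this section):

```python
# Plain-Python trace of the Logstash filter above: mutate split on "|",
# mutate split on " ", add_field dt, convert userId/price.
def parse_line(message: str) -> dict:
    fields = message.split("|")            # split => { "message" => "|" }
    other = fields[0].split(" ")           # split => { "other" => " " }
    return {
        "dt": f"{other[1]} {other[2]}",    # add_field dt => "%{[other][1]} %{[other][2]}"
        "userId": int(fields[1]),          # convert userId => integer
        "action": fields[2],
        "svip": fields[3],
        "price": float(fields[4]),         # convert price => float
    }

event = parse_line("INFO 2025-09-04 10:30:00|42|buy|1|99.5")
print(event)
```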

3. Chart the data in Kibana

Fix for "Unknown garbage collector name" caused by a JDK version conflict (can be skipped)

[root@elk93 ~]# java --version
openjdk 22.0.2 2024-07-16
OpenJDK Runtime Environment (build 22.0.2+9-70)
OpenJDK 64-Bit Server VM (build 22.0.2+9-70, mixed mode, sharing)

[root@elk93 ~]# /usr/share/logstash/jdk/bin/java --version
openjdk 11.0.26 2025-01-21
OpenJDK Runtime Environment Temurin-11.0.26+4 (build 11.0.26+4)
OpenJDK 64-Bit Server VM Temurin-11.0.26+4 (build 11.0.26+4, mixed mode)

[root@elk93 ~]# export JAVA_HOME=/usr/share/logstash/jdk/

[root@elk93 ~]# echo $JAVA_HOME
/usr/share/logstash/jdk/
