7-监控:kafka与elfk
kafka是消息队列,负责暂时存放数据,例如a—>b发送消息,但是b没有上线,先把数据放在消息队列中,b上线后去消息队列取。
kafka概念
kafka是消息队列,负责暂时存放数据,例如a—>b发送消息,但是b没有上线,先把数据放在消息队列中,b上线后去消息队列取。
kafka单点部署
1. 启动zookeeper集群
[root@elk91 ~]# zkServer.sh start
[root@elk92 ~]# zkServer.sh start
[root@elk93 ~]# zkServer.sh start
2. 下载kafka软件包
wget https://dlcdn.apache.org/kafka/3.9.1/kafka_2.13-3.9.1.tgz
3. 解压软件包
[root@elk91 ~]# tar xf kafka_2.13-3.9.1.tgz -C /usr/local/
4. 配置环境变量
[root@elk91 ~]# cat /etc/profile.d/kafka.sh
#!/bin/bash
export KAFKA_HOME=/usr/local/kafka_2.13-3.9.1
export PATH=$PATH:$KAFKA_HOME/bin
[root@elk91 ~]# source /etc/profile.d/kafka.sh
5. 修改kafka的配置文件
[root@elk91 ~]# vim /usr/local/kafka_2.13-3.9.1/config/server.properties
...
# 指定kafka的ID,集群模式模式中该ID必须唯一的整数。
broker.id=91
# 指的kafka的数据目录
log.dirs=/var/lib/kafka
# 指定kafka的元数据信息在zookeeper集群的znode存储路径。
zookeeper.connect=10.0.0.91:2181,10.0.0.92:2181,10.0.0.93:2181/oldboyedu-linux99-kafka39
6. 启动kafka
[root@elk91 ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
[root@elk91 ~]#
[root@elk91 ~]# ss -ntl | grep 9092
LISTEN 0 50 *:9092 *:*
[root@elk91 ~]#
7. zookeeper验证测试
[root@elk92 ~]# zkCli.sh
...
[zk: localhost:2181(CONNECTED) 5] ls /
[oldboyedu-linux99-kafka39, zookeeper]
[zk: localhost:2181(CONNECTED) 6] ls /oldboyedu-linux99-kafka39
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification]
[zk: localhost:2181(CONNECTED) 7]
[zk: localhost:2181(CONNECTED) 7] ls /oldboyedu-linux99-kafka39/brokers
[ids, seqid, topics]
[zk: localhost:2181(CONNECTED) 8]
[zk: localhost:2181(CONNECTED) 8]
[zk: localhost:2181(CONNECTED) 8] ls /oldboyedu-linux99-kafka39/brokers/ids
[91]
[zk: localhost:2181(CONNECTED) 9] get /oldboyedu-linux99-kafka39/brokers/ids/91
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://elk91:9092"],"jmx_port":-1,"port":9092,"host":"elk91","version":5,"timestamp":"1756948482637"}
kafka集群部署
1. 拷贝程序
[root@elk91 ~]# scp -r /usr/local/kafka_2.13-3.9.1/ 10.0.0.92:/usr/local/
[root@elk91 ~]# scp -r /usr/local/kafka_2.13-3.9.1/ 10.0.0.93:/usr/local/
2. 拷贝环境变量
[root@elk91 ~]# scp /etc/profile.d/kafka.sh 10.0.0.92:/etc/profile.d/
[root@elk91 ~]# scp /etc/profile.d/kafka.sh 10.0.0.93:/etc/profile.d/
3. 修改elk92节点kafka的配置文件
[root@elk92 ~]# sed -i '/^broker.id/s#91#92#' /usr/local/kafka_2.13-3.9.1/config/server.properties
[root@elk92 ~]# grep ^broker.id /usr/local/kafka_2.13-3.9.1/config/server.properties
broker.id=92
4. 启动elk92节点kafka
[root@elk92 ~]# source /etc/profile.d/kafka.sh
[root@elk92 ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
[root@elk92 ~]# ss -ntl | grep 9092
LISTEN 0 50 *:9092 *:*
5. 修改elk93节点kafka的配置文件
[root@elk93 ~]# sed -i '/^broker.id/s#91#93#' /usr/local/kafka_2.13-3.9.1/config/server.properties
[root@elk93 ~]# grep ^broker.id /usr/local/kafka_2.13-3.9.1/config/server.properties
broker.id=93
6. 启动elk93节点kafka
[root@elk93 ~]# source /etc/profile.d/kafka.sh
[root@elk93 ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
[root@elk93 ~]# ss -ntl | grep 9092
LISTEN 0 50 *:9092 *:*
7. zookeeper验证测试
[root@elk93 ~]# zkCli.sh -server 10.0.0.91:2181 ls /oldboyedu-linux99-kafka39/brokers/ids | egrep "^\["
[91, 92, 93]
故障排查1:它的id改错了
[root@elk91 ~]# kafka-server-start.sh /usr/local/kafka_2.13-3.9.1/config/server.properties # 去掉"-daemon"可以前台查看日志。
...
[2025-09-04 02:04:45,536] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
[2025-09-04 02:04:45,735] INFO Cluster ID = yNEkP2FsQCOfJcET-nZPGw (kafka.server.KafkaServer)
[2025-09-04 02:04:45,742] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.lang.RuntimeException: Invalid cluster.id in: /var/lib/kafka/meta.properties. Expected yNEkP2FsQCOfJcET-nZPGw, but read klgG1THDRPeMmFhZh9Jx5Q
at org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.verify(MetaPropertiesEnsemble.java:503)
at kafka.server.KafkaServer.startup(KafkaServer.scala:258)
at kafka.Kafka$.main(Kafka.scala:112)
at kafka.Kafka.main(Kafka.scala)
[2025-09-04 02:04:45,744] INFO shutting down (kafka.server.KafkaServer)
[2025-09-04 02:04:45,748] INFO [ZooKeeperClient Kafka server] Closing. (kafka.zookeeper.ZooKeeperClient)
[2025-09-04 02:04:45,859] INFO EventThread shut down for session: 0xb00003683e8000d (org.apache.zookeeper.ClientCnxn)
[2025-09-04 02:04:45,859] INFO Session: 0xb00003683e8000d closed (org.apache.zookeeper.ZooKeeper)
[2025-09-04 02:04:45,860] INFO [ZooKeeperClient Kafka server] Closed. (kafka.zookeeper.ZooKeeperClient)
[2025-09-04 02:04:45,872] INFO App info kafka.server for 91 unregistered (org.apache.kafka.common.utils.AppInfoParser)
[2025-09-04 02:04:45,874] INFO shut down completed (kafka.server.KafkaServer)
[2025-09-04 02:04:45,874] ERROR Exiting Kafka due to fatal exception during startup. (kafka.Kafka$)
java.lang.RuntimeException: Invalid cluster.id in: /var/lib/kafka/meta.properties. Expected yNEkP2FsQCOfJcET-nZPGw, but read klgG1THDRPeMmFhZh9Jx5Q
at org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.verify(MetaPropertiesEnsemble.java:503)
at kafka.server.KafkaServer.startup(KafkaServer.scala:258)
at kafka.Kafka$.main(Kafka.scala:112)
at kafka.Kafka.main(Kafka.scala)
[2025-09-04 02:04:45,878] INFO shutting down (kafka.server.KafkaServer)
[root@elk91-10.0.0.91 ~]#
问题原因:
[root@elk91-10.0.0.91 ~]# cat /var/lib/kafka/meta.properties
#
#Thu Sep 04 02:07:44 UTC 2025
broker.id=91
cluster.id=yNEkP2FsQCOfJcET-nZPGw
version=0
解决方案:
删除本地的kafka数据目录,重新启动即可。
故障排查2: 错误表明 Kafka 控制器(Controller)无法解析主机名 elk92(老师故意留下的坑,每个人都做)
[root@elk91 ~]# tail -100f /usr/local/kafka_2.13-3.9.1/logs/server.log # 如果是后台运行的则可以来这里查看日志。
...
[2025-09-04 10:11:01,834] WARN [Controller id=91, targetBrokerId=92] Error connecting to node elk92:9092 (id: 92 rack: null) (org.apache.kafka.clients.NetworkClient)
java.net.UnknownHostException: elk92
at java.base/java.net.InetAddress$CachedLookup.get(InetAddress.java:998)
at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1806)
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1676)
at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27)
at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:125)
at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.resolveAddresses(ClusterConnectionStates.java:536)
at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:511)
at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:466)
at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:173)
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:1075)
at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:321)
at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:65)
at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:299)
at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:252)
at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:136)
解决方案:
方案一:
所有kakfa节点添加hosts解析记录。
值得注意的是,将来客户端要访问集群时,也需要指定解析记录。
方案二:
修改配置文件来解决问题。
- 修改所有节点的配置文件
[root@elk91 ~]# sed -ir "/^#listeners=/s@#listeners=PLAINTEXT://:9092@listeners=PLAINTEXT://10.0.0.91:9092@" /usr/local/kafka_2.13-3.9.1/config/server.properties
[root@elk91 ~]# grep "^listeners=" /usr/local/kafka_2.13-3.9.1/config/server.properties
listeners=PLAINTEXT://10.0.0.91:9092
[root@elk92 ~]# sed -ir "/^#listeners=/s@#listeners=PLAINTEXT://:9092@listeners=PLAINTEXT://10.0.0.92:9092@" /usr/local/kafka_2.13-3.9.1/config/server.properties
[root@elk92 ~]# grep "^listeners=" /usr/local/kafka_2.13-3.9.1/config/server.properties
listeners=PLAINTEXT://10.0.0.92:9092
[root@elk93 ~]# sed -ir "/^#listeners=/s@#listeners=PLAINTEXT://:9092@listeners=PLAINTEXT://10.0.0.93:9092@" /usr/local/kafka_2.13-3.9.1/config/server.properties
[root@elk93 ~]# grep "^listeners=" /usr/local/kafka_2.13-3.9.1/config/server.properties
listeners=PLAINTEXT://10.0.0.93:9092
- 停止所有节点服务
[root@elk91 ~]# kafka-server-stop.sh
[root@elk91 ~]#
[root@elk92 ~]# kafka-server-stop.sh
[root@elk92 ~]#
[root@elk93 ~]# kafka-server-stop.sh
[root@elk93 ~]#
- 启动服务
[root@elk91 ~]# kafka-server-start.sh -daemon /usr/local/kafka_2.13-3.9.1/config/server.properties
[root@elk91 ~]#
[root@elk91 ~]# ss -ntl | grep 9092
LISTEN 0 50 [::ffff:10.0.0.91]:9092 *:*
[root@elk91 ~]#
[root@elk92 ~]# kafka-server-start.sh -daemon /usr/local/kafka_2.13-3.9.1/config/server.properties
[root@elk92 ~]#
[root@elk92 ~]# ss -ntl | grep 9092
LISTEN 0 50 [::ffff:10.0.0.92]:9092 *:*
[root@elk92 ~]#
[root@elk93 ~]# kafka-server-start.sh -daemon /usr/local/kafka_2.13-3.9.1/config/server.properties
[root@elk93 ~]# ss -ntl | grep 9092
LISTEN 0 50 [::ffff:10.0.0.93]:9092 *:*
[root@elk93 ~]#
kafka的相关术语
topics
主题,客户端(生产者,消费者)进行数据的读写单元。
producer
生产者,往kafka集群写入数据。
consumer
消费者,从kafka集群读取数据。
partition
分区,topics的逻辑概念,将数据分散存储多份。
replica
副本,每个paritition都最少要有一个副本,是数据的实际载体。
说白了,就是实际存储数据的载体。
kafka常用的topic管理脚本
1. 查看topic列表
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.93:9092 --list 此时为空
2. 创建topic
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.93:9092 --topic oldboyedu-linux99 --create
Created topic oldboyedu-linux99.
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.93:9092 --list
oldboyedu-linux99
3. 创建topic指定分区和副本
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.93:9092 --topic oldboyedu-xixi --create --partitions 3 --replication-factor 2 创建分区
Created topic oldboyedu-xixi.
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.93:9092 --list
oldboyedu-linux99
oldboyedu-xixi
4. 查看topic的详细信息
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.93:9092 --describe
Topic: oldboyedu-linux99 TopicId: mWxH0S20RzSk8FYCta_7wQ PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: oldboyedu-linux99 Partition: 0 Leader: 91 Replicas: 91 Isr: 91 Elr: N/A LastKnownElr: N/A
Topic: oldboyedu-xixi TopicId: pnGyqO4GSK-i2VtJBrEl8w PartitionCount: 3 ReplicationFactor: 2 Configs:
Topic: oldboyedu-xixi Partition: 0 Leader: 93 Replicas: 93,91 Isr: 93,91 Elr: N/A LastKnownElr: N/A 表示93是主副本,91是备份副本
Topic: oldboyedu-xixi Partition: 1 Leader: 91 Replicas: 91,92 Isr: 91,92 Elr: N/A LastKnownElr: N/A
Topic: oldboyedu-xixi Partition: 2 Leader: 92 Replicas: 92,93 Isr: 92,93 Elr: N/A LastKnownElr: N/A
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.93:9092 --describe --topic oldboyedu-xixi
Topic: oldboyedu-xixi TopicId: pnGyqO4GSK-i2VtJBrEl8w PartitionCount: 3 ReplicationFactor: 2 Configs:
Topic: oldboyedu-xixi Partition: 0 Leader: 93 Replicas: 93,91 Isr: 93,91 Elr: N/A LastKnownElr: N/A
Topic: oldboyedu-xixi Partition: 1 Leader: 91 Replicas: 91,92 Isr: 91,92 Elr: N/A LastKnownElr: N/A
Topic: oldboyedu-xixi Partition: 2 Leader: 92 Replicas: 92,93 Isr: 92,93 Elr: N/A LastKnownElr: N/A
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.93:9092 --describe --topic oldboyedu-linux99
Topic: oldboyedu-linux99 TopicId: mWxH0S20RzSk8FYCta_7wQ PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: oldboyedu-linux99 Partition: 0 Leader: 91 Replicas: 91 Isr: 91 Elr: N/A LastKnownElr: N/A
5. 修改topic的分区数
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.93:9092 --describe --topic oldboyedu-linux99
Topic: oldboyedu-linux99 TopicId: mWxH0S20RzSk8FYCta_7wQ PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: oldboyedu-linux99 Partition: 0 Leader: 91 Replicas: 91 Isr: 91 Elr: N/A LastKnownElr: N/A
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.93:9092 --topic oldboyedu-linux99 --partitions 5 --alter 修改成五个分区。只能调大
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.93:9092 --describe --topic oldboyedu-linux99
Topic: oldboyedu-linux99 TopicId: mWxH0S20RzSk8FYCta_7wQ PartitionCount: 5 ReplicationFactor: 1 Configs:
Topic: oldboyedu-linux99 Partition: 0 Leader: 91 Replicas: 91 Isr: 91 Elr: N/A LastKnownElr: N/A
Topic: oldboyedu-linux99 Partition: 1 Leader: 92 Replicas: 92 Isr: 92 Elr: N/A LastKnownElr: N/A
Topic: oldboyedu-linux99 Partition: 2 Leader: 93 Replicas: 93 Isr: 93 Elr: N/A LastKnownElr: N/A
Topic: oldboyedu-linux99 Partition: 3 Leader: 91 Replicas: 91 Isr: 91 Elr: N/A LastKnownElr: N/A
Topic: oldboyedu-linux99 Partition: 4 Leader: 92 Replicas: 92 Isr: 92 Elr: N/A LastKnownElr: N/A
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.93:9092 --topic oldboyedu-linux99 --partitions 3 --alter
Error while executing topic command : Topic currently has 5 partitions, which is higher than the requested 3.
[2025-09-04 10:47:19,059] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 5 partitions, which is higher than the requested 3.
(org.apache.kafka.tools.TopicCommand)
6. 删除topic
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.93:9092 --list
oldboyedu-linux99
oldboyedu-xixi
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.93:9092 --topic oldboyedu-linux99 --delete
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.93:9092 --list
oldboyedu-xixi
kafka生产者和消费者管理脚本(记住生产者负责写,消费者负责读)
1. 启动生产者
[root@elk91 ~]# kafka-console-producer.sh --bootstrap-server 10.0.0.93:9092 --topic oldboyedu-xixi
>www.oldboyedu.com
>学IT来老男孩,月薪过万不是梦~
>周进喜欢洗脚按摩
2. 启动消费者
[root@elk92 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.93:9092 --topic oldboyedu-xixi
周进喜欢洗脚按摩
3. 从头消费数据(从头开始读)
[root@elk93 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.93:9092 --topic oldboyedu-xixi --from-beginning
www.oldboyedu.com
学IT来老男孩,月薪过万不是梦~
周进喜欢洗脚按摩
4. 查看topic列表
[root@elk91 ~]# kafka-topics.sh --bootstrap-server 10.0.0.93:9092 --list
__consumer_offsets
oldboyedu-xixi
温馨提示:
注意,这里的"__consumer_offsets"是kafka集群内置的topic,用来存储消费者消费数据的offset(偏移量)。
该"__consumer_offsets"不要删除哟~
此时有个问题?如果写快读慢怎么办,那么多增加几个消费者,并且这些消费者必须隶属于对应的消费者组,此时可以记录它们读的记录点,以防有消费者挂掉了可以继续再接着读。
同一个消费组里面的消费者,不能同时读一个分区,得读不同的分区。
kafka的消费者组管理
1. 查看消费者组列表
[root@elk91 ~]# kafka-consumer-groups.sh --bootstrap-server 10.0.0.93:9092 --list
console-consumer-97820
console-consumer-8471
2. 启动消费者并指定消费者组
[root@elk93 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.93:9092 --topic oldboyedu-xixi --from-beginning --group linux99
www.oldboyedu.com
学IT来老男孩,月薪过万不是梦~
周进喜欢洗脚按摩
3. 查看消费者组详细信息
[root@elk91 ~]# kafka-consumer-groups.sh --bootstrap-server 10.0.0.93:9092 --describe --group linux99 ; echo
GROUP TOPIC PARTITION(分区) CURRENT-OFFSET(最后数据点) LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
linux99 oldboyedu-xixi 2 3 3 0 console-consumer-b5ea00f0-7ab5-4744-9b60-6a51b3cd3e02 /10.0.0.93 console-consumer
linux99 oldboyedu-xixi 0 0 0 0 console-consumer-b5ea00f0-7ab5-4744-9b60-6a51b3cd3e02 /10.0.0.93 console-consumer
linux99 oldboyedu-xixi 1 0 0 0 console-consumer-b5ea00f0-7ab5-4744-9b60-6a51b3cd3e02 /10.0.0.93 console-consumer
4. 多词启动生产者测试【启动一个可能看不出效果】
[root@elk91 ~]# kafka-console-producer.sh --bootstrap-server 10.0.0.93:9092 --topic oldboyedu-xixi
>11111111111111111111
>22222222222222222222
>33333333333333333
>444444444444444444
>5555555555555555555555
5. 再次查看消费者组详细信息【很明显,启动多个生产者后,发现每个分区都写入了数据】是随机写入数据到各个分区的
[root@elk91 ~]# kafka-consumer-groups.sh --bootstrap-server 10.0.0.93:9092 --describe --group linux99 ; echo
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
linux99 oldboyedu-xixi 2 10 10 0 console-consumer-b5ea00f0-7ab5-4744-9b60-6a51b3cd3e02 /10.0.0.93 console-consumer
linux99 oldboyedu-xixi 0 2 2 0 console-consumer-b5ea00f0-7ab5-4744-9b60-6a51b3cd3e02 /10.0.0.93 console-consumer
linux99 oldboyedu-xixi 1 5 5 0 console-consumer-b5ea00f0-7ab5-4744-9b60-6a51b3cd3e02 /10.0.0.93 console-consumer
[root@elk91 ~]#
6. 再次启动消费者并指定消费者组【发现拿不到数据】因为此时多了一个消费者,在哪个节点运行都可以,都会生成一个新的消费者,重新分区
[root@elk92 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.91:9092 --topic oldboyedu-xixi --from-beginning --group linux99
7. 但是查看消费者信息,发现分区被重新分配了【消费者组的消费者重新分配分区的过程我们称之为:rebalancer(重平衡)】
分区0和分区1被分配给了新消费者(运行在 elk92 上,CONSUMER-ID 变了) 分区2留给了原来的消费者(运行在 elk93 上),此时再写入数据,就会随机给不同分区。
[root@elk91 ~]# kafka-consumer-groups.sh --bootstrap-server 10.0.0.93:9092 --describe --group linux99 ; echo
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
linux99 oldboyedu-xixi 0 2 2 0 console-consumer-895cd90a-724a-41a7-b093-636aaa25c19f /10.0.0.92 console-consumer
linux99 oldboyedu-xixi 1 5 5 0 console-consumer-895cd90a-724a-41a7-b093-636aaa25c19f /10.0.0.92 console-consumer
linux99 oldboyedu-xixi 2 10 10 0 console-consumer-b5ea00f0-7ab5-4744-9b60-6a51b3cd3e02 /10.0.0.93 console-consumer
具体流程如下,此时有两个消费者。
ELFK对接kafka
filebeat对接kafka集群:filebeat把数据写到kafka进行消息队列的存储
1. 创建topic
[root@elk92 ~]# kafka-topics.sh --bootstrap-server 10.0.0.93:9092 --topic oldboyedu-elasticstack --partitions 3 --replication-factor 2 --create
Created topic oldboyedu-elasticstack.
2. 生成测试数据
[root@elk92 ~]# python3 generate_log.py /tmp/apps.log
3. filebeat写入数据到kafka集群
[root@elk92 ~]# cat /etc/filebeat/config/14-filestream-to-kafka.yaml
filebeat.inputs:
- type: filestream
paths:
- /tmp/apps.log
output.kafka:
hosts:
- 10.0.0.91:9092
- 10.0.0.92:9092
- 10.0.0.93:9092
topic: "oldboyedu-elasticstack"
[root@elk92 ~]# filebeat -e -c /etc/filebeat/config/14-filestream-to-kafka.yaml
4. 查看kafka数据
[root@elk93 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.92:9092 --topic oldboyedu-elasticstack --group linux98-001 --from-beginning
logstsh对接kafka集群:从kafka拿到数据处理给ES集群然后出图展示
1. kibana创建api-key
假设我拿到的api-key为:JmNgF5kBubfXesjsCGYt:2vNBd_HvQq-5PnFjVUoHhA
2. Logstash从kafka拉取数据写入ES集群
[root@elk93 ~]# cat /etc/logstash/conf.d/10-kafka-to-es.conf
input {
kafka {
bootstrap_servers => "10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092"
group_id => "logstash-linux99-001"
topics => ["oldboyedu-elasticstack"]
auto_offset_reset => "earliest"
}
}
filter {
json {
source => "message"
}
mutate {
split => { "message" => "|" }
add_field => {
"other" => "%{[message][0]}"
"userId" => "%{[message][1]}"
"action" => "%{[message][2]}"
"svip" => "%{[message][3]}"
"price" => "%{[message][4]}"
}
}
mutate {
split => { "other" => " " }
add_field => {
"dt" => "%{[other][1]} %{[other][2]}"
}
convert => {
"price" => "float"
"userId" => "integer"
}
remove_field => [ "@version", "input","ecs","log","tags","agent","host","message","other"]
}
date {
match => [ "dt", "yyyy-MM-dd HH:mm:ss" ]
}
}
output {
elasticsearch {
hosts => ["https://10.0.0.91:9200","https://10.0.0.92:9200","https://10.0.0.93:9200"]
index => "oldboyedu-logstash-kafka-apps-%{+YYYY.MM.dd}"
api_key => "GdnTE5kB1hiWm8KzQUOJ:e71bcej0QumGGMtnD1b4DA"
ssl => true
ssl_certificate_verification => false
}
}
[root@elk93 ~]# logstash -rf /etc/logstash/conf.d/10-kafka-to-es.conf
3. kibana出图展示
JDK版本冲突导致Unknown garbage collector name解决方案(可忽略)
[root@elk93 ~]# java --version
openjdk 22.0.2 2024-07-16
OpenJDK Runtime Environment (build 22.0.2+9-70)
OpenJDK 64-Bit Server VM (build 22.0.2+9-70, mixed mode, sharing)
[root@elk93 ~]# /usr/share/logstash/jdk/bin/java --version
openjdk 11.0.26 2025-01-21
OpenJDK Runtime Environment Temurin-11.0.26+4 (build 11.0.26+4)
OpenJDK 64-Bit Server VM Temurin-11.0.26+4 (build 11.0.26+4, mixed mode)
[root@elk93 ~]# export JAVA_HOME=/usr/share/logstash/jdk/
[root@elk93 ~]# echo $JAVA_HOME
更多推荐
所有评论(0)