原文链接:
通过 docker-compose 部署 Kafka
docker-compose 快速部署 Zookeeper

部署Zookeeper集群

创建docker镜像网络环境

# 创建,注意不能使用hadoop_network,要不然启动hs2服务的时候会有问题!!!
docker network create hadoop-network
 
# 查看
docker network ls

构建Dockerfile

下载 Zookeeper

wget https://dlcdn.apache.org/zookeeper/zookeeper-3.8.4/apache-zookeeper-3.8.4-bin.tar.gz --no-check-certificate

编写zk配置文件

mkdir conf data/{zookeeper-node1,zookeeper-node2,zookeeper-node3}/data -p
 
# zookeeper 主配置文件
cat >conf/zoo.cfg<<EOF
# tickTime:Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。tickTime以毫秒为单位。session最小有效时间为tickTime*2
tickTime=2000
# Zookeeper保存数据的目录,默认情况下,Zookeeper将写数据的日志文件也保存在这个目录里。不要使用/tmp目录
dataDir=/opt/apache/zookeeper/data
# 端口,默认就是2181
clientPort=2181
# 集群中的follower服务器(F)与leader服务器(L)之间初始连接时能容忍的最多心跳数(tickTime的数量),超过此数量没有回复会断开链接
initLimit=10
# 集群中的follower服务器与leader服务器之间请求和应答之间能容忍的最多心跳数(tickTime的数量)
syncLimit=5
# 最大客户端链接数量,0不限制,默认是0
maxClientCnxns=60
# zookeeper集群配置项,server.1,server.2,server.3是zk集群节点;zookeeper-node1,zookeeper-node2,zookeeper-node3是主机名称;2888是主从通信端口;3888用来选举leader
server.1=zookeeper-node1:2888:3888
server.2=zookeeper-node2:2888:3888
server.3=zookeeper-node3:2888:3888
EOF
 
# 在刚创建好的zk data数据目录下面创建一个文件 myid
# 里面内容是server.N中的N,会通过挂载的方式添加
echo 1 > ./data/zookeeper-node1/data/myid
echo 2 > ./data/zookeeper-node2/data/myid
echo 3 > ./data/zookeeper-node3/data/myid

编写启动脚本 bootstrap.sh

#!/usr/bin/env sh
 
${ZOOKEEPER_HOME}/bin/zkServer.sh start
 
tail -f ${ZOOKEEPER_HOME}/logs/*.out

编写文件 Dockerfile

FROM registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/centos:7.7.1908
 
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
 
RUN export LANG=zh_CN.UTF-8
 
# 创建用户和用户组,跟yaml编排里的user: 10000:10000
RUN groupadd --system --gid=10000 hadoop && useradd --system --home-dir /home/hadoop --uid=10000 --gid=hadoop hadoop -m
 
# 安装sudo
RUN yum -y install sudo ; chmod 640 /etc/sudoers
 
# 给hadoop添加sudo权限
RUN echo "hadoop ALL=(ALL) NOPASSWD: ALL" >> /etc/sudoers
 
RUN yum -y install install net-tools telnet wget nc less
 
RUN mkdir /opt/apache/
 
# 添加配置 JDK
#ADD jdk-8u212-linux-x64.tar.gz /opt/apache/
#ENV JAVA_HOME /opt/apache/jdk1.8.0_212
#ENV PATH $JAVA_HOME/bin:$PATH
RUN yum -y install java-1.8.0-openjdk-1.8.0.402.b06-1.el7_9.x86_64 

# 添加配置 trino server
ENV ZOOKEEPER_VERSION 3.8.4
ADD apache-zookeeper-${ZOOKEEPER_VERSION}-bin.tar.gz /opt/apache/
ENV ZOOKEEPER_HOME /opt/apache/zookeeper
RUN ln -s /opt/apache/apache-zookeeper-${ZOOKEEPER_VERSION}-bin $ZOOKEEPER_HOME
 
# 创建数据存储目录
RUN mkdir ${ZOOKEEPER_HOME}/data
# copy 配置文件
RUN cp ${ZOOKEEPER_HOME}/conf/zoo_sample.cfg ${ZOOKEEPER_HOME}/conf/zoo.cfg
# 这里的值会根据挂载的而修改
RUN echo 1 >${ZOOKEEPER_HOME}/data/myid
 
# copy bootstrap.sh
COPY bootstrap.sh /opt/apache/
RUN chmod +x /opt/apache/bootstrap.sh
 
RUN chown -R hadoop:hadoop /opt/apache
 
WORKDIR $ZOOKEEPER_HOME

构建镜像

docker build -t registry.cn-hangzhou.aliyuncs.com/fazehan/zookeeper:3.8.4 . --no-cache

编写文件 docker-compose.yaml

version: '3'
services:
  zookeeper-node1:
    image: registry.cn-hangzhou.aliyuncs.com/fazehan/zookeeper:3.8.4
    user: "hadoop:hadoop"
    container_name: zookeeper-node1
    hostname: zookeeper-node1
    restart: always
    environment:
      - TZ=Asia/Shanghai
      - privileged=true
    env_file:
      - .env
    volumes:
      - ./conf/zoo.cfg:${ZOOKEEPER_HOME}/conf/zoo.cfg
      - ./data/zookeeper-node1/data/myid:${ZOOKEEPER_HOME}/data/myid
    ports:
      - "${ZOOKEEPER_NODE1_SERVER_PORT}:2181"
    expose:
      - 2888
      - 3888
    command: ["sh","-c","/opt/apache/bootstrap.sh"]
    networks:
      - hadoop-network
    healthcheck:
      test: ["CMD-SHELL", "netstat -tnlp|grep :2181 || exit 1"]
      interval: 10s
      timeout: 10s
      retries: 5
  zookeeper-node2:
    image: registry.cn-hangzhou.aliyuncs.com/fazehan/zookeeper:3.8.4
    user: "hadoop:hadoop"
    container_name: zookeeper-node2
    hostname: zookeeper-node2
    restart: always
    environment:
      - TZ=Asia/Shanghai
      - privileged=true
    env_file:
      - .env
    volumes:
      - ./conf/zoo.cfg:${ZOOKEEPER_HOME}/conf/zoo.cfg
      - ./data/zookeeper-node2/data/myid:${ZOOKEEPER_HOME}/data/myid
    ports:
      - "${ZOOKEEPER_NODE2_SERVER_PORT}:2181"
    expose:
      - 2888
      - 3888
    command: ["sh","-c","/opt/apache/bootstrap.sh"]
    networks:
      - hadoop-network
    healthcheck:
      test: ["CMD-SHELL", "netstat -tnlp|grep :2181 || exit 1"]
      interval: 10s
      timeout: 10s
      retries: 5
  zookeeper-node3:
    image: registry.cn-hangzhou.aliyuncs.com/fazehan/zookeeper:3.8.4
    user: "hadoop:hadoop"
    container_name: zookeeper-node3
    hostname: zookeeper-node3
    restart: always
    environment:
      - TZ=Asia/Shanghai
      - privileged=true
    env_file:
      - .env
    volumes:
      - ./conf/zoo.cfg:${ZOOKEEPER_HOME}/conf/zoo.cfg
      - ./data/zookeeper-node3/data/myid:${ZOOKEEPER_HOME}/data/myid
    ports:
      - "${ZOOKEEPER_NODE3_SERVER_PORT}:2181"
    expose:
      - 2888
      - 3888
    command: ["sh","-c","/opt/apache/bootstrap.sh"]
    networks:
      - hadoop-network
    healthcheck:
      test: ["CMD-SHELL", "netstat -tnlp|grep :2181 || exit 1"]
      interval: 10s
      timeout: 10s
      retries: 5
 
# 连接外部网络
networks:
   hadoop-network:
       external: true

编写环境变量文件 .ENV

# 对外暴露的端口
cat << EOF > .env
ZOOKEEPER_HOME=/opt/apache/zookeeper
ZOOKEEPER_NODE1_SERVER_PORT=31181
ZOOKEEPER_NODE2_SERVER_PORT=32181
ZOOKEEPER_NODE3_SERVER_PORT=33181
EOF

通过docker-compose启动部署

docker-compose -f docker-compose.yaml up -d
 
# 查看
docker-compose -f docker-compose.yaml ps

简单验证

# 检查节点
docker exec -it zookeeper-node1 bash
${ZOOKEEPER_HOME}/bin/zkServer.sh status
exit
docker exec -it zookeeper-node2 bash
${ZOOKEEPER_HOME}/bin/zkServer.sh status
exit
docker exec -it zookeeper-node3 bash
${ZOOKEEPER_HOME}/bin/zkServer.sh status

常用的 zookeeper 客户端命令

创建节点

# 随便登录一个容器节点
docker exec -it zookeeper-node1 bash
 
# 登录
${ZOOKEEPER_HOME}/bin/zkCli.sh -server zookeeper-node1:2181
 
# 【持久节点】数据节点创建后,一直存在,直到有删除操作主动清除,示例如下:
create /zk-node data
 
# 【持久顺序节点】节点一直存在,zk自动追加数字后缀做节点名,后缀上限 MAX(int),示例如下:
create -s /zk-node data
 
# 【临时节点】生命周期和会话相同,客户端会话失效,则临时节点被清除,示例如下:
create -e /zk-node-temp data
 
# 【临时顺序节点】临时节点+顺序节点后缀,示例如下:
create -s -e /zk-node-temp data

查看节点

# 随便登录一个容器节点
docker exec -it zookeeper-node1 bash
 
# 登录
${ZOOKEEPER_HOME}/bin/zkCli.sh -server zookeeper-node1:2181
 
# 列出zk执行节点的所有子节点,只能看到第一级子节点
ls /
# 获取zk指定节点数据内容和属性
get /zk-node

更新节点

# 表达式:set ${path} ${data} [version]
set /zk-node hello
get /zk-node

删除节点

# 对于包含子节点的节点,该命令无法成功删除,使用deleteall /zk-node
delete /zk-node
# 删除非空目录
deleteall /zk-node

非交互式命令

# 直接后面接上命令执行即可
${ZOOKEEPER_HOME}/bin/zkCli.sh -server zookeeper-node1:2181 ls /

部署Kafka集群

创建docker镜像网络环境

# 创建,注意不能使用hadoop_network,要不然启动hs2服务的时候会有问题!!!
docker network create hadoop-network
 
# 查看
docker network ls

构建Dockerfile

下载kafka

wget https://downloads.apache.org/kafka/3.5.2/kafka_2.12-3.5.2.tgz --no-check-certificate

编写kafka配置文件

  • config/kafka-node1/server.properties
# 常见配置挂载目录
mkdir config/{kafka-node1,kafka-node2,kafka-node3} -p

# 配置
cat >config/kafka-node1/server.properties<<EOF
#broker的全局唯一编号,不能重复
broker.id=1
#删除topic功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka数据的存储位置
log.dirs=/opt/apache/kafka/logs
#指定Topic的分区数量,这里设置为3。默认只有一个分区,设置多分区可以支持并发读写和负载均衡
num.partitions=3
#副本,默认只有一个副本,不会进行数据备份和冗余
replication.factor=3
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接Zookeeper集群地址
zookeeper.connect=zookeeper-node1:2181,zookeeper-node2:2181,zookeeper-node3:2181
#zookeeper连接超时时间
zookeeper.connection.timeout.ms=60000
EOF
  • config/kafka-node2/server.properties
cat >config/kafka-node2/server.properties<<EOF
#broker的全局唯一编号,不能重复
broker.id=2
#删除topic功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka数据的存储位置
log.dirs=/opt/apache/kafka/logs
#指定Topic的分区数量,这里设置为3。默认只有一个分区,设置多分区可以支持并发读写和负载均衡
num.partitions=3
#副本,默认只有一个副本,不会进行数据备份和冗余
replication.factor=3
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接Zookeeper集群地址
zookeeper.connect=zookeeper-node1:2181,zookeeper-node2:2181,zookeeper-node3:2181
#zookeeper连接超时时间
zookeeper.connection.timeout.ms=60000
EOF
  • config/kafka-node3/server.properties
cat >config/kafka-node3/server.properties<<EOF
#broker的全局唯一编号,不能重复
broker.id=3
#删除topic功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka数据的存储位置
log.dirs=/opt/apache/kafka/logs
#指定Topic的分区数量,这里设置为3。默认只有一个分区,设置多分区可以支持并发读写和负载均衡
num.partitions=3
#副本,默认只有一个副本,不会进行数据备份和冗余
replication.factor=3
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接Zookeeper集群地址
zookeeper.connect=zookeeper-node1:2181,zookeeper-node2:2181,zookeeper-node3:2181
#zookeeper连接超时时间
zookeeper.connection.timeout.ms=60000
EOF

编写启动脚本 bootstrap.sh

#!/usr/bin/env sh
 
${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties

编写文件 Dockerfile

FROM registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/centos:7.7.1908

RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone

RUN export LANG=zh_CN.UTF-8

# 创建用户和用户组,跟yaml编排里的user: 10000:10000
RUN groupadd --system --gid=10000 hadoop && useradd --system --home-dir /home/hadoop --uid=10000 --gid=hadoop hadoop -m

# 安装sudo
RUN yum -y install sudo ; chmod 640 /etc/sudoers

# 给hadoop添加sudo权限
RUN echo "hadoop ALL=(ALL) NOPASSWD: ALL" >> /etc/sudoers

RUN yum -y install install net-tools telnet wget nc less

RUN mkdir -p /opt/apache/

# 添加配置 JDK
#ADD jdk-8u212-linux-x64.tar.gz /opt/apache/
#ENV JAVA_HOME /opt/apache/jdk1.8.0_212
#ENV PATH $JAVA_HOME/bin:$PATH
RUN yum -y install java-1.8.0-openjdk-1.8.0.402.b06-1.el7_9.x86_64

# 添加配置 kafka server
ENV KAFKA_VERSION 2.12-3.5.2
ADD kafka_${KAFKA_VERSION}.tgz /opt/apache/

ENV KAFKA_HOME /opt/apache/kafka
RUN ln -s /opt/apache/kafka_${KAFKA_VERSION} $KAFKA_HOME

# 创建数据存储目录
RUN mkdir -p ${KAFKA_HOME}/data/logs

# copy bootstrap.sh
COPY bootstrap.sh /opt/apache/
RUN chmod +x /opt/apache/bootstrap.sh

RUN chown -R hadoop:hadoop /opt/apache

WORKDIR $KAFKA_HOME

构建镜像

# 需要查看构建镜像详细过程则需要加上 --progress=plain 选项
docker build -t registry.cn-hangzhou.aliyuncs.com/fazehan/kafka:2.12-3.5.2 . --no-cache 

编写文件 docker-compose.yaml

version: '3'
services:
  kafka-node1:
    image: registry.cn-hangzhou.aliyuncs.com/fazehan/kafka:2.12-3.5.2
    user: "hadoop:hadoop"
    container_name: kafka-node1
    hostname: kafka-node1
    restart: always
    privileged: true
    env_file:
      - .env
    volumes:
      - ./config/kafka-node1/server.properties:${KAFKA_HOME}/config/server.properties
    ports:
      - "${KAFKA_NODE1_SERVER_PORT}:9092"
    expose:
      - 2888
      - 3888
    command: ["sh","-c","/opt/apache/bootstrap.sh"]
    networks:
      - hadoop-network
    healthcheck:
      test: ["CMD-SHELL", "netstat -tnlp|grep :9092 || exit 1"]
      interval: 10s
      timeout: 10s
      retries: 5
  kafka-node2:
    image: registry.cn-hangzhou.aliyuncs.com/fazehan/kafka:2.12-3.5.2
    user: "hadoop:hadoop"
    container_name: kafka-node2
    hostname: kafka-node2
    restart: always
    privileged: true
    env_file:
      - .env
    volumes:
      - ./config/kafka-node2/server.properties:${KAFKA_HOME}/config/server.properties
    ports:
      - "${KAFKA_NODE2_SERVER_PORT}:9092"
    expose:
      - 2888
      - 3888
    command: ["sh","-c","/opt/apache/bootstrap.sh"]
    networks:
      - hadoop-network
    healthcheck:
      test: ["CMD-SHELL", "netstat -tnlp|grep :9092 || exit 1"]
      interval: 10s
      timeout: 10s
      retries: 5
  kafka-node3:
    image: registry.cn-hangzhou.aliyuncs.com/fazehan/kafka:2.12-3.5.2
    user: "hadoop:hadoop"
    container_name: kafka-node3
    hostname: kafka-node3
    restart: always
    privileged: true
    env_file:
      - .env
    volumes:
      - ./config/kafka-node3/server.properties:${KAFKA_HOME}/config/server.properties
    ports:
      - "${KAFKA_NODE3_SERVER_PORT}:9092"
    expose:
      - 2888
      - 3888
    command: ["sh","-c","/opt/apache/bootstrap.sh"]
    networks:
      - hadoop-network
    healthcheck:
      test: ["CMD-SHELL", "netstat -tnlp|grep :9092 || exit 1"]
      interval: 10s
      timeout: 10s
      retries: 5
 
# 连接外部网络
networks:
   hadoop-network:
        #driver: bridge
        external: true

编写环境变量文件 .ENV

内容如下:

# 对外暴露的端口
cat << EOF > .env
KAFKA_HOME=/opt/apache/kafka
KAFKA_NODE1_SERVER_PORT=39092
KAFKA_NODE2_SERVER_PORT=39093
KAFKA_NODE3_SERVER_PORT=39094
EOF

通过docker-compose启动部署

docker-compose -f docker-compose.yaml up -d
 
# 查看
docker-compose -f docker-compose.yaml ps

简单验证

# 登录zookeeper,在zookeeper查看brokers
${ZOOKEEPER_HOME}/bin/zkCli.sh ls /brokers/ids
 
${ZOOKEEPER_HOME}/bin/zkCli.sh get /brokers/ids/1
${ZOOKEEPER_HOME}/bin/zkCli.sh get /brokers/ids/2
${ZOOKEEPER_HOME}/bin/zkCli.sh get /brokers/ids/3

常用的 Kafka 客户端命令

添加topic

# 随便登录
docker exec -it kafka-node1 bash

# 创建topic,1副本,1分区,设置数据过期时间72小时(-1表示不过期),单位ms,72*3600*1000=259200000
${KAFKA_HOME}/bin/kafka-topics.sh --create --topic test002 --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092  --partitions 1 --replication-factor 1 --config retention.ms=259200000

查看topic

# 查看topic列表
${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --list

# 查看topic列表详情
${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --describe

# 指定topic
${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --describe --topic test002

# 查看消费者组
${KAFKA_HOME}/bin/kafka-consumer-groups.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --list
kafka-consumer-groups.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --describe  --group test002

修改topic

# 修改分区,扩分区,不能减少分区
${KAFKA_HOME}/bin/kafka-topics.sh --alter --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --topic test002 --partitions 2

# 修改过期时间,下面两行都可以
${KAFKA_HOME}/bin/kafka-configs.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --alter --topic test002 --add-config retention.ms=86400000

${KAFKA_HOME}/bin/kafka-configs.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --alter --entity-name test002 --entity-type topics --add-config retention.ms=86400000

# 修改副本数,将副本数修改成3
$ cat >1.json<<EOF
{"version":1,
"partitions":[
{"topic":"test002","partition":0,"replicas":[0,1,2]},
{"topic":"test002","partition":1,"replicas":[1,2,0]},
{"topic":"test002","partition":2,"replicas":[2,0,1]}
]}
EOF
${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --describe --topic test002

扩容分区

#把test002 topic扩容为6个分区。
#注意:目前不支持减少分区,扩容前必须存在这个主题。
${KAFKA_HOME}/bin/kafka-topics.sh -alter --partitions 6 --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --topic test002

${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --describe

删除topic

${KAFKA_HOME}/bin/kafka-topics.sh --delete --topic test002 --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092

生成者和消费者

生产者

${KAFKA_HOME}/bin/kafka-console-producer.sh --broker-list kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --topic test002
{"id":"1","name":"n1","age":"20"}
{"id":"2","name":"n2","age":"21"}
{"id":"3","name":"n3","age":"22"}

消费者

# 从头开始消费
${KAFKA_HOME}/bin/kafka-console-consumer.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --topic test002 --from-beginning

# 指定从分区的某个位置开始消费,这里只指定了一个分区,可以多写几行或者遍历对应的所有分区
${KAFKA_HOME}/bin/kafka-console-consumer.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --topic test002 --partition 0 --offset 100

消费组

在 Kafka 中,消费组(Consumer Group)是一组独立消费者的集合,它们共同消费一个或多个 Topic 中的数据。消费组内的消费者协同工作,通过分摊该 Topic 中的所有分区,以实现消息的消费和处理。

消费组在 Kafka 消息队列中起到了至关重要的作用。它可以提供如下功能:

  • 并发消费:消费组内的每个消费者都可以独立地消费消息,可以实现高并发处理。

  • 自动负载均衡:消费组内的消费者会自动协作,将消费任务均分到所有消费者上,使得每个消费者都能处理相同数量的消息。

  • 提高可用性:当消费组内的一个或多个消费者故障退出时,消息会自动分配到其他消费者上,保证消费任务的不间断执行。

  • 支持多租户:可以通过 Consumer Group 来对不同的租户进行消息隔离,不同的 Consumer Group 可以读取同一个 Topic 的不同副本,或者读取不同 Topic 的不同分区,实现多个实例共享同一 Topic 或分散处理不同 Topic。

示例如下:

${KAFKA_HOME}/bin/kafka-console-consumer.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --topic test002 --group test002

查看数据积压

${KAFKA_HOME}/bin/kafka-consumer-groups.sh --bootstrap-server kafka-node1:9092,kafka-node2:9092,kafka-node3:9092 --describe --group test002

kafka 数据积压处理方法

在 Kafka 中,由于消息的生产和消费速度可能不一致,导致消息会积压在 Kafka 的分区中,如果这些积压的消息处理不及时,会导致 Kafka 系统的性能下降和可用性降低等问题。因此,需要采取一些处理方法来解决数据积压问题:

  • 增加消费者:增加消费者可以使消费任务并行执行,加快消息的处理速度。可以通过增加消费者的方式将积压的消息消费掉,提高系统处理速度和效率。

  • 调整消费者组:当一个消费组中的消费者无法处理所有的消息时,可以考虑调整消费者组。可以增加消费者的数量或者更换消费者组,以适应消息处理的速度和大小。

  • 调整消息分区:Kafka 中Topic 的分区数也会影响数据积压的情况。可以调整分区数以改善数据读取和分发的情况,或者对热点 Topic 进行分区处理,以实现更好的性能和可用性。

  • 调整消费 offset:若积压的消息都已经被处理过了,却还在 Kafka 中存在,可能是消费者消费 offset 设置错误导致的。可以通过 Kafka 的 offset 操作,重置消费 offset,跳过已经处理过的消息,减少数据积压的问题。

  • 执行消息清洗:在消费 Kafka 消息时,可以额外执行一些消息清洗处理操作,将无用的数据过滤出去,或者将数据进行清理和格式化处理,减少中间处理环节,提高数据消费的效率和可用性。

以上是一些解决 Kafka 数据积压问题的常用方法,需要视具体情况而定,选择合适的方法来解决。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐