一、服务器环境

序号

部署版本

版本

备注

1

操作系统

CentOS Linux release 7.9.2009 (Core)

2

docker

Docker version 20.10.6, build 370c289

3

docker-compose

docker-compose version 1.28.2, build 67630359

二、服务规划

序号

服务

名称

端口

1

zookeeper

zookeeper

2181,2888,3888

2

kafka

kafka1

9092:9092

3

kafka

kafka2

9093:9092

4

kafka

kafka3

9094:9092

三、部署kafka

1、创建/opt/beidousky/kafka-zk目录,添加docker-compose.yaml文件

version: "3"
services:
  zookeeper:
    image: zookeeper:3.6.3
    container_name: zookeeper
    user: root
    restart: always
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
    environment:
      ZOO_MY_ID: 1
      TZ: Asia/Shanghai
    volumes:
      - ./zk-conf/zoo.cfg:/conf/zoo.cfg
      - ./zk-conf/zookeeper_server_jaas.conf:/conf/zookeeper_server_jaas.conf
      - ./zk-conf/java.env:/conf/java.env
      - ./zk-data/data:/data
      - ./zk-data/datalog:/datalog
      - ./zk-data/logs:/logs

  kafka1:
    image: wurstmeister/kafka:2.13-2.8.1
    container_name: kafka1
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    volumes:
      - ./kafka-data1:/kafka
      - ./kafka-conf:/opt/kafka/secrets/
    environment:
      KAFKA_BROKER_ID: 0
      KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://192.168.1.214:9092
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_LISTENERS: SASL_PLAINTEXT://0.0.0.0:9092
      KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SASL_PLAINTEXT
      KAFKA_PORT: 9092
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
      KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
      KAFKA_SUPER_USERS: User:admin
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true" #设置为true,ACL机制为黑名单机制,只有黑名单中的用户无法访问,默认为false,ACL机制为白名单机制,只有白名单中的用户可以访问
      KAFKA_ZOOKEEPER_CONNECT: 192.168.1.244:2181
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_HEAP_OPTS: "-Xmx512M -Xms16M"
      KAFKA_OPTS: -Djava.security.auth.login.config=/opt/kafka/secrets/server_jaas.conf
    restart: always


  kafka2:
    image: wurstmeister/kafka:2.13-2.8.1
    container_name: kafka2
    depends_on:
      - zookeeper
    ports:
      - 9093:9092
    volumes:
      - ./kafka-data2:/kafka
      - ./kafka-conf:/opt/kafka/secrets/
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://192.168.1.214:9093 #宿主机IP
      KAFKA_ADVERTISED_PORT: 9093
      KAFKA_LISTENERS: SASL_PLAINTEXT://0.0.0.0:9092
      KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SASL_PLAINTEXT
      KAFKA_PORT: 9092
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
      KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
      KAFKA_SUPER_USERS: User:admin
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true" #设置为true,ACL机制为黑名单机制,只有黑名单中的用户无法访问,默认为false,ACL机制为白名单机制,只有白名单中的用户可以访问
      KAFKA_ZOOKEEPER_CONNECT: 192.168.1.244:2181
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_HEAP_OPTS: "-Xmx512M -Xms16M"
      KAFKA_OPTS: -Djava.security.auth.login.config=/opt/kafka/secrets/server_jaas.conf
    restart: always

  kafka3:
    image: wurstmeister/kafka:2.13-2.8.1
    container_name: kafka3
    depends_on:
      - zookeeper
    ports:
      - 9094:9092
    volumes:
      - ./kafka-data3:/kafka
      - ./kafka-conf:/opt/kafka/secrets/
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://192.168.1.214:9094 #宿主机IP
      KAFKA_ADVERTISED_PORT: 9094
      KAFKA_LISTENERS: SASL_PLAINTEXT://0.0.0.0:9092
      KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SASL_PLAINTEXT
      KAFKA_PORT: 9092
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
      KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
      KAFKA_SUPER_USERS: User:admin
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true" #设置为true,ACL机制为黑名单机制,只有黑名单中的用户无法访问,默认为false,ACL机制为白名单机制,只有白名单中的用户可以访问
      KAFKA_ZOOKEEPER_CONNECT: 192.168.1.244:2181
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_HEAP_OPTS: "-Xmx512M -Xms16M"
      KAFKA_OPTS: -Djava.security.auth.login.config=/opt/kafka/secrets/server_jaas.conf
    restart: always

2、创建/opt/beidousky/kafka-zk/zk-conf目录,添加zoo.cfg文件

dataDir=/data
dataLogDir=/datalog
tickTime=2000
initLimit=5
syncLimit=2
autopurge.snapRetainCount=3
autopurge.purgeInterval=0
maxClientCnxns=60
standaloneEnabled=true
admin.enableServer=true
quorumListenOnAllIPs=true

server.1=192.168.1.244:2888:3888;2181
#server.2=192.168.1.xxx:2888:3888;2181
#server.3=192.168.1.xxx:3888;2181

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
sessionRequireClientSASLAuth=true
#requireClientAuthScheme=sasl
jaasLoginRenew=3600000

3、在/opt/beidousky/kafka-zk/zk-conf目录下,添加java.env文件

# 指定jaas文件的位置
SERVER_JVMFLAGS="-Djava.security.auth.login.config=/conf/zookeeper_server_jaas.conf"

4、在/opt/beidousky/kafka-zk/zk-conf目录下,添加zookeeper_server_jaas.conf文件

数据格式为user_用户名="用户密码"或者username="用户名" password="用户密码"

Server {
       org.apache.zookeeper.server.auth.DigestLoginModule required
       user_admin="admin123"
       user_kafka="kafka123"
       ;
};

Client {
       org.apache.zookeeper.server.auth.DigestLoginModule required
       username="kafka"
       password="kafka123"
       ;
};

5、创建/opt/beidousky/kafka-zk/kafka-conf目录,添加server_jaas.conf文件

Server {
       org.apache.zookeeper.server.auth.DigestLoginModule required
       user_admin="admin123"
       user_kafka="kafka123"
       ;
};

Client {
       org.apache.zookeeper.server.auth.DigestLoginModule required
       username="kafka"
       password="kafka123"
       ;
};

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    user_admin="123456";
};

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="123456";
};

6、启动kafka集群服务

cd /opt/beidousky/kafka-zk
docker-compose up -d

7、创建Topic主题验证kafka集群服务

cd /opt/beidousky/kafka-zk
docker exec -it kafka1 /bin/bash
kafka-topics.sh --zookeeper 192.168.1.244:2181 --create --topic topic-test --partitions 3 --replication-factor 3
kafka-topics.sh --zookeeper 192.168.1.244:2181 --describe --topic topic-test

如图:

四、SpringBoot项目连接kafka集群

application.yml文件

  spring:    
    kafka:
      bootstrap-servers: 192.168.1.214:9092,192.168.1.214:9093,192.168.1.214:9094
      #=============== producer  =======================
      producer:
        #如果该值大于零时,表示启用重试失败的发送次数
        retries: 0
        #每当多个记录被发送到同一分区时,生产者将尝试将记录一起批量处理为更少的请求,默认值为16384(单位字节)
        batch-size: 16384
        #生产者可用于缓冲等待发送到服务器的记录的内存总字节数,默认值为3355443
        buffer-memory: 33554432
        #key的Serializer类,实现类实现了接口org.apache.kafka.common.serialization.Serializer
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        #value的Serializer类,实现类实现了接口org.apache.kafka.common.serialization.Serializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer
      #=============== consumer  =======================
      consumer:
        #用于标识此使用者所属的使用者组的唯一字符串
        group-id: consumer-group
        #当Kafka中没有初始偏移量或者服务器上不再存在当前偏移量时该怎么办,默认值为latest,表示自动将偏移重置为最新的偏移量
        #可选的值为latest, earliest, none
        auto-offset-reset: earliest
        #消费者的偏移量将在后台定期提交,默认值为true
        enable-auto-commit: true
        #如果'enable-auto-commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
        auto-commit-interval: 100
        #密钥的反序列化器类,实现类实现了接口org.apache.kafka.common.serialization.Deserializer
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        #值的反序列化器类,实现类实现了接口org.apache.kafka.common.serialization.Deserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #认证相关配置
      properties:
        security:
          protocol: SASL_PLAINTEXT
        sasl:
          mechanism: PLAIN
          jaas:
            config: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="123456";'

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐