前言

记录docker部署kafka

部署kafka

# Create the host directory that will be mounted into the container as /bitnami/kafka.
sudo mkdir -p /zero/kafka
# Give the directory to UID/GID 1001, the non-root user the Bitnami Kafka image runs as.
# (The previous `chown -R 777` set the OWNER to UID 777; it did not grant any permissions.)
sudo chown -R 1001:1001 /zero/kafka/
# Create the Docker network used for container-to-container traffic, only if it does not exist yet.
docker network inspect app-tier >/dev/null 2>&1 || docker network create app-tier

启动命令

# Start one Kafka container whose node acts as both controller and broker
# (KAFKA_CFG_PROCESS_ROLES=controller,broker). Three listeners are configured:
# PLAINTEXT on 9092 (advertised as kafka:9092), CONTROLLER on 9093, and
# EXTERNAL on 9094 (advertised as 172.16.10.40:9094).
docker run --detach \
  --name kafka \
  --network app-tier \
  --restart always \
  --ulimit nofile=65536:65536 \
  --publish 9092:9092 \
  --publish 9094:9094 \
  --volume /zero/kafka:/bitnami/kafka \
  --env TZ=Asia/Shanghai \
  --env KAFKA_CFG_NODE_ID=0 \
  --env KAFKA_CFG_PROCESS_ROLES=controller,broker \
  --env KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 \
  --env KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094 \
  --env KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://172.16.10.40:9094 \
  --env KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT \
  --env KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  bitnami/kafka:3.6.2

如果 docker pull bitnami/kafka:3.6.2 拉取失败,可以在 /etc/docker/daemon.json 中配置镜像加速源:

vi /etc/docker/daemon.json

{
  "registry-mirrors": ["https://jhacxx1q.mirror.aliyuncs.com",
      "https://docker.1ms.run",
      "https://docker.1panel.live",
      "https://docker.ketches.cn",
            "https://docker.m.daocloud.io/",
            "https://hub-mirror.c.163.com",
            "https://dockerproxy.com/",
            "https://mirror.baidubce.com/",
            "https://docker.nju.edu.cn/",
            "https://docker.mirrors.sjtug.sjtu.edu.cn/",
            "https://mirror.ccs.tencentyun.com",
            "https://docker-0.unsee.tech",
            "https://register.liberx.info/",
            "https://docker.registry.cyou/",
            "https://docker-cf.registry.cyou/",
            "https://dockercf.jsdelivr.fyi/",
            "https://docker.jsdelivr.fyi/",
            "https://dockertest.jsdelivr.fyi/",
            "https://mirror.iscas.ac.cn/",
            "https://docker.rainbond.cc/",
            "https://mirror.aliyuncs.com",
            "https://docker.mirrors.ustc.edu.cn/"
        ]
}

#重新加载 systemd 配置

sudo systemctl daemon-reload

#重启 Docker 使配置生效

sudo systemctl restart docker

下面记录 Kafka 在 Java 项目中生产与消费消息的处理方式:

pom引入依赖

<!-- kafka-clients is declared directly. No <version> is given here, so it is presumably
     managed by a parent POM / BOM; confirm in the real project. -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>
<!-- spring-kafka with its transitive kafka-clients excluded, so the directly declared
     kafka-clients dependency above is the one that ends up on the classpath. -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <exclusions>
       <exclusion>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
       </exclusion>
    </exclusions>
</dependency>

yml配置

# Custom switch read by the listener's autoStartup placeholder (${store.auto-startup-kafka}).
store:
  auto-startup-kafka: true
# NOTE(review): KafkaConsumerConfig injects Spring Boot's KafkaProperties, which binds the
# `spring.kafka.*` prefix; confirm how this top-level `kafka:` block is mapped in the real project.
kafka:
  # Topics listed by name with partition and replica counts.
  # NOTE(review): the code that reads this list is not shown; confirm who consumes it.
  topics:
    - name: lst_export_excel_business
      partitions: 2
      replicas: 1

  # Same address as the EXTERNAL advertised listener in the docker run command.
  bootstrap-servers: 172.16.10.40:9094
  producer:
    # -1 is Kafka's alias for acks=all.
    acks: -1
    retries: 32
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.apache.kafka.common.serialization.StringSerializer
    batch-size: 32768
    compression-type: snappy
    buffer-memory: 67108864
    properties:
      'linger.ms': 5
  consumer:
    # Group id comes from the KAFKA_CONSUMER_GROUP_ID variable, falling back to storeServerGroup.
    group-id: ${KAFKA_CONSUMER_GROUP_ID:storeServerGroup}
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    auto-offset-reset: latest
    # Auto commit is off; the listener acknowledges records manually.
    enable-auto-commit: false
    # NOTE(review): presumably has no effect while enable-auto-commit is false; confirm.
    auto-commit-interval: 3
    heartbeat-interval: 5000
  listener:
    ack-mode: manual_immediate

封装类

import java.util.Map;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties.AckMode;

@Configuration
@EnableKafka
@Log4j2
public class KafkaConsumerConfig {

    /** Maximum number of records returned by a single poll. */
    private static final int MAX_POLL_RECORDS = 100;

    /**
     * Maximum time (ms) a fetch waits. The fetch returns as soon as either this wait
     * or the minimum byte count is satisfied, whichever comes first.
     */
    private static final int FETCH_MAX_WAIT_MS = 500;

    /** Minimum bytes per fetch; with less data the fetch waits until the max wait elapses. */
    private static final int FETCH_MIN_BYTES = 1024;

    /** Poll timeout (ms) configured on the listener container. */
    private static final long POLL_TIMEOUT_MS = 1000L;

    /**
     * Builds the container factory for batch consumption with manual, immediate acknowledgment.
     *
     * <p>Consumer settings start from {@code kafkaProperties.getConsumer().buildProperties()} and are
     * then overridden with the bootstrap servers and the poll/fetch tuning values defined above.
     *
     * @param kafkaProperties Spring Boot Kafka properties supplying the consumer settings
     * @return the listener container factory registered as bean {@code batchFactory}
     */
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> batchFactory(KafkaProperties kafkaProperties) {
        Map<String, Object> consumerConfig = kafkaProperties.getConsumer().buildProperties();
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        consumerConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
        consumerConfig.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, FETCH_MAX_WAIT_MS);
        consumerConfig.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, FETCH_MIN_BYTES);
        log.info(" kafka batchFactory properties = {}", consumerConfig);

        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig));
        // Deliver records to the listener in batches.
        containerFactory.setBatchListener(true);
        // Offsets are committed only when the listener acknowledges.
        containerFactory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        containerFactory.getContainerProperties().setPollTimeout(POLL_TIMEOUT_MS);
        return containerFactory;
    }

}
import cloud.jiuwei.store.constant.StoreConstants;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Resolves environment-specific Kafka topic names and exposes them both as a static
 * accessor (for producers) and as a named String bean (for SpEL lookups such as
 * {@code #{@lst_export_excel_business}}).
 *
 * <p>When the active profile equals {@code StoreConstants.UAT}, the topic name gets the
 * {@code _uat} suffix; otherwise the base name is used unchanged.
 */
@Configuration
public class KafkaTopicConfig {

    /** Suffix appended to every topic name when the active profile is UAT. */
    private static final String SUFFIX = "_uat";

    /** Base topic name for asynchronous business Excel export; also used as the bean name. */
    private static final String STO_EXPORT_EXCEL_BUSINESS = "lst_export_excel_business";

    /** Active profile, copied from the instance field so the static accessors can read it. */
    private static String env;

    /** Value of {@code spring.profiles.active}; falls back to {@code uat} when the property is unset. */
    @Value("${spring.profiles.active:uat}")
    private String profile;

    /** Publishes the injected profile to the static field once injection has completed. */
    @PostConstruct
    public void init() {
        env = profile;
    }

    /**
     * Registers the resolved export topic name as bean {@code lst_export_excel_business}.
     *
     * <p>This is deliberately an instance method. Spring invokes a static {@code @Bean} method
     * without creating the configuration instance, so {@link #init()} might not have run yet and
     * the bean could be resolved without the suffix while later static calls add it. An instance
     * {@code @Bean} method is only invoked on the initialized configuration bean.
     *
     * @return the topic name for the current environment
     */
    @Bean(name = STO_EXPORT_EXCEL_BUSINESS)
    public String stoExportExcelBusinessTopic() {
        return getStoExportExcelBusiness();
    }

    /**
     * Static accessor for the export topic name, for callers that are not Spring-injected.
     *
     * @return the topic name for the current environment
     */
    public static String getStoExportExcelBusiness() {
        return getTopicName(STO_EXPORT_EXCEL_BUSINESS);
    }

    /**
     * Appends the UAT suffix when the active profile equals {@code StoreConstants.UAT}.
     *
     * @param topic base topic name
     * @return {@code topic + "_uat"} under UAT, otherwise {@code topic}
     */
    private static String getTopicName(final String topic) {
        return StoreConstants.UAT.equals(env) ? topic + SUFFIX : topic;
    }
}
import java.util.concurrent.Executor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;

/**
 * Helpers for running work after the current transaction commits.
 *
 * <p>Every method checks whether transaction synchronization is active. If it is, the work is
 * registered to run in {@code afterCommit()}; if not, the work is started right away.
 */
@Component
@Slf4j
public class TransactionUtil {

    /** Default pool used by {@link #asyncAfterCommit(Runnable)}; assigned from the constructor. */
    private static ThreadPoolTaskExecutor threadPoolTaskExecutor;

    /**
     * Stores the injected pool in the static field so the static helpers can use it.
     *
     * @param transactionPool executor used as the default pool
     */
    public TransactionUtil(ThreadPoolTaskExecutor transactionPool) {
        threadPoolTaskExecutor = transactionPool;
    }

    /**
     * Runs the task on the calling thread after commit, or immediately when no
     * transaction synchronization is active.
     *
     * @param runnable task to run
     */
    public static void syncAfterCommit(Runnable runnable) {
        log.info("在事务提交后同步执行");
        afterCommitOrNow(runnable);
    }

    /**
     * Submits the task to the given executor after commit, or immediately when no
     * transaction synchronization is active.
     *
     * @param executor executor that receives the task
     * @param runnable task to run
     */
    public static void asyncAfterCommit(Executor executor, Runnable runnable) {
        log.info("在事务提交后异步执行");
        afterCommitOrNow(() -> executor.execute(runnable));
    }

    /**
     * Submits the task to the default pool after commit, or immediately when no
     * transaction synchronization is active.
     *
     * @param runnable task to run
     */
    public static void asyncAfterCommit(Runnable runnable) {
        log.info("在事务提交后异步执行(默认线程池)");
        // The static pool is read when the action runs, not when it is registered.
        afterCommitOrNow(() -> threadPoolTaskExecutor.execute(runnable));
    }

    /** Registers the action as an after-commit callback, or runs it now if synchronization is inactive. */
    private static void afterCommitOrNow(Runnable action) {
        if (!TransactionSynchronizationManager.isSynchronizationActive()) {
            action.run();
            return;
        }
        TransactionSynchronizationManager.registerSynchronization(
                new TransactionSynchronization() {
                    @Override
                    public void afterCommit() {
                        action.run();
                    }
                });
    }
}

生产者-生产消息

// Send the export request through TransactionUtil.syncAfterCommit: the send runs in
// afterCommit() when transaction synchronization is active, otherwise immediately.
// NOTE(review): KafkaUtils.sendAsynMsg is not shown; the meaning of the third argument
// (null here) should be confirmed against its definition.
TransactionUtil.syncAfterCommit(() ->
        KafkaUtils.sendAsynMsg(KafkaTopicConfig.getStoExportExcelBusiness(), exportRecordVO, null));

消费者-消费消息

/**
 * excel 导出 kafka 监听
 */
@Component
@Slf4j
public class ExportExcelBusinessListener{
@KafkaListener(autoStartup = "${store.auto-startup-kafka}", topics = "#{@lst_export_excel_business}")
public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
    log.info("【excel业务导出监听】接收:{}", record.toString());
    ExportRecordBusinessPO queryExportRecordBusinessPO = null;
    try {

        ExportRecordBusinessVO exportRecordBusinessVO = JSON.parseObject(record.value(), ExportRecordBusinessVO.class);
        queryExportRecordBusinessPO = exportRecordBusinessService.getById(exportRecordBusinessVO.getId());

        if (queryExportRecordBusinessPO == null) {
            log.warn("【excel业务导出监听】记录不存在,忽略消息");
            ack.acknowledge();
            return;
        }

        //修改数据状态
        LambdaUpdateWrapper<ExportRecordBusinessPO> updateWrapper = new LambdaUpdateWrapper<ExportRecordBusinessPO>()
                .eq(ExportRecordBusinessPO::getId, queryExportRecordBusinessPO.getId())
                .set(ExportRecordBusinessPO::getStatus, BusinessExportStatusEnum.EXPORTING.getCode())
                .set(ExportRecordBusinessPO::getUpdateTime, new Date());
        exportRecordBusinessService.update(null, updateWrapper);

        ExportRecordBusinessDTO exportRecordBusinessDTO = export(exportRecordBusinessVO);
        log.info("【excel业务导出监听】导出文件结果:{}",exportRecordBusinessDTO);
        ExportRecordBusinessPO exportRecordBusinessPO = BeanUtil.toBean(exportRecordBusinessDTO, ExportRecordBusinessPO.class);
        exportRecordBusinessService.updateById(exportRecordBusinessPO);

        ack.acknowledge();
    }catch (BizException bizException) {
        log.error("【excel业务导出监听】业务异常", bizException);
        updateStatusToFailed(queryExportRecordBusinessPO, ack);
    } catch (Exception e) {
        log.error("【excel业务导出监听】系统异常", e);
        updateStatusToFailed(queryExportRecordBusinessPO, ack);
    }

}


private void updateStatusToFailed(ExportRecordBusinessPO po, Acknowledgment ack) {
    if (po == null || po.getId() == null) {
        ack.acknowledge();
        return;
    }
    LambdaUpdateWrapper<ExportRecordBusinessPO> wrapper = new LambdaUpdateWrapper<ExportRecordBusinessPO>()
            .eq(ExportRecordBusinessPO::getId, po.getId())
            .set(ExportRecordBusinessPO::getStatus, BusinessExportStatusEnum.EXPORT_FAILED.getCode())
            .set(ExportRecordBusinessPO::getUpdateTime, new Date());
    exportRecordBusinessService.update(null, wrapper);
    ack.acknowledge();
}

}

备注:

1. Kafka 消费者两次 poll 之间的间隔默认不能超过 5 分钟(max.poll.interval.ms 默认 300000 毫秒);单次消费处理超过该时长会触发 rebalance,之后的 ack(offset 提交)会失败。

2. 如果代码执行时间较长:记录生产者发送到 Kafka 的 JSON 数据;消费时 ack 配合异步执行;再通过定时任务监控“处理中”的数据并重新发送到 Kafka 执行,从而实现预防与补偿机制。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐