docker快速部署kafka
本文介绍了使用Docker部署Kafka的详细步骤。首先创建挂载目录并授权,然后建立Docker网络。通过docker run命令启动Kafka容器,配置了端口映射、环境变量和数据卷挂载。针对镜像拉取问题,提供了修改daemon.json配置文件的解决方案,列出了20多个国内镜像源地址。最后执行systemctl命令使配置生效。整个过程涵盖了Kafka 3.6.2版本的容器化部署及常见问题的解决方
前言
记录docker部署kafka
部署kafka
#创建挂载目录
sudo mkdir -p /zero/kafka
#授权
chown -R 777 /zero/kafka/
#创建 Docker 网络(用于容器间通信,如果尚未创建)
docker network create app-tier
启动命令
docker run -d \
--name kafka \
--network app-tier \
--restart always \
--ulimit nofile=65536:65536 \
-p 9092:9092 \
-p 9094:9094 \
-v /zero/kafka:/bitnami/kafka \
-e TZ=Asia/Shanghai \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://172.16.10.40:9094 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
bitnami/kafka:3.6.2
如果docker pull bitnami/kafka:3.6.2 拉取不了
vi /etc/docker/daemon.json
{
"registry-mirrors": ["https://jhacxx1q.mirror.aliyuncs.com",
"https://docker.1ms.run",
"https://docker.1panel.live",
"https://docker.ketches.cn",
"https://docker.m.daocloud.io/",
"https://hub-mirror.c.163.com",
"https://dockerproxy.com/",
"https://mirror.baidubce.com/",
"https://docker.nju.edu.cn/",
"https://docker.mirrors.sjtug.sjtu.edu.cn/",
"https://mirror.ccs.tencentyun.com",
"https://docker-0.unsee.tech",
"https://register.liberx.info/",
"https://docker.registry.cyou/",
"https://docker-cf.registry.cyou/",
"https://dockercf.jsdelivr.fyi/",
"https://docker.jsdelivr.fyi/",
"https://dockertest.jsdelivr.fyi/",
"https://mirror.iscas.ac.cn/",
"https://docker.rainbond.cc/",
"https://mirror.aliyuncs.com",
"https://docker.mirrors.ustc.edu.cn/"
]
}
#重新加载 systemd 配置
sudo systemctl daemon-reload
#重启 Docker 使配置生效
sudo systemctl restart docker
记录下kafka在java项目消费时如何处理:
pom引入依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
yml配置
store: auto-startup-kafka: truekafka: topics: - name: lst_export_excel_business partitions: 2 replicas: 1 bootstrap-servers: 172.16.10.40:9094 producer: acks: -1 retries: 32 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer batch-size: 32768 compression-type: snappy buffer-memory: 67108864 properties: 'linger.ms': 5 consumer: group-id: ${KAFKA_CONSUMER_GROUP_ID:storeServerGroup} key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer auto-offset-reset: latest enable-auto-commit: false auto-commit-interval: 3 heartbeat-interval: 5000 listener: ack-mode: manual_immediate
封装类
import java.util.Map;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
@Configuration
@EnableKafka
@Log4j2
public class KafkaConsumerConfig {
/**
* 消费者批量工程
*/
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> batchFactory(KafkaProperties kafkaProperties) {
Map<String, Object> properties = kafkaProperties.getConsumer().buildProperties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers()); // boostrap server 配置
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); // 一次poll操作最大获取的记录数量
properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); // 一次fetch操作的最大等待时间,“最大等待时间”与“最小字节”任何一个先满足了就立即返回给消费者
properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024); // 一次fetch操作最小的字节数, 如果低于这个字节数, 就会等待, 直到超时后才返回给消费者
log.info(" kafka batchFactory properties = {}",properties);
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(properties));
factory.setBatchListener(true); // 设置批量消费
factory.getContainerProperties().setPollTimeout(1000);
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE); // 手动提交
return factory;
}
}
import cloud.jiuwei.store.constant.StoreConstants;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class KafkaTopicConfig {
@Value("${spring.profiles.active:uat}")
private String profile;
private static String env;
private static final String SUFFIX = "_uat";
@PostConstruct
public void init() {
env = profile;
}
private static String getTopicName(final String topic) {
if (StoreConstants.UAT.equals(env)) {
return topic + SUFFIX;
} else {
return topic;
}
}
/**
* 业务异步导出 excel
*/
private static final String STO_EXPORT_EXCEL_BUSINESS = "lst_export_excel_business";
@Bean(name = STO_EXPORT_EXCEL_BUSINESS)
public static String getStoExportExcelBusiness() {
return getTopicName(STO_EXPORT_EXCEL_BUSINESS);
}
}
import java.util.concurrent.Executor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
/**
* 事务管理工具
*/
@Component
@Slf4j
public class TransactionUtil {
private static ThreadPoolTaskExecutor threadPoolTaskExecutor;
public TransactionUtil(ThreadPoolTaskExecutor transactionPool) {
threadPoolTaskExecutor = transactionPool;
}
/**
* 在事务提交后同步执行
*
* @param runnable
*/
public static void syncAfterCommit(Runnable runnable) {
log.info("在事务提交后同步执行");
if (TransactionSynchronizationManager.isSynchronizationActive()) {
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronization() {
@Override
public void afterCommit() {
runnable.run();
}
});
} else {
runnable.run();
}
}
/**
* 在事务提交后异步执行
*
* @param executor
* @param runnable
*/
public static void asyncAfterCommit(Executor executor, Runnable runnable) {
log.info("在事务提交后异步执行");
if (TransactionSynchronizationManager.isSynchronizationActive()) {
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronization() {
@Override
public void afterCommit() {
executor.execute(runnable);
}
});
} else {
executor.execute(runnable);
}
}
/**
* 在事务提交后异步执行(默认线程池)
*
* @param runnable
*/
public static void asyncAfterCommit(Runnable runnable) {
log.info("在事务提交后异步执行(默认线程池)");
if (TransactionSynchronizationManager.isSynchronizationActive()) {
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronization() {
@Override
public void afterCommit() {
threadPoolTaskExecutor.execute(runnable);
}
});
} else {
threadPoolTaskExecutor.execute(runnable);
}
}
}
生产者-生产消息
TransactionUtil.syncAfterCommit(() ->
KafkaUtils.sendAsynMsg(KafkaTopicConfig.getStoExportExcelBusiness(), exportRecordVO, null));
消费者-消费消息
/** * excel 导出 kafka 监听 */ @Component @Slf4j public class ExportExcelBusinessListener{@KafkaListener(autoStartup = "${store.auto-startup-kafka}", topics = "#{@lst_export_excel_business}") public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) { log.info("【excel业务导出监听】接收:{}", record.toString()); ExportRecordBusinessPO queryExportRecordBusinessPO = null; try { ExportRecordBusinessVO exportRecordBusinessVO = JSON.parseObject(record.value(), ExportRecordBusinessVO.class); queryExportRecordBusinessPO = exportRecordBusinessService.getById(exportRecordBusinessVO.getId()); if (queryExportRecordBusinessPO == null) { log.warn("【excel业务导出监听】记录不存在,忽略消息"); ack.acknowledge(); return; } //修改数据状态 LambdaUpdateWrapper<ExportRecordBusinessPO> updateWrapper = new LambdaUpdateWrapper<ExportRecordBusinessPO>() .eq(ExportRecordBusinessPO::getId, queryExportRecordBusinessPO.getId()) .set(ExportRecordBusinessPO::getStatus, BusinessExportStatusEnum.EXPORTING.getCode()) .set(ExportRecordBusinessPO::getUpdateTime, new Date()); exportRecordBusinessService.update(null, updateWrapper); ExportRecordBusinessDTO exportRecordBusinessDTO = export(exportRecordBusinessVO); log.info("【excel业务导出监听】导出文件结果:{}",exportRecordBusinessDTO); ExportRecordBusinessPO exportRecordBusinessPO = BeanUtil.toBean(exportRecordBusinessDTO, ExportRecordBusinessPO.class); exportRecordBusinessService.updateById(exportRecordBusinessPO); ack.acknowledge(); }catch (BizException bizException) { log.error("【excel业务导出监听】业务异常", bizException); updateStatusToFailed(queryExportRecordBusinessPO, ack); } catch (Exception e) { log.error("【excel业务导出监听】系统异常", e); updateStatusToFailed(queryExportRecordBusinessPO, ack); } } private void updateStatusToFailed(ExportRecordBusinessPO po, Acknowledgment ack) { if (po == null || po.getId() == null) { ack.acknowledge(); return; } LambdaUpdateWrapper<ExportRecordBusinessPO> wrapper = new LambdaUpdateWrapper<ExportRecordBusinessPO>() .eq(ExportRecordBusinessPO::getId, po.getId()) .set(ExportRecordBusinessPO::getStatus, BusinessExportStatusEnum.EXPORT_FAILED.getCode()) .set(ExportRecordBusinessPO::getUpdateTime, new Date()); exportRecordBusinessService.update(null, wrapper); ack.acknowledge(); }}
备注:
1.kafka消费者默认消费时长超过5分钟后ack提交失败
2.如果代码执行时间较长 记录生产者发送kafka数据json,消费时ack配合异步+定时任务监控处理中数据重新发送kafka执行 实现预防与补偿机制
更多推荐
所有评论(0)