生产环境 Kafka 高可靠不丢数据最佳配置清单
【代码】生产环境 Kafka 高可靠不丢数据最佳配置清单。
·
一、生产端(Producer)核心配置(代码 / 配置文件)
# 1. 核心可靠性:必须等所有ISR副本写入成功
acks=all
# 2. 重试机制:网络抖动/临时故障自动重试(避免偶发失败丢数据)
retries=3 # 重试3次
retry.backoff.ms=1000 # 每次重试间隔1秒,避免高频重试打满Broker
delivery.timeout.ms=120000 # 消息投递总超时2分钟(覆盖重试+网络耗时)
# 3. 幂等性:防止重试导致重复,同时保证消息有序
enable.idempotence=true
max.in.flight.requests.per.connection=5 # 配合幂等性,最多5个未响应请求(保证有序)
# 4. 批量发送(兼顾性能+可靠性)
batch.size=16384 # 批量大小16KB(攒够再发,减少网络IO)
linger.ms=5 # 最多等5ms,即使没攒够16KB也发送(避免消息延迟过高)
# 5. 序列化+压缩(可选,不影响可靠性,提升传输效率)
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
compression.type=lz4 # LZ4压缩,兼顾压缩比和性能
# 6. 分区器(可选,保证同Key消息有序)
partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner # 或自定义按业务Key分区
二、Broker 端(服务端)核心配置(server.properties)
# 1. 副本机制:核心保障
replication.factor=3 # 每个分区3个副本(生产环境最低2,推荐3)
min.insync.replicas=2 # 至少2个副本同步成功才认为写入成功(配合acks=all)
# 2. 禁止脏选举(避免落后副本成为Leader丢数据)
unclean.leader.election.enable=false
# 3. 副本同步超时(避免ISR列表频繁变化)
replica.lag.time.max.ms=30000 # 副本30秒没同步则踢出ISR
replica.fetch.min.bytes=1 # 副本拉取最小字节(保证及时同步)
replica.fetch.wait.max.ms=500 # 副本拉取超时500ms
# 4. 日志持久化(避免日志被过早删除)
log.retention.hours=72 # 日志保留72小时(根据业务调整)
log.retention.check.interval.ms=300000 # 5分钟检查一次日志清理
log.segment.bytes=1073741824 # 日志分段大小1GB(减少文件数,提升性能)
log.flush.interval.messages=10000 # 每1万条消息刷一次盘(或用时间)
log.flush.interval.ms=10000 # 每10秒刷一次盘(平衡性能和可靠性)
# 5. 控制器配置(保证集群稳定)
controller.quorum.voters=0@kafka1:9093,1@kafka2:9093,2@kafka3:9093 # 3节点控制器集群
controlled.shutdown.enable=true # 允许Broker优雅关闭(避免关闭时丢数据)
三、消费端(Consumer)核心配置(代码 / 配置文件)
# 1. 关闭自动提交(核心:手动控制提交时机)
enable.auto.commit=false
# 2. 手动提交相关配置
auto.commit.interval.ms=0 # 自动提交间隔置0(禁用)
max.poll.records=500 # 每次拉取500条(避免单次拉取过多导致处理超时)
session.timeout.ms=10000 # 会话超时10秒(及时检测消费端故障)
heartbeat.interval.ms=3000 # 心跳间隔3秒(配合会话超时)
# 3. 消费位移重置(避免消费端重启后丢数据)
auto.offset.reset=earliest # 找不到位移时从最早开始消费(而非latest)
# 4. 消费失败重试(可选,业务层保障)
# 代码层面需实现:处理失败时不提交offset,并重试/入死信队列
# 示例伪代码:
# try {
# 处理消息;
# consumer.commitSync(); // 处理成功才提交
# } catch (Exception e) {
# 记录异常;
# 重试3次 / 发送到死信队列;
# // 不提交offset,下次重新消费
# }
# 5. 反序列化(和生产端对应)
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
四、额外保障(运维 / 部署层面)
- 集群部署:Broker 节点至少 3 台,分布在不同物理机 / 机架(避免单机故障);
- 监控告警:监控 ISR 列表变化、副本同步延迟、生产 / 消费错误率,异常及时告警;
- 数据备份:定期备份 Kafka 日志目录(如每天凌晨),应对极端数据损坏;
- 版本选择:使用 2.8+ 或 3.x 稳定版本(修复了早期版本的丢数据 Bug);
- 权限控制:开启 SASL/SCRAM 或 ACL 权限,避免误操作删除 Topic / 数据。
总结
- 生产端:核心是 acks=all + 重试 + 幂等性,兼顾批量发送保证性能;
- Broker 端:核心是 3副本 + min.insync.replicas=2 + 禁止脏选举,日志保留足够时间;
- 消费端:核心是 关闭自动提交 + 处理成功后手动提交,位移重置为 earliest;
- 额外通过集群部署、监控、备份进一步兜底,确保生产环境零数据丢失。
更多推荐
所有评论(0)