一、生产端(Producer)核心配置(代码 / 配置文件)

# 1. 核心可靠性:必须等所有ISR副本写入成功
acks=all

# 2. 重试机制:网络抖动/临时故障自动重试(避免偶发失败丢数据)
retries=3                # 重试3次
retry.backoff.ms=1000    # 每次重试间隔1秒,避免高频重试打满Broker
delivery.timeout.ms=120000  # 消息投递总超时2分钟(覆盖重试+网络耗时)

# 3. 幂等性:防止重试导致重复,同时保证消息有序
enable.idempotence=true
max.in.flight.requests.per.connection=5  # 配合幂等性,最多5个未响应请求(保证有序)

# 4. 批量发送(兼顾性能+可靠性)
batch.size=16384         # 批量大小16KB(攒够再发,减少网络IO)
linger.ms=5              # 最多等5ms,即使没攒够16KB也发送(避免消息延迟过高)

# 5. 序列化+压缩(可选,不影响可靠性,提升传输效率)
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
compression.type=lz4     # LZ4压缩,兼顾压缩比和性能

# 6. 分区器(可选,保证同Key消息有序)
partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner  # 或自定义按业务Key分区

二、Broker 端(服务端)核心配置(server.properties)

# 1. 副本机制:核心保障
replication.factor=3    # 每个分区3个副本(生产环境最低2,推荐3)
min.insync.replicas=2    # 至少2个副本同步成功才认为写入成功(配合acks=all)

# 2. 禁止脏选举(避免落后副本成为Leader丢数据)
unclean.leader.election.enable=false

# 3. 副本同步超时(避免ISR列表频繁变化)
replica.lag.time.max.ms=30000  # 副本30秒没同步则踢出ISR
replica.fetch.min.bytes=1      # 副本拉取最小字节(保证及时同步)
replica.fetch.wait.max.ms=500  # 副本拉取超时500ms

# 4. 日志持久化(避免日志被过早删除)
log.retention.hours=72         # 日志保留72小时(根据业务调整)
log.retention.check.interval.ms=300000  # 5分钟检查一次日志清理
log.segment.bytes=1073741824   # 日志分段大小1GB(减少文件数,提升性能)
log.flush.interval.messages=10000  # 每1万条消息刷一次盘(或用时间)
log.flush.interval.ms=10000    # 每10秒刷一次盘(平衡性能和可靠性)

# 5. 控制器配置(保证集群稳定)
controller.quorum.voters=0@kafka1:9093,1@kafka2:9093,2@kafka3:9093  # 3节点控制器集群
controlled.shutdown.enable=true  # 允许Broker优雅关闭(避免关闭时丢数据)

三、消费端(Consumer)核心配置(代码 / 配置文件)

# 1. 关闭自动提交(核心:手动控制提交时机)
enable.auto.commit=false

# 2. 手动提交相关配置
auto.commit.interval.ms=0       # 自动提交间隔置0(禁用)
max.poll.records=500            # 每次拉取500条(避免单次拉取过多导致处理超时)
session.timeout.ms=10000        # 会话超时10秒(及时检测消费端故障)
heartbeat.interval.ms=3000      # 心跳间隔3秒(配合会话超时)

# 3. 消费位移重置(避免消费端重启后丢数据)
auto.offset.reset=earliest      # 找不到位移时从最早开始消费(而非latest)

# 4. 消费失败重试(可选,业务层保障)
# 代码层面需实现:处理失败时不提交offset,并重试/入死信队列
# 示例伪代码:
# try {
#   处理消息;
#   consumer.commitSync();  // 处理成功才提交
# } catch (Exception e) {
#   记录异常;
#   重试3/ 发送到死信队列;
#   // 不提交offset,下次重新消费
# }

# 5. 反序列化(和生产端对应)
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

四、额外保障(运维 / 部署层面)

  • 集群部署:Broker 节点至少 3 台,分布在不同物理机 / 机架(避免单机故障);
  • 监控告警:监控 ISR 列表变化、副本同步延迟、生产 / 消费错误率,异常及时告警;
  • 数据备份:定期备份 Kafka 日志目录(如每天凌晨),应对极端数据损坏;
  • 版本选择:使用 2.8+ 或 3.x 稳定版本(修复了早期版本的丢数据 Bug);
  • 权限控制:开启 SASL/SCRAM 或 ACL 权限,避免误操作删除 Topic / 数据。

总结

  • 生产端:核心是 acks=all + 重试 + 幂等性,兼顾批量发送保证性能;
  • Broker 端:核心是 3副本 + min.insync.replicas=2 + 禁止脏选举,日志保留足够时间;
  • 消费端:核心是 关闭自动提交 + 处理成功后手动提交,位移重置为 earliest;
  • 额外通过集群部署、监控、备份进一步兜底,确保生产环境零数据丢失。
Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐