目录

前言

一、项目概述与技术选型

1.1 电商场景设计

1.2 技术栈

二、环境搭建

2.1 启动 Kafka(推荐 Docker 方式)

2.2 创建 Spring Boot 项目

2.3 配置文件

三、订单消息实体定义

3.1 订单事件实体

3.2 常量定义

四、消息生产者实现

4.1 订单服务发送消息

4.2 生产者配置类(可选)

五、消息消费者实现

5.1 库存服务消费订单消息

5.2 手动确认配置

六、生产级特性实现

6.1 幂等性保证

6.1.1 消息消费记录表

6.1.2 幂等校验服务

6.2 重试机制

6.2.1 失败消息表

6.2.2 消费者重试逻辑

6.2.3 定时重扫失败消息

6.3 顺序消费保证

七、同步请求-响应模式(扩展)

7.1 配置 ReplyingKafkaTemplate

7.2 发送请求并等待响应

八、监控与管理

8.1 添加 Actuator 端点

8.2 自定义监控指标

8.3 关键指标关注

九、完整测试流程

9.1 单元测试

9.2 集成测试

十、最佳实践总结

10.1 配置建议

10.2 设计要点

10.3 常见问题与解决方案

十一、总结


前言

在上一章中,我们使用 Spring Boot + MyBatis + MySQL 实现了一个基础的电商后端。然而,随着业务发展,系统面临两大挑战:

  1. 高并发压力:大促期间订单创建峰值可达每秒数千笔,直接操作数据库可能导致崩溃

  2. 系统解耦需求:订单创建后需要通知库存、物流、积分等多个服务,同步调用导致耦合严重

Kafka 正是解决这些问题的利器。作为业界公认的高吞吐分布式消息队列,Kafka 能在系统之间搭建异步通信桥梁,实现流量削峰填谷和服务解耦-1-6

本章将以电商订单处理为核心场景,手把手带你完成 Spring Boot 整合 Kafka 的全过程,并深入讲解幂等性、重试机制等生产级必备特性。


一、项目概述与技术选型

1.1 电商场景设计

我们将实现以下核心流程:

┌─────────────┐     ┌──────────┐     ┌──────────────────┐
│  用户下单   │ ──> │ 订单服务 │ ──> │   Kafka         │
└─────────────┘     └──────────┘     └────────┬─────────┘
                                              │
                    ┌─────────────────────────┼─────────────────────────┐
                    │                         │                         │
                    ▼                         ▼                         ▼
           ┌────────────────┐       ┌────────────────┐       ┌────────────────┐
           │   库存服务     │       │   支付服务     │       │   通知服务     │
           │  (消费者组A)   │       │  (消费者组A)   │       │  (消费者组B)   │
           └────────────────┘       └────────────────┘       └────────────────┘

业务流程

  1. 订单服务接收用户下单请求,将订单信息发送到 Kafka

  2. 库存服务消费订单消息,扣减库存

  3. 支付服务处理支付逻辑(可异步)

  4. 通知服务发送邮件/SMS 给用户

1.2 技术栈

组件 版本 说明
Spring Boot 2.7.x / 3.x 基础框架
Spring Kafka 2.8+ / 3.0+ Kafka 集成依赖
Apache Kafka 2.8+ 消息队列
MySQL 5.7+ 业务数据存储
MyBatis 2.2+ 持久层框架(沿用上一章)
Docker 可选 快速搭建 Kafka 环境

二、环境搭建

2.1 启动 Kafka(推荐 Docker 方式)

创建 docker-compose.yml

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.3.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

执行命令启动:

docker-compose up -d

2.2 创建 Spring Boot 项目

在上一章项目基础上,添加 Kafka 依赖:

<!-- Spring Kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

<!-- 可选:JSON 序列化支持 -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>

2.3 配置文件

在 application.yml 中配置 Kafka 连接信息:

spring:
  kafka:
    bootstrap-servers: localhost:9092  # Kafka 服务器地址
    
    # 生产者配置
    producer:
      retries: 3                        # 发送失败重试次数
      acks: all                         # 消息确认机制(all 表示等待所有副本确认)
      compression-type: snappy           # 压缩方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    
    # 消费者配置
    consumer:
      group-id: order-group              # 消费者组ID(核心!)
      auto-offset-reset: earliest        # 无偏移量时从最早消息开始消费
      enable-auto-commit: false          # 关闭自动提交,由业务控制
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: com.example.mall.entity  # 信任的包(反序列化用)
    
    # 监听器配置
    listener:
      type: batch                        # 批量消费模式
      concurrency: 3                      # 并发线程数(建议 <= 分区数)
      ack-mode: manual_immediate          # 手动确认模式

配置说明

  • group-id:消费者组标识,同一组内的消费者共同消费 Topic 消息(负载均衡)-1

  • auto-offset-reset:新消费者从哪里开始消费,earliest 从头开始,latest 从最新开始-1

  • enable-auto-commit: false:关闭自动提交,改为手动确认,确保消息可靠处理-2


三、订单消息实体定义

3.1 订单事件实体

package com.example.mall.event;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.math.BigDecimal;
import java.time.LocalDateTime;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderEvent {
    private String eventId;          // 事件ID(用于幂等)
    private Long orderId;            // 订单ID
    private String orderNo;          // 订单号
    private Long userId;             // 用户ID
    private BigDecimal amount;       // 金额
    private String eventType;         // 事件类型:CREATED, PAID, CANCELLED
    private LocalDateTime eventTime;  // 事件时间
}

3.2 常量定义

package com.example.mall.constant;

public interface KafkaTopics {
    String ORDER_TOPIC = "order-events";          // 订单事件主题
    String INVENTORY_TOPIC = "inventory-events";  // 库存事件主题
    String NOTIFICATION_TOPIC = "notification-events"; // 通知主题
    
    String ORDER_GROUP = "order-process-group";   // 订单处理消费者组
    String INVENTORY_GROUP = "inventory-group";   // 库存服务组
    String NOTIFICATION_GROUP = "notification-group"; // 通知服务组
}

四、消息生产者实现

4.1 订单服务发送消息

package com.example.mall.service;

import com.example.mall.entity.Order;
import com.example.mall.event.OrderEvent;
import com.example.mall.mapper.OrderMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.time.LocalDateTime;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

import static com.example.mall.constant.KafkaTopics.ORDER_TOPIC;

@Slf4j
@Service
@RequiredArgsConstructor
public class OrderService {
    
    private final OrderMapper orderMapper;
    private final KafkaTemplate<String, Object> kafkaTemplate;
    
    /**
     * 创建订单并发送消息到Kafka
     */
    @Transactional
    public Order createOrder(Order order) {
        // 1. 保存订单到数据库
        orderMapper.insert(order);
        
        // 2. 构建订单事件
        OrderEvent event = new OrderEvent(
            UUID.randomUUID().toString(),
            order.getId(),
            order.getOrderNo(),
            order.getUserId(),
            order.getTotalAmount(),
            "CREATED",
            LocalDateTime.now()
        );
        
        // 3. 发送消息到Kafka(异步)
        CompletableFuture<SendResult<String, Object>> future = 
            kafkaTemplate.send(ORDER_TOPIC, event.getOrderId().toString(), event);
        
        // 4. 添加回调处理
        future.whenComplete((result, ex) -> {
            if (ex == null) {
                log.info("订单消息发送成功: orderNo={}, offset={}", 
                    order.getOrderNo(), result.getRecordMetadata().offset());
            } else {
                log.error("订单消息发送失败: orderNo={}", order.getOrderNo(), ex);
                // TODO: 记录失败消息到本地表,便于后续补偿
            }
        });
        
        return order;
    }
}

关键点

  • 使用 KafkaTemplate 发送消息,支持泛型(Key类型, Value类型)-1

  • 异步发送并添加回调,便于监控和异常处理

  • 消息 Key 使用订单ID,保证同一订单的消息发往同一分区(顺序消费)

4.2 生产者配置类(可选)

如果配置文件中已设置,Spring Boot 会自动配置 KafkaTemplate。如需自定义,可添加:

@Configuration
public class KafkaProducerConfig {
    
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}

五、消息消费者实现

5.1 库存服务消费订单消息

package com.example.mall.consumer;

import com.example.mall.event.OrderEvent;
import com.example.mall.service.InventoryService;
import com.example.mall.service.MessageConsumeRecordService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import static com.example.mall.constant.KafkaTopics.INVENTORY_GROUP;
import static com.example.mall.constant.KafkaTopics.ORDER_TOPIC;

@Slf4j
@Component
@RequiredArgsConstructor
public class OrderConsumer {
    
    private final InventoryService inventoryService;
    private final MessageConsumeRecordService consumeRecordService;
    
    /**
     * 监听订单事件,处理库存扣减
     */
    @KafkaListener(
        topics = ORDER_TOPIC,
        groupId = INVENTORY_GROUP,
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void consumeOrderEvent(ConsumerRecord<String, OrderEvent> record, Acknowledgment ack) {
        String messageKey = record.key();
        OrderEvent event = record.value();
        
        log.info("接收到订单事件: key={}, event={}, partition={}, offset={}", 
            messageKey, event, record.partition(), record.offset());
        
        try {
            // 1. 幂等性校验:检查是否已处理过该消息
            if (consumeRecordService.isProcessed(messageKey)) {
                log.info("消息已处理过,跳过: {}", messageKey);
                ack.acknowledge();
                return;
            }
            
            // 2. 根据事件类型处理业务
            switch (event.getEventType()) {
                case "CREATED":
                    inventoryService.deductStock(event.getOrderId(), event.getUserId());
                    break;
                case "CANCELLED":
                    inventoryService.releaseStock(event.getOrderId());
                    break;
                default:
                    log.warn("未知事件类型: {}", event.getEventType());
            }
            
            // 3. 记录已处理的消息ID
            consumeRecordService.recordProcessed(messageKey);
            
            // 4. 手动提交偏移量
            ack.acknowledge();
            
        } catch (Exception e) {
            log.error("处理订单事件失败: key={}", messageKey, e);
            // 这里不提交ACK,消息会重新消费(取决于重试策略)
            throw new RuntimeException("消费失败,触发重试", e);
        }
    }
}

关键点

  • @KafkaListener:指定监听的 Topic 和消费者组 -1

  • ConsumerRecord:获取消息完整元数据(分区、偏移量、Key等)

  • Acknowledgment:手动确认机制,确保消息处理成功后才提交偏移量 -2

  • 幂等性校验防止重复消费(见下文)

5.2 手动确认配置

需要配置 KafkaListenerContainerFactory 启用手动确认:

@Configuration
public class KafkaConsumerConfig {
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
            ConsumerFactory<String, Object> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.setConcurrency(3); // 并发消费线程数
        return factory;
    }
}

六、生产级特性实现

6.1 幂等性保证

Kafka 可能因网络问题导致消息重复消费,因此幂等性设计至关重要 -2

6.1.1 消息消费记录表
CREATE TABLE `message_consume_record` (
    `id` BIGINT AUTO_INCREMENT PRIMARY KEY,
    `message_key` VARCHAR(64) NOT NULL UNIQUE,  -- 消息唯一键
    `topic` VARCHAR(64) NOT NULL,
    `partition` INT,
    `offset` BIGINT,
    `consumed_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
6.1.2 幂等校验服务
@Service
@RequiredArgsConstructor
public class MessageConsumeRecordService {
    
    private final MessageConsumeRecordMapper recordMapper;
    
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public boolean isProcessed(String messageKey) {
        // 尝试插入,利用数据库唯一键去重
        try {
            MessageConsumeRecord record = new MessageConsumeRecord();
            record.setMessageKey(messageKey);
            recordMapper.insert(record);
            return false; // 插入成功,表示未处理过
        } catch (DuplicateKeyException e) {
            return true; // 主键冲突,已处理过
        }
    }
    
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void recordProcessed(String messageKey) {
        // 实际在 isProcessed 中已经插入,此方法可留空或记录额外信息
    }
}

原理:利用数据库唯一键约束,尝试插入消息ID,成功则继续处理,失败则跳过 -2

6.2 重试机制

消费失败时,不应无限重试。设计有限重试 + 死信队列策略 -2

6.2.1 失败消息表
CREATE TABLE `failed_message` (
    `id` BIGINT AUTO_INCREMENT PRIMARY KEY,
    `message_key` VARCHAR(64) NOT NULL,
    `topic` VARCHAR(64) NOT NULL,
    `payload` TEXT NOT NULL,
    `error_message` TEXT,
    `retry_count` INT DEFAULT 0,
    `max_retries` INT DEFAULT 3,
    `status` VARCHAR(20) DEFAULT 'PENDING',
    `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    `updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
6.2.2 消费者重试逻辑
@KafkaListener(topics = ORDER_TOPIC, groupId = INVENTORY_GROUP)
public void consumeWithRetry(ConsumerRecord<String, OrderEvent> record, Acknowledgment ack) {
    try {
        // 业务处理
        processOrderEvent(record.value());
        ack.acknowledge();
    } catch (Exception e) {
        // 记录失败消息
        failedMessageService.save(record);
        
        // 根据重试次数决定是否提交ACK
        int retryCount = getRetryCount(record.key());
        if (retryCount >= 3) {
            // 超过最大重试,记录到死信并提交ACK(避免阻塞)
            deadLetterService.sendToDLQ(record);
            ack.acknowledge();
        } else {
            // 未超重试,不提交ACK,等待重新消费
            throw e;
        }
    }
}
6.2.3 定时重扫失败消息
@Component
@Slf4j
@RequiredArgsConstructor
public class FailedMessageRetryTask {
    
    private final FailedMessageService failedMessageService;
    
    @Scheduled(fixedDelay = 60000) // 每分钟执行一次
    public void retryFailedMessages() {
        List<FailedMessage> failedList = failedMessageService.findPendingRetry();
        for (FailedMessage msg : failedList) {
            try {
                // 重新发送到Kafka或直接调用处理逻辑
                kafkaTemplate.send(msg.getTopic(), msg.getMessageKey(), msg.getPayload());
                failedMessageService.markAsSuccess(msg.getId());
            } catch (Exception e) {
                failedMessageService.incrementRetryCount(msg.getId());
                log.error("重试失败: {}", msg.getMessageKey(), e);
            }
        }
    }
}

6.3 顺序消费保证

在电商场景中,同一订单的事件(创建、支付、取消)必须顺序处理。

解决方案

  1. 设置消息 Key 为订单ID,保证同一订单进入同一分区 -5

  2. 消费者设置 concurrency 为分区数,且每个分区单线程消费

  3. 业务处理时加分布式锁(如Redis),防止并发冲突

// 生产者:使用订单ID作为Key
kafkaTemplate.send(ORDER_TOPIC, String.valueOf(orderId), event);

// 消费者配置:分区内单线程
factory.setConcurrency(3); // 假设分区数为3
factory.getContainerProperties().setMissingTopicsFatal(false);

七、同步请求-响应模式(扩展)

某些场景需要同步等待处理结果,如支付回调。Kafka 原生异步,但可通过 ReplyingKafkaTemplate 实现请求-响应模式 -5

7.1 配置 ReplyingKafkaTemplate

@Bean
public ReplyingKafkaTemplate<String, Object, Object> replyingKafkaTemplate(
        ProducerFactory<String, Object> pf,
        ConcurrentKafkaListenerContainerFactory<String, Object> factory) {
    
    ConcurrentMessageListenerContainer<String, Object> replyContainer = 
        factory.createContainer("payment-reply-topic");
    replyContainer.getContainerProperties().setGroupId("payment-client");
    
    ReplyingKafkaTemplate<String, Object, Object> template = 
        new ReplyingKafkaTemplate<>(pf, replyContainer);
    template.setDefaultReplyTimeout(Duration.ofSeconds(30));
    return template;
}

7.2 发送请求并等待响应

@Service
public class PaymentService {
    
    @Autowired
    private ReplyingKafkaTemplate<String, Object, Object> replyingTemplate;
    
    public PaymentResponse requestPayment(PaymentRequest request) {
        // 创建请求消息
        ProducerRecord<String, Object> record = new ProducerRecord<>(
            "payment-request-topic", request.getOrderId(), request);
        
        // 发送并等待响应
        RequestReplyFuture<String, Object, Object> future = 
            replyingTemplate.sendAndReceive(record);
        
        // 同步等待结果
        ConsumerRecord<String, Object> response = future.get(10, TimeUnit.SECONDS);
        return (PaymentResponse) response.value();
    }
}

八、监控与管理

8.1 添加 Actuator 端点

management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics,kafka
  metrics:
    export:
      kafka:
        enabled: true

8.2 自定义监控指标

@Component
public class KafkaMonitor {
    
    private final MeterRegistry meterRegistry;
    private final KafkaTemplate<String, Object> kafkaTemplate;
    
    @EventListener(ApplicationReadyEvent.class)
    public void initMetrics() {
        // 记录发送消息总数
        Counter.builder("kafka.sent.messages")
            .description("Total sent messages")
            .register(meterRegistry);
    }
    
    public void recordSendSuccess(String topic) {
        meterRegistry.counter("kafka.sent.success", "topic", topic).increment();
    }
}

8.3 关键指标关注

  • 发送延迟kafka.producer.request-latency-avg

  • 消费延迟kafka.consumer.fetch-latency-avg

  • 消费落后量kafka.consumer.records-lag-max(最重要!)


九、完整测试流程

9.1 单元测试

@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"order-events"})
class OrderEventTest {
    
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    
    @Test
    void testSendAndReceive() throws InterruptedException {
        OrderEvent event = new OrderEvent("test-id", 1L, "NO001", 100L, 
            BigDecimal.TEN, "CREATED", LocalDateTime.now());
        
        // 发送消息
        kafkaTemplate.send("order-events", event.getOrderId().toString(), event);
        
        // 等待消费
        Thread.sleep(2000);
        
        // 验证消费结果(需配合测试用的 Consumer)
        // assertThat(...)
    }
}

9.2 集成测试

启动 Docker Kafka 和 Spring Boot 应用,使用 Postman 调用订单创建接口:

POST /api/orders
Content-Type: application/json

{
    "userId": 1001,
    "totalAmount": 299.00,
    "items": [{"productId": 1, "quantity": 2}]
}

观察日志:

INFO  OrderConsumer - 接收到订单事件: key=1, event=OrderEvent(orderId=1001, ...)
INFO  InventoryService - 扣减库存成功: orderId=1001

十、最佳实践总结

10.1 配置建议

环境 auto.offset.reset enable.auto.commit 重试次数
开发 earliest false 1
测试 earliest false 3
生产 latest false 3+死信

10.2 设计要点

  1. 消息体不宜过大:建议不超过 1MB,可使用压缩(snappy/gzip)

  2. 合理设置分区数:分区数 = 预期最大并发消费线程数

  3. 监控消费延迟:lag 持续增长需扩容消费者或分区

  4. 做好容错设计:网络抖动、服务重启等情况下的消息不丢失不重复

  5. 业务无关字段最小化:事件只包含必要字段,避免传递大对象

10.3 常见问题与解决方案

问题 解决方案
消息重复消费 幂等表 + 唯一键约束 -2
消息丢失 生产者设置 acks=all,消费者手动提交
消费顺序错乱 同一业务使用相同 Key,分区内单线程
消息积压 增加分区、增加消费者并发、优化处理逻辑
消费失败无限重试 有限重试

十一、总结

通过本章学习,你已掌握:

✅ Kafka 基础概念:Topic、Partition、Consumer Group、Offset
✅ Spring Boot 整合 Kafka:生产者、消费者完整实现
✅ 电商场景实战:订单异步处理、库存扣减、服务解耦
✅ 生产级特性:幂等性设计、重试机制、死信队列、顺序消费
✅ 监控与运维:指标采集、延迟监控、测试策略

Kafka 的世界远不止于此。你可以继续探索:

  • Kafka Streams:流式处理订单实时统计

  • Schema Registry:使用 Avro 管理消息 schema -3

  • Kafka Connect:连接外部数据源

下一章,我们将进入微服务架构,学习如何使用 Spring Cloud 将这些服务真正拆分为独立部署的微服务,敬请期待!


附:完整项目代码结构

src/main/java/com/example/mall/
├── config/               # Kafka 配置类
├── constant/             # 常量定义(Topic、Group)
├── consumer/             # 消息消费者
├── entity/               # 实体类(含 OrderEvent)
├── mapper/               # MyBatis Mapper
├── service/              # 业务服务(含 Kafka 生产者)
├── task/                 # 定时重试任务
└── util/                 # 工具类

参考资源

  • Spring Kafka 官方文档

  • 《Kafka 权威指南》

如有问题,欢迎在评论区交流!Happy Coding!

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐