Spring Boot 整合 Kafka 项目实战:构建高可靠电商消息平台
本文介绍了如何使用SpringBoot整合Kafka解决电商系统中的高并发和系统解耦问题。主要内容包括: 技术选型:采用SpringBoot+SpringKafka+Kafka构建异步消息系统 环境搭建:通过Docker快速部署Kafka集群 核心实现: 订单服务作为生产者发送订单事件 库存服务作为消费者处理扣减库存 生产级特性: 幂等性设计防止重复消费 重试机制和死信队列处理失败消息 顺序消费保
目录
前言
在上一章中,我们使用 Spring Boot + MyBatis + MySQL 实现了一个基础的电商后端。然而,随着业务发展,系统面临两大挑战:
-
高并发压力:大促期间订单创建峰值可达每秒数千笔,直接操作数据库可能导致崩溃
-
系统解耦需求:订单创建后需要通知库存、物流、积分等多个服务,同步调用导致耦合严重
Kafka 正是解决这些问题的利器。作为业界公认的高吞吐分布式消息队列,Kafka 能在系统之间搭建异步通信桥梁,实现流量削峰填谷和服务解耦-1-6。
本章将以电商订单处理为核心场景,手把手带你完成 Spring Boot 整合 Kafka 的全过程,并深入讲解幂等性、重试机制等生产级必备特性。
一、项目概述与技术选型
1.1 电商场景设计
我们将实现以下核心流程:
┌─────────────┐ ┌──────────┐ ┌──────────────────┐
│ 用户下单 │ ──> │ 订单服务 │ ──> │ Kafka │
└─────────────┘ └──────────┘ └────────┬─────────┘
│
┌─────────────────────────┼─────────────────────────┐
│ │ │
▼ ▼ ▼
┌────────────────┐ ┌────────────────┐ ┌────────────────┐
│ 库存服务 │ │ 支付服务 │ │ 通知服务 │
│ (消费者组A) │ │ (消费者组A) │ │ (消费者组B) │
└────────────────┘ └────────────────┘ └────────────────┘
业务流程:
-
订单服务接收用户下单请求,将订单信息发送到 Kafka
-
库存服务消费订单消息,扣减库存
-
支付服务处理支付逻辑(可异步)
-
通知服务发送邮件/SMS 给用户
1.2 技术栈
| 组件 | 版本 | 说明 |
|---|---|---|
| Spring Boot | 2.7.x / 3.x | 基础框架 |
| Spring Kafka | 2.8+ / 3.0+ | Kafka 集成依赖 |
| Apache Kafka | 2.8+ | 消息队列 |
| MySQL | 5.7+ | 业务数据存储 |
| MyBatis | 2.2+ | 持久层框架(沿用上一章) |
| Docker | 可选 | 快速搭建 Kafka 环境 |
二、环境搭建
2.1 启动 Kafka(推荐 Docker 方式)
创建 docker-compose.yml:
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.3.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:7.3.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
执行命令启动:
docker-compose up -d
2.2 创建 Spring Boot 项目
在上一章项目基础上,添加 Kafka 依赖:
<!-- Spring Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- 可选:JSON 序列化支持 -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
2.3 配置文件
在 application.yml 中配置 Kafka 连接信息:
spring:
kafka:
bootstrap-servers: localhost:9092 # Kafka 服务器地址
# 生产者配置
producer:
retries: 3 # 发送失败重试次数
acks: all # 消息确认机制(all 表示等待所有副本确认)
compression-type: snappy # 压缩方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
# 消费者配置
consumer:
group-id: order-group # 消费者组ID(核心!)
auto-offset-reset: earliest # 无偏移量时从最早消息开始消费
enable-auto-commit: false # 关闭自动提交,由业务控制
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: com.example.mall.entity # 信任的包(反序列化用)
# 监听器配置
listener:
type: batch # 批量消费模式
concurrency: 3 # 并发线程数(建议 <= 分区数)
ack-mode: manual_immediate # 手动确认模式
配置说明:
-
group-id:消费者组标识,同一组内的消费者共同消费 Topic 消息(负载均衡)-1 -
auto-offset-reset:新消费者从哪里开始消费,earliest从头开始,latest从最新开始-1 -
enable-auto-commit: false:关闭自动提交,改为手动确认,确保消息可靠处理-2
三、订单消息实体定义
3.1 订单事件实体
package com.example.mall.event;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.math.BigDecimal;
import java.time.LocalDateTime;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderEvent {
private String eventId; // 事件ID(用于幂等)
private Long orderId; // 订单ID
private String orderNo; // 订单号
private Long userId; // 用户ID
private BigDecimal amount; // 金额
private String eventType; // 事件类型:CREATED, PAID, CANCELLED
private LocalDateTime eventTime; // 事件时间
}
3.2 常量定义
package com.example.mall.constant;
public interface KafkaTopics {
String ORDER_TOPIC = "order-events"; // 订单事件主题
String INVENTORY_TOPIC = "inventory-events"; // 库存事件主题
String NOTIFICATION_TOPIC = "notification-events"; // 通知主题
String ORDER_GROUP = "order-process-group"; // 订单处理消费者组
String INVENTORY_GROUP = "inventory-group"; // 库存服务组
String NOTIFICATION_GROUP = "notification-group"; // 通知服务组
}
四、消息生产者实现
4.1 订单服务发送消息
package com.example.mall.service;
import com.example.mall.entity.Order;
import com.example.mall.event.OrderEvent;
import com.example.mall.mapper.OrderMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import static com.example.mall.constant.KafkaTopics.ORDER_TOPIC;
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderService {
private final OrderMapper orderMapper;
private final KafkaTemplate<String, Object> kafkaTemplate;
/**
* 创建订单并发送消息到Kafka
*/
@Transactional
public Order createOrder(Order order) {
// 1. 保存订单到数据库
orderMapper.insert(order);
// 2. 构建订单事件
OrderEvent event = new OrderEvent(
UUID.randomUUID().toString(),
order.getId(),
order.getOrderNo(),
order.getUserId(),
order.getTotalAmount(),
"CREATED",
LocalDateTime.now()
);
// 3. 发送消息到Kafka(异步)
CompletableFuture<SendResult<String, Object>> future =
kafkaTemplate.send(ORDER_TOPIC, event.getOrderId().toString(), event);
// 4. 添加回调处理
future.whenComplete((result, ex) -> {
if (ex == null) {
log.info("订单消息发送成功: orderNo={}, offset={}",
order.getOrderNo(), result.getRecordMetadata().offset());
} else {
log.error("订单消息发送失败: orderNo={}", order.getOrderNo(), ex);
// TODO: 记录失败消息到本地表,便于后续补偿
}
});
return order;
}
}
关键点:
-
使用
KafkaTemplate发送消息,支持泛型(Key类型, Value类型)-1 -
异步发送并添加回调,便于监控和异常处理
-
消息 Key 使用订单ID,保证同一订单的消息发往同一分区(顺序消费)
4.2 生产者配置类(可选)
如果配置文件中已设置,Spring Boot 会自动配置 KafkaTemplate。如需自定义,可添加:
@Configuration
public class KafkaProducerConfig {
@Bean
public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}
}
五、消息消费者实现
5.1 库存服务消费订单消息
package com.example.mall.consumer;
import com.example.mall.event.OrderEvent;
import com.example.mall.service.InventoryService;
import com.example.mall.service.MessageConsumeRecordService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import static com.example.mall.constant.KafkaTopics.INVENTORY_GROUP;
import static com.example.mall.constant.KafkaTopics.ORDER_TOPIC;
@Slf4j
@Component
@RequiredArgsConstructor
public class OrderConsumer {
private final InventoryService inventoryService;
private final MessageConsumeRecordService consumeRecordService;
/**
* 监听订单事件,处理库存扣减
*/
@KafkaListener(
topics = ORDER_TOPIC,
groupId = INVENTORY_GROUP,
containerFactory = "kafkaListenerContainerFactory"
)
public void consumeOrderEvent(ConsumerRecord<String, OrderEvent> record, Acknowledgment ack) {
String messageKey = record.key();
OrderEvent event = record.value();
log.info("接收到订单事件: key={}, event={}, partition={}, offset={}",
messageKey, event, record.partition(), record.offset());
try {
// 1. 幂等性校验:检查是否已处理过该消息
if (consumeRecordService.isProcessed(messageKey)) {
log.info("消息已处理过,跳过: {}", messageKey);
ack.acknowledge();
return;
}
// 2. 根据事件类型处理业务
switch (event.getEventType()) {
case "CREATED":
inventoryService.deductStock(event.getOrderId(), event.getUserId());
break;
case "CANCELLED":
inventoryService.releaseStock(event.getOrderId());
break;
default:
log.warn("未知事件类型: {}", event.getEventType());
}
// 3. 记录已处理的消息ID
consumeRecordService.recordProcessed(messageKey);
// 4. 手动提交偏移量
ack.acknowledge();
} catch (Exception e) {
log.error("处理订单事件失败: key={}", messageKey, e);
// 这里不提交ACK,消息会重新消费(取决于重试策略)
throw new RuntimeException("消费失败,触发重试", e);
}
}
}
关键点:
-
@KafkaListener:指定监听的 Topic 和消费者组 -1 -
ConsumerRecord:获取消息完整元数据(分区、偏移量、Key等) -
Acknowledgment:手动确认机制,确保消息处理成功后才提交偏移量 -2 -
幂等性校验防止重复消费(见下文)
5.2 手动确认配置
需要配置 KafkaListenerContainerFactory 启用手动确认:
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
ConsumerFactory<String, Object> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setConcurrency(3); // 并发消费线程数
return factory;
}
}
六、生产级特性实现
6.1 幂等性保证
Kafka 可能因网络问题导致消息重复消费,因此幂等性设计至关重要 -2。
6.1.1 消息消费记录表
CREATE TABLE `message_consume_record` (
`id` BIGINT AUTO_INCREMENT PRIMARY KEY,
`message_key` VARCHAR(64) NOT NULL UNIQUE, -- 消息唯一键
`topic` VARCHAR(64) NOT NULL,
`partition` INT,
`offset` BIGINT,
`consumed_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
6.1.2 幂等校验服务
@Service
@RequiredArgsConstructor
public class MessageConsumeRecordService {
private final MessageConsumeRecordMapper recordMapper;
@Transactional(propagation = Propagation.REQUIRES_NEW)
public boolean isProcessed(String messageKey) {
// 尝试插入,利用数据库唯一键去重
try {
MessageConsumeRecord record = new MessageConsumeRecord();
record.setMessageKey(messageKey);
recordMapper.insert(record);
return false; // 插入成功,表示未处理过
} catch (DuplicateKeyException e) {
return true; // 主键冲突,已处理过
}
}
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void recordProcessed(String messageKey) {
// 实际在 isProcessed 中已经插入,此方法可留空或记录额外信息
}
}
原理:利用数据库唯一键约束,尝试插入消息ID,成功则继续处理,失败则跳过 -2。
6.2 重试机制
消费失败时,不应无限重试。设计有限重试 + 死信队列策略 -2。
6.2.1 失败消息表
CREATE TABLE `failed_message` (
`id` BIGINT AUTO_INCREMENT PRIMARY KEY,
`message_key` VARCHAR(64) NOT NULL,
`topic` VARCHAR(64) NOT NULL,
`payload` TEXT NOT NULL,
`error_message` TEXT,
`retry_count` INT DEFAULT 0,
`max_retries` INT DEFAULT 3,
`status` VARCHAR(20) DEFAULT 'PENDING',
`created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
`updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
6.2.2 消费者重试逻辑
@KafkaListener(topics = ORDER_TOPIC, groupId = INVENTORY_GROUP)
public void consumeWithRetry(ConsumerRecord<String, OrderEvent> record, Acknowledgment ack) {
try {
// 业务处理
processOrderEvent(record.value());
ack.acknowledge();
} catch (Exception e) {
// 记录失败消息
failedMessageService.save(record);
// 根据重试次数决定是否提交ACK
int retryCount = getRetryCount(record.key());
if (retryCount >= 3) {
// 超过最大重试,记录到死信并提交ACK(避免阻塞)
deadLetterService.sendToDLQ(record);
ack.acknowledge();
} else {
// 未超重试,不提交ACK,等待重新消费
throw e;
}
}
}
6.2.3 定时重扫失败消息
@Component
@Slf4j
@RequiredArgsConstructor
public class FailedMessageRetryTask {
private final FailedMessageService failedMessageService;
@Scheduled(fixedDelay = 60000) // 每分钟执行一次
public void retryFailedMessages() {
List<FailedMessage> failedList = failedMessageService.findPendingRetry();
for (FailedMessage msg : failedList) {
try {
// 重新发送到Kafka或直接调用处理逻辑
kafkaTemplate.send(msg.getTopic(), msg.getMessageKey(), msg.getPayload());
failedMessageService.markAsSuccess(msg.getId());
} catch (Exception e) {
failedMessageService.incrementRetryCount(msg.getId());
log.error("重试失败: {}", msg.getMessageKey(), e);
}
}
}
}
6.3 顺序消费保证
在电商场景中,同一订单的事件(创建、支付、取消)必须顺序处理。
解决方案:
-
设置消息 Key 为订单ID,保证同一订单进入同一分区 -5
-
消费者设置
concurrency为分区数,且每个分区单线程消费 -
业务处理时加分布式锁(如Redis),防止并发冲突
// 生产者:使用订单ID作为Key
kafkaTemplate.send(ORDER_TOPIC, String.valueOf(orderId), event);
// 消费者配置:分区内单线程
factory.setConcurrency(3); // 假设分区数为3
factory.getContainerProperties().setMissingTopicsFatal(false);
七、同步请求-响应模式(扩展)
某些场景需要同步等待处理结果,如支付回调。Kafka 原生异步,但可通过 ReplyingKafkaTemplate 实现请求-响应模式 -5。
7.1 配置 ReplyingKafkaTemplate
@Bean
public ReplyingKafkaTemplate<String, Object, Object> replyingKafkaTemplate(
ProducerFactory<String, Object> pf,
ConcurrentKafkaListenerContainerFactory<String, Object> factory) {
ConcurrentMessageListenerContainer<String, Object> replyContainer =
factory.createContainer("payment-reply-topic");
replyContainer.getContainerProperties().setGroupId("payment-client");
ReplyingKafkaTemplate<String, Object, Object> template =
new ReplyingKafkaTemplate<>(pf, replyContainer);
template.setDefaultReplyTimeout(Duration.ofSeconds(30));
return template;
}
7.2 发送请求并等待响应
@Service
public class PaymentService {
@Autowired
private ReplyingKafkaTemplate<String, Object, Object> replyingTemplate;
public PaymentResponse requestPayment(PaymentRequest request) {
// 创建请求消息
ProducerRecord<String, Object> record = new ProducerRecord<>(
"payment-request-topic", request.getOrderId(), request);
// 发送并等待响应
RequestReplyFuture<String, Object, Object> future =
replyingTemplate.sendAndReceive(record);
// 同步等待结果
ConsumerRecord<String, Object> response = future.get(10, TimeUnit.SECONDS);
return (PaymentResponse) response.value();
}
}
八、监控与管理
8.1 添加 Actuator 端点
management:
endpoints:
web:
exposure:
include: health,info,metrics,kafka
metrics:
export:
kafka:
enabled: true
8.2 自定义监控指标
@Component
public class KafkaMonitor {
private final MeterRegistry meterRegistry;
private final KafkaTemplate<String, Object> kafkaTemplate;
@EventListener(ApplicationReadyEvent.class)
public void initMetrics() {
// 记录发送消息总数
Counter.builder("kafka.sent.messages")
.description("Total sent messages")
.register(meterRegistry);
}
public void recordSendSuccess(String topic) {
meterRegistry.counter("kafka.sent.success", "topic", topic).increment();
}
}
8.3 关键指标关注
-
发送延迟:
kafka.producer.request-latency-avg -
消费延迟:
kafka.consumer.fetch-latency-avg -
消费落后量:
kafka.consumer.records-lag-max(最重要!)
九、完整测试流程
9.1 单元测试
@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"order-events"})
class OrderEventTest {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
@Test
void testSendAndReceive() throws InterruptedException {
OrderEvent event = new OrderEvent("test-id", 1L, "NO001", 100L,
BigDecimal.TEN, "CREATED", LocalDateTime.now());
// 发送消息
kafkaTemplate.send("order-events", event.getOrderId().toString(), event);
// 等待消费
Thread.sleep(2000);
// 验证消费结果(需配合测试用的 Consumer)
// assertThat(...)
}
}
9.2 集成测试
启动 Docker Kafka 和 Spring Boot 应用,使用 Postman 调用订单创建接口:
POST /api/orders
Content-Type: application/json
{
"userId": 1001,
"totalAmount": 299.00,
"items": [{"productId": 1, "quantity": 2}]
}
观察日志:
INFO OrderConsumer - 接收到订单事件: key=1, event=OrderEvent(orderId=1001, ...)
INFO InventoryService - 扣减库存成功: orderId=1001
十、最佳实践总结
10.1 配置建议
| 环境 | auto.offset.reset | enable.auto.commit | 重试次数 |
|---|---|---|---|
| 开发 | earliest | false | 1 |
| 测试 | earliest | false | 3 |
| 生产 | latest | false | 3+死信 |
10.2 设计要点
-
消息体不宜过大:建议不超过 1MB,可使用压缩(snappy/gzip)
-
合理设置分区数:分区数 = 预期最大并发消费线程数
-
监控消费延迟:lag 持续增长需扩容消费者或分区
-
做好容错设计:网络抖动、服务重启等情况下的消息不丢失不重复
-
业务无关字段最小化:事件只包含必要字段,避免传递大对象
10.3 常见问题与解决方案
| 问题 | 解决方案 |
|---|---|
| 消息重复消费 | 幂等表 + 唯一键约束 -2 |
| 消息丢失 | 生产者设置 acks=all,消费者手动提交 |
| 消费顺序错乱 | 同一业务使用相同 Key,分区内单线程 |
| 消息积压 | 增加分区、增加消费者并发、优化处理逻辑 |
| 消费失败无限重试 | 有限重试 |
十一、总结
通过本章学习,你已掌握:
✅ Kafka 基础概念:Topic、Partition、Consumer Group、Offset
✅ Spring Boot 整合 Kafka:生产者、消费者完整实现
✅ 电商场景实战:订单异步处理、库存扣减、服务解耦
✅ 生产级特性:幂等性设计、重试机制、死信队列、顺序消费
✅ 监控与运维:指标采集、延迟监控、测试策略
Kafka 的世界远不止于此。你可以继续探索:
-
Kafka Streams:流式处理订单实时统计
-
Schema Registry:使用 Avro 管理消息 schema -3
-
Kafka Connect:连接外部数据源
下一章,我们将进入微服务架构,学习如何使用 Spring Cloud 将这些服务真正拆分为独立部署的微服务,敬请期待!
附:完整项目代码结构
src/main/java/com/example/mall/
├── config/ # Kafka 配置类
├── constant/ # 常量定义(Topic、Group)
├── consumer/ # 消息消费者
├── entity/ # 实体类(含 OrderEvent)
├── mapper/ # MyBatis Mapper
├── service/ # 业务服务(含 Kafka 生产者)
├── task/ # 定时重试任务
└── util/ # 工具类
参考资源:
-
Spring Kafka 官方文档
-
《Kafka 权威指南》
如有问题,欢迎在评论区交流!Happy Coding!
更多推荐
所有评论(0)