RabbitMQ消息队列与Elasticsearch全文检索之下篇
本文介绍了Elasticsearch搜索引擎的实战应用,包含三个核心功能:1)多租户搜索架构,通过独立索引实现数据隔离;2)搜索词高亮实现,配置前后标签和片段参数;3)搜索结果缓存策略,使用Redis缓存热门查询结果。代码示例展示了如何实现索引隔离、高亮字段处理和缓存键构建,优化搜索性能和用户体验。
·
三十三、Elasticsearch 搜索引擎完整实战
33.1 多租户搜索架构
// 多租户索引隔离策略
@Service
public class MultiTenantSearchService {
@Autowired
private ElasticsearchOperations elasticsearchOperations;
/**
* 获取租户专属索引名
* 每个租户有独立的索引,数据完全隔离
*/
private String getIndexName(String tenantId) {
return "product_" + tenantId;
}
/**
* 租户商品搜索
*/
public SearchHits<ProductDocument> search(String tenantId,
ProductSearchRequest request) {
String indexName = getIndexName(tenantId);
NativeQuery query = NativeQuery.builder()
.withQuery(buildQuery(request))
.withPageable(PageRequest.of(request.getPage() - 1, request.getSize()))
.build();
// 动态指定索引
return elasticsearchOperations.search(
query,
ProductDocument.class,
IndexCoordinates.of(indexName)
);
}
private Query buildQuery(ProductSearchRequest request) {
BoolQuery.Builder bool = new BoolQuery.Builder();
if (StringUtils.hasText(request.getKeyword())) {
bool.must(m -> m.match(mm -> mm
.field("name").query(request.getKeyword())));
}
bool.filter(f -> f.term(t -> t.field("onSale").value(true)));
return Query.of(q -> q.bool(bool.build()));
}
}
33.2 搜索词高亮完整实现
@Service
public class HighlightSearchService {
@Autowired
private ElasticsearchOperations elasticsearchOperations;
public List<ProductVO> searchWithHighlight(String keyword) {
// 配置高亮
HighlightField nameHighlight = new HighlightField("name");
HighlightField descHighlight = new HighlightField("description");
Highlight highlight = new Highlight(
HighlightParameters.builder()
.withPreTags("<em class='highlight'>") // 高亮前缀标签
.withPostTags("</em>") // 高亮后缀标签
.withNumberOfFragments(3) // 最多返回3个片段
.withFragmentSize(150) // 每个片段150字符
.build(),
List.of(nameHighlight, descHighlight)
);
NativeQuery query = NativeQuery.builder()
.withQuery(q -> q.multiMatch(mm -> mm
.query(keyword)
.fields("name^3", "description^1")
))
.withHighlightQuery(new HighlightQuery(highlight, ProductDocument.class))
.withPageable(PageRequest.of(0, 20))
.build();
SearchHits<ProductDocument> hits =
elasticsearchOperations.search(query, ProductDocument.class);
return hits.getSearchHits().stream().map(hit -> {
ProductVO vo = new ProductVO();
BeanUtils.copyProperties(hit.getContent(), vo);
// 替换高亮字段
Map<String, List<String>> hlFields = hit.getHighlightFields();
if (hlFields.containsKey("name") && !hlFields.get("name").isEmpty()) {
vo.setName(hlFields.get("name").get(0));
}
if (hlFields.containsKey("description") && !hlFields.get("description").isEmpty()) {
// 拼接多个片段
vo.setDescription(String.join("...", hlFields.get("description")));
}
return vo;
}).collect(Collectors.toList());
}
}
33.3 搜索结果缓存策略
@Service
@Slf4j
public class CachedSearchService {
@Autowired
private ProductSearchService searchService;
@Autowired
private RedisTemplate<String, String> redisTemplate;
private static final long CACHE_TTL = 300; // 5分钟缓存
/**
* 带缓存的搜索
* 热门搜索词结果缓存,减少ES压力
*/
public ProductSearchResponse searchWithCache(ProductSearchRequest request) {
// 构建缓存key
String cacheKey = buildCacheKey(request);
// 先查缓存
String cached = redisTemplate.opsForValue().get(cacheKey);
if (cached != null) {
log.debug("命中搜索缓存, key={}", cacheKey);
return JSON.parseObject(cached, ProductSearchResponse.class);
}
// 查ES
ProductSearchResponse response = searchService.search(request);
// 写入缓存(只缓存第一页)
if (request.getPage() == 1) {
redisTemplate.opsForValue().set(
cacheKey,
JSON.toJSONString(response),
CACHE_TTL,
TimeUnit.SECONDS
);
}
return response;
}
private String buildCacheKey(ProductSearchRequest request) {
return String.format("search:product:%s:%s:%.0f:%.0f:%d",
request.getKeyword() != null ? request.getKeyword() : "",
request.getBrand() != null ? request.getBrand() : "",
request.getMinPrice() != null ? request.getMinPrice() : 0.0,
request.getMaxPrice() != null ? request.getMaxPrice() : 0.0,
request.getPage()
);
}
/**
* 商品数据变更时清除相关缓存
*/
public void clearCache(String keyword) {
String pattern = "search:product:" + keyword + "*";
Set<String> keys = redisTemplate.keys(pattern);
if (keys != null && !keys.isEmpty()) {
redisTemplate.delete(keys);
log.info("清除搜索缓存, pattern={}, count={}", pattern, keys.size());
}
}
}
三十四、RabbitMQ 消息顺序性保证
34.1 顺序消费问题
问题场景:
订单状态变更:创建 → 支付 → 发货 → 完成
如果消息乱序消费:
消费者1处理"发货"消息
消费者2处理"支付"消息
→ 状态变成"发货"后又变回"支付",数据错误!
34.2 解决方案
// 方案1:单消费者(性能差,不推荐)
// 只启动一个消费者实例,天然保证顺序
// 方案2:消息携带版本号,乐观锁更新
@Component
@Slf4j
public class OrderStatusConsumer {
@Autowired
private OrderMapper orderMapper;
@RabbitListener(queues = "queue.order.status")
public void handleStatusChange(OrderStatusMessage message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag)
throws IOException {
try {
// 使用版本号乐观锁,只有版本号匹配才更新
int updated = orderMapper.updateStatusWithVersion(
message.getOrderId(),
message.getNewStatus(),
message.getVersion(), // 期望的当前版本
message.getVersion() + 1 // 更新后的版本
);
if (updated == 0) {
// 版本不匹配,说明已被更新,忽略此消息
log.warn("订单状态版本不匹配,忽略消息, orderId={}, version={}",
message.getOrderId(), message.getVersion());
}
channel.basicAck(tag, false);
} catch (Exception e) {
channel.basicNack(tag, false, true);
}
}
}
-- 乐观锁更新SQL
UPDATE `order`
SET status = #{newStatus}, version = #{newVersion}
WHERE id = #{orderId} AND version = #{expectedVersion}
34.3 基于 Hash 路由保证同一订单顺序
@Service
public class OrderedMessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 同一订单的消息路由到同一队列
* 使用订单ID的hash值决定路由键
*/
public void sendOrderMessage(Long orderId, Object message) {
// 根据orderId计算路由到哪个队列(共4个队列)
int queueIndex = (int) (orderId % 4);
String routingKey = "order.queue." + queueIndex;
rabbitTemplate.convertAndSend("exchange.order.ordered", routingKey, message);
log.info("有序消息发送, orderId={}, queue={}", orderId, queueIndex);
}
}
三十五、Elasticsearch 聚合进阶
35.1 桶聚合(Bucket Aggregation)
// 日期直方图聚合(按天统计订单数)
NativeQuery query = NativeQuery.builder()
.withQuery(q -> q.matchAll(m -> m))
.withAggregation("dailyOrders", Aggregation.of(a -> a
.dateHistogram(dh -> dh
.field("createTime")
.calendarInterval(CalendarInterval.Day)
.format("yyyy-MM-dd")
.minDocCount(0) // 没有数据的日期也显示0
.extendedBounds(eb -> eb
.min(FieldDateMath.of(f -> f.value((double) LocalDate.now()
.minusDays(30).toEpochDay() * 86400000)))
.max(FieldDateMath.of(f -> f.value((double) LocalDate.now()
.toEpochDay() * 86400000)))
)
)
))
.withPageable(PageRequest.of(0, 0))
.build();
35.2 管道聚合(Pipeline Aggregation)
// 先按品牌分组,再计算每个品牌的平均价格,最后找出平均价格最高的品牌
NativeQuery query = NativeQuery.builder()
.withQuery(q -> q.matchAll(m -> m))
.withAggregation("brandSales", Aggregation.of(a -> a
.terms(t -> t.field("brand").size(20))
// 子聚合:计算每个品牌的平均价格
.aggregations("avgPrice", Aggregation.of(sub -> sub
.avg(avg -> avg.field("price"))
))
))
// 管道聚合:找出平均价格最高的品牌
.withAggregation("maxAvgPrice", Aggregation.of(a -> a
.maxBucket(mb -> mb.bucketsPath(bp -> bp.single("brandSales>avgPrice")))
))
.withPageable(PageRequest.of(0, 0))
.build();
35.3 指标聚合(Metric Aggregation)
// 综合统计:最大值、最小值、平均值、总和、数量
NativeQuery query = NativeQuery.builder()
.withQuery(q -> q.term(t -> t.field("brand").value("华为")))
.withAggregation("priceStats", Aggregation.of(a -> a
.extendedStats(es -> es.field("price"))
))
.withAggregation("percentiles", Aggregation.of(a -> a
.percentiles(p -> p
.field("price")
.percents(25.0, 50.0, 75.0, 95.0, 99.0)
)
))
.withPageable(PageRequest.of(0, 0))
.build();
三十六、完整项目:商品搜索微服务
36.1 项目依赖(pom.xml)
<dependencies>
<!-- Spring Boot Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Elasticsearch -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<!-- RabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- MyBatis-Plus -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.3</version>
</dependency>
<!-- MySQL -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- FastJSON -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.40</version>
</dependency>
</dependencies>
36.2 完整配置文件
server:
port: 8082
spring:
application:
name: search-service
datasource:
url: jdbc:mysql://localhost:3306/shop?useUnicode=true&characterEncoding=utf8
username: root
password: root123
driver-class-name: com.mysql.cj.jdbc.Driver
redis:
host: localhost
port: 6379
password: redis123
database: 1
lettuce:
pool:
max-active: 20
max-idle: 10
rabbitmq:
host: localhost
port: 5672
username: admin
password: admin123
virtual-host: /shop
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
listener:
simple:
acknowledge-mode: manual
prefetch: 10
elasticsearch:
uris: http://localhost:9200
connection-timeout: 5s
socket-timeout: 30s
mybatis-plus:
mapper-locations: classpath:mapper/*.xml
configuration:
map-underscore-to-camel-case: true
global-config:
db-config:
logic-delete-field: deleted
logic-delete-value: 1
logic-not-delete-value: 0
logging:
level:
com.example.search: debug
org.springframework.data.elasticsearch: info
36.3 搜索服务启动类
@SpringBootApplication
@EnableRabbit
@MapperScan("com.example.search.mapper")
public class SearchServiceApplication {
public static void main(String[] args) {
SpringApplication.run(SearchServiceApplication.class, args);
}
}
36.4 全局异常处理
@RestControllerAdvice
@Slf4j
public class GlobalExceptionHandler {
@ExceptionHandler(ElasticsearchException.class)
public Result<Void> handleESException(ElasticsearchException e) {
log.error("Elasticsearch异常", e);
return Result.fail(500, "搜索服务异常,请稍后重试");
}
@ExceptionHandler(AmqpException.class)
public Result<Void> handleMQException(AmqpException e) {
log.error("RabbitMQ异常", e);
return Result.fail(500, "消息服务异常");
}
@ExceptionHandler(Exception.class)
public Result<Void> handleException(Exception e) {
log.error("系统异常", e);
return Result.fail(500, "系统内部错误");
}
}
36.5 统一返回结果
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Result<T> {
private Integer code;
private String message;
private T data;
public static <T> Result<T> success(T data) {
return new Result<>(200, "success", data);
}
public static <T> Result<T> success() {
return new Result<>(200, "success", null);
}
public static <T> Result<T> fail(Integer code, String message) {
return new Result<>(code, message, null);
}
public boolean isSuccess() {
return this.code != null && this.code == 200;
}
}
三十七、性能测试与压测
37.1 RabbitMQ 性能测试
# 使用 rabbitmq-perf-test 工具
# 生产者:100个并发,每秒1000条消息
java -jar perf-test.jar \
--uri amqp://admin:admin123@localhost:5672/%2Fshop \
--producers 100 \
--consumers 50 \
--rate 1000 \
--size 1024 \
--queue test.perf \
--time 60
# 输出示例:
# id: test-1, sending rate avg: 1000/s
# id: test-1, receiving rate avg: 950/s
# id: test-1, consumer latency min/median/75th/95th/99th 1/5/8/15/25 ms
37.2 Elasticsearch 性能测试
# 使用 esrally 工具
pip install esrally
# 运行基准测试
esrally race \
--track=geonames \
--target-hosts=localhost:9200 \
--pipeline=benchmark-only
# 自定义测试
esrally race \
--track-path=/path/to/custom-track \
--target-hosts=localhost:9200
37.3 JMeter 压测搜索接口
<!-- JMeter 测试计划配置 -->
<TestPlan>
<ThreadGroup>
<numThreads>100</numThreads> <!-- 100并发用户 -->
<rampTime>10</rampTime> <!-- 10秒内启动所有线程 -->
<duration>60</duration> <!-- 持续60秒 -->
<HTTPSamplerProxy>
<domain>localhost</domain>
<port>8082</port>
<path>/api/search/products</path>
<method>GET</method>
<Arguments>
<keyword>手机</keyword>
<page>1</page>
<size>20</size>
</Arguments>
</HTTPSamplerProxy>
</ThreadGroup>
</TestPlan>
三十八、生产环境部署方案
38.1 RabbitMQ 生产配置
# /etc/rabbitmq/rabbitmq.conf
# 内存水位线(超过40%开始限流)
vm_memory_high_watermark.relative = 0.4
# 磁盘空间水位线(低于2GB开始限流)
disk_free_limit.absolute = 2GB
# 最大连接数
tcp_listen_options.backlog = 4096
# 心跳间隔
heartbeat = 60
# 消息持久化刷盘策略
queue_index_embed_msgs_below = 4096
# Spring Boot 生产配置
spring:
rabbitmq:
host: rabbitmq-cluster.internal
port: 5672
username: ${RABBITMQ_USER}
password: ${RABBITMQ_PASS}
virtual-host: /prod
connection-timeout: 5s
requested-heartbeat: 60s
publisher-confirm-type: correlated
publisher-returns: true
listener:
simple:
acknowledge-mode: manual
prefetch: 20
concurrency: 5
max-concurrency: 20
retry:
enabled: true
max-attempts: 3
initial-interval: 2000ms
multiplier: 2.0
max-interval: 30000ms
38.2 Elasticsearch 生产配置
# elasticsearch.yml
cluster.name: prod-es-cluster
node.name: es-node-1
# 网络配置
network.host: 0.0.0.0
http.port: 9200
transport.port: 9300
# 集群发现
discovery.seed_hosts: ["es-node-1", "es-node-2", "es-node-3"]
cluster.initial_master_nodes: ["es-node-1", "es-node-2", "es-node-3"]
# 内存配置(禁止swap)
bootstrap.memory_lock: true
# 索引配置
action.destructive_requires_name: true # 防止误删索引
# 安全配置
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
# JVM配置(jvm.options)
# 堆内存设置为物理内存的50%,不超过32GB
-Xms8g
-Xmx8g
# GC配置(ES 8.x默认使用G1GC)
-XX:+UseG1GC
-XX:G1HeapRegionSize=4m
-XX:InitiatingHeapOccupancyPercent=30
38.3 监控告警配置
# Prometheus + Grafana 监控
# RabbitMQ Prometheus插件
rabbitmq-plugins enable rabbitmq_prometheus
# prometheus.yml
scrape_configs:
- job_name: 'rabbitmq'
static_configs:
- targets: ['rabbitmq:15692']
- job_name: 'elasticsearch'
static_configs:
- targets: ['elasticsearch-exporter:9114']
// Spring Boot Actuator 暴露监控端点
@Configuration
public class ActuatorConfig {
@Bean
public MeterRegistryCustomizer<MeterRegistry> metricsCommonTags() {
return registry -> registry.config()
.commonTags("application", "search-service")
.commonTags("env", "prod");
}
}
# application.yml 监控配置
management:
endpoints:
web:
exposure:
include: health,info,metrics,prometheus
endpoint:
health:
show-details: always
metrics:
export:
prometheus:
enabled: true
三十九、大厂面试真题解析
39.1 字节跳动面试题
Q1:RabbitMQ 如何实现延迟消息?有哪些方案?各有什么优缺点?
方案1:TTL + 死信队列
原理:消息设置TTL,过期后转入死信队列,消费者监听死信队列
优点:不需要额外插件,原生支持
缺点:
- 每种延迟时间需要一个独立的TTL队列
- 队列头部消息未过期会阻塞后面的消息(即使后面的消息已过期)
- 不够灵活,无法动态设置延迟时间
方案2:rabbitmq_delayed_message_exchange 插件
原理:插件在Exchange层面实现延迟,消息存储在Mnesia数据库中
优点:
- 可以动态设置任意延迟时间
- 不会有消息阻塞问题
缺点:
- 需要安装插件
- 延迟消息存储在内存中,节点重启可能丢失(需配置持久化)
- 不适合大量延迟消息(内存压力)
方案3:Redis ZSet + 定时扫描
原理:消息存入Redis ZSet,score为执行时间戳,定时任务扫描到期消息
优点:灵活,支持任意延迟时间,可以修改/取消延迟消息
缺点:定时任务有延迟,精度不高(通常1秒级别)
生产推荐:
- 延迟时间固定(如30分钟)→ TTL + 死信队列
- 延迟时间动态 → 插件方案
- 需要修改/取消延迟 → Redis ZSet方案
Q2:ES 的 refresh 和 flush 有什么区别?
refresh(刷新):
- 将内存中的数据写入文件系统缓存(OS Cache)
- 写入后数据可以被搜索到(近实时)
- 默认每1秒执行一次
- 不保证数据持久化(断电可能丢失)
flush(冲刷):
- 将文件系统缓存中的数据写入磁盘
- 同时清空translog(事务日志)
- 保证数据持久化
- 默认每30分钟或translog达到512MB时执行
translog(事务日志):
- 每次写操作都会记录到translog
- 节点重启时通过translog恢复未flush的数据
- 保证数据不丢失
总结:
refresh → 数据可搜索(近实时)
flush → 数据持久化(防止丢失)
39.2 阿里巴巴面试题
Q3:如何设计一个高可用的消息系统?
高可用设计要点:
1. Broker高可用
- RabbitMQ:仲裁队列(Quorum Queue)+ 3节点集群
- 使用HAProxy做负载均衡
- 配置镜像策略,数据多副本
2. 生产者高可用
- 开启生产者确认(publisher-confirm)
- 消息持久化到本地数据库
- 定时任务补偿发送失败的消息
- 使用本地消息表保证事务一致性
3. 消费者高可用
- 多实例部署,自动故障转移
- 手动ACK,处理失败重新入队
- 死信队列兜底,人工处理失败消息
- 幂等性设计,防止重复消费
4. 监控告警
- 监控队列积压数量
- 监控消费者数量
- 监控消息延迟
- 异常时自动告警(钉钉/企微)
Q4:ES 集群脑裂问题如何解决?
脑裂(Split Brain):
网络分区导致集群分裂为两个独立集群,各自选出Master节点,
数据写入不同节点,网络恢复后数据冲突。
解决方案:
1. 设置 minimum_master_nodes(ES 7.x之前)
discovery.zen.minimum_master_nodes = (节点数/2) + 1
3节点集群设置为2,防止少数节点独立选主
2. ES 7.x+ 使用 cluster.initial_master_nodes
自动处理脑裂问题,基于Raft协议
3. 奇数节点部署
3节点或5节点,避免平票情况
4. 网络隔离
Master节点和Data节点分离
使用专用网络连接集群节点
39.3 美团面试题
Q5:如何保证 RabbitMQ 消息的顺序消费?
问题根源:
- 多个消费者并发消费同一队列
- 消费者处理速度不同,导致乱序
解决方案:
方案1:单消费者
- 只启动一个消费者实例
- 缺点:吞吐量低,无法水平扩展
方案2:消息分区(推荐)
- 同一业务ID(如orderId)的消息路由到同一队列
- 每个队列只有一个消费者
- 不同队列并行消费,提高吞吐量
实现:
// 4个队列,同一orderId的消息路由到同一队列
int queueIndex = (int)(orderId % 4);
String routingKey = "order.queue." + queueIndex;
方案3:消息版本号
- 消息携带版本号
- 消费者使用乐观锁更新,版本不匹配则忽略
- 适合允许少量乱序的场景
方案4:消息排序后消费
- 消费者收到消息后不立即处理
- 先存入本地有序队列(按序号排序)
- 按顺序处理
- 缺点:实现复杂,有延迟
Q6:ES 如何处理大量数据的聚合查询性能问题?
优化策略:
1. 使用 filter 代替 query
filter 有缓存,不计算相关性评分,性能更好
2. 减少聚合桶数量
terms聚合的size不要设置太大
使用 min_doc_count 过滤低频数据
3. 使用 execution_hint: map
对于高基数字段(如userId),使用map模式
terms聚合默认使用global_ordinals,高基数时性能差
4. 预计算(Rollup)
对历史数据进行预聚合,存储聚合结果
查询时直接读取预计算结果
5. 分片路由
使用routing将相关数据路由到同一分片
聚合时只查询相关分片
6. 增加副本数
读请求可以分发到副本,提高并发能力
7. 使用 search_after 代替 from+size
深度分页时性能更好
四十、综合实战:秒杀系统消息架构
40.1 秒杀系统架构图
40.2 秒杀核心代码
@Service
@Slf4j
public class SeckillService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
private static final String STOCK_KEY = "seckill:stock:";
private static final String USER_BUY_KEY = "seckill:user:buy:";
/**
* 秒杀接口(高并发入口)
*/
public SeckillResult seckill(Long userId, Long productId) {
// 1. 检查用户是否已购买(防止重复购买)
String userBuyKey = USER_BUY_KEY + productId + ":" + userId;
Boolean alreadyBought = redisTemplate.hasKey(userBuyKey);
if (Boolean.TRUE.equals(alreadyBought)) {
return SeckillResult.fail("您已参与过该商品的秒杀");
}
// 2. Redis原子操作预减库存
String stockKey = STOCK_KEY + productId;
Long stock = redisTemplate.opsForValue().decrement(stockKey);
if (stock == null || stock < 0) {
// 库存不足,恢复库存
redisTemplate.opsForValue().increment(stockKey);
return SeckillResult.fail("商品已售罄");
}
// 3. 标记用户已购买
redisTemplate.opsForValue().set(userBuyKey, "1", 24, TimeUnit.HOURS);
// 4. 发送MQ消息,异步创建订单
SeckillMessage message = SeckillMessage.builder()
.userId(userId)
.productId(productId)
.messageId(UUID.randomUUID().toString())
.createTime(LocalDateTime.now())
.build();
rabbitTemplate.convertAndSend("exchange.seckill", "seckill.order", message);
log.info("秒杀成功,等待创建订单, userId={}, productId={}", userId, productId);
return SeckillResult.success("秒杀成功,正在处理订单");
}
}
// 秒杀订单消费者
@Component
@Slf4j
public class SeckillOrderConsumer {
@Autowired
private OrderService orderService;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@RabbitListener(queues = "queue.seckill.order", concurrency = "5")
public void handleSeckillOrder(SeckillMessage message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag)
throws IOException {
log.info("处理秒杀订单, userId={}, productId={}",
message.getUserId(), message.getProductId());
// 幂等性检查
String idempotentKey = "seckill:order:created:" + message.getMessageId();
Boolean isNew = redisTemplate.opsForValue()
.setIfAbsent(idempotentKey, "1", 24, TimeUnit.HOURS);
if (Boolean.FALSE.equals(isNew)) {
log.warn("重复秒杀消息,忽略, messageId={}", message.getMessageId());
channel.basicAck(tag, false);
return;
}
try {
// 创建秒杀订单
Order order = orderService.createSeckillOrder(
message.getUserId(), message.getProductId()
);
log.info("秒杀订单创建成功, orderId={}", order.getId());
channel.basicAck(tag, false);
} catch (Exception e) {
log.error("秒杀订单创建失败", e);
// 回滚Redis库存
redisTemplate.opsForValue().increment(
"seckill:stock:" + message.getProductId()
);
// 删除用户购买标记
redisTemplate.delete(
"seckill:user:buy:" + message.getProductId() + ":" + message.getUserId()
);
// 删除幂等标记
redisTemplate.delete(idempotentKey);
channel.basicNack(tag, false, false); // 不重新入队,进死信队列
}
}
}
40.3 秒杀商品预热
@Service
@Slf4j
public class SeckillInitService {
@Autowired
private ProductMapper productMapper;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 秒杀开始前预热:将库存加载到Redis
*/
@Scheduled(cron = "0 55 9 * * ?") // 每天9:55预热10点秒杀
public void preheatSeckillStock() {
log.info("开始预热秒杀库存");
List<SeckillProduct> products = productMapper.findTodaySeckillProducts();
for (SeckillProduct product : products) {
String stockKey = "seckill:stock:" + product.getId();
// 设置库存,过期时间为秒杀结束时间
redisTemplate.opsForValue().set(
stockKey,
product.getSeckillStock(),
2, TimeUnit.HOURS
);
log.info("库存预热完成, productId={}, stock={}",
product.getId(), product.getSeckillStock());
}
}
}
四十一、知识总结与面试备战
41.1 RabbitMQ 核心知识点总结
┌─────────────────────────────────────────────────────────────────────┐
│ RabbitMQ 核心知识点 │
├─────────────────────────────────────────────────────────────────────┤
│ 基础概念 │
│ Exchange类型:Direct | Topic | Fanout | Headers │
│ 消息流转:Producer → Exchange → Queue → Consumer │
│ VirtualHost:逻辑隔离,类似数据库的schema │
├─────────────────────────────────────────────────────────────────────┤
│ 消息可靠性(三层保障) │
│ 生产者:publisher-confirm + publisher-returns │
│ Broker:队列持久化 + 消息持久化 + 镜像/仲裁队列 │
│ 消费者:手动ACK + 重试机制 + 死信队列 │
├─────────────────────────────────────────────────────────────────────┤
│ 高级特性 │
│ 死信队列:消息拒绝/TTL过期/队列满 → 死信交换机 → 死信队列 │
│ 延迟队列:TTL+死信 或 rabbitmq_delayed_message_exchange插件 │
│ 优先级队列:x-max-priority + setPriority │
│ 消息幂等:Redis SETNX / 数据库唯一索引 │
├─────────────────────────────────────────────────────────────────────┤
│ 高可用 │
│ 集群模式:普通集群 → 镜像队列 → 仲裁队列(推荐) │
│ 客户端:addresses配置多节点,自动故障转移 │
└─────────────────────────────────────────────────────────────────────┘
41.2 Elasticsearch 核心知识点总结
┌─────────────────────────────────────────────────────────────────────┐
│ Elasticsearch 核心知识点 │
├─────────────────────────────────────────────────────────────────────┤
│ 核心原理 │
│ 倒排索引:词项字典(FST) + 倒排列表(Posting List) │
│ 分词器:Standard(英文) | IK(中文) | 自定义 │
│ 相关性评分:BM25算法(词频TF + 逆文档频率IDF) │
│ 近实时:refresh(1s) → 文件系统缓存 → 可搜索 │
├─────────────────────────────────────────────────────────────────────┤
│ Mapping 设计 │
│ text:全文检索,会分词(name, description) │
│ keyword:精确匹配,不分词(brand, status, tags) │
│ nested:嵌套对象,精确查询数组中的对象组合 │
│ geo_point:地理位置,支持距离查询和排序 │
├─────────────────────────────────────────────────────────────────────┤
│ 查询 DSL │
│ 全文:match | multi_match | match_phrase │
│ 精确:term | terms | range | exists │
│ 组合:bool(must/should/filter/must_not) │
│ filter vs query:filter有缓存不计分,性能更好 │
├─────────────────────────────────────────────────────────────────────┤
│ 性能优化 │
│ 写入:批量写入 + 增大refresh_interval + 关闭副本 │
│ 查询:filter代替query + search_after分页 + 路由 │
│ 集群:合理分片数 + 副本数 + 节点角色分离 │
└─────────────────────────────────────────────────────────────────────┘
41.3 高频面试题速查
RabbitMQ 高频面试题:
Q: 如何保证消息不丢失?
A: 生产者确认 + 持久化 + 手动ACK + 死信队列
Q: 如何保证消息不重复消费?
A: 幂等性(Redis SETNX / 数据库唯一索引)
Q: 如何处理消息积压?
A: 扩容消费者 + 批量消费 + 优化消费逻辑
Q: 延迟队列如何实现?
A: TTL+死信 或 rabbitmq_delayed_message_exchange插件
Q: RabbitMQ vs Kafka?
A: RabbitMQ适合业务消息(延迟、路由灵活)
Kafka适合大数据流处理(高吞吐、消息回溯)
Elasticsearch 高频面试题:
Q: 倒排索引原理?
A: 词项字典(FST) + 倒排列表,词项→文档ID列表
Q: ES写入流程?
A: 内存Buffer → translog → refresh(1s) → Segment → flush → 磁盘
Q: ES查询流程?
A: Query Phase(各分片查ID+评分) → Fetch Phase(获取完整文档)
Q: 如何实现近实时搜索?
A: refresh操作(默认1s),将内存数据写入文件系统缓存
Q: 深度分页问题如何解决?
A: 使用search_after代替from+size
Q: ES如何优化查询性能?
A: filter代替query + search_after + 合理Mapping + 路由
四十二、附录:常用命令速查
42.1 RabbitMQ 管理命令
# 查看队列列表
rabbitmqctl list_queues name messages consumers
# 查看交换机列表
rabbitmqctl list_exchanges name type
# 查看绑定关系
rabbitmqctl list_bindings
# 清空队列
rabbitmqctl purge_queue queue_name
# 查看连接
rabbitmqctl list_connections
# 查看消费者
rabbitmqctl list_consumers
# 集群管理
rabbitmqctl cluster_status
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl forget_cluster_node rabbit@node2
# 用户管理
rabbitmqctl add_user admin password
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
42.2 Elasticsearch REST API 速查
# 集群信息
GET /_cluster/health
GET /_cat/nodes?v
GET /_cat/indices?v&s=index
# 索引操作
PUT /my_index
DELETE /my_index
GET /my_index/_mapping
GET /my_index/_settings
# 文档操作
POST /my_index/_doc
GET /my_index/_doc/{id}
PUT /my_index/_doc/{id}
DELETE /my_index/_doc/{id}
# 搜索
GET /my_index/_search
POST /my_index/_search
# 批量操作
POST /_bulk
# 重建索引
POST /_reindex
# 别名管理
POST /_aliases
GET /_aliases
# 分析测试
GET /my_index/_analyze
{
"analyzer": "ik_max_word",
"text": "华为手机很好用"
}
42.3 Docker 快速启动
# 启动 RabbitMQ(带管理界面)
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin123 \
rabbitmq:3.12-management
# 安装延迟队列插件
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 启动 Elasticsearch
docker run -d \
--name elasticsearch \
-p 9200:9200 \
-e "discovery.type=single-node" \
-e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \
-e "xpack.security.enabled=false" \
elasticsearch:8.8.0
# 启动 Kibana
docker run -d \
--name kibana \
-p 5601:5601 \
-e "ELASTICSEARCH_HOSTS=http://elasticsearch:9200" \
--link elasticsearch \
kibana:8.8.0
# 安装 IK 分词器
docker exec elasticsearch \
bin/elasticsearch-plugin install \
https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v8.8.0/elasticsearch-analysis-ik-8.8.0.zip
docker restart elasticsearch
四十三、RabbitMQ 消息格式与序列化
43.1 消息格式选择
常见消息格式对比:
| 格式 | 优点 | 缺点 | 适用场景 |
|---------|-------------------------|-----------------------|------------------|
| JSON | 可读性好,跨语言 | 体积较大 | 业务消息(推荐) |
| Protobuf| 体积小,性能好 | 需要定义schema | 高性能场景 |
| Avro | 支持schema演进 | 需要schema注册中心 | 大数据场景 |
| JDK序列化| 简单 | 体积大,不跨语言 | 不推荐 |
43.2 自定义消息转换器
@Configuration
public class MessageConverterConfig {
/**
* 使用 Jackson2JsonMessageConverter
* 支持 Java 8 时间类型(LocalDateTime等)
*/
@Bean
public MessageConverter jsonMessageConverter() {
ObjectMapper objectMapper = new ObjectMapper();
// 支持 Java 8 时间类型
objectMapper.registerModule(new JavaTimeModule());
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
// 忽略未知字段(兼容性)
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
return new Jackson2JsonMessageConverter(objectMapper);
}
}
43.3 消息版本兼容
// 消息基类(包含版本信息)
@Data
public abstract class BaseMessage {
private String messageId = UUID.randomUUID().toString();
private String version = "1.0";
private LocalDateTime sendTime = LocalDateTime.now();
private String messageType;
}
// 订单消息 v1.0
@Data
@EqualsAndHashCode(callSuper = true)
public class OrderMessageV1 extends BaseMessage {
private Long orderId;
private Long userId;
private BigDecimal amount;
public OrderMessageV1() {
setMessageType("ORDER_CREATE");
setVersion("1.0");
}
}
// 订单消息 v2.0(新增字段)
@Data
@EqualsAndHashCode(callSuper = true)
public class OrderMessageV2 extends BaseMessage {
private Long orderId;
private Long userId;
private BigDecimal amount;
private String couponCode; // 新增字段
private List<Long> productIds; // 新增字段
public OrderMessageV2() {
setMessageType("ORDER_CREATE");
setVersion("2.0");
}
}
// 消费者兼容处理
@Component
public class VersionCompatibleConsumer {
@RabbitListener(queues = "queue.order")
public void consume(String messageJson) {
// 先解析版本号
JSONObject json = JSON.parseObject(messageJson);
String version = json.getString("version");
if ("1.0".equals(version)) {
OrderMessageV1 msg = JSON.parseObject(messageJson, OrderMessageV1.class);
handleV1(msg);
} else if ("2.0".equals(version)) {
OrderMessageV2 msg = JSON.parseObject(messageJson, OrderMessageV2.class);
handleV2(msg);
} else {
log.warn("未知消息版本: {}", version);
}
}
private void handleV1(OrderMessageV1 msg) { /* 处理v1消息 */ }
private void handleV2(OrderMessageV2 msg) { /* 处理v2消息 */ }
}
四十四、Elasticsearch 自定义分词器
44.1 自定义分词器配置
PUT /product
{
"settings": {
"analysis": {
"char_filter": {
"html_strip": {
"type": "html_strip"
}
},
"tokenizer": {
"ik_tokenizer": {
"type": "ik_max_word"
}
},
"filter": {
"my_stop": {
"type": "stop",
"stopwords": ["的", "了", "和", "是", "在"]
},
"my_synonym": {
"type": "synonym",
"synonyms": [
"手机, 移动电话, 手持设备",
"笔记本, 笔记本电脑, 便携电脑"
]
}
},
"analyzer": {
"my_analyzer": {
"type": "custom",
"char_filter": ["html_strip"],
"tokenizer": "ik_max_word",
"filter": ["my_stop", "my_synonym", "lowercase"]
}
}
}
},
"mappings": {
"properties": {
"name": {
"type": "text",
"analyzer": "my_analyzer",
"search_analyzer": "ik_smart"
}
}
}
}
44.2 同义词搜索
// 搜索"手机"时,同时匹配"移动电话"、"手持设备"
// 通过同义词过滤器实现
// 测试分词效果
GET /product/_analyze
{
"analyzer": "my_analyzer",
"text": "华为手机很好用"
}
// 返回:["华为", "手机", "移动电话", "手持设备", "好用"]
四十五、项目面试经验总结
45.1 如何在面试中描述 RabbitMQ 使用经验
面试官:说说你在项目中是如何使用 RabbitMQ 的?
回答模板:
"在我参与的电商项目中,我们在多个场景使用了 RabbitMQ:
1. 订单超时取消:
用户下单后,我们通过 RabbitMQ 的延迟队列(使用
rabbitmq_delayed_message_exchange 插件)发送一条30分钟的延迟消息。
消费者收到消息后检查订单状态,如果仍未支付则自动取消并释放库存。
2. 异步通知:
订单支付成功后,通过 Fanout 交换机广播消息,
多个消费者分别处理:发送短信、发送邮件、更新积分、推送通知。
这样主流程只需要等待支付完成,其他操作异步处理,响应时间从500ms降到100ms。
3. 消息可靠性:
我们开启了生产者确认(publisher-confirm),
消费者使用手动ACK,并配置了死信队列。
对于重要消息,还实现了本地消息表方案,保证消息最终一致性。
在实际使用中,我遇到过消息积压的问题,
原因是消费者处理逻辑中有一个慢SQL,
后来通过添加索引和批量消费解决了这个问题。"
45.2 如何在面试中描述 Elasticsearch 使用经验
面试官:说说你在项目中是如何使用 Elasticsearch 的?
回答模板:
"在电商项目中,我负责实现了商品搜索功能:
1. 索引设计:
商品名称(name)使用 text 类型,配置 IK 分词器;
品牌(brand)、标签(tags)使用 keyword 类型精确匹配;
价格(price)使用 double 类型支持范围查询。
2. 搜索功能:
使用 multi_match 查询同时搜索商品名称和描述,
name 字段权重设为3倍,description 为1倍;
品牌、价格范围使用 filter 过滤(有缓存,性能更好);
搜索结果对匹配词项进行高亮显示。
3. 聚合分析:
实现了品牌分布统计(Terms聚合)和价格区间统计(Range聚合),
帮助用户快速筛选商品。
4. 数据同步:
商品数据变更时通过 RabbitMQ 异步同步到 ES,
保证 MySQL 和 ES 的数据最终一致性。
遇到的问题:
搜索中文时发现分词不准确,后来安装了 IK 分词器解决;
深度分页时性能很差,改用 search_after 方案后性能提升了10倍。"
45.3 常见追问及回答
追问1:ES 的倒排索引和 MySQL 的 B+ 树索引有什么区别?
答:
- B+树索引:适合精确查询和范围查询,按字段值排序
- 倒排索引:适合全文检索,按词项建立索引
- B+树查询 LIKE '%手机%' 需要全表扫描
- 倒排索引可以直接找到包含"手机"的所有文档
追问2:如果 ES 集群出现 yellow 状态怎么处理?
答:
yellow 表示主分片正常,但部分副本分片未分配
原因通常是节点数少于副本数
解决:增加节点,或减少副本数
PUT /my_index/_settings
{"number_of_replicas": 0}
追问3:RabbitMQ 消息确认和 Kafka 的 offset 提交有什么区别?
答:
- RabbitMQ:消费者处理完后发送 ACK,Broker 删除消息
- Kafka:消费者提交 offset,消息不删除,可以重新消费
- RabbitMQ 更适合需要确保每条消息被处理的场景
- Kafka 更适合需要消息回溯和重放的场景
四十六、最终学习检查清单
RabbitMQ 掌握程度自测
- 能够解释消息队列的三大作用(异步、解耦、削峰)
- 能够区分四种交换机并说出各自适用场景
- 能够独立实现生产者确认机制
- 能够独立实现消费者手动ACK
- 能够实现死信队列
- 能够实现延迟队列(两种方案)
- 能够实现消息幂等性
- 能够处理消息积压问题
- 能够回答 RabbitMQ vs Kafka 的区别
- 能够在面试中清晰描述项目中的 MQ 使用经验
Elasticsearch 掌握程度自测
- 能够解释倒排索引原理
- 能够设计合理的 Mapping(text vs keyword)
- 能够安装和使用 IK 分词器
- 能够使用 Spring Data ES 进行 CRUD
- 能够编写 bool 查询(must/should/filter)
- 能够实现搜索结果高亮
- 能够实现聚合分析(Terms/Range/Stats)
- 能够实现 MySQL 到 ES 的数据同步
- 能够解决深度分页问题(search_after)
- 能够在面试中清晰描述项目中的 ES 使用经验
四十七、补充:ES 脚本更新与条件更新
47.1 Painless 脚本更新
// 使用脚本更新文档字段(原子操作)
elasticsearchClient.update(u -> u
.index("product")
.id(String.valueOf(productId))
.script(s -> s
.inline(i -> i
.source("ctx._source.stock -= params.quantity; " +
"if(ctx._source.stock < 0) { ctx.op = 'none' }")
.params(Map.of("quantity", JsonData.of(quantity)))
)
),
ProductDocument.class
);
47.2 Upsert(不存在则插入)
// upsert:存在则更新,不存在则插入
elasticsearchClient.update(u -> u
.index("product")
.id(String.valueOf(product.getId()))
.doc(product)
.upsert(product) // 不存在时插入
.retryOnConflict(3), // 并发冲突时重试3次
ProductDocument.class
);
47.3 批量更新
// 批量操作(BulkRequest)
List<BulkOperation> operations = products.stream()
.map(p -> BulkOperation.of(op -> op
.index(i -> i
.index("product")
.id(String.valueOf(p.getId()))
.document(p)
)
))
.collect(Collectors.toList());
BulkResponse response = elasticsearchClient.bulk(b -> b
.index("product")
.operations(operations)
);
if (response.errors()) {
response.items().stream()
.filter(item -> item.error() != null)
.forEach(item -> log.error("批量写入失败: {}", item.error().reason()));
}
四十八、RabbitMQ 与 Spring Retry 集成
48.1 配置重试策略
@Configuration
public class RetryConfig {
/**
* 配置重试拦截器
* 消费失败时自动重试,超过次数后进入死信队列
*/
@Bean
public StatefulRetryOperationsInterceptor retryInterceptor() {
return RetryInterceptorBuilder.stateful()
.maxAttempts(3)
.backOffOptions(1000, 2.0, 10000) // 初始1s,倍数2,最大10s
.recoverer(new RejectAndDontRequeueRecoverer()) // 超过重试次数,拒绝消息
.build();
}
}
48.2 重试与死信队列联动
// 消费者配置重试拦截器
@RabbitListener(
queues = "queue.order",
containerFactory = "retryContainerFactory"
)
public void consume(OrderDTO order) {
// 抛出异常会触发重试
// 重试3次后,消息进入死信队列
orderService.process(order);
}
// 死信队列消费者(最终兜底)
@RabbitListener(queues = "queue.order.dead")
public void handleDeadLetter(OrderDTO order) {
log.error("订单处理最终失败,需人工介入, orderId={}", order.getOrderId());
// 发送告警,记录到数据库
alertService.sendAlert("订单处理失败: " + order.getOrderId());
}
四十九、ES 索引生命周期管理(ILM)
49.1 ILM 策略配置
PUT /_ilm/policy/logs_policy
{
"policy": {
"phases": {
"hot": {
"min_age": "0ms",
"actions": {
"rollover": {
"max_size": "50gb",
"max_age": "7d",
"max_docs": 10000000
}
}
},
"warm": {
"min_age": "7d",
"actions": {
"shrink": { "number_of_shards": 1 },
"forcemerge": { "max_num_segments": 1 }
}
},
"cold": {
"min_age": "30d",
"actions": {
"freeze": {}
}
},
"delete": {
"min_age": "90d",
"actions": {
"delete": {}
}
}
}
}
}
49.2 应用 ILM 策略
PUT /_index_template/logs_template
{
"index_patterns": ["logs-*"],
"template": {
"settings": {
"index.lifecycle.name": "logs_policy",
"index.lifecycle.rollover_alias": "logs"
}
}
}
五十、总结
本文档涵盖了 RabbitMQ 和 Elasticsearch 的完整知识体系:
RabbitMQ 部分:
- 四种交换机(Direct/Topic/Fanout/Headers)的原理与使用
- 消息可靠性三层保障(生产者确认、持久化、消费者ACK)
- 死信队列与延迟队列的完整实现
- 消息幂等性、顺序消费、优先级队列
- 集群高可用(仲裁队列)与生产部署
Elasticsearch 部分:
- 倒排索引原理与分词器(IK中文分词)
- Mapping 设计最佳实践(text vs keyword)
- DSL 查询(全文检索、精确查询、Bool组合)
- 聚合分析(Terms/Range/Stats/Pipeline)
- 高亮显示、自动补全、地理位置搜索
- 数据同步方案(MQ异步/Canal binlog)
- 集群部署与性能优化
综合实战:
- 电商商品搜索系统完整实现
- 订单超时取消(延迟队列)
- 秒杀系统消息架构
- 生产环境部署与监控
掌握这些内容,能够应对大厂面试中关于消息队列和搜索引擎的绝大多数问题。
更多推荐
所有评论(0)