为什么需要消息队列?

在单体应用中,一切都是同步的;但在分布式架构中,同步调用会导致严重的耦合性能瓶颈。MQ 的出现解决了三个核心痛点:

A. 异步处理(Asynchronous)

  • 场景:用户注册,需要发送短信、发邮件、赠送积分。

  • 痛点:如果同步调用,响应时间 = 注册 + 短信 + 邮件 + 积分。只要其中一个慢了,用户就会卡死。

  • MQ 做法:注册成功后,丢一条消息到 MQ,直接给用户返回“成功”。剩下的杂活由后台慢慢处理。

B. 应用解耦(Decoupling)

  • 场景:订单系统成功后,需要通知库存系统减库存。

  • 痛点:订单系统必须知道库存系统的接口。如果库存系统宕机或者换了接口,订单系统也要改代码。

  • MQ 做法:订单系统只管发消息到 MQ。谁感兴趣(库存、物流、营销)谁就去订阅,订单系统根本不需要知道它们的存在。

C. 削峰填谷(Peak Shaving)

  • 场景:秒杀活动,瞬时流量是平时的 100 倍。

  • 痛点:数据库每秒只能扛 2000 并发,瞬时来了 2 万请求,数据库会直接瞬间崩溃。

  • MQ 做法:请求先全部进 MQ。后端服务根据自己的节奏(每秒 1000 条)从 MQ 里拉取并处理。MQ 就像大坝,挡住了洪峰。

消息队列的“代价”

引入 MQ 后,系统复杂度呈几何倍数增长。你必须面对以下问题:

  1. 系统可用性降低:MQ 挂了,整个链路就断了。

  2. 复杂度提高:你需要处理消息丢失、消息重复(幂等性)、消息顺序性等一系列头疼的问题。

  3. 一致性问题:A 系统改完数据库发了消息,B 系统处理消息失败了怎么办?分布式事务怎么搞?

三大主流 MQ

特性 RabbitMQ Kafka RocketMQ
出身 Erlang 语言开发,老牌劲旅。 LinkedIn 出身,Apache 顶级项目。 阿里双 11 洗礼,金融级可靠。
单机吞吐量 万级(相对较慢)。 十万/百万级(极高)。 十万级。
时延 微秒级(极低,最快)。 毫秒级。 毫秒级。
可靠性 极高(数据不易丢失)。 较高(追求极致吞吐)。 极高(分布式事务支持)。
适用场景 中小型项目、实时性要求极高、逻辑复杂。 大数据处理、日志采集、高吞吐监控。 电商、金融、需要严格顺序或事务的场景。

Kafka和RocketMQ如何选型?

A. 吞吐量 vs 延迟与可靠性

  • Kafka 的设计初衷是处理海量日志它采用了批处理的思想,消息是按“攒一堆”的方式发送和读取的。这种设计在大数据采集、实时分析(如接入 Flink/Spark)场景下简直是无敌的存在。

  • RocketMQ 采用的是 长轮询拉取(Pull with Long Polling),在保持高吞吐的同时,更注重单条消息的低延迟和高可靠。如果你是做分布式事务(如订单扣款、库存同步),RocketMQ 的事务消息机制能让你少写很多兜底代码。

B. 业务逻辑的“重”与“轻”

  • 场景一:简单的“通知”与“流” 。如果你的需求只是把日志传给 Elasticsearch,或者把用户点击数据传给推荐引擎,Kafka 是首选。它的生态位极其稳固,所有大数据组件都默认支持它。

  • 场景二:复杂的“业务闭环” 。如果你需要延迟消息(比如 15 分钟未支付取消订单)、顺序消息(比如同一个用户的操作必须按顺序执行)、或者消费重试(消息处理失败了,自动隔几秒重试一次),RocketMQ 的开箱即用会让开发效率大幅提升。

C. 消息持久化

Kafka 在分区数达到几千个之后性能会断崖式下跌,而 RocketMQ 却能扛住上万个 Queue。

  • 在 Kafka 中,一个 Partition(分区) 在物理磁盘上对应一个文件夹。当分区达到几千个时,如果有成千上万个线程或客户端并发向不同分区发送消息,操作系统需要同时维护数万个文件句柄。磁盘磁头必须在成千上万个文件之间频繁跳跃(Seek),原本的顺序写在 OS 层级退化成了随机写
  • 在RocketMQ,无论你有 10 个 Topic 还是 10,000 个 Queue,所有的生产者消息到达 Broker 后,都会被追加到当前的 CommitLog 末尾。对磁盘来说,它永远只在做一件事:单文件顺序追加。这极大发挥了磁盘的吞吐性能。
  • RocketMQ 有一个后台线程(ReputMessageService)异步地将 CommitLog 中的消息位置分发到对应的 ConsumeQueue 中。即使 Queue 再多,也只是增加了少量的索引写入,不会破坏主数据的顺序写。

什么时候选 Kafka?

  1. 大数据场景:日志收集、用户行为追踪、流计算集成。

  2. 极致吞吐:数据量达到 PB 级,或者每秒有百万级的消息输入。

  3. 管道角色:数据只是通过 MQ 进行中转,不涉及复杂的业务逻辑处理。

什么时候选 RocketMQ?

  1. 电商/金融业务:涉及交易、支付、订单等对数据一致性极其敏感的场景。

  2. 业务特性依赖:需要使用延迟队列、顺序消费、分布式事务。

  3. 海量 Topic:单集群需要支持上万个 Topic(RocketMQ 的单物理文件存储设计使得它在 Topic 增多时性能波动较小)。

基本概念

生产者 (Producer) 

生产者是消息的源头,负责创建并发送数据到消息队列。

  • 职责将业务产生的数据封装成“消息”,并选择合适的“话题”发送给消息服务器(Broker)。

  • 特性

    • 独立性:生产者只负责发,不需要知道谁会接收,也不需要等待接收者的响应。

    • 多样性:一个系统可以有成千上万个生产者,如订单系统、日志采集插件或移动端 App。

消息 (Message) 

消息是信息的载体,是生产者与消费者之间传递的最小数据单位。

  • 组成部分

    • Header(消息头):包含元数据,如消息 ID、时间戳、优先级、路由 key 等。

    • Body(消息体)真实的业务数据,通常以 JSONXML 或二进制流(如 Protobuf)的形式存在。

  • 生命周期:从被创建开始,经历发送、存储、投递,直到被消费者确认(ACK)后从队列中删除。

话题 (Topic/Topic) 

发布/订阅 (Pub/Sub) 模型中,话题是对消息进行逻辑分类的单位。

  • 逻辑作用:它像是一个“公共频道”。生产者将消息发布到特定的 Topic,而消费者则通过订阅该 Topic 来接收消息。

  • 物理实现:在 Kafka 或 RocketMQ 等系统中,一个 Topic 通常会进一步划分为多个 分区(Partition/Queue),以实现并发处理和存储的水平扩展。

  • P2P vs. Topic

    • 点对点 (Queue):消息只能被一个消费者拿走。

    • 话题 (Topic):消息可以像广播一样,被所有订阅了该话题的消费者(群组)接收。

消费者 (Consumer)

消费者是消息的终点,负责从消息队列中获取并处理数据

  • 两种获取模式

    • Pull(拉取)消费者主动询问 Broker:“有我的消息吗?给我来 10 条。”(如 Kafka)。

    • Push(推送)Broker 主动把消息塞给消费者:“来新活了,快接住。”(如 RabbitMQ)。

  • 消费者组 (Consumer Group):这是一个高级概念。多个消费者可以组成一个群组,共同平摊一个 Topic 下的海量消息,从而实现负载均衡。

消息发送与接收

第一阶段:消息发送流程(生产者 --> Broker)

这是消息的“起航”阶段。

  1. 消息构建

    生产者(Producer)将业务数据序列化,并封装进 Message 对象,包含消息体(Body)和属性(Properties,如:Key、Tag)。

  2. 路由选择(Routing)

    生产者查询本地缓存或注册中心(如 NameServer),确定该 Topic 对应的 Broker 地址及具体的 分区/队列(Partition/Queue)

    • 高级逻辑:如果是顺序消息,会根据 Sharding Key 固定发送到某个分区。

  3. 网络传输

    通过 TCP 长连接将数据包发送给 Broker。此时通常会涉及重试机制:如果网络闪断,客户端会自动重试(通常 2-3 次)。

  4. Broker 接收与持久化

    Broker 收到消息后,将其顺序写入物理磁盘(如 RocketMQ 的 CommitLog 或 Kafka 的 Segment)。

    • 同步刷盘 vs 异步刷盘:同步刷盘确保数据落盘后才返回;异步刷盘则先写内存,性能更高但有丢数据风险。

  5. 发送确认(Send Result/Confirm)

    数据存储成功后,Broker 向生产者返回一个 ACK。生产者收到此信号,认为该消息发送成功。

第二阶段:消息存储与管理(Broker 内部)

消息在 Broker 里的短暂停留决定了它的“寿命”。

  1. 索引构建: 主数据存入物理文件后,Broker 会异步构建逻辑索引(如 ConsumeQueue),指向消息在物理文件中的位置。

  2. 副本同步(Replication): 如果是高可用集群,主节点(Master)会将数据同步给从节点(Slave/Follower)。

    • 关键点:只有当指定数量的副本都写成功,消息才被视为“可消费”状态。

第三阶段:消息消费流程(Broker --> 消费者)

这是消息的“抵达”与“执行”阶段。

  1. 建立连接与订阅

    消费者(Consumer)启动并订阅特定 Topic。消费者组(Consumer Group)会进行 Rebalance(负载均衡),决定哪个消费者负责哪个分区。

  2. 消息拉取/推送(Pull/Push)

    • Pull:消费者主动询问 Broker 是否有新消息。

    • Push:Broker 有新消息时主动推送(底层通常也是长轮询拉取)。

  3. 业务逻辑执行

    消费者接收到消息,进行反序列化,执行核心业务代码(如:修改数据库、发送通知)。

  4. 消费确认(ACK/Commit Offset)

    这是保证不丢消息的关键。

    • 手动确认(Recommended)业务处理成功后,调用 ack()。Broker 收到后更新 Offset(偏移量)

    • 自动确认:消息一发给消费者就认为成功。风险极高,如果业务执行时崩溃,消息就丢了。

  5. 异常处理(Retry/DLQ)

    如果业务执行失败,消费者返回 RECONSUME_LATER。消息会被投入重试队列。如果多次重试(如 16 次)依然失败,消息会进入死信队列(Dead Letter Queue)

RocketMQ

存储模型

在分布式消息中间件中,磁盘 I/O 永远是性能的瓶颈。RocketMQ 之所以能在高性能的同时保证高可靠,其精髓就在于其“顺序写、随机读”的存储模型。

RocketMQ 的所有数据文件默认存储在 ${USER_HOME}/store 目录下。

A. CommitLog(物理实体)

  • 职责:存储所有的消息元数据和消息内容

  • 特征

    • 全局唯一:整个 Broker 只有一套 CommitLog(物理上由多个 1GB 的文件组成)。

    • 顺序追加不论是哪个 Topic 的消息,到达 Broker 后统统按顺序追加到 CommitLog 末尾。

    • 大小固定每个文件默认 1GB,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。写满后自动创建新文件。

B. ConsumeQueue(逻辑索引)

  • 职责:供消费者使用的“消费队列”。由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件中根据topic检索消息是非常低效的

  • 特征

    • 分类存储:按照 Topic/Queue 维度划分。

    • 极度轻量它不存消息正文,只存索引每个条目固定 20 字节,包括:

      1. 消息在 CommitLog 中的起始 Offset (8 字节)

      2. 消息体 Size (4 字节)

      3. 消息 Tag 的 HashCode (8 字节)

  • 作用消费者先查 ConsumeQueue 找到位置,再去 CommitLog 读数据。

C. IndexFile(哈希索引)

  • 职责:提供根据 Key时间戳 查询消息的能力。

  • 特征

    • 基于 HashMap 结构的索引文件。

    • 它允许你在不遍历 CommitLog 的情况下,通过消息的 Unique Key 快速定位到消息。

三者的协作关系:Reput 机制

消息在 RocketMQ 中的“落户”过程并非同步写完这三个文件,而是一个“主从同步+异步分发”的过程:

  1. 写 CommitLog:生产者消息到达,Broker 立即将其顺序写入 CommitLog。此时对生产者来说,消息已经发送成功。

  2. 异步构建索引(Reput)Broker 内部有一个后台线程 ReputMessageService它持续监控 CommitLog 的写入进度

  3. 分发数据:一旦有新消息,该线程立即读取消息内容,并将索引信息分发到对应的 ConsumeQueueIndexFile

  4. 消费读取

    • 消费者向 Broker 请求消息,先读 ConsumeQueue

    • 根据其中的 Offset 和 Size,直接去 CommitLog 中进行随机读取

为什么RocketMQ能支持上万个 Topic?

Kafka 的痛点(分区过多导致随机写)

在 Kafka 中,每个 Partition 都对应磁盘上的物理文件

  • 如果你有 10,000 个 Topic/Partition,磁盘上就有 10,000 个文件。

  • 当大量消息并发进入时,磁头必须在这些文件之间频繁寻址(Seek),顺序写退化为随机写,导致 IOPS 飙升,性能大幅下降。

RocketMQ 的解法

RocketMQ 通过“逻辑与物理分离”的设计巧妙地绕过了这个问题:

  1. 始终顺序写无论 Topic 增加到一万还是十万,Broker 永远只在往唯一的 CommitLog 文件末尾顺序追加。磁盘的顺序写性能得到了极致的保护。

  2. 轻量的逻辑索引ConsumeQueue 虽然多,但由于它极其小(仅存储 20 字节的定长数据),且由于采用了 mmap(内存映射),写入开销非常低。

  3. Page Cache 友好:所有的写入都在 CommitLog 尾部,这使得操作系统能非常高效地利用 Page Cache 进行缓存预热,减少了真实的磁盘交互。

当 Broker 突然宕机时,RocketMQ如何保证数据不丢失?

数据的冗余是为了可靠,索引的冗余是为了容灾。

当 RocketMQ 的 Broker 进程突然崩溃(比如 kill -9 或断电)时,内存中尚未刷盘的索引数据会丢失,但只要物理上的 CommitLog 还在,一切就都有救。

1. abort 文件

在 RocketMQ 的存储目录(默认 ~/store)下,有一个名为 abort空文件。Broker 启动的第一件事就是寻找 abort 文件。

  • 正常启动:Broker 启动时会通过 DefaultMessageStore.createAbortFile()创建这个文件。

  • 正常关闭:Broker 在调用 shutdown() 正常关闭时会删掉这个文件。

  • 异常宕机如果 Broker 崩溃,这个文件会残留在磁盘上上次关闭没有走正常的生命周期,可能发生了:进程被 kill -9、系统 OOM、硬件掉电或内核崩溃

机制当下一次启动时,Broker 只要发现 abort 文件存在,就知道上次是非正常关机,随即触发异常恢复流程

2. checkpoint 文件

checkpoint 文件记录了磁盘上最重要三个组件的最后一次物理刷盘时间戳

  • physicMsgTimestamp:CommitLog 的最后刷盘时间。

  • logicsMsgTimestamp:ConsumeQueue 的最后刷盘时间。

  • indexMsgTimestamp:IndexFile 的最后刷盘时间。

恢复流程

当触发异常恢复逻辑时,RocketMQ 并不是盲目地从头扫描几百 GB 的文件,而是采用“寻找断点,局部扫描”的策略:

1. 定位起始扫描点(Checkpoint)

我们不可能为了恢复而扫描整个磁盘上 TB 级别的 CommitLog,那样启动会耗时几个小时。RocketMQ 会利用 checkpoint 文件来缩小搜索范围。

checkpoint 记录了三个关键点。在异常恢复中,Broker 会取这三个时间戳的最小值

  1. physicMsgTimestamp:最后一次 CommitLog 刷盘时间。

  2. logicsMsgTimestamp:最后一次 ConsumeQueue 刷盘时间。

  3. indexMsgTimestamp:最后一次 IndexFile 刷盘时间。

为什么要取最小值? 因为索引(CQ)和物理数据(CL)是异步分发的。通常 CL 跑得快,CQ 跑得慢。为了确保不漏掉任何一条可能没来得及建索引的数据,我们必须从最落后的那个时间点开始检查。

2. CommitLog 的物理校准

确定了时间点后,Broker 会找到对应时间戳所在的 MappedFile(即物理文件段)。

  1. 逆序扫描与正序校验Broker 会从最后一个文件开始向前找它会检查消息的 MagicCode(魔法值)。如果 MagicCode 非法,说明这块磁盘空间还没写过数据或者是脏数据。

  2. 消息完整性验证: 一旦找到了可能有数据的起始点,它就开始正序扫描

    • 读取消息的总长度。

    • 校验消息的 CRC 码(确保数据没在断电瞬间损坏)。

    • 校验属性长度和 Body 长度。

  • 确定“合法物理偏移量”扫描会一直持续到遇到第一条非法消息或文件末尾。这个位置之前的都被认为是“既定事实”,这个位置之后的被认为是“无效残留”。

  • 物理截断(Truncate): Broker 会调用 MappedFile.setWrotePosition()setCommittedPosition()将文件末尾的残余数据彻底抹除这确保了 CommitLog 绝对的物理真实。

3. 索引对齐与重建(Reput 机制)

现在物理数据(CommitLog)已经干净了,但逻辑索引(ConsumeQueue)可能还处于混乱状态:它可能多了一部分(指向了被截断的无效数据),也可能少了一部分(消息写进了 CL 但没来得及写进 CQ)。

  1. ConsumeQueue 的截断: Broker 会检查每一个 ConsumeQueue 文件。如果索引条目指向的 CommitLog 偏移量超过了刚才确定的“合法物理偏移量”,这些索引会被直接删掉。

  2. 重放(Reput)开始: 这是最核心的修复动作。Broker 启动 ReputMessageService 线程:

    • 它从 ConsumeQueue 目前记录的最大 Offset 开始,去 CommitLog 中拉取原始数据

    • 每读到一条消息,就重新提取出它的 Topic、QueueId、Tags、Keys。

    • 重新写入索引:将这些元数据重新写入 ConsumeQueue 条目。

    • 重新填充哈希:将消息 Key 重新存入 IndexFile

  3. 对齐完毕: 当 ReputMessageService 处理到的偏移量等于 CommitLog 的最大合法偏移量时,逻辑与物理正式对齐。

这里要区分两种情况:

  • 异步刷盘 (Async Flush):如果宕机瞬间消息还在 Page Cache 里没进磁盘,这部分数据确实会丢失

  • 同步刷盘 (Sync Flush):消息只有在磁盘写入成功后才会返回给生产者。此时宕机,虽然 ConsumeQueue(索引)可能没写,但物理数据已经在磁盘里了。通过上述的 Reput 机制,索引可以被完美还原。

工作流程

在聊流程之前,我们先明确四个核心角色的职责:

  • NameServer(注册中心):无状态,负责维护路由信息。它不直接处理消息,只告诉别人“谁在哪”。

  • Broker(处理中心):负责消息的存储、投递和查询。它是整个系统的体力活承担者。

  • Producer:负责产生并发送消息。

  • Consumer:负责订阅并处理消息。

第一阶段:启动与心跳(建立连接)

  1. NameServer 启动:先于所有角色启动,静默等待连接。

  2. Broker 注册

    • Broker 启动后,会遍历所有的 NameServer 地址,并与每一个 NameServer 建立长连接

    • 定时上报:每隔 30sBroker 会向所有 NameServer 发送心跳包,内容包含自己的 IP、Topic 信息、队列配置等。

  3. 心跳检测

    • NameServer 每隔 10s 扫描一次 Broker 列表。

    • 如果超过 120s 没收到某个 Broker 的心跳,NameServer 就会认为它挂了,将其从路由表中剔除。

第二阶段:路由发现

Producer 和 Consumer 在干活前,得先知道去哪找 Broker。

  1. 拉取路由Producer/Consumer 启动后,会向 NameServer 请求特定 Topic 的路由信息。

  2. 本地缓存:为了性能,它们会将路由表缓存到本地内存中

  3. 动态更新:Producer/Consumer 会每隔 30s 定期向 NameServer 更新一次路由信息,确保在 Broker 扩容或宕机时能及时调整。

第三阶段:消息生产流程

  1. 负载均衡:如果一个 Topic 分布在多个 Broker 上,Producer 会通过轮询(Round Robin)或自定义策略选择一个 MessageQueue

  2. 发送模式选择

    • 同步发送:等结果返回(最常用,最稳)。

    • 异步发送:传个回调,发完就走(追求高吞吐)。

    • 单向发送:发完不管(用于日志,丢了也没事)。

  3. 故障规避(Fault Latency Strategy)

    如果 Producer 发现某个 Broker 响应很慢或不可用,它会暂时“拉黑”这个 Broker 一段时间,将请求转给其他健康的 Broker。

第四阶段:Broker 存储流程(落盘)

  1. 写入 CommitLog:消息被顺序追加到物理文件。

  2. 同步/异步复制:如果是集群模式,Master 会将数据传给 Slave。

  3. 刷盘(Flush):数据从 Page Cache 进入物理硬盘。

  4. 构建索引:后台线程异步将数据分发到 ConsumeQueueIndexFile

第五阶段:消息消费流程(抵达)

消费端的逻辑比发送端更复杂,因为它涉及状态维护

  1. 负载均衡(Rebalance)

    • 如果一个 ConsumerGroup 有多个成员,它们会通过算法(如平均分配)平摊该 Topic 下的所有 MessageQueue

    • 核心准则:一个 MessageQueue 同一时间只能被同一个组内的一个 Consumer 消费。

  2. 消息拉取(Long Polling)

    • Consumer 并不是死板地“拉”或者被动地“接”,而是使用长轮询

    • 它向 Broker 发起请求,如果有消息则立即返回;如果没有消息,Broker 会把连接挂起一段时间(通常是 15s),期间一旦有消息就立即唤醒返回。

  3. 业务处理与 ACK

    • 业务代码执行。

    • 手动/自动提交 Offset:告诉 Broker“我处理到第几条了”。

NameServer

在没有 NameServer 之前,生产者发消息得知道每个 Broker 的 IP,这在动态增减机器的云原生时代是不可接受的。

它是做什么的?

NameServer 是一个几乎无状态的注册中心。它的核心任务只有两个:

  • 管理 Broker 路由:Broker 启动后向它报到,告诉它:“我负责哪些 Topic,我的 IP 是多少”。

  • 提供路由查询:Producer 和 Consumer 定期来问:“我要发/收 Topic A 的消息,该去哪台机器?”

故障延迟规避(Fault Latency Strategy) 机制(NameServer全面宕机怎么办)

生产者在启动后或运行期间,会定期(每 30 秒)从 NameServer 获取最新的 TopicRouteData 并缓存到内存中。

  • 脱离依赖:当 NameServer 全部宕机时,生产者会继续使用这份缓存的路由信息进行消息发送。

  • 静态视图:此时生产者眼中 Brokers 的列表是固定的,如果某个 Broker 此时宕机,生产者必须具备“自我识别”的能力。

RocketMQ 的生产者通过 本地路由缓存 保证了基础连通性,而通过 故障延迟规避(Fault Latency Strategy) 机制,则实现了在没有“大脑”(NameServer)指挥的情况下,自主识别并踢除故障 Broker 的能力。

故障延迟规避机制通过 sendLatencyFaultEnable 开关控制(默认值为 false)。其核心逻辑在于将 “发送耗时/发送失败” 转化为 “不可用时长”

A. 实时评估:updateFaultItem

每次消息发送结束后,无论成功或失败,生产者都会调用该方法更新对应 Broker 的“健康分”:

  • 发送成功:记录此次请求的耗时(Latency)。

  • 发送失败/超时此时会进行“隔离(Isolation)”,通常默认赋予一个较大的延迟值(如 30,000ms)。

B. 阶梯退避:计算不可用时长

生产者根据记录的延迟时间,通过预设的阶梯表计算该 Broker 需要被“屏蔽”多久:

发送耗时 (Latency) 规避/不可用时长 (Duration)
550ms 30,000ms (30秒)
1,000ms 60,000ms (60秒)
3,000ms 180,000ms (3分钟)
15,000ms 600,000ms (10分钟)

自动切换流程:selectOneMessageQueue

当生产者需要发送下一条消息时,其选择队列的逻辑会发生变化:

  1. 优先轮询健康 Broker: 生产者会遍历缓存中的所有 MessageQueue,但会优先校验该队列所属的 Broker 是否处于“规避期”内。如果 Broker 的 startTimestamp 显示它还未恢复,则直接跳过该队列,选择下一个健康 Broker 的队列。

  2. 故障规避与切换如果缓存中的所有 Broker 都曾出现过延迟或故障,生产者会通过 pickOneAtLeast() 方法尝试从失败列表中选择一个相对延迟最小规避时间最短的 Broker 进行尝试,而不是卡死

  3. 重试机制的配合: 在同步发送(SYNC)模式下,如果发送给 Broker A 失败,触发重试时,规避机制会确保下一次尝试自动切换到 Broker B 或其他可用节点,从而实现故障的瞬时平滑切换。

为什么不选 Zookeeper?

这是必须理解的 CAP 定理取舍。

  • 可用性优先:AP 与 CP 的抉择

    根据 CAP理论,任何分布式系统最多只能同时满足两个点。

    • ZooKeeper 追求的是 CP(一致性 + 分区容错性)

      • 在进行 Leader 选举时,整个 ZooKeeper 集群会处于不可用状态。

      • 对于注册中心而言,短时间的不可用会导致服务发现停滞,这在消息中间件这种对高可用要求极高的场景下是难以接受的。

    • NameServer 追求的是 AP(可用性 + 分区容错性)

      • NameServer 节点之间互不通信,是完全去中心化的。

      • 只要有一台 NameServer 在线,整个集群的发现功能就是可用的,这极大提升了系统的抗风险能力。

  • 性能与水平扩展的博弈

    NameServer 的设计目标是“极简”与“高性能”。

    • 轻量级实现:NameServer 逻辑非常简单,主要是内存中的 HashMap,因此处理速度极快。

    • 水平扩展性:可以通过简单地增加机器来提升集群的抗压能力。

  • ZooKeeper 的局限性:ZooKeeper 的写操作无法通过增加节点来扩展,因为所有的写请求都必须经过 Leader 并达成共识(Quorum机制)。

    • 为了解决写扩展问题,往往需要划分多个 ZK 集群或领域,但这增加了系统的复杂性,且可能导致服务之间的不连通。

持久化机制的“重量”对比

  • ZooKeeper 的 ZAB 协议太“重”:ZooKeeper 为了保证强一致性,每一个写请求都要记录事务日志,并定期生成内存快照(Snapshot)落盘。

  • 定制化需求:对于一个仅存储 Broker 路由信息的注册中心来说,这种复杂的持久化和一致性协议是不必要的开销。NameServer 存储的数据高度定制化且通常较小,没必要引入过于沉重的方案。

工程实践中的“弱依赖”设计

RocketMQ 的设计哲学是尽量减少对注册中心的强实时依赖

  • 本地缓存机制生产者和消费者在第一次从 NameServer 获取路由信息后,会将其缓存到本地内存中。

  • 容错能力:即使 NameServer 集群整体宕机,只要 Broker 依然存活,生产者和消费者在短时间内仍能利用本地缓存的地址进行通信,系统不会立即瘫痪。

MessageQueue

它是 Topic 在 Broker 上的物理/逻辑分区。一个 Topic 默认会有多个 MessageQueue(比如 4 个或 8 个),分布在不同的 Broker 上

  1. 并行能力的来源:消息不是挤在一个坑里,而是分散在多个 Queue 中。这意味着你可以同时起多个线程或多台机器去消费,速度翻倍。

  2. 顺序消费的基石:RocketMQ 保证在单个 MessageQueue 内的消息是先进先出的(FIFO)。如果你要求订单 A 的“创建”和“支付”必须按顺序处理,只要把它们发到同一个 MessageQueue 即可。

  3. 负载均衡的单位:在重平衡(Rebalance)时,系统是以 MessageQueue 为单位分配给消费者的。

ConsumerGroup

ConsumerGroup(消费者组) 是一个逻辑概念,它将一堆执行相同业务逻辑的消费者实例(Consumer Instance)打成一个包。

它定义了消息如何被“瓜分”。同一个组内的所有消费者,共享一个 Group ID

两种核心消费模式:

  • 集群模式(Clustering - 默认且最常用): 消息平均分配。一条消息发出来,组内 10 个实例,只有其中 1 个会收到。这实现了天然的负载均衡。

  • 广播模式(Broadcasting): 消息全量分发。一条消息发出来,组内 10 个实例,每人都会收到一份。常用于刷新本地缓存。

一个 MessageQueue 同一时间只能被同一个 ConsumerGroup 里的一个消费者实例消费。

  • 如果 Queue 有 4 个,Consumer 有 8 个,那么有 4 个 Consumer 会处于闲置状态(白费粮草)。

  • 所以,增加消费能力的第一步往往是增加 MessageQueue 的数量。

RocketMQ是怎么对文件进行读写的?

MappedFileMappedFileQueue

在 RocketMQ 的源码里,所有的文件操作都被封装在两个核心类中:

  • MappedFile对应磁盘上的一个物理文件(如 1GB 的 CommitLog)。它内部通过 Java 的 FileChannel.map() 将文件映射到内存。

  • MappedFileQueue:管理一组 MappedFile。它将多个物理文件在逻辑上串联成一个无限长的“连续空间”。

读取文件

一:逻辑寻址(找到数据在哪)

由于物理文件 CommitLog 动辄数百 GB,直接遍历是不现实的。读取的第一步是利用“索引”进行定位。

  • 定位 ConsumeQueue:消费者根据 TopicQueueId,首先访问对应的逻辑队列文件 ConsumeQueue

  • 提取索引单元:从 ConsumeQueue 中读取一个固定 20 字节 的数据块。

    • 物理偏移量 (Offset):该消息在 CommitLog 中的起始位置(8 字节)。

    • 消息长度 (Size):该消息的总大小(4字节)。

    • Tag HashCode:用于初步过滤消息(8 字节)。

  • 回表查询拿着提取到的 OffsetSize,跳转到物理存储文件 CommitLog 中准备提取真实的报文内容

二:内存映射(跨过慢速磁盘)

RocketMQ 并不直接使用传统的磁盘 I/O(如 read 系统调用),而是利用 mmap 技术将文件“搬”进内存。

  • MappedFile 管理所有的 CommitLog 文件都被封装在 MappedFile 对象中,每个文件默认 1GB。

    • 文件预热(缺页中断消除)mmap 刚建立时,文件并没有真正载入内存RocketMQ 在创建新的 MappedFile 后,会每隔 4KB 写入一个 0这个动作会强制操作系统分配物理内存页,提前填充 PageCache。

    • 内存锁定 (mlock):通过系统调用 mlock 锁定这块内存。这样可以防止操作系统因为内存压力大,将 PageCache 里的消息数据“交换(Swap)”到虚拟内存(磁盘)中。对于 MQ 来说,一旦发生 Swap,读取延迟会从纳秒级直接退化到毫秒级

  • 建立映射 (mmap):通过 Java 的 FileChannel.map() 方法,将磁盘上的 CommitLogConsumeQueue 文件(物理大小通常为 1GB)映射到进程的虚拟内存地址空间映射完成后,程序对内存的读写直接作用于 PageCache如果数据不在内存中,内核会触发“缺页中断”,将磁盘数据加载到 PageCache;如果是在写入,数据进入 PageCache 后即视为逻辑写入完成。

    • 传统 I/O:数据需要在 磁盘 --> 内核缓冲区 (PageCache) -->  用户缓冲区 之间进行多次拷贝。用户程序只能操作自己内存里的“用户缓冲区”。

    • mmap (FileChannel.map):它将进程虚拟地址空间的一段地址,直接映射到磁盘文件在内核层面的 PageCache 上。

  • 内存化操作:对应用程序来说,读取文件就像读取内存数组一样简单,不再需要昂贵的内核态与用户态之间的数据拷贝。

第三层:内核交互

即便有了 mmap,如果数据不在物理内存(Page Cache)中,依然会触发磁盘读取。RocketMQ 通过以下手段确保“命中率”:

优化技术 实现机制 核心目的
文件预热 (Warm-up) 创建新文件后,每隔 4KB写入一个 0,强制内核分配物理内存页。 避免运行时触发“缺页中断”导致的瞬间卡顿。
内存锁定 (mlock) 调用系统指令将映射的内存区域锁定。 防止操作系统将这些热数据交换(Swap)到虚拟磁盘。
顺序预读 (madvise) 暗示内核后续将进行顺序访问。 触发内核提前将相邻的磁盘块加载到 Page Cache 中。
冷热隔离 (Tiered) 将冷数据(旧消息)转存至云端或对象存储。 防止读取旧消息时污染 Page Cache,保护热数据的读取性能。
PageCache

PageCache 是由 Linux 内核维护的、位于物理内存(RAM)中的磁盘数据缓存

  • 基本单位:内核以“页(Page)”为单位管理内存,通常一页的大小为 4KB

  • 存储内容:它存储了最近访问过的磁盘文件数据

  • 核心目标尽可能多地让 I/O 操作在内存中完成,减少昂贵的物理磁盘磁头寻址开销。

工作机制

A. 读操作:预读与命中

当应用程序请求读取文件数据时:

  1. 查找内核首先在 PageCache 中查找该数据所在的页

  2. 命中(Cache Hit):如果数据已在内存中,直接返回,速度是纳秒级的。

  3. 缺失(Cache Miss):如果数据不在内存,触发缺页中断(Page Fault),内核从磁盘读取数据填充到 PageCache,再返回给应用。

  4. 预读(Read Ahead)内核会根据你的访问习惯(如顺序读),预测性地提前将后续数据加载到 PageCache。

B. 写操作:延迟写入与“脏页”

当应用程序写入数据时:

  1. 写入缓存:数据首先被写入 PageCache,该页被标记为脏页(Dirty Page)

  2. 立即返回:此时对应用来说,写入已经完成,响应极快。

  3. 异步刷盘:内核线程(如 pdflush)会定期扫描脏页,将其批量写回磁盘。

  4. 强制刷盘:应用也可以调用 fsync 等指令强制要求内核立即将脏页写回磁盘。

零拷贝(Zero-Copy)

零拷贝(Zero-Copy) 是一项旨在减少 CPU 开销和内存带宽消耗的优化技术。简单来说,它的核心目标是:在数据传输过程中,尽可能减少甚至消除 CPU 将数据从一个内存区域拷贝到另一个内存区域的次数。

要理解零拷贝,必须先看传统的 I/O(如 read + write)有多么低效。当我们将磁盘文件通过网络发送出去时,数据经历了以下过程:

  1. 磁盘 --> 内核缓冲区(PageCache):通过 DMA 拷贝。

  2. 内核缓冲区 --> 用户缓冲区CPU 参与拷贝(上下文切换:内核态 --> 用户态)。

  3. 用户缓冲区 --> Socket 缓冲区CPU 参与拷贝(上下文切换:用户态 --> 内核态)。

  4. Socket 缓冲区 --> 网卡(NIC):通过 DMA 拷贝。

传统 I/O 的代价

  • 4 次上下文切换

  • 4 次数据拷贝(其中 2 次是由 CPU 亲自搬运的)。

对于高性能中间件(如 RocketMQ 或 Kafka)来说,这种重复的搬运不仅浪费 CPU,更会挤占内存带宽。

零拷贝并不是真的“零”拷贝,而是指 “没有 CPU 参与的内存拷贝”。目前主流的实现方案有以下几种:

A. mmap + write

我们在讨论 RocketMQ 时多次提到这个方案。

  • 原理:利用 mmap 系统调用,将内核缓冲区(PageCache)的地址用户空间的虚拟地址进行映射。

  • 流程

    1. 磁盘 --> 内核缓冲区(DMA 拷贝)。

    2. 用户空间直接访问内核缓冲区

    3. 内核缓冲区 --> Socket 缓冲区(CPU 拷贝)。

    4. Socket 缓冲区 --> 网卡(DMA 拷贝)。

  • 性能3 次拷贝 + 4 次上下文切换。虽然还是有 CPU 拷贝,但省去了内核与用户空间之间的大量重复数据搬运。

B. sendfile

Linux 2.1 版本引入的系统调用。

  • 原理:数据直接在内核空间内部进行传输,不再经过用户空间。

  • 流程

    1. 磁盘 --> 内核缓冲区(DMA 拷贝)。

    2. 内核缓冲区 --> Socket 缓冲区(CPU 拷贝)。

    3. Socket 缓冲区 --> 网卡(DMA 拷贝)。

  • 性能3 次拷贝 + 2 次上下文切换

C. sendfile + DMA Gather Copy(真正的零拷贝)

这是目前最极致的方案,需要硬件(网卡)的支持。

  • 原理:不仅不需要用户空间参与,甚至不需要将数据拷贝到 Socket 缓冲区,而是只将数据的位置(地址)和长度记录到 Socket 缓冲区。

  • 流程

    1. 磁盘 --> 内核缓冲区(DMA 拷贝)。

    2. 只拷贝文件描述符/指针到 Socket 缓冲区(几乎忽略不计)。

    3. 网卡根据指针直接从内核缓冲区提取数据(DMA 拷贝)。

  • 性能2 次拷贝(全部为 DMA 拷贝)+ 2 次上下文切换

为什么 RocketMQ 偏爱 mmap 而 Kafka 偏爱 sendfile?
  • Kafka (sendfile):Kafka 的逻辑非常纯粹——它只是一个“搬运工”。数据从磁盘进来,原封不动地通过网卡发出去,所以它使用 sendfile 追求最少的上下文切换和极致的吞吐。

  • RocketMQ (mmap)RocketMQ 需要在 Broker 端对消息进行各种操作(比如根据 Tag 过滤、消息重试处理等)mmap 能够让程序在用户态像操作内存一样读写文件,虽然多了一次上下文切换,但赋予了程序极大的灵活性。

写文件

CommitLog 的追加艺术

在 RocketMQ 中,所有的消息写操作最终都指向物理文件 CommitLog

  • 全局顺序写:无论消息属于哪个 Topic,Broker 都会将其按到达顺序追加到当前 CommitLog 文件的末尾。这种“只增不改”的模式避开了磁盘磁头频繁寻道的开销,使写入速度接近物理极限。

  • 定长管理:每个 CommitLog 文件默认大小为 1GB。当一个文件写满后,Broker 会创建一个新的文件,文件名即为该文件起始的物理偏移量。

写入流程:从内存到磁盘

消息的写入并不是直接打到磁盘上,而是经历了一个多级的“缓冲”过程。

写入内存(MappedFile)

  1. 映射文件Broker 通过 mmap(内存映射)技术,将 CommitLog 文件映射到虚拟内存空间。

  2. 指针移动写入时,程序直接在内存中操作 MappedFile 对象的指针,将消息序列化后的字节流写入对应位置。

  3. 逻辑完成一旦数据进入了 PageCache(页缓存),对发送者来说,如果不要求同步刷盘,这次写入就已经“逻辑成功”了

在高并发场景下,为了减轻 PageCache 的压力,RocketMQ 引入了暂存池(TransientStorePool)机制:

  • 堆外内存缓冲:开启该功能后,消息会先写入一块 DirectByteBuffer(堆外内存)

  • 异步提交(Commit):随后由专门的线程将数据从堆外内存 commitMappedFile(即 PageCache)。

  • 读写隔离这种方式让“写”发生在堆外内存,“读”发生在 PageCache,极大地缓解了在高负载下的内存竞争和 Page Cache 抖动。

数据进入内存后是不安全的。RocketMQ 通过两种刷盘策略来平衡性能可靠性

策略名称 运作机制 优缺点
异步刷盘 (Async) 消息写到 PageCache 后立即返回“成功”。后台线程定时(默认 500ms)调用 fsync 将脏页刷入磁盘。 吞吐量极大,但如果系统掉电,会丢失极少量未落盘的消息。
同步刷盘 (Sync) 消息写到 PageCache 后,必须等待刷盘线程执行完 fsync 成功后再给客户端返回。 数据安全性极高,但由于受限于磁盘 I/O 速度,吞吐量会有显著下降。
高并发场景下的CommitLog写入问题

在高并发写入场景下,RocketMQ 的 CommitLog 是一个单管道、顺序追加的物理文件。由于所有线程都必须争抢“末尾追加”的权限,锁竞争成为了决定吞吐量的关键瓶颈。

RocketMQ 并没有默认在运行时根据负载“自动”实时切换锁,而是通过配置参数 useReentrantLockWhenPutMessage默认为 false,即默认使用自旋锁)允许开发者根据磁盘的 I/O 特性进行手动切换。

以下是两种锁的设计逻辑以及在 IO Wait 过高时的优化思路:

1. 自旋锁 (Spin Lock):追求极致的低延迟

RocketMQ 默认使用的是 PutMessageSpinLock

  • 实现原理:当线程尝试获取锁失败时,它不会进入阻塞状态,而是利用 CPU 进行忙循环(Busy-Waiting),不断尝试重新获取锁。

  • 适用场景:适用于 PageCache 极其健康、写入几乎能在纳秒级完成的场景。

  • 优缺点

    • 优点:避免了线程从“运行态”切换到“等待态”再被唤醒的上下文切换开销

    • 缺点:如果锁被占用的时间变长(例如磁盘响应变慢),自旋会白白消耗大量 CPU 资源

当磁盘负载过高,PageCache 刷盘变慢,甚至触发了系统的强制回收时,消息追加(Append)的时间会从纳秒级上升到毫秒级。此时,成百上千个线程都在 CPU 上“疯狂自旋”,不仅无法加速 I/O,反而会让 CPU 负载爆表,进一步恶化系统性能。

在这种极端情况下,切换到重入锁可以让线程在等待磁盘完成操作时进入休眠。这虽然看起来慢了,但实际上释放了 CPU 资源去处理网卡数据接收或其他的管理逻辑,防止了系统彻底卡死(Broker Busy)。

所以,磁盘 I/O 负载(IO Wait)较高或出现频繁抖动时,我会通过配置切换到重入锁。这样做可以有效避免 CPU 在 I/O 阻塞期间无效自旋,保护系统在极端压力下的稳定性。

2. 重入锁 (Reentrant Lock):应对高 I/O 抖动

通过将配置设为 true,系统将使用 PutMessageReentrantLock

  • 实现原理基于 Java 的 ReentrantLock,当线程获取锁失败时,会被挂起(进入 Wait 状态),让出 CPU 资源。

  • 适用场景:适用于磁盘性能较差、经常出现 IO Wait 或系统负载极高的场景。

  • 优缺点

    • 优点:当写入过程因为磁盘 I/O 繁忙而变慢时,挂起线程可以保护 CPU 不被空转消耗。

    • 缺点:每次加锁/释放锁可能涉及操作系统的调度,增加了微小的延迟损耗。

单纯换锁可能还不够。当 IO Wait 严重时,RocketMQ 通常建议开启 transientStorePoolEnable

  • 机制写操作先进入 DirectByteBuffer(堆外内存),而不直接冲击 PageCache

  • 效果:这相当于在锁竞争之前又加了一层缓冲。写和读在物理内存上分离,减少了对同一块 PageCache 的锁定频率,从而间接降低了锁竞争的激烈程度。

如果出现了更极端的场景,CommitLog 的锁竞争已经无法通过换锁解决,怎么办?

当单机 CommitLog 的写入锁竞争(Lock Contention)达到物理瓶颈,且单纯依靠切换“自旋锁”或“重入锁”已无法显著提升性能时,我们需要从单点串行并发分片的架构演进。

在 RocketMQ 的标准架构中,单台 Broker 默认使用单一的 CommitLog 来确保全局顺序写。要实现“单机吞吐极限”的进一步突破,通常有以下几种分片与扩展思路:

1. 逻辑分片:MessageQueue 的横向水平扩展

RocketMQ 实现并发处理的核心在于Topic 拆分为多个 MessageQueue

  • 多 Broker 部署(集群扩展):这是最标准、最推荐的“分片写入”方式。通过在多台物理机器上部署多个 Broker,Topic 的不同 MessageQueue 会分布在不同的 Broker 实例上

  • 流量均衡:生产者通过轮询或其他路由算法,将消息分散发送到不同 Broker 的不同 MessageQueue 中。此时,每个 Broker 维护自己的 CommitLog,从而在系统整体层面消除了单点锁竞争。

2. 物理分片:单机多 Broker 实例部署

如果在极端的万兆网卡或高性能 NVMe 磁盘场景下,单个 Broker 进程由于单线程写入锁(Write Lock)无法跑满硬件性能,可以采取单机多实例方案。

  • 资源隔离同一台高性能服务器上启动多个 Broker 进程每个进程监听不同端口并管理独立的存储路径(独立的 CommitLog 和 ConsumeQueue)

  • 吞吐翻倍:这种方式本质上是在单机内部创建了多个 CommitLog 实例。由于每个进程拥有独立的写入锁,并发写入能力会随着实例数量线性增长,从而彻底压榨磁盘的 IOPS 潜能。

负载均衡

发送端负载均衡(Producer Load Balance)

发送端负载均衡的主要任务是:将消息均匀地分发到 Topic 对应的所有 MessageQueue 中

  • 水平扩展:通过将消息分散到分布在不同 Broker 上的队列,利用多机并行处理能力来提升整体吞吐。

  • 避免热点:防止某个单独的 Broker 因为承载过多消息而成为性能瓶颈。

生产者并不是盲目发送消息,它拥有一张“实时地图”:

  • 注册中心依赖Producer 启动后会与 NameServer 建立长连接,获取指定 Topic 的路由信息(包含 Broker 地址、读写队列数量等)。

  • 动态更新Producer 每隔 30 秒 会定期向 NameServer 拉取最新的路由数据并更新本地缓存。

  • 故障容错如果 NameServer 全挂了,Producer 会依靠本地缓存的路由信息继续工作,直到缓存失效或所有 Broker 不可用。

在默认情况下(即未开启故障规避开关时),RocketMQ 采用简单的轮询算法

  • 顺序选择:Producer 内部维护一个自增的计数器,每次发送消息时对可用的 MessageQueue 列表进行取模运算,按序选择下一个队列。

  • 重试切换:在同步发送模式下,如果发送给某个队列失败,重试逻辑会自动避开上一次失败的队列,尝试发送到该 Topic 下的其他队列。

进阶策略:故障延迟规避 (Fault Latency Strategy)

简单的轮询在遇到“亚健康”状态的 Broker(响应极慢但未完全断开)时,会导致大量的发送超时。

通过设置 sendLatencyFaultEnable = true 开启此机制:

  • 性能监控:Producer 会记录每次发送到 Broker 的耗时(Latency)。

  • 故障评估:如果某次发送耗时过长或直接失败,Producer 会通过 updateFaultItem 将该 Broker 标记为“不可用”。

  • 阶梯避让:根据耗时等级,该 Broker 会被“隔离”一段时间。

发送耗时 (Latency) 规避时长 (Duration)
550ms 30,000ms (30秒)
1,000ms 60,000ms (60秒)
15,000ms 600,000ms (10分钟)

消费端负载均衡

RocketMQ消费端(Push/Pull模式)的负载均衡,其目标是将Broker上某个Topic的多个消息队列(MessageQueue)合理分配给同一个消费者组(ConsumerGroup)下的所有消费者实例。核心原则是:一个MessageQueue在同一时刻只能被组内的一个Consumer消费,但一个Consumer可以同时消费多个MessageQueue。

1. 触发与准备

负载均衡由一个独立的 RebalanceService​ 线程驱动,默认每20秒执行一次。触发时,会进行两方面的数据准备:

  • 获取消息队列(MessageQueue)列表:从本地缓存topicSubscribeInfoTable中获取该Topic下所有的MessageQueue集合(mqSet)。

  • 获取消费者列表:以Topic和ConsumerGroup为参数,向Broker查询当前在线的所有消费者ID列表。

2. 执行分配算法

收集到元数据后,会对MQ列表和消费者ID列表进行排序,然后采用指定的策略进行分配。默认策略是 AllocateMessageQueueAveragely(平均分配算法)

  • 算法类比:该算法类似于数据库分页。将所有MessageQueue视为“记录”,所有Consumer视为“页数”,计算出每页(每个Consumer)应承载的记录范围(range),从而确定当前Consumer实例应负责哪些MessageQueue。

3. 更新本地任务队列(processQueueTable

计算出当前Consumer应负责的MessageQueue集合(mqSet)后,会与本地正在处理的队列缓存processQueueTable进行比对和更新:

  • 红色部分(需移除的队列):对于processQueueTable中存在但不在新mqSet中的队列,会将其标记为Dropped=true,并尝试获取该队列锁。若1秒内获得锁,则将其从processQueueTable中移除,停止消费。

  • 绿色部分(保留的队列):对于processQueueTable中且仍在新mqSet中的队列,在Push模式下会检查其状态并可能触发相关更新。

4. 创建拉取请求

新分配的MessageQueue(即mqSet中新增的部分)执行以下操作:

  • 为每个MessageQueue创建一个ProcessQueue对象(消费过程快照),存入processQueueTable

  • 调用computePullFromWhere方法,计算该队列应从哪个消费位点(offset)开始拉取。

  • 将以上信息封装成一个PullRequest(拉取请求)对象。

5. 提交请求并拉取消息

将创建好的PullRequest对象,放入 PullMessageService​ 服务线程的阻塞队列pullRequestQueue中。该服务线程会不断从队列中取出PullRequest,并向Broker发起真正的Pull消息请求,从而开始消费。

总结:通过以上周期性的负载均衡,RocketMQ能够动态地根据消费者数量的变化,将消息队列的消费压力均匀地分摊到整个消费者组,实现了可扩展、高可用的消费能力。您提供的图片资料完整覆盖了从触发、计算、资源清理到任务执行的全链路,是理解该机制的优秀材料。

消息长轮询

长轮询要解决什么问题?

在消息队列中,消费者获取消息主要有两种方式:

  • 推模式:Broker 主动将消息推送给 Consumer。优点是实时性高,但缺点是无法感知 Consumer 的负载能力,可能造成 Consumer 过载甚至崩溃。

  • 拉模式:Consumer 主动向 Broker 拉取消息。优点是消费节奏可控,但缺点是实时性依赖拉取频率。如果采用简单的短轮询(例如每秒请求一次),在无消息时会产生大量无效请求,浪费网络和计算资源。

长轮询​ 的设计目标就是在拉模式的基础上,最大限度地减少无效请求,同时保证消息的实时性

长轮询的工作原理

长轮询的核心思想是:当 Consumer 发起拉取请求后,如果 Broker 端没有新消息,不会立即返回空结果,而是将这个请求“挂起”一段时间。在此期间,一旦有新消息到达或挂起超时,Broker 再响应这个请求

Broker 端的实现

Broker 端是长轮询机制实现的关键,主要由 PullMessageProcessor和 PullRequestHoldService两个组件协同工作。

  1. 请求接收与判断

    • Consumer 发送的拉取请求包含了 Topic、队列 ID、拉取偏移量等关键信息。

    • PullMessageProcessor处理请求,首先根据偏移量查询指定队列。如果有消息,立即返回

    • 如果没有新消息,并且请求允许挂起,则进入长轮询流程。

  2. 请求挂起

    • Broker 会将这个 PullRequest 封装起来,并以 Topic@QueueId为键,存储在一个名为 pullRequestTable的并发映射表中。这个过程就是“挂起”。

    • 此时,Broker 会将响应设置为 null,不会立即向客户端返回任何数据,客户端的网络连接会保持等待状态。

  3. 请求唤醒与超时控制

    挂起的请求如何被唤醒并返回呢?这里有三种触发机制:

    • 新消息到达时唤醒(最优路径)当生产者向某个队列成功写入一条新消息后,会触发一个通知事件PullRequestHoldService会检查 pullRequestTable,找出所有在等待这个队列消息的挂起请求,并立即唤醒它们,让它们重新尝试拉取消息。这保证了消息的极致实时性

    • 定时轮询检查(兜底路径)PullRequestHoldService自身也是一个定时任务,默认每 5 秒会扫描一次 pullRequestTable。对于每个挂起的请求,它会检查该队列的最大偏移量是否大于请求的偏移量。如果是,说明有新消息了,便会唤醒对应的请求。

    • 客户端超时控制Consumer 在发送请求时会设置一个挂起超时时间,由参数 brokerSuspendMaxTimeMillis控制(默认 15-30 秒)。如果挂起时间超过此限值,即使没有新消息,Broker 也会强制返回一个空响应,避免连接无限期占用。

消费端的配置

消费端有两个关键参数来控制长轮询行为:

  • brokerSuspendMaxTimeMillis:决定请求在 Broker 端最多挂起多久。

  • consumerTimeoutMillisWhenSuspend:消费端网络请求本身的超时时间,必须大于​ brokerSuspendMaxTimeMillis,以确保能给长轮询留出足够的挂起时间。

rocketmq是如何保证消息的可靠性/不丢失?

RocketMQ 通过在其消息生命周期的三个关键阶段——生产发送Broker存储消费处理——实施一系列精细的机制,来提供高可靠性的消息保障。

生产阶段:确保消息成功送达 Broker

这个阶段的目标是,只要 Producer 没有收到明确的成功响应,就认为消息可能丢失并触发重试。

  1. 使用同步发送

    • 核心机制:调用 producer.send()方法后,线程会阻塞等待,直到收到 Broker 返回的 SendResult如果返回状态为 SEND_OK,表明消息已被 Broker 成功接收

    • 规避风险:避免使用单向发送(sendOneway),因为这种方式不关心发送结果,网络波动可能导致消息丢失。

  2. 配置发送重试

    • 内置策略RocketMQ 的 Producer 内置了重试机制。默认情况下,如果发送失败或超时,会自动重试 2 次。

    • 灵活配置:你可以通过 producer.setRetryTimesWhenSendFailed(N)来调整重试次数。同时,RocketMQ 支持自动故障切换,当某个 Broker 失败时,会自动将消息重试发送到同一主题下的其他 Broker。

  3. 应对极端场景:事务消息

    • 解决难题对于需要先执行本地数据库事务再发送消息的场景普通消息无法保证本地事务成功和消息发送成功的原子性。事务消息通过两阶段提交解决此问题。

    • 工作流程

      1. Producer 向 Broker 发送 半消息,它对 Consumer 不可见。

      2. Producer 执行本地事务。

      3. 根据本地事务结果,Producer 向 Broker 提交 Commit​ 或 Rollback​ 指令。

      4. 如果 Broker 未收到确认指令,会回查 Producer 的本地事务状态。

    • 价值:这确保了只要本地事务成功,消息最终一定会被发送;反之,消息会被回滚。从而避免了业务处理成功但消息丢失的关键难题。

存储阶段:确保消息在 Broker 安全持久化

这是保证消息不丢失的最核心环节,重点在于通过多副本和持久化技术,防止单点故障。

同步刷盘

  • 机制对比:Broker 将消息写入内存的 PageCache 后,刷盘策略是关键。

    • 异步刷盘写入 PageCache 后就返回成功,性能极高,但若 Broker 宕机,内存中未写入磁盘的消息会丢失。

    • 同步刷盘必须等待消息真正持久化到磁盘后,才向 Producer 返回成功。这是最安全的方式。

  • 配置:在 broker.conf中设置 flushDiskType = SYNC_FLUSH

主从复制

  • 单点风险:即使配置了同步刷盘,如果磁盘损坏,消息依然会丢失。因此需要多副本。

  • 复制模式

    • 异步复制:主节点成功后就返回,性能好,但主节点宕机且无法恢复时,未同步到从节点的消息会丢失。

    • 同步复制:主节点必须等待消息被至少一个从节点成功复制后,才返回成功。这可以保证即使主节点磁盘损坏,从节点上也存在完整消息。

  • 配置:设置 brokerRole = SYNC_MASTER

新一代高可用架构:DLedger

  • 更进一步:在 RocketMQ 4.5+ 版本中,引入了基于 Raft 协议的 DLedger​ 模式。

  • 优势:DLedger 在数据一致性、故障自动转移和恢复方面比传统主从复制更强大。它要求消息写入多数派节点才算成功,提供了更高等级的数据可靠性。

消费阶段:确保消息被成功处理

此阶段遵循 At Least Once​ 原则,保证消息不会被遗漏,但可能需要消费端处理重复消息。

  1. 正确的 ACK 时机

    • 核心原则Consumer 必须在业务逻辑成功执行完毕后,再向 Broker 返回 CONSUME_SUCCESS状态。

    • 规避风险:切勿先返回成功再处理业务,否则若消费端宕机,消息将丢失。

  2. 消费重试与死信队列

    • 自动重试当消费失败(如抛出异常)或超时时,Consumer 应返回 RECONSUME_LATER。Broker 会将该消息投递到重试队列并按照延迟级别(如 5s, 10s, 30s, 1m...)在后续时间点重新投递。

    • 最终保障如果消息重试16次后仍然失败,它会被移入死信队列此时需要人工介入处理,但消息本身被永久保存,不会丢失。

  3. 消费端幂等性

    • 必备措施:由于重试机制和网络不确定性,同一条消息可能被多次投递。因此,消费端业务逻辑必须实现幂等性

    • 实现方案

      • 利用数据库唯一键约束(如订单ID)。

      • 使用 Redis 的 setnx命令或原子操作进行去重。

      • 维护一张消费记录表,在事务中判断消息是否已处理。

rocketmq如何处理消息重复?

处理重复消息的核心责任在于消费者端实现幂等性RocketMQ不保证消息绝对不重复,而是要求消费端业务逻辑自行实现“幂等性”来解决此问题。

消息为何会重复?

消息重复并非Bug,而是分布式系统在高可用、高可靠要求下的必然结果。主要发生在三个环节:

  1. 发送阶段重复

    • 场景:生产者发送消息后,由于网络抖动未能收到Broker的成功确认。由于生产者配置了重试机制,它会再次发送同一消息。实际上,Broker可能已成功保存了第一条消息。

    • 根本原因:网络不可靠与重试机制共同作用。

  2. 投递阶段重复

    • 场景:消费者处理完消息后,在向Broker返回消费成功确认(ACK)时发生网络闪断或消费者突然宕机。Broker未收到ACK,会认为该消息消费失败,从而在后续重新投递。

    • 根本原因:为确保消息“至少被消费一次”的可靠性机制。

  3. 系统运维与负载均衡

    • 场景:当Broker或消费者集群重启、扩容或缩容时,会触发Rebalance,消息队列会被重新分配。此过程可能导致部分已处理但未及时提交偏移量(Offset)的消息被再次消费。

核心对策:实现消费端幂等性

既然消息重复无法从根源上完全避免,最有效的方案就是让消费逻辑具备幂等性。幂等性是指:无论同一消息被消费多少次,其对业务数据状态造成的结果都与消费一次相同

以下是几种经过验证的常用方案:

1. 利用数据库唯一约束

这是实现强幂等性最经典、最可靠的方法,尤其适用于交易、金融等核心业务场景。

  • 原理:为消息体中的业务唯一标识(如订单ID)在数据库中建立唯一索引。消费者在处理消息时,首先尝试将该唯一标识插入到专用的“防重表”或业务主表中

  • 流程

    1. 消费者接收到消息。

    2. 获取消息中的业务唯一标识(例如,订单ID)。

    3. 执行INSERT语句,尝试将该标识插入防重表

    4. 如果插入成功,说明是首次消费,继续执行业务逻辑

    5. 如果捕获到DuplicateKeyException等唯一键冲突异常,则表明该消息已被处理过,直接跳过即可。

  • 优势:利用数据库的原子性和唯一约束,简单、可靠,能提供强一致性保证。

  • 注意:务必使用业务唯一标识而非RocketMQ自带的Message ID,因为重发的消息会拥有不同的Message ID。

2. 使用Redis的原子操作

适用于对性能要求极高的高并发场景,如秒杀、抢购。

  • 原理:利用Redis的SETNX(SET if Not eXists)命令或set key value NX EX的原子性,将业务唯一标识作为Key写入Redis。(或者为订单设置一个token)

  • 流程

    1. 消费者接收到消息。

    2. 使用业务唯一标识作为Key,向Redis发起一个SETNX操作,并设置合理的过期时间(TTL)。

    3. 如果命令返回true(或1),表示成功获取锁,是首次消费,执行业务逻辑。

    4. 如果返回false(或0),表示Key已存在,说明消息正在被处理或已处理过,直接跳过。

  • 优势:性能极高,远快于数据库操作。

  • 注意:需要妥善处理Redis的可用性问题,并确保设置的过期时间能覆盖业务处理时长,避免因锁过早失效而导致幂等性被破坏。

3. 基于业务状态机

如果业务逻辑本身具有清晰的状态流转,这是一种非常自然且高效的方案。

  • 原理在业务数据表中设计一个状态字段(如status),所有业务操作都必须是状态驱动的。只有当前状态符合预期时,操作才能执行成功。

  • 流程:以订单支付为例,订单状态可能为:待支付-> 已支付-> 已完成

    • 消费者收到支付成功消息后,执行更新的SQL可能为:UPDATE orders SET status = '已支付' WHERE order_id = ? AND status = '待支付'

    • 随后检查该SQL执行后影响的行数。如果影响行数为1,说明是第一次处理且状态更新成功。如果影响行数为0,则说明订单已处于非“待支付”状态,可能是重复消息。

  • 优势:无需额外组件,直接与业务逻辑结合,效率高。

  • 注意:适用于业务本身有清晰、可控状态流转的场景。

rocketmq如何处理消息积压?

第一步:诊断积压原因

发现积压后,首先要确定瓶颈在哪里。

  1. 查看堆积情况:通过 RocketMQ 控制台或 mqadmin命令(如 consumerProgress)查看消费者组的 Lag(未消费消息数)和消费进度 。

  2. 判断堆积位置:查看客户端日志文件(如 ons.log),如果出现 the cached message count exceeds the threshold这样的日志,说明消息积压在客户端,即消息已经从Broker拉取到本地,但消费逻辑处理不过来 。否则,积压可能发生在服务端(Broker),需要检查Broker的磁盘、CPU和网络资源 。

  3. 定位消费瓶颈:检查消费者线程的堆栈信息(使用 jstack工具,关注 ConsumeMessageThread线程)。如果线程阻塞在数据库查询、远程调用或复杂的业务计算上,这就是消费慢的根本原因 。

第二步:针对性解决方案

根据诊断结果,可以从以下几个层面入手解决。

1. 提升消费能力(最常用)

这是解决积压最直接的方法,核心是提高消费者的并行处理能力

  • 增加消费者实例:这是最有效的横向扩展方法。但必须遵循一个核心原则:消费者实例数量不能超过其订阅Topic的队列(MessageQueue)数量​ 。如果实例数已经等于队列数,就需要先扩容队列。

  • 增加消费线程数:调整消费者参数 consumeThreadMin和 consumeThreadMax,增加并发消费线程数 。

  • 启用批量消费:如果业务允许,开启批量消费模式,一次拉取多条消息处理,可以大幅减少网络交互和设备IO次数,提升吞吐量 

2. 扩容队列与系统重构

消费者实例数已经等于队列数,扩容队列本身是提升并行度的根本办法。

  • 扩容队列数:RocketMQ 5.0前,队列数在创建Topic时就固定了,修改需要新建Topic。可以采用 “临时Topic分流”​ 的方案 :

    1. 创建一个队列数更多的临时Topic。

    2. 编写一个临时的消费者程序,从原Topic快速消费消息(不做复杂业务逻辑),然后直接转发到新的临时Topic。

    3. 让真正的消费者业务逻辑去消费新的临时Topic。

  • 优化消费逻辑:这是治本之策。例如,将多次数据库交互优化为批量操作,或将耗时操作异步化 。检查消费逻辑,确保没有不必要的锁或阻塞调用。

3. Broker端与紧急处理
  • Broker端优化如果Broker成为瓶颈,可以调整刷盘策略为ASYNC_FLUSH(异步刷盘)以提升性能 在极高写入压力下,可以启用 transientStorePoolEnable机制,先将消息写入堆外内存池,再异步刷盘,以缓解PageCache压力 。

  • 紧急处理与降级:在积压严重威胁系统稳定时,可以考虑降级方案。

    • 跳过非核心消息:如果业务允许,可以监控队列偏移量差距(diff),当积压超过一定阈值时,直接返回消费成功,跳过部分非关键消息的处理 。

    • 重置消费位点:如果积压的消息可以丢弃,可以通过控制台或命令将消费位点重置到最新位置,从当前时刻开始消费,快速恢复业务 。

rocketmq如何保证消息有序?

RocketMQ 通过一套精巧的协作机制来保证消息的有序性,其核心在于确保同一组逻辑上相关的消息(如同一订单ID的所有消息)被发送到同一个消息队列,并由单个线程按顺序消费。

生产阶段:确保消息发送至同一队列

消息有序性的基础在发送时就已奠定。生产者必须保证有顺序要求的消息被写入同一个 MessageQueue(消息队列)

  • 关键接口 MessageQueueSelector:生产者发送消息时,需要实现此接口的 select方法。该方法接收一个参数(通常是一个业务键,如订单ID),通过计算(如对队列总数取模)来恒定地选择同一个队列 。

  • 必须使用同步发送:异步或单向发送无法保证消息到达Broker的顺序,因此顺序消息必须采用同步发送方式 

存储阶段:Broker 保障队列内顺序

当消息进入Broker后,其存储机制天然支持了队列内的顺序性。

  • 顺序写入对于单个 MessageQueue,RocketMQ 会将消息顺序追加到 CommitLog 文件中。同时,每个队列对应的 ConsumeQueue 索引文件也是顺序构建的,这保证了消息在队列内部是严格FIFO的 。

  • 队列是基础:只要生产端能正确地将顺序消息路由到同一队列,Broker 端就能保证其存储和提供的顺序与写入顺序一致 。

消费阶段:核心的“顺序消费”机制

这是实现顺序消费最复杂也最关键的一环。消费者必须使用顺序消费模式 。

  • 使用 MessageListenerOrderly:消费者需要注册顺序消息监听器,而不是默认的并发监听器 。

  • “三把锁”机制(核心原理)MessageListenerOrderly并非简单单线程,而是通过一套精细的加锁机制来保证顺序 :

    • 分布式锁消费者启动时,会向Broker申请当前 MessageQueue的锁。成功获取锁的消费者实例才有权消费该队列,这保证了在集群环境下,一个队列在同一时刻只被一个消费者消费 。

    • 本地队列锁:在消费者实例内部,对于同一个 MessageQueue,会使用 synchronized锁确保只有一个线程可以处理其中的消息,防止多线程并发导致乱序 。

    • ProcessQueue 锁:在处理一批消息前后,会对本地的 ProcessQueue(处理队列)加 ReentrantLock。这主要是在发生 Rebalance(重平衡,如消费者数量变化)时,防止队列被分配给新消费者的过程中,原有消费者正在处理的消息被中断或重复消费,从而破坏顺序性 。

上述机制实现的是局部(分区)有序即同一业务键(如订单)的消息有序。这是99%的业务场景所采用的方式,因为它能在保证业务正确性的同时,通过多个队列并行处理来维持高吞吐 。

实现 RocketMQ 消息的全局有序,需要确保一个 Topic 下的所有消息严格按照先进先出(FIFO)的原则进行处理。其核心在于,通过限制并行度来换取严格的顺序性。

实现全局有序需要同时满足以下三个关键条件:

  1. 单一消息队列:将 Topic 的读写队列数量均设置为 1​ (writeQueueNums=1readQueueNums=1)。这是实现全局有序的基础,因为所有消息都会被写入同一个队列,并由同一个队列被消费,从而天然保证了全局的 FIFO 顺序。

    • 操作命令示例mqadmin updateTopic -t YourTopicName -c DefaultCluster -r 1 -w 1

  2. 单一生产者与同步发送:使用一个生产者实例,并采用同步发送模式。这确保了消息能够按照调用 send方法的顺序被依次写入到 Broker 的单一队列中。多生产者或异步发送都无法保证消息到达 Broker 的绝对顺序。

  3. 单一消费者与顺序监听器:使用一个消费者实例,并注册 MessageListenerOrderly​ 监听器。即使只有一个队列,RocketMQ 的默认机制也可能在消费者内部采用多线程处理。MessageListenerOrderly监听器会通过加锁机制,确保同一时刻只有一个线程消费该队列中的消息,从而在消费端维持顺序。

rocketmq如何实现消息过滤?

RocketMQ 的消息过滤机制是其作为一个企业级消息中间件的核心能力之一,它允许消费者只接收自己关心的消息,极大地提升了消息系统的效率和业务处理的清晰度。其实现原理可以概括为 “生产者打标,Broker过滤,消费者订阅”​ 的协作模式,并在底层采用了精巧的双层过滤架构来平衡性能与灵活性。

TAG 过滤:简单高效的分类机制

TAG 过滤是 RocketMQ 中最基础且高效的过滤方式,其设计目标是快速处理简单的消息分类场景。

  • 基本用法生产者在发送消息时,为每条消息设置一个唯一的 Tag字符串(例如 "PAY_SUCCESS"消费者在订阅时,通过指定 Tag表达式(如 "TagA"或 "TagA||TagB")来告知 Broker 自己需要哪些消息

  • 核心原理:基于哈希码的快速匹配

    RocketMQ 为了追求极致的性能,在消息存储时进行了优化。当消息被写入主日志文件后,系统会计算该消息 Tag字符串的哈希码,并将这个哈希值(一个 long类型整数)存储在与消息队列对应的索引文件 ConsumeQueue中。

    • 服务端粗筛当消费者拉取消息时,Broker 会先将消费者订阅的 Tag表达式也转换为哈希码集合,然后与 ConsumeQueue中存储的每条消息的 Tag哈希码进行比对。这是一个非常快速的内存整数比较操作。匹配成功,则意味着该消息可能是消费者需要的。

    • 客户端精筛由于存在哈希冲突的可能性(不同的字符串可能计算出相同的哈希码),RocketMQ 在消费端最终处理消息前,还会用原始的 Tag字符串进行一轮精确匹配。这确保了最终被应用程序消费的消息一定是精确符合订阅条件的。

SQL92 过滤:强大灵活的表达式查询

当过滤条件超出简单的标签匹配,需要基于多个消息属性进行复杂判断时,SQL92 过滤提供了更强的表达能力。

  • 基本用法:生产者在发送消息时,可以通过 putUserProperty方法为消息设置多个自定义属性(键值对)。消费者订阅时,则可以使用类似 SQL 的语法表达式来定义规则,例如 "region = 'Hangzhou' AND price > 100"

  • 核心原理:表达式编译与运行时计算

    与 TAG 过滤不同,SQL92 过滤的条件无法通过简单的哈希值来表征,其实现过程相对复杂。

    1. 表达式编译:消费者上报订阅关系时,Broker 会使用 JavaCC​ 工具将 SQL92 表达式编译成一个可执行的 Expression​ 对象(一个语法树)。这个过程只需一次,编译结果会被缓存起来以供后续重复使用。

    2. 消息过滤:当进行消息匹配时,Broker 需要从 CommitLog(消息主体存储文件)中读取消息的完整属性信息,然后将其作为上下文代入编译好的 Expression中进行计算。只有表达式结果为 true的消息才会被投递给消费者。

性能优化:布隆过滤器

由于 SQL92 过滤需要读取和解析消息内容,其性能开销远大于 TAG 过滤。为了缓解这个问题,RocketMQ 提供了可选的布隆过滤器​ 优化机制。

  • 当开启相关配置后,在消息存储阶段,Broker 会预先为每个消息计算一个布隆过滤器位图,标识该消息可能匹配哪些消费组的过滤条件。

  • 在拉取消息时,先使用布隆过滤器进行快速判断,如果位图显示消息不可能匹配当前消费者,则直接跳过,避免昂贵的表达式计算。这相当于在 SQL92 过滤之前增加了一层高性能预过滤。

rocketmq如何实现延时消息?

ocketMQ 的延时消息并非支持任意时间精度,而是预设了 18 个延迟级别。每个级别对应一个固定的延迟时间。

  • 默认延迟级别1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

  • 客户端设置:生产者发送消息时,通过调用 setDelayTimeLevel(level)方法指定延迟级别例如,setDelayTimeLevel(3)表示这条消息将在 10 秒后被消费

核心实现原理

1. 消息接收与暂存

当 Broker 接收到一条延时消息时,会在存储前进行关键处理:

  • 主题与队列重定向:Broker 会将消息的原始 Topic 和 QueueId 备份到消息的属性中,然后将消息的实际 Topic 修改为专用的内部主题 SCHEDULE_TOPIC_XXXX,并将 QueueId 设置为 delayLevel - 1这意味着不同延迟级别的消息会被路由到 SCHEDULE_TOPIC_XXXX这个内部主题的不同队列中。

  • 属性备份:原始 Topic 和 QueueId 被保存在消息的 PROPERTY_REAL_TOPIC和 PROPERTY_REAL_QUEUE_ID属性中,以便后续恢复。

2. 定时调度与消息投递

这是延时功能的核心,由 ScheduleMessageService类负责。

  • 服务启动:Broker 启动时,ScheduleMessageService会根据配置的延迟级别,为每个级别创建一个对应的定时任务 DeliverDelayedMessageTimerTask,专门扫描其对应的延迟队列。

  • 消息扫描与判断定时任务会持续检查对应队列中的消息。它从 ConsumeQueue中读取消息的存储时间戳和延迟级别,计算出消息的精确到期时间,并与当前时间进行比较

  • 到期投递:一旦发现消息到期,该服务会执行以下操作:

    1. 从 CommitLog中加载完整的消息内容。

    2. 从消息属性中恢复其原始的 Topic 和 QueueId

    3. 清除消息的延迟级别属性

    4. 将消息作为一条普通消息,重新存入 CommitLog,但这次是写入其原始目标队列。此后,订阅该原始 Topic 的消费者就能像消费普通消息一样收到它了。

为了保证 Broker 重启后不丢失延迟消息的投递进度,ScheduleMessageService会定期将每个延迟队列的消费偏移量(offset)持久化到磁盘文件 delayOffset.json。这样重启后可以从上次的进度继续执行,避免消息被重复投递或丢失

分布式事务消息(半消息)

RocketMQ 的分布式事务消息是其一项核心高级特性,它通过创新的 “半消息”​ 和 “事务状态回查”​ 机制,在普通消息基础上实现了类似两阶段提交(2PC)的效果,旨在解决跨服务场景下,本地事务执行与消息发送的最终一致性问题。其核心设计目标可以概括为:确保本地事务成功,消息一定能被投递;本地事务失败,消息一定不会被消费

第一阶段:发送与暂存半消息

  1. 发送半消息:生产者发送一条“半消息”到 RocketMQ Broker。这条消息包含了最终要发送的所有业务内容,但其关键区别在于,它会被标记为一个特殊的状态。

  2. 暂存至特殊Topic:Broker 收到半消息后,会将其持久化存储,但并非存入业务指定的目标Topic。而是将其存入一个名为 RMQ_SYS_TRANS_HALF_TOPIC的内部特殊Topic中。正因为消费者订阅的是真实的业务Topic,所以它们无法看到和消费存储在内部Topic中的半消息,从而实现了对消费者的“隐身”。

  3. Broker 响应:存储成功后,Broker 会向生产者返回一个确认响应(Ack)。

第二阶段:执行本地事务与确认

  1. 执行本地事务:生产者收到半消息发送成功的Ack后,开始执行本地事务(例如,扣减库存、更新订单状态等)。

  2. 提交二次确认:本地事务执行完毕后,生产者根据结果向 Broker 发送一个二次确认指令:

    • Commit(提交):如果本地事务成功,生产者发送 Commit 指令。Broker 会将该半消息从内部Topic中取出,恢复其原始的业务Topic和队列信息,并将其转存到真正的业务Topic中。此后,这条消息对消费者变为可见,可以被正常消费。

    • Rollback(回滚):如果本地事务失败,生产者发送 Rollback 指令。Broker 则会直接丢弃(删除)这条半消息。

容错机制:事务状态回查

这是保障一致性的关键环节。在分布式环境下,可能因为网络闪断、生产者应用重启等原因,导致 Broker 始终无法收到生产者的二次确认指令,使得消息长期处于“半消息”的悬而未决状态。

为了解决这个问题,RocketMQ 引入了事务状态回查机制

  • 定时扫描Broker 会启动一个后台服务TransactionalMessageCheckService),定期扫描 RMQ_SYS_TRANS_HALF_TOPIC中的半消息。

  • 发起回查对于长时间未得到确认的半消息,Broker 会向消息所属的生产者组内的任一可用生产者实例发起回查请求,询问该消息对应的本地事务最终状态。

  • 检查并回复:生产者收到回查请求后,需要实现一个 TransactionListener接口,在其 checkLocalTransaction方法中,根据消息中的业务唯一标识(如订单ID)去检查本地事务的最终执行结果(例如,查询数据库确认订单状态是否已更新),然后将结果(Commit 或 Rollback)回复给 Broker。

  • 推进消息状态:Broker 根据回查结果,按照上述第二阶段的方式,将消息转为可投递或丢弃。

rocketmq如何实现死信队列?

消息进入死信队列主要有以下两种情况:

  1. 最大重试次数超限:这是最常见的原因。RocketMQ 默认允许一条消息最多被重试 16 次如果重试 16 次后仍然消费失败,Broker 会自动将这条消息转移到死信队列。这个阈值可以通过参数 maxReconsumeTimes调整。

  2. 消费超时或ACK丢失消费者在处理消息时发生超时,或者在返回消费成功确认(ACK)时发生网络异常导致Broker未收到,消息会被重新投递若此类失败累计达到最大重试次数,同样会进入死信队列。

死信队列在设计上具有几个核心特性,确保其能有效发挥作用:

  • 自动创建与隔离死信队列由 RocketMQ 自动为每个消费者组创建,无需手动干预。其命名规则为 %DLQ%<ConsumerGroupName>。例如,消费者组 OrderConsumerGroup对应的死信队列Topic为 %DLQ%OrderConsumerGroup。这种命名方式实现了与正常业务Topic的严格隔离,避免对正常消息流造成污染。

  • 只读性与持久化:消息一旦进入死信队列,便不再有任何自动重试机制。这些消息会被持久化存储,默认保留时间为72小时(部分资料显示为48小时,具体可通过Broker配置调整),为问题排查和修复留出充足时间。

如何管理和处理死信消息?

对于死信队列中的消息,需要人工或通过编程方式介入处理。

  1. 查看死信消息

    • 通过控制台RocketMQ 提供的管理控制台通常有专门的“死信队列”页面,可以按消费组查询和查看死信消息的详细信息,包括消息ID、重试次数、产生时间等。

    • 通过命令行工具:可以使用 mqadmin命令进行查询,例如 sh mqadmin queryMsgByTopic -n <namesrv_addr> -t %DLQ%YourConsumerGroup

  2. 处理死信消息

    处理死信消息的常见思路如下:

    • 分析原因:首先需要查看消息内容和日志,定位消费失败的根本原因,是代码Bug、数据问题还是依赖服务不可用。

    • 手动重投:如果问题已经修复,可以通过控制台或Admin工具将死信消息重新发送回原始的业务Topic,让其能够被正常消费。

    • 编写专用消费者可以创建一个专用的消费者程序来订阅死信队列的Topic,实现自动化的告警、记录或条件性修复与重投。

如何保证rocketmq的高可用?

NameServer 集群

NameServer 是 RocketMQ 的“大脑”,负责管理所有 Broker 的路由信息。它的高可用设计非常简洁高效。

  • 无状态与去中心化:NameServer 节点之间互不通信,每个节点都独立维护一份完整的路由信息表。这种去中心化设计避免了复杂的同步逻辑,使得单个节点故障完全不影响其他节点 。

  • 最终一致性:Broker 会定时向所有 NameServer 发送心跳(默认30秒一次),注册自己的信息。NameServer 则通过一个定时任务(默认10秒一次)来扫描并剔除超过120秒未上报心跳的 Broker,从而实现路由信息的最终一致性 。

  • 客户端连接策略:Producer 和 Consumer 在启动时,会配置多个 NameServer 地址。它们会随机选择一个节点连接,如果失败则自动切换到下一个。在运行中,客户端也会定时从 NameServer 拉取最新的路由信息 。

总结:只要有一个 NameServer 节点存活,就能提供完整的路由服务,保证了路由层的极高可用性。

Broker 集群与主从复制

Broker 是消息存储和转发的核心,其高可用主要通过主从(Master-Slave)架构数据复制来实现。

1. 集群部署模式

根据业务需求,可以选择不同的集群模式 :

部署模式

优点

缺点

适用场景

多Master多Slave
(同步复制)

数据零丢失,服务与数据均无单点故障

性能略低(约10%),RT略高

金融、交易等核心业务,对数据强一致性要求极高

多Master多Slave
(异步复制)

性能高,几乎接近多Master模式,Master宕机后消费者仍可从Slave消费

可能丢失少量消息(Master宕机且数据未同步时)

绝大多数业务场景,在性能和数据可靠性间取得平衡

多Master

配置简单,性能最高

任意Master宕机,其上的消息将不可消费,实时性受影响

非核心业务,允许消息短暂不可用

生产环境推荐使用“多Master多Slave”模式,并根据业务要求选择同步或异步复制 。

2. 复制与刷盘机制

这是保证消息不丢的关键配置,主要体现在两个环节:

  • 主从复制(Replication):指消息在 Master 和 Slave 节点间的同步方式。

    • 同步复制(SYNC_MASTER):Producer 发送消息到 Master 后,Master 会等待 Slave 成功复制完数据后才向 Producer 返回成功。这确保了即使 Master 宕机,Slave 上也有一份完整的数据 。

    • 异步复制(ASYNC_MASTER):Master 收到消息后立即返回成功,然后异步地将数据复制给 Slave。性能更好,但存在少量数据丢失风险 。

  • 刷盘策略(Flush):指消息从内存(PageCache)持久化到磁盘的方式。

    • 同步刷盘(SYNC_FLUSH):Broker 将消息写入内存后,会立即强制刷盘,等待磁盘写入成功后才返回。非常可靠,但性能损耗大。

    • 异步刷盘(ASYNC_FLUSH):Broker 将消息写入内存后立即返回,由后台线程定期刷盘。性能高,是默认配置,但若Broker宕机可能丢失内存中未刷盘的消息。

最佳实践组合:通常采用 异步刷盘(保证性能) + 同步复制(保证数据冗余)​ 的组合,以兼顾性能和数据可靠性 

持久化与底层保障

即使软件层面配置完善,硬件故障(如磁盘损坏)仍是威胁。现代云平台提供了更强大的基础设施层高可用方案。

  • 云盘三副本机制:以腾讯云为例,其 RocketMQ 服务利用云硬盘(CBS)的数据三副本机制​ 。当 Broker 写入消息时,数据会被自动、同步地复制到三个不同的物理设备上。只有三份拷贝都写入成功,操作才会返回。这意味着,单块甚至两块磁盘损坏都不会导致数据丢失,极大地简化了数据高可用的实现 。

  • 跨可用区(AZ)部署:将 NameServer 和 Broker 节点分散在同一个地域的多个可用区(可理解为相互隔离的数据中心)。这样,即使整个机房发生故障,服务仍能由其他可用区的节点提供,实现机房级别的容灾 。

容错与负载均衡

Producer 和 Consumer 的智能容错是保证业务连续性的最后一道防线。

  • 生产端容错

    • 重试机制:发送消息失败时,Producer 会自动重试(默认2次)。对于同步发送,这是保证消息成功投递的重要手段 。

    • 故障延迟机制:当发送某个 Broker 失败时,Producer 会在一段时间内避免向该 Broker 发送消息,而是将消息发往集群中的其他健康 Broker,实现快速的故障转移 。

  • 消费端容错

    • 集群模式与重平衡:在集群消费模式下,如果某个 Consumer 实例宕机,Broker 会触发重平衡(Rebalance),将其负责的消息队列重新分配给组内其他存活的 Consumer,实现消费任务的自动接管 。

    • 重试与死信队列:若某条消息消费失败,RocketMQ 会将其投递到重试队列,按照延迟时间(如5s、10s、30s...)进行重试。若重试16次后仍失败,消息会被转入死信队列(Dead-Letter Queue),等待人工干预。这保证了即使业务逻辑有问题,消息也不会丢失 。

Kafka

存储模型

Kafka 的存储采用了一种清晰的分层逻辑,从抽象到具体依次为 Topic、Partition 和 Segment。

  • Topic(主题):消息的逻辑分类,比如 order-topic专门用于处理订单消息。它本身不涉及物理存储,只是一个命名空间。

  • Partition(分区):这是 物理存储和并行处理的基本单位一个 Topic 可以被划分为多个 Partition,每个 Partition 是一个有序的、不可变的消息队列消息被不断追加到队列末尾。Partition 可以分布在不同的 Broker 上,实现了数据的分布式存储和负载均衡。

    • 目录结构在磁盘上,每个 Partition 对应一个物理目录,命名规则为 <topic_name>-<partition_id>。例如,Topic order-topic的第 0 个分区,其目录名就是 order-topic-0

  • Segment(段)为了防止单个 Partition 文件过大,Kafka 又将每个 Partition 在物理上划分为多个 Segment当前正在写入的 Segment 称为 活跃段(active segment),只有它才能接受写入。当活跃段达到一定大小(默认 1GB)或时间后,就会滚动(roll)创建一个新的 Segment。这种设计使得 Kafka 可以轻松地删除或归档旧的、已消费的数据,只需清理对应的 Segment 文件即可,非常高效。

每个 Segment 由一组成对出现的文件组成,它们拥有相同的前缀(起始偏移量),但后缀不同。

文件类型

作用

文件名示例

数据文件(.log)

存储实际的消息数据

00000000000000000000.log

偏移量索引文件(.index)

建立 Offset​ 到消息在 .log 文件中物理位置的映射。

00000000000000000000.index

时间戳索引文件(.timeindex)

建立 时间戳​ 到 Offset​ 的映射,主要用于按时间戳快速定位消息。

00000000000000000000.timeindex

单条消息的物理结构(存储在 .log 文件中)包括:

  • offset(8字节): 消息在 Partition 中的唯一标识。

  • message size(4字节): 消息体大小。

  • CRC32(4字节): 用于校验消息完整性。

  • magic(1字节): 标识协议版本。

  • attributes(1字节): 标识压缩或编码类型。

  • key length和 key(可选): 消息的键。

  • value length和 value: 实际的消息体(payload)。

工作流程

消息生产阶段

  1. 生产者初始化与配置

    生产者 (KafkaProducer) 启动时,会配置关键参数,如集群地址 (bootstrap.servers)、序列化器 (key.serializervalue.serializer)、以及确认机制 (acks) 等 。

  2. 消息发送核心流程

    1. 序列化与分区:当调用 send()方法后,生产者首先将消息的 Key 和 Value 序列化为字节数组。接着,分区器 (Partitioner)​ 会决定消息应被发送到主题的哪个分区。若有 Key,则对 Key 进行哈希后取模,确保相同 Key 的消息总进入同一分区(保证顺序性);若无 Key,则采用粘性分区策略,随机选择分区并在一段时间内批量发送,以提高性能 。

    2. 批处理与异步发送消息并非立即发出,而是被存入内存中的消息累加器 (RecordAccumulator),按分区组织成批次 (Batch)。这是一个异步过程,主线程在将消息放入累加器后便可继续执行。独立的 Sender 线程​ 会负责将已满或等待超时的批次打包,通过 NetworkClient批量发送到对应的 Broker。这种批处理机制是 Kafka 高吞吐量的关键 。

  3. Broker 确认与可靠性

    消息被发送到目标分区的 Leader Broker。Broker 根据生产者设置的 acks参数进行确认 :

    • acks=0:生产者不等待确认,速度最快,但可能丢失消息。

    • acks=1(默认):等待 Leader 副本写入本地日志即确认。是性能和数据可靠性的折中方案。

    • acks=all等待 Leader 和所有处于同步状态的副本 (ISR, In-Sync Replicas) 都写入成功才确认。可靠性最高,但延迟也最高 。

消息存储与集群协调

  1. 分布式存储与副本同步

    • Kafka 将每个分区的数据以仅追加日志 (Append-only Log)​ 的形式持久化到磁盘。每个分区在物理上由多个日志段 (Segment) 文件组成,便于管理和清理 。

    • 为了高可用,每个分区有多个副本。生产者只与 Leader 副本交互。Leader 负责将数据同步到 Follower 副本。只有与 Leader 保持同步的副本才在 ISR 集合中。若 Leader 宕机,Kafka 会从 ISR 中自动选举新的 Leader,确保服务不间断 。

  2. 消费组与重平衡

    • 消费者以消费者组 (Consumer Group)​ 的形式工作。一个主题的一条消息只能被同一个消费者组内的一个消费者消费,从而实现点对点或发布订阅模式 。

    • 当消费者组内的消费者数量发生变化(如增删)或主题的分区数变化时,会触发重平衡 (Rebalance)。此过程由组协调者 (Group Coordinator)​ 管理,它会重新分配分区给组内的存活消费者,确保每个分区都有唯一的消费者处理 。重平衡期间,整个消费者组会短暂暂停消费。

消息消费阶段

  1. 消费者初始化与分区分配

    消费者 (KafkaConsumer) 启动后,会订阅主题并加入一个消费者组。随后,组协调者会触发重平衡,为每个消费者分配其要消费的分区 。

  2. 拉取消息与处理

    1. 消费者采用 拉取 (Pull) 模式,主动向 Broker 发起 poll()请求来获取消息。这种方式允许消费者根据自身处理能力控制消费速率 。

    2. 拉取到的消息由消费者进行业务逻辑处理。

  3. 偏移量提交与语义保证

    1. 消费者需要定期提交偏移量 (Offset),即记录当前已消费到的位置。偏移量默认存储在 Kafka 内部的 __consumer_offsets主题中 。

    2. 提交方式决定了消息的消费语义 :

      • 自动提交:可能因消费者崩溃在处理完消息后、提交偏移量前发生,导致消息被重复消费(至少一次语义)。

      • 手动提交:在处理完消息后手动提交偏移量,可以更精确地控制,实现至少一次或通过“先处理再提交”实现正好一次(需结合事务等机制)。

高可用

副本机制

Kafka 为每个主题分区维护多个副本,这些副本分布在不同的 Broker 上。它们有明确的角色划分,遵循“主写主读”原则。

  • Leader 副本

    • 读写流量唯一入口所有生产者的写入请求和消费者的读取请求必须直接发送到分区的 Leader 副本。这是为了简化一致性模型,避免多主写入带来的数据冲突 。

    • 数据同步的源头:负责接收新消息,并将其同步给所有的 Follower 副本 。

    • 状态维护者跟踪所有 Follower 副本的同步进度,并据此维护一个关键指标——高水位(High Watermark, HW)​ 。

  • Follower 副本

    • 数据冗余备份其核心职责是被动地、异步地从 Leader 副本拉取消息,并将其持久化到自己的本地日志中,从而实现数据冗余 。

    • 故障备援Follower 副本不处理任何客户端请求它们的唯一价值在于,当 Leader 副本所在的 Broker 发生故障时,其中一个 Follower 副本能被选举为新的 Leader,从而快速恢复服务,实现高可用 。

数据同步不是简单的“发出即忘”,Kafka 通过一套精巧的机制来保证副本间的一致性。

  1. 同步过程(Pull 模式)

    1. 生产者写入:生产者将消息发送给分区的 Leader 副本 。

    2. Leader 本地持久化Leader 将消息顺序追加到本地的 Commit Log 中,并更新自己的 LEO(Log End Offset,日志末端偏移量)

    3. Follower 拉取各个 Follower 副本会启动独立的线程(如 ReplicaFetcherThread),定期向 Leader 发送 FETCH 请求,拉取新消息

    4. Follower 本地持久化:Follower 将拉取到的消息写入自己的本地日志,并更新自身的 LEO,然后向 Leader 返回一个确认(ACK)。

  2. ISR 机制与动态成员管理

    ISR 是理解 Kafka 副本机制的关键。它不是一个固定的集合,而是一个与 Leader 保持“同步”的 Follower 副本的动态列表(包括 Leader 自身)。

    1. 入队标准一个 Follower 副本若能持续地、在配置的时间阈值replica.lag.time.max.ms,如10秒)内与 Leader 保持同步(即追赶 Leader 的 LEO),就会被保留在 ISR 中

    2. 出队机制:如果一个 Follower 副本落后 Leader 太多(例如,由于网络故障或自身负载过高),并且持续时间超过了阈值,Leader 会将其从 ISR 中移除,放入 OSR(Out-of-Sync Replicas)集合。这意味着它暂时失去了被选举为 Leader 的资格 。

  3. 高水位与消息可见性

    1. HW 的定义:高水位是一个分区级别的偏移量标记。它被定义为 ISR 集合中所有副本 LEO 的最小值。它代表了一个消息提交的界限

    2. 消息安全性只有位移小于 HW 的消息才被认为是“已提交的”。这意味着这些消息已经被 ISR 中的所有副本成功复制,即使 Leader 立刻崩溃,数据也不会丢失

    3. 消费者可见性消费者只能消费到 HW 之前的消息。HW 之后的消息,即使已经被 Leader 写入,但对消费者是不可见的,因为它们尚未被足够多的副本确认,处于“未提交”的不安全状态 。

当故障发生时,这套机制能自动恢复。

  • Leader 选举:当 Leader 副本失效时,Kafka 集群的 Controller​ 组件会立即介入,并从当前ISR 集合中选举一个新的 Leader(默认是 ISR 中的第一个副本)。优先从 ISR 中选举能最大程度保证数据一致性,因为新 Leader 拥有最完整的数据。

  • 数据修复与截断:新的 Leader 开始工作后,其他 Follower 会从它这里同步数据。如果某个 Follower 副本之前因为故障持有一些未提交的脏数据(HW 之后的消息),在恢复同步时,它需要将这些数据截断,并重新从新的 Leader 拉取,以保持数据一致 。

ISR机制

ISR 的全称是 In-Sync Replicas(同步副本集)。要理解它,我们首先需要了解几个基本概念:

  • AR (Assigned Replicas):指分配给某个分区的所有副本的集合。

  • Leader 副本负责处理该分区所有读写请求的副本。生产者发送的消息总是先写入 Leader 副本。

  • Follower 副本:从 Leader 副本异步拉取数据,作为数据冗余的备份副本

  • OSR (Out-of-Sync Replicas)指那些由于网络延迟、机器故障等原因,导致其数据进度显著落后于 Leader 的 Follower 副本集合

因此,这三者的关系是:AR = ISR + OSR。ISR 就是 AR 中那些与 Leader 副本保持“同步”的“健康”子集,通常包括 Leader 本身和那些同步进度良好的 Follower 副本

ISR 机制的核心在于其动态性和基于时间的判断标准。

  1. 动态维护:Leader 副本负责实时监控所有 Follower 副本的同步状态。判断一个 Follower 是否“同步”的关键参数是 replica.lag.time.max.ms(默认10秒或30秒)。如果一个 Follower 副本在此时间窗口内未能成功从 Leader 拉取数据(例如,没有发送拉取请求或其数据进度落后),Leader 就会将其从 ISR 列表中移除,降级为 OSR。反之,当 OSR 中的副本重新追上了 Leader 的数据进度,又会被重新纳入 ISR。

  2. 高水位线 (HW) 与消息提交:这是保证一致性的关键。HW(High Watermark)代表已提交消息的偏移量,即消费者可以读取到的最大位置。一条消息只有在被 ISR 中所有副本都成功复制后,Leader 才会推进 HW,这条消息才被视为“已提交”,从而对消费者可见。这样可以防止消费者读到那些仅存在于 Leader 但可能因故障丢失的“未提交”消息。

ISR 如何平衡一致性与可用性?

ISR 机制的精妙之处在于它没有采用代价高昂的“全部副本确认”的强一致性方案,也没有采用风险较高的“Leader确认即成功”的弱一致性方案,而是找到了一个动态的平衡点。

下表清晰地展示了 ISR 机制在不同场景下如何通过关键配置来权衡一致性和可用性:

机制/场景

对一致性的影响

对可用性的影响

关键配置参数

生产者 ACK 机制

acks=all:必须等待 ISR 所有副本确认,一致性最高

acks=1:只需 Leader 确认,写入延迟低,可用性高

acks

Leader 选举

unclean.leader.election.enable=false:只从 ISR 中选举,保证数据一致

若 ISR 全挂,分区不可用,牺牲可用性

unclean.leader.election.enable

最小同步副本数

min.insync.replicas=2:即使 Leader 挂掉,仍有副本数据完整。

若存活副本数少于设定值,分区拒绝写入,牺牲可用性

min.insync.replicas

高水位(HW)和日志末端位移(LEO)

概念

全称

定义

核心作用

LEO

Log End Offset

每个副本(包括Leader和Follower)独有的指针,指向其日志文件中下一条待写入消息的位置。例如,若已写入10条消息(offset 0-9),则LEO=10。

跟踪每个副本的实时写入进度

HW

High Watermark

分区级别的全局标记,其值是所有处于同步状态的副本(ISR)的LEO中的最小值。它标识了已提交消息的边界。

定义消息的可见性安全性。消费者只能消费HW之前的消息。

核心关系:在一个分区内,任何副本的 HW ≤ 其自身的 LEO。并且,对于Leader副本,其 HW = min(ISR中所有副本的LEO)。

如何保证数据不丢失?

Kafka通过一套精巧的流程,让HW和LEO协同工作,确保数据在分布式环境下不会丢失。这套机制的核心是 “已提交”​ 状态。

1. 消息提交与可见性

  • 一条消息只有在被ISR中的所有副本都成功复制后,才被认为是“已提交”。

  • HW的推进标志着消息的提交。只有HW之前的消息才对消费者可见。这意味着,即使Leader副本所在Broker突然宕机,由于ISR中至少有一个Follower副本也拥有了这条消息,数据不会丢失。

2. 故障恢复与数据一致性

当故障发生时,HW是恢复数据一致性的基准:

  • Follower副本故障:当发生故障的Follower副本恢复后,它会将自己的日志截断到本地记录的HW处,然后从Leader重新同步HW之后的数据。这确保了它不会含有任何未被ISR确认的“脏数据”。

  • Leader副本故障:当Leader宕机,Kafka会从ISR中选举一个新的Leader。新Leader上台后,所有Follower副本会将自己的日志截断到各自记录的HW处,然后从新Leader同步。这个过程会丢弃所有HW之后未被确认的消息,虽然可能造成少量数据丢失,但严格保证了分区内所有副本的数据最终一致性。

Kafka为什么这么快?

写入优化:将随机写变为顺序写

这是Kafka高性能的基石。

  • 顺序追加写入:Kafka 将所有消息严格按顺序追加日志文件末尾,只进行追加操作而不修改已有数据。这种顺序I/O​ 的性能远超随机I/O,因为在传统机械硬盘上,顺序I/O可以避免磁头频繁寻道,其性能可能与内存的随机读写相当甚至更快。即便是固态硬盘,顺序I/O也能更好地发挥其性能。

  • 批处理与延迟:生产者发送消息时并非“来一条发一条”,而是会在内存中积攒一批消息,然后一次性发送到Broker。这通过参数 batch.size(批量大小)和 linger.ms(等待时间)控制。它将大量小的网络IO请求合并为少量大的IO,极大地提高了网络吞吐量和Broker的写入效率。

存储优化:高效的数据组织与缓存

  • 分段日志与稀疏索引一个主题的分区在物理上被划分为多个。当某个日志段文件达到一定大小后,就会滚动创建新的文件。这样做便于老旧数据的删除和索引的维护。Kafka还为每个日志段建立了稀疏索引,它不会为每条消息创建索引项,而是每隔一定字节的数据建立一条索引在查找消息时,先通过索引快速定位到大致区域,再在该区域内进行少量顺序扫描,从而在索引文件大小和查询效率之间取得完美平衡。

  • 利用页缓存:Kafka 重度依赖操作系统的页缓存,而非在JVM堆内存中缓存数据。消息写入时,直接进入页缓存;消息读取时,也优先从页缓存中查找。这样做的好处极大:

    • 避免GC开销:避免了在JVM中管理大量缓存对象带来的垃圾回收开销。

    • 利用OS优化:操作系统会使用空闲内存作为页缓存,读写速度接近内存。

    • 重启缓存不失效:即使Broker重启,页缓存中的数据也不会立刻清空,热数据依然有效

读取优化:零拷贝与并行消费

  • 零拷贝技术:这是提升消费端性能的关键。在传统的数据读取流程中,数据从磁盘到网卡需要经历4次上下文切换和4次数据拷贝。而Kafka通过调用 sendfile系统调用,实现了零拷贝数据可以直接从磁盘的页缓存通过网络接口卡发送出去,无需拷贝到用户空间。这消除了不必要的CPU拷贝和上下文切换,显著降低了CPU开销和延迟。

  • 分区与消费者组:Kafka通过分区实现了数据的天然并行。一个主题可以被分为多个分区,分散在不同的Broker上。消费者组模型允许多个消费者并行消费同一个主题的不同分区,水平扩展了消费能力。这是Kafka能够支撑高吞吐读请求的架构基础。

整体协作与其它优化

  • 高效的序列化与压缩:生产者支持在发送前对整批消息进行压缩,减少网络传输和磁盘占用的带宽。Kafka内部使用高效的二进制格式进行通信,进一步减少了开销。

  • 无锁设计:在Broker端,每个分区的Leader在处理写入请求时,由于是顺序追加,可以避免复杂的锁竞争,简化了处理流程,提高了并发性。

顺序I/O

Kafka 之所以能实现极高的吞吐量,其核心秘诀之一就在于它彻底摒弃了随机磁盘I/O,转而全面拥抱顺序I/O(追加写)

传统观念认为“磁盘慢”,但这其实是对随机I/O而言的。磁盘操作中最耗时的部分是寻道(移动磁头到正确磁道)和旋转延迟(等待盘片旋转到正确扇区),这个过程可能耗时数毫秒至十几毫秒。而真正传输数据的时间反而很短。

Kafka 的巧妙之处在于,它通过只追加(Append-only)​ 的方式写入消息,将大量的随机小写操作转换为连续的批量大顺序写这样,磁头几乎无需大幅度移动,可以持续不断地写入数据,从而将磁盘的顺序读写性能(机械硬盘可达 100-200 MB/s,SSD 可达数 GB/s)发挥到极致,其性能甚至可以媲美内存的随机访问。

Kafka 的 I/O 设计哲学深受日志结构文件系统的影响。

  • 只追加的日志Kafka 将每个分区(Partition)​ 本质上视为一个只能追加写入的日志文件。生产者发送的新消息被严格地、连续地追加到文件的末尾,永不允许修改或删除已有数据。这种不可变性(Immutable)简化了并发控制和数据一致性保证。

  • 分段滚动为了防止单个日志文件无限增大,Kafka 采用了分段(Segment)​ 机制。当当前活跃的日志文件达到一定大小(例如 1GB)后,就会“滚动”创建一个新的文件继续写入。这种机制不仅便于文件管理,也使得清理过期数据变得非常简单高效——直接删除整个旧的 Segment 文件即可,这依然是顺序大块操作,避免了在文件内部随机删除导致的碎片化和性能抖动。

Kafka中的零拷贝

传统方法 readwrite低效。当需要发送一个文件时,传统的步骤是:

  1. 读取文件:程序调用 read函数。这会导致一次上下文切换(从用户态切换到内核态)。然后,数据通过 DMA​ 技术从硬盘直接拷贝到内核缓冲区。接着,数据又由 CPU​ 从内核缓冲区拷贝到用户缓冲区此时发生第二次上下文切换(切换回用户态)。

  2. 发送数据:程序调用 write函数。这引发第三次上下文切换(用户态到内核态)。CPU​ 再次将数据从用户缓冲区拷贝到内核的 Socket 缓冲区。最后,数据通过 DMA​ 从 Socket 缓冲区拷贝到网卡进行发送。完成后,第四次上下文切换发生(内核态回用户态)。

sendfile系统调用设计得非常巧妙,它通过“绕过”用户空间,直接在操作系统内核中完成文件数据的传输。其核心流程如下:

  1. DMA 将文件数据从磁盘拷贝到内核缓冲区(读缓存)。这一步由 DMA 完成,无需 CPU 参与。
  2. 然后,内核将数据直接从内核缓冲区拷贝到相关的内核 Socket 缓冲区。注意,这个拷贝操作是在内核内部完成的,不需要将数据拷贝到用户空间。
  3. 最后,DMA 将数据从 Socket 缓冲区拷贝到网卡,发送出去。

基于上述原理,sendfile带来的性能提升主要来自两方面:

  1. 减少数据拷贝次数:这是最直接的收益。它消除了两次不必要的 CPU 数据拷贝(内核缓冲区到用户缓冲区,用户缓冲区到 Socket 缓冲区),将 4 次拷贝减少到 2 次,显著降低了 CPU 的负担。

  2. 减少上下文切换:由于整个过程都在内核态完成,避免了在用户态和内核态之间频繁切换(从 4 次减少到 2 次)。上下文切换是昂贵的系统操作,减少它能有效提升系统效率。

批处理消息压缩

如果 Producer 每产生一条消息就立即发送,会产生大量的小网络数据包,导致网络利用率低下,并给 Broker 带来巨大的请求处理压力。批处理就是解决这个问题的方案。

核心组件:RecordAccumulator

Producer 内部有一个核心的内存缓冲区,称为 RecordAccumulator

  • 按分区打包RecordAccumulator为每个分区维护一个 Deque<ProducerBatch>(双端队列),里面存放着多个 ProducerBatch(生产批次)。当消息经过序列化和分区计算后,会被追加到对应分区的队列中。如果当前批次未满且有空间,则直接追加;如果批次已满或没有可用批次,则创建一个新的批次。

  • ProducerBatch:这是批处理的基本单位。一个 ProducerBatch包含了一批消息(多条 ProducerRecord),它最终会被封装成一个网络请求 ProduceRequest发送给 Broker。

发送触发条件

Sender 线程不会无休止地等待。它会在以下条件满足之一时,将批次从 RecordAccumulator中取出并发送:

  • 批次已满:一个 ProducerBatch的大小达到了配置的 batch.size(例如 16KB)。

  • 等待超时:从第一条消息进入批次开始计时,超过了配置的 linger.ms(例如 0ms,表示立即发送;或设为 5ms 以等待更多消息填充批次)。

  • 其他条件RecordAccumulator内存耗尽、Producer 即将关闭等。

即使消息被打包成批次,如果消息内容本身很大(例如 JSON 日志),网络传输和磁盘存储的成本依然很高。消息压缩就是在发送前,将整个批次的消息进行压缩,以节省带宽和存储空间。

压缩的时机与粒度

  • Producer 端压缩:压缩发生在 Producer 客户端,在 Sender 线程将批次发送到网络之前。压缩的最小单位是整个 ProducerBatch,而不是单条消息。这意味着,更大的批次通常能获得更好的压缩率。

  • 端到端压缩:被压缩的批次会保持压缩状态发送到 Broker,并以压缩形式存储在磁盘上。当 Consumer 拉取消息时,收到的仍然是压缩的数据,需要在 Consumer 端进行解压。这种设计避免了 Broker 解压和再压缩

Kafka 支持多种压缩算法,各有优劣:

算法

压缩比

压缩/解压速度

CPU 开销

适用场景

GZIP

最高

最慢

网络带宽极其昂贵,且对延迟不敏感的场景。

Snappy

中等

非常快

通用推荐,在压缩比和速度间取得了良好平衡,由 Google 开发。

LZ4

中等偏高

最快

非常低

极致性能场景,当 CPU 资源紧张或追求最低延迟时首选。

Zstandard (zstd)

接近 GZIP

比 Snappy 稍慢,但比 GZIP 快很多

中等

新一代算法,在压缩比和速度上都有很好表现,是未来趋势。

选择时需要考虑:

  • 如果网络带宽是瓶颈,优先选择压缩比高的算法(如 GZIP, zstd)。

  • 如果CPU 资源是瓶颈,或追求端到端最低延迟,优先选择速度快的算法(如 LZ4, Snappy)。

一些基本问题

如何保证消息不丢失

Kafka 通过生产、存储、消费三个环节的协同机制来保证消息不丢失。

生产者端保证
  • acks=all(或 -1:要求 Leader 收到消息后,必须等待所有 ISR 中的副本都同步成功才返回确认。这是最强的可靠性保证。

  • min.insync.replicas:与 acks=all配合使用,定义 ISR 的最小副本数(包括 Leader)。例如设置为 2,即使只有一个副本存活,写入也会失败,防止数据冗余不足。

  • 重试机制:设置 retries为较大值(如 Integer.MAX_VALUE)并启用幂等性(enable.idempotence=true),确保网络抖动等临时故障不会导致消息丢失。

  • 同步发送:使用 send().get()或带回调的异步发送,确保及时获知发送失败并进行处理。

Broker 端保证
  • 副本机制:设置 replication.factor >= 3,确保消息有多个副本。

  • ISR 机制:Leader 维护一个同步副本列表(ISR),只有 ISR 中的副本才参与选举和数据同步,防止落后副本成为 Leader 导致数据丢失。

  • 持久化:虽然 Kafka 依赖操作系统页缓存异步刷盘,但可通过 flush.messages和 flush.ms控制刷盘频率,极端情况下可设置 log.flush.interval.messages=1和 log.flush.interval.ms=1000来更频繁刷盘(但会牺牲性能)。

消费者端保证
  • 手动提交偏移量:禁用自动提交(enable.auto.commit=false),在消息处理完成后再手动提交偏移量,确保“消费后才提交”。

  • 正确处理偏移量:在消费者均衡(Rebalance)或重启时,确保偏移量提交到正确的消息位置,避免重复消费或丢失。

如何处理消息重复

Kafka 提供“至少一次”语义,消息重复不可避免,需在消费端实现幂等性。

重复产生的原因
  1. 生产者重试:网络超时导致生产者未收到确认而重发,Broker 可能收到重复消息。

  2. 消费者重平衡:消费者组发生重平衡时,可能重复提交偏移量,导致消息被重复消费。

  3. 消费者提交偏移量失败:消息处理后,提交偏移量前消费者崩溃,重启后从上次提交的偏移量重新消费。

解决方案
  • 生产者端:启用幂等性(enable.idempotence=true),Kafka 会为每个生产者分配唯一 ID 并为消息分配序列号,Broker 据此去重。

  • 消费者端:实现业务逻辑的幂等性,常用方法:

    • 数据库唯一约束:利用主键或唯一索引去重。

    • Redis 原子操作:使用 SETNX命令或分布式锁。

    • 状态机:业务状态流转时,确保只有当前状态才能转到下一状态。

如何处理消息积压

消息积压是消费速度低于生产速度导致的。处理思路是“先治标,再治本”。

临时扩容(治标)
  1. 增加分区数:分区是并行消费的最小单位。增加分区数可提升消费能力,但需重启集群。

  2. 增加消费者实例:确保消费者实例数不超过分区数,否则多余的消费者闲置。

  3. 调整消费者参数

    • fetch.min.bytes:增加每次拉取的最小字节数,提高吞吐量但增加延迟。

    • max.poll.records:增加单次拉取的消息数。

    • 调整 session.timeout.ms和 max.poll.interval.ms,避免误判消费者死亡而触发重平衡。

优化消费逻辑(治本)
  1. 批量处理:将多次数据库操作合并为批量操作,减少 I/O。

  2. 异步处理:将非关键操作异步化,避免阻塞主流程。

  3. 优化代码:检查消费逻辑是否存在性能瓶颈,如复杂计算、低效查询等。

如何保证消息有序

Kafka 仅保证分区内有序,不保证全局有序。

分区内有序
  • 生产者:将需要有序的消息指定相同的 KeyKafka 根据 Key 的哈希值将消息发送到同一分区。设置 max.in.flight.requests.per.connection=1可确保同一连接上的消息按发送顺序写入,但会降低吞吐量。

  • 消费者:使用单线程按分区消费,一个分区只能被一个消费者线程消费。

全局有序
  • 将 Topic 设置为单分区,但会严重限制吞吐量,仅适用于消息量极小的场景。

  • 使用事务幂等性生产者确保跨分区的原子性写入,但无法保证跨分区消费的顺序。

如何实现消息过滤

Kafka 本身不提供类似 RocketMQ 的 Tag 或 SQL 过滤功能,但可通过以下方式实现:

消费者端过滤
  • 代码中过滤:消费者拉取消息后,在业务代码中根据条件过滤。简单但浪费带宽和 CPU。

    for (ConsumerRecord<String, String> record : records) {
        if (record.key().equals("important")) {
            process(record);
        }
    }

流处理过滤
  • Kafka Streams:使用 Kafka Streams API 将原始 Topic 过滤后写入新 Topic,消费者消费新 Topic。

KStream<String, String> source = builder.stream("source-topic");
KStream<String, String> filtered = source.filter((key, value) -> value.contains("important"));
filtered.to("filtered-topic");
使用 Connect 或第三方工具
  • Kafka Connect:配合单消息转换(SMT)进行过滤。

  • Flink/Spark Streaming:使用流处理框架进行复杂过滤和转换。

分区策略过滤
  • 将不同类型的数据写入不同的 Topic,消费者按需订阅。

  • 使用 Key 进行分区,相同类型的数据集中在特定分区,消费者可选择消费特定分区(但不推荐,破坏了负载均衡)。

如何实现延时消息

Kafka 原生不支持延时消息,需通过以下方式模拟:

消费者轮询(延迟消费)
  • 消息包含期望的执行时间戳,消费者拉取后,若未到时间则暂停消费该分区(使用 pause()),并记录该消息。通过定时任务或延迟队列在到期后恢复消费resume())。

  • 缺点:实现复杂,可能影响其他消息的消费。

多级 Topic 方案
  1. 创建多个延迟级别 Topic(如 delay-1sdelay-5sdelay-30s)。

  2. 生产者将延迟消息发送到对应延迟级别的 Topic。

  3. 消费者消费延迟 Topic,到期后转发到目标 Topic。

  4. 最终消费者消费目标 Topic。

如何实现死信队列

Kafka 没有内置死信队列(DLQ),但可基于其重试机制模拟实现。

重试机制
  • 消费者消费失败时,可将消息发送到重试 Topic,并设置延迟时间(通过时间戳或延迟 Topic 方案)。

  • 重试 Topic 由专门的重试消费者消费,失败多次后转入死信 Topic。

实现步骤
  1. 定义重试 Topic 和死信 Topic

    • original-topic:原始 Topic。

    • retry-topic-1retry-topic-2:多级重试 Topic。

    • dead-letter-topic:死信 Topic。

  2. 消费者逻辑

    try {
        process(message);
        consumer.commitSync();
    } catch (Exception e) {
        int retryCount = getRetryCount(message);
        if (retryCount >= MAX_RETRIES) {
            // 发送到死信队列
            sendToDeadLetterTopic(message);
            consumer.commitSync(); // 跳过该消息
        } else {
            // 发送到重试队列,设置延迟时间
            sendToRetryTopic(message, retryCount + 1);
            consumer.commitSync();
        }
    }
  3. 重试消费者:消费重试 Topic,将消息重新发送回原始 Topic 或下一级重试 Topic。、

    • 使用 Kafka Streams 的 branch()或 flatMap()将处理失败的消息路由到死信 Topic。

      KStream<String, String> stream = builder.stream("input-topic");
      KStream<String, String>[] branches = stream.branch(
          (key, value) -> processSuccessfully(value), // 成功
          (key, value) -> true // 失败
      );
      branches[0].to("success-topic");
      branches[1].to("dead-letter-topic");

Kafka 在消息不丢失、顺序性方面提供了较强的原生支持,但在消息过滤、延时消息、死信队列等方面需要结合业务逻辑或外部系统实现。理解其设计哲学(追求高吞吐、可扩展性)有助于合理选择解决方案。在实际应用中,常将 Kafka 与流处理框架(如 Kafka Streams、Flink)结合,构建更复杂的消息处理管道。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐