存储模块

存储模块的主要流程是数据的写入、存储、读取、过期。持久化存储是基本的功能,但消息队列主要是用于缓冲分发,它的数据存储是临时的,数据持久化存储后,在一定的时间或操作后,需要自动过期删除,而消息队列中的数据一般分为元数据和消息数据,元数据指的是Topic、Group、User、ACL、Config等集群维度的资源数据信息,消息数据指的是客户端写入的用户业务数据


存储功能实现


元数据信息的存储

元数据信息的特点是数据量比较少,而且不会经常进行读写操作,需要保证数据的强一致性和高可靠性,不允许出现数据的丢失。同时,元数据信息一般需要通知到所有的Broker节点,Broker根据元数据信息执行具体的逻辑,比如创建Topic并生产元数据后,就需要去通知对应的Broker节点去执行创建分区、创建目录等操作。一般元数据的存储有两种思路:

1.基于第三方组件实现元数据的存储

基于第三方组件实现元素据的存储是业界的主流选择,比如KafKa ZooKeeper版本、RocketMQ、Pulsar用的就是这个思路,其中KafKaPulsar的元数据存储在ZooKeeper中,RocketMQ存储在NameServer中。这个方案的最大优点就是集成方便、开发成本低,能满足消息队列功能层面的基本要求,因为我们可以直接复用第三方组件已经实现的一致性存储、高性能的读写和存储、Hook机制等能力,而且在后续集群构建中也可以复用这个组件,能极大降低开发难度和工作成本,但引入第三方组件会增加系统部署和运维的复杂度,而且第三方组件自身的稳定性问题会增加系统风险,第三方组件和多台Broker之间可能会出现数据信息不一致的情况,导致读写异常


2.在集群内部实现元数据的存储

集群内部实现元数据的存储指的是在集群内部完成元数据的存储和分发,也就是在集群内部实现类似于第三方组件一样的元数据服务,比如基于Raft协议实现内部的元数据存储模块或依赖一些内置的数据库。目前KafKa ZooKeeper的版本、RocketMQMnesiaKafKaC++版本RedPanda用的就是这个思路。因为这个方案没有使用第三方组件,是在集群内部自己实现的,所以这个方案的部署和运维成本低,不会因为依赖第三方服务导致稳定性问题,也不会有数据不一致的问题。但是因为不是像第三方组件这样现成的,一致性存储、高性能读写、Hook机制这些功能都要自己实现,所以开发成本高,前期需要投入大量的开发人力


如何选择:元数据的存储思路的选择

当前主流的方案是第一种方案,主要出于考虑开发成本。用第三方组件导致的稳定性问题,大部分可以通过后期的运维、运营、编码技巧来解决规避或降低发送频率,但如果前期开发投入成本太大,架构太复杂,会影响项目的成型和业务的使用,所以在项目前期大部分会选择这个方案

如果消息队列核心架构已成熟或前期允许有较大的投入,才会建议使用方案二。因为第一种方案虽然开发成本较低,但使用成本、机器资源成本、运维成本还是偏高,另外一些稳定性问题,比如元数据不一致,因为第三方组件的存在是无法根治的,会有长久的隐患


消息数据的存储

消息队列的存储主要指的是消息数据的存储,分为存储结构、数据分段、数据存储格式、数据清理四部分

数据存储结构的设计

  • 从数据存储目录结构设计上看,消息队列和存储有关的主要是Topic和分区者两个维度,用户可以将数据写入Topic或直接写入分区,不过写入Topic的数据也是分发到多个分区进行存储的,从实际数据存储的角度看,TopicGroup不承担数据存储功能,而是承担逻辑组织的功能,实际的数据存储是在分区维度完成的

image.png

数据的落盘存储(存储结构)
1.每一个分区单独存储一个“文件”

每个分区Partition对应一个文件存储数据,那么每个分区的数据都是顺序写入磁盘的,数据的存储是连续的,因为消息队列中大部分情况的读写都是有序的。所以这种机制在读写性能上的表现也是最高的,但是分区如果太多的话就会占用过多的系统FD资源,甚至出现极端情况把所有节点的FD资源耗尽,并使得硬盘出现大量随机写,从而导致性能大幅下降,管理起来也相对复杂。

KafKa在存储数据的组织上采用的就是这个思路。

image.png


2.每个节点上所有分区的数据存储在同一个“文件”

一个节点可能是有多个分区的,所有的分区只对应一个FIle文件,每个节点上所有分区的数据都存储在同一个文件中,所以需要对每个分区维护一个对应的索引文件,索引文件会记录每条消息在File中的位置信息可以快速定位到具体的消息内容。因为每一个分区对应的数据都存储在一个文件中,即File文件中,所以管理简单,不会占用过多的系统FD资源,单机上的数据写入都是顺序的,写入性能很高,但是这里无法利用消息队列按顺序读取的优势了,因为同一个分区对应的数据会分布在文件的不同位置,即分区中的数据是无序的,所以利用不了按顺序读的优势,但随着SSD技术的发展,随机读写的性能也越来越高,不过成本会比机械硬盘高很多。

目前RocketMQ、RabbitMQ、Pulsar的底层存储BooKKeeper使用的就是这个方案

假设这个二统一的文件叫commitlog,则commitlog就是用来存储数据的文件,idnex是每个分区的索引信息,标记这个分区在文件的什么位置

image.png


数据落盘存储方式的选择

核心考虑点是你对读和写的性能要求:

  • 每个分区单独存储一个“文件”:单个文件读和写是顺序的,性能最高。但当文件很多且都有读写的场景,硬盘层面就会退化为随机读写,性能会严重下降。

    • 关于FD的占用问题,Linux上的FD数是可以配置的,比如配置几十万个FD是没问题的,所以我们一般不会用完系统的FD限制,这一点在实际的落地中不需要太担心

  • 每个节点上所有分区的数据存储在同一个“文件”:因为只有一个文件,不存在文件过多的情况,写入层面一直都会是顺序的,性能一直很高,但在消费的时候,多个分区的数据存储在同一个文件中,同一个分区的数据在底层存储上是不连续的,硬盘层面会出现随机读的情况,导致读取的性能降低

    • 不过随机读带来的性能问题,可以通过给底层配备高性能的硬件来缓解。所以当前比较多的消息队列选用第二种方案,但是KafKa为了保证更高的吞吐性能,选用的是第一种方案

不管是第一种还是第二种,在数据存储过程中,如果单个文件过大,在文件加载、写入、检索的时候,性能就会有问题,并且消息队列有自动过期机制,如果单个文件过大,数据清理时很麻烦,效率很低。所以我们消息数据都会分段存储


消息数据的分段实现(数据分段)

数据分段的规则一般是根据大小来进行的,比如默认1G一个文件,同时会支持配置项调整分段数据的大小。当数据达到规定的大小后,就会创建一个新的文件来保存数据,如果进行了分段,消息数据就可能分布到不同的文件中,每当我们读取数据时,就需要定位消息在哪个文件,者通常通过偏移量定位或索引定位

根据偏移量定位消息在哪个文件中,是通过记录每个数据段文件的起始偏移量、中止偏移量、消息的偏移量,来快速定位消息在哪个文件。当消息存储时,通常会用一个自增的数值型数据,来表示这条数据在分区或commitlog中的位置,这个值就是消息的偏移量

  • 在实际的编码中,可以单独记录每个数据段的起始和结束偏移量,在文件名称中携带起始偏移量信息,因为数据是顺序存储的,每个文件记录了本文件的起始偏移量,这样在下一个文件的起始偏移量就是上一个文件的结束偏移量

根据索引定位消息在哪个文件,会直接存储消息对应的文件信息,具体来说是维护一个单独的索引文件,记录消息在哪个文件的哪个位置,读取消息时,先根据消息ID找到存储的信息,然后找到对应的文件和位置来读取数据,RabbitMQ和RocketMQ用的就是这个思路

通过偏移量定位或索引定位这两种方案面对的是不同的场景

  • 根据偏移量定位数据:通常用在每个分区单独存储一个“文件”的场景,因此通过位点来二分查找数据的效率最高

  • 根据索引定位数据:通常用在所有分区的数据存储在同一份文件的场景,这一份数据属于多个不同分区,则通过二分查找来查找数据效率低,用哈希查找效率最高

image.png

image.png

image.png


消息数据存储格式

消息数据存储格式一般包含消息写入文件的格式消息内容的格式两个方面:

  • 消息写入文件的格式:指的是消息以什么格式写入到文件中,比如JSON字符串或 二进制,从性能和空间冗余的角度来看,消息队列中的数据基本都是以二进制的格式写入文件的,但这也使得我们不能直接用vim/cat等命令来查看文件数据,需要使用专门的工具进行读取,比如我们想要查看KafKa消息数据存储文件中的内容,需要使用日志解析工具kafka.tools.DumpLogSegments查看,才是格式化的数据

  • 消息内容的格式指的是写入到文件中的数据都包含哪些信息:对于一个成熟的消息队列来说,消息内容格式不仅关系功能维度的拓展,还牵涉性能维度的优化,如果消息格式设计不够精简,功能和性能都会大打折扣。比如冗余字段会增加分区的磁盘占用空间,使存储和网络开销变大,性能也会下降。如果缺少字段可能导致无法实现某些功能,或者实现某些功能的成本较高,因此在数据存储格式设计方面,内容的格式需要尽量完整且不要有太多冗余。以KafKa和RocketMQ的消息内容格式设计为例:

    • KafKa的消息内容包含了业务能感知到的消息的Header、Key、Value,还包含了时间戳、偏移量、协议版本、数据长度和大小、校验码等基础信息,最后还包含了压缩、事务、幂等KafKa业务相关的信息

    • RocketMQ的存储格式中也包含基础的Properties(相当于kafKa中的Header)、Value、时间戳、偏移量、协议版本、数据长度和大小、校验码等信息,还包含了系统标记、事务等RocketMQ特有的信息,另外还包含了数据来源和数据目标的节点信息

    • 对比看,消息数据的存储格式虽然没有统一的规范,但是一般包含通用信息和业务信息两部分。通用信息主要包括时间戳、CRC校验码、消息头、消息体、偏移量、长度、大小等信息,业务信息主要跟业务相关,包含事务、幂等、系统标记、数据来源、数据目标等信息。


消息数据清理机制

消息队列中的数据最终都会被删除,时间周期短的话几小时、甚至几分钟,长的话可能一个月,基本很少有场景需要消息队列存储一年的数据。而消息队列的数据过期机制一般有手动删除和自动删除两种形式。一般分为三种实现思路:

  • 消费完成执行ACK删除数据

    • 当客户端成功消费数据后,会回调服务端的ACK接口,告知服务端数据已经消费成功,服务端就会标记删除改行数据,以确保消息不会被重复消费。而ACK的请求一般会有单条消息ACK和批量消息ACK两种形式。因为消息队列的ACK一般是顺序的,如果前一条消息无法被正确处理并ACK,就无法消费下一条数据,导致消费卡住,此时就需要死信队列的功能把这条数据先写入到死信队列,等待后续处理。然后ACK这条消息,确保消费正确进行。这个方案不会出现重复消费,一条信息只会被消费一次,但ACK成功后消息会被删除,无法满足需要消息重放的场景

      • 电商秒杀场景,避免超卖问题:库存服务消费扣减消息,扣减成功后立即删除消息,消息处理失败时,通过死信队列补偿库存

      • 金融交易场景:支付网关收到支付回调后,消费订单状态变更消息,ACK后立即删除,若处理失败,如数据库宕机,消息进入死信队列人工处理

  • 根据时间和保留大小删除

    • 服务端提供偏移量提交的接口,当客户端消费成功后,客户端会回调偏移量提交接口,告知服务端这个偏移量的数据已经消费成功,让服务端把偏移量记录起来。然后服务端会根据消息保留的策略,比如保留时间或保留大小,通过一个常驻的异步线程来清理数据。 这个方案可以让一条消息重复消费多次,不管消息有没有成功消费,消息都会根据配置的时间规则或大小规则进行删除,这使得消息可以多次重放,适用于需要多次进行重放的场景,比如,但在某些情况下(比如客户端使用不当)会出现大量的重复消费。

      • 日志采集与分析:原始日志需要多批次分析

      • 用户行为分析:AB测试需要回溯历史行为数据

    • 通过对比不同版本的方案(如页面设计、算法策略),以数据驱动决策的实验方法,将用户随机分为对照组(A组)和实验组(B组),通过埋点记录用户行为(如点击、停留时长)。

  • ACK机制和过期机制相结合

    • 结合前两个方案,就有了ACK 机制和过期机制相结合的方案。当消息完成后,在 Group 维度 ACK 消息,此时消息不会被删除,只是这个 Group 也不会再重复消费到这个消息,而新的 Group 可以重新消费订阅这些数据。所以Group 维度避免了重复消费的情况,也可以允许重复订阅。

      • 跨团队数据共享:原始数据需要被多个业务方复用。电商订单数据在Kafka保留7天,支付团队消费后提交位移(不再接收),风控团队独立位移消费相同数据,数仓团队每天全量消费归档

        • 风控团队是负责识别、评估和管理企业业务中各类风险的团队,核心目标是降低潜在损失,保障业务安全。

        • 数仓团队负责构建和维护企业级数据仓库,整合多源数据,为业务分析、报表和决策提供统一的数据服务。

    • 纵观业界主流消息队列,三种方案都有在使用,RabbitMQ 选择的是第一个方案,KafkaRocketMQ 选择的是第二种方案,Pulsar 选择的是第三种方案。不同消息队列的方案选择,主要都是考虑架构设计和组件开发时业务场景的影响。

    • 消息数据是顺序存储在文件中的,会有很多分段数据,一个文件可能会有很多行数据。那么在 ACK 或者数据删除时,一个文件中可能既存在可删除数据,也存在不可删除数据。如果我们每次都立即删除数据,需要不断执行“读取文件、找到记录、删除记录、写入文件”的过程,即使批量操作,降低频率,还是得不断地重复这个过程,会导致性能明显下降。当前主流的思路都是延时删除以段数据为单位清理降低频繁修改文件内容和频繁随机读写文件的操作。只有该段里面的数据都允许删除后,才会把数据删除。而删除该段数据中的某条数据时,会先对数据进行标记删除,比如在内存或 Backlog 文件中记录待删除数据,然后在消费的时候感知这个标记,这样就不会重复消费这些数据。


存储功能性能优化

存储模块的性能优化主要从写的快和读得快两方面入手:

  • 内存读写的效率高于硬盘读写

  • 批量读写的效率高于单条读写

  • 顺序读写的效率高于随机读写

  • 数据复制次数越多,效率越低


提升写入操作的性能

消息队列中的数据最终是存储到文件中的,数据的写入需要经过内存,最终才会被写入磁盘。在mysql、redis中数据都是先写入内存的,然后通过持久化刷到磁盘上

所以写入操作的性能优化就要从磁盘和硬盘展开进行优化,写入优化主要有缓存写、批量写、顺序写三个思路


1.缓存写和批量写

在计算机多级存储模型中,层级越高速度越快,但容量也会越小,价格越贵,因此写入速度从快到慢依次是:寄存器、缓存、主存、本地存储、远程存储。因此内存的读写效率高于磁盘、批量读写效率高于单条读写,所以将数据写入速度更快的内存中,等积攒了一批数据之后,再批量刷回磁盘中,就是一种优化方式

就是批量地把数据先写入到内存中,积攒一批数据之后,再将这批数据批量刷到磁盘中

其实就是将硬盘中的数据存到内存中,然后每次写操作就是对内存进行访问,而不是对硬盘进行访问,然后对内存的数据的写操作结束之后,再通过某种方式将内存的数据刷回磁盘,确保数据的一致性。一般情况下内存数据是自动批量刷写回磁盘的。而将缓存数据刷回磁盘一般有按空间占用比例刷新、按时间周期刷新、手动强制刷新三种策略:

  • 按空间占用比例刷新:当系统内存中的脏数据大于某个阈值后,将缓存数据刷写回磁盘,操作系统提供了两个配置项。一个是脏数据在内存中的占比dirty_background_radio,另一个是脏数据的绝对字节数dirty_background_bytes。当这两个配置项超过阈值后,就会触发刷新操作,把内存上的数据刷回到磁盘中。如果两者同时设置,则以绝对字节数为更高级

  • 按时间周期刷新:根据配置好的时间,周期性刷新缓存数据回磁盘中,这是通过脏页存货时间dirty_expire_second和刷新周期dirty_writeback_centisecs配置的,这两个配置默认的时间间隔都是100秒,根据刷新周期的配置,周期性执行刷新,刷新会检查脏页的存活时间是否超过配置的最大存活时间,如果是则刷写回磁盘

  • 手动强制刷新:通过系统提供的sync()、msync()、fsync()调用来强制刷新缓存,在Java中可以通过Java.NIO包中FileChannel提供的write()force()方法,实现写缓存和强制刷新缓存。通过FileChannel提供的write()方法写数据时,FileChannel把数据写入到缓存就会返回成功,然后根据操作系统的缓存更新策略,将数据刷新到磁盘。我们可以在代码中调用FileChannel提供的force()方法,把数据立即刷写回磁盘中,以免丢失

    • 基本所有的消息队列在写入时用的都是这个方案,手动强制刷新。比如KafKa、RocketMQ、Pulsar就是先写入缓存,然后依赖操作系统的策略刷新数据到磁盘。消息队列一般会同时提供:是否同步刷盘、刷盘时间周期、刷盘的空间比例三个配置项,让业务根据需要调整自己的刷新策略。从性能角度看,异步刷新肯定是性能最高的,同步刷新是可靠性最高的


随机写和顺序写

随机写和顺序写都是针对磁盘,针对整个操作系统和硬盘的关系,而不是单个文件和磁盘的关系。单文件顺序写入磁盘时,硬盘控制器只需要在连续的存储区域中写入数据,对硬盘来讲,数据就是顺序写入的。多文件顺序写入硬盘时,系统中有很多文件会同时写入,此时从硬盘的视角看,操作系统同时对多个不同的存储区域进行操作,硬盘控制器需要同时控制多个数据的写入,所以从磁盘的角度是随机写的

所以在消息队列中,实现随机写和顺序写的核心是数据存储结构的设计,也就是每个分区单独存储一个文件和每个节点上的所有的分区的数据都存储在同一个文件:

  • 第一种方案对单个文件的读写都是顺序的性能最高,但是如果文件很多且都进行读写时,在硬盘层面就会退化为随机读写,性能会下降很多

  • 第二种方案只有一个文件,不存在过多的情况,写入层面一直都是顺序的,性能很高,因此使用第二种方案比较好


提高写入操作的可靠性

消息队列基本都采用数据先写入缓存、再写入硬盘的方案,所以有丢失数据的风险,比如数据还没有写入硬盘中,机器由于异常导致重启了,而你对数据是在内存中的,那么此时就导致了数据的丢失。为了保证数据的可靠性,在消息队列的存储模块中,一般会通过三种方式来处理:同步刷盘、WAL预写日志、多副本备份,来提高数据的可靠性,确保数据没有丢失


1.同步刷盘

每条数据写入后立即调用force()操作强制刷入硬盘,相对于数据直接写入磁盘。

这种方案可以避免机器重启导致内存数据丢失,但是无法利用内存写入速度的优势,效率会降低很多,因为调用force()操作就是直接把数据写入了磁盘。

一般消息队列都会开放这个配置项,默认批量刷盘,但有丢失数据的风险

image.png


2.WAL预写日志

在写数据之前先将消息的元数据和操作指令等写入日志,当数据丢失时通过日志来恢复数据,避免了数据丢失,但理论上看,WAL机制肯定是比直接写入缓存中的性能低,而在消息队列中,消息数据的数据量是非常大的,我们不可能直接使用非常高性能的持久化存储设备,这样的成本太高。虽然WAL预写日志需要极高的写入性能,但数据量一般很小,而且是可顺序存储的、可预测的根据配置的缓存大小和更新策略可明确计算,所以在实际落地中,我们可以采取WAL日志盘和实际数据盘分离的策略,提升WAL日志的写入速度,具体:

  • WAL日志写入高性能、低容量的数据盘,消息数据写入性能较低、容量较大的数据盘,如果出现数据异常,就通过WAL日志进行数据恢复。这样,给WAL日志选择合适的设备,再加上并行读写等代码优化手段,性能损失就可控了,甚至可以忽略

  • 但是缺点是再实际部署运维过程中,需要单独给WAL日志分配高性能的数据盘镖旗进行单独的管理,配置和运维成本相对较高。但强制刷盘、WAL预写日志这两种方案都是指单机维度的可靠性保证。而我们在实际运维过程中单机是不可控的,都需要通过分布式的多副本存储来保证数据的高可靠,也就是使用第三种方案多副本备份

image.png


3.多副本备份

将数据同步到多台节点,每台节点都将数据写入内存中,从而完成数据的可靠性存储,而单机层面是把数据写入到内存中就算写入成功,但是单击层面也可能出现数据丢失,所以核心思路是同时在多台节点中缓存数据,只要不是多台节点同时重启,数据就可以恢复

好处是可以在分布式存储的基础上做优化,通过多台缓存的手段来降低数据丢失的概率。但如果所有节点在同一时刻重启,数据还是有可能丢失的,无法百分百保证数据高可靠性。

从消息队列业界的存储方案来看,方案一所有产品都支持,方案二和方案三一般会选一种支持,KafKa、RabbitMQ、RocketMQ用的是第三种,Pulsar用的是第二种

image.png

单击可靠性

  • WAL +异步刷新:消息先写入高性能日志盘(同步刷盘),数据异步刷入大容量存储。兼顾性能和可靠性

  • 例如:RocketMQ将消息顺序写入CommitLog(WAL),异步刷盘后分发到ConsumeQueue

分布式可靠性

  • 多副本+同步复制:数据同步写入多个节点,每个节点采用WAL+异步刷盘。例如:

    • KafKa配置min.insync.replicas=2并启用同步复制,确保至少两个副本写入日志(内存或磁盘)

    • 每个副本节点配置日志异步刷盘,利用多副本容忍单节点故障

极端高可靠场景

  • 同步刷盘+同步多副本:每条消息在多个节点均强制刷盘后返回成功,适用于金融、政务等场景,但性能代价高


提升读取操作的性能

主要有:读热数据、顺序读、批量读、零拷贝四个思路


1.冷读和热读

热读:消息数据本身还在缓存中,读取数据是从内存中获取,此时性能最高,不需要经过硬盘。

冷读:数据刷到磁盘中了,并且数据已经被换页换出缓存了,即数据已经不在缓存中了,此时读数据需要从硬盘获取。

理想情况肯定是全都是热读好,因为性能最高。

但是在代码层面我们无法控制热读或冷读,只能通过配置更大的内存,尽量保证缓存中保留更多的数据,从而提交热读的概率


2.顺序读、随机读、批量读

为了提高吞吐量,消费时服务端会支持批量读的能力,并通过预读机制Perfetching提前加载数据到内存,减少客户端等待时间,提升整体的吞吐量。而预读的实现方式分为硬盘层面预读和应用程序预读

  • 硬盘层面预读:是在连续的物理地址空间中读取数据,但这是由硬件或操作系统自动完成,无法在应用程序中干预,和数据目录存储结构设计有关。每个分区单独存储一个文件时,数据是连续存储的,性能最高。每个节点的所有分区的所有数据都存储在一个文件时,需要根据分区上的数据索引,在具体存储文件中的不同位置读取数据。数据可能是连续的,也可能是不连续的。这种情况下硬盘的预读就很有随机性,大部分情况下在硬盘看来就是随机读,性能比第一种方案低

  • 应用程序的预读:一般通过程序中的逻辑关系,提前通过调度去硬盘读取数据(可能是连续的也可能是不连续的)。因为消息队列的数据是分区有序的,当读取到某条数据时,手动读取后面的一个批次的数据就可以了。这种方案需要程序去控制,比如read(0)时,要同时读read(1,10)的数据,相对繁琐,并且性能较低

  • 理想情况下,肯定是硬盘层面的顺序预读的性能最高,所以针对读取操作,方案一更合适

image.png


零拷贝

在正常读取数据的过程中,会发生四次内核态和用户态的上下文切换,同时还发生了四次数据拷贝:先从硬盘拷贝到操作系统内核的 ReadBuffer缓冲区中,再从 ReadBuffer 拷贝到应用程序中,再从应用程序拷贝到内核的 SocketBuffer 中,最后从SocketBuffer 拷贝到网卡中,而数据在复制过程中会耗费资源和时间,从而降低性能,所以优化流程最重要的是减少数据复制的次数和资源损耗。因此,我们可以通过减少数据复制次数、减少上下文切换、通过DMA代替CPU完成数据读写,来解决复制和资源消耗问题,而零拷贝通过mmap + wrtiesendfile 两种实现方式,将数据复制缩短成从硬盘拷贝到操作系统内核的 ReadBuffer,再从ReadBuffer 拷贝到网卡设备,将复制次数从四次减少到两次,并且数据只在内核中复制,减少两次上下文下切换

  • 第一次拷贝是把磁盘上的数据拷贝到操作系统内核的缓冲区中,由DMA进行。

  • 第二次拷贝是把内核缓冲区的数据拷贝到用户的缓冲区中,使得应用程序可以使用这部分数据,由 CPU 进行。

  • 第三次拷贝是把刚才拷贝到用户缓冲区里的数据,再拷贝到内核的 socket 缓冲区中,由 CPU 进行。

  • 第四次拷贝是把内核的 socket 缓冲区里的数据拷贝到网卡的缓冲区中,由 DMA 进行。

  • DMA 全称是直接内存存取,在IO设备和内存之间传递数据时,数据搬运工 作全部交给 DMA 控制器,CPU 不再参与任何与数据搬运相关的事情,这样 CPU 就可以去处理别的事务,从而释放 CPU资源。

1.mmap+write

read() 系统调用函数会把内核缓冲区的数据拷贝到用户的缓冲区里,于是为了减少这一步开销,我们可以用 mmap() 来代替。mmap() 系统调用函数会直接把内核缓冲区里的数据「映射」到用户空间,这样,操作系统内核与用户空间就不需要再进行任何的数据拷贝操作。具体来说:

  • 应用进程调用了 mmap() 后,DMA 会把磁盘的数据拷贝到内核的缓冲区里。接着,应用进程跟操作系统内核「共享」这个缓冲区;

  • 应用进程再调用 write(),操作系统直接将内核缓冲区的数据拷贝到 socket 缓冲区中,这一切都发生在内核态,由 CPU 来搬运数据;

  • 最后,把内核的 socket 缓冲区里的数据,拷贝到网卡的缓冲区里,这个过程是由 DMA 搬运的。

  • 同时,使用 mmap() 来代替 read(), 可以减少一次数据拷贝的过程。

  • 但这还不是最理想的零拷贝,因为仍然需要通过 CPU 把内核缓冲区的数据拷贝到 socket 缓冲区里,而且仍然需要 4 次上下文切换,因为系统调用还是 2 次


2.sendfile

在 Linux 内核版本 2.1 中,提供了一个专门发送文件的系统调用函数 sendfile()。它的前两个参数分别是目的端和源端的文件描述符,后面两个参数是源端的偏移量和复制数据的长度,返回值是实际复制数据的长度。

  • 首先,它可以替代前面的 read()write() 这两个系统调用,这样就可以减少一次系统调用,也就减少了 2 次上下文切换的开销。

  • 其次,该系统调用,可以直接把内核缓冲区里的数据拷贝到 socket 缓冲区里,不再拷贝到用户态,这样就只有 2 次上下文切换,和 3 次数据拷贝。

  • 但是这还不是真正的零拷贝技术,如果网卡支持 SG-DMA(The Scatter-Gather Direct Memory Access)技术(和普通的 DMA 有所不同),我们可以进一步减少通过 CPU 把内核缓冲区里的数据拷贝到 socket 缓冲区的过程

于是,从 Linux 内核 2.4 版本开始起,对于支持网卡支持 SG-DMA 技术的情况下, sendfile() 系统调用的过程发生了点变化,具体过程如下:

  • 第一步,通过DMA磁盘上的数据拷贝到内核缓冲区里;

  • 第二步,缓冲区描述符和数据长度传到 socket 缓冲区,这样网卡的 SG-DMA 控制器就可以直接将内核缓存中的数据拷贝到网卡的缓冲区里,此过程不需要将数据从操作系统内核缓冲区拷贝到 socket 缓冲区中,这样就减少了一次数据拷贝;

于是,从 Linux 内核 2.4 版本开始起,对于支持网卡支持SG-DMA 技术的情况下, sendfile() 系统调用的过程发生了点变化,具体过程如下:

  • 第一步,通过DMA磁盘上的数据拷贝到内核缓冲区里;

  • 第二步,缓冲区描述符和数据长度传到 socket 缓冲区,这样网卡的 SG-DMA 控制器就可以直接将内核缓存中的数据拷贝到网卡的缓冲区里,此过程不需要将数据从操作系统内核缓冲区拷贝到socket缓冲区中,这样就减少了一次数据拷贝;

所以,这个过程之中,只进行了 2 次数据拷贝

几乎所有的消息队列在消费时都使用了sendfile的调用,因为它配合DMA技术至少可以提升一倍的消费速度。


通过硬件和系统优化提升性能
1.提升硬件配置

为了提高热度的概率,直接配备更大的机器内存,性能提升最明显。另外,消息队列是一款非常重视IO的组件,使用更快的硬盘 IO 设备,提高单机的吞吐能力,也能快速提升性能。硬盘类型很多,比如物理机部署下的机械盘、SSD、NVMe SSD,以及在云环境部署的各种规格的云盘。核心衡量指标主要有三个:IOPS、吞吐量、延时,这些指标越好,性能越高。


2.配置多盘读写

系统层面,我们可以通过在机器上挂多块硬盘提升单机的硬盘吞吐能力。这种方案要内核支持这个机制,在部署的时候进行相关配置才能生效。一般实现思路是在消息队列的内核支持多目录读写的能力,将不同的文件或者不同的数据段调度存放在不同硬盘设备对应的挂载目录中。此时在数据的写入和读取的过程中,就可以同时利用到多块盘的吞吐和存储

image.png


3.配置RAIDLVM硬盘阵列

多目录读写的问题是多块盘之间无法共IO能力和存储空间,当遇到数据倾斜时,在单机层面会出现性能和容量瓶颈。Linux 提供了 RAID 硬盘阵列和LVM逻辑卷管理两种方式,通过串联多块盘的读写能力和容量,提升硬盘的性能和吞吐能力

image.png


思考


1.从头实现消息队列存储模块的思考路劲
  1. 确定存储的数据类型

  • 消息数据:消息内容(Payload)、消息头(Headers)、时间戳、消息ID等。

  • 元数据:Topic/分区的元信息(如分区分配、消费进度 Offset)、生产者/消费者状态等。

  1. 消息存储结构设计:Topic 还是分区

  • 按 Topic 维度存储:所有 Topic 的消息按顺序写入同一文件(如 RocketMQ CommitLog

  • 按分区维度存储:每个分区独立文件(如 Kafka 的 Partition 目录结构)

  1. 分区数据存储方案

  • 所有分区共享一个文件:消息按全局顺序写入,通过索引定位分区。顺序写入性能高,适合高吞吐场景。索引维护复杂,消费时需频繁跳转文件位置。

  • 每个分区独立文件:消费简单,消费者直接顺序读取分区文件;数据隔离,故障恢复和扩容更灵活;分区过多时文件数激增,影响文件系统性能(需结合分段策略优化)。

  1. 数据分段(Segment)策略

单一文件过大(如 TB 级)会导致维护困难(备份、清理、恢复耗时)。分段后可通过删除旧文件快速清理数据。

  • 按大小分段:单个文件达到阈值(如1GB)后创建新文件(Kafka 默认 log.segment.bytes=1GB

  • 按时间分段:固定时间窗口(如 1 小时)生成新文件(适合时间敏感型数据)。

  • **Kafka使用 按大小+时间双分段策略**,优先按大小分段,同时检查时间阈值。

  1. 分段后消息定位机制

索引设计

  • 全局索引:记录消息 Offset 到物理位置(文件+偏移量)的映射。

  • 如 Kafka 的 OffsetIndexTimeIndex

  • 分层索引

  • 一级索引:消息 Offset → 分段文件。

  • 二级索引:分段文件内 Offset → 物理位置。

  • 定位流程

  1. 根据消费 Offset 找到对应分段文件。

  2. 在分段文件中通过二分查找或稀疏索引定位消息。

  3. 数据清理机制

  • 消费完成执行 ACK 删除数据

  • 根据时间和保留大小删除

  • ACK 机制和过期机制相结合


2.数据的批量写入,如果不用PageCache的缓存刷新机制,我们可以在应用程序中管理数据完成批量写入吗?如果可以怎么实现?优缺点是什么?

应用程序可以通过使用Direct IO来模拟实现PageCahce的功能。大致思路是通过Direct IO管理硬盘,然后在应用中维护内存缓冲区来缓存数据,等待数据达到一定量的时候再统一批量写入硬盘,然后调用force刷新数据到硬盘。 这样做的好处是可以跳过操作系统的刷盘策略,根据自己业务的读写特点自定义刷盘策略,实现得好的话有助于提升缓存的命中率,同时Direct I/O无需在应用内存和PageCache间复制数据,减少CPU开销 不过这种方式的效果和写PageCache是一样的,遇到的问题也是一样的。区别在于数据缓存在哪里,通过什么策略刷新到硬盘,并且也要有可靠性风险,比如缓存未刷盘前宕机会丢失数据,需结合WAL(预写日志)等机制保障。写PageCache的好处是不需要程序自己管理缓存,不需要自定义策略写入,操作系统都可以帮忙做,缺点是可能无法百分百满足我们自己的业务场景。

 {
   "deviceType": {
     "id": 9091,
     "name": "EMS-从-pf"
   },
   "errorParamNum": 0,
   "controllerId": "ECS00039230914",
   "modifyStatus": 0,
   "regionName": "重庆乾岷光学",
   "jsonParams": [
     {
       "qs": 0,
       "realValue": 2.0,
       "flag": "s_247055",
       "unitName": "",
       "realQs": 0,
       "dataType": "AI",
       "isString": false,
       "newProtocol": true,
       "id": "3161740",
       "paramName": "电池状态",
       "value": 2.0
     },
     {
       "qs": 0,
       "realValue": 0.0,
       "flag": "s_247053",
       "unitName": "",
       "realQs": 0,
       "dataType": "AI",
       "isString": false,
       "newProtocol": true,
       "id": "3161738",
       "paramName": "PCS失联",
       "value": 0.0
     },
     {
       "qs": 0,
       "realValue": 0.0,
       "flag": "s_247054",
       "unitName": "",
       "realQs": 0,
       "dataType": "AI",
       "isString": false,
       "newProtocol": true,
       "id": "3161739",
       "paramName": "BMS失联",
       "value": 0.0
     }
   ],
   "oldOffline": false,
   "deviceId": "51782",
   "deviceName": "EMS-从-重庆乾岷光学2",
   "errorTimeSum": 0,
   "runTimeSum": 44309707476,
   "offline": false,
   "warnEventSum": 0,
   "regionId": "5726",
   "warnParamNum": 0,
   "errorEventSum": 0,
   "projectId": "5441",
   "status": 0,
   "timestamp": 1755073890247
 }

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐