4.消息队列-基础篇-生产者客户端SDK设计
可以让用户灵活使用。内核提供接口。
生产者客户端SDK设计
消息队列的客户端主要包含生产、消费、集群管控三类功能,从客户端SDK实现的角度来看,生产模块包含客户端基础功能和生产相关功能两部分,其中客户端基础功能是客户端中所有功能共有的

客户端基础功能是蓝色部分:请求连接管理、心跳管理、序列化、重试、超时、容错处理等。
生产相关功能是黄色部分:客户端寻址、分区选择、生产错误处理、SSL、压缩、事务、幂等等等
客户端基础功能
1.连接管理
在网络模块中,客户端和服务端之间基本都是通过各自语言的网络库创建TCP长连接进行通信的。在大部分实现中,为了避免连接数膨胀,每个客户端实例和每台Broker只会维护一条TCP连接。建立一条TCP连接很简单,关键是在什么情况下建立连接。
一般有初始化创建连接和使用时创建连接:
-
初始化创建连接:客户端在实例初始化时就创建到各个
Broker的TCP连接,等待数据发送。这样做的好处是提前创建好连接可以避免发送数据时的冷启动,缺点是需要提前创建好所有的连接,可能导致连接空跑,会消耗一定的资源。 -
使用时创建连接:客户端在实例初始化时不创建连接,当需要发送数据时再建立连接。这样做的好处是发送数据时才建立连接,连接的使用率会较高。缺点是可能出现连接冷启动,会增加一点本次请求的耗时
由于客户端有空闲连接回收机制,并且创建连接的耗时一般比较短,所有在实际架构实现中,两种方式都会使用,它们的优劣并不是这么明显。
但是从资源利用率角度考虑,建议在使用时创建连接。
而且连接并不是任何时候都有数据,如果这个连接长时间没有数据,那么就会出现长时间的空闲连接,所以连接都会搭配连接回收机制,连接建立后长时间空闲就回收该连接,而连接回收的策略一般是判断这个连接在某一段时间内是否有发送数据的行为,如果没有就判断为空闲,然后执行回收
因为单个TCP连接发送数据的性能存在上限,所以我们需要在客户端启动多个生产者,提高并发读写的能力。一般情况下,每个生产者会有一个唯一的ID或唯一标识来标识客户端,比如ProduceID或客户端IP+Port
单个TCP连接的瓶颈和很多因素有关:网络带宽、网络延迟、客户端请求的socketbuff的配置、TCP窗口大小、发送速率导致本地数据反压堆积、服务端请求队列的堆积情况、收包和回包的速度等等
2.心跳机制
心跳检测是客户端和服务端之间保活的一种机制,检测服务端或客户端一方不可用时,另一方可以及时回收资源,避免资源浪费。
一般通过ping-pong的方式来检测
消息队列一般都是基于TCP协议通信的,所以客户端和服务端之间的心跳检测机制的实现,一般有基于TCP的KeepAlive保活机制和应用层主动探测两种形式
-
基于
TCP的KeepAlive保活机制:是TCP/IP协议层内置的功能,需要手动打开TCP的KeepAlive功能。通过这种方案实现的心跳检测比较简单,但KeepAlive的实现是服务器,需要Server主动发出检测包,此时如果客户端异常,可能会出现很多不可用的TCP连接,这种连接会占用服务器的内存资源,从而导致服务端的性能下降 -
应用层主动探测:一般是
Client向Server发起的,主要解决灵活性和TCP KeepAlive的缺陷。检测流程一般是客户端定时发送保活心跳,当服务端连接几次没收到请求就断开连接。这样做的好处是将压力分担到各个客户端,避免服务端的过载
3.错误处理
从请求的角度来看,有些错误是可以重试的,并且重试操作是幂等的,比如连接断开、Leader切换、发送偶尔超时、服务端某些异常等,有些错误是不可以重试的,比如Topic/分区不存在、服务端Broker不存在、集群和Broker长时间没有响应等。所以在客户端的处理中,会将错误分为可重试错误和不可重试错误两类
因为网络环境、架构部署的复杂性集群可能会出现短暂网络抖动、Leader切换等异常,可重试错误就是这类通过一次或多次重试可能恢复的异常,不可重试的错误就是不管如何重试都无法恢复的异常。当客户端收到可重复错误后,会通过一定的策略进行重试,尽量确保生产流程的顺利进行。虽然实现思路直接、简单,但是在客户端SDK的实现过程中,错误处理一个包含很多细节的工作,一般需要考虑下面几点:
-
如何定义可恢复错误和不可恢复错误
-
完整的错误码的定义和枚举,好的错误码定义可以提高排查问题的效率
-
错误后重试的代码实现方式是否合理高效
-
判断哪些情况需要停止客户端,向上抛错,以免一些错误信息一直在
SDK内空转,提高上层感知异常和排查异常的难度 -
日志信息打印
debug、info、error日志时,是否包含了完整的内容 -
发生错误后,客户端一般会提供重试策略

4.重试机制
重试策略一般会支持重试次数和退避时间两种概念。
当消息发送失败,超过我们设置的退避时间后,会继续重试,当超过重试次数后,就会抛弃消息或消息投递到配置好的重试队列中。
退避时间是可以配置的,比如1s、10s、1分钟。当出现错误时,就会根据退避策略退避,在尝试写入
-
退避时间是客户端在两次重试尝试之间的等待时间间隔,其核心目标是平衡重试成功率和系统稳定性,避免因为高频重试导致服务端压力激增或客户端资源耗尽
一般情况下,重试次数是有上限的,当然也支持配置无限重试。退避策略影响的是重试的成功率,因为网络抖动正常是ms级,某些异常可能会抖动十几秒。
如果退避策略设置得太短,在退避策略和重试次数用完后,可能消息还没有生产成功,如果退避时间设置太长,可能导致客户端消息堆积,所以消息队列生产者得重试次数和退避策略的设置都是比较讲究的,不能随意设置,需要根据业务场景仔细设计,需要围绕可靠性、低延迟、搞吞吐量、资源消耗等指标来设计
-
金融场景(高可靠场景):无限重试+死信队列兜底,牺牲部分延迟换取绝对可靠性
-
单条消息价值较高,必须保证成功,并且对延迟敏感,要确保低延迟
-
容错机制:失败消息写入死信队列
DLQ人工兜底,或通过本地消息表+定时任务确保最终提交
-
-
日志场景(高吞吐量):快速放弃+批量压缩,有限保障吞吐量
-
海量数据,允许少量消息的丢失,对资源敏感,客户端可能运行在低配服务器或容器中
-
容错机制:允许少量丢数,依赖日志采样补偿或失败消息暂存内存或磁盘,批量重试
-
-
风控场景(动态敏感场景):动态退避+降级策略,平衡实时性和可靠性
-
需要毫秒级别响应,比如用户登录时需要实时拦截
-
-
电商场景:顺序保障+异步补偿,应对高并发和最终一致性
-
属于高并发场景(
秒杀场景QPS10万+),需要保证订单创建、支付、库存扣减增加等操作的顺序性 -
顺序保障:分区内顺序发送(
如KafKa分区Key适用用户ID) -
异步补偿:最终一致性通过定时任务修复
-
事务消息:使用
RocketMQ事务消息确保本地事务和消息发送原子性 -
流量削峰:客户端批量合并请求+服务端队列缓冲
-
订单创建成功率、消息积压量、分区负载均衡度
-
此外,客户端为了满足安全传输、性能、功能等方面的需求,客户端都会支持传输加密、压缩、事务、幂等等等
生产相关功能
1.客户端寻址机制
消息队列作为一个分布式系统,分区会发布在集群的不同Broker节点上,所以在客户端的视角来看,服务端有多台节点,发送请求时需要发送的哪个节点呢?
我们发送HTTP请求时,手动指定目标Broker的IP即可,也就是在代码指定分区对应的对端的Broker地址,然后将数据写到目标Broker,但怎么获取这个分区和broker的对应关系是一个问题,所以就提出了MetaData(元数据)寻址机制和服务端内部转发两种思路

1.1MetaData(元数据)寻址机制
服务端会提供一个获取全量MetaData的接口即MetaData Server提供的,客户端在启动时,首先通过接口拿到集群中的所有元数据,然后本地缓存这部分数据信息。当客户端发送数据时,就会根据缓存中的元数据信息的内容来得到服务端的地址,以及要发送的分区在哪台节点上,最后根据这两部分信息,将数据发送到服务端
消息队列的元数据是指Topic、分区、Group、节点、配置等集群维度的信息。比如Topic有几个分区,分区的Leader和Follower在哪些节点上,节点的IP和端口是什么,有哪些Group等。在MetaData寻址机制中,元数据信息等主要包括Topic及其对应的分区信息和Node节点两部分
客户端一般通过定期全量更新MetaData信息和请求报错时更新元数据信息两种方式,来保证客户端的元数据信息是最新的。目前KafKa、RocketMQ、Pulsar用的是这个方案

1.2服务端内部转发机制
服务端内部转发机制指的是客户端不需要经过寻址的过程因为都说了是内部转发机制,肯定是不需要经过寻址过程的
写入的时候是把数据随机写入到服务端任意一台Broker节点,而每一台Broker会缓存所有节点的元数据信息,生产者将数据发送给Broker后,broker如果判断分区不在当前节点上,就会去找这个分区在哪个节点上,然后将数据转发到目标节点
这个方案的好处就是分区寻址在服务端完成,客户端的实现成本就比较低。
但是生产流程步骤多了,会增加耗时,另外服务端因为转发多了一跳,会导致服务端的资源损耗多一倍,比如CPU、内存、网卡,在大流量的场景上,这种损耗会导致负载变高,从而导致集群性能降低。
所以这种方案不适合大流浪、高吞吐量的消息队列。
目前只有RabbitMQ使用这个方案
这个方案解决了请求要发送给哪个节点,接下来我们可靠消息数据要写入哪个分区

2.生产分区分配策略
生产者发送的数据可以直接写入分区或写入Topic。其实写入Topic数据最终还是会被写入到某个分区,这些数据选择写入哪个分区的过程就是生产数据分区分配的过程,这个过程的分配策略是生产分区分配策略。一般情况下,消息队列默认支持轮询、按Key Hash、手动指定、自定义分区分配策略四种分区分配策略
轮询是所有消息队列的默认选项消息通过轮询的方式依次写入到各个分区中,可以保证每个分区的数据量是一样的,不会出现分区数据倾斜。但是轮询无法保证数据的写入是有序的。因为在消费模型中每个分区的消费是独立的,如果数据顺序依次写入多个分区,消费时就无法保持顺序。所以为了保证数据有序,就需要保证Topic只有一个分区。只是另外两种分配策略的思路
-
分区数据倾斜指的是一个
Topic的每个分区的数据量不一致,有的分区数据量大,有的分区数据量小,从而导致硬件负载不均衡,就会导致集群性能出现问题
按Key Hash指的是根据消息的Key来计算出一个Hash值,然后跟Topic的分区数取余算出一个分区号,将数据写入到这个分区中。
参考公式:
partitionSeq = hash(key) % partitionNum
这种方案的好处是可以根据Key来保证数据的分区有序。
比如:
某个用户的访问轨迹,以用户的
AppID为Key,按Key Hash存储,就可以确保客户维度的数据分区有序。缺点是分区数量不能变化,变化后
hash值就会变,导致消息乱序,并且因为每个Key的数据量不一样,容易导致数据倾斜
手动指定指的是在生产数据的时候,手动指定数据写入哪个分区。这个方案的好处就是灵活,用户可以根据代码逻辑和根据自己的需求写入将数据写入指定的分区,缺点就是业务需要感知分区的数量和变化,代码实现相对复杂
自定义分区分配策略可以让用户灵活使用。内核提供接口Interface机制,用户如果需要指定自定义的分区分配策略,可以实现对应的接口,然后配置分区分配策略。比如KafKa可以提供过实现org.apache.kafka.clients.producer.Partitioner接口实现自定义分区策略
为了提高性能有些生产者客户端会提高批量batch写入的语义

3.批量语义
客户端支持批量写入数据的前提是在协议层支持批量的语义。如果不支持批量语义就只能在业务中自定义将多条数据组合成一条数据来写入。
批量发送消息的实现思路一般是在客户端内存中维护一个队列,数据写入时,先将其写入到这个内存队列,然后通过批量发送数据的策略从内存队列读取数据,发送到服务端。而批量发送数据的策略和存储模块的刷盘策略很像,都是根据数据条数或时间聚合后汇总发送到服务端,一般是满足时间或条数的条件后触发发送操作,也会有立即发送的配置项。比如KafKa是按照时间的策略批量发送的,提供linger.ms、max.request.size、batch.size三个参数,来控制数据的批量发送。
RocketMQ提供sendBatch()方法,允许将多条消息打包发送。可以设置batchSize()来控制单词批量发送的消息条数上限,也可以通过sendLatencyMills设置最大等待时间默认3s,超时或达到batchSize后触发发送
-
Linger.ms:设置消息延迟发送的时间,这样可以等待更多的消息组成Batch发送。默认为0表示立即发送。 -
max.request.size:生产者能够发送的请求包大小上限,默认为1MB。 -
batch.size:生产者会尝试将业务发送到相同的Partition的消息合包后再进行发送,它设置了消息合包的大小上限。
为了支持对于性能和可靠性有不同需求的业务场景,客户端一般会支持多种数据发送方式。
4.数据发送方式
消息队列提供同步发送、异步发送、发送即忘三种形式。
同步发送和异步发送是语言语言的实现
-
同步发送主要解决数据发送的即时性和顺序性
-
异步发送主要考虑性能
发送即忘指的是消息发送之后就不再关心请求返回的结果,立即去发送下一条消息,即只管发送。因此发送性能会有所提升,但是如果数据发送失败是无法感知的,所以可能会导致数据丢失,因此适用于发送不重要的日志等场景。KafKa提供了ack=0、RocketMQ提供了sendOneway来支持这种模式(发送即忘)
5.集群管控操作
集群管控操作用于完成资源的创建、查询、修改、删除等集群管理动作。资源包括主题、分区、配置、消费分组等。从功能上看,消息队列一般会提供多种集群管理方式,比如命令行、客户端、HTTP接口等。命令行工具是最基本的支持方式。如下图所示,它的底层主要通过包装客户端SDK和服务端的相关功能接口进行交互。程序编码上一般由命令行、参数包装、底层SDK调用三部分组成。主要流程是接收参数、处理参数、调用SDK等相关操作
有的消息队列也会支持HTTP接口形式的管控操作,好处是因为HTTP协议通用性,业务可以从各个环境发起管控的调用,不用非得使用admin SDK,另外客户端封装HTTP接口实现命令行工具的成本也比较低

假设从头开始写一个消息队列的某个语言的SDK,怎么去思考,实现步骤是什么?
-
思考客户端的模块组成
-
参考服务端网络模块的实现,进行客户端网络模块的选型开发,比如使用
Netty Client或者Java Socket Client,然后完成连接管理、心跳检测等网路模块的开发工作 -
了解这个消息队列的协议设计的内容,各个接口的请求和返回的协议是什么样子的。
-
思考如何构建请求,实现构建各个请求相关的逻辑代码实现。
-
思考序列化模块怎么实现。
-
完成第一个接口的请求和返回的处理。
-
根据各个接口的调用参数进行开发。
-
如果需要支
SSL,就去参考这个语言官方的SSL CLient配置,然后编码实现,其他比如压缩的支持也是类似。
客户端SDK的设计需从网络通信、协议兼容性、业务功能三层次递进实现。
-
网络模块在高并发场景可以使用
Netty开发,轻量级场景可以使用Java Socket开发,然后完成连接管理、心跳检测等网络模块的开发工作 -
之后就需要了解这个消息队列的协议设计,各个接口的请求和返回对应的是什么协议,以此来对齐协议格式
-
然后思考如何构建请求,实现构建各个请求相关的逻辑代码实现
-
之后就可以考虑序列化模块的实现
-
接着完成首个接口的实现,构建请求对象、序列化消息体,然后从连接池中获取连接,发送请求并等待响应,期间可能需要处理超时等待、重试处理、错误处理等
-
如果需要支持
SSL,就去参考这个语言官方的SSL CLient配置,然后编码实现,其他比如 压缩的支持也是类似。
更多推荐
所有评论(0)