Kafka的设计精髓在于网络不稳定,服务也随时会崩溃的复杂场景下,如何保证消息的高并发、高吞吐。但是要理解那些复杂的问题,需要建立在这个基础模型基础上的。

1、消费者分组消费机制

在Consumer中,都需要指定一个GROUP_ID_CONFIG属性,这表示当前Consumer所属的消费者组。

public static final String GROUP_ID_CONFIG = "group.id";
public static final String GROUP_ID_DOC = "A unique string that identifies the
consumer group this consumer belongs to. This property is required if the consumer 
uses either the group management functionality by using <code>subscribe(topic)
</code> or the Kafka-based offset management strategy.";

 对于Consumer,如果需要在subcribe(订阅)时使用组管理功能以及Kafka提供的offset管理策略,那就必须要配置GROUP_ID_CONFIG属性

kafka分组消费机制:

 生产者往topic发送消息时,会尽量均匀的发送到topic的各个partition中。这个消息会被推送到所有订阅了这个topic的消费者中。每个消费者组中只会推送一次,也就是同一个消费者组中的消费者实例只会消费一份消息副本,而不同的消费者组会重复消费同一份消息副本。

PARTITION:分区

CURRENT-OFFSET:分区最新消息偏移量

LOG-END-OFFSET:当前消费组已经消费结束消息的偏移量

LAG:为消费的消息

offset偏移量需要消费者处理完消息后,主动向kafka的broker提交。提交完成后,broker会更新消费进度,记录这个消息已经被消费者组消费。如果消费者没有向broker提交offset,broker就会认为这条消息还没有被消费者处理,会重新往消费者组中推送。不过这次推送,会尽量推送给同一个消费者组的其他消费者实例。

2、生产者拦截器机制

生产者拦截机制允许客户端在生产者在消息发送到Kafka集群之前,对消息进行拦截或修改消息内容。如上篇文章(Kakfa1)中的MyInterceptor类实现。

只需要在MyProducer设置kafka相关属性时指定拦截器:

//多个拦截器类,用逗号隔开
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.liyy.basic.MyInterceptor");

参数定义说明: 

public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
public static final String INTERCEPTOR_CLASSES_DOC = "A list of classes to use as
interceptors. Implementing the 
<code>org.apache.kafka.clients.producer.ProducerInterceptor</code> interface allows 
you to intercept (and possibly mutate) the records received by the producer before 
they are published to the Kafka cluster. By default, there are no interceptors.";
    

3、消息序列化机制

product指定的两个属性

public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
//参数文档
public static final String KEY_SERIALIZER_CLASS_DOC = "Serializer class for key that implements the <code>org.apache.kafka.common.serialization.Serializer</code> interface.";
public static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the <code>org.apache.kafka.common.serialization.Serializer</code> interface.";

 可以将生产者发送的消息中的key和value序列华为二进制数据。

在kafka消息定义中,key和value的作用如下:

key:用来进行分区的可选项,通过key来判断将消息分发到那个partition。如果没有填写key,那么kafka通过轮询(Round-robin)的方式来选择partition。如果填写了key,kafka会通过声明的Serializer序列化接口,将key转化为byte接数组,然后对key进行hash来选择对应的partition,这样接可以保证key相同的消息被分配到同一个partition中。

value:传输的业务数据。kafka将value通过定义的Serializer序列化接口转化为byte数组,这样能够比较好的在网络上传输value信息,以及将value信息落到操作系统的文件中。

生产者对消息进行了序列化,那么消费者在拉取消息时,就需要对消息进行反序列化。在Consumer中,也有反序列化的两个配置

public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";
public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
//参数文档
public static final String KEY_DESERIALIZER_CLASS_DOC = "Deserializer class for key that implements the <code>org.apache.kafka.common.serialization.Deserializer</code> interface.";
public static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the <code>org.apache.kafka.common.serialization.Deserializer</code> interface.";

 在kafka中,对于一些常用的基础数据类型,已经提供了对应的实现类。但如果需要一些自定义的消息格式,如自定义的POJO,就需要指定具体的实现类了。

序列化机制是在高并发场景下非常重要的一个优化机制。高效的序列化机制能够极大的提升分布式系统的网络传输能力和数据落盘能力。

4、消息分区路由机制

4.1kafka如何通过key分配partition?


public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";
private static final String PARTITIONER_CLASS_DOC = "A class to use to determine which partition to be send to when produce the records. Available options are:" +
        "<ul>" +
            "<li>If not set, the default partitioning logic is used. " +
        "This strategy will try sticking to a partition until at least " + BATCH_SIZE_CONFIG + " bytes is produced to the partition. It works with the strategy:" +
                "<ul>" +
                    "<li>If no partition is specified but a key is present, choose a partition based on a hash of the key</li>" +
                    "<li>If no partition or key is present, choose the sticky partition that changes when at least " + BATCH_SIZE_CONFIG + " bytes are produced to the partition.</li>" +
                "</ul>" +
            "</li>" +
            "<li><code>org.apache.kafka.clients.producer.RoundRobinPartitioner</code>: This partitioning strategy is that " +
        "each record in a series of consecutive records will be sent to a different partition(no matter if the 'key' is provided or not), " +
        "until we run out of partitions and start over again. Note: There's a known issue that will cause uneven distribution when new batch is created. " +
        "Please check KAFKA-9965 for more detail." +
            "</li>" +
        "</ul>" +
        "<p>Implementing the <code>org.apache.kafka.clients.producer.Partitioner</code> interface allows you to plug in a custom partitioner.";

Kafka是通过一个Partitioner接口的具体实现来决定一个消息如何根据Key分配到对应的Partition上的。甚至可以自己实现一个简单的分配策略。

在3.2.0版本,Kafka提供了三种默认的Partitioner实现类,RoundRobinPartitioner,DefaultPartitioner和UniformStickyPartitioner。目前后面两种实现已经标记为过期,被替换成了默认的分区实现机制。

自己也可以自定义一个Partitioner实现类,定制分区逻辑。核心就是要实现partition方法。根据获取到的partition信息来选择partition。如获取key的hash值对partition个数取模,topic的所有partition信息都可以在cluster中获取。

//获取Partition信息
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

 然后在Consumer中,可以指定一个PARTITION_ASSIGNMENT_STRATEGY分区分配策略,决定如何在多个Consumer实例和多个Partitioner之间建立关联关系。

public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy";
private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "A list of class names or class types, " +
        "ordered by preference, of supported partition assignment strategies that the client will use to distribute " +
        "partition ownership amongst consumer instances when group management is used. Available options are:" +
        "<ul>" +
        "<li><code>org.apache.kafka.clients.consumer.RangeAssignor</code>: Assigns partitions on a per-topic basis.</li>" +
        "<li><code>org.apache.kafka.clients.consumer.RoundRobinAssignor</code>: Assigns partitions to consumers in a round-robin fashion.</li>" +
        "<li><code>org.apache.kafka.clients.consumer.StickyAssignor</code>: Guarantees an assignment that is " +
        "maximally balanced while preserving as many existing partition assignments as possible.</li>" +
        "<li><code>org.apache.kafka.clients.consumer.CooperativeStickyAssignor</code>: Follows the same StickyAssignor " +
        "logic, but allows for cooperative rebalancing.</li>" +
        "</ul>" +
        "<p>The default assignor is [RangeAssignor, CooperativeStickyAssignor], which will use the RangeAssignor by default, " +
        "but allows upgrading to the CooperativeStickyAssignor with just a single rolling bounce that removes the RangeAssignor from the list.</p>" +
        "<p>Implementing the <code>org.apache.kafka.clients.consumer.ConsumerPartitionAssignor</code> " +
        "interface allows you to plug in a custom assignment strategy.</p>";

 kafka内置了一些分区实现方式,在通常情况下也都是最有的选择。当然自己也可以自定义分区策略。

kafka默认提供了三种消费者分区分配策略

1)range/范围策略。

例如kafka有10个partition,一个消费者组下有3个Consumer。range策略将0-3分给Consumer1,4-6分给Consumer2,7-9分给consumer3

2)round-robin/轮询策略

3)sticky/粘性策略,有连个分配原则

  • 刚开始时会尽量保证均匀分配。比如使用range策略或round-robin策略(这个选择时随机的)
  • 分区的分配会尽量和执之前的分配保持一致。但是当Consumer3服务宕机时,就会按照sticky策略保证C1和C2原本分配到的partition不变,将C3分配到的partition尽量均匀的分配给C1和C2。这样就能够很好的保持各个Consumer数据的稳定性。

官方默认的生产者端分区器以及消费者端的Range+Sticky分配策略,在大部分场景下都是非常高效的算法。

5、生产者消息缓存机制

为了避免高并发请求对服务器造成的压力过大,kafka的Producer往服务端发消息时,并不是一条一条的发,而是增加了一个高速缓存区,将消息最终到缓存区后,当达到某一个阈值后,再批量发送到服务端。这种缓存机制是解决高并发场景的一种常用手段。

生产者消息缓存机制涉及到KafkaProducer中的两个关键组件:accumulator和sender

发送应答机制

生产消息幂等性

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐