一、Kafka客户端概述

Kafka提供了两套客户端API:HighLevel API 和 LowLevel API。HighLevel API封装了Kafka的运行细节,使用简单,是企业开发中最常用的方式;而LowLevel API则需要客户端自行管理Partition、Offset等细节,适用于对性能要求极高的场景。

二、消息发送者(Producer)主流程

1. 引入依赖

xml

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.13</artifactId>
    <version>3.8.0</version>
</dependency>

2. 核心步骤

// 1. 设置Producer核心属性
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

// 2. 构建Producer实例
Producer<String, String> producer = new KafkaProducer<>(props);

// 3. 构建消息
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);

// 4. 发送消息(支持三种方式)
// 单向发送
producer.send(record);
// 同步发送
RecordMetadata metadata = producer.send(record).get();
// 异步发送
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception e) {
        // 回调处理
    }
});

三、消息消费者(Consumer)主流程

1. 核心步骤

// 1. 设置Consumer属性
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

// 2. 创建Consumer实例
Consumer<String, String> consumer = new KafkaConsumer<>(props);

// 3. 订阅主题
consumer.subscribe(Collections.singletonList(TOPIC));

// 4. 拉取并处理消息
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // 业务处理
    }
    // 5. 提交Offset(手动或自动)
    consumer.commitSync();
}

四、核心工作机制深度解析

1. 消费者分组消费机制

  • 消费者组(Consumer Group):同一组内的消费者共同消费一个Topic下的消息,每条消息在组内只会被消费一次

  • Offset管理:记录消费进度,支持自动提交和手动提交

  • Rebalance机制:消费者数量变化时重新分配分区

2. 消息序列化机制

Kafka的消息以Key-Value结构存储,需要序列化为字节数组进行传输:

  • 生产者序列化:通过KEY_SERIALIZER_CLASS_CONFIGVALUE_SERIALIZER_CLASS_CONFIG配置

  • 消费者反序列化:通过对应的DESERIALIZER配置

  • 自定义序列化:实现SerializerDeserializer接口处理POJO对象

3. 消息分区路由机制

  • 生产者分区策略

    • 默认使用StickyPartitioner,尽可能粘性发送到同一分区

    • 支持自定义Partitioner实现

  • 消费者分配策略

    • RangeAssignor:按范围分配

    • RoundRobinAssignor:轮询分配

    • StickyAssignor:粘性分配(默认)

4. 生产者消息缓存机制

// 关键参数
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);     // 批次大小
props.put(ProducerConfig.LINGER_MS_CONFIG, 0);          // 等待时间
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 缓冲区大小

Kafka通过RecordAccumulator缓存消息,批量发送以提高吞吐量。

5. 发送应答机制(ACKs)

  • acks=0:不等待Broker确认,吞吐量最高,可靠性最低

  • acks=1:等待Leader写入成功

  • acks=all/-1:等待所有ISR副本写入成功,可靠性最高

6. 生产者消息幂等性

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
  • PID + Sequence Number机制保证单分区消息不重复

  • 实现Exactly-Once语义的基础

7. 生产者数据压缩

支持gzipsnappylz4zstd四种压缩算法,权衡压缩比和吞吐量。

8. 生产者消息事务

props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txn-1");
producer.initTransactions();
producer.beginTransaction();
try {
    producer.send(record1);
    producer.send(record2);
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
}

保证跨分区消息的原子性。

五、SpringBoot集成Kafka

1. 添加依赖

xml

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2. 配置文件

properties

spring.kafka.bootstrap-servers=worker1:9092,worker2:9092,worker3:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.group-id=default-group

3. 生产者示例

@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    
    @GetMapping("/send/{message}")
    public void send(@PathVariable String message) {
        kafkaTemplate.send("topic1", message);
    }
}

4. 消费者示例

@Component
public class KafkaConsumer {
    @KafkaListener(topics = "topic1")
    public void onMessage(ConsumerRecord<?, ?> record) {
        System.out.println("收到消息:" + record.value());
    }
}

六、总结与最佳实践

  1. 配置调优建议

    • 根据网络状况调整batch.sizelinger.ms

    • 根据可靠性要求选择适当的acks级别

    • 监控消费者Lag,及时调整消费者数量

  2. 故障处理

    • 合理设置auto.offset.reset处理Offset失效

    • 实现消费幂等性,避免重复消费

    • 监控生产者重试次数和异常

  3. 性能优化

    • 根据消息大小选择合适的压缩算法

    • 调整max.in.flight.requests.per.connection平衡吞吐与顺序

    • 合理设置分区数,充分利用集群资源

Kafka的强大之处在于其精妙的设计和丰富的配置选项。理解这些底层机制不仅有助于解决实际问题,更能帮助我们在实际项目中做出合理的技术选型和架构设计。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐