Kafka

  • 特点:高吞吐、低延迟

Producer(生产者)

  • 定义:是向Kafka集群发送消息的客户端应用,如:日志收集器、传感器数据发送器等

一、关键特性

  • 异步发送:可提高吞吐量和降低延迟
  • 消息分区:进而实现负载均衡
  • 消息压缩:可使用gizp、snappy等压缩算法实现消息压缩,然后再传输到服务器
  • 批处理机制:可减少网络I/O操作次数,提高总体的消息吞吐量

二、优化

  • 发送缓冲区:在异步发送消息时,可以配置发送缓冲区来调整发送性能。通过调整发送缓冲区的大小,可以平衡性能和内存使用
  • 重试机制:难免出现发送消息失败的情况,建议考虑设置重试机制,确保消息的可靠性
  • 批处理大小和延迟:调整此参数可优化发送性能。较大的批处理大小和较长的延迟可减少网络开销和提高吞吐量

三、配置项

  • bootstrap.servers:指定Kafka集群所需的broker地址清单,用于建立初始连接
  • acks:指定分区中必须有多少个副本收到这条消息,才算消息发送成功。可选值:
    • 0(不等待任何相应)
    • 1(leader副本确认)
    • all/-1(ISR中所有副本确认)
  • retries:消息发送失败时的重试次数,用于处理临时故障
  • compression.type:消息压缩算法,用于减少网络传输量。可选值:
    • none
    • gzip
    • snappy
    • ls4
    • zstd
  • batch.size:每个批次的内存大小,影响消息的吞吐量和延迟
  • linger.ms:发送批次前的最大等待时间,用于平衡吞吐量和延迟
  • buffer.memory:生产者客户端中用于缓存消息的缓冲区大小
  • key.serializer和value.serializer:消息的键和值的序列化类,用于将消息转换为字节数组
  • max.request.size:生产者客户端能发送的最大消息大小
  • request.timeout.ms:生产者等待请求响应的最长时间

四、常用方法

send():

  • 定义:发送消息到kafka集群

    • 异步发送:直接调用send(record),不关心发送结果(但可通过回调函数处理结果)。
    • 同步发送:调用send(record).get(),等待Kafka响应,获取发送结果(RecordMetadata对象)。
    • 带回调的异步发送:调用send(record, new Callback() {...}),在回调函数中处理发送结果。

close():

  • 定义关闭生产者,释放资源

Consumer(消费者)

  • 定义:从kafka集群读取消息的客户端应用。如:数据分析工具、实时监控系统等

一、关键特性

  • 拉取模式
  • 消费者组:消费组->消费主题->消费分区,从而实现负载均衡。不同消费者组之间互不影响,可以独立地消费同一主题的消息
  • 偏移量管理:消费者可维护偏移量(Offset)来记录自己再分区中消费的位置。偏移量记录方式:重置到较旧的偏移量以重新处理过去的消息;跳到最近的记录从“现在”开始记录

二、优化

  • 自动提交偏移量:简化了消费者的实现,但可能导致消费消息重复和丢失。手动提交偏移量提供了更灵活的控制方式,适合需要确保消息消费成功的场景
  • 消费速率控制:消息者处理速度较慢,可能导致消息积压。即可以通过增加消费者实例、优化消费者逻辑等方式来解决问题
  • 错误处理与重试:遇到的错误如:网络错误、消息格式错误等。消费者需实现适当的错误处理机制,如重试、记录错误日志等来确保消息的可靠性

三、配置项

  • bootstrap.servers:同样用于指定kafka集群的broker地址清单
  • group.id:消费者所属消费组的唯一标识,用于协调消费者之间的消息分配
  • auto.offset.reset:当分区没有初始偏移量或偏移量无效时,消费者的行为。可选值:earliest(从最早的消息开始消费)、latest(从最新的消息开始消费)和none(抛出异常)
  • enable.auto.commit:是否开启自动提交消费位移的功能。
    • true(定期自动提交消费位移)
    • false(需要手动提交)
  • anto.commit.interval.ms:自动提交消费位移的时间间隔
  • fetch.min.bytes:消费者客户端一次请求从kafka拉取消息的最小数据量
  • fetch.max.bytes:消费者客户端一次请求从kafka拉取消息的最大数据量
  • max.poll.records:一次拉取请求的最大消息数
  • max.poll.interval.ms:指定拉取消息线程最长空闲时间
  • session.timeout.ms:检测消费者是否失效的超时时间
  • heartbeat.interval.ms:消费者心跳时间,用于维持与消费者协调器的连接
  • key.deserializer和value.deserializer:消息的键和值的反序列化类,用于将字节数据转换为消息对象

四、常用方法

poll()

含义:消费者向kafka服务器发起一次拉取请求

参数

  • 超时时间
    • 含义:控制消费者在单次poll循环中,如果没拿到数据,最多能等多久
    • 目的:避免忙循环、控制循环速度

subscribe()

  • 用于订阅一个或多个主题

commitSync()或commitAsync()

  • 手动提交偏移量

java和python开发的区别

1、Java

  • consumer-poll()方法
    • 返回结构:一组消息的集合-ConsumerRecords对象
    • 优点:提高吞吐量,批量效率更高

2、python

  • consumer-poll()方法
    • 返回结构:单条消息或None,但内部有批量预取机制
    • 批量预取机制:在后台批量拉取消息存在内部缓冲区中,当循环调用poll()时,从内部缓冲区依次取出单条消息,也相当于一次拉取了多条消息

                

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐