提交消息位移 (Offset Committing):

  • 消息消费完成后,需要提交已消费消息的位移 (Offset),以便 Kafka 集群知道消费者已经消费了哪些消息。

  • 位移提交方式:

    • 自动提交 (Enable Auto Commit): Kafka 消费者会自动定期提交位移 (由 auto.commit.interval.ms 控制)。 这种方式简单,但可能存在消息重复消费的风险。 如果在自动提交位移之前,消费者崩溃,未提交的消息会被重复消费。

    • 手动同步提交 (Synchronous Committing): 调用 commitSync() 方法手动同步提交位移。 这种方式可以保证位移提交成功,但会阻塞消费者线程,影响吞吐量。

    • 手动异步提交 (Asynchronous Committing): 调用 commitAsync() 方法手动异步提交位移。 这种方式不会阻塞消费者线程,可以提高吞吐量,但可靠性相对较差。 如果提交失败,需要进行重试。

    • 指定位移提交: 可以指定要提交的 TopicPartition 和 OffsetMetadata 信息。

(6)关闭消费者实例 :

  • 当消费者不再需要消费消息时,需要关闭 KafkaConsumer 实例,释放资源。

  • 调用 close() 方法关闭消费者。

  • close() 方法会向 Kafka 集群发送离开消费者组的请求,并释放所有资源。

  • 建议: 将 close() 方法放在 finally 块中,确保即使在发生异常的情况下,消费者也能被正确关闭。

 

二、Kafka 的 C++ API (librdkafka)

librdkafka 是 Kafka 官方推荐的 C/C++ 客户端库,提供了高性能和可靠性。

2.1、RdKafka::Conf

RdKafka::Conf 类用于配置 Kafka 客户端,包括全局配置和 Topic 配置。

enum ConfType: 定义配置的类型。

枚举值

描述

CONF_GLOBAL

全局配置,影响客户端整体行为。

CONF_TOPIC

Topic 配置,影响特定 Topic 的行为。

enum ConfResult: 定义配置结果。

枚举值

描述

CONF_UNKNOWN

未知配置错误。

CONF_INVALID

配置值无效。

CONF_OK

配置成功。

常用方法:

方法

描述

static Conf * create(ConfType type)

创建配置对象。type 指定配置类型 (全局或 Topic)。

Conf::ConfResult set(const std::string &name, const std::string &value, std::string &errstr)

设置配置对象的属性值。name 是属性名,value 是属性值,errstr 用于返回错误信息。返回 CONF_OK 表示成功,否则表示失败。

Conf::ConfResult set(const std::string &name, DeliveryReportCb *dr_cb, std::string &errstr)

设置 DeliveryReportCb 回调函数。用于生产者,在消息成功或失败发送到 Broker 时调用。

Conf::ConfResult set(const std::string &name, EventCb *event_cb, std::string &errstr)

设置 EventCb 回调函数。用于接收 Kafka 客户端的事件,例如错误、统计信息、日志等。

Conf::ConfResult set(const std::string &name, const Conf *topic_conf, std::string &errstr)

设置用于自动订阅 Topic 的默认 Topic 配置。

Conf::ConfResult set(const std::string &name, PartitionerCb *partitioner_cb, std::string &errstr)

设置 PartitionerCb 回调函数,用于自定义消息分区策略。 配置对象必须是 CONF_TOPIC 类型。

Conf::ConfResult set(const std::string &name, PartitionerKeyPointerCb *partitioner_kp_cb,std::string &errstr)

设置 PartitionerKeyPointerCb 回调函数。 用于自定义消息分区策略时,提供 Key 指针。

Conf::ConfResult set(const std::string &name, SocketCb *socket_cb, std::string &errstr)

设置 SocketCb 回调函数。

Conf::ConfResult set(const std::string &name, OpenCb *open_cb, std::string &errstr)

设置 OpenCb 回调函数。

Conf::ConfResult set(const std::string &name, RebalanceCb *rebalance_cb, std::string &errstr)

设置 RebalanceCb 回调函数。用于消费者,在消费者组 Rebalance 时调用。

Conf::ConfResult set(const std::string &name, OffsetCommitCb *offset_commit_cb, std::string &errstr)

设置 OffsetCommitCb 回调函数。用于消费者,在提交位移时调用。

Conf::ConfResult get(const std::string &name, std::string &value) const

查询单条属性配置值。

示例代码:

展开

代码语言:C++

 

自动换行

AI代码解释

 

#include <iostream> #include <string> #include <librdkafka/rdkafka.h> int main() { std::string errstr; // 创建全局配置对象 RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); // 设置 Broker 地址 RdKafka::Conf::ConfResult result = conf->set("bootstrap.servers", "your_broker_address", errstr); if (result != RdKafka::Conf::CONF_OK) { std::cerr << "Failed to set bootstrap.servers: " << errstr << std::endl; delete conf; return 1; } // 设置消费者组 ID result = conf->set("group.id", "your_group_id", errstr); if (result != RdKafka::Conf::CONF_OK) { std::cerr << "Failed to set group.id: " << errstr << std::endl; delete conf; return 1; } // 获取配置值 std::string value; conf->get("group.id", value); std::cout << "group.id: " << value << std::endl; delete conf; return 0; }

2.2、RdKafka::Event

RdKafka::Event 类表示 Kafka 客户端的事件,例如错误、统计信息、日志等。

enum Type: 定义事件类型。

枚举值

描述

EVENT_ERROR

错误事件。

EVENT_STATS

JSON 文档统计事件。

EVENT_LOG

日志消息事件。

EVENT_THROTTLE

来自 Broker 的 throttle 级信号事件。

常用方法:

方法

描述

virtual Type type() const =0

返回事件类型。

virtual ErrorCode err() const =0

返回事件错误代码。

virtual Severity severity() const =0

返回 log 严重级别。

virtual std::string fac() const =0

返回 log 基础字符串。

virtual std::string str () const =0

返回 Log 消息字符串。

virtual int throttle_time() const =0

返回 throttle 时间。

virtual std::string broker_name() const =0

返回 Broker 名称。

virtual int broker_id() const =0

返回 Broker ID。

2.3、RdKafka::EventCb

RdKafka::EventCb 是事件回调函数的基类,用于接收 Kafka 客户端的事件。

方法

描述

virtual void event_cb(Event &event)=0

事件回调函数。

C++ 封装示例:

展开

代码语言:C++

 

自动换行

AI代码解释

 

#include <iostream> #include <string> #include <librdkafka/rdkafka.h> class ConsumerEventCb : public RdKafka::EventCb { public: void event_cb(RdKafka::Event &event) { switch (event.type()) { case RdKafka::Event::EVENT_ERROR: if (event.fatal()) { std::cerr << "FATAL "; } std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " << event.str() << std::endl; break; case RdKafka::Event::EVENT_STATS: std::cerr << "\"STATS\": " << event.str() << std::endl; break; case RdKafka::Event::EVENT_LOG: fprintf(stderr, "LOG-%i-%s: %s\n", event.severity(), event.fac().c_str(), event.str().c_str()); break; case RdKafka::Event::EVENT_THROTTLE: std::cerr << "THROTTLED: " << event.throttle_time() << "ms by " << event.broker_name() << " id " << (int)event.broker_id() << std::endl; break; default: std::cerr << "EVENT " << event.type() << " (" << RdKafka::err2str(event.err()) << "): " << event.str() << std::endl; break; } } };

2.4、RdKafka::TopicPartition

RdKafka::TopicPartition 类表示 Topic 的一个分区。

 

 

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐