快速上手:学习如何使用C++实现kafka消费者客户端
提交消息位移 (Offset Committing):消息消费完成后,需要提交已消费消息的位移 (Offset),以便 Kafka 集群知道消费者已经消费了哪些消息。位移提交方式:自动提交 (Enable Auto Commit): Kafka 消费者会自动定期提交位移 (由控制)。这种方式简单,但可能存在消息重复消费的风险。如果在自动提交位移之前,消费者崩溃,未提交的消息会被重复消费。手动同步提
提交消息位移 (Offset Committing):
-
消息消费完成后,需要提交已消费消息的位移 (Offset),以便 Kafka 集群知道消费者已经消费了哪些消息。
-
位移提交方式:
-
自动提交 (Enable Auto Commit): Kafka 消费者会自动定期提交位移 (由
auto.commit.interval.ms控制)。 这种方式简单,但可能存在消息重复消费的风险。 如果在自动提交位移之前,消费者崩溃,未提交的消息会被重复消费。 -
手动同步提交 (Synchronous Committing): 调用
commitSync()方法手动同步提交位移。 这种方式可以保证位移提交成功,但会阻塞消费者线程,影响吞吐量。 -
手动异步提交 (Asynchronous Committing): 调用
commitAsync()方法手动异步提交位移。 这种方式不会阻塞消费者线程,可以提高吞吐量,但可靠性相对较差。 如果提交失败,需要进行重试。 -
指定位移提交: 可以指定要提交的
TopicPartition和OffsetMetadata信息。
-
(6)关闭消费者实例 :
-
当消费者不再需要消费消息时,需要关闭
KafkaConsumer实例,释放资源。 -
调用
close()方法关闭消费者。 -
close()方法会向 Kafka 集群发送离开消费者组的请求,并释放所有资源。 -
建议: 将
close()方法放在finally块中,确保即使在发生异常的情况下,消费者也能被正确关闭。

二、Kafka 的 C++ API (librdkafka)
librdkafka 是 Kafka 官方推荐的 C/C++ 客户端库,提供了高性能和可靠性。
2.1、RdKafka::Conf
RdKafka::Conf 类用于配置 Kafka 客户端,包括全局配置和 Topic 配置。
enum ConfType: 定义配置的类型。
|
枚举值 |
描述 |
|---|---|
|
|
全局配置,影响客户端整体行为。 |
|
|
Topic 配置,影响特定 Topic 的行为。 |
enum ConfResult: 定义配置结果。
|
枚举值 |
描述 |
|---|---|
|
|
未知配置错误。 |
|
|
配置值无效。 |
|
|
配置成功。 |
常用方法:
|
方法 |
描述 |
|---|---|
|
|
创建配置对象。 |
|
|
设置配置对象的属性值。 |
|
|
设置 |
|
|
设置 |
|
|
设置用于自动订阅 Topic 的默认 Topic 配置。 |
|
|
设置 |
|
|
设置 |
|
|
设置 |
|
|
设置 |
|
|
设置 |
|
|
设置 |
|
|
查询单条属性配置值。 |
示例代码:
展开
代码语言:C++
自动换行
AI代码解释
#include <iostream> #include <string> #include <librdkafka/rdkafka.h> int main() { std::string errstr; // 创建全局配置对象 RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); // 设置 Broker 地址 RdKafka::Conf::ConfResult result = conf->set("bootstrap.servers", "your_broker_address", errstr); if (result != RdKafka::Conf::CONF_OK) { std::cerr << "Failed to set bootstrap.servers: " << errstr << std::endl; delete conf; return 1; } // 设置消费者组 ID result = conf->set("group.id", "your_group_id", errstr); if (result != RdKafka::Conf::CONF_OK) { std::cerr << "Failed to set group.id: " << errstr << std::endl; delete conf; return 1; } // 获取配置值 std::string value; conf->get("group.id", value); std::cout << "group.id: " << value << std::endl; delete conf; return 0; }
2.2、RdKafka::Event
RdKafka::Event 类表示 Kafka 客户端的事件,例如错误、统计信息、日志等。
enum Type: 定义事件类型。
|
枚举值 |
描述 |
|---|---|
|
|
错误事件。 |
|
|
JSON 文档统计事件。 |
|
|
日志消息事件。 |
|
|
来自 Broker 的 throttle 级信号事件。 |
常用方法:
|
方法 |
描述 |
|---|---|
|
|
返回事件类型。 |
|
|
返回事件错误代码。 |
|
|
返回 log 严重级别。 |
|
|
返回 log 基础字符串。 |
|
|
返回 Log 消息字符串。 |
|
|
返回 throttle 时间。 |
|
|
返回 Broker 名称。 |
|
|
返回 Broker ID。 |
2.3、RdKafka::EventCb
RdKafka::EventCb 是事件回调函数的基类,用于接收 Kafka 客户端的事件。
|
方法 |
描述 |
|---|---|
|
|
事件回调函数。 |
C++ 封装示例:
展开
代码语言:C++
自动换行
AI代码解释
#include <iostream> #include <string> #include <librdkafka/rdkafka.h> class ConsumerEventCb : public RdKafka::EventCb { public: void event_cb(RdKafka::Event &event) { switch (event.type()) { case RdKafka::Event::EVENT_ERROR: if (event.fatal()) { std::cerr << "FATAL "; } std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " << event.str() << std::endl; break; case RdKafka::Event::EVENT_STATS: std::cerr << "\"STATS\": " << event.str() << std::endl; break; case RdKafka::Event::EVENT_LOG: fprintf(stderr, "LOG-%i-%s: %s\n", event.severity(), event.fac().c_str(), event.str().c_str()); break; case RdKafka::Event::EVENT_THROTTLE: std::cerr << "THROTTLED: " << event.throttle_time() << "ms by " << event.broker_name() << " id " << (int)event.broker_id() << std::endl; break; default: std::cerr << "EVENT " << event.type() << " (" << RdKafka::err2str(event.err()) << "): " << event.str() << std::endl; break; } } };
2.4、RdKafka::TopicPartition
RdKafka::TopicPartition 类表示 Topic 的一个分区。
更多推荐
所有评论(0)