Linux C/C++ 学习日记(77):Kafka(五):消费者代码(3):订阅、消费、重平衡、日志
本文介绍了Kafka消费者API的核心功能和使用要点。主要内容包括:1)rebalance_cb回调方法处理消费组重平衡事件;2)assign/unassign方法手动分配/释放分区;3)subscribe方法启用消费组自动分区分配;4)consume/poll方法拉取消息;5)event_cb回调处理各类事件。重点分析了手动assign在消费组模式下的风险:会绕过协调器管控,导致分区重复消费。正
·
注:该文用于个人学习记录和知识交流,如有不足,欢迎指点。
一、相关api
1. rebalance_cb
- 含义:RdKafka 中 RebalanceCb 抽象类的核心回调方法,消费组模式下重平衡事件的处理入口,承接 Broker 侧消费组协调器下发的分区分配 / 回收指令。
- 使用方法:继承 RebalanceCb 并重写该方法,根据 ErrorCode(ASSIGN_PARTITIONS/REVOKE_PARTITIONS)执行分区接管 / 释放逻辑;将实现类实例注册至消费者配置。
- 时机:消费者调用 poll/consume/subscribe/unsubscribe/close 时,librdkafka 检测到 Broker 侧触发的重平衡事件后同步执行。(调用subscribe、unsubscribe、close必定执行,consume跟poll是监测到才执行)
/**
* @brief 消费者重平衡回调类
* @details 继承 RdKafka::RebalanceCb,处理消费组重平衡事件
* (分区分配/回收),保证重平衡时分区正确接管/释放,避免重复消费
*/
class ConsumerRebalanceCb : public RdKafka::RebalanceCb
{
private:
/**
* @brief 辅助函数:打印分区列表(便于调试重平衡的分区变化)
* @param partitions 分区列表(包含主题名+分区号)
*/
static void printTopicPartition (const std::vector<RdKafka::TopicPartition*>&partitions)
{
for (unsigned int i = 0 ; i < partitions.size() ; i++)
std::cerr << partitions[i]->topic() << "[" << partitions[i]->partition() << "], ";
std::cerr << "\n";
}
public:
/**
* @brief 重平衡回调函数(必须重写)
* @param consumer 消费者实例指针
* @param err 重平衡错误码(标识分配/回收/异常)
* @param partitions 待分配/回收的分区列表
*/
void rebalance_cb (RdKafka::KafkaConsumer *consumer,
RdKafka::ErrorCode err,
std::vector<RdKafka::TopicPartition*> &partitions)
{
std::cerr << "RebalanceCb: " << RdKafka::err2str(err) << ": ";
printTopicPartition(partitions);
// 场景1:协调器为当前消费者分配新分区
if (err == RdKafka::ERR__ASSIGN_PARTITIONS)
{
// 手动接管分区分配(替代默认逻辑)
consumer->assign(partitions);
// 记录当前持有的分区数量
partition_count = (int)partitions.size();
}
// 场景2:协调器回收当前消费者的旧分区(或其他异常)
else
{
// 取消分区分配,释放消费权
consumer->unassign();
partition_count = 0;
}
}
private:
// 记录当前消费者持有的分区数量(初始值0避免随机值)
int partition_count = 0;
};
2. assign
- 含义:KafkaConsumer 类的成员方法,显式将指定 TopicPartition 列表绑定至当前消费者实例,确立消费分区范围。
- 使用方法:传入 TopicPartition 列表(指定主题 - 分区)调用该方法;消费组模式下仅在 rebalance_cb 内调用,非消费组模式可直接调用。
- 时机:消费组模式下,rebalance_cb 接收到 ASSIGN_PARTITIONS 事件时(承接协调器分配的分区);非消费组模式下,初始化消费者后手动指定消费分区时。
初始化消费者时手动指定分区
注意:
assign是不触发重平衡机制的,也就说如果配置里绑定了groupid,可能会出现同一个消费组里面有两个消费者同时消费一个分区,严重违反约定。
因此,通常情况下,assign一般用于单消费者的消费组(即不指定groupid)
// 1. 构造指定的「主题+分区」列表
std::vector<RdKafka::TopicPartition *> assign_partitions;
// 遍历要消费的主题列表,为每个主题指定要消费的分区(示例:使用成员变量m_partition,也可自定义多个分区)
for (const auto &topic : m_topicVector)
{
// 创建「主题+分区」对象(核心:指定具体分区号,比如m_partition=0)
RdKafka::TopicPartition *tp = RdKafka::TopicPartition::create(topic, m_partition);
// 【可选】指定消费起始位移(比如从第一条消息开始消费,不指定则用auto.offset.reset)
tp->set_offset(RdKafka::Topic::OFFSET_STORED);
/**
* RdKafka::Topic::OFFSET_BEGINNING:从第一条消息开始消费 (注意值为-2,并非为0)
* RdKafka::Topic::OFFSET_STORED:使用消费组存储的历史位移, (注意值为-1000)
* RdKafka::Topic::OFFSET_END:只消费未来新消息 (注意值为-1)
*/
assign_partitions.push_back(tp);
}
// 2. 用assign替换subscribe,手动分配分区
RdKafka::ErrorCode errorCode = m_consumer->assign(assign_partitions);
if (errorCode != RdKafka::ERR_NO_ERROR)
{
std::cout << "assign partition failed: " << RdKafka::err2str(errorCode) << std::endl;
// 释放TopicPartition对象,避免内存泄漏
for (auto tp : assign_partitions)
delete tp;
return;
}
// 释放TopicPartition对象(assign后不再需要)
for (auto tp : assign_partitions)
delete tp;
3. unassign
- 含义:KafkaConsumer 类的成员方法,解除当前消费者与所有 TopicPartition 的绑定关系,终止对所有分区的消费权限。
- 使用方法:无参数调用该方法,清空消费者的分区绑定列表;释放分区前需完成位移提交以避免重复消费。
- 时机:消费组模式下,rebalance_cb 接收到 REVOKE_PARTITIONS 事件时(回收分区前执行);消费者停止消费 / 异常容错时(终止所有分区消费)。
4. subscribe
- 含义:KafkaConsumer 类的成员方法,订阅指定主题列表,启用消费组模式,交由 Broker 侧消费组协调器自动管理分区分配。
- 使用方法:传入主题列表调用该方法,可配置分区分配策略;调用后消费者向协调器注册,触发初始重平衡。
- 时机:消费者初始化后、启动消费循环前(启用消费组模式);业务需动态调整订阅主题时(触发重平衡更新分区分配)。
RdKafka::ErrorCode errorCode = m_consumer->subscribe(m_topicVector);
if (errorCode != RdKafka::ERR_NO_ERROR)
{
std::cout << "subscribe failed: " << RdKafka::err2str(errorCode) << std::endl;
return; // 订阅失败直接返回,避免无效循环
}
5. consume
- 含义:KafkaConsumer 类的成员方法,阻塞式拉取单条 / 批量消息,底层封装 poll 方法驱动客户端事件循环。
- 使用方法:指定超时时间调用该方法,返回拉取到的 Message 实例;需在消费循环中持续调用以拉取消息并处理后台事件。
- 时机:消费业务的核心循环中,按固定超时周期调用(如 1000ms),是业务层拉取消息的核心入口。
while (true)
{
// 拉取消息(核心API:阻塞1000ms,超时返回ERR__TIMED_OUT)
RdKafka::Message *msg = m_consumer->consume(1000);
// 处理拉取到的消息
msg_consume(msg, NULL);
// 释放消息对象(必须!否则内存泄漏)
delete msg;
}
6. poll
- 含义:KafkaConsumer 类的底层核心方法,处理客户端所有后台事件(重平衡、心跳、错误等)并拉取消息。
- 使用方法:指定超时时间调用该方法,返回处理的事件数;consume 方法底层自动调用,也可手动调用以精细管控事件处理。
- 时机:consume 底层隐式调用;需手动管控客户端事件循环时(如自定义事件处理逻辑)显式调用。
7. event_cb
- 含义:RdKafka 中 EventCb 抽象类的核心回调方法,客户端全量事件的统一处理入口,覆盖错误、重平衡、统计、限流等事件。
- 使用方法:继承 EventCb 并重写该方法,按 Event 类型(ERROR/REBALANCE/STATS 等)处理不同事件;注册至消费者配置以接管全量事件。
- 时机:poll/consume 调用时,客户端检测到待处理事件后同步执行,替代 error_cb/stats_cb 等专用回调。
/**
* @brief 消费者事件回调类
* @details 继承 RdKafka::EventCb,处理消费者运行过程中的各类异步事件
* (错误、日志、统计、Broker限流等),是监控消费者状态的核心
*/
class ConsumerEventCb : public RdKafka::EventCb
{
public:
/**
* @brief 事件回调函数(必须重写)
* @param event 事件对象,包含事件类型、错误码、描述信息等
*/
void event_cb (RdKafka::Event &event)
{
// 根据事件类型分类处理
switch (event.type())
{
// 错误事件(FATAL表示致命错误,会导致消费者退出)
case RdKafka::Event::EVENT_ERROR:
if (event.fatal())
{
std::cerr << "FATAL "; // 标记致命错误
}
// 输出错误码和错误描述
std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
event.str() << std::endl;
break;
// 统计事件(输出消费者性能指标,如拉取速率、请求延迟等)
case RdKafka::Event::EVENT_STATS:
std::cerr << "\"STATS\": " << event.str() << std::endl;
break;
// 日志事件(输出librdkafka内部日志,便于调试)
case RdKafka::Event::EVENT_LOG:
fprintf(stderr, "LOG-%i-%s: %s\n",
event.severity(), // 日志级别(INFO/WARN/ERROR)
event.fac().c_str(), // 日志模块
event.str().c_str()); // 日志内容
break;
// 限流事件(Broker对消费者拉取速率限流时触发)
case RdKafka::Event::EVENT_THROTTLE:
std::cerr << "THROTTLED: " << event.throttle_time() << "ms by " <<
event.broker_name() << " id " << (int)event.broker_id() << std::endl;
break;
// 其他未分类事件
default:
std::cerr << "EVENT " << event.type() <<
" (" << RdKafka::err2str(event.err()) << "): " <<
event.str() << std::endl;
break;
}
}
};
二、初始化消费者用assign的注意事项
手动调用 assign 会打破消费组的分区互斥规则,导致同一分区被消费组内多个消费者同时消费,但这并非 Kafka 机制缺陷,而是「手动分区模式」与「消费组模式」的使用边界问题。
1.消费组分区互斥的前提:
Kafka 保证「一个分区仅被消费组内一个消费者消费」的核心,是消费组协调器的管控 —— 只有通过 subscribe 启用消费组模式,消费者才会向协调器注册自身状态,协调器统一分配分区并维护「分区 - 消费者」映射关系,从而保证互斥。
2. 手动 assign 打破互斥的原因:
当你在消费组内(配置了 group.id)直接调用 assign 时:
- 消费者仅在客户端层面绑定分区,不会向 Broker 侧的消费组协调器注册该分区绑定信息;
- 协调器无法感知该消费者的分区占用,仍会将该分区分配给消费组内其他通过 subscribe 加入的消费者;
- 最终导致多个消费者同时消费同一分区,违背互斥规则
3. assign 与消费组的正确使用边界:
- ✅ 消费组模式(需分区互斥):仅通过 subscribe 启用,由协调器主导分区分配,assign 仅在 rebalance_cb 中承接协调器分配的分区(此时协调器已保证互斥);
- ❌ 禁止操作:消费组内直接手动调用 assign(会绕开协调器,破坏互斥);
- ✅ 手动分区模式(无需消费组):不配置 group.id,直接调用 assign(此时无消费组,无需互斥)。
4. 解决方案
若需保证消费组内分区互斥,必须使用 subscribe 而非手动 assign:
- 消费组内所有消费者均调用
subscribe(topics),交由协调器统一分配分区; - 仅在
rebalance_cb中响应ASSIGN_PARTITIONS事件时调用assign,承接协调器分配的分区(此时协调器已确保分区不重复); - 禁止消费组内直接手动调用
assign绑定分区。
5. 总结
- 手动
assign不触发重平衡,且绕开消费组协调器管控,是导致分区重复消费的根本原因; - 消费组模式的核心是
subscribe+ 协调器主导的重平衡,而非手动assign; - 分区互斥的保障来自 Broker 侧的消费组协调器,而非客户端的
assign调用。
更多推荐
所有评论(0)