注:该文用于个人学习记录和知识交流,如有不足,欢迎指点。

一、相关api

1. rebalance_cb

  • 含义:RdKafka 中 RebalanceCb 抽象类的核心回调方法,消费组模式下重平衡事件的处理入口,承接 Broker 侧消费组协调器下发的分区分配 / 回收指令。
  • 使用方法:继承 RebalanceCb 并重写该方法,根据 ErrorCode(ASSIGN_PARTITIONS/REVOKE_PARTITIONS)执行分区接管 / 释放逻辑;将实现类实例注册至消费者配置。
  • 时机:消费者调用 poll/consume/subscribe/unsubscribe/close 时,librdkafka 检测到 Broker 侧触发的重平衡事件后同步执行。(调用subscribe、unsubscribe、close必定执行,consume跟poll是监测到才执行)
/**
 * @brief 消费者重平衡回调类
 * @details 继承 RdKafka::RebalanceCb,处理消费组重平衡事件
 *          (分区分配/回收),保证重平衡时分区正确接管/释放,避免重复消费
 */
class ConsumerRebalanceCb : public RdKafka::RebalanceCb
{
private:
    /**
     * @brief 辅助函数:打印分区列表(便于调试重平衡的分区变化)
     * @param partitions 分区列表(包含主题名+分区号)
     */
    static void printTopicPartition (const std::vector<RdKafka::TopicPartition*>&partitions)
    {
        for (unsigned int i = 0 ; i < partitions.size() ; i++)
            std::cerr << partitions[i]->topic() << "[" << partitions[i]->partition() << "], ";
        std::cerr << "\n";
    }

public:
    /**
     * @brief 重平衡回调函数(必须重写)
     * @param consumer 消费者实例指针
     * @param err 重平衡错误码(标识分配/回收/异常)
     * @param partitions 待分配/回收的分区列表
     */
    void rebalance_cb (RdKafka::KafkaConsumer *consumer,
                       RdKafka::ErrorCode err,
                       std::vector<RdKafka::TopicPartition*> &partitions)
    {
        std::cerr << "RebalanceCb: " << RdKafka::err2str(err) << ": ";
        printTopicPartition(partitions);

        // 场景1:协调器为当前消费者分配新分区
        if (err == RdKafka::ERR__ASSIGN_PARTITIONS)
        {
            // 手动接管分区分配(替代默认逻辑)
            consumer->assign(partitions);
            // 记录当前持有的分区数量
            partition_count = (int)partitions.size();
        }
        // 场景2:协调器回收当前消费者的旧分区(或其他异常)
        else
        {
            // 取消分区分配,释放消费权
            consumer->unassign();
            partition_count = 0;
        }
    }
private:
    // 记录当前消费者持有的分区数量(初始值0避免随机值)
    int partition_count = 0;
};

2. assign

  • 含义:KafkaConsumer 类的成员方法,显式将指定 TopicPartition 列表绑定至当前消费者实例,确立消费分区范围。
  • 使用方法:传入 TopicPartition 列表(指定主题 - 分区)调用该方法;消费组模式下仅在 rebalance_cb 内调用,非消费组模式可直接调用。
  • 时机:消费组模式下,rebalance_cb 接收到 ASSIGN_PARTITIONS 事件时(承接协调器分配的分区);非消费组模式下,初始化消费者后手动指定消费分区时。

初始化消费者时手动指定分区

注意:

assign是不触发重平衡机制的,也就说如果配置里绑定了groupid,可能会出现同一个消费组里面有两个消费者同时消费一个分区,严重违反约定。
因此,通常情况下,assign一般用于单消费者的消费组(即不指定groupid)

// 1. 构造指定的「主题+分区」列表
std::vector<RdKafka::TopicPartition *> assign_partitions;
// 遍历要消费的主题列表,为每个主题指定要消费的分区(示例:使用成员变量m_partition,也可自定义多个分区)
for (const auto &topic : m_topicVector)
{
    // 创建「主题+分区」对象(核心:指定具体分区号,比如m_partition=0)
    RdKafka::TopicPartition *tp = RdKafka::TopicPartition::create(topic, m_partition);
    // 【可选】指定消费起始位移(比如从第一条消息开始消费,不指定则用auto.offset.reset)
    tp->set_offset(RdKafka::Topic::OFFSET_STORED);
    /**
     * RdKafka::Topic::OFFSET_BEGINNING:从第一条消息开始消费 (注意值为-2,并非为0)
     * RdKafka::Topic::OFFSET_STORED:使用消费组存储的历史位移, (注意值为-1000)
     * RdKafka::Topic::OFFSET_END:只消费未来新消息 (注意值为-1)
     */
    assign_partitions.push_back(tp);
}

// 2. 用assign替换subscribe,手动分配分区
RdKafka::ErrorCode errorCode = m_consumer->assign(assign_partitions);
if (errorCode != RdKafka::ERR_NO_ERROR)
{
    std::cout << "assign partition failed: " << RdKafka::err2str(errorCode) << std::endl;
    // 释放TopicPartition对象,避免内存泄漏
    for (auto tp : assign_partitions)
        delete tp;
    return;
}
// 释放TopicPartition对象(assign后不再需要)
for (auto tp : assign_partitions)
    delete tp;

3. unassign

  • 含义:KafkaConsumer 类的成员方法,解除当前消费者与所有 TopicPartition 的绑定关系,终止对所有分区的消费权限。
  • 使用方法:无参数调用该方法,清空消费者的分区绑定列表;释放分区前需完成位移提交以避免重复消费。
  • 时机:消费组模式下,rebalance_cb 接收到 REVOKE_PARTITIONS 事件时(回收分区前执行);消费者停止消费 / 异常容错时(终止所有分区消费)。

4. subscribe

  • 含义:KafkaConsumer 类的成员方法,订阅指定主题列表,启用消费组模式,交由 Broker 侧消费组协调器自动管理分区分配。
  • 使用方法:传入主题列表调用该方法,可配置分区分配策略;调用后消费者向协调器注册,触发初始重平衡。
  • 时机:消费者初始化后、启动消费循环前(启用消费组模式);业务需动态调整订阅主题时(触发重平衡更新分区分配)。
RdKafka::ErrorCode errorCode = m_consumer->subscribe(m_topicVector);
if (errorCode != RdKafka::ERR_NO_ERROR)
{
    std::cout << "subscribe failed: " << RdKafka::err2str(errorCode) << std::endl;
    return; // 订阅失败直接返回,避免无效循环
}

5. consume

  • 含义:KafkaConsumer 类的成员方法,阻塞式拉取单条 / 批量消息,底层封装 poll 方法驱动客户端事件循环。
  • 使用方法:指定超时时间调用该方法,返回拉取到的 Message 实例;需在消费循环中持续调用以拉取消息并处理后台事件。
  • 时机:消费业务的核心循环中,按固定超时周期调用(如 1000ms),是业务层拉取消息的核心入口。
while (true)
{
    // 拉取消息(核心API:阻塞1000ms,超时返回ERR__TIMED_OUT)
    RdKafka::Message *msg = m_consumer->consume(1000);
    // 处理拉取到的消息
    msg_consume(msg, NULL);
    // 释放消息对象(必须!否则内存泄漏)
    delete msg;
}

6. poll

  • 含义:KafkaConsumer 类的底层核心方法,处理客户端所有后台事件(重平衡、心跳、错误等)并拉取消息。
  • 使用方法:指定超时时间调用该方法,返回处理的事件数;consume 方法底层自动调用,也可手动调用以精细管控事件处理。
  • 时机:consume 底层隐式调用;需手动管控客户端事件循环时(如自定义事件处理逻辑)显式调用。

7. event_cb

  • 含义:RdKafka 中 EventCb 抽象类的核心回调方法,客户端全量事件的统一处理入口,覆盖错误、重平衡、统计、限流等事件。
  • 使用方法:继承 EventCb 并重写该方法,按 Event 类型(ERROR/REBALANCE/STATS 等)处理不同事件;注册至消费者配置以接管全量事件。
  • 时机:poll/consume 调用时,客户端检测到待处理事件后同步执行,替代 error_cb/stats_cb 等专用回调。
/**
 * @brief 消费者事件回调类
 * @details 继承 RdKafka::EventCb,处理消费者运行过程中的各类异步事件
 *          (错误、日志、统计、Broker限流等),是监控消费者状态的核心
 */
class ConsumerEventCb : public RdKafka::EventCb
{
public:
    /**
     * @brief 事件回调函数(必须重写)
     * @param event 事件对象,包含事件类型、错误码、描述信息等
     */
    void event_cb (RdKafka::Event &event)
    {
        // 根据事件类型分类处理
        switch (event.type())
        {
        // 错误事件(FATAL表示致命错误,会导致消费者退出)
        case RdKafka::Event::EVENT_ERROR:
            if (event.fatal())
            {
                std::cerr << "FATAL "; // 标记致命错误
            }
            // 输出错误码和错误描述
            std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
                      event.str() << std::endl;
            break;

        // 统计事件(输出消费者性能指标,如拉取速率、请求延迟等)
        case RdKafka::Event::EVENT_STATS:
            std::cerr << "\"STATS\": " << event.str() << std::endl;
            break;

        // 日志事件(输出librdkafka内部日志,便于调试)
        case RdKafka::Event::EVENT_LOG:
            fprintf(stderr, "LOG-%i-%s: %s\n",
                    event.severity(), // 日志级别(INFO/WARN/ERROR)
                    event.fac().c_str(), // 日志模块
                    event.str().c_str()); // 日志内容
            break;

        // 限流事件(Broker对消费者拉取速率限流时触发)
        case RdKafka::Event::EVENT_THROTTLE:
            std::cerr << "THROTTLED: " << event.throttle_time() << "ms by " <<
                      event.broker_name() << " id " << (int)event.broker_id() << std::endl;
            break;

        // 其他未分类事件
        default:
            std::cerr << "EVENT " << event.type() <<
                      " (" << RdKafka::err2str(event.err()) << "): " <<
                      event.str() << std::endl;
            break;
        }
    }
};

二、初始化消费者用assign的注意事项

手动调用 assign 会打破消费组的分区互斥规则,导致同一分区被消费组内多个消费者同时消费,但这并非 Kafka 机制缺陷,而是「手动分区模式」与「消费组模式」的使用边界问题。

1.消费组分区互斥的前提

Kafka 保证「一个分区仅被消费组内一个消费者消费」的核心,是消费组协调器的管控 —— 只有通过 subscribe 启用消费组模式,消费者才会向协调器注册自身状态,协调器统一分配分区并维护「分区 - 消费者」映射关系,从而保证互斥。

2. 手动 assign 打破互斥的原因:

当你在消费组内(配置了 group.id)直接调用 assign 时:

  • 消费者仅在客户端层面绑定分区,不会向 Broker 侧的消费组协调器注册该分区绑定信息;
  • 协调器无法感知该消费者的分区占用,仍会将该分区分配给消费组内其他通过 subscribe 加入的消费者;
  • 最终导致多个消费者同时消费同一分区,违背互斥规则

3. assign 与消费组的正确使用边界:

  • ✅ 消费组模式(需分区互斥):仅通过 subscribe 启用,由协调器主导分区分配,assign 仅在 rebalance_cb 中承接协调器分配的分区(此时协调器已保证互斥);
  • ❌ 禁止操作:消费组内直接手动调用 assign(会绕开协调器,破坏互斥);
  • ✅ 手动分区模式(无需消费组):不配置 group.id,直接调用 assign(此时无消费组,无需互斥)。

4. 解决方案

若需保证消费组内分区互斥,必须使用 subscribe 而非手动 assign

  • 消费组内所有消费者均调用 subscribe(topics),交由协调器统一分配分区;
  • 仅在 rebalance_cb 中响应 ASSIGN_PARTITIONS 事件时调用 assign,承接协调器分配的分区(此时协调器已确保分区不重复);
  • 禁止消费组内直接手动调用 assign 绑定分区。

5. 总结

  • 手动 assign 不触发重平衡,且绕开消费组协调器管控,是导致分区重复消费的根本原因;
  • 消费组模式的核心是 subscribe + 协调器主导的重平衡,而非手动 assign
  • 分区互斥的保障来自 Broker 侧的消费组协调器,而非客户端的 assign 调用。
Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐