Note: this article is a personal study note shared for knowledge exchange; corrections and suggestions are welcome.

Full code: kafka_example: C++ Kafka example code (gitee.com)

I. Code Architecture

The code uses a layered design plus the singleton pattern and callbacks. The architecture breaks down into 5 core modules:

Module | Core class / struct | Role
Configuration layer | global constants | Define the consume timeout, batch-commit threshold, Kafka settings, and other core parameters in one place
Management layer | KafkaConsumerManager | Singleton that registers / unregisters / stops all consumers and handles system signals (SIGINT/SIGTERM)
State layer | GlobalState | Holds each consumer's running state and statistics; atomic operations keep them thread-safe
Callback layer | EventCallback / RebalanceCallback | Handles Kafka events (errors / statistics) and rebalances (partition assignment / revocation); the place to plug in custom logic
Core business layer | KafkaConsumer | Encapsulates the consume logic: initialization, subscription, consume loop, batch commits, graceful shutdown
Test entry | main function | Creates multiple consumer instances, starts their threads, and exercises the whole flow

II. Development Flow

The code was written "foundation first, then core; configuration first, then business logic":

1. Basic configuration

  • Define core constants: consume timeout, batch-commit threshold, Kafka connection parameters, etc., which the later logic builds on;
  • Prefer named constants over hard-coded values for easier maintenance and tuning.

2. Manager implementation

  • Implement KafkaConsumerManager as a singleton, so there is exactly one manager instance;
  • Provide register / unregister / lookup interfaces to manage all consumers in one place;
  • Install handlers for the termination signals (SIGINT/SIGTERM) to support graceful shutdown.

3. State struct design

  • Define GlobalState: the running flag plus statistics (messages consumed / commit failures);
  • Use atomic types (std::atomic) for thread safety, and a mutex to protect offset commits.

4. Callback classes

  • Implement EventCallback to handle librdkafka events (errors, statistics, throttling, ...) for troubleshooting and monitoring;
  • Implement RebalanceCallback to handle rebalance events, so offsets are not lost when partitions are revoked.

5. Core consumer class

  • Initialization: create the Kafka configuration object, set required / optional parameters, register the callbacks, create the native consumer instance;
  • Consuming: subscribe to topics, poll in a loop, buffer messages and process them in batches to reduce per-message overhead;
  • Committing: commit offsets in batches (asynchronously), with a fallback synchronous commit on rebalance, so offsets are never lost;
  • Shutdown: the destructor stops consumption, closes the native instance, and releases resources for a graceful shutdown.

6. Test entry point

  • Create multiple consumer instances, each running in its own thread;
  • Catch exceptions to keep the program robust;
  • Wait for librdkafka's resources to be destroyed, to avoid leaks.

7. Summary

  • Singleton manager: all consumer instances are managed in one place instead of being controlled piecemeal;
  • Manual offset commits: auto-commit is disabled; batch commits improve throughput, and the fallback commit on rebalance prevents data loss;
  • Graceful shutdown: signal handling plus the destructor, so a forced kill does not leak resources;
  • Thread safety: atomic types plus a mutex protect the statistics and the offset commits;
  • Logging: timestamps, the consumer name, and a log level make troubleshooting and monitoring easier.

III. Implementation Details

1. consumer->commitSync(non_const_partitions);

RdKafka::ErrorCode err = consumer->commitSync(non_const_partitions);

When committing an explicit partition list, you must call set_offset on every partition first; otherwise the commit fails with:

Local: No offset stored

2. When rebalances are triggered

  • subscribe: triggers ERR__ASSIGN_PARTITIONS, i.e. partitions are assigned
  • unsubscribe/close: triggers ERR__REVOKE_PARTITIONS, i.e. partitions are released
  • A non-first subscribe: triggers ERR__REVOKE_PARTITIONS first, then ERR__ASSIGN_PARTITIONS, i.e. the old partitions are released before the new ones are assigned

IV. kafka_consumer.h

#pragma once
#include <librdkafka/rdkafkacpp.h>
#include <string>
#include <vector>
#include <atomic>
#include <mutex>
#include <map>
#include <csignal>

// Core constants
const int CONSUME_TIMEOUT_MS = 1000;
const int BATCH_COMMIT_THRESHOLD = 200;
const int REBALANCE_TIMEOUT_MS = 5000;
const std::string AUTO_OFFSET_RESET = "latest";
const int BATCH_FETCH_SIZE = 100;
const bool ENABLE_AUTO_COMMIT = false;

// Global consumer manager (maps names to instances)
class KafkaConsumerManager {
public:
    static KafkaConsumerManager& getInstance() {
        static KafkaConsumerManager instance;
        return instance;
    }

    void registerConsumer(const std::string& consumer_name, class KafkaConsumer* consumer);
    void unregisterConsumer(const std::string& consumer_name);
    class KafkaConsumer* getConsumer(const std::string& consumer_name);
    void stopAllConsumers();

private:
    KafkaConsumerManager() {
        signal(SIGINT, &KafkaConsumerManager::signalHandler);
        signal(SIGTERM, &KafkaConsumerManager::signalHandler);
    }

    static void signalHandler(int sig) {
        getInstance().stopAllConsumers();
    }

    std::map<std::string, class KafkaConsumer*> m_consumers;
    std::mutex m_mtx;
};

// Per-consumer state (includes a unique name)
struct GlobalState {
    std::atomic<bool> running{true};
    std::atomic<uint64_t> total_consumed{0};
    std::atomic<uint64_t> commit_failures{0};
    std::mutex mtx;
    std::string consumer_name; // unique id, used for manager lookups
    std::string group_id;      // consumer group id
};

// Event callback (no context object; instances are distinguished by name in the logs)
class EventCallback : public RdKafka::EventCb {
public:
    explicit EventCallback(const std::string& consumer_name) : m_consumer_name(consumer_name) {}
    void event_cb(RdKafka::Event& event) override;
private:
    std::string m_consumer_name; // name of the owning consumer
};

// Rebalance callback (bound to a consumer instance and its name)
class RebalanceCallback : public RdKafka::RebalanceCb {
public:
    explicit RebalanceCallback(KafkaConsumer* consumer, const std::string& consumer_name) : 
    m_consumer(consumer), m_consumer_name(consumer_name) {}
    void rebalance_cb(RdKafka::KafkaConsumer* consumer,
                      RdKafka::ErrorCode err,
                      std::vector<RdKafka::TopicPartition*>& partitions) override;
private:
    KafkaConsumer* m_consumer; // declared before m_consumer_name to match the constructor's init order
    std::string m_consumer_name;
    void commitPartitions(RdKafka::KafkaConsumer* consumer,
                          const std::vector<RdKafka::TopicPartition*>& partitions);
};

// Core consumer class
class KafkaConsumer {
public:
    KafkaConsumer(const std::string& consumer_name,
                  const std::string& brokers,
                  const std::string& group_id,
                  const std::vector<std::string>& topics);
    ~KafkaConsumer();

    void start();
    void stop() { m_state.running = false; }
    GlobalState& getState() { return m_state; }

private:
    bool initConsumer();
    void processMessage(RdKafka::Message* msg);

    // Core members (independent per instance)
    RdKafka::KafkaConsumer* m_consumer = nullptr;
    EventCallback m_event_cb;
    RebalanceCallback m_rebalance_cb;
    GlobalState m_state;

    // Per-instance configuration
    std::string m_brokers;
    std::vector<std::string> m_topics;
};

V. kafka_consumer.cc

#include "kafka_consumer.h"
#include <iostream>
#include <fstream>
#include <ctime>
#include <iomanip>
#include <sstream>
#include <thread>

// Logging (tagged with the consumer name)
static void log(const std::string &level, const std::string &consumer_name, const std::string &msg)
{
    char time_buf[64] = {0};
    std::time_t now = std::time(nullptr);
    std::strftime(time_buf, sizeof(time_buf), "%Y-%m-%d %H:%M:%S", std::localtime(&now));
    std::cout << "[" << time_buf << "] [" << consumer_name << "] [" << level << "] " << msg << std::endl;

    static std::ofstream log_file("./kafka_consumer.log", std::ios::app);
    if (log_file.is_open())
    {
        log_file << "[" << time_buf << "] [" << consumer_name << "] [" << level << "] " << msg << std::endl;
        log_file.flush();
    }
}

#define LOG_STREAM(level, consumer_name, msg)  \
    do                                         \
    {                                          \
        std::ostringstream oss_;               \
        oss_ << msg;                           \
        log(level, consumer_name, oss_.str()); \
    } while (0)

#define LOG_INFO(consumer_name, msg) LOG_STREAM("INFO", consumer_name, msg)
#define LOG_WARN(consumer_name, msg) LOG_STREAM("WARN", consumer_name, msg)
#define LOG_ERROR(consumer_name, msg) LOG_STREAM("ERROR", consumer_name, msg)
#define LOG_FATAL(consumer_name, msg)            \
    do                                           \
    {                                            \
        LOG_STREAM("FATAL", consumer_name, msg); \
        exit(EXIT_FAILURE);                      \
    } while (0)

// ===================== KafkaConsumerManager 实现 =====================
void KafkaConsumerManager::registerConsumer(const std::string &consumer_name, KafkaConsumer *consumer)
{
    std::lock_guard<std::mutex> lock(m_mtx);
    m_consumers[consumer_name] = consumer;
}

void KafkaConsumerManager::unregisterConsumer(const std::string &consumer_name)
{
    std::lock_guard<std::mutex> lock(m_mtx);
    m_consumers.erase(consumer_name);
}

KafkaConsumer *KafkaConsumerManager::getConsumer(const std::string &consumer_name)
{
    std::lock_guard<std::mutex> lock(m_mtx);
    auto it = m_consumers.find(consumer_name);
    return it != m_consumers.end() ? it->second : nullptr;
}

void KafkaConsumerManager::stopAllConsumers()
{
    std::lock_guard<std::mutex> lock(m_mtx);
    for (auto &pair : m_consumers)
    {
        pair.second->stop();
        LOG_INFO(pair.first, "signal received, stopping consumer");
    }
}

// ===================== EventCallback 实现 =====================
void EventCallback::event_cb(RdKafka::Event &event)
{
    switch (event.type())
    {
    case RdKafka::Event::EVENT_ERROR:
        LOG_ERROR(m_consumer_name, "error event: " << RdKafka::err2str(event.err()) << " | " << event.str());
        if (event.fatal())
            LOG_FATAL(m_consumer_name, "fatal error, exiting");
        break;
    case RdKafka::Event::EVENT_STATS:
        LOG_INFO(m_consumer_name, "stats: " << event.str().substr(0, 200));
        break;
    case RdKafka::Event::EVENT_LOG:
        LOG_INFO(m_consumer_name, "internal log: " << event.str());
        break;
    case RdKafka::Event::EVENT_THROTTLE:
        LOG_WARN(m_consumer_name, "throttle event: " << event.str());
        break;
    default:
        LOG_INFO(m_consumer_name, "other event (" << event.type() << "): " << event.str());
        break;
    }
}

// ===================== RebalanceCallback 实现 =====================
void RebalanceCallback::commitPartitions(RdKafka::KafkaConsumer *consumer,
                                         const std::vector<RdKafka::TopicPartition *> &partitions)
{
    if (!m_consumer) return;

    std::lock_guard<std::mutex> lock(m_consumer->getState().mtx);
    std::vector<RdKafka::TopicPartition *> non_const_partitions(partitions.begin(), partitions.end());

    // Passing the explicit partition list requires set_offset on each
    // partition first, otherwise it fails with "Local: No offset stored":
    // RdKafka::ErrorCode err = consumer->commitSync(non_const_partitions);
    RdKafka::ErrorCode err = consumer->commitSync();
    if (err != RdKafka::ERR_NO_ERROR)
    {
        LOG_ERROR(m_consumer_name, "rebalance commit failed: " << RdKafka::err2str(err));
        m_consumer->getState().commit_failures++;
    }
    else
    {
        LOG_INFO(m_consumer_name, "rebalance commit succeeded, partitions: " << partitions.size());
    }
}

void RebalanceCallback::rebalance_cb(RdKafka::KafkaConsumer *consumer,
                                     RdKafka::ErrorCode err,
                                     std::vector<RdKafka::TopicPartition *> &partitions)
{
    switch (err)
    {
    case RdKafka::ERR__ASSIGN_PARTITIONS:
        consumer->assign(partitions);
        LOG_INFO(m_consumer_name, "partitions assigned: " << partitions.size());
        break;
    case RdKafka::ERR__REVOKE_PARTITIONS:
        commitPartitions(consumer, partitions);
        consumer->unassign();
        LOG_INFO(m_consumer_name, "partitions revoked: " << partitions.size());
        break;
    default:
        LOG_ERROR(m_consumer_name, "unexpected rebalance error: " << RdKafka::err2str(err));
        consumer->unassign();
        break;
    }
}

// ===================== KafkaConsumer 实现 =====================
KafkaConsumer::KafkaConsumer(const std::string &consumer_name,
                             const std::string &brokers,
                             const std::string &group_id,
                             const std::vector<std::string> &topics)
    : m_event_cb(consumer_name), m_rebalance_cb(this, consumer_name),
      m_brokers(brokers), m_topics(topics) // init list ordered to match member declaration order
{
    m_state.consumer_name = consumer_name;
    m_state.group_id = group_id;

    if (!initConsumer())
    {
        LOG_ERROR(consumer_name, "consumer initialization failed"); // LOG_FATAL would exit() before the throw below could run
        throw std::runtime_error("KafkaConsumer initialization failed");
    }

    KafkaConsumerManager::getInstance().registerConsumer(consumer_name, this);
    LOG_INFO(consumer_name, "consumer initialized, group: " << group_id);
}

KafkaConsumer::~KafkaConsumer()
{
    stop();
    KafkaConsumerManager::getInstance().unregisterConsumer(m_state.consumer_name);

    if (m_consumer)
    {
        m_consumer->close(); // triggers a rebalance; never omit this, or the partitions go unconsumed until Kafka notices the dead connection
        delete m_consumer;
    }

    LOG_INFO(m_state.consumer_name, "consumer destroyed, total consumed: " << m_state.total_consumed
                                                                           << " | commit failures: " << m_state.commit_failures);
}

bool KafkaConsumer::initConsumer()
{
    std::string errstr;
    auto *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);

    // Required settings
    if (conf->set("bootstrap.servers", m_brokers, errstr) != RdKafka::Conf::CONF_OK)
    {
        LOG_ERROR(m_state.consumer_name, "failed to set brokers: " << errstr);
        delete conf;
        return false;
    }
    if (conf->set("group.id", m_state.group_id, errstr) != RdKafka::Conf::CONF_OK)
    {
        LOG_ERROR(m_state.consumer_name, "failed to set group.id: " << errstr);
        delete conf;
        return false;
    }

    // Optional settings
    conf->set("enable.auto.commit", ENABLE_AUTO_COMMIT ? "true" : "false", errstr);
    // note: "max.poll.records" is a Java-client option; librdkafka does not support it and this set fails silently
    conf->set("max.poll.records", std::to_string(BATCH_FETCH_SIZE), errstr);
    conf->set("offset.commit.timeout.ms", std::to_string(REBALANCE_TIMEOUT_MS), errstr);
    conf->set("enable.partition.eof", "false", errstr);        // do not surface partition-EOF events
    conf->set("max.partition.fetch.bytes", "1024000", errstr); // ~1 MB per partition per fetch

    // Register callbacks (bound to this consumer's name)
    conf->set("event_cb", &m_event_cb, errstr);
    conf->set("rebalance_cb", &m_rebalance_cb, errstr);


    // Topic configuration ("default_topic_conf" is copied into conf, so release our object afterwards)
    auto* topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    topic_conf->set("auto.offset.reset", AUTO_OFFSET_RESET, errstr);
    conf->set("default_topic_conf", topic_conf, errstr);
    delete topic_conf;

    // Create the consumer
    m_consumer = RdKafka::KafkaConsumer::create(conf, errstr);
    if (!m_consumer)
    {
        LOG_ERROR(m_state.consumer_name, "failed to create consumer: " << errstr);
        delete conf;
        return false;
    }

    delete conf;
    return true;
}

void KafkaConsumer::processMessage(RdKafka::Message *msg)
{
    const std::string &name = m_state.consumer_name;
    switch (msg->err())
    {
    case RdKafka::ERR_NO_ERROR:
        LOG_INFO(name, "message: topic=" << msg->topic_name()
                                         << " | partition=" << msg->partition()
                                         << " | offset=" << msg->offset()
                                         // payload() is not NUL-terminated, so copy exactly len() bytes
                                         << " | payload=" << std::string(static_cast<const char *>(msg->payload()), msg->len()));
        m_state.total_consumed++;
        break;
    case RdKafka::ERR__TIMED_OUT:
        break;
    default:
        LOG_ERROR(name, "consume error: " << msg->errstr());
        break;
    }
}

void KafkaConsumer::start()
{
    const std::string &name = m_state.consumer_name;
    auto err = m_consumer->subscribe(m_topics);
    if (err != RdKafka::ERR_NO_ERROR)
    {
        LOG_ERROR(name, "failed to subscribe: " << RdKafka::err2str(err)); // LOG_FATAL would exit() before the cleanup below
        m_state.running = false;
        return;
    }
    LOG_INFO(name, "subscribed to " << m_topics.size() << " topic(s)");

    std::vector<RdKafka::Message *> msg_batch;
    int batch_count = 0;
    while (m_state.running)
    {
        auto *msg = m_consumer->consume(CONSUME_TIMEOUT_MS);
        if (msg->err() == RdKafka::ERR_NO_ERROR)
        {
            msg_batch.push_back(msg);
            batch_count++;
            if (msg_batch.size() >= static_cast<size_t>(BATCH_FETCH_SIZE))
            {
                for (auto &m : msg_batch)
                {
                    processMessage(m);
                    delete m;
                }
                msg_batch.clear();
            }
            
        }
        else if (msg->err() == RdKafka::ERR__TIMED_OUT)
        {
            if (!msg_batch.empty())
            {
                for (auto &m : msg_batch)
                {
                    processMessage(m);
                    delete m;
                }
                msg_batch.clear();
            }
            processMessage(msg);
            delete msg;
            
        }
        else
        {
            // runtime consume error: log and stop
            LOG_ERROR(name, "consume failed: " << msg->errstr());
            delete msg;
            m_state.running = false;
            break;
        }

        if (batch_count >= BATCH_COMMIT_THRESHOLD)
        {
            std::lock_guard<std::mutex> lock(m_state.mtx);
            auto commit_err = m_consumer->commitAsync();
            if (commit_err != RdKafka::ERR_NO_ERROR)
            {
                LOG_ERROR(name, "batch commit failed: " << RdKafka::err2str(commit_err));
                m_state.commit_failures++;
            }
            else
            {
                LOG_INFO(name, "batch commit succeeded, messages since last commit: " << batch_count);
                batch_count = 0;
            }
        }
    }

    // Drain the remaining buffered messages
    if (!msg_batch.empty())
    {
        for (auto &m : msg_batch)
        {
            processMessage(m);
            delete m;
        }
    }

    // auto commit_err = m_consumer->commitAsync();
    // if (commit_err != RdKafka::ERR_NO_ERROR)
    // {
    //     LOG_ERROR(name, "final commit failed: " << RdKafka::err2str(commit_err));
    //     m_state.commit_failures++;
    // }
    // else
    // {
    //     LOG_INFO(name, "final commit succeeded");
    //     batch_count = 0;
    // }
    // LOG_INFO(name, "consume loop stopped");
}

// main (multi-instance test)
int main(int argc, char *argv[])
{
    try
    {
        // Create 2 independent consumer instances
        KafkaConsumer consumer1("consumer1", "127.0.0.1:9092", "group1", {"test"});
        std::thread t1(&KafkaConsumer::start, &consumer1);
        //sleep(2);
        KafkaConsumer consumer2("consumer2", "127.0.0.1:9092", "group2", {"test"});

        // Run each consumer in its own thread
        std::thread t2(&KafkaConsumer::start, &consumer2);

        //auto* c = KafkaConsumerManager::getInstance().getConsumer("consumer1");
        //c->m_consumer->unsubscribe();
        //KafkaConsumerManager::getInstance().unregisterConsumer("consumer1");

        t1.join();
        t2.join();
    }
    catch (const std::exception &e)
    {
        LOG_FATAL("main", "startup failed: " << e.what());
    }

    RdKafka::wait_destroyed(5000);
    return EXIT_SUCCESS;
}
