Linux C/C++ Learning Diary (78): Kafka (VI): Consumer Code (4): Code Walkthrough (with Source)
This article presents a C++ Kafka consumer built on a layered design with a singleton manager. The code is organized into five modules — configuration, management, state, callbacks, and core business logic — and supports managing multiple consumer instances, batched message processing, manual offset commits, and graceful shutdown. Key features: 1) a singleton manager that owns all consumer instances; 2) manual offset commits to prevent data loss; 3) graceful shutdown via signal handling plus the destructor; 4) atomics and a mutex for thread safety; 5) a complete logging system for monitoring.
Note: this article is a personal study log and knowledge-sharing post; corrections are welcome.
Full code: kafka_example: C++ code for Kafka (gitee.com)
I. Code Architecture
The code uses a layered design + singleton pattern + callbacks, split into five core modules plus a test entry point:
| Module | Core class / struct | Role |
|---|---|---|
| Configuration | global constants | Defines the consume timeout, batch-commit threshold, Kafka settings, etc., so all parameters live in one place |
| Management | KafkaConsumerManager | Singleton; registers / unregisters / stops all consumers and handles system signals (SIGINT/SIGTERM) |
| State | GlobalState | Holds each consumer's running flag and statistics; atomic operations keep multi-threaded access safe |
| Callbacks | EventCallback / RebalanceCallback | Handle Kafka events (errors / stats) and rebalances (partition assignment / revocation) with custom business logic |
| Core business | KafkaConsumer | Wraps the consume pipeline: init, subscribe, poll loop, batched commits, graceful shutdown |
| Test entry | main | Creates multiple consumer instances, starts consumer threads, and exercises the whole thing |
II. Implementation Walkthrough
The code follows a "foundation first, then core; configuration first, then business logic" order:
1. Base configuration
- Define the core constants: consume timeout, batch-commit threshold, Kafka connection parameters, etc., giving later logic a single source of configuration;
- Constants instead of hard-coded literals make later maintenance and tuning easier.
2. Manager
- Implement KafkaConsumerManager as a singleton so there is exactly one manager instance;
- Provide register / unregister / lookup interfaces so all consumers are managed in one place;
- Install signal handlers for the termination signals (SIGINT/SIGTERM) to drive graceful shutdown.
3. State struct
- Define GlobalState: the running flag plus statistics (messages consumed / failed commits);
- Use atomic types (std::atomic) for thread-safe counters and a mutex to protect offset commits.
4. Callback classes
- EventCallback: handles the various librdkafka events (errors, stats, throttling, etc.) for troubleshooting and monitoring;
- RebalanceCallback: handles rebalance events so offsets are not lost when partitions are revoked.
5. Core consumer class
- Initialization: create the Kafka config object, set required / optional parameters, register the callbacks, create the native consumer instance;
- Consuming: subscribe to topics, poll in a loop, buffer messages and process them in batches to cut down on per-message overhead;
- Committing: commit offsets in batches (asynchronously), with a synchronous fallback commit on rebalance so offsets are never lost;
- Shutdown: the destructor stops consumption, closes the native instance, and frees resources for a graceful exit.
6. Test entry point
- Create multiple consumer instances, each consuming on its own thread;
- Catch exceptions to keep the program robust;
- Wait for librdkafka to destroy its resources to avoid leaks.
7. Summary
- Singleton manager: all consumer instances are controlled from one place instead of scattered code;
- Manual offset commits: auto-commit is disabled; batched commits improve throughput, and the rebalance fallback commit prevents data loss;
- Graceful shutdown: signal handler plus destructor, so a forced kill does not leak resources;
- Thread safety: atomics plus a mutex protect the statistics and the offset commits;
- Logging: timestamp, consumer name, and level on every line make monitoring and debugging easier.
III. Implementation Details
1. consumer->commitSync(non_const_partitions)
RdKafka::ErrorCode err = consumer->commitSync(non_const_partitions);
With this overload you must first call set_offset on every partition; otherwise the commit fails with:
Local: No offset stored
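A sketch of the fix for that error (a fragment, not runnable on its own; `last_offsets` is a hypothetical map you would maintain while consuming): each TopicPartition must carry the offset of the next message to read, conventionally last consumed offset + 1, before the partition-list overload is called:

```cpp
// Hypothetical bookkeeping kept up to date in the consume loop:
// (topic, partition) -> last consumed offset.
std::map<std::pair<std::string, int32_t>, int64_t> last_offsets;

// Before committing, stamp each partition with the NEXT offset to read.
for (RdKafka::TopicPartition* tp : partitions) {
    auto it = last_offsets.find({tp->topic(), tp->partition()});
    if (it != last_offsets.end())
        tp->set_offset(it->second + 1);  // commit points at the next message
}
RdKafka::ErrorCode err = consumer->commitSync(partitions);
```

Alternatively, the no-argument commitSync() used in the code below commits whatever offsets the client has stored internally, which is why the sample switched to it.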
2. When rebalances fire
- subscribe: triggers ERR__ASSIGN_PARTITIONS, i.e. partitions are assigned
- unsubscribe/close: triggers ERR__REVOKE_PARTITIONS, i.e. partitions are released
- a repeated subscribe: first ERR__REVOKE_PARTITIONS, then ERR__ASSIGN_PARTITIONS — the old partitions are released before the new ones are assigned
IV. kafka_consumer.h
#pragma once
#include <librdkafka/rdkafkacpp.h>
#include <string>
#include <vector>
#include <atomic>
#include <mutex>
#include <map>
#include <csignal>
// Core constants
const int CONSUME_TIMEOUT_MS = 1000;            // consume() poll timeout (ms)
const int BATCH_COMMIT_THRESHOLD = 200;         // commit offsets every N consumed messages
const int REBALANCE_TIMEOUT_MS = 5000;          // offset commit timeout (ms)
const std::string AUTO_OFFSET_RESET = "latest"; // starting position when no offset is stored
const int BATCH_FETCH_SIZE = 100;               // app-side batch size for processing
const bool ENABLE_AUTO_COMMIT = false;          // manual offset commits only
// Global consumer manager (maps consumer names to instances)
class KafkaConsumerManager {
public:
static KafkaConsumerManager& getInstance() {
static KafkaConsumerManager instance;
return instance;
}
void registerConsumer(const std::string& consumer_name, class KafkaConsumer* consumer);
void unregisterConsumer(const std::string& consumer_name);
class KafkaConsumer* getConsumer(const std::string& consumer_name);
void stopAllConsumers();
private:
KafkaConsumerManager() {
signal(SIGINT, &KafkaConsumerManager::signalHandler);
signal(SIGTERM, &KafkaConsumerManager::signalHandler);
}
// NOTE: stopAllConsumers() locks a mutex, which is not async-signal-safe;
// a stricter design would only set a volatile sig_atomic_t flag here.
static void signalHandler(int sig) {
getInstance().stopAllConsumers();
}
std::map<std::string, class KafkaConsumer*> m_consumers;
std::mutex m_mtx;
};
// Per-consumer state (includes a unique name)
struct GlobalState {
std::atomic<bool> running{true};
std::atomic<uint64_t> total_consumed{0};
std::atomic<uint64_t> commit_failures{0};
std::mutex mtx;
std::string consumer_name; // unique key used for manager lookups
std::string group_id; // consumer group ID
};
// Event callback (no opaque context; log lines are tagged with the consumer name)
class EventCallback : public RdKafka::EventCb {
public:
explicit EventCallback(const std::string& consumer_name) : m_consumer_name(consumer_name) {}
void event_cb(RdKafka::Event& event) override;
private:
std::string m_consumer_name; // name of the owning consumer
};
// Rebalance callback (holds the owning consumer and its name)
class RebalanceCallback : public RdKafka::RebalanceCb {
public:
explicit RebalanceCallback(KafkaConsumer* consumer, const std::string& consumer_name) :
m_consumer(consumer), m_consumer_name(consumer_name) {}
void rebalance_cb(RdKafka::KafkaConsumer* consumer,
RdKafka::ErrorCode err,
std::vector<RdKafka::TopicPartition*>& partitions) override;
private:
std::string m_consumer_name;
KafkaConsumer* m_consumer;
void commitPartitions(RdKafka::KafkaConsumer* consumer,
const std::vector<RdKafka::TopicPartition*>& partitions);
};
// Core consumer class
class KafkaConsumer {
public:
KafkaConsumer(const std::string& consumer_name,
const std::string& brokers,
const std::string& group_id,
const std::vector<std::string>& topics);
~KafkaConsumer();
void start();
void stop() { m_state.running = false; }
GlobalState& getState() { return m_state; }
private:
bool initConsumer();
void processMessage(RdKafka::Message* msg);
// Core members (one set per instance)
RdKafka::KafkaConsumer* m_consumer = nullptr;
EventCallback m_event_cb;
RebalanceCallback m_rebalance_cb;
GlobalState m_state;
// Per-instance configuration
std::string m_brokers;
std::vector<std::string> m_topics;
};
V. kafka_consumer.cc
#include "kafka_consumer.h"
#include <iostream>
#include <fstream>
#include <ctime>
#include <iomanip>
#include <sstream>
#include <thread>
#include <stdexcept> // std::runtime_error
// Logging (each line tagged with the consumer name)
static void log(const std::string &level, const std::string &consumer_name, const std::string &msg)
{
char time_buf[64] = {0};
std::time_t now = std::time(nullptr);
std::strftime(time_buf, sizeof(time_buf), "%Y-%m-%d %H:%M:%S", std::localtime(&now));
std::cout << "[" << time_buf << "] [" << consumer_name << "] [" << level << "] " << msg << std::endl;
static std::ofstream log_file("./kafka_consumer.log", std::ios::app);
if (log_file.is_open())
{
log_file << "[" << time_buf << "] [" << consumer_name << "] [" << level << "] " << msg << std::endl;
log_file.flush();
}
}
#define LOG_STREAM(level, consumer_name, msg) \
do \
{ \
std::ostringstream oss_; \
oss_ << msg; \
log(level, consumer_name, oss_.str()); \
} while (0)
#define LOG_INFO(consumer_name, msg) LOG_STREAM("INFO", consumer_name, msg)
#define LOG_WARN(consumer_name, msg) LOG_STREAM("WARN", consumer_name, msg)
#define LOG_ERROR(consumer_name, msg) LOG_STREAM("ERROR", consumer_name, msg)
#define LOG_FATAL(consumer_name, msg) \
do \
{ \
LOG_STREAM("FATAL", consumer_name, msg); \
exit(EXIT_FAILURE); \
} while (0)
// ===================== KafkaConsumerManager implementation =====================
void KafkaConsumerManager::registerConsumer(const std::string &consumer_name, KafkaConsumer *consumer)
{
std::lock_guard<std::mutex> lock(m_mtx);
m_consumers[consumer_name] = consumer;
}
void KafkaConsumerManager::unregisterConsumer(const std::string &consumer_name)
{
std::lock_guard<std::mutex> lock(m_mtx);
m_consumers.erase(consumer_name);
}
KafkaConsumer *KafkaConsumerManager::getConsumer(const std::string &consumer_name)
{
std::lock_guard<std::mutex> lock(m_mtx);
auto it = m_consumers.find(consumer_name);
return it != m_consumers.end() ? it->second : nullptr;
}
void KafkaConsumerManager::stopAllConsumers()
{
std::lock_guard<std::mutex> lock(m_mtx);
for (auto &pair : m_consumers)
{
pair.second->stop();
LOG_INFO(pair.first, "signal received, stopping consumer");
}
}
// ===================== EventCallback implementation =====================
void EventCallback::event_cb(RdKafka::Event &event)
{
switch (event.type())
{
case RdKafka::Event::EVENT_ERROR:
LOG_ERROR(m_consumer_name, "error event: " << RdKafka::err2str(event.err()) << " | " << event.str());
if (event.fatal())
LOG_FATAL(m_consumer_name, "fatal error, exiting");
break;
case RdKafka::Event::EVENT_STATS:
LOG_INFO(m_consumer_name, "stats: " << event.str().substr(0, 200));
break;
case RdKafka::Event::EVENT_LOG:
LOG_INFO(m_consumer_name, "internal log: " << event.str());
break;
case RdKafka::Event::EVENT_THROTTLE:
LOG_WARN(m_consumer_name, "throttle event: " << event.str());
break;
default:
LOG_INFO(m_consumer_name, "other event (" << event.type() << "): " << event.str());
break;
}
}
// ===================== RebalanceCallback implementation =====================
void RebalanceCallback::commitPartitions(RdKafka::KafkaConsumer *consumer,
const std::vector<RdKafka::TopicPartition *> &partitions)
{
// KafkaConsumer *kafka_consumer = KafkaConsumerManager::getInstance().getConsumer(m_consumer_name);
// std::cout << "entering rebalance" << std::endl;
// if (!kafka_consumer)
//     return;
if (!m_consumer) return;
std::lock_guard<std::mutex> lock(m_consumer->getState().mtx);
std::vector<RdKafka::TopicPartition *> non_const_partitions(partitions.begin(), partitions.end());
// The commitSync(partitions) overload needs set_offset() on every partition first,
// otherwise it fails with "Local: No offset stored":
// RdKafka::ErrorCode err = consumer->commitSync(non_const_partitions);
RdKafka::ErrorCode err = consumer->commitSync();
if (err != RdKafka::ERR_NO_ERROR)
{
LOG_ERROR(m_consumer_name, "rebalance commit failed: " << RdKafka::err2str(err));
m_consumer->getState().commit_failures++;
}
else
{
LOG_INFO(m_consumer_name, "rebalance commit succeeded, partitions: " << partitions.size());
}
}
void RebalanceCallback::rebalance_cb(RdKafka::KafkaConsumer *consumer,
RdKafka::ErrorCode err,
std::vector<RdKafka::TopicPartition *> &partitions)
{
switch (err)
{
case RdKafka::ERR__ASSIGN_PARTITIONS:
consumer->assign(partitions);
LOG_INFO(m_consumer_name, "partitions assigned: " << partitions.size());
break;
case RdKafka::ERR__REVOKE_PARTITIONS:
commitPartitions(consumer, partitions); // fallback commit before losing the partitions
consumer->unassign();
LOG_INFO(m_consumer_name, "partitions revoked: " << partitions.size());
break;
default:
LOG_ERROR(m_consumer_name, "unexpected rebalance error: " << RdKafka::err2str(err));
consumer->unassign();
break;
}
}
// ===================== KafkaConsumer implementation =====================
KafkaConsumer::KafkaConsumer(const std::string &consumer_name,
const std::string &brokers,
const std::string &group_id,
const std::vector<std::string> &topics)
: m_brokers(brokers), m_topics(topics),
m_event_cb(consumer_name), m_rebalance_cb(this, consumer_name)
{
m_state.consumer_name = consumer_name;
m_state.group_id = group_id;
if (!initConsumer())
{
// Log at ERROR, not FATAL: LOG_FATAL calls exit(), which would make the
// throw below unreachable and bypass the try/catch in main().
LOG_ERROR(consumer_name, "consumer initialization failed");
throw std::runtime_error("KafkaConsumer initialization failed");
}
KafkaConsumerManager::getInstance().registerConsumer(consumer_name, this);
LOG_INFO(consumer_name, "consumer initialized, group: " << group_id);
}
KafkaConsumer::~KafkaConsumer()
{
stop();
KafkaConsumerManager::getInstance().unregisterConsumer(m_state.consumer_name);
if (m_consumer)
{
m_consumer->close(); // triggers a rebalance -- never skip this, or the partitions sit unconsumed until Kafka notices the consumer is gone
delete m_consumer;
}
LOG_INFO(m_state.consumer_name, "consumer destroyed, total consumed: " << m_state.total_consumed
<< " | commit failures: " << m_state.commit_failures);
}
bool KafkaConsumer::initConsumer()
{
std::string errstr;
auto *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
// Required settings
if (conf->set("bootstrap.servers", m_brokers, errstr) != RdKafka::Conf::CONF_OK)
{
LOG_ERROR(m_state.consumer_name, "failed to set brokers: " << errstr);
delete conf;
return false;
}
if (conf->set("group.id", m_state.group_id, errstr) != RdKafka::Conf::CONF_OK)
{
LOG_ERROR(m_state.consumer_name, "failed to set group.id: " << errstr);
delete conf;
return false;
}
// Optional settings
conf->set("enable.auto.commit", ENABLE_AUTO_COMMIT ? "true" : "false", errstr);
// Note: max.poll.records is a Java-client property that librdkafka does not support,
// so this call has no effect here; the batch size is enforced in the consume loop instead.
conf->set("max.poll.records", std::to_string(BATCH_FETCH_SIZE), errstr);
conf->set("offset.commit.timeout.ms", std::to_string(REBALANCE_TIMEOUT_MS), errstr);
conf->set("enable.partition.eof", "false", errstr); // do not surface end-of-partition as events
conf->set("max.partition.fetch.bytes", "1024000", errstr); // cap bytes fetched per partition
// Register callbacks (tagged with this consumer's name)
conf->set("event_cb", &m_event_cb, errstr);
conf->set("rebalance_cb", &m_rebalance_cb, errstr);
// Topic defaults
auto *topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
topic_conf->set("auto.offset.reset", AUTO_OFFSET_RESET, errstr);
conf->set("default_topic_conf", topic_conf, errstr);
delete topic_conf; // set() copies the topic conf into conf, so the local object is freed here
// Create the consumer
m_consumer = RdKafka::KafkaConsumer::create(conf, errstr);
if (!m_consumer)
{
LOG_ERROR(m_state.consumer_name, "failed to create consumer: " << errstr);
delete conf;
return false;
}
delete conf;
return true;
}
void KafkaConsumer::processMessage(RdKafka::Message *msg)
{
const std::string &name = m_state.consumer_name;
switch (msg->err())
{
case RdKafka::ERR_NO_ERROR:
// payload() is not NUL-terminated, so pair it with len() when building a string
LOG_INFO(name, "message: topic=" << msg->topic_name()
<< " | partition=" << msg->partition()
<< " | offset=" << msg->offset()
<< " | payload=" << std::string(static_cast<const char *>(msg->payload()), msg->len()));
m_state.total_consumed++;
break;
case RdKafka::ERR__TIMED_OUT:
break;
default:
LOG_ERROR(name, "consume error: " << msg->errstr());
break;
}
}
void KafkaConsumer::start()
{
const std::string &name = m_state.consumer_name;
auto err = m_consumer->subscribe(m_topics);
if (err != RdKafka::ERR_NO_ERROR)
{
// LOG_FATAL would call exit() before the lines below could run; log and return instead
LOG_ERROR(name, "subscribe failed: " << RdKafka::err2str(err));
m_state.running = false;
return;
}
LOG_INFO(name, "subscribed to " << m_topics.size() << " topic(s)");
std::vector<RdKafka::Message *> msg_batch;
int batch_count = 0;
while (m_state.running)
{
auto *msg = m_consumer->consume(CONSUME_TIMEOUT_MS);
if (msg->err() == RdKafka::ERR_NO_ERROR)
{
msg_batch.push_back(msg);
batch_count++;
if (msg_batch.size() >= static_cast<size_t>(BATCH_FETCH_SIZE))
{
for (auto &m : msg_batch)
{
processMessage(m);
delete m;
}
msg_batch.clear();
}
}
else if (msg->err() == RdKafka::ERR__TIMED_OUT)
{
if (!msg_batch.empty())
{
for (auto &m : msg_batch)
{
processMessage(m);
delete m;
}
msg_batch.clear();
}
processMessage(msg);
delete msg;
}
else
{
// Runtime consume error: log it and leave the loop
LOG_ERROR(name, "consume failed: " << msg->errstr());
delete msg;
m_state.running = false;
break;
}
if (batch_count >= BATCH_COMMIT_THRESHOLD)
{
std::lock_guard<std::mutex> lock(m_state.mtx);
auto commit_err = m_consumer->commitAsync();
if (commit_err != RdKafka::ERR_NO_ERROR)
{
LOG_ERROR(name, "batch commit failed: " << RdKafka::err2str(commit_err));
m_state.commit_failures++;
}
else
{
LOG_INFO(name, "batch commit succeeded, messages in batch: " << batch_count);
batch_count = 0;
}
}
}
// Process any leftover messages
if (!msg_batch.empty())
{
for (auto &m : msg_batch)
{
processMessage(m);
delete m;
}
}
// Final fallback commit (currently disabled):
// auto commit_err = m_consumer->commitAsync();
// if (commit_err != RdKafka::ERR_NO_ERROR)
// {
//     LOG_ERROR(name, "final commit failed: " << RdKafka::err2str(commit_err));
//     m_state.commit_failures++;
// }
// else
// {
//     LOG_INFO(name, "final commit succeeded");
//     batch_count = 0;
// }
// LOG_INFO(name, "consume loop stopped");
}
// main (multi-instance test)
int main(int argc, char *argv[])
{
try
{
// Create two independent consumer instances
KafkaConsumer consumer1("consumer1", "127.0.0.1:9092", "group1", {"test"});
std::thread t1(&KafkaConsumer::start, &consumer1);
//sleep(2);
KafkaConsumer consumer2("consumer2", "127.0.0.1:9092", "group2", {"test"});
// Run each consumer on its own thread
std::thread t2(&KafkaConsumer::start, &consumer2);
//auto* c = KafkaConsumerManager::getInstance().getConsumer("consumer1");
//c->m_consumer->unsubscribe();
//KafkaConsumerManager::getInstance().unregisterConsumer("consumer1");
t1.join();
t2.join();
}
catch (const std::exception &e)
{
LOG_FATAL("main", "startup failed: " << e.what());
}
RdKafka::wait_destroyed(5000);
return EXIT_SUCCESS;
}