RocketMQ开发指导之三——RocketMQ编程示例_rocketmq c++
说明:生产环境代码使用的 rocketmq 的头文件及库文件的位置,需要根据实际情况存放。本文中是直接使用 rocketmq-client-cpp 源码包中的头文件及库文件。从 GitHub 上下载并解压 rocketmq-client-cpp 源码包,链接为。从上述结果能够看到,生产者程序成功地生产(向消息队列中发送)了 10 条消息。执行下列操作,安装 rocketmq-client-cpp。
2 安装
2.1 准备环境
rocketmq-client-cpp 的安装是通过脚本 build.sh 实现的,不过在安装之前,需要确保开发环境中已经安装了下表所需的编译软件和库:
软件/库名 | 版本号 |
---|---|
操作系统 | CentOS Linux release 7.5.1804 |
gcc-c++(c++ compiler while need support C++11) | 4.8.5 |
cmake(build jsoncpp require it) | 2.8.12.2 |
automake(build libevent require it) | 1.13.4 |
autoconf(build libevent require it) | 2.69 |
libtool(build libevent require it) | 2.4.2 |
bzip2-devel(boost depend it) | 1.0.6 |
zlib-devel(boost depend it) | 1.2.7 |
2.2 安装
执行下列操作,安装 rocketmq-client-cpp。
-
从 GitHub 上下载并解压 rocketmq-client-cpp 源码包,链接为 https://github.com/apache/rocketmq-client-cpp/tree/1.2.1
-
进入到解压后的目录中,执行 build.sh 进行安装操作,命令如下:
[root@node3 /opt/rocketmq-client-cpp-1.2.1]# sh build.sh
说明:
- build.sh 脚本会自动下载并构建安装 rocketmq-client-cpp 所需的依赖库(包括 libevent、json 和 boost),这些库将会被保存在 rocketmq-client-cpp 文件夹下。之后,该脚本就会构建 rocketmq-client 的静态库和动态库了。如果通过 build.sh 脚本构建依赖库失败,则需要通过 libevent、json、boost 的源码包手动构建这些依赖库了;
- 在服务器未联网时,可以通过手动下载 libevent-release-2.0.22-stable.zip、jsoncpp-0.10.6.zip 及 boost_1_58_0.tar.gz 三个软件的源码包,然后将其上传到 rocketmq-client-cpp 解压后的目录下,再执行 build.sh 即可。
- 上一步的 build.sh 执行完成后,rocketmq-client-cpp 的静态库和动态库都会保存在 rocketmq-client-cpp/bin 目录下。另外,当使用这些库构建程序或库的时候,还需要链接其他几个库(-lpthread -lz -ldl -lrt)。编译命令示例如下:
g++ -o consumer_example consumer_example.cpp -lrocketmq -lpthread -lz -ldl -lrt
3 示例代码
这里给出两个简单的生产者和消费者的示例代码。
说明:生产环境代码使用的 rocketmq 的头文件及库文件的位置,需要根据实际情况存放。本文中是直接使用 rocketmq-client-cpp 源码包中的头文件及库文件。
3.1 生产者示例代码
生产者示例代码(SimpleProducer.cpp)的内容如下:
/*
* Description: Simple Producer demo
*/
#include <unistd.h>
#include <stdlib.h>
#include <iostream>
#include <string>
#include "CProducer.h"
#include "CMessage.h"
#include "CSendResult.h"
using namespace std;
// send message
void StartSendMessage(CProducer *producer)
{
CSendResult result;
// create message and set some values for it
CMessage *msg = CreateMessage("Test_Topic");
SetMessageTags(msg, "Test_Tag");
SetMessageKeys(msg, "Test_Keys");
for (int i = 0; i < 10; i++)
{
// construct different body
string strMessageBody = "this is body number-" + to_string(i);
SetMessageBody(msg, strMessageBody.c_str());
// send message
SendMessageSync(producer, msg, &result);
cout << "send message[" << i << "], result status:" << (int)result.sendStatus << ", msgBody:" << strMessageBody << endl;
usleep(1000000);
}
// destroy message
DestroyMessage(msg);
}
int main(int argc, char *argv[])
{
cout << "Producer Initializing....." << endl;
// create producer and set some values for it
CProducer *producer = CreateProducer("Group_producer");
SetProducerNameServerAddress(producer, "192.168.213.128:9876;192.168.213.129:9876");
// start producer
StartProducer(producer);
cout << "Producer start....." << endl;
// send message
StartSendMessage(producer);
// shutdown producer
ShutdownProducer(producer);
// destroy producer
DestroyProducer(producer);
cout << "Producer Shutdown!" << endl;
return 0;
}
编译并执行上述代码,过程信息结果如下:
[root@node3 /opt/liitdar/rocketmq]# g++ -o SimpleProducer SimpleProducer.cpp -I ./include/ -L/opt/rocketmq-client-cpp-1.2.1/bin/ -lrocketmq -lpthread -lz -ldl -lrt -std=c++11
[root@node3 /opt/liitdar/rocketmq]# ./SimpleProducer
Producer Initializing.....
Producer start.....
send message[0], result status:0, msgBody:this is body number-0
send message[1], result status:0, msgBody:this is body number-1
send message[2], result status:0, msgBody:this is body number-2
send message[3], result status:0, msgBody:this is body number-3
send message[4], result status:0, msgBody:this is body number-4
send message[5], result status:0, msgBody:this is body number-5
send message[6], result status:0, msgBody:this is body number-6
send message[7], result status:0, msgBody:this is body number-7
send message[8], result status:0, msgBody:this is body number-8
send message[9], result status:0, msgBody:this is body number-9
Producer Shutdown!
[root@node3 /opt/liitdar/rocketmq]#
从上述结果能够看到,生产者程序成功地生产(向消息队列中发送)了 10 条消息。
3.2 消费者示例代码
Push 模式的消费者示例代码(SimplePushConsumer.cpp),内容如下:
/*
* Description: Simple push consumer demo
*/
#include <unistd.h>
#include <stdlib.h>
#include <iostream>
#include <string>
#include "CPushConsumer.h"
#include "CMessageExt.h"
using namespace std;
// consume message
int doConsumeMessage(struct CPushConsumer *consumer, CMessageExt *msgExt)
{
cout << "[Consume Message] " << "MsgTopic:" << GetMessageTopic(msgExt) << ", MsgTags:" << GetMessageTags(msgExt)
<< ", MsgKeys:" << GetMessageKeys(msgExt) << ", MsgBody:" << GetMessageBody(msgExt) << endl;
return E_CONSUME_SUCCESS;
}
int main(int argc, char *argv[])
{
cout << "Push consumer Initializing...." << endl;
// create push consumer and set some values for it
CPushConsumer *consumer = CreatePushConsumer("Group_Consumer_Test");
SetPushConsumerNameServerAddress(consumer, "192.168.213.128:9876;192.168.213.129:9876");
Subscribe(consumer, "Test_Topic", "*");
// register message callback
RegisterMessageCallback(consumer, doConsumeMessage);
// start push consumer
StartPushConsumer(consumer);
cout << "Push consumer start, and listening message within 1min..." << endl;
for (int i = 0; i < 6; i++)
{
cout << "Already Running: " << (i * 10) << "S" << endl;
usleep(10000000);
}
// shutdown push consumer
ShutdownPushConsumer(consumer);
// destroy push consumer
DestroyPushConsumer(consumer);
cout << "PushConsumer Shutdown!" << endl;
return 0;
}
编译上述代码,得到可执行程序,命令如下:
[root@node3 /opt/liitdar/rocketmq]# g++ -o SimplePushConsumer SimplePushConsumer.cpp -I ./include/ -L/opt/rocketmq-client-cpp-1.2.1/bin/ -lrocketmq -lpthread -lz -ldl -lrt
执行上面生成的消费者程序,待消费者进入消息监听状态时,执行前面编译好的生产者程序(以产生消息),运行结果信息如下:
[root@node3 /opt/liitdar/rocketmq]# ./SimplePushConsumer
Push consumer Initializing....
Push consumer start, and listening message within 1min...
Already Running: 0S
Already Running: 10S
Already Running: 20S
Already Running: 30S
Already Running: 40S
[Consume Message] MsgTopic:Test_Topic, MsgTags:Test_Tag, MsgKeys:Test_Keys, MsgBody:this is body number-0
[Consume Message] MsgTopic:Test_Topic, MsgTags:Test_Tag, MsgKeys:Test_Keys, MsgBody:this is body number-1
[Consume Message] MsgTopic:Test_Topic, MsgTags:Test_Tag, MsgKeys:Test_Keys, MsgBody:this is body number-2
[Consume Message] MsgTopic:Test_Topic, MsgTags:Test_Tag, MsgKeys:Test_Keys, MsgBody:this is body number-3
更多推荐
所有评论(0)