全网仅此一篇!万字详解ZeroMQ的zmq_msg_t消息处理、多部分消息、及消息接口
一、ØMQ的数据处理数据数据时,TCP套接字和ØMQ套接字之间的差异:ØMQ套接字像UDP那样传递信息,而不是像TCP那样传递字节流。ØMQ消息是指定长度的二进制数据,因为它们的设计针对性能进行了优化,所以有点棘手ØMQ套接字在一个后台线程执行自己的IO。这意味着消息到达本地输入队列并从本地输出队列被发送,不会影响到你的应用程序运行ØMQ套接字根据套接字类型具有内置的1对N的路由行为二、zmq_m
·
一、ØMQ的消息处理
使用套接字来传输数据
- 但ØMQ的I/O模型与TCP模型有很大区别,你需要时间来转变观念。
- 处理数据时,TCP套接字和ØMQ套接字之间的差异:
- ØMQ套接字像UDP那样传递信息,而不是像TCP那样传递字节流。ØMQ消息是指定长度的二进制数据,因为它们的设计针对性能进行了优化,所以有点棘手
- ØMQ套接字在一个后台线程执行自己的IO。这意味着消息到达本地输入队列并从本地输出队列被发送,不会影响到你的应用程序运行
- ØMQ套接字根据套接字类型具有内置的1对N的路由行为
zmq_msg_xxx()消息处理接口
- 在内存中,ØMQ消息是zmq_msg_t表示的结构(或类,取决于你采用的语言)
- 下面是C语言中使用ØMQ消息的基本规则:
- 创建并创建zmq_msg_t对象,使用zmq_msg_t来表示消息,而不是使用普通的数据块(char*)来交互数据
- 要读取消息,可使用zmq_msg_init()创建一个空的消息,然后传递给zmq_msg_recv()
- 要写入消息,可以使用zmq_msg_init_size()来创建消息,并分配某个大小的数据块数据,使用memcpy()将数据块的数据拷贝给zmq_msg_t,然后将zmq_msg_t传递给zmq_msg_send()进行发送
- 要释放消息,则调用zmq_msg_close(),这会删除一个引用,当消息引用为0时,ØMQ会最终自动帮你销毁该消息
- 要访问消息内容,可以使用zmq_msg_data()
- 要知道消息包含多少数据,可以使用zmq_msg_size()
- 一般不建议使用zmq_msg_move()、zmq_msg_copy()、zmq_msg_init_data(),除非你的目标很明确就是要用这些函数
- zmq_msg_send()传递一个消息时候,会把该消息清除(把它的大小设置为0),因此消息发送之后需要关闭(zmq_msg_close())并且不再使用。如果你想多次发送相同的数据,可以创建两个zmq_msg_t消息对象发送,或者在调用zmq_msg_init()之前使用zmq_msg_copy()拷贝两份一样的数据并同时发送
- 此处给出的只是个大概,更多的细节参阅下面的接口介绍和演示案例
ØMQ对字符串的处理
- 在前面的文章我们介绍了如何处理ØMQ的字符串并封装了下面两个字符串处理函数,文章可以参阅:https://blog.csdn.net/qq_41453285/article/details/105991716
- 下面是自定义的两个函数:
- 一个是字符串接收函数:其从网络中接收一个ØMQ字符串,并申请多1个字节空间的内存保存该字符串,然后在尾部要添加0,以终止该字符串
- 一个是字符串发送函数:向网络中发送一个字符串,单发送的字符串不含尾部的空字符
// 从套接字接收ØMQ字符串,并将其转换为C/C++字符串(在尾部添加0)
static char *s_recv(void* socket)
{
// 此处使用zmq_msg_init()初始化即可, zmq_msg_recv()在内部会自动对zmq_msg_t对象进行大小设定
zmq_msg_t message;
zmq_msg_init(&message);
int size = zmq_msg_recv(&message, socket, 0);
if(size == -1)
return NULL;
char *string = (char*)malloc(size + 1);
memcpy(string, zmq_msg_data(&message), size);
zmq_msg_close(&message);
string[size] = 0;
return string;
}
// 将C字符串转换为ØMQ字符串(去掉尾部的'\0'),并发送到指定的套接字上
static int s_send(void *socket, char *string)
{
// 因为是将数据拷贝给zmq_msg_t对象, 因此需要使用zmq_msg_init_size进行初始化
zmq_msg_t msg;
zmq_msg_init_size(&msg, strlen(string));
memcpy(zmq_msg_data(&msg), string, strlen(string));
// 发送数据
int rc = zmq_msg_send(&msg, socket, 0);
// 关闭zmq_msg_t对象
zmq_msg_close(&msg);
return rc;
}
“部件”和“帧”的概念
- 帧(Frame)(在ØMQ参考手册中也称为“消息部件”)是ØMQ消息的基本线路格式。帧是已指定长度的数据块,此长度可以从零向上。如果做过任何TCP编程工作,你就会明白,为什么帧是“现在我应该从网络套接字读出多少数据?”这个问题的一个有用的答案
- 最初,一个ØMQ消息是一帧,像UDP一样。后面我们采用多部分消息来扩展了这一点,这是相当简单的带有被设置为1的“more”位的帧的序列,接着是一个该位被设置为零的帧。ØMQ API然后让你写入一个“more”标志的消息,并且当你读取消息是,它可以让你检查是否存在“more”
- 因此,在低级别ØMQ API和参考手册中,有关于消息与部分有一些模糊性。下面用一个有用的词汇表来说明:
- 消息可以是一个或多个部件
- 这些部件也称为帧
- 每个部件都是一个zmq_msg_t对象
- 你用低级别的API分别发送和接收各个部件
- 高级别API为发送整个多部分消息提供包装
-
其他与消息相关的内容:
- 你可以发送长度为0的消息。例如,用于从一个线程把一个信号发送给另一个线程
- ØMQ保证提供一个消息所有的部分(一个或多个)或者一个部分也没有
- ØMQ不立刻发送消息(单个或多部分),而在以后某些不确定的时间发送。因此,一个多部分消息必须在内存中装入
- 单部分消息也必须装入内存。如果你想发送任意大小的文件,应该把它们分解成片,并把每一片作为独立的单部分消息发送
- 在作用域关闭时不自动销毁对象的语言中,在完成消息时,必须调用zmq_msg_close()
- 轻易不要使用zmq_msg_init_data(),这是一个零拷贝的方法,如果使用不好会带来麻烦。零拷贝可参阅:https://blog.csdn.net/qq_41453285/article/details/106845900
二、多部分消息
- ØMQ允许我们撰写由多个帧组成的单个消息,从而给我们一个“多部分消息”。实际的应用程序中相当多地使用了多部分消息,无论是包装带地址信息的消息,还是进行简单的序列化
- 关于多部分消息,你需要了解的是:
- 当发送一个多部分消息时,仅当你发送最后的部分时,所有的消息才会整整在线路上实际发送
- 如果你使用的是zmq_poll(),当你收到一条消息的第一部分时,其余部分也都是已经到达了
- ØMQ确保消息的原子传递:对等方应该收到消息的所有消息部分,或者根本不收到任何消息。除非关闭套接字,否则没有办法取消部分发送的消息
- 消息部分的总数不受可用存储空间的限制
- 在使用多部分消息时,每个部分都是一个zmq_msg_t条目。例如,如果你要发送的消息具有5个部分,你就必须构造5个zmq_msg_t对象
- 在发送时,ØMQ消息帧都在内存中排队,直到最后一个小熊被接收到位置,然后再一起发送它们
- 在接收时,无论你是否设置RCVMORE选项,都将受到一条消息的所有部分
- 在后面的文章中我们会研究应答封包。现在我们学习如何安全地(但一位地)在需要转发消息但不检查它们的任何应用程序(如代理)中读写多部分消息
写多部分消息
- 例如,下面发送三条消息,三条消息组成一条多部分消息,并且调用三次zmq_msg_send()发送出去,注意其中用到了ZMQ_SNDMORE选项
zmq_msg_t message1; zmq_msg_t message2; zmq_msg_t message3; //初始化这三条消息 //发送第一条, 指定ZMQ_SNDMORE选项, 表示发送的是多部分消息的其中一部分, 后面还要消息要发送 zmq_msg_send(socket, &message1, ZMQ_SNDMORE); //发送第二条,同上 zmq_msg_send(socket, &message2, ZMQ_SNDMORE); //发送最后一条消息, 因为后面没有消息要发送了, 因此最后一个参数为0即可 zmq_msg_send(socket, &message3, 0);
- 更多详细细节可以参阅下面“zmq_msg_send()的介绍及其演示案例②”
读多部分消息
- 在接收消息时,可以使用ZMQ_RCVMORE调用zmq_getsockopt()函数来判断套接字是否还有更多的消息要接收
- 下面是一个即可以处理单部分消息又可以处理多部分消息的代码
while(1) { zmq_msg_t message; zmq_msg_init(&message); zmq_msg_recv(socket, &message, 0); zmq_msg_close(&message); int more; size_t more_size = sizeof(more); zmq_getsockopt(socket, ZMQ_RCVMORE, &more, &more_size); if(!more) break; }
三、接口使用的几点说明
关于zmq_msg_init()和zmq_msg_init_size()的踩坑记录
- 这两个函数曾经骚扰我半天,由于ØMQ操作文档说明的不详细,我搞了半天才弄好
- ①在发送数据的时候:我们需要调用memcpy()将数据拷贝到zmq_msg_t中进行发送,不可以调用zmq_msg_init()初始化的zmq_msg_t对象进行存储,因为zmq_msg_init()初始化的对象其大小被设定为0,在调用zmq_msg_send()的时候会报错的。见下面代码
/*******下面这种情况是错误的*******/ zmq_msg_t msg; zmq_msg_init(&msg); // 将str拷贝给msg char *str= "HelloWolrd"; memcpy(zmq_msg_data(&msg), str, 11); // 打印的是HelloWolrd, 但是大小为0, 大小为0就代表该zmq_msg_t对象不可用 printf("%s %ld\n", (char*)zmq_msg_data(&msg), zmq_msg_size(&msg)); // zmq_msg_send()会出错, msg虽然有内容, 但是其大小为0 // zmq_msg_send(&msg, socket, 0);
- 发送数据的时候请使用zmq_msg_init_size()初始化对象,这样发送出去的zmq_msg_t对象是有大小的,不会被zmq_msg_send()判断为是错的
/*******下面这种情况是正确的*******/ char *str= "HelloWolrd"; // 初始化时指定其大小 zmq_msg_t msg; zmq_msg_init_size(&msg, strlen(str) + 1); // 将str拷贝给msg memcpy(zmq_msg_data(&msg), str, 11); // 打印HelloWorld, 大小为11 printf("%s %ld\n", (char*)zmq_msg_data(&msg), zmq_msg_size(&msg)); // zmq_msg_send()调用成功 // zmq_msg_send(&msg, socket, 0);
- ②在接收数据的时候:接收数据时,可以使用zmq_msg_init()定义的zmq_msg_t对象来保存数据,zmq_msg_recv()函数内部会自动的设置zmq_msg_t对象的大小
// 初始化时指定其大小 zmq_msg_t msg; zmq_msg_init(&msg); // 接收数据, zmq_msg_recv()内部会自动 zmq_msg_recv(&msg, socket, 0);
关于拷贝
- 当把数据拷贝给zmq_msg_t对象时,如果数据的长度超过zmq_msg_t对象的大小,zmq_msg_t对象仍然可以获取完整数据,但是使用起来时只能使用其指定的大小
#include <stdio.h> #include <zmq.h> int main() { // 初始化msg时, 指定其大小为5 zmq_msg_t msg; zmq_msg_init_size(&msg, 5); // 拷贝11字节给msg memcpy(zmq_msg_data(&msg), "HelloWorld", 11); // 打印的大小将为5 printf("%s %ld\n", (char*)zmq_msg_data(&msg), zmq_msg_size(&msg)); zmq_msg_close(&msg); return 0; }
四、zmq_msg_t结构及源码分析
- 本文使用的源码为zeromq4.1.7
zmq_msg_t的结构
- ØMQ用zmq_msg_t结构来表示一条小消息,其源码定义如下:
//zmq.h typedef struct zmq_msg_t { #if defined (__GNUC__) || defined ( __INTEL_COMPILER) || \ (defined (__SUNPRO_C) && __SUNPRO_C >= 0x590) || \ (defined (__SUNPRO_CC) && __SUNPRO_CC >= 0x590) unsigned char _ [64] __attribute__ ((aligned (sizeof (void *)))); #elif defined (_MSC_VER) && (defined (_M_X64) || defined (_M_ARM64)) __declspec (align (8)) unsigned char _ [64]; #elif defined (_MSC_VER) && (defined (_M_IX86) || defined (_M_ARM_ARMV7VE)) __declspec (align (4)) unsigned char _ [64]; #else unsigned char _ [64]; #endif } zmq_msg_t;
- zmq_msg_t并不是真正存储数据的地方:我们在操作的时候zmq_msg_t的时候,实际上将其转换为zmq命令空间中的一种msg_t类来使用的。例如下图所示:调用zmq_msg_init()初始化zmq_msg_t的时候实际调用的就是msg_t类的init()方法
msg_t类
- msg_t类是真正存储数据的地方,如下图所示(代码被缩减了,完整的定义见msg.hpp)
- 结构如下:
- base结构体:其有一个type成员用来表示这个消息属于什么类型的,不同类型的消息会用下面不同的结构体存储消息
- vsm、lmsg、cmsg等结构体:代表消息的不同类型,当前msg_t属于哪一种类型,哪一个结构体就会被初始化。其中这些结构体中的data字段存储的是消息的真正值、size存储消息的大小等
//msg.hpp namespace zmq { class msg_t { public: bool check (); int init (); int init_size (size_t size_); int init_data (void *data_, size_t size_, msg_free_fn *ffn_, void *hint_); int init_delimiter (); int close (); int move (msg_t &src_); int copy (msg_t &src_); void *data (); size_t size (); private: struct content_t { void *data; //真正存储数据的复方 size_t size; //数据的大小 msg_free_fn *ffn; //数据释放函数, 见zmq_msg_init_data()函数 void *hint; //传递给ffn的参数 zmq::atomic_counter_t refcnt; //消息的引用计数 }; union { struct { metadata_t *metadata; unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + 2)]; unsigned char type; unsigned char flags; } base; struct { metadata_t *metadata;//元数据 unsigned char data [max_vsm_size];//消息数据 unsigned char size; unsigned char type; unsigned char flags; } vsm; //vsm消息类型 struct { metadata_t *metadata;//元数据 content_t *content;//消息数据 unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + sizeof (content_t*) + 2)]; unsigned char type; unsigned char flags; } lmsg; //lmsg消息类型 struct { metadata_t *metadata; void* data;//消息数据 size_t size; unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + sizeof (void*) + sizeof (size_t) + 2)]; unsigned char type; unsigned char flags; } cmsg; //cmsg消息类型 struct { metadata_t *metadata; unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + 2)]; unsigned char type; unsigned char flags; } delimiter; } u; }; }
API分析
- 以zmq_msg_init()为例,它会在内部调用msg_t的init_size()函数,init_size()函数会根据消息的大小来初始化msg_t的不同类型(vsm、lmsg、cmsg等类型),同时将结构的数据初始化
- 以zmq_msg_data()为例,它会在内部调用msg_t的data()函数,data()函数会根据base结构体中的type字段来判断当前数据属于什么类型,进而将不同结构的数据(返回data字段)进行返回
五、初始化空的ØMQ消息(zmq_msg_init)
int zmq_msg_init (zmq_msg_t *msg);
- API参考文档:http://api.zeromq.org/master:zmq-msg-init
- 功能:初始化空的ØMQ消息
- 参数:要初始化的ØMQ消息结构
- 返回值:该函数总是返回0,没有错误定义
- 相关描述:
- zmq_msg_init()函数将初始化msg引用的消息对象,以表示一条空消息。在使用zmq_msg_recv()接收消息之前调用此函数最有用
- 永远不要直接访问zmq_msg_t成员,而是始终使用zmq_msg_xxx()系列函数
- 函数zmq_msg_init()、zmq_msg_init_data()和zmq_msg_init_size()是互斥的。调用这三者之一即可,不要初始化相同的zmq_msg_t两次
- zmq_msg_init()的zmq_msg_t对象,其大小为0,因此不能用在类似发送的函数中,但是可以用来接收(详情参阅上面的“接口使用的几点说明”)
演示案例
- 从套接字接收消息
zmq_msg_t msg; rc = zmq_msg_init(&msg); assert(rc == 0); int nbytes = zmq_msg_recv(socket, &msg, 0); assert(nbytes != -1);
六、初始化指定大小的ØMQ消息(zmq_msg_init_size)
int zmq_msg_init_size (zmq_msg_t *msg, size_t size);
- API参考文档:http://api.zeromq.org/master:zmq-msg-init-size
- 功能:初始化指定大小的ØMQ消息
- 参数:
- msg:要初始化的ØMQ消息结构
- size:初始化ØMQ消息结构的大小
- 返回值:
- 成功:返回0
- 失败:返回-1,并将errno设置为以下定义的值之一:ENOMEM:可用的存储空间不足
- 相关描述:
- zmq_msg_init_size()函数将分配存储消息大小字节所需的任何资源,并初始化msg引用的消息对象来表示新分配的消息
- 实现应该选择将消息内容存储在堆栈(小消息)还是堆(大消息)上。出于性能原因,zmq_msg_init_size()不得清除消息数据
- 永远不要直接访问zmq_msg_t成员,而是始终使用zmq_msg_xxx()系列函数
- 函数zmq_msg_init()、zmq_msg_init_data()和zmq_msg_init_size()是互斥的。调用这三者之一即可,不要初始化相同的zmq_msg_t两次
七、从缓冲区中初始化ØMQ消息(zmq_msg_init_data)
typedef void (zmq_free_fn) (void *data, void *hint);
int zmq_msg_init_data (zmq_msg_t *msg, void *data, size_t size, zmq_free_fn *ffn, void *hint);
- API参考文档:http://api.zeromq.org/master:zmq-msg-init-data
- 功能:从提供的缓冲区初始化ØMQ消息
- 参数:
- msg:要初始化的ØMQ消息结构
- data:缓冲区的数据,是用来初始化msg的
- size:参数data缓冲区数据对应的大小
- ffn:如果data是动态申请的,那么可以添加这个函数用来回收内存,其参数1data就是zmq_msg_init_data()函数的参数1,参数2hint是zmq_msg_init_data()函数的参数4
- hint:传递给ffn函数的参数2
- 返回值:
- 成功:返回0
- 失败:返回-1,并将errno设置为以下定义的值之一:ENOMEM:可用的存储空间不足
- 相关描述:
- zmq_msg_init_data()函数将初始化msg引用的消息对象。不得进行复制任何数据,ØMQ应拥有所提供缓冲区的所有权
- 如果提供释放功能(参数4),则一旦ØMQ不再需要缓冲区中的数据,就应该调用释放函数ffn释放内存
- 释放函数ffn需要是线程安全的,因为它将从任意线程调用。如果没有提供释放函数,则分配的内存将不会被释放,这可能会导致内存泄漏
- 永远不要直接访问zmq_msg_t成员,而是始终使用zmq_msg_xxx()系列函数
- 函数zmq_msg_init()、zmq_msg_init_data()和zmq_msg_init_size()是互斥的。调用这三者之一即可,不要初始化相同的zmq_msg_t两次
- ØMQ使用zmq_msg_init_data()来实现零拷贝,可参阅:https://blog.csdn.net/qq_41453285/article/details/106845900
演示案例
- 从提供的缓冲区初始化消息
//释放函数 void my_free (void *data, void *hint) { free (data); } /* ... */ //申请数据 void *data = malloc (6); assert (data); memcpy (data, "ABCDEF", 6); //用data初始化msg zmq_msg_t msg; rc = zmq_msg_init_data (&msg, data, 6, my_free, NULL); assert (rc == 0);
八、释放ØMQ消息(zmq_msg_close)
int zmq_msg_close (zmq_msg_t *msg);
- API参考文档:http://api.zeromq.org/master:zmq-msg-close
- 功能:将消息的引用计数减1
- 参数:
- msg:要释放的消息结构
- 返回值:
- 成功:返回0
- 失败:返回-1,并将errno设置为以下定义的值之一:EFAULT:无效的信息
- 相关描述:
- 应用程序应该确保在不再需要消息时调用zmq_msg_close(),否则可能发生内存泄漏
- 该函数只是将zmq_msg_t对象所指数据的引用计数减1,并把自己的大小设置为0而已。当一个zmq_msg_t对象调用zmq_msg_close()之后,如果其之前所指的数据还有其他zmq_msg_t对象引用,那么该zmq_msg_t对象所指的数据不会真正的被释放,只有数据的最后一个zmq_msg_t引用对象调用zmq_msg_close()时才真正的释放内存(见下面演示案例①)
- 当一个zmq_msg_t对象调用zmq_msg_close()之后就不能再对这个zmq_msg_t对象进行操作了,如果操作会报错(见下面演示案例②)。即使它所指的数据还有其它zmq_msg_t对象引用也不行
- 注意,在zmq_msg_send()成功之后,zmq_msg_send()会把zmq_msg_t对象的大小设置为0(变为0之后就标记这个zmq_msg_t对象不需要再去使用了),但是没有关闭该对象,因此zmq_msg_send()之后需要关闭zmq_msg_t对象(更多详细的细节见下面zmq_msg_send()函数的介绍和演示案例)
演示案例①
- 下面创建两个消息msg1和msg2,其中msg2拷贝于msg1
#include <stdio.h> #include <string.h> #include <zmq.h> int main() { // 1.初始化第一个消息 printf("第一步: 初始化msg1:\n"); zmq_msg_t msg1; zmq_msg_init_size(&msg1, 6); memcpy(zmq_msg_data(&msg1), "Hello", 6); printf("\tmsg1: %s, size: %d\n\n", (char*)zmq_msg_data(&msg1), (int)zmq_msg_size(&msg1)); // 2.将msg1拷贝给msg2 printf("第二步:将msg1拷贝给msg2:\n"); zmq_msg_t msg2; zmq_msg_init_size(&msg2, 6); zmq_msg_copy(&msg2, &msg1); printf("\tmsg1: %s, size: %d\n", (char*)zmq_msg_data(&msg1), (int)zmq_msg_size(&msg1)); printf("\tmsg2: %s, size: %d\n\n", (char*)zmq_msg_data(&msg2), (int)zmq_msg_size(&msg2)); // 3.关闭msg1 printf("第三步:关闭msg1:\n"); zmq_msg_close(&msg1); //printf("\tmsg1: %s, size: %d\n", (char*)zmq_msg_data(&msg1), (int)zmq_msg_size(&msg1)); msg1已经关闭了不能再进行访问了 printf("\tmsg2: %s, size: %d\n\n", (char*)zmq_msg_data(&msg2), (int)zmq_msg_size(&msg2)); // 4.关闭msg2 printf("第四步:关闭msg2:\n"); zmq_msg_close(&msg2); //msg2已经关闭了不能再进行访问了 //printf("\tmsg2: %s, size: %d\n\n", (char*)zmq_msg_data(&msg2), (int)zmq_msg_size(&msg2)); return 0; }
- 编译运行效果如下:
演示案例②
- 下面创建一个消息msg1,然后初始化msg1消息,最后把它释放,释放之后就不能再操作该zmq_msg_t对象了,如果操作就会报错
#include <stdio.h> #include <string.h> #include <zmq.h> int main() { // 1.初始化第一个消息 printf("第一步: 初始化msg:\n"); zmq_msg_t msg; zmq_msg_init_size(&msg, 6); memcpy(zmq_msg_data(&msg), "Hello", 6); printf("\tmsg1: %s, size: %d\n\n", (char*)zmq_msg_data(&msg), (int)zmq_msg_size(&msg)); // 2.关闭msg1 printf("第二步:关闭msg:\n"); zmq_msg_close(&msg); // 3.msg已经关闭了, 再去访问就会报错 printf("\tmsg1: %s, size: %d\n", (char*)zmq_msg_data(&msg), (int)zmq_msg_size(&msg)); return 0; }
九、设置/获取消息属性(zmq_msg_set、zmq_msg_get)
zmq_msg_set
int zmq_msg_set (zmq_msg_t *message, int property, int value);
- API参考文档:http://api.zeromq.org/master:zmq-msg-set
- 功能:设置消息属性
- 参数:
- message:要设置的消息
- property:要设置的属性
- value:属性值
- 返回值:
- 成功:返回0
- 失败:返回-1,并将errno设置为以下定义的值之一:EINVAL:请求的属性是未知的
- 相关描述:当前zmq_msg_set()函数不能设置任何属性
zmq_msg_get
int zmq_msg_get (zmq_msg_t *message, int property);
- API参考文档:http://api.zeromq.org/master:zmq-msg-get
- 功能:获取消息属性
- 参数:
- message:要获取的消息
- property:要获取的属性
- 返回值:
- 成功:返回参数2所指定的属性的值
- 失败:返回-1,并将errno设置为以下定义的值之一:EINVAL:请求的属性是未知的
- 可以获取的属性如下:
- ZMQ_MORE:指示消息后面有更多消息帧要跟随
- ZMQ_SRCFD:返回从套接字读取消息的文件描述符。这允许应用程序通过getpeername()检索远程端点。请注意,相应的套接字可能已经关闭,甚至可以重用。目前只针对TCP套接字实现
- ZMQ_SHARED:指示消息可以与该消息的另一个副本共享底层存储
十、获取消息元数据属性(zmq_msg_gets)
const char *zmq_msg_gets (zmq_msg_t *message, const char *property);
- API参考文档:http://api.zeromq.org/master:zmq-msg-gets
- 功能:获取消息元数据属性
- 参数:
- message:要获取的消息
- property:要获取的属性
- 返回值:
- 成功:返回属性的字符串值。调用方不得修改或释放返回的值,该值应归消息所有。属性和值的编码应为UTF8
- 失败:返回NULL,并将errno设置为以下定义的值之一:EINVAL:请求的属性是未知的
- 相关描述:
- 该函数返回message的元数据,要获取的元数据属性为参数2所指定的属性,是一个字符串形式。参数2应该是以NULL结尾的UTF8编码字符串
- 如https://rfc.zeromq.org/spec/37/中所指定的,在ZeroMQ连接握手期间,将基于每个连接定义元数据。应用程序可以使用zmq_setsockopt()设置ZMQ_METADATA设置元数据属性。应用程序元数据属性必须以X-为前缀
- 另外,当底层传输可用时,Peer-Address属性将返回由getnameinfo()返回的远程端点的IP地址
- 这些属性的名称也在zmq.h中定义为:ZMQ_MSG_PROPERTY_SOCKET_TYPE ZMQ_MSG_PROPERTY_ROUTING_ID、ZMQ_MSG_PROPERTY_PEER_ADDRESS。目前,这些定义仅作为API草案提供
- 可以根据底层安全机制定义其他属性,请参阅下面的ZAP身份验证连接示例
- 除了应用程序元数据外,还可以使用该函数检索以下ZMTP属性:
Socket-Type
Routing-Id
# 注意:Identity是Routing-Id的一个不赞成使用的别名
演示案例
- 获取消息的zap身份验证用户ID:
zmq_msg_t msg; zmq_msg_init (&msg); rc = zmq_msg_recv (&msg, dealer, 0); assert (rc != -1); const char *user_id = zmq_msg_gets (&msg, ZMQ_MSG_PROPERTY_USER_ID); zmq_msg_close (&msg);
十一、获取指向消息内容的指针(zmq_msg_data)
void *zmq_msg_data (zmq_msg_t *msg);
- API参考文档:http://api.zeromq.org/master:zmq-msg-data
- 功能:获取指向消息内容的指针,会将msg的内容以以指针的形式返回
- 参数:msg:要检索的消息结构
- 返回值:
- 成功:返回一个指针,指向于msg的消息内容
- 失败:没有出错的情况
十二、获取消息内容大小(zmq_msg_size)
size_t zmq_msg_size (zmq_msg_t *msg);
- API参考文档:http://api.zeromq.org/master:zmq-msg-size
- 功能:获取消息内容的大小,以字节为单位
- 参数:msg:要检索的消息结构
- 返回值:
- 成功:返回msg消息内容的大小(以字节为单位)
- 失败:没有出错的情况
十三、复制消息内容(zmq_msg_copy)
int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src);
- API参考文档:http://api.zeromq.org/master:zmq-msg-copy
- 功能:将一条消息的内容复制给另一条消息,此时两条消息同指一块缓冲区数据
- 参数:
- dest:目标消息结构
- src:源消息结构
- 返回值:
- 成功:返回0
- 失败:返回-1,并将errno设置为以下定义的值之一:EFAULT:无效的信息
- 相关描述:
- zmq_msg_copy()函数将src所引用的消息对象复制到dest所引用的消息对象中,如果dest之前有内容,则将其释放。在复制之前你必须初始化dest
- 引用计数:zmq_msg_copy()的实现并不是在内存中创建一份dest新实例,然后将src拷贝给dest,而是将dest指向于src所指的内容,因此,dest和src是共享底层缓冲区中的数据的
- 因此在复制之后要避免修改消息的内容,因为可能有多者共享这一条消息,其他修改会导致其它消息结构使用时产生未定义的行为
- 如果您需要的是一个实际的硬拷贝,那么就不要使用该函数,可以使用zmq_msg_init_size()分配一个新消息,并使用memcpy()复制消息内容
- 关于zmq_msg_copy()的演示案例可以看上面“zmq_msg_close()”函数的演示案例①
演示案例
- 复制消息
//初始化msg zmq_msg_t msg; zmq_msg_init_size (&msg, 255); memcpy(zmq_msg_data(&msg), "Hello, World", 12); //将msg拷贝给copy, 此时copy与msg指向于同一块数据 zmq_msg_t copy; zmq_msg_init(©); zmq_msg_copy(©, &msg); //... zmq_msg_close (©); //关闭copy, 此时msg的内容只有自己引用 zmq_msg_close (&msg); //再关闭msg, 此时msg指向的内容才真正释放
十四、移动消息内容(zmq_msg_move)
int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src);
- API参考文档:http://api.zeromq.org/master:zmq-msg-move
- 功能:将一条消息的内容移动给另一条消息
- 参数:
- dest:目标消息结构
- src:源消息结构
- 返回值:
- 成功:返回0
- 失败:返回-1,并将errno设置为以下定义的值之一:EFAULT:无效的信息
- 相关描述:
- zmq_msg_move()函数将把src引用的消息对象的内容移动到dest引用的消息对象,在调用zmq_msg_move()后,src变成一个空消息,如果dest之前有内容,则将其释放然后更改为src的内容
- 一个zmq_msg_t对象在调用zmq_msg_move()之后只是将自己的数据拷贝给其它zmq_msg_t对象,并把自己的大小设置为0。因此在移动之后其还可以访问之前的数据,但是大小变为0了(变为0之后就标记这个zmq_msg_t对象不需要再去使用了),因此在一个zmq_msg_t对象调用zmq_msg_move()之后建议调用zmq_msg_close()关闭自己不要再使用了(见下面演示案例)
- 与msg_msg_copy()的不同:msg_msg_copy()是将一个zmq_msg_t对象的数据拷贝给其他zmq_msg_t对象,导致数据的引用计数加1;zmq_msg_move()是将一个zmq_msg_t对象的数据移动到另外一个zmq_msg_t对象上
- 演示案例如下:下面创建并初始化一个msg1对象,之后把msg1对象的内容移动给msg2对象。
#include <stdio.h>
#include <string.h>
#include <zmq.h>
int main()
{
// 1.初始化第一个消息
printf("第一步: 初始化msg1:\n");
zmq_msg_t msg1;
zmq_msg_init_size(&msg1, 6);
memcpy(zmq_msg_data(&msg1), "Hello", 6);
printf("\tmsg1: %s, size: %d\n\n", (char*)zmq_msg_data(&msg1), (int)zmq_msg_size(&msg1));
// 2.将msg1内容移动到给msg2
printf("第二步:将msg1数据移动给msg2:\n");
zmq_msg_t msg2;
zmq_msg_init_size(&msg2, 6);
zmq_msg_move(&msg2, &msg1);
//虽然msg1的内容进行移动了, 但是其还可以访问数据, 只是大小变为0了
printf("\tmsg1: %s, size: %d\n", (char*)zmq_msg_data(&msg1), (int)zmq_msg_size(&msg1));
printf("\tmsg2: %s, size: %d\n\n", (char*)zmq_msg_data(&msg2), (int)zmq_msg_size(&msg2));
// 3.关闭msg1, 因为msg1数据进行移动了, 因此建议关闭不要再去使用
printf("第三步:关闭msg1:\n");
zmq_msg_close(&msg1);
//printf("\tmsg1: %s, size: %d\n", (char*)zmq_msg_data(&msg1), (int)zmq_msg_size(&msg1)); //msg1已经关闭了, 不能再去操作了
printf("\tmsg2: %s, size: %d\n\n", (char*)zmq_msg_data(&msg2), (int)zmq_msg_size(&msg2));
// 4.关闭msg2
printf("第四步:关闭msg2:\n");
zmq_msg_close(&msg2);
//printf("\tmsg2: %s, size: %d\n\n", (char*)zmq_msg_data(&msg2), (int)zmq_msg_size(&msg2)); //msg2已经关闭了不能再进行访问了
return 0;
}
十五、消息路由ID的设置/获取(zmq_msg_set_routing_id、zmq_msg_routing_id)
设置路由ID(zmq_msg_set_routing_id)
int zmq_msg_set_routing_id (zmq_msg_t *message, uint32_t routing_id);
- API参考文档:http://api.zeromq.org/master:zmq-msg-set-routing-id
- 功能:在消息上设置路由ID属性
- 参数:
- message:设置的消息结构
- routing_id:路由ID
- 返回值:
- 成功:返回0
- 失败:返回-1,并将errno设置为以下定义的值之一:EINVAL:提供的routing_id为零
- 相关描述:
- 函数的作用是:在消息参数所指向的消息上设置指定的routing_id
- routing_id必须大于零
- 要获得有效的路由ID,您必须从ZMQ_SERVER套接字接收一条消息,并使用libzmq:zmq_msg_routing_id方法
- 路由id是临时的
获取路由ID(zmq_msg_routing_id)
uint32_t zmq_msg_routing_id (zmq_msg_t *message);
- API参考文档:http://api.zeromq.org/master:zmq-msg-routing-id
- 功能:获取消息的路由ID(如果有)
- 参数:message:要获取的消息结构
- 返回值:
- 没有路由ID:返回0
- 否则:返回一个大于0的32位无符号整数
- 相关描述:
- 函数的作用是:返回消息的路由ID(如果有的话)
- 在从ZMQ_SERVER套接字接收的所有消息上设置路由ID
- 要向ZMQ_SERVER套接字发送消息,必须设置已连接的ZMQ_CLIENT对等点的路由ID
- 路由id是临时的
- 演示案例如下:接收客户端消息和路由ID
// 1.创建上下文
void *ctx = zmq_ctx_new ();
assert (ctx);
// 2.创建一个ZMQ_SERVER服务端, 并开启服务
void *server = zmq_socket (ctx, ZMQ_SERVER);
assert (server);
int rc = zmq_bind (server, "tcp://127.0.0.1:8080");
assert (rc == 0);
// 3.初始化一个消息结构
zmq_msg_t message;
rc = zmq_msg_init (&message);
assert (rc == 0);
// 4.接收消息
rc = zmq_msg_recv (server, &message, 0);
assert (rc != -1);
// 5.接收之后, 获取路由ID
uint32_t routing_id = zmq_msg_routing_id (&message);
assert (routing_id);
十六、发送消息(zmq_msg_send)
int zmq_msg_send (zmq_msg_t *msg, void *socket, int flags);
- API参考文档:http://api.zeromq.org/master:zmq-msg-send
- 功能:在指定的套接字上发送消息
- 相关描述:
- 该函数替换了zmq_sendmsg()函数,zmq_sendmsg()不推荐使用了,关于zmq_sendmsg()可以参阅:http://api.zeromq.org/master:zmq-sendmsg
- zmq_msg_send()函数将把消息发送给socket,并且把msg消息在socket的消息队列中进行排队
- 当把msg传递给该函数之后,msg所对应的zmq_msg_t结构就失效了,zmq_msg_send()会把zmq_msg_t对象的大小设置为0(变为0之后就标记这个zmq_msg_t对象不需要再去使用了),但是没有关闭该对象,因此在zmq_msg_send()之后建议调用zmq_close()关闭msg参数对应的zmq_msg_t对象
- 根据上面的特性,如果你想重复使用zmq_msg_send()之前msg参数对应的数据,那么可以在调用zmq_msg_send()之前使用zmq_msg_copy()拷贝msg参数,这样就有多个zmq_msg_t对象引用msg对引用的数据
- 成功调用zmq_msg_send()并不表示消息已传输到网络中,仅表名它已在套接字上排队并且ØMQ承担了对该消息的责任
参数
- msg:要发送的消息
- socket:操作的套接字
- flags:一些标志,如下所示:
- ZMQ_DONTWAIT:对于套接字类型(DEALER,PUSH),当没有可用的对等点(或所有的对等点有完整的高水位标记)时阻塞,指定该操作应该在非阻塞模式下执行。如果消息不能在套接字上排队,则zmq_msg_send()函数将失败,errno设置为EAGAIN
- ZMQ_SNDMORE:指定要发送的消息是多部分消息,并且后面还将有其他消息部分。详情参阅下面“多部分消息”介绍和演示案例②
返回值
- 成功:返回消息中的字节数(如果字节数大于MAX_INT,函数将返回MAX_INT)
- 失败:返回-1,并将errno设置为以下定义的值之一:
- EAGAIN:zmq_msg_send()在非阻塞模式(设置了ZMQ_DONTWAIT)下发送消息,但无法发送消息(详情见上面的ZMQ_DONTWAIT套接字选项)
- ENOTSUP:此套接字类型不支持zmq_msg_send()操作
- EINVAL:发送方试图发送多部分数据,这是套接字类型不允许的
- EFSM:由于套接字不处于适当的状态,目前无法在该套接字上执行zmq_msg_send()操作。如果套接字类型在多个状态之间切换,比如ZMQ_REP,可能会发生此错误。有关更多信息,请参阅zmq_socket()的消息传递模式部分
- ETERM:与指定套接字关联的ØMQ 上下文已终止(可以参阅zmq_ctx_destroy():https://blog.csdn.net/qq_41453285/article/details/105993260)
- ENOTSOCK:提供的套接字无效
- EINTR:在消息被发送之前,一个信号的发送中断了操作
- EFAULT:无效的消息
- EHOSTUNREACH:该消息无法路由
多部分消息
- 多部分消息在文章最初已经介绍过了,详情见文章最开始即可,此处做一个简单的介绍就可以了
- ØMQ消息由1或更多的消息部分。每个消息部分本身是一个独立的zmq_msg_t
- 发送多部分消息的应用程序在发送每个消息部分(最后一个消息除外)时必须使用ZMQ_SNDMORE标志(见下面演示案例②)
演示案例①
- 下面是一个客户端程序,其创建一个msg1对象和一个msg2对象,然后把msg1的内容拷贝给msg2,然后再将msg1的数据发送给服务端(这里想测一下zmq_msg_send()对参数1的影响)
#include <stdio.h> #include <string.h> #include <zmq.h> int main() { // 1.初始化上下文 void *context = zmq_ctx_new(); if(context == NULL) { printf("zmq_ctx_new error\n"); return -1; } // 2.创建套接字, 绑定地址 void *requester = zmq_socket(context, ZMQ_REQ); if(zmq_connect(requester, "tcp://localhost:5555") == -1) { printf("zmq_connect error\n"); return -1; } // 3.初始化msg1 printf("初始化第一条消息msg1:\n"); zmq_msg_t msg1; zmq_msg_init_size(&msg1, 6); memcpy(zmq_msg_data(&msg1), "Hello", 6); printf("\tmsg1: %s, size: %d\n\n", (char*)zmq_msg_data(&msg1), (int)zmq_msg_size(&msg1)); // 4.将msg1拷贝给msg2 printf("将msg1拷贝给msg2:\n"); zmq_msg_t msg2; zmq_msg_init_size(&msg2, 6); zmq_msg_copy(&msg2, &msg1); printf("\tmsg1: %s, size: %d\n", (char*)zmq_msg_data(&msg1), (int)zmq_msg_size(&msg1)); printf("\tmsg2: %s, size: %d\n\n", (char*)zmq_msg_data(&msg2), (int)zmq_msg_size(&msg2)); //5.发送数据 printf("发送msg1:\n"); if(zmq_msg_send(&msg1, requester, 0) == -1) { printf("zmq_msg_send error\n"); return -1; } //可以看到zmq_msg_send()是把msg1的大小设置为0(标志其不能去使用了), 但是仍可以可以访问msg1的数据 printf("\tmsg1: %s, size: %d\n", (char*)zmq_msg_data(&msg1), (int)zmq_msg_size(&msg1)); printf("\tmsg2: %s, size: %d\n\n", (char*)zmq_msg_data(&msg2), (int)zmq_msg_size(&msg2)); // 6.关闭msg1, 因为msg1的大小被设置为0, 不再使用了 printf("关闭msg1:\n"); zmq_msg_close(&msg1); //printf("\tmsg1: %s, size: %d\n", (char*)zmq_msg_data(&msg1), (int)zmq_msg_size(&msg1)); //msg1被关闭了, 不能再去使用了 printf("\tmsg2: %s, size: %d\n\n", (char*)zmq_msg_data(&msg2), (int)zmq_msg_size(&msg2)); // 7. 关闭msg2 printf("关闭msg2:\n"); zmq_msg_close(&msg2); //printf("\tmsg2: %s, size: %d\n\n", (char*)zmq_msg_data(&msg2), (int)zmq_msg_size(&msg2)); //msg2被关闭了, 不能再去使用了 // 8.关闭套接字, 释放上下文 zmq_close(requester); zmq_ctx_term(context); return 0; }
- 运行结果如下所示:
- 左侧就是我们上面这个程序,右侧是服务端代码(服务端代码这里就不给出了,想看的可以参阅https://github.com/dongyusheng/csdn-code/blob/master/ZeroMQ/hwserver.c)
- 可以看到在zmq_msg_send()之后,函数会把参数1的zmq_msg_t对象的大小设置为0,但是没有关闭(zmq_msg_close())该对象,因此我们还可以访问该对象
- 但是在 zmq_msg_send()之后,建议手动关闭(zmq_msg_close())参数1的zmq_msg_t对象大小
演示案例②
- 下面发送多部分信息,我们同时像服务端发送了msg1和msg2两个消息,备注:
#include <stdio.h> #include <string.h> #include <zmq.h> int main() { // 1.初始化上下文 void *context = zmq_ctx_new(); if(context == NULL) { printf("zmq_ctx_new error\n"); return -1; } // 2.创建套接字, 绑定地址 void *requester = zmq_socket(context, ZMQ_REQ); if(zmq_connect(requester, "tcp://localhost:5555") == -1) { printf("zmq_connect error\n"); return -1; } // 3.初始化msg1 printf("初始化第一条消息msg1:\n"); zmq_msg_t msg1; zmq_msg_init_size(&msg1, 6); memcpy(zmq_msg_data(&msg1), "Hello", 6); printf("\tmsg1: %s, size: %d\n\n", (char*)zmq_msg_data(&msg1), (int)zmq_msg_size(&msg1)); // 4.初始化msg2 printf("初始化第二消息msg2:\n"); zmq_msg_t msg2; zmq_msg_init_size(&msg2, 6); memcpy(zmq_msg_data(&msg2), "Hello", 6); printf("\tmsg2: %s, size: %d\n\n", (char*)zmq_msg_data(&msg2), (int)zmq_msg_size(&msg2)); //5.将msg1和msg2原子地发送出去, 其中第一个send需要指明ZMQ_SNDMORE标记 printf("发送msg1和msg2:\n"); if(zmq_msg_send(&msg1, requester, ZMQ_SNDMORE) == -1) { printf("zmq_msg_send error\n"); return -1; } if(zmq_msg_send(&msg2, requester, 0) == -1) { printf("zmq_msg_send error\n"); return -1; } //可以看到zmq_msg_send()是把msg1和msg2的大小设置为0(标志其不能去使用了), 但是仍可以可以访问它们的数据 printf("\tmsg1: %s, size: %d\n", (char*)zmq_msg_data(&msg1), (int)zmq_msg_size(&msg1)); printf("\tmsg2: %s, size: %d\n\n", (char*)zmq_msg_data(&msg2), (int)zmq_msg_size(&msg2)); // 6.关闭msg1和msg2, 它们都不再使用了 printf("关闭msg1和msg2:\n"); zmq_msg_close(&msg1); zmq_msg_close(&msg2); // 8.关闭套接字, 释放上下文 zmq_close(requester); zmq_ctx_term(context); return 0; }
- 效果如下所示:
- 左侧发送了两条消息给服务端,右侧服务端收到两条消息
- 注意:服务端是通过调用两次zmq_msg_recv()接收数据的,不是一次zmq_msg_recv(),见下面解析
- 服务端代码解析:服务端核心代码如下,先是接收客户端数据,然后再向服务端发送数据。通过上图我们知道,服务端打印了两次“Received Hello”,所以服务端执行了两次while(1),每次执行while循环的时候接收数据,然后发送数据,发送数据的时候由于客户端没有接收函数,所以其直接返回继续执行下一次循环,因此上面打印了两次“Received Hello”
十七、接收消息(zmq_msg_recv)
int zmq_msg_recv (zmq_msg_t *msg, void *socket, int flags);
- API参考文档:http://api.zeromq.org/master:zmq-msg-recv
- 功能:在指定的套接字上接收消息
- 相关描述:
- 该函数替换了zmq_recvmsg()函数,zmq_recvmsg()不推荐使用了,关于zmq_recvmsg()可以参阅:http://api.zeromq.org/master:zmq-recvmsg
- zmq_msg_recv()函数将从套接字参数引用的套接字接收消息部分,并将其存储在msg参数引用的消息中
- 如果msg之前存储有消息则会被正确的释放
- 如果指定的套接字上没有可用的消息部分,则zmq_msg_recv()函数将阻塞,直到满足请求为止
参数
- msg:用来保存接收的数据
- socket:操作的套接字
- flags:一些标志,如下所示:
- ZMQ_DONTWAIT:指定该操作在非阻塞模式下执行。如果指定的套接字上没有可用的消息,则zmq_msg_recv()函数将失败并将errno设置为EAGAIN
返回值
- 成功:返回接收的消息的字节数。注意,如果消息被截断,该值可能会超过len参数的值
- 失败:返回-1,并将errno设置为以下定义的值之一:
- EAGAIN:zmq_msg_recv()函数在非阻塞模式(ZMQ_DONTWAIT)下运行,套接字上没有数据可接收
- ENOTSUP:这个套接字类型不支持zmq_msg_recv()操作
- EFSM:由于套接字不处于适当的状态,目前无法在该套接字上执行zmq_msg_recv()操作。如果套接字类型在多个状态之间切换,比如ZMQ_REP,可能会发生此错误。有关更多信息,请参阅zmq_socket()的消息传递模式部分
- ETERM:与指定套接字关联的ØMQ上下文已终止(可以参阅zmq_ctx_destroy():https://blog.csdn.net/qq_41453285/article/details/105993260)
- ENOTSOCK:提供的套接字无效
- EINTR:在消息被发送之前,一个信号的发送中断了操作
多部分消息
- 多部分消息在文章最初已经介绍过了,详情见文章最开始即可,此处做一个简单的介绍就可以了
- ØMQ消息由1或更多的消息部分。每个消息部分本身是一个独立的zmq_msg_t
- 处理多部分消息的应用程序在调用zmq_msg_recv()之后,必须传递ZMQ_RCVMORE选项给zmq_getsockopt()来确定是否还有其他部分要接收
演示案例①
- 从套接字接收消息
// 初始化消息结构 zmq_msg_t msg; int rc = zmq_msg_init (&msg); assert (rc == 0); // 接收消息 rc = zmq_msg_recv (&msg, socket, 0); assert (rc != -1); // 释放消息 zmq_msg_close (&msg);
演示案例②
- 接收多部分消息
int more; size_t more_size = sizeof (more); do { // 初始化消息 zmq_msg_t part; int rc = zmq_msg_init (&part); assert (rc == 0); // 接收消息 rc = zmq_msg_recv (&part, socket, 0); assert (rc != -1); //判断是否还有消息要接收 rc = zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size); assert (rc == 0); // 关闭消息 zmq_msg_close (&part); } while (more);
十八、判断是否还有很多的消息要接收(zmq_msg_more)
int zmq_msg_more(zmq_msg_t *message);
- API参考文档:http://api.zeromq.org/master:zmq-msg-more
- 功能:查询是否还有更多消息要接收
- 相关描述:
- zmq_msg_more()函数判断message参数所指的消息是否是由多个部分组成的消息的一部分,以及是否有其他部分要接收
- 可以在zmq_msg_close()之后安全地调用此方法。该方法与带有ZMQ_MORE参数的zmq_msg_get()相同
- 参数:message:要判断的消息结构
- 返回值:
- 成功0:如果这是多部分消息的最后一部分,或者是单部分消息的唯一一部分,返回0
- 失败1:如果这是多部分消息的最后一部分,或者是单部分消息的唯一一部分
- 演示案例如下:接收多部分信息.
zmq_msg_t part;
while (true) {
// 初始化消息
int rc = zmq_msg_init (&part);
assert (rc == 0);
// 接收消息
rc = zmq_msg_recv (socket, &part, 0);
assert (rc != -1);
// 判断是否还有更多消息要接收, 如果继续循环接收
if (zmq_msg_more (&part))
fprintf (stderr, "more\n");
else { //否则退出循环
fprintf (stderr, "end\n");
break;
}
//关闭消息
zmq_msg_close (&part);
}
十九、其他接口
- zmq_recvmsg():已经被zmq_msg_recv()取代了,不建议使用。参考文档为:http://api.zeromq.org/master:zmq-recvmsg
- zmq_sendmsg():已经被zmq_msg_send()取代了,不建议使用。参考文档为:http://api.zeromq.org/master:zmq-sendmsg
- zmq_send():与zmq_msg_send()接口的功能一样,也是向套接字发送数据,不过其发送的数据(参数2)为void*类型,不是zmq_msg_t类型。参考文档为:http://api.zeromq.org/master:zmq-send
- zmq_recv():与zmq_msg_recv()接口的功能一样,也是从套接字接收数据,不过其接收的数据(参数2)为void*类型,不是zmq_msg_t类型。参考文档为:http://api.zeromq.org/master:zmq-recv
- zmq_send_const():与zmq_send()一样,也是向套接字发送数据,不过其是发送恒定内存消息,也就是参数2所指的消息缓冲区为常量内存,因此不会以任何方式复制或释放。参考文档为:http://api.zeromq.org/master:zmq-send-const。
- 我是小董,V公众点击"笔记白嫖"解锁更多【ZeroMQ】资料内容。
更多推荐
已为社区贡献7条内容
所有评论(0)