一、ØMQ的消息处理

使用套接字来传输数据

  • 但ØMQ的I/O模型与TCP模型有很大区别,你需要时间来转变观念。
  • 处理数据时,TCP套接字和ØMQ套接字之间的差异:
    • ØMQ套接字像UDP那样传递信息,而不是像TCP那样传递字节流。ØMQ消息是指定长度的二进制数据,因为它们的设计针对性能进行了优化,所以有点棘手
    • ØMQ套接字在一个后台线程执行自己的IO。这意味着消息到达本地输入队列并从本地输出队列被发送,不会影响到你的应用程序运行
    • ØMQ套接字根据套接字类型具有内置的1对N的路由行为

zmq_msg_xxx()消息处理接口

  • 在内存中,ØMQ消息是zmq_msg_t表示的结构(或类,取决于你采用的语言)
  • 下面是C语言中使用ØMQ消息的基本规则:
    • 创建并创建zmq_msg_t对象,使用zmq_msg_t来表示消息,而不是使用普通的数据块(char*)来交互数据
    • 要读取消息,可使用zmq_msg_init()创建一个空的消息,然后传递给zmq_msg_recv()
    • 要写入消息,可以使用zmq_msg_init_size()来创建消息,并分配某个大小的数据块数据,使用memcpy()将数据块的数据拷贝给zmq_msg_t,然后将zmq_msg_t传递给zmq_msg_send()进行发送
    • 要释放消息,则调用zmq_msg_close(),这会删除一个引用,当消息引用为0时,ØMQ会最终自动帮你销毁该消息
    • 要访问消息内容,可以使用zmq_msg_data()
    • 要知道消息包含多少数据,可以使用zmq_msg_size()
    • 一般不建议使用zmq_msg_move()、zmq_msg_copy()、zmq_msg_init_data(),除非你的目标很明确就是要用这些函数
    • zmq_msg_send()传递一个消息时候,会把该消息清除(把它的大小设置为0),因此消息发送之后需要关闭(zmq_msg_close())并且不再使用。如果你想多次发送相同的数据,可以创建两个zmq_msg_t消息对象发送,或者在调用zmq_msg_init()之前使用zmq_msg_copy()拷贝两份一样的数据并同时发送
  • 此处给出的只是个大概,更多的细节参阅下面的接口介绍和演示案例

ØMQ对字符串的处理

  • 在前面的文章我们介绍了如何处理ØMQ的字符串并封装了下面两个字符串处理函数,文章可以参阅:https://blog.csdn.net/qq_41453285/article/details/105991716
  • 下面是自定义的两个函数:
    • 一个是字符串接收函数:其从网络中接收一个ØMQ字符串,并申请多1个字节空间的内存保存该字符串,然后在尾部要添加0,以终止该字符串
    • 一个是字符串发送函数:向网络中发送一个字符串,单发送的字符串不含尾部的空字符
// 从套接字接收ØMQ字符串,并将其转换为C/C++字符串(在尾部添加0)
static char *s_recv(void* socket)
{
    // 此处使用zmq_msg_init()初始化即可, zmq_msg_recv()在内部会自动对zmq_msg_t对象进行大小设定
    zmq_msg_t message;
    zmq_msg_init(&message);

    int size = zmq_msg_recv(&message, socket, 0);
    if(size == -1)
        return NULL;

    char *string = (char*)malloc(size + 1);
    memcpy(string, zmq_msg_data(&message), size);

    zmq_msg_close(&message);
    string[size] = 0;
    return string;
}
// 将C字符串转换为ØMQ字符串(去掉尾部的'\0'),并发送到指定的套接字上
static int s_send(void *socket, char *string)
{
    // 因为是将数据拷贝给zmq_msg_t对象, 因此需要使用zmq_msg_init_size进行初始化
    zmq_msg_t msg;
    zmq_msg_init_size(&msg, strlen(string));
    memcpy(zmq_msg_data(&msg), string, strlen(string));

    // 发送数据
    int rc = zmq_msg_send(&msg, socket, 0);

    // 关闭zmq_msg_t对象
    zmq_msg_close(&msg);

    return rc;
}

“部件”和“帧”的概念

  • 帧(Frame)(在ØMQ参考手册中也称为“消息部件”)是ØMQ消息的基本线路格式。帧是已指定长度的数据块,此长度可以从零向上。如果做过任何TCP编程工作,你就会明白,为什么帧是“现在我应该从网络套接字读出多少数据?”这个问题的一个有用的答案

  • 最初,一个ØMQ消息是一帧,像UDP一样。后面我们采用多部分消息来扩展了这一点,这是相当简单的带有被设置为1的“more”位的帧的序列,接着是一个该位被设置为零的帧。ØMQ API然后让你写入一个“more”标志的消息,并且当你读取消息是,它可以让你检查是否存在“more”
  • 因此,在低级别ØMQ API和参考手册中,有关于消息与部分有一些模糊性。下面用一个有用的词汇表来说明:
    • 消息可以是一个或多个部件
    • 这些部件也称为帧
    • 每个部件都是一个zmq_msg_t对象
    • 你用低级别的API分别发送和接收各个部件
    • 高级别API为发送整个多部分消息提供包装
  • 其他与消息相关的内容:

    • 你可以发送长度为0的消息。例如,用于从一个线程把一个信号发送给另一个线程
    • ØMQ保证提供一个消息所有的部分(一个或多个)或者一个部分也没有
    • ØMQ不立刻发送消息(单个或多部分),而在以后某些不确定的时间发送。因此,一个多部分消息必须在内存中装入
    • 单部分消息也必须装入内存。如果你想发送任意大小的文件,应该把它们分解成片,并把每一片作为独立的单部分消息发送
    • 在作用域关闭时不自动销毁对象的语言中,在完成消息时,必须调用zmq_msg_close()
    • 轻易不要使用zmq_msg_init_data(),这是一个零拷贝的方法,如果使用不好会带来麻烦。零拷贝可参阅:https://blog.csdn.net/qq_41453285/article/details/106845900

二、多部分消息

  • ØMQ允许我们撰写由多个帧组成的单个消息,从而给我们一个“多部分消息”。实际的应用程序中相当多地使用了多部分消息,无论是包装带地址信息的消息,还是进行简单的序列化
  • 关于多部分消息,你需要了解的是:
    • 当发送一个多部分消息时,仅当你发送最后的部分时,所有的消息才会整整在线路上实际发送
    • 如果你使用的是zmq_poll(),当你收到一条消息的第一部分时,其余部分也都是已经到达了
    • ØMQ确保消息的原子传递:对等方应该收到消息的所有消息部分,或者根本不收到任何消息。除非关闭套接字,否则没有办法取消部分发送的消息
    • 消息部分的总数不受可用存储空间的限制
    • 在使用多部分消息时,每个部分都是一个zmq_msg_t条目。例如,如果你要发送的消息具有5个部分,你就必须构造5个zmq_msg_t对象
    • 在发送时,ØMQ消息帧都在内存中排队,直到最后一个小熊被接收到位置,然后再一起发送它们
    • 在接收时,无论你是否设置RCVMORE选项,都将受到一条消息的所有部分
  • 在后面的文章中我们会研究应答封包。现在我们学习如何安全地(但一位地)在需要转发消息但不检查它们的任何应用程序(如代理)中读写多部分消息

写多部分消息

  • 例如,下面发送三条消息,三条消息组成一条多部分消息,并且调用三次zmq_msg_send()发送出去,注意其中用到了ZMQ_SNDMORE选项
zmq_msg_t message1;
zmq_msg_t message2;
zmq_msg_t message3;

//初始化这三条消息

//发送第一条, 指定ZMQ_SNDMORE选项, 表示发送的是多部分消息的其中一部分, 后面还要消息要发送
zmq_msg_send(socket, &message1, ZMQ_SNDMORE);
//发送第二条,同上
zmq_msg_send(socket, &message2, ZMQ_SNDMORE);
//发送最后一条消息, 因为后面没有消息要发送了, 因此最后一个参数为0即可
zmq_msg_send(socket, &message3, 0);
  • 更多详细细节可以参阅下面“zmq_msg_send()的介绍及其演示案例②”

读多部分消息

  • 在接收消息时,可以使用ZMQ_RCVMORE调用zmq_getsockopt()函数来判断套接字是否还有更多的消息要接收
  • 下面是一个即可以处理单部分消息又可以处理多部分消息的代码
while(1)
{
    zmq_msg_t message;
    zmq_msg_init(&message);

    zmq_msg_recv(socket, &message, 0);

    zmq_msg_close(&message);

    int more;
    size_t more_size = sizeof(more);
    zmq_getsockopt(socket, ZMQ_RCVMORE, &more, &more_size);
    if(!more)
        break;
}

三、接口使用的几点说明

关于zmq_msg_init()和zmq_msg_init_size()的踩坑记录

  • 这两个函数曾经骚扰我半天,由于ØMQ操作文档说明的不详细,我搞了半天才弄好
  • ①在发送数据的时候:我们需要调用memcpy()将数据拷贝到zmq_msg_t中进行发送,不可以调用zmq_msg_init()初始化的zmq_msg_t对象进行存储,因为zmq_msg_init()初始化的对象其大小被设定为0,在调用zmq_msg_send()的时候会报错的。见下面代码
/*******下面这种情况是错误的*******/

zmq_msg_t msg;
zmq_msg_init(&msg);

// 将str拷贝给msg
char *str= "HelloWolrd";
memcpy(zmq_msg_data(&msg), str, 11);

// 打印的是HelloWolrd, 但是大小为0, 大小为0就代表该zmq_msg_t对象不可用
printf("%s %ld\n", (char*)zmq_msg_data(&msg), zmq_msg_size(&msg));

// zmq_msg_send()会出错, msg虽然有内容, 但是其大小为0
// zmq_msg_send(&msg, socket, 0);

 

  • 发送数据的时候请使用zmq_msg_init_size()初始化对象,这样发送出去的zmq_msg_t对象是有大小的,不会被zmq_msg_send()判断为是错的
/*******下面这种情况是正确的*******/

char *str= "HelloWolrd";

// 初始化时指定其大小
zmq_msg_t msg;
zmq_msg_init_size(&msg, strlen(str) + 1);

// 将str拷贝给msg
memcpy(zmq_msg_data(&msg), str, 11);

// 打印HelloWorld, 大小为11
printf("%s %ld\n", (char*)zmq_msg_data(&msg), zmq_msg_size(&msg));

// zmq_msg_send()调用成功
// zmq_msg_send(&msg, socket, 0);

  • ②在接收数据的时候:接收数据时,可以使用zmq_msg_init()定义的zmq_msg_t对象来保存数据,zmq_msg_recv()函数内部会自动的设置zmq_msg_t对象的大小
// 初始化时指定其大小
zmq_msg_t msg;
zmq_msg_init(&msg);

// 接收数据, zmq_msg_recv()内部会自动
zmq_msg_recv(&msg, socket, 0);

关于拷贝

  • 当把数据拷贝给zmq_msg_t对象时,如果数据的长度超过zmq_msg_t对象的大小,zmq_msg_t对象仍然可以获取完整数据,但是使用起来时只能使用其指定的大小
#include <stdio.h>
#include <zmq.h>

int main()
{
    // 初始化msg时, 指定其大小为5
    zmq_msg_t msg;
    zmq_msg_init_size(&msg, 5);

    // 拷贝11字节给msg
    memcpy(zmq_msg_data(&msg), "HelloWorld", 11);

    // 打印的大小将为5
    printf("%s %ld\n", (char*)zmq_msg_data(&msg), zmq_msg_size(&msg));

    zmq_msg_close(&msg);
   
    return 0;
}

四、zmq_msg_t结构及源码分析

  • 本文使用的源码为zeromq4.1.7

zmq_msg_t的结构

  • ØMQ用zmq_msg_t结构来表示一条小消息,其源码定义如下: 
//zmq.h
typedef struct zmq_msg_t {
#if defined (__GNUC__) || defined ( __INTEL_COMPILER) || \
        (defined (__SUNPRO_C) && __SUNPRO_C >= 0x590) || \
        (defined (__SUNPRO_CC) && __SUNPRO_CC >= 0x590)
    unsigned char _ [64] __attribute__ ((aligned (sizeof (void *))));
#elif defined (_MSC_VER) && (defined (_M_X64) || defined (_M_ARM64))
    __declspec (align (8)) unsigned char _ [64];
#elif defined (_MSC_VER) && (defined (_M_IX86) || defined (_M_ARM_ARMV7VE))
    __declspec (align (4)) unsigned char _ [64];
#else
    unsigned char _ [64];
#endif
} zmq_msg_t;
  • zmq_msg_t并不是真正存储数据的地方:我们在操作的时候zmq_msg_t的时候,实际上将其转换为zmq命令空间中的一种msg_t类来使用的。例如下图所示:调用zmq_msg_init()初始化zmq_msg_t的时候实际调用的就是msg_t类的init()方法 

msg_t类

  • msg_t类是真正存储数据的地方,如下图所示(代码被缩减了,完整的定义见msg.hpp)
  • 结构如下:
    • base结构体:其有一个type成员用来表示这个消息属于什么类型的,不同类型的消息会用下面不同的结构体存储消息
    • vsm、lmsg、cmsg等结构体:代表消息的不同类型,当前msg_t属于哪一种类型,哪一个结构体就会被初始化。其中这些结构体中的data字段存储的是消息的真正值、size存储消息的大小等
//msg.hpp
namespace zmq
{
    class msg_t
    {
    public:
        bool check ();
        int init ();
        int init_size (size_t size_);
        int init_data (void *data_, size_t size_, msg_free_fn *ffn_,
            void *hint_);
        int init_delimiter ();
        int close ();
        int move (msg_t &src_);
        int copy (msg_t &src_);
        void *data ();
        size_t size ();

    private:
        struct content_t
        {
            void *data;  //真正存储数据的复方
            size_t size; //数据的大小
            msg_free_fn *ffn; //数据释放函数, 见zmq_msg_init_data()函数
            void *hint; //传递给ffn的参数
            zmq::atomic_counter_t refcnt; //消息的引用计数
        };

        union {
            struct {
                metadata_t *metadata;
                unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + 2)];
                unsigned char type;
                unsigned char flags;
            } base;
            struct {
                metadata_t *metadata;//元数据
                unsigned char data [max_vsm_size];//消息数据
                unsigned char size;
                unsigned char type;
                unsigned char flags;
            } vsm; //vsm消息类型
            struct {
                metadata_t *metadata;//元数据
                content_t *content;//消息数据
                unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + sizeof (content_t*) + 2)];
                unsigned char type;
                unsigned char flags;
            } lmsg; //lmsg消息类型
            struct {
                metadata_t *metadata;
                void* data;//消息数据
                size_t size;
                unsigned char unused
                    [msg_t_size - (8 + sizeof (metadata_t *) + sizeof (void*) + sizeof (size_t) + 2)];
                unsigned char type;
                unsigned char flags;
            } cmsg; //cmsg消息类型
            struct {
                metadata_t *metadata;
                unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + 2)];
                unsigned char type;
                unsigned char flags;
            } delimiter;
        } u;
    };
}

API分析

  • 以zmq_msg_init()为例,它会在内部调用msg_t的init_size()函数,init_size()函数会根据消息的大小来初始化msg_t的不同类型(vsm、lmsg、cmsg等类型),同时将结构的数据初始化

  • 以zmq_msg_data()为例,它会在内部调用msg_t的data()函数,data()函数会根据base结构体中的type字段来判断当前数据属于什么类型,进而将不同结构的数据(返回data字段)进行返回

五、初始化空的ØMQ消息(zmq_msg_init)

int zmq_msg_init (zmq_msg_t *msg);
  • API参考文档:http://api.zeromq.org/master:zmq-msg-init
  • 功能:初始化空的ØMQ消息
  • 参数:要初始化的ØMQ消息结构
  • 返回值:该函数总是返回0,没有错误定义
  • 相关描述:
    • zmq_msg_init()函数将初始化msg引用的消息对象,以表示一条空消息。在使用zmq_msg_recv()接收消息之前调用此函数最有用
    • 永远不要直接访问zmq_msg_t成员,而是始终使用zmq_msg_xxx()系列函数
    • 函数zmq_msg_init()、zmq_msg_init_data()和zmq_msg_init_size()是互斥的。调用这三者之一即可,不要初始化相同的zmq_msg_t两次
    • zmq_msg_init()的zmq_msg_t对象,其大小为0,因此不能用在类似发送的函数中,但是可以用来接收(详情参阅上面的“接口使用的几点说明”)

演示案例

  • 从套接字接收消息
zmq_msg_t msg;
rc = zmq_msg_init(&msg);
assert(rc == 0);
int nbytes = zmq_msg_recv(socket, &msg, 0);
assert(nbytes != -1);

六、初始化指定大小的ØMQ消息(zmq_msg_init_size)

int zmq_msg_init_size (zmq_msg_t *msg, size_t size);
  • API参考文档:http://api.zeromq.org/master:zmq-msg-init-size
  • 功能:初始化指定大小的ØMQ消息
  • 参数:
    • msg:要初始化的ØMQ消息结构
    • size:初始化ØMQ消息结构的大小
  • 返回值:
    • 成功:返回0
    • 失败:返回-1,并将errno设置为以下定义的值之一:ENOMEM:可用的存储空间不足
  • 相关描述:
    • zmq_msg_init_size()函数将分配存储消息大小字节所需的任何资源,并初始化msg引用的消息对象来表示新分配的消息
    • 实现应该选择将消息内容存储在堆栈(小消息)还是堆(大消息)上。出于性能原因,zmq_msg_init_size()不得清除消息数据
    • 永远不要直接访问zmq_msg_t成员,而是始终使用zmq_msg_xxx()系列函数
    • 函数zmq_msg_init()、zmq_msg_init_data()和zmq_msg_init_size()是互斥的。调用这三者之一即可,不要初始化相同的zmq_msg_t两次

七、从缓冲区中初始化ØMQ消息(zmq_msg_init_data)

typedef void (zmq_free_fn) (void *data, void *hint);

int zmq_msg_init_data (zmq_msg_t *msg, void *data, size_t size, zmq_free_fn *ffn, void *hint);
  • API参考文档:http://api.zeromq.org/master:zmq-msg-init-data
  • 功能:从提供的缓冲区初始化ØMQ消息
  • 参数:
    • msg:要初始化的ØMQ消息结构
    • data:缓冲区的数据,是用来初始化msg的
    • size:参数data缓冲区数据对应的大小
    • ffn:如果data是动态申请的,那么可以添加这个函数用来回收内存,其参数1data就是zmq_msg_init_data()函数的参数1,参数2hint是zmq_msg_init_data()函数的参数4
    • hint:传递给ffn函数的参数2
  • 返回值:
    • 成功:返回0
    • 失败:返回-1,并将errno设置为以下定义的值之一:ENOMEM:可用的存储空间不足
  • 相关描述:
    • zmq_msg_init_data()函数将初始化msg引用的消息对象。不得进行复制任何数据,ØMQ应拥有所提供缓冲区的所有权
    • 如果提供释放功能(参数4),则一旦ØMQ不再需要缓冲区中的数据,就应该调用释放函数ffn释放内存
    • 释放函数ffn需要是线程安全的,因为它将从任意线程调用。如果没有提供释放函数,则分配的内存将不会被释放,这可能会导致内存泄漏
    • 永远不要直接访问zmq_msg_t成员,而是始终使用zmq_msg_xxx()系列函数
    • 函数zmq_msg_init()、zmq_msg_init_data()和zmq_msg_init_size()是互斥的。调用这三者之一即可,不要初始化相同的zmq_msg_t两次
    • ØMQ使用zmq_msg_init_data()来实现零拷贝,可参阅:https://blog.csdn.net/qq_41453285/article/details/106845900

演示案例

  • 从提供的缓冲区初始化消息
//释放函数
void my_free (void *data, void *hint)
{
    free (data);
}

/* ... */

//申请数据
void *data = malloc (6);
assert (data);
memcpy (data, "ABCDEF", 6);

//用data初始化msg
zmq_msg_t msg;
rc = zmq_msg_init_data (&msg, data, 6, my_free, NULL);
assert (rc == 0);

八、释放ØMQ消息(zmq_msg_close)

int zmq_msg_close (zmq_msg_t *msg);
  •  API参考文档:http://api.zeromq.org/master:zmq-msg-close
  • 功能:将消息的引用计数减1
  • 参数:
    • msg:要释放的消息结构
  • 返回值:
    • 成功:返回0
    • 失败:返回-1,并将errno设置为以下定义的值之一:EFAULT:无效的信息
  • 相关描述:
    • 应用程序应该确保在不再需要消息时调用zmq_msg_close(),否则可能发生内存泄漏
    • 该函数只是将zmq_msg_t对象所指数据的引用计数减1,并把自己的大小设置为0而已。当一个zmq_msg_t对象调用zmq_msg_close()之后,如果其之前所指的数据还有其他zmq_msg_t对象引用,那么该zmq_msg_t对象所指的数据不会真正的被释放,只有数据的最后一个zmq_msg_t引用对象调用zmq_msg_close()时才真正的释放内存(见下面演示案例①)
    • 当一个zmq_msg_t对象调用zmq_msg_close()之后就不能再对这个zmq_msg_t对象进行操作了,如果操作会报错(见下面演示案例②)。即使它所指的数据还有其它zmq_msg_t对象引用也不行
    • 注意,在zmq_msg_send()成功之后,zmq_msg_send()会把zmq_msg_t对象的大小设置为0(变为0之后就标记这个zmq_msg_t对象不需要再去使用了),但是没有关闭该对象,因此zmq_msg_send()之后需要关闭zmq_msg_t对象(更多详细的细节见下面zmq_msg_send()函数的介绍和演示案例)

演示案例①

  • 下面创建两个消息msg1和msg2,其中msg2拷贝于msg1
#include <stdio.h>
#include <string.h>
#include <zmq.h>

int main()
{
    // 1.初始化第一个消息
    printf("第一步: 初始化msg1:\n");
    zmq_msg_t msg1;
    zmq_msg_init_size(&msg1, 6);
    memcpy(zmq_msg_data(&msg1), "Hello", 6);
    printf("\tmsg1: %s, size: %d\n\n", (char*)zmq_msg_data(&msg1), (int)zmq_msg_size(&msg1));

    // 2.将msg1拷贝给msg2
    printf("第二步:将msg1拷贝给msg2:\n");
    zmq_msg_t msg2;
    zmq_msg_init_size(&msg2, 6);
    zmq_msg_copy(&msg2, &msg1);
    printf("\tmsg1: %s, size: %d\n", (char*)zmq_msg_data(&msg1), (int)zmq_msg_size(&msg1));
    printf("\tmsg2: %s, size: %d\n\n", (char*)zmq_msg_data(&msg2), (int)zmq_msg_size(&msg2));

    // 3.关闭msg1
    printf("第三步:关闭msg1:\n");
    zmq_msg_close(&msg1);
    //printf("\tmsg1: %s, size: %d\n", (char*)zmq_msg_data(&msg1), (int)zmq_msg_size(&msg1)); msg1已经关闭了不能再进行访问了
    printf("\tmsg2: %s, size: %d\n\n", (char*)zmq_msg_data(&msg2), (int)zmq_msg_size(&msg2));

    // 4.关闭msg2
    printf("第四步:关闭msg2:\n");
    zmq_msg_close(&msg2);

    //msg2已经关闭了不能再进行访问了
    //printf("\tmsg2: %s, size: %d\n\n", (char*)zmq_msg_data(&msg2), (int)zmq_msg_size(&msg2));

    return 0;
}
  • 编译运行效果如下:

演示案例②

  • 下面创建一个消息msg1,然后初始化msg1消息,最后把它释放,释放之后就不能再操作该zmq_msg_t对象了,如果操作就会报错
#include <stdio.h>
#include <string.h>
#include <zmq.h>

int main()
{
    // 1.初始化第一个消息
    printf("第一步: 初始化msg:\n");
    zmq_msg_t msg;
    zmq_msg_init_size(&msg, 6);
    memcpy(zmq_msg_data(&msg), "Hello", 6);
    printf("\tmsg1: %s, size: %d\n\n", (char*)zmq_msg_data(&msg), (int)zmq_msg_size(&msg));

    // 2.关闭msg1
    printf("第二步:关闭msg:\n");
    zmq_msg_close(&msg);

    // 3.msg已经关闭了, 再去访问就会报错
    printf("\tmsg1: %s, size: %d\n", (char*)zmq_msg_data(&msg), (int)zmq_msg_size(&msg));

    return 0;
}

九、设置/获取消息属性(zmq_msg_set、zmq_msg_get)

zmq_msg_set

int zmq_msg_set (zmq_msg_t *message, int property, int value);
  • API参考文档:http://api.zeromq.org/master:zmq-msg-set
  • 功能:设置消息属性
  • 参数:
    • message:要设置的消息
    • property:要设置的属性
    • value:属性值
  • 返回值:
    • 成功:返回0
    • 失败:返回-1,并将errno设置为以下定义的值之一:EINVAL:请求的属性是未知的
  • 相关描述:当前zmq_msg_set()函数不能设置任何属性

zmq_msg_get

int zmq_msg_get (zmq_msg_t *message, int property);
  • API参考文档:http://api.zeromq.org/master:zmq-msg-get
  • 功能:获取消息属性
  • 参数:
    • message:要获取的消息
    • property:要获取的属性
  • 返回值:
    • 成功:返回参数2所指定的属性的值
    • 失败:返回-1,并将errno设置为以下定义的值之一:EINVAL:请求的属性是未知的
  • 可以获取的属性如下:
    • ZMQ_MORE:指示消息后面有更多消息帧要跟随
    • ZMQ_SRCFD:返回从套接字读取消息的文件描述符。这允许应用程序通过getpeername()检索远程端点。请注意,相应的套接字可能已经关闭,甚至可以重用。目前只针对TCP套接字实现
    • ZMQ_SHARED:指示消息可以与该消息的另一个副本共享底层存储

十、获取消息元数据属性(zmq_msg_gets)

const char *zmq_msg_gets (zmq_msg_t *message, const char *property);
  • API参考文档:http://api.zeromq.org/master:zmq-msg-gets
  • 功能:获取消息元数据属性
  • 参数:
    • message:要获取的消息
    • property:要获取的属性
  • 返回值:
    • 成功:返回属性的字符串值。调用方不得修改或释放返回的值,该值应归消息所有。属性和值的编码应为UTF8
    • 失败:返回NULL,并将errno设置为以下定义的值之一:EINVAL:请求的属性是未知的
  • 相关描述:
    • 该函数返回message的元数据,要获取的元数据属性为参数2所指定的属性,是一个字符串形式。参数2应该是以NULL结尾的UTF8编码字符串
    • https://rfc.zeromq.org/spec/37/中所指定的,在ZeroMQ连接握手期间,将基于每个连接定义元数据。应用程序可以使用zmq_setsockopt()设置ZMQ_METADATA设置元数据属性。应用程序元数据属性必须以X-为前缀
    • 另外,当底层传输可用时,Peer-Address属性将返回由getnameinfo()返回的远程端点的IP地址
    • 这些属性的名称也在zmq.h中定义为:ZMQ_MSG_PROPERTY_SOCKET_TYPE ZMQ_MSG_PROPERTY_ROUTING_ID、ZMQ_MSG_PROPERTY_PEER_ADDRESS。目前,这些定义仅作为API草案提供
    • 可以根据底层安全机制定义其他属性,请参阅下面的ZAP身份验证连接示例
    • 除了应用程序元数据外,还可以使用该函数检索以下ZMTP属性:
Socket-Type
Routing-Id

# 注意:Identity是Routing-Id的一个不赞成使用的别名

演示案例

  • 获取消息的zap身份验证用户ID:
zmq_msg_t msg;
zmq_msg_init (&msg);
rc = zmq_msg_recv (&msg, dealer, 0);
assert (rc != -1);

const char *user_id = zmq_msg_gets (&msg, ZMQ_MSG_PROPERTY_USER_ID);
zmq_msg_close (&msg);

十一、获取指向消息内容的指针(zmq_msg_data)

void *zmq_msg_data (zmq_msg_t *msg);
  • API参考文档:http://api.zeromq.org/master:zmq-msg-data
  • 功能:获取指向消息内容的指针,会将msg的内容以以指针的形式返回
  • 参数:msg:要检索的消息结构
  • 返回值:
    • 成功:返回一个指针,指向于msg的消息内容
    • 失败:没有出错的情况

十二、获取消息内容大小(zmq_msg_size)

size_t zmq_msg_size (zmq_msg_t *msg);
  • API参考文档:http://api.zeromq.org/master:zmq-msg-size
  • 功能:获取消息内容的大小,以字节为单位
  • 参数:msg:要检索的消息结构
  • 返回值:
    • 成功:返回msg消息内容的大小(以字节为单位)
    • 失败:没有出错的情况

十三、复制消息内容(zmq_msg_copy)

int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src);
  • API参考文档:http://api.zeromq.org/master:zmq-msg-copy
  • 功能:将一条消息的内容复制给另一条消息,此时两条消息同指一块缓冲区数据
  • 参数:
    • dest:目标消息结构
    • src:源消息结构
  • 返回值:
    • 成功:返回0
    • 失败:返回-1,并将errno设置为以下定义的值之一:EFAULT:无效的信息
  • 相关描述:
    • zmq_msg_copy()函数将src所引用的消息对象复制到dest所引用的消息对象中,如果dest之前有内容,则将其释放。在复制之前你必须初始化dest
    • 引用计数:zmq_msg_copy()的实现并不是在内存中创建一份dest新实例,然后将src拷贝给dest,而是将dest指向于src所指的内容,因此,dest和src是共享底层缓冲区中的数据的
    • 因此在复制之后要避免修改消息的内容,因为可能有多者共享这一条消息,其他修改会导致其它消息结构使用时产生未定义的行为
    • 如果您需要的是一个实际的硬拷贝,那么就不要使用该函数,可以使用zmq_msg_init_size()分配一个新消息,并使用memcpy()复制消息内容
    • 关于zmq_msg_copy()的演示案例可以看上面“zmq_msg_close()”函数的演示案例①

演示案例

  • 复制消息
//初始化msg
zmq_msg_t msg;
zmq_msg_init_size (&msg, 255);
memcpy(zmq_msg_data(&msg), "Hello, World", 12);

//将msg拷贝给copy, 此时copy与msg指向于同一块数据
zmq_msg_t copy;
zmq_msg_init(&copy);
zmq_msg_copy(&copy, &msg);

//...

zmq_msg_close (&copy); //关闭copy, 此时msg的内容只有自己引用
zmq_msg_close (&msg);  //再关闭msg, 此时msg指向的内容才真正释放

十四、移动消息内容(zmq_msg_move)

int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src);
  • API参考文档:http://api.zeromq.org/master:zmq-msg-move
  • 功能:将一条消息的内容移动给另一条消息
  • 参数:
    • dest:目标消息结构
    • src:源消息结构
  • 返回值:
    • 成功:返回0
    • 失败:返回-1,并将errno设置为以下定义的值之一:EFAULT:无效的信息
  • 相关描述:
    • zmq_msg_move()函数将把src引用的消息对象的内容移动到dest引用的消息对象,在调用zmq_msg_move()后,src变成一个空消息,如果dest之前有内容,则将其释放然后更改为src的内容
    • 一个zmq_msg_t对象在调用zmq_msg_move()之后只是将自己的数据拷贝给其它zmq_msg_t对象,并把自己的大小设置为0。因此在移动之后其还可以访问之前的数据,但是大小变为0了(变为0之后就标记这个zmq_msg_t对象不需要再去使用了),因此在一个zmq_msg_t对象调用zmq_msg_move()之后建议调用zmq_msg_close()关闭自己不要再使用了(见下面演示案例)
    • 与msg_msg_copy()的不同:msg_msg_copy()是将一个zmq_msg_t对象的数据拷贝给其他zmq_msg_t对象,导致数据的引用计数加1;zmq_msg_move()是将一个zmq_msg_t对象的数据移动到另外一个zmq_msg_t对象上
  • 演示案例如下:下面创建并初始化一个msg1对象,之后把msg1对象的内容移动给msg2对象。
#include <stdio.h>
#include <string.h>
#include <zmq.h>

int main()
{
    // 1.初始化第一个消息
    printf("第一步: 初始化msg1:\n");
    zmq_msg_t msg1;
    zmq_msg_init_size(&msg1, 6);
    memcpy(zmq_msg_data(&msg1), "Hello", 6);
    printf("\tmsg1: %s, size: %d\n\n", (char*)zmq_msg_data(&msg1), (int)zmq_msg_size(&msg1));

    // 2.将msg1内容移动到给msg2
    printf("第二步:将msg1数据移动给msg2:\n");
    zmq_msg_t msg2;
    zmq_msg_init_size(&msg2, 6);
    zmq_msg_move(&msg2, &msg1);
    //虽然msg1的内容进行移动了, 但是其还可以访问数据, 只是大小变为0了
    printf("\tmsg1: %s, size: %d\n", (char*)zmq_msg_data(&msg1), (int)zmq_msg_size(&msg1));
    printf("\tmsg2: %s, size: %d\n\n", (char*)zmq_msg_data(&msg2), (int)zmq_msg_size(&msg2));

    // 3.关闭msg1, 因为msg1数据进行移动了, 因此建议关闭不要再去使用
    printf("第三步:关闭msg1:\n");
    zmq_msg_close(&msg1);
    //printf("\tmsg1: %s, size: %d\n", (char*)zmq_msg_data(&msg1), (int)zmq_msg_size(&msg1)); //msg1已经关闭了, 不能再去操作了
    printf("\tmsg2: %s, size: %d\n\n", (char*)zmq_msg_data(&msg2), (int)zmq_msg_size(&msg2));

    // 4.关闭msg2
    printf("第四步:关闭msg2:\n");
    zmq_msg_close(&msg2);
    //printf("\tmsg2: %s, size: %d\n\n", (char*)zmq_msg_data(&msg2), (int)zmq_msg_size(&msg2)); //msg2已经关闭了不能再进行访问了

    return 0;
}

十五、消息路由ID的设置/获取(zmq_msg_set_routing_id、zmq_msg_routing_id)

设置路由ID(zmq_msg_set_routing_id)

int zmq_msg_set_routing_id (zmq_msg_t *message, uint32_t routing_id);
  • API参考文档:http://api.zeromq.org/master:zmq-msg-set-routing-id
  • 功能:在消息上设置路由ID属性
  • 参数:
    • message:设置的消息结构
    • routing_id:路由ID
  • 返回值:
    • 成功:返回0
    • 失败:返回-1,并将errno设置为以下定义的值之一:EINVAL:提供的routing_id为零
  • 相关描述:
    • 函数的作用是:在消息参数所指向的消息上设置指定的routing_id
    • routing_id必须大于零
    • 要获得有效的路由ID,您必须从ZMQ_SERVER套接字接收一条消息,并使用libzmq:zmq_msg_routing_id方法
    • 路由id是临时的

获取路由ID(zmq_msg_routing_id)

uint32_t zmq_msg_routing_id (zmq_msg_t *message);
  • API参考文档:http://api.zeromq.org/master:zmq-msg-routing-id
  • 功能:获取消息的路由ID(如果有)
  • 参数:message:要获取的消息结构
  • 返回值:
    • 没有路由ID:返回0
    • 否则:返回一个大于0的32位无符号整数
  • 相关描述:
    • 函数的作用是:返回消息的路由ID(如果有的话)
    • 在从ZMQ_SERVER套接字接收的所有消息上设置路由ID
    • 要向ZMQ_SERVER套接字发送消息,必须设置已连接的ZMQ_CLIENT对等点的路由ID
    • 路由id是临时的
  • 演示案例如下:接收客户端消息和路由ID
// 1.创建上下文
void *ctx = zmq_ctx_new ();
assert (ctx);

// 2.创建一个ZMQ_SERVER服务端, 并开启服务
void *server = zmq_socket (ctx, ZMQ_SERVER);
assert (server);
int rc = zmq_bind (server, "tcp://127.0.0.1:8080");
assert (rc == 0);

// 3.初始化一个消息结构
zmq_msg_t message;
rc = zmq_msg_init (&message);
assert (rc == 0);

// 4.接收消息
rc = zmq_msg_recv (server, &message, 0);
assert (rc != -1);

// 5.接收之后, 获取路由ID
uint32_t routing_id = zmq_msg_routing_id (&message);
assert (routing_id);

十六、发送消息(zmq_msg_send)

int zmq_msg_send (zmq_msg_t *msg, void *socket, int flags);
  • API参考文档:http://api.zeromq.org/master:zmq-msg-send
  • 功能:在指定的套接字上发送消息
  • 相关描述:
    • 该函数替换了zmq_sendmsg()函数,zmq_sendmsg()不推荐使用了,关于zmq_sendmsg()可以参阅:http://api.zeromq.org/master:zmq-sendmsg
    • zmq_msg_send()函数将把消息发送给socket,并且把msg消息在socket的消息队列中进行排队
    • 当把msg传递给该函数之后,msg所对应的zmq_msg_t结构就失效了,zmq_msg_send()会把zmq_msg_t对象的大小设置为0(变为0之后就标记这个zmq_msg_t对象不需要再去使用了),但是没有关闭该对象,因此在zmq_msg_send()之后建议调用zmq_close()关闭msg参数对应的zmq_msg_t对象
    • 根据上面的特性,如果你想重复使用zmq_msg_send()之前msg参数对应的数据,那么可以在调用zmq_msg_send()之前使用zmq_msg_copy()拷贝msg参数,这样就有多个zmq_msg_t对象引用msg对引用的数据
    • 成功调用zmq_msg_send()并不表示消息已传输到网络中,仅表名它已在套接字上排队并且ØMQ承担了对该消息的责任

参数

  • msg:要发送的消息
  • socket:操作的套接字
  • flags:一些标志,如下所示:
    • ZMQ_DONTWAIT:对于套接字类型(DEALER,PUSH),当没有可用的对等点(或所有的对等点有完整的高水位标记)时阻塞,指定该操作应该在非阻塞模式下执行。如果消息不能在套接字上排队,则zmq_msg_send()函数将失败,errno设置为EAGAIN
    • ZMQ_SNDMORE:指定要发送的消息是多部分消息,并且后面还将有其他消息部分。详情参阅下面“多部分消息”介绍和演示案例②

返回值

  • 成功:返回消息中的字节数(如果字节数大于MAX_INT,函数将返回MAX_INT)
  • 失败:返回-1,并将errno设置为以下定义的值之一:
    • EAGAIN:zmq_msg_send()在非阻塞模式(设置了ZMQ_DONTWAIT)下发送消息,但无法发送消息(详情见上面的ZMQ_DONTWAIT套接字选项)
    • ENOTSUP:此套接字类型不支持zmq_msg_send()操作
    • EINVAL:发送方试图发送多部分数据,这是套接字类型不允许的
    • EFSM:由于套接字不处于适当的状态,目前无法在该套接字上执行zmq_msg_send()操作。如果套接字类型在多个状态之间切换,比如ZMQ_REP,可能会发生此错误。有关更多信息,请参阅zmq_socket()的消息传递模式部分
    • ETERM:与指定套接字关联的ØMQ 上下文已终止(可以参阅zmq_ctx_destroy():https://blog.csdn.net/qq_41453285/article/details/105993260
    • ENOTSOCK:提供的套接字无效
    • EINTR:在消息被发送之前,一个信号的发送中断了操作
    • EFAULT:无效的消息
    • EHOSTUNREACH:该消息无法路由

多部分消息

  • 多部分消息在文章最初已经介绍过了,详情见文章最开始即可,此处做一个简单的介绍就可以了
  • ØMQ消息由1或更多的消息部分。每个消息部分本身是一个独立的zmq_msg_t
  • 发送多部分消息的应用程序在发送每个消息部分(最后一个消息除外)时必须使用ZMQ_SNDMORE标志(见下面演示案例②)

演示案例①

  • 下面是一个客户端程序,其创建一个msg1对象和一个msg2对象,然后把msg1的内容拷贝给msg2,然后再将msg1的数据发送给服务端(这里想测一下zmq_msg_send()对参数1的影响)
#include <stdio.h>
#include <string.h>
#include <zmq.h>

int main()
{
    // 1.初始化上下文
    void *context = zmq_ctx_new();
    if(context == NULL)
    {
        printf("zmq_ctx_new error\n");
        return -1;
    }

    // 2.创建套接字, 绑定地址
    void *requester = zmq_socket(context, ZMQ_REQ);
    if(zmq_connect(requester, "tcp://localhost:5555") == -1)
    {
        printf("zmq_connect error\n");
        return -1;
    }

    // 3.初始化msg1
    printf("初始化第一条消息msg1:\n");
    zmq_msg_t msg1;
    zmq_msg_init_size(&msg1, 6);
    memcpy(zmq_msg_data(&msg1), "Hello", 6);
    printf("\tmsg1: %s, size: %d\n\n", (char*)zmq_msg_data(&msg1), (int)zmq_msg_size(&msg1));

    // 4.将msg1拷贝给msg2
    printf("将msg1拷贝给msg2:\n");
    zmq_msg_t msg2;
    zmq_msg_init_size(&msg2, 6);
    zmq_msg_copy(&msg2, &msg1);
    printf("\tmsg1: %s, size: %d\n", (char*)zmq_msg_data(&msg1), (int)zmq_msg_size(&msg1));
    printf("\tmsg2: %s, size: %d\n\n", (char*)zmq_msg_data(&msg2), (int)zmq_msg_size(&msg2));
    
    //5.发送数据
    printf("发送msg1:\n");
    if(zmq_msg_send(&msg1, requester, 0) == -1)
    {
        printf("zmq_msg_send error\n");
        return -1;
    }
    //可以看到zmq_msg_send()是把msg1的大小设置为0(标志其不能去使用了), 但是仍可以可以访问msg1的数据
    printf("\tmsg1: %s, size: %d\n", (char*)zmq_msg_data(&msg1), (int)zmq_msg_size(&msg1));
    printf("\tmsg2: %s, size: %d\n\n", (char*)zmq_msg_data(&msg2), (int)zmq_msg_size(&msg2));

    // 6.关闭msg1, 因为msg1的大小被设置为0, 不再使用了
    printf("关闭msg1:\n");
    zmq_msg_close(&msg1);
    //printf("\tmsg1: %s, size: %d\n", (char*)zmq_msg_data(&msg1), (int)zmq_msg_size(&msg1)); //msg1被关闭了, 不能再去使用了
    printf("\tmsg2: %s, size: %d\n\n", (char*)zmq_msg_data(&msg2), (int)zmq_msg_size(&msg2));
    
    // 7. 关闭msg2
    printf("关闭msg2:\n");
    zmq_msg_close(&msg2);
    //printf("\tmsg2: %s, size: %d\n\n", (char*)zmq_msg_data(&msg2), (int)zmq_msg_size(&msg2)); //msg2被关闭了, 不能再去使用了
    
    // 8.关闭套接字, 释放上下文
    zmq_close(requester);
    zmq_ctx_term(context);

    return 0;
}
  • 运行结果如下所示:
    • 左侧就是我们上面这个程序,右侧是服务端代码(服务端代码这里就不给出了,想看的可以参阅https://github.com/dongyusheng/csdn-code/blob/master/ZeroMQ/hwserver.c) 
    • 可以看到在zmq_msg_send()之后,函数会把参数1的zmq_msg_t对象的大小设置为0,但是没有关闭(zmq_msg_close())该对象,因此我们还可以访问该对象
    • 但是在 zmq_msg_send()之后,建议手动关闭(zmq_msg_close())参数1的zmq_msg_t对象大小

演示案例②

  • 下面发送多部分信息,我们同时像服务端发送了msg1和msg2两个消息,备注:
#include <stdio.h>
#include <string.h>
#include <zmq.h>

int main()
{
    // 1.初始化上下文
    void *context = zmq_ctx_new();
    if(context == NULL)
    {
        printf("zmq_ctx_new error\n");
        return -1;
    }

    // 2.创建套接字, 绑定地址
    void *requester = zmq_socket(context, ZMQ_REQ);
    if(zmq_connect(requester, "tcp://localhost:5555") == -1)
    {
        printf("zmq_connect error\n");
        return -1;
    }

    // 3.初始化msg1
    printf("初始化第一条消息msg1:\n");
    zmq_msg_t msg1;
    zmq_msg_init_size(&msg1, 6);
    memcpy(zmq_msg_data(&msg1), "Hello", 6);
    printf("\tmsg1: %s, size: %d\n\n", (char*)zmq_msg_data(&msg1), (int)zmq_msg_size(&msg1));

    // 4.初始化msg2
    printf("初始化第二消息msg2:\n");
    zmq_msg_t msg2;
    zmq_msg_init_size(&msg2, 6);
    memcpy(zmq_msg_data(&msg2), "Hello", 6);
    printf("\tmsg2: %s, size: %d\n\n", (char*)zmq_msg_data(&msg2), (int)zmq_msg_size(&msg2));
    
    //5.将msg1和msg2原子地发送出去, 其中第一个send需要指明ZMQ_SNDMORE标记
    printf("发送msg1和msg2:\n");
    if(zmq_msg_send(&msg1, requester, ZMQ_SNDMORE) == -1)
    {
        printf("zmq_msg_send error\n");
        return -1;
    }
    if(zmq_msg_send(&msg2, requester, 0) == -1)
    {
        printf("zmq_msg_send error\n");
        return -1;
    }
    //可以看到zmq_msg_send()是把msg1和msg2的大小设置为0(标志其不能去使用了), 但是仍可以可以访问它们的数据
    printf("\tmsg1: %s, size: %d\n", (char*)zmq_msg_data(&msg1), (int)zmq_msg_size(&msg1));
    printf("\tmsg2: %s, size: %d\n\n", (char*)zmq_msg_data(&msg2), (int)zmq_msg_size(&msg2));

    // 6.关闭msg1和msg2, 它们都不再使用了
    printf("关闭msg1和msg2:\n");
    zmq_msg_close(&msg1);
    zmq_msg_close(&msg2);
    
    // 8.关闭套接字, 释放上下文
    zmq_close(requester);
    zmq_ctx_term(context);

    return 0;
}
  • 效果如下所示:
    • 左侧发送了两条消息给服务端,右侧服务端收到两条消息
    • 注意:服务端是通过调用两次zmq_msg_recv()接收数据的,不是一次zmq_msg_recv(),见下面解析

  • 服务端代码解析:服务端核心代码如下,先是接收客户端数据,然后再向服务端发送数据。通过上图我们知道,服务端打印了两次“Received Hello”,所以服务端执行了两次while(1),每次执行while循环的时候接收数据,然后发送数据,发送数据的时候由于客户端没有接收函数,所以其直接返回继续执行下一次循环,因此上面打印了两次“Received Hello”

十七、接收消息(zmq_msg_recv)

int zmq_msg_recv (zmq_msg_t *msg, void *socket, int flags);
  • API参考文档:http://api.zeromq.org/master:zmq-msg-recv
  • 功能:在指定的套接字上接收消息
  • 相关描述:
    • 该函数替换了zmq_recvmsg()函数,zmq_recvmsg()不推荐使用了,关于zmq_recvmsg()可以参阅:http://api.zeromq.org/master:zmq-recvmsg
    • zmq_msg_recv()函数将从套接字参数引用的套接字接收消息部分,并将其存储在msg参数引用的消息中
    • 如果msg之前存储有消息则会被正确的释放
    • 如果指定的套接字上没有可用的消息部分,则zmq_msg_recv()函数将阻塞,直到满足请求为止

参数

  • msg:用来保存接收的数据
  • socket:操作的套接字
  • flags:一些标志,如下所示:
    • ZMQ_DONTWAIT:指定该操作在非阻塞模式下执行。如果指定的套接字上没有可用的消息,则zmq_msg_recv()函数将失败并将errno设置为EAGAIN

返回值

  • 成功:返回接收的消息的字节数。注意,如果消息被截断,该值可能会超过len参数的值
  • 失败:返回-1,并将errno设置为以下定义的值之一:
    • EAGAIN:zmq_msg_recv()函数在非阻塞模式(ZMQ_DONTWAIT)下运行,套接字上没有数据可接收
    • ENOTSUP:这个套接字类型不支持zmq_msg_recv()操作
    • EFSM:由于套接字不处于适当的状态,目前无法在该套接字上执行zmq_msg_recv()操作。如果套接字类型在多个状态之间切换,比如ZMQ_REP,可能会发生此错误。有关更多信息,请参阅zmq_socket()的消息传递模式部分
    • ETERM:与指定套接字关联的ØMQ上下文已终止(可以参阅zmq_ctx_destroy():https://blog.csdn.net/qq_41453285/article/details/105993260
    • ENOTSOCK:提供的套接字无效
    • EINTR:在消息被发送之前,一个信号的发送中断了操作

多部分消息

  • 多部分消息在文章最初已经介绍过了,详情见文章最开始即可,此处做一个简单的介绍就可以了
  • ØMQ消息由1或更多的消息部分。每个消息部分本身是一个独立的zmq_msg_t
  • 处理多部分消息的应用程序在调用zmq_msg_recv()之后,必须传递ZMQ_RCVMORE选项给zmq_getsockopt()来确定是否还有其他部分要接收

演示案例①

  • 从套接字接收消息
// 初始化消息结构
zmq_msg_t msg;
int rc = zmq_msg_init (&msg);
assert (rc == 0);

// 接收消息
rc = zmq_msg_recv (&msg, socket, 0);
assert (rc != -1);

// 释放消息
zmq_msg_close (&msg);

演示案例②

  • 接收多部分消息
int more;
size_t more_size = sizeof (more);
do {
    // 初始化消息
    zmq_msg_t part;
    int rc = zmq_msg_init (&part);
    assert (rc == 0);
    
    // 接收消息
    rc = zmq_msg_recv (&part, socket, 0);
    assert (rc != -1);
    
    //判断是否还有消息要接收
    rc = zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
    assert (rc == 0);

    // 关闭消息
    zmq_msg_close (&part); 
} while (more);

十八、判断是否还有很多的消息要接收(zmq_msg_more)

int zmq_msg_more(zmq_msg_t *message);
  • API参考文档:http://api.zeromq.org/master:zmq-msg-more
  • 功能:查询是否还有更多消息要接收
  • 相关描述:
    • zmq_msg_more()函数判断message参数所指的消息是否是由多个部分组成的消息的一部分,以及是否有其他部分要接收
    • 可以在zmq_msg_close()之后安全地调用此方法。该方法与带有ZMQ_MORE参数的zmq_msg_get()相同
  • 参数:message:要判断的消息结构
  • 返回值:
    • 成功0:如果这是多部分消息的最后一部分,或者是单部分消息的唯一一部分,返回0
    • 失败1:如果这是多部分消息的最后一部分,或者是单部分消息的唯一一部分
  • 演示案例如下:接收多部分信息.
zmq_msg_t part;
while (true) {
    
    // 初始化消息
    int rc = zmq_msg_init (&part);
    assert (rc == 0);
    
    // 接收消息
    rc = zmq_msg_recv (socket, &part, 0);
    assert (rc != -1);

    // 判断是否还有更多消息要接收, 如果继续循环接收
    if (zmq_msg_more (&part))
        fprintf (stderr, "more\n");
    else { //否则退出循环
        fprintf (stderr, "end\n");
        break;
    }

    //关闭消息
    zmq_msg_close (&part); 
}

十九、其他接口


  • 我是小董,V公众点击"笔记白嫖"解锁更多【ZeroMQ】资料内容。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐