ZeroMQ 教程 002 : 高级技巧 (5)

我们先看一下, 如果没有多路IO接口, 如果我们要从两个socket上接收数据, 我们会怎样做. 下面是一个没什么卵用的示例程序, 它试图从两个socket上读取数据, 使用了异步I/O. (如果你有印象的话, 应该记得对应的两个endpoint实际上是我们在第一章写的两个示例程序的数据生产方: 天气预报程序与村口的大喇叭)

#include <zmq.h> #include <stdio.h> int main(void) { void * context = zmq_ctx_new(); void * receiver = zmq_socket(context, ZMQ_PULL); zmq_connect(receiver, "tcp://localhost:5557"); void * subscriber = zmq_socket(context, ZMQ_SUB); zmq_connect(subscriber, "tcp://localhost:5556"); zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "10001 ", 6); while(1) { char msg[256]; while(1) { int size = zmq_recv(receiver, msg, 255, ZMQ_DONTWAIT); if(size != -1) { // 接收数据成功 } else { break; } } while(1) { int size = zmq_recv(subscriber, msg, 255, ZMQ_DONTWAIT); if(size == -1) { // 接收数据成功 } else { break; } } sleep(1); // 休息一下, 避免疯狂循环 } zmq_close(receiver); zmq_close(subscriber); zmq_ctx_destroy(context); return 0; }

在没有多路IO手段之前, 这基本上就是你能做到的最好情形了. 大循环里的sleep()让人浑身难受. 不加sleep()吧, 在没有数据的时候, 这个无限空循环能把一个核心的cpu占满. 加上sleep()吧, 收包又会有最坏情况下1秒的延时.

但有了zmq_poll()接口就不一样了, 代码就会变成这样:

#include <zmq.h> #include <stdio.h> int main(void) { void * context = zmq_ctx_new(); void * receiver = zmq_socket(context, ZMQ_PULL); zmq_connect(receiver, "tcp://localhost:5557"); void * subscriber = zmq_socket(context, ZMQ_SUB); zmq_connect(subscriber, "tcp://localhost:5556"); zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "10001 ", 6); while(1) { char msg[256]; zmq_pollitem_t items[] = { {receiver, 0, ZMQ_POLLIN, 0}, {subscriber,0, ZMQ_POLLIN, 0}, }; zmq_poll(items, 2, -1); if(items[0].revents & ZMQ_POLLIN) { int size = zmq_recv(receiver, msg, 255, 0); if(size != -1) { // 接收消息成功 } } if(items[1].revents & ZMQ_POLLIN) { int size = zmq_recv(subscriber, msg, 255, 0); if(size != -1) { // 接收消息成功 } } } zmq_close(receiver); zmq_close(subscriber); zmq_ctx_destroy(context); return 0; }

zmq_pollitem_t类型定义如下, 这个定义可以从zmq_poll()的manpage里查到

typedef struct{ void * socket; // ZMQ的socket int fd; // 是的, zmq_poll()还可以用来读写linux file descriptor short events; // 要被监听的事件, 基础事件有 ZMQ_POLLIN 和 ZMQ_POLLOUT, 分别是可读可写 short revents; // 从zmq_poll()调用返回后, 这里存储着触发返回的事件 } zmq_pollitem_t; 多帧消息的收发

我们之前提到过, 用户数据被包装成zmq_msg_t对象, 也就是帧, 而在帧上, 还有一个逻辑概念叫"消息". 那么在具体编码中, 如何发送多帧消息呢? 而又如何接收多帧消息呢? 简单的讲, 两点:

在发送时, 向zmq_msg_send()传入ZMQ_SNDMORE选项, 告诉发送接口, "我后面还有其它帧"

在接收消息时, 每调用一次zmq_msg_recv()接收一个帧, 就调用一次zmq_msg_more()或者zmq_getsockopt() + ZMQ_RCVMORE来判断是否这是消息的最后一个帧

发送示例:

zmq_msg_send(&msg, socket, ZMQ_SNDMORE); zmq_msg_send(&msg, socket, ZMQ_SNDMORE); zmq_msg_send(&msg, socket, 0); // 消息的最后一个帧

接收示例:

while(1) { zmq_msg_t msg; zmq_msg_init(&msg); zmq_msg_recv(&msg, socket, 0); // 做处理 zmq_msg_close(&msg); if(!zmq_msg_more(&msg)) // 注意, zmq_msg_more可以在zmq_msg_close后被安全的调用 { break; } }

这里有一个需要注意的有趣小细节: 要判断一个收来的帧是不是消息的最后一个帧, 有两种途径, 一种是zmq_getsockopt(socket, ZMQ_RCVMORE, &more, &more_size), 另外一种是zmq_msg_more(&msg). 前一种途径的入参是socket, 后一种途径的入参是msg. 这真是很因缺思汀. 目前来说, 两种方法都可以, 不过我建议你使用zmq_getsockopt(), 至于原因嘛, 因为在zmq_msg_recv()的manpage中, 是这样建议的.

关于多帧消息, 你需要注意以下几点:

多帧消息的传输是原子性的, 这是由ZMQ保证的

原子性也意味着, 当你使用zmq_poll()时, 当socket可读, 并且用zmq_msg_recv()读出一个帧时, 代表着不用等待下一次循环, 你直接继续读取, 一定能读取能整个消息中剩余的其它所有帧

当一个多帧消息开始被接收时, 无论你是否通过zmq_msg_more()或zmq_getsockopt() + ZMQ_RCVMORE检查消息是否接收完整, 你一帧帧的收, 也会把整个消息里的所有帧收集齐. 所以从这个角度看, zmq_msg_more()可以在把所有可读的帧从socket里统一接收到手之后, 再慢慢判断这些帧应该怎么拼装. 所以这样看, 它和zmq_getsockopt()的功能也不算是完全重复.

当一个多帧消息正在发送时, 除了把socket关掉(暴力的), 否则你不能取消本次发送, 本次发送将持续至所有帧都被发出.

中介与代理

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zypgfs.html