ZeroMQ 教程 002 : 高级技巧 (13)

你还可以使用PUB/SUB来做线程同步. PUB/SUB不会封装你发送的消息, 你发啥就是啥, 但你需要每次为SUB端通过zmq_setsockopt设置过滤器, 否则SUB端收不到任何消息, 这一点很烦.

所以总的来说, 用PAIR是最方便的选择.

不同机器结点间的同步

当你需要同步, 或者协调的两个结点位于两个不同的机器上时, PAIR就不那么好用了, 直接原因就是: PAIR不支持断线重连. 在同一台机器上, 多个进程之间同步, 没问题, 多个线程之间同步, 也没问题. 因为单机内建立起的通讯连接基本不可能发生意外中断, 而一旦发生中断, 一定是进程挂了, 这个时候麻烦事是进程为什么挂了, 而不是通讯连接为什么挂了.

但是在跨机器的结点间进行同步, 就需要考虑到网络波动的原因了. 结点本身上运行的服务可能没什么问题, 但就是网线被剪了, 这种情况下使用PAIR就不再合适了, 你就必须使用其它socket类型了.

另外, 线程同步与跨机器结点同步之间的另外一个重大区别是: 线程数量一般是固定的, 服务稳定运行期间, 线程数目一般不会增加, 也不会减少. 但跨机器结点可能会横向扩容. 所以要考虑的事情就又我了一坨.

我们下面会给出一个示例程序, 向你展示跨机器结点之间的同步到底应该怎么做. 还记得上一章我们讲发布-订阅套路的时候, 提到的, 在订阅方建立连接的那段短暂的时间内, 所有发布方发布的消息都会被丢弃吗? 这里我们将改进那个程序, 在下面改进版的发布-订阅套路中, 发布方会等待所有订阅方都建立连接完成后, 才开始广播消息. 下面将要展示的代码主要做了以下的工作:

PUB方提前知道SUB方的数量

PUB方启动, 等待SUB方连接, 发送就绪信息.

当所有SUB方连接完毕后, 开始工作.

而同步工作是由REQ/REP完成的.

来看代码:

发布方代码:

#include <zmq.h> #include "zmq_helper.h" #define SUBSCRIBER_EXPECTED 10 int main(void) { void * context = zmq_ctx_new(); void * socket_for_pub = zmq_socket(context, ZMQ_PUB); int sndhwm = 1100000; zmq_setsockopt(socket_for_pub, ZMQ_SNDHWM, &sndhwm, sizeof(int)); zmq_bind(socket_for_pub, "tcp://*:5561"); void * socket_for_sync = zmq_socket(context, ZMQ_REP); zmq_bind(socket_for_sync, "tcp://*:5562"); printf("waiting for subscribers\n"); int subscribers_count = 0; while(subscribers_count < SUBSCRIBER_EXPECTED) { char * str = s_recv(socket_for_sync); free(str); s_send(socket_for_sync, ""); subscribers_count++; } printf("broadingcasting messages\n"); for(int i = 0; i < 1000000; ++i) { s_send(socket_for_pub, "Lalalal"); } s_send(socket_for_pub, "END"); zmq_close(socket_for_pub); zmq_close(socket_for_sync); zmq_ctx_destroy(context); return 0; }

订阅方代码

#include <zmq.h> #include <unistd.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); void * socket_for_sub = zmq_socket(context, ZMQ_SUB); zmq_connect(socket_for_sub, "tcp://localhost:5561"); zmq_setsockopt(socket_for_sub, ZMQ_SUBSCRIBE, "", 0); sleep(1); void * socket_for_sync = zmq_socket(context, ZMQ_REQ); zmq_connect(socket_for_sync, "tcp://localhost:5562"); s_send(socket_for_sync, ""); char * str = s_recv(socket_for_sync); free(str); int i = 0; while(1) { char * str = s_recv(socket_for_sub); if(strcmp(str, "END") == 0) { free(str); break; } free(str); i++; } printf("Received %d broadcast message\n", i); zmq_close(socket_for_sub); zmq_close(socket_for_sync); zmq_ctx_destroy(context); return 0; }

最后带一个启动脚本:

#! /bin/bash echo "Starting subscribers..." for((a=0; a<10; a++)); do ./subscriber & done echo "Starting publisher..." ./publisher

运行启动脚本之后, 你大概会得到类似于下面的结果:

Starting subscribers... Starting publisher... waiting for subscribers broadingcasting messages Received 1000000 broadcast message Received 1000000 broadcast message Received 1000000 broadcast message Received 1000000 broadcast message Received 1000000 broadcast message Received 1000000 broadcast message Received 1000000 broadcast message Received 1000000 broadcast message Received 1000000 broadcast message Received 1000000 broadcast message

你看, 这次有了同步手段, 每个订阅者都确实收到了100万条消息, 一条不少

上面的代码还有一个细节需要你注意一下:

注意到在订阅者的代码中, 有一行sleep(1), 如果去掉这一行, 运行结果可能(很小的概率)不是我们期望的那样. 之所以这样做是因为:

先创建用于接收消息的socket_for_sub, 然后connect之. 再去做同步操作. 有可能: 同步的REQ与REP对话已经完成, 但是socket_for_sub的连接过程还没有结束. 这个时候还是会丢掉消息. 也就是说, 这个sleep(1)操作是为了确认: 在同步操作完成之后, 用于发布-订阅套路的通讯连接一定建立好了.

零拷贝

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zypgfs.html