ZeroMQ 教程 002 : 高级技巧 (12)

这里我们写一个例子吧, 我们把最初的请求-回应套路代码改造成多线程版的. 原始版的服务端是单进程单线程程序, 如果请求量比较低的话, 是没有什么问题的, 单线程的ZMQ应用程序吃满一个CPU核心是没有问题的, 但请求量再涨就有点捉襟见肘了, 这个时候就需要让程序吃满多个核心. 当然多进程服务也能完成任务, 但这里主要是为了介绍在多线程编程中使用ZMQ, 所以我们把服务端改造成多线程模式.

另外, 显然你可以使用一个掮客, 再外加一堆服务端结点(无论结点是独立的进程, 还是独立的机器)来让服务端的处理能力更上一层楼. 但这更跑偏了.

还是看代码吧. 服务端代码如下:

#include <pthread.h> #include <unistd.h> #include <assert.h> #include "zmq_helper.h" static void * worker_routine(void * context) { void * socket_to_main_thread = zmq_socket(context, ZMQ_REP); assert(socket_to_main_thread); zmq_connect(socket_to_main_thread, "inproc://workers"); while(1) { char * strReq = s_recv(socket_to_main_thread); printf("Received request: [%s]\n", strReq); free(strReq); sleep(1); s_send(socket_to_main_thread, "World"); } zmq_close(socket_to_main_thread); return NULL; } int main(void) { void * context = zmq_ctx_new(); assert(context); void * socket_to_client = zmq_socket(context, ZMQ_ROUTER); assert(socket_to_client); zmq_bind(socket_to_client, "tcp://*:5555"); void * socket_to_worker_thread = zmq_socket(context, ZMQ_DEALER); assert(socket_to_worker_thread); zmq_bind(socket_to_worker_thread, "inproc://workers"); for(int i = 0; i < 5; ++i) { pthread_t worker; pthread_create(&worker, NULL, worker_routine, context); } zmq_proxy(socket_to_client, socket_to_worker_thread, NULL); zmq_close(socket_to_client); zmq_close(socket_to_worker_thread); zmq_ctx_destroy(context); return 0; }

这就是一个很正统的设计思路, 多个线程之间是互相独立的, worker线程本身很容易能改造成独立的进程, 主线程做掮客.

使用 PAIR socket 进行线程间通信

来, 下面就是一个例子, 使用PAIR socket完成线程同步, 内部通信使用的是inproc

#include <zmq.h> #include <pthread.h> #include "zmq_helper.h" static void * thread1_routine(void * context) { printf("thread 1 start\n"); void * socket_to_thread2 = zmq_socket(context, ZMQ_PAIR); zmq_connect(socket_to_thread2, "inproc://thread_1_2"); printf("thread 1 ready, send signal to thread 2\n"); s_send(socket_to_thread2, "READY"); zmq_close(socket_to_thread2); printf("thread 1 end\n"); return NULL; } static void * thread2_routine(void * context) { printf("thread 2 start\n"); void * socket_to_thread1 = zmq_socket(context, ZMQ_PAIR); zmq_bind(socket_to_thread1, "inproc://thread_1_2"); pthread_t thread1; pthread_create(&thread1, NULL, thread1_routine, context); char * str = s_recv(socket_to_thread1); free(str); zmq_close(socket_to_thread1); void * socket_to_mainthread = zmq_socket(context, ZMQ_PAIR); zmq_connect(socket_to_mainthread, "inproc://thread_2_main"); printf("thread 2 ready, send signal to main thread\n"); s_send(socket_to_mainthread, "READY"); zmq_close(socket_to_mainthread); printf("thread 2 end\n"); return NULL; } int main(void) { printf("main thread start\n"); void * context = zmq_ctx_new(); void * socket_to_thread2 = zmq_socket(context, ZMQ_PAIR); zmq_bind(socket_to_thread2, "inproc://thread_2_main"); pthread_t thread2; pthread_create(&thread2, NULL, thread2_routine, context); char * str = s_recv(socket_to_thread2); free(str); zmq_close(socket_to_thread2); printf("Test over\n"); zmq_ctx_destroy(context); printf("main thread end\n"); return 0; }

这个简单的程序包含了几个编写多线程同步时的潜规则:

线程间同步使用 inproc PAIR 型的socket. 共享context

父线程bind, 子线程connect

需要注意的是, 上面这种写法的多线程, 很难拆成多个进程, 上面这种写法一般用于压根就不准备拆分的服务端应用程序. inproc很快, 性能很好, 但是不能用于多进程或多结点通信.

另外一种常见的设计就是使用tcp来传递同步信息. 使用tcp使得多线程拆分成多进程成为一种可能. 另外一种同步场景就是使用发布-订阅套路. 而不使用PAIR. 甚至可以使用掮客使用的ROUTER/DEALER进行同步. 但需要注意下面几点:

在使用PUSH/PULL做同步时, 需要注意: PUSH会把消息广播给所有PULL.注意这一点, 不要把同步消息发给其它线程

在使用ROUTER/DEALER做同步时. 需要注意: ROUTER会把你发送的消息装进一个"信封", 也就是说, 你调用zmq_send接口发送的消息将变成一个多帧消息被发出去. 如果你发的同步消息不带语义, 那么还好, 如果你发送的消息带语义, 那么请特别小心这一点, 多帧消息的细节将在第三章进行进一步讨论. 而DEALER则会把消息广播给所有对端, 这一点和PUSH一样, 请额外注意. 总之建立在阅读第三章之前, 不要用ROUTER或DEALER做线程同步.

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zypgfs.html