ZeroMQ 教程 002 : 高级技巧 (9)

下面我们写一个健壮的分治套路, 和我们在第一章中写过的类似, 不同的是, 这次, 在监理收到"所有工作均完成"的消息之后, 会发消息给各个工程队, 让工程队停止运行. 这个例子主要有两个目的:

向大家展示, 在使用ZMQ库的同时, 把代码写健壮

向大家展示如何优雅的干掉一个进程

原先的分治套路代码, 使用PUSH/PULL这两种socket类型, 将任务分发给多个工程队. 但在工作做完之后, 工程队的程序还在运行, 工程队的程序无法得知任务什么进修终止. 这里我们再掺入发布-订阅套路, 在工作做完之后, 监理向广大工程队, 通过PUB类型的socket发送"活干活了"的消息, 而工程队用SUB类型的socket一旦收到监理的消息, 就停止运行.

包工头ventilator的代码和上一章的一毛一样, 只是对所有的ZMQ库函数调用增加了错误处理. 照顾大家, 这里再帖一遍

#include <zmq.h> #include <stdio.h> #include <time.h> #include <assert.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); assert(context); void * socket_to_sink = zmq_socket(context, ZMQ_PUSH); assert(socket_to_sink); void * socket_to_worker = zmq_socket(context, ZMQ_PUSH); assert(socket_to_worker); if(zmq_connect(socket_to_sink, "tcp://localhost:5558") == -1) { printf("E: connect failed: %s\n", strerror(errno)); return -1; } if(zmq_bind(socket_to_worker, "tcp://*:5557") == -1) { printf("E: bind failed: %s\n", strerror(errno)); return -1; } printf("Press Enter when all workers get ready:"); getchar(); printf("Sending tasks to workers...\n"); if(s_send(socket_to_sink, "Get ur ass up") == -1) { printf("E: s_send failed: %s\n", strerror(errno)); return -1; } srandom((unsigned)time(NULL)); int total_ms = 0; for(int i = 0; i < 100; ++i) { int workload = randof(100) + 1; total_ms += workload; char string[10]; snprintf(string, sizeof(string), "%d", workload); if(s_send(socket_to_worker, string) == -1) { printf("E: s_send failed: %s\n", strerror(errno)); return -1; } } printf("Total expected cost: %d ms\n", total_ms); zmq_close(socket_to_sink); zmq_close(socket_to_worker); zmq_ctx_destroy(context); return 0; }

接下来是工程队worker的代码, 这一版新增了一个socket_to_sink_of_control来接收来自监理的停止消息:

#include <zmq.h> #include <assert.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); assert(context); void * socket_to_ventilator = zmq_socket(context, ZMQ_PULL); assert(socket_to_ventilator); if(zmq_connect(socket_to_ventilator, "tcp://localhost:5557") == -1) { printf("E: connect failed: %s\n", strerror(errno)); return -1; } void * socket_to_sink = zmq_socket(context, ZMQ_PUSH); assert(socket_to_sink); if(zmq_connect(socket_to_sink, "tcp://localhost:5558") == -1) { printf("E: connect failed: %s\n", strerror(errno)); return -1; } void * socket_to_sink_of_control = zmq_socket(context, ZMQ_SUB); assert(socket_to_sink_of_control); if(zmq_connect(socket_to_sink_of_control, "tcp://localhost:5559") == -1) { printf("E: connect failed: %s\n", strerror(errno)); return -1; } if(zmq_setsockopt(socket_to_sink_of_control, ZMQ_SUBSCRIBE, "", 0) == -1) { printf("E: setsockopt failed: %s\n", strerror(errno)); } zmq_pollitem_t items [] = { { socket_to_ventilator, 0, ZMQ_POLLIN, 0 }, { socket_to_sink_of_control, 0, ZMQ_POLLIN, 0 }, }; while(1) { if(zmq_poll(items, 2, -1) == -1) { printf("E: poll failed: %s\n", strerror(errno)); return -1; } if(items[0].revents & ZMQ_POLLIN) { char * strWork = s_recv(socket_to_ventilator); assert(strWork); printf("%s.", strWork); fflush(stdout); s_sleep(atoi(strWork)); free(strWork); if(s_send(socket_to_sink, "") == -1) { printf("E: s_send failed %s\n", strerror(errno)); return -1; } } if(items[1].revents & ZMQ_POLLIN) { break; } } zmq_close(socket_to_ventilator); zmq_close(socket_to_sink); zmq_close(socket_to_sink_of_control); zmq_ctx_destroy(context); return 0; }

接下来是监理的代码, 这一版新增了socket_to_worker_of_control来在任务结束之后给工程队发布停止消息:

#include <zmq.h> #include <assert.h> #include <stdint.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); assert(context); void * socket_to_worker = zmq_socket(context, ZMQ_PULL); if(zmq_bind(socket_to_worker, "tcp://*:5558") == -1) { printf("E: bind failed: %s\n", strerror(errno)); return -1; } void * socket_to_worker_of_control = zmq_socket(context, ZMQ_PUB); if(zmq_bind(socket_to_worker_of_control, "tcp://*:5559") == -1) { printf("E: bind failed: %s\n", strerror(errno)); return -1; } char * strBeginMsg = s_recv(socket_to_worker); assert(strBeginMsg); free(strBeginMsg); int64_t i64StartTime = s_clock(); for(int i = 0; i < 100; ++i) { char * strRes = s_recv(socket_to_worker); assert(strRes); free(strRes); if(i % 10 == 0) { printf(":"); } else { printf("."); } fflush(stdout); } printf("Total elapsed time: %d msec\n", (int)(s_clock() - i64StartTime)); if(s_send(socket_to_worker_of_control, "STOP") == -1) { printf("E: s_send failed: %s\n", strerror(errno)); return -1; } zmq_close(socket_to_worker); zmq_close(socket_to_worker_of_control); zmq_ctx_destroy(context); return 0; }

这个例子也展示了如何将多种套路揉合在一个场景中. 所以说写代码, 思维要灵活.

处理POSIX Signal

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zypgfs.html