ZeroMQ 教程 002 : 高级技巧 (7)

掮客可以记录会话状态, 可以保证某一个特定的客户端始终与一个固定的服务端进行数据交互. 某种程度上, 掮客与客户端分别记录部分会话信息, 服务端可以在无状态的情况下实现"唠嗑套路"

所以, 在请求回应套路中加入掮客, 是一个很明智的选择, 这就是第二种思路, 这种思路不是没有缺陷, 有, 而且很明显: 掮客是整个系统中最脆弱的部分.

但这个缺陷可以在一定程度上克服掉:

如果单机掮客转发能力不够, 那么可以搞多个掮客. 比如N个客户端,M个服务端, 3个掮客. 客户端与3个掮客建立全连接, 3个掮客与M个服务端建立全连接. 总是要好过N个客户端与M个服务端建立全连接的.

如果单机掮客缓冲能力不够, 甚至可以加多层掮客. 这种使用方法就把掮客的缓冲特性放在了首位.

ZMQ中, 有两个特殊的socket类型特别适合掮客使用:

ROUTER 用于掮客与多个客户端连接. 掮客bind, 客户端connect.

DEALER 用于掮客和多个服务端连接. 掮客bind, 服务端connect.

关于这两种特殊的socket的特性, 后续我们会仔细深入, 目前来说, 你只需要了解

它们实现了消息的缓冲

它们通过一种特殊的机制记录了会话

多说无益, 来看代码. 下面是在客户端与服务端中插入掮客的代码实例:

客户端

#include <zmq.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); void * socket = zmq_socket(context, ZMQ_REQ); zmq_connect(socket, "tcp://localhost:5559"); for(int i = 0; i < 10; ++i) { s_send(socket, "Hello"); char * strRsp = s_recv(socket); printf("Received reply %d [%s]\n", i, strRsp); free(strRsp); } zmq_close(socket); zmq_ctx_destroy(context); return 0; }

服务端

#include <zmq.h> #include <unistd.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); void * socket = zmq_socket(context, ZMQ_REP); zmq_connect(socket, "tcp://localhost:5560"); while(1) { char * strReq = s_recv(socket); printf("Received request: [%s]\n", strReq); free(strReq); sleep(1); s_send(socket, "World"); } zmq_close(socket); zmq_ctx_destroy(context); return 0; }

掮客

#include <zmq.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); void * socket_for_client = zmq_socket(context, ZMQ_ROUTER); void * socket_for_server = zmq_socket(context, ZMQ_DEALER); zmq_bind(socket_for_client, "tcp://*:5559"); zmq_bind(socket_for_server, "tcp://*:5560"); zmq_pollitem_t items[] = { { socket_for_client, 0, ZMQ_POLLIN, 0 }, { socket_for_server, 0, ZMQ_POLLIN, 0 }, }; while(1) { zmq_msg_t message; zmq_poll(items, 2, -1); if(items[0].revents & ZMQ_POLLIN) { while(1) { zmq_msg_init(&message); zmq_msg_recv(&message, socket_for_client, 0); int more = zmq_msg_more(&message); zmq_msg_send(&message, socket_for_server, more ? ZMQ_SNDMORE : 0); zmq_msg_close(&message); if(!more) { break; } } } if(items[1].revents & ZMQ_POLLIN) { while(1) { zmq_msg_init(&message); zmq_msg_recv(&message, socket_for_server, 0); int more = zmq_msg_more(&message); zmq_msg_send(&message, socket_for_client, more ? ZMQ_SNDMORE : 0); zmq_msg_close(&message); if(!more) { break; } } } } zmq_close(socket_for_client); zmq_close(socket_for_server); zmq_ctx_destroy(context); return 0; }

客户端和服务端由于掮客的存在, 代码都简单了不少, 对于掮客的代码, 有以下几点需要思考:

为什么客户端和服务端双方在代码中以s_send与s_recv互相传递字符串, 但在掮客那里就需要用zmq_msg_t进行转发呢?

为什么掮客在转发消息的时候, 还需要判断是否是多帧消息呢?

更进一步的, 如果有多个客户端与多个服务端, 客户端A向掮客发送请求, 掮客将其转发到了服务端B, 然后B回包, 发向掮客, 当回包消息到达掮客时, 掮客是如何将回包消息正确投递给A, 而不是其它客户端的呢?

上面三点其实是同一个问题: 掮客是如何实现带会话追踪的转发消息的?

另外, 如果你先启动掮客, 再启动客户端, 再启动服务端. 你会看到在服务端正确启动后, 客户端显示它收到了回包.那么:

在服务端未启动时, 显然在客户端的角度来讲, 客户端已经将第一个请求投递给了掮客. 如果此时有1000个客户端与掮客相连, 1000个首请求消息是如何存储的? 10000个呢? 什么时候掮客会丢弃请求?

这就是有关掮客的第二个问题: 如何配置缓冲区.

本章目前暂时不会对这三个问题做出解答, 大家先思考一下. 我们将在下一章深入掮客的细节进行进一步探索.

ZMQ内置的掮客函数

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zypgfs.html