下面是发布者的代码:
#include <zmq.h> #include <unistd.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); void * socket = zmq_socket(context, ZMQ_PUB); zmq_bind(socket, "tcp://*:5563"); while(1) { s_sendmore(socket, "A"); s_send(socket, "We don\'t want to see this"); s_sendmore(socket, "B"); s_send(socket, "We would like to see this"); sleep(1); } zmq_close(socket); zmq_ctx_destroy(context); return 0; }下面是订阅者的代码:
#include <zmq.h> #include "zmq_helper.h" int main(void) { void * context = zmq_ctx_new(); void * socket = zmq_socket(context, ZMQ_SUB); zmq_connect(socket, "tcp://localhost:5563"); zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "B", 1); while(1) { char * strMsgType = s_recv(socket); char * strMsgContent = s_recv(socket); printf("[%s] %s\n", strMsgType, strMsgContent); free(strMsgType); free(strMsgContent); } zmq_close(socket); zmq_ctx_destroy(socket); return 0; }这里有两点:
0. 过滤器过滤的是整个消息, 第一帧对不上, 后面所有的帧都不要了
0. ZMQ库保证, 多帧消息的传输是原子性的. 你不会收到一个缺帧的消息
消息越发越快, 越发越多, 你慢慢的就会意识到一个问题: 内存资源很宝贵, 并且很容易被用尽. 如果你不注意到这一点, 服务器上某个进程阻塞个几秒钟, 就炸了.
想象一下这个场景: 在同一台机器上, 有一个进程A在疯狂的向进程B发送消息. 突然, B觉得很累, 休息了3秒(比如CPU过载, 或者B在跑GC吧, 无所谓什么原因), 这3秒钟B处理不过来A发送的数据了. 那么在这3秒钟, A依然疯狂的试图向B发送消息, 会发生什么? 如果B有收包缓冲区的话, 这个缓冲区肯定被塞满了, 如果A有发送缓冲区的话, 这个缓冲区也应该被塞满了. 剩余的没被发出去的消息就堆积到A进程的内存空间里, 这个时候如果A程序写的不好, 那么A进程由于内存被疯狂占用, 很快就会挂掉.
这是一个消息队列里的经典问题, 就是消息生产者和消费者的速度不匹配的时候, 消息中间件应当怎么设计的问题. 这个问题的根其实是在B身上, 但B对于消息队列的设计者来说是不可控的: 这是消息队列使用者写的B程序, 你怎么知道那波屌人写的啥屌代码? 所以虽然问题由B产生, 但最好还是在A那里解决掉.
最简单的策略就是: A保留一些缓存能力, 应对突发性的状况. 超过一定限度的话, 就要扔消息了. 不能把这些生产出来的消息, 发不出去还存着. 这太蠢了.
另外还有一种策略, 如果A只是一个消息中转者, 可以在超过限度后, 告诉生产消息的上流, 你停一下, 我这边满了, 请不要再给我发消息了. 这种情况下的解决方案, 其实就是经典的"流控"问题. 这个方案其实也不好, A只能向上游发出一声呻吟, 但上游如果执意还是要发消息给A, A也没办法去剪网线, 所以转一圈又回来了: 还是得扔消息.
ZMQ里, 有一个概念叫"高水位阈值", (high-water mark. HWM), 这个值其实是网络结点自身能缓存的消息的能力. 在ZMQ中, 每一个活动的连接, 即socket, 都有自己的消息缓冲队列, HWM指的就是这个队列的容量. 对于某些socket类型, 诸如SUB/PULL/REQ/REP来说, 只有收包队列. 对于某此socket类型来说, 诸如DEALER/ROUTER/PAIR, 既能收还能发, 就有两个队列, 一个用于收包, 一个用于发包.
在ZMQ 2.X版本中, HWM的值默认是无限的. 这种情况下很容易出现我们这一小节开头讲的问题: 发送消息的api接口永远不会报错, 对端假死之后内存就会炸. 在ZMQ 3.X版本中, 这个值默认是1000, 这就合理多了.
当socket的HWM被触及后, 再调用发送消息接口, ZMQ要么会阻塞接口, 要么就扔掉消息. 具体哪种行为取决于sokcet的类型.
对于PUB和ROUTER类型的socket来说, 会扔数据.
对于其它类型的socket, 会阻塞接口.
显然在这种情况下, 如果以非阻塞形式发包, 接口会返回失败.
另外, 很特殊的是, inproc类型两端的两个socket共享同一个队列: 真实的HWM值是双方设置的HWM值的总和. 你可以将inproc方式想象成一根管子, 双方设置HWM时只是在宣称我需要占用多长的管子, 但真实的管子长度是二者的总和.
最后, 很反直觉的是, HWM的单位是消息个数, 而不是字节数. 这就很有意思了. 另外, HWM触顶时, 队列中的消息数量一般不好刚好就等于你设置的HWM值, 真实情况下, 可能会比你设置的HWM值小, 极端情况下可能只有你设置的HWM的一半.
数据丢失问题当你写代码, 编译, 链接, 运行, 然后发现收不到消息, 这个时候你应当这样排查:
如果你使用的是SUB类型的socket, 检查一下有没有调用zmq_setsockopt设置过滤器
如果你使用的是SUB类型的socket, 谨记在建立连接过程中, 对端的PUB发送的数据你是收不到了, 如果你确实想要这部分数据, 请做同步处理