通过ZeroMQ的pub/sub模式,我们可以实现发送推送消息的功能。以下为示例代码(入门可参考此文:):
(def ctx (ZMQ/context 1))
(def msg-list (atom ())) ; 消息列表
(def stop-signal (atom false)) ; 停止发送服务标识
(defn msg-publisher
[]
(let [s (.socket ctx ZMQ/PUB)]
(.bind s “tcp://x.x.x.x:xxxx”)
(while (false? @stop-signal) ; 遇到停止信号则退出发送循环
(loop [msgs @msg-list] ; 对消息列表进行循环发送处理
(if (empty? msgs)
(do
(reset! msg-list ()) ; 全部发送后清空消息列表
(.send s (c/generate-string "0")) ; 发送结束标识
(Thread/sleep 1000) ; 延时1秒后再重新读取,以免发送空数据太频繁
)
(do
(.send s (c/generate-string (first msgs))) ; 发送消息
(recur (rest msgs))) ; 发送下一条消息
)))
(.close s)))
通过(future-call msg-publisher)将msg-publisher常驻线程后,msg-publisher会自动读取msg-list列表,将新增加的内容推送给客户端。下面附上测试代码:
(deftest test-msg-publisher
(do
(let [f (future-call msg-publisher)
s (.socket ctx ZMQ/SUB)]
(reset! stop-signal false)
f
(.subscribe s ZMQ/SUBSCRIPTION_ALL)
(.connect s “tcp://x.x.x.x:xxxx”)
(reset! msg-list (range 10000)) ; 产生消息10000条,但是只接收1000条,这是因为连接延时的问题,
(loop [exec-times 1000 ; 导致不可能将全部消息收全
msg-count 0]
(if (= 0 exec-times)
(is (= 1000 msg-count))
(do
(let [msg (c/parse-string (.recvStr s))]
;(println msg)
(if (not (= "0" msg)) ; 如果为0则表示不是我们希望要的数据
(recur (dec exec-times) (inc msg-count))
(recur (dec exec-times) msg-count)))))
)
(.close s)
(reset! stop-signal true)
(future-cancel f)
(is (future-cancelled? f)))))
运行lein test,如果输出如下就表示运行正常。
相关阅读:
在Ubuntu 11.04上安装ZeroMQ