假设你已经知道什么是ZeroMQ(不知道的话可以看这个:%C3%98MQ),以下就给出在Clojure中如何使用ZeroMQ(感谢此文作者:)。
1. 创建一个Clojure项目,这里我们用leiin。lein new app zmq-test
2. 在project.clj文件中添加
[com.rmoquin.bundle/jeromq "0.2.0"]
cheshire "5.3.1"]]
其中jeromq就是我们需要使用的ZeroMQ类库(纯Java实现),cheshire用于双向处理json。
3. 打开core.clj文件,输入如下代码:
(ns zmq-test.core
(:import [org.jeromq ZMQ])
(:require (cheshire [core :as c])))
(def ctx (ZMQ/context 1))
;; REQ/REP [Request-Reply] Pattern
;; In REPL, input
;; (future-call echo-server)
;; (echo "hi")
;; to run the demo function
(defn echo-server
[]
(let [s (.socket ctx ZMQ/REP)]
(.bind s "tcp:// 127.0.0.1:5555")
(loop [msg (.recv s)]
(.send s msg)
(recur (.recv s)))))
(defn echo
[msg]
(let [s (.socket ctx ZMQ/REQ)]
(.connect s "tcp:// 127.0.0.1:5555")
(.send s msg)
(println "Server replied:" (String. (.recv s)))
(.close s)))
;; PUB/SUB [Publish-Subscribe] Pattern
;; In REPL, input
;; (future-call market-data-publisher)
;; (get-market-data 100)
;; to run the demo function
(defn market-data-publisher
[]
(let [s (.socket ctx ZMQ/PUB)
market-data-event (fn []
{:symbol (rand-nth ["CAT" "UTX"])
:size (rand-int 1000)
:price (format "%.2f" (rand 50.0))})]
(.bind s "tcp:// 127.0.0.1:6666")
(while :true
(.send s (c/generate-string (market-data-event))))))
(defn get-market-data
[num-events]
(let [s (.socket ctx ZMQ/SUB)]
(.subscribe s "")
(.connect s "tcp://127.0.0.1:6666")
(dotimes [_ num-events]
(println (c/parse-string (String. (.recv s)))))
(.close s)))
;; PUSH/PULL [Pipeline] Pattern
;; In REPL, input
;; (future-call collector)
;; (future-call worker)
;; (future-call worker)
;; (future-call worker)
;; (dispatcher 100)
;; to run the demo function
(defn dispatcher
[jobs]
(let [s (.socket ctx ZMQ/PUSH)]
(.bind s "tcp://127.0.0.1:7777")
(Thread/sleep 1000)
(dotimes [n jobs]
(.send s (str n)))
(.close s)))
(defn worker
[]
(let [rcv (.socket ctx ZMQ/PULL)
snd (.socket ctx ZMQ/PUSH)
id (str (gensym "w"))]
(.connect rcv "tcp://127.0.0.1:7777")
(.connect snd "tcp://127.0.0.1:8888")
(while :true
(let [job-id (String. (.recv rcv))
proc-time (rand-int 100)]
(Thread/sleep proc-time)
(.send snd (c/generate-string {:worker-id id
:job-id job-id
:processing-time proc-time}))))))
(defn collector
[]
(let [s (.socket ctx ZMQ/PULL)]
(.bind s "tcp://127.0.0.1:8888")
(while :true
(->> (.recv s)
(String.)
(c/parse-string)
(println "Job completed:")))))
代码中包括了ZeroMQ的三种模式,可以直接在REPL中进行测试。但是这只是很简单的Hello World程序,如果要将ZeroMQ用于实际生产环境中的话,还有很多环节需要考虑和完善。
相关阅读:
在Ubuntu 11.04上安装ZeroMQ