消息中间件 Apache Qpid 简介(2)

程序代码的基本框架

在一个采用了消息中间件的通信体系中有三个基本的角色,一个是发送消息的进程,一个是接受消息的进程,他们彼此之间通过 broker 连接起来,传递消息。


图 2. 基本 Qpid 通信系统的几个组件

图 2. 基本 Qpid 通信系统的几个组件

Broker 无需编写,如前所述,Qpid 实现了两种 Broker,您只需要根据需要启动其中之一既可。Sender 和 Receiver 则是需要用户编写的应用程序。这两类程序都有一些基本的框架,在此简要介绍一下。

首先他们都是 client,需要和 broker 进行连接。链接成功后便生成一个会话 Session。基本代码如下:
清单 1. 基本的 Qpid 程序框架

Connection connection(broker, connectionOptions); try { connection.open(); Session session = connection.createSession(); 。。。 connection.close(); return 0; } catch(const std::exception& error) { std::cerr << error.what() << std::endl; connection.close(); return 1; }  

在 0.5 版本之前,QPID 编程接口和 AMQP 的基本概念一一对应,比如需要创建 queue,exchange,用 routing key 进行绑定,等等。必须对 AMQP 的模型完全理解才能自如地动手写程序。

新的 Messaging API 将复杂的 AMQP 的细节全部都隐藏了起来,极大地简化了编程。

应用程序从而可以将注意力专注于如何处理他们接收到或者将要发出的消息本身,将 AMPQ 模型处理的细节交给 Qpid 库。如图 2 所示,在一个消息通信系统中只有 3 个基本角色,除了 broker 之外,就只有一个 Sender 一个 Receiver。应用程序看不到 Exchange 或者 Queue 这些细节。与此相应,在 Qpid 的 Messaging API 编程接口中,只有两个基本对象:Sender 和 Receiver。Sender 类,或者叫 Producer 生产者。即消息的发送方。Receiver 类自然就是信息的接收者,也叫做 Consumer。

这种抽象带来更好的功能可扩展性:各种各样的通信模型都经由修改 Sender 和 Receiver 的地址来实现,当需要修改通信模型时,也只需要修改 Address 和 broker 的配置,而无需修改应用的代码。

下面将详细介绍 Address 类。

Address 地址

写信的时候,人们需要地址。类似地在 Qpid 中,表示消息的目的地,或者源。

Qpid Address 表示一个节点,有两种节点:一种是 queue,另外一种是 topic。Queue 节点能够缓存消息,直到被读取走为止;而 topic 节点则即时进行转发,比如假如有 4 个 consumer 对某消息感兴趣,当消息到达节点时,有 3 个 consumer 正在运行,那么 topic 节点会将消息转发给这 3 个 consumer,然后就将该消息丢弃。剩下的那个 consumer 再运行时,则收不到这个消息。

Qpid 的地址 Address 是一个带格式的字符串,其语法如下:

address_string ::= <address> [ / <subject> ] [ ; <options> ] options ::= { <key> : <value>, ... }  

其中 address,subject 和 key 都是字符串。

Subject 类似 email 的主题。每个消息都可以有一个主题,接收者可以通过主题对消息进行过滤。

Option 的具体含义有点儿复杂,可以参考 Qpid 的编程手册获取完整的描述。

了解了以上这些概念,就可以开始具体的编程了。和学习其他技术一样,我们从研究例子程序开始。Qpid 源代码包的 example 目录下有大量的例子程序,Messaging 目录下面是新的 Message API。我们主要研究 Message API 提供的 Spout 和 Drain 这两个例子程序。

Spout 和 Drain 的代码

将 Spout 的主要代码精简一下如下:


清单 2. Spout 代码

int main(int argc, char** argv) { Connection connection(options.url, options.connectionOptions); connection.open(); Session session = connection.createSession(); Sender sender = session.createSender(options.address); sender.send(message); session.sync(); connection.close(); return 0; }  

可以看到 spout 首先用命令行参数 Address 初始化一个 Sender 对象,然后用 Sender 的 send 方法发送消息。

对 Drain 做一些类似的事情:


清单 3. Drain 代码

int main(int argc, char** argv) { Connection connection(options.url, options.connectionOptions); connection.open(); Session session = connection.createSession(); Receiver receiver = session.createReceiver(options.address); receiver.fetch(message, timeout)) session.acknowledge(); receiver.close(); session.close(); connection.close(); return 0; }  

Drain 接收消息,用命令行参数中的 Address 初始化一个 Receiver 对象,然后调用 Receiver 的 fetch() 方法接收消息。收到消息后需要调用 session 的 acknowledge() 方法确认。

点对点通信小例子

PTP 通信类似写信。

其一,这种通信是异步的,人们把信发出去之后并不清楚何时能送到收信人的手中。在 Qpid 中,Sender 将消息发给 Broker,并不要求 Receiver 在消息发送的时候也有一个和 Broker 的链接并准备接受该消息。Sender 只管将消息发给 Broker,就可以放手去做其他的事情了;

其二,信是唯一的,您写给朋友的信一定不希望其他人也收到吧。在 Qpid 的 PTP 通信中,一个 Receiver 收到消息后,该消息就被消除,其他 Receiver 不能再收到。

下面用例子来说明这种通信模型。首先要建立一个 Queue 节点。如之前在 Address 一节所讲,Qpid 目前有两种 Address,一种叫做 Queue,一种叫做 Topic,我们这里就要用 Queue 这种节点。Queue 节点满足前面所说的两个重要的 PTP 通信的特征,存储转发和只接收一次。

创建一个 queue:

qpid-config add queue hello-world  

现在我们建立了一个叫做 hello-world 的 queue。

用 spout 发送消息给地址 hello-world:

./spout hello-world  

这就相当于将信发给了 hello-world。您已经看到,此时接收者 drain 还没有启动,但 Queue 的存储转发特性保证 drain 还是可以收到这条消息:

./drain hello-world Message(properties={spout-id:fbb93f3 … :0 … )  

当我们打开另外一个 shell 窗口执行 drain,会发现不会再收到这条消息了。

Browse 模式 vs Consume 模式

一个有趣的的例子是如果我们修改一下 Address 的 Option,上面的通信模型就变成另外一种样子。之前我们看到,第二次执行 ./drain hello-world 将得不到任何信息,因为信息已经被第一次执行 ./drain 消费掉了。或者说这个 message 已经从 queue 里面移除了。这在 Qpid 中被称为消费模式 (Consume)。

有时候人们可能需要另外一种模式,叫做 Browse,即浏览。正如我们浏览网页上的新闻一样,一条新闻并不会因为第一个人阅读了它之后就被消费掉,从网页中消失了。而是一直在那里供人浏览。假如我们希望实现类似这种通信模式,不需要修改 spout 和 Drain 的代码,只需要稍微修改 Address 即可。Address 的选项 mode 可以用来设置 Browse 和 Consume 模式。如下例所示:

$ qpid-config add queue my-queue  

建立一个 queue。

$ ./spout my-queue --content one $ ./spout my-queue --content two $ ./spout my-queue --content three  

发送了三条消息,接着我们用 drain 来接收这些消息吧。请注意,我们对 Address 字符串进行了小小的修改,在名字之后加了一个分号,后面用花括号添加了一个 mode 选项,并设置该地址为 Browse 模式。

$ ./drain 'my-queue; {mode: browse}' Message(properties={spout-id:fbb93f3 … :0}, content='one') Message(properties={spout-id:ab9e7c3 … :0}, content='two') Message(properties={spout-id:ea75d6e … :0}, content='three') 再运行一次: $ ./drain 'my-queue; {mode: browse}' Message(properties={spout-id:fbb93f … :0}, content='one') Message(properties={spout-id:ab9e76 … :0}, content='two') Message(properties={spout-id:ea75d6 … :0}, content='three')  

仅仅修改了Address的Option,我们就发现用spout和Drain可以实现另外一种通信模型了,这真是非常令人着迷的一个特性啊。

编写 sub-pub 通信的例子

Pub-sub 是另一种很有用的通信模型。恐怕它的名字就源于出版发行这种现实中的信息传递方式吧,publisher 就是出版商,subscriber 就是订阅者。

这种模型的特点在于:其一,消息可以根据订阅的信息而转发给不同的订阅者;其二,消息并不存储,broker 收到消息后立即将其转发给当时正在注册的订阅者,假如某个订阅者当时并没有链接到 broker,那么它就不能再收到该消息了。没有多少人愿意购买几天前的旧报纸吧?这是和 Queue 的一个区别。

创建一个 Topic 节点:

qpid-config add exchange topic hello-world  

还是用 spout 和 drain 来演示,先运行 spout:

./spout hello-world  

再运行 drain

./drain hello-world  

哦,什么也没有收到。这说明消息没有被 broker 缓存。

Pub-sub 的主要优点在于订阅消息的灵活性,broker 会根据消息的主题分发给不同的 subscriber。比如我们创建一个 news-service 的 exchange:

qpid-config add exchange topic hello-world  

打开两个 shell 窗口,一个运行 drain 并订阅 news-service/sport,体育新闻;另一个订阅 news-service/ent 娱乐新闻:

$ ./drain -t 30 news-service/#.news $ ./drain -t 30 news-service/#.ent $ ./spout news-service/news $ ./spout news-service/sports $ ./spout news-service/usa.news $ ./spout news-service/usa.sports $ ./spout news-service/usa.faux.news $ ./spout news-service/usa.faux.sports  

可以看到不同的消息被自动分发给不同的订阅者。第一个 shell 接收 sport 的 drain 将打印:

Message(properties={qpid.subject:news, spout-id:cbd42b0f... Message(properties={qpid.subject:usa.news, spout-id:234a78d7... Message(properties={qpid.subject:usa.faux.news, spout-id:6029...  

可另外一个接收 news 的 drain 程序将打印 :

Message(properties={qpid.subject:sports, spout-id:cbd42b0f... Message(properties={qpid.subject:usa.sports, spout-id:234a78d7...  

编写 Request/Response 模型的应用

在很多 P2P 和 Pub-Sub 应用中,Sender 和 Reciever 可以见面也不相识。他们多数情况下根本不关心对方是否存在。然而在现实中还有一种典型的通信模型:Request/Response。这种模型由 client 和 server 两部分组成,即人们常说的 C/S 模型。

Server 必须知道是谁发送了请求,以便回复给正确的 Requester。这是通过解析 Requester 发过来的消息中的 ReplyTo 字段得到的。

代码清单 4 展示的是 Server 的例子代码。


清单 4. Server 代码

Sender sender = session.createSender("service_queue"); Address responseQueue("#response-queue; {create:always, delete:always}"); Receiver receiver = session.createReceiver(responseQueue); Message request; request.setReplyTo(responseQueue); request.setContent("ping"); sender.send(request); Message response = receiver.fetch(); std::cout << request.getContent() << " -> " << response.getContent() << std::endl;  

代码清单 5 展示的是 Client 的例子代码。


清单 5. Client 代码

Sender sender = session.createSender("service_queue"); Address responseQueue("#response-queue; {create:always, delete:always}"); Receiver receiver = session.createReceiver(responseQueue); Message request; request.setReplyTo(responseQueue); request.setContent("ping"); sender.send(request); Message response = receiver.fetch(); std::cout << request.getContent() << " -> " << response.getContent() << std::endl;  

在 client 代码中,我们需要调用 Message 的 setReplyTo 方法,设置回复的地址。

代码清单展示的是 Client 的例子代码。

小结

至此,我们看到了 Qpid 最基本的一些使用方法。演示了人们通常所使用的两类通信模式,传统的面向消息的中间件就实现了这两个通信模型。但 Qpid 提供了一种更简洁灵活的编程接口,仅通过修改 Address,无需修改代码就可以改变应用程序的通信模型。

Qpid 是一个 AMQP 的实现,这意味这它不是一个私有的产品,使用 Qpid,您可以和其他任何符合 AMQP 协议的软件系统进行通信。

不过假如这就是 Qpid 的全部,相信您一定也不以为然,如果这些还不足以打动您,那么我力图在下一部分中向您介绍 Qpid 的一些高级特性。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/65790989a26526f210db1b69300a9c21.html