import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Program { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.1.101"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("rabbitmq", "fanout"); String routingKey = "rabbitmq_routingkey"; String message = "{\"name\":\"Welcome to RabbitMQ message push!\"}"; channel.basicPublish("rabbitmq", routingKey,null, message.getBytes()); System.out.println("[x] Sent Message:"+message); channel.close(); connection.close(); } }
这里我们利用 RabbitMQ 官方提供的 Java Client Library 来实现消息的发送,消息队列使用的过程大概如图 3 所示:
图 3. 客户端投递消息流程exchange 接收到消息后,会根据消息的 key 和已经设置的 binding 进行消息路由,最终投递到一个或多个队列里进行消息处理。RabbitMQ 预置了一些 exchange,如果客户端未声明 exchange 时,RabbitMQ 会根据 exchange 类型使用默认的 exchange,具体见表 1。
表 1. 预置 exchange 名称 NameDefault pre declared namesDirect exchange amq.direct
Fanount exchange amq.fanout
Topic exchange amq.topic
Heades exchange amq.headers
Exchange 类型
exchange 存在以下几种类型:
1.Direct exchange
Direct exchange 完全根据 key 进行投递,只有 key 与绑定时的 routing key 完全一致的消息才会收到消息,参考官网提供的图 4 更直观地了解 Direct exchange。
图 4.Direct exchange2.Fanount exchange
Fanount 完全不关心 key,直接采取广播的方式进行消息投递,与该交换机绑定的所有队列都会收到消息,具体参考官网提供的图 5。
图 5.Fanount exchange3.Topic exchange
Topic exchange 会根据 key 进行模式匹配然后进行投递,与设置的 routing key 匹配上的队列才能收到消息。
4.Headers exchange
Header exchange 使用消息头代替 routing key 作为关键字进行路由,不过在实际应用过程中这种类型的 exchange 使用较少。
消息持久化
RabbitMQ 支持消息的持久化,即将消息数据持久化到磁盘上,如果消息服务器中途断开,下次开启会将持久化的消息重新发送,消息队列持久化需要保证 exchange(指定 durable=1)、queue(指定 durable=1)和消息(delivery_mode=2)3 个部分都是持久化。出于数据安全考虑,一般消息都会进行持久化。
消息接收者 清单 2.JavaScript 代码// Stomp.js boilerplate if (location.search == '?ws') { var ws = new WebSocket('ws://192.168.1.102:15674/ws'); } else { var ws = new SockJS('http://192.168.1.102:15674/stomp'); } // Init Client var client = Stomp.over(ws); // SockJS does not support heart-beat: disable heart-beats client.heartbeat.outgoing = 0; client.heartbeat.incoming = 0; // Declare on_connect var on_connect = function(x) { client.subscribe("/exchange/rabbitmq/rabbitmq_routingkey", function(d) { print_first(d.body); }); }; // Declare on_error var on_error = function() { console.log('error'); }; // Conect to RabbitMQ client.connect('guest', 'guest', on_connect, on_error, '/');
RabbitMQ Web STOMP 插件可以理解为 HTML5 WebSocket 与 STOMP 协议间的桥接,目的也是为了让浏览器能够使用 RabbitMQ。当 RabbitMQ 消息服务器开启了 STOMP 和 Web STOMP 插件后,浏览器端就可以轻松地使用 WebSocket 或者 SockerJS 客户端实现与 RabbitMQ 服务器进行通信。
RabbitMQ Web STOMP 是对 STOMP 协议的桥接,因此其语法也完全遵循 STOMP 协议。STOMP 是基于 frame 的协议,与 HTTP 的 frame 相似。一个 Frame 包含一个 command,一系列可选的 headers 和一个 body。STOMP client 的用户代理可以充当两个角色,当然也可能同时充当:作为生产者,通过 SEND frame 发送消息到服务器;作为消费者���发送 SUBCRIBE frame 到目的地并且通过 MESSAGE frame 从服务器获取消息。