消息队列作为分布式系统中的重要组件,常用的有MSMQ,RabbitMq,Kafa,ActiveMQ,RocketMQ。至于各种消息队列的优缺点比较,在这里就不做扩展了,网上资源很多。
更多内容可参考 消息队列及常见消息队列介绍。我在这里选用的是RabbitMq。
官网地址:
安装和配置:Windows下RabbitMq安装及配置
二、RabbitMq简单介绍RabbitMQ是一款基于AMQP(高级消息队列协议),由Erlang开发的开源消息队列组件。是一款优秀的消息队列组件,他由两部分组成:服务端和客户端,客户端支持多种语言的驱动,如:.Net、JAVA、 Erlang等。在RabbitMq中首先要弄清楚的概念是 交换机、队列、绑定。基本的消息通讯步骤就是首先定义ExChange,然后定义队列,然后绑定交换机和队列。
需要明确的一点儿是,发布者在发送消息是,并不是把消息直接发送到队列中,而是发送到Exchang,然后由交互机根据定义的消息匹配规则,在将消息发送到队列中。
Exchange有四种消息消息分发规则:direct,topic,fanout,header。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了。
详细的概念介绍推荐查看:消息队列之RabbitMq
三、EasyNetQ使用Easynetq是一个简单易用的Rabbitmq Net客户端。同时支持 NetFramework和NetCore。GitHub地址。它是针对RabbitMq Net客户端的进一步封装。关于EasyNetQ的简单使用推荐教程:EasyNetQ的介绍。
本文主要介绍基于EasyNeq的高级API的使用。EasyNetQ的作者在核心的IBus接口中尽量避免暴露AMQP中的交换机、队列、绑定这些概念,使用者即使不去了解这些概念,也能完成消息的发送接收。这相当简洁,但某些情况下,基于应用场景的需要,我们需要自定义交换机、队列、绑定这些信息,EasyNetQ允许你这么做,这些都是通过IAdvanceBus接口实现。
3.1 项目装备这里为了演示,首先新建一个项目,包括一个发布者,两个接收者,一个公共的类库
安装EasyNetQ: NuGet>Install-Package EasyNetQ
3.2 简单封装在Common项目里面是针对Easynetq的使用封装,主要目录如下
在RabbitMq文件夹下,是针对消息发送接收的简单封装。
首先来看下RabbitMqManage,主要的发送和订阅操作都在这个类中。其中ISend接口定义了发送消息的规范,SendMessageManage是ISend的实现。IMessageConsume接口定义订阅规范。
MesArg 和PushMsg分别是订阅和发送需用到的参数类。RabbitMQManage是暴露在外的操作类。
首先看发送的代码
public enum SendEnum { 订阅模式 = 1, 推送模式 = 2, 主题路由模式 = 3 } public class PushMsg { /// <summary> /// 发送的数据 /// </summary> public object sendMsg { get; set; } /// <summary> /// 消息推送的模式 /// 现在支持:订阅模式,推送模式,主题路由模式 /// </summary> public SendEnum sendEnum { get; set; } /// <summary> /// 管道名称 /// </summary> public string exchangeName { get; set; } /// <summary> /// 路由名称 /// </summary> public string routeName { get; set; } } internal interface ISend { Task SendMsgAsync(PushMsg pushMsg, IBus bus); void SendMsg(PushMsg pushMsg, IBus bus); } internal class SendMessageMange : ISend { public async Task SendMsgAsync(PushMsg pushMsg, IBus bus) { //一对一推送 var message = new Message<object>(pushMsg.sendMsg); IExchange ex = null; //判断推送模式 if (pushMsg.sendEnum == SendEnum.推送模式) { ex = bus.Advanced.ExchangeDeclare(pushMsg.exchangeName, ExchangeType.Direct); } if (pushMsg.sendEnum == SendEnum.订阅模式) { //广播订阅模式 ex = bus.Advanced.ExchangeDeclare(pushMsg.exchangeName, ExchangeType.Fanout); } if (pushMsg.sendEnum == SendEnum.主题路由模式) { //主题路由模式 ex = bus.Advanced.ExchangeDeclare(pushMsg.exchangeName, ExchangeType.Topic); } await bus.Advanced.PublishAsync(ex, pushMsg.routeName.ToSafeString(""), false, message) .ContinueWith(task => { if (!task.IsCompleted && task.IsFaulted)//消息投递失败 { //记录投递失败的消息信息 } }); } public void SendMsg(PushMsg pushMsg, IBus bus) { //一对一推送 var message = new Message<object>(pushMsg.sendMsg); IExchange ex = null; //判断推送模式 if (pushMsg.sendEnum == SendEnum.推送模式) { ex = bus.Advanced.ExchangeDeclare(pushMsg.exchangeName, ExchangeType.Direct); } if (pushMsg.sendEnum == SendEnum.订阅模式) { //广播订阅模式 ex = bus.Advanced.ExchangeDeclare(pushMsg.exchangeName, ExchangeType.Fanout); } if (pushMsg.sendEnum == SendEnum.主题路由模式) { //主题路由模式 ex = bus.Advanced.ExchangeDeclare(pushMsg.exchangeName, ExchangeType.Topic); } bus.Advanced.Publish(ex, pushMsg.routeName.ToSafeString(""), false, message); } }