消息被拒绝Nack,Reject,并且requeue参数为false(重点强调一下,生产实践中通常不能打开requeue,因为打开后队列中的消息就会出现乱序的情况,且性能很差);消息过期;队列达到最大长度。
DLX 也是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定 ,实际上就是设置某个队列的属性。
命名规范:队列类型,[生产者.消费者.队列名后缀];Topic类型,[生产者.exchange.队列名后缀]
延迟队列
延迟队列存储的对象是对应的延迟消息,所谓"延迟消息"是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。使用延迟消息场景如在订单系统中,希望用户下单后30分钟内支付,否则取消订单。那么业务系统可以在下单后,发送延迟消息,到达指定时间后消费该消息来判断是否支持。该方式在数据量比较大的场景中比通过Job扫描数据表合适。
在AMQP协议中,或者RabbitMQ本身没有直接支持延迟队列的功能,但是可以通过前面 所介绍的DLX和TTL模拟出延迟队列的功能,这部分在实践与思考部分进行介绍。
持久化
交换器和队列元数据持久化和消息的持久化,消息的持久化可以直接使用MessageProperties.PERSISTENT_TEXT_PLAIN。
2.2.生产者生产者客户端的代码比较简洁,如下所示。
byte[] messageBodyBytes = "Hello , Xionger!".getBytes(); channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);从高可用HA的角度不经要问,消息的生产者将消息发送出去之后,Broker是否收到消息。RabbitMQ针对这个问题提供了两种解决方案,分别是事务机制和发送方确认PublisherConfirm。发送者确认的实现继续细分为3种形式,包括单条同步、批量同步和异步方式。事务机制和单条同步确认方式的性能都比较差,通常只能达到2000QPS左右,因此通常推荐使用发送方确认的批量方式和异步方式,其QPS可以达到8000QPS以上。其中批量方式也存在一个隐患,即发送一批消息到服务端时,如果有一条消息失败,那么该批次所有消息都需要重试。因此目前生产实践中 ,使用的是异步方式,简化的代码实践如下所示。
SortedSet confirmSet = Sets.newTreeSet(); channel.confirmSelect(); channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { if (multiple) { confirmSet.headSet(deliveryTag - 1); } else { confirmSet.remove(deliveryTag); } } @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { //omit //消息重新投递处理 } }); 2.3.消费者消费模式:拉模式,推模式,RabbitMQ推荐推模式,保持消息消费的有序性。
boolean autoAck = false; channel.basicQos(64);//prefetchCount channel.basicConsume(queueName, autoAck, "myConsumerTag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); long deliveryTag = envelope.getDeliveryTag(); channel.basicAck(deliveryTag, false); } });tip:
对于消息生产者,过去还有一个消息投递不可达被返回的概念,涉及mandatory和immediate两个参数,但其在生产实践中并不常用。
3.实践与思考 3.1.环境搭建安装:Mac环境 brew install rabbitmq,非常简便
管理界面
unacked: 消费端没有Ack的数量
Publish: 推送消息的QPS
Deliver(manual ack): 手动Ack
durable: 持久化
Policy: 队列的规则
Mirrors: 镜像Broker
3.2.Client组件开发在介绍了RabbitMQ主要知识后,扩展的分享一个简易的基于RabbitMQ消息中间件的思路。由于RabbitMQ是基于Erlang开发,虽然很棒但毕竟比较小众,Java技术栈的工程师一般很难去修改RabbitMQ的源码,因此通常只是通过构建一个合理的客户端SDK来支持业务开发。
生产者
生产者目标比较简单,需要实现健壮性强的的发送者确认机制【异步】和支持队列分片,队列分片可以给队列加上后缀标识,然后轮训处理即可。
消费者