RabbitMQ不讲武德,发个消息也这么多花招 (7)

当我们的消费者的消费速度跟不上生产者的生产速度时,就会导致消息堆积在队列中,而默认消息是没有持久化的,存在于内存之中,所以假如服务器宕机等故障发生,就会导致队列中的数据丢失。

这里的解决方案也很简单,就是将消息进行持久化,在 RabbitMQ 当中,持久化也可以分为 3 种:交换机持久化,队列持久化和消息持久化。

虽然说持久化能一定程度上保证消息的可靠性,然而当出现了服务器的磁盘损坏,依然可能出现消息丢失,所以为了更加完美,RabbitMQ 集群可能是必须的,当然,本文不会涉及到集群的知识,集群的知识以及搭建会放到下次再来分析。

交换机持久化

声明交换机时,durable 参数设置为 true。

队列持久化

声明队列时,durable 参数设置为 true。

消息持久化

发送消息时可以将消息设置为持久化。

Java API 消息持久化

在 Java API 中,可以通过如下方式设置消息持久化:

//deliveryMode=2表示消息持久化 AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2).build(); channel.basicPublish("exchangeName","routingKey",properties,msg.getBytes()); Spring Boot 消息持久化

在 Spring Boot 中可以通过如下方式将消息设置为持久化:

MessageProperties messageProperties = new MessageProperties(); messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);//消息持久化 Message message = new Message(msg.getBytes(), messageProperties); rabbitTemplate.convertAndSend("exchangeName","routingKey",message); 消费者消费消息失败了怎么办

踏遍千山万水,经过 3 层地狱模式,消息终于被消费者拿到手了,然而悲剧的事情又发生了,消费者消费消息的时候可能因为消费者本身的问题或者其他意外导致了消费者消费消息失败了,这时候消息还是没能被正确处理,这时候难道眼睁睁看着最后关头了束手无策了吗?

非也,作为一款如此优秀的消息队列,怎么可能没考虑到这种场景呢。还记不记得上面我们提到的确认模式,实际上,上面的两种确认模式都属于服务端的确认,在 RabbitMQ 中为消费者也提供了确认模式,这就是消费者的确认。

消费者确认(ack)

队列当中会把消息删除的前提就是这条消息被消费者消费掉了,但是服务器如何知道消息被消费了呢?这就是需要通过消费者确认之后才会删除,而我们前面在介绍消息发送的时候貌似并没有看到消费者确认流程,这是因为消费者默认在收到消息后会给服务器一个应答,服务端收到消费者的应答之后,就会删除消息。

Java API 实现消费者应答

在 Java API 中应答方式有两种,自动应答和手动应答,当自动应答时,则只要消费者收到消息就会给服务端确认,不在乎消息是否消费成功。

1、新建一个消费者 AckConsumer 类(省略了包名和导入),这里为了实现方便,通过生产者的头部标记来决定采用何种应答策略:

public class AckConsumer { private static String QUEUE_NAME = "ACK_QUEUE"; public static void main(String[] args) throws Exception{ //1.声明连接 ConnectionFactory factory = new ConnectionFactory(); factory.setUri("amqp://username:password@ip:port"); //2.建立连接 Connection conn = factory.newConnection(); //3.创建消息通道 Channel channel = conn.createChannel(); //4.声明队列(默认交换机AMQP default,Direct) channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" 等待接收消息..."); // 创建消费者 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("收到消息: " + new String(body, "UTF-8")); Map<String,Object> map = properties.getHeaders();//获取头部消息 String ackType = map.get("ackType").toString(); if (ackType.equals("ack")){//手动应答 channel.basicAck(envelope.getDeliveryTag(),true); }else if(ackType.equals("reject-single")){//拒绝单条消息 //拒绝消息。requeue参数表示消息是否重新入队 channel.basicReject(envelope.getDeliveryTag(),false); // channel.basicNack(envelope.getDeliveryTag(),false,false); }else if (ackType.equals("reject-multiple")){//拒绝多条消息 //拒绝消息。multiple参数表示是否批量拒绝,为true则表示<deliveryTag的消息都被拒绝 channel.basicNack(envelope.getDeliveryTag(),true,false); } } }; //开始获取消息,第二个参数 autoAck表示是否开启自动应答 channel.basicConsume(QUEUE_NAME, false, consumer); } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpsxyy.html