RabbitMQ不讲武德,发个消息也这么多花招 (8)

2、新建一个生产者 AckProducer 类(省略了包名和导入):

public class AckProducter { private static String QUEUE_NAME = "ACK_QUEUE";//队列 private static String EXCHANGE_NAME = "ACK_EXCHANGE";//交换机 private static String ROUTEING_KEY = "test"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setUri("amqp://admin:123456@47.107.155.197:5672"); // 建立连接 Connection conn = factory.newConnection(); // 创建消息通道 Channel channel = conn.createChannel(); Map<String, Object> headers = new HashMap<String, Object>(1); headers.put("ackType", "ack");//请应答 // headers.put("ackType", "reject-single");//请单条拒绝 // headers.put("ackType", "reject-multiple");//请多条拒绝 AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .contentEncoding("UTF-8") // 编码 .headers(headers) // 自定义属性 .messageId(String.valueOf(UUID.randomUUID())) .build(); String msg = "I'm a ack message"; //声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,false); //队列和交换机进行绑定 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTEING_KEY); // 发送消息 channel.basicPublish(EXCHANGE_NAME, ROUTEING_KEY, properties, msg.getBytes()); channel.close(); conn.close(); } } Spring Boot 实现消费者应答

在 Spring Boot 中消费者给服务端的确认方式分为 3 种:

NONE:自动应答(ack)。

MANUAL:手动应答(ack)。如果设置为手动应答,而消费者又迟迟不给服务器应答,那么消息就会一直存在队列,可能会造成消息堆积和重复消费现象。

AUTO:当没有抛出异常时会自动应答(ack)。除此外,当发生异常时,分为以下三种情况:

1、当抛出 AmqpRejectAndDontRequeueException 异常时,消息会被拒绝,也不会重新入队。

2、当抛出 ImmediateAcknowledgeAmqpException 异常时,消费者会自动发送应答给服务端。

3、当抛出其他异常时,消息会被拒绝,且会重新入队。当出现这种情况且消费者只有一个时,非常容易造成死循环,所以应该极力避免这种情况的发生。

1、Spring Boot 中可以通过参数控制应答类型:

spring: rabbitmq: listener: type: simple # direct类型是2.0之后才有的 simple: acknowledge-mode: manual

2、在消费者类 ExchangeConsumer 中新建一个方法来监听队列,其中第一个注释掉的方法是原本存在的,第二个方法是新增的,主要新增了几个参数,注意 Channel 是 com.rabbitmq.client.Channel 包下的:

/** * 监听绑定了direct交换机的的消息队列 */ // @RabbitHandler // @RabbitListener(queues = "LONGLY_WOLF_ORDER_QUEUE") // public void directConsumer(String msg){ // System.out.println("direct交换机收到消息:" + msg); // } /** * 监听绑定了direct交换机的的消息队列,并进行手动应答 */ @RabbitHandler @RabbitListener(queues = "LONGLY_WOLF_ORDER_QUEUE") public void manualDirectConsumer(String msg, Channel channel,Message message) throws IOException { System.out.println("direct交换机收到消息:" + msg + "。此消息需要手动应答"); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//手动应答 }

3、或者也可以通过 SimpleMessageListenerContainer 类实现监听,新建一个 RabbitAckConfig 类(省略了包名和导入):

@Configuration public class RabbitAckConfig { @Bean public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames("LONGLY_WOLF_ORDER_QUEUE");//设置监听队列名 container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//手动确认 container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {//消息处理 System.out.println("收到消息:" + new String(message.getBody()) + "。此消息需要手动应答"); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); }); return container; } }

PS:需要注意的是,这两种方式不要同时使用,否则无法保证消息会被哪个监听到。

仅靠 RabbitMQ 自身可靠性能实现业务需求吗

上面介绍的两种确认模式,服务端确认和消费者确认。其中服务端确认是会回调给生产者的,所以生产者可以知道消息是否已经到达服务器且是否正确路由到队列,然而,对于消费者的确认,生产者是不知道的,这是因为消息队列的作用之一就是为了实现生产者和消费者的解耦,换言之,消费者知道消息成功发送到队列,但是无法知道消息是否被消费者消费

所以为了知道消息是否被成功消费,主要有两种思路:

1、消费者在消费成功之后需要回调生产者提供的API来告知消息已经被消费

2、服务端在收到消费者确认后给生产者一个回执通知

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpsxyy.html