Spring Boot整合rabbitmq (2)

创建生产者

@SpringBootTest public class RabbitTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void router() { rabbitTemplate.convertAndSend("exchange2", "info", "router"); System.out.println("router"); } } 5. 主题模式

创建消费者

@Component public class MQ { /** * topic topics */ @RabbitListener(bindings = { @QueueBinding(value = @Queue, exchange = @Exchange(value = "exchange3", type = ExchangeTypes.TOPIC), key = {"user.#"} //路由键 ) }) public void topic2(String s) { System.out.println("topic2 " + s); } }

创建生产者

@SpringBootTest public class RabbitTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void topic() { rabbitTemplate.convertAndSend("exchange3", "user.name", "hhh"); System.out.println("topic"); } } 默认消息是持久化的,也可以设置不持久化,以简单队列示例 @SpringBootTest public class RabbitTest { @Autowired private RabbitTemplate rabbitTemplate; /** * AMQP 默认消息是持久化的,但只有在队列也是持久化时才有作用,原文如下: * Messages are persistent by default with Spring AMQP. * Note the queue the message will end up in needs to be durable as well, * otherwise the message will not survive a broker restart as a non-durable queue does not itself survive a restart. * <p> * MessageProperties类中源码如下: * static { * DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT; * DEFAULT_PRIORITY = 0; * } * <p> * 如何设置消息不持久化? * 设置消息不持久化,默认是持久化的,这里只为记录如何设置消息不持久化,一般不设置 * 发送消息时,添加 MessagePostProcessor即可,这里使用 lambda 表达式 * (message) -> { * message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); * return message; * } * <p> * 完整示例如下: * rabbitTemplate.convertAndSend("simpleQueue", "this is simpleQueue", * (message) -> { * message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); * return message; * }); */ @Test public void simpleQueue() { rabbitTemplate.convertAndSend("simpleQueue", "this is simpleQueue", (message) -> { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); return message; }); System.out.println("simple success"); } } 如何设置生产者消息确认,避免消息发送失败而丢失(确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。)

参考资料:https://www.cnblogs.com/wangiqngpei557/p/9381478.html

在配置文件中添加

spring: rabbitmq: #rabbit连接配置信息 publisher-returns: true #开启消息从 交换机----》队列发送失败的回调 publisher-confirm-type: correlated #开启消息从 生产者----》交换机的回调

添加配置类

@Component public class ProducerConfig { @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void pre() { /** * * 消息发送到交换机的回调 * * public void confirm(CorrelationData correlationData, boolean b, String s) { * * System.out.println("消息唯一标识:"+correlationData); * System.out.println("确认结果:"+ b); * System.out.println("失败原因:"+ s); * } * */ rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { System.out.println("setConfirmCallback-------------------"); System.out.println("correlationData: " + correlationData); System.out.println(ack); System.out.println(cause); if (ack) { System.out.println("发送成功"); } else { System.out.println("发送失败"); // 可以记录下来,也可以重新发送消息。。。 } }); /** * * 消息从交换机发送到队列的回调,只有发送失败时才会回调 * public void returnedMessage(Message message, int i, String s, String s1, String s2) { * System.out.println("消息主体 message : "+message); * System.out.println("消息主体 message : "+ i); * System.out.println("描述:"+ s); * System.out.println("消息使用的交换器 exchange : "+ s1); * System.out.println("消息使用的路由键 routing : "+ s2); * } */ rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKey) -> { System.out.println("setReturnCallback---------------------"); System.out.println("消息主体 message : " + message); System.out.println("响应码 replyCode: " + replyCode); System.out.println("响应内容 replyText:" + replyText); System.out.println("消息使用的交换器 exchange : " + exchange); System.out.println("消息使用的路由键 routeKey : " + routingKey); //也可以重新发送消息 rabbitTemplate.convertAndSend(exchange, routingKey, new String(message.getBody())); System.out.println("重新发送消息: -----" + new String(message.getBody())); }); /** * 网上都说必须设置rabbitTemplate.setMandatory(true),才能触发ReturnCallback回调, * 我尝试了一下,并不需要设置为true,交换机发送消息给队列失败时,也能触发回调 */ //rabbitTemplate.setMandatory(true); } }

代码github地址:https://github.com/1612480331/Spring-Boot-rabbitmq

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wppyxs.html