RabbitMQ's Five Modes Explained (with Implementation Code) (3)


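The console output shows that, of the 10 messages, consumer 1 received the even-numbered ones and consumer 2 received the odd-numbered ones, so the messages were split evenly between them.
You can also implement on-demand dispatch in code, so that whichever consumer performs better takes messages first, which achieves load balancing.
Configuring a controllable dispatch count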
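The difference between even (round-robin) dispatch and on-demand dispatch can be sketched without a broker. The following is a toy simulation only, not RabbitMQ or Spring code: the class name `DispatchSimulation`, its methods, and the per-message costs of 100 ms and 400 ms are all hypothetical. It models on-demand dispatch as "one unacknowledged message per consumer", which is roughly what a prefetch count of 1 with manual acknowledgement gives you (in Spring Boot this is usually set through the `spring.rabbitmq.listener.simple.prefetch` property).

```java
import java.util.Arrays;

/** Toy model of the two dispatch strategies; no broker is involved. */
class DispatchSimulation {

    /** Round-robin: message i goes to consumer i % n, regardless of consumer speed. */
    static int[] roundRobin(int messages, int consumers) {
        int[] handled = new int[consumers];
        for (int i = 0; i < messages; i++) {
            handled[i % consumers]++;
        }
        return handled;
    }

    /**
     * On-demand dispatch with at most one in-flight message per consumer:
     * each message goes to the consumer that becomes idle first.
     * costMillis[c] is the hypothetical time consumer c needs per message.
     */
    static int[] fairDispatch(int messages, long[] costMillis) {
        int[] handled = new int[costMillis.length];
        long[] freeAt = new long[costMillis.length]; // virtual time at which each consumer is idle again
        for (int i = 0; i < messages; i++) {
            int next = 0;
            for (int c = 1; c < freeAt.length; c++) {
                if (freeAt[c] < freeAt[next]) {
                    next = c; // ties go to the lower index
                }
            }
            handled[next]++;
            freeAt[next] += costMillis[next];
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(roundRobin(10, 2)));                      // [5, 5]
        System.out.println(Arrays.toString(fairDispatch(10, new long[] {100, 400}))); // [8, 2]
    }
}
```

With these assumed costs, the faster consumer ends up handling 8 of the 10 messages instead of 5.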

4. Publish/Subscribe Mode

Publish/subscribe mode: multiple consumers listen on different queues, but all of the queues are bound to the same exchange.
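To make the routing behaviour concrete, here is a minimal in-memory sketch of what a fanout exchange does. It is an illustration only and does not use the RabbitMQ client; the class `FanoutModel` and its methods are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** In-memory toy model of a fanout exchange; it does not talk to RabbitMQ. */
class FanoutModel {

    /** Queues currently bound to this exchange. */
    private final List<Queue<String>> boundQueues = new ArrayList<>();

    /** Creates a queue, binds it to the exchange, and returns it. */
    Queue<String> bindNewQueue() {
        Queue<String> queue = new ArrayDeque<>();
        boundQueues.add(queue);
        return queue;
    }

    /** A fanout exchange ignores the routing key, so every bound queue gets its own copy. */
    void publish(String message) {
        for (Queue<String> queue : boundQueues) {
            queue.add(message);
        }
    }

    public static void main(String[] args) {
        FanoutModel exchange = new FanoutModel();
        Queue<String> queue1 = exchange.bindNewQueue();
        Queue<String> queue2 = exchange.bindNewQueue();
        for (int i = 0; i < 5; i++) {
            exchange.publish("message " + i);
        }
        System.out.println(queue1.size() + " " + queue2.size()); // prints "5 5"
    }
}
```

Unlike the work-queue mode, where consumers share one queue and split its messages, each queue here holds the full set of messages.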

4.1 Write the subscription configuration class

    package com.gmtgo.demo.fanout;

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    /**
     * @author 大帅
     */
    @Configuration
    public class FanoutQueueConfig {

        /** Queue names. */
        private final String fanout1 = "fanout_queue_1";
        private final String fanout2 = "fanout_queue_2";

        /** Exchange name. */
        private final String fanoutExchange = "fanoutExchange";

        /** Declare the queues. */
        @Bean
        public Queue fanoutQueue1() {
            return new Queue(fanout1);
        }

        @Bean
        public Queue fanoutQueue2() {
            return new Queue(fanout2);
        }

        /** Declare the exchange. */
        @Bean
        public FanoutExchange exchange() {
            return new FanoutExchange(fanoutExchange);
        }

        /** Bind the queues to the exchange; this can also be done in a visual management tool. */
        @Bean
        public Binding bindingFanoutQueue1(Queue fanoutQueue1, FanoutExchange exchange) {
            return BindingBuilder.bind(fanoutQueue1).to(exchange);
        }

        @Bean
        public Binding bindingFanoutQueue2(Queue fanoutQueue2, FanoutExchange exchange) {
            return BindingBuilder.bind(fanoutQueue2).to(exchange);
        }
    }

4.2 Write the subscription producer

    package com.gmtgo.demo.fanout;

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    /**
     * @author 大帅
     */
    @Slf4j
    @Component
    public class FanoutProducer {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        public void sendMessage() {
            for (int i = 0; i < 5; i++) {
                String message = "subscription-mode message " + i;
                log.info("Produced message: {}", message);
                // A fanout exchange ignores the routing key, so it is left empty.
                rabbitTemplate.convertAndSend("fanoutExchange", "", message);
            }
        }
    }

4.3 Write subscription consumer 1

    package com.gmtgo.demo.fanout;

    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    import java.io.IOException;

    /**
     * @author 大帅
     */
    @Slf4j
    @Component
    public class FanoutConsumers1 {

        @RabbitListener(queues = "fanout_queue_1")
        public void readMessage(Message message, Channel channel) throws IOException {
            // Acknowledge this single message by hand (the listener must use manual acknowledge mode).
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            log.info("Consumer 1 received: {}", new String(message.getBody()));
        }
    }

4.4 Write subscription consumer 2

    package com.gmtgo.demo.fanout;

    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    import java.io.IOException;

    /**
     * @author 大帅
     */
    @Slf4j
    @Component
    public class FanoutConsumers2 {

        @RabbitListener(queues = "fanout_queue_2")
        public void readMessage(Message message, Channel channel) throws IOException {
            // Acknowledge this single message by hand (the listener must use manual acknowledge mode).
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            log.info("Consumer 2 received: {}", new String(message.getBody()));
        }
    }

4.5 Write the test method

    package com.gmtgo.demo.fanout;

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;

    /**
     * @author 大帅
     */
    @RestController
    @RequestMapping(value = "rabbitMq")
    public class FanoutRabbitMqController {

        @Autowired
        private FanoutProducer fanoutProducer;

        @RequestMapping(value = "fanoutQueueTest")
        public String fanoutQueueTest() {
            fanoutProducer.sendMessage();
            return "success";
        }
    }

4.6 Test: start the project and access fanoutQueueTest

Access URL: 8801/rabbitMq/fanoutQueueTest
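Besides a browser, the endpoint can be triggered from a small Java client. This is a sketch under assumptions: the host is not given in the address above, so `http://localhost:8801` is a guess for a locally running application, and the class `FanoutTestClient` is a hypothetical name. It uses the JDK's built-in `java.net.http` client (Java 11 or later).

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Minimal client for the test endpoint; the base URL is an assumption. */
class FanoutTestClient {

    /** Builds a GET request for the fanoutQueueTest endpoint under the given base URL. */
    static HttpRequest buildRequest(String baseUrl) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/rabbitMq/fanoutQueueTest"))
                .GET()
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Assumed default; pass the real base URL as the first argument if it differs.
        String baseUrl = args.length > 0 ? args[0] : "http://localhost:8801";
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest(baseUrl), HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

If the application is up, the controller returns the string "success" and the producer and consumer log lines appear in the application console.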

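Result: each consumer logs all five messages, because a fanout exchange delivers a copy of every message to each bound queue.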

When reposting, please credit the source: https://www.heiqu.com/wpgyzj.html