一、五种模式详解
1.简单模式(Queue模式)
当生产端发送消息到交换机,交换机根据消息属性发送到队列,消费者监听绑定队列实现消息的接收和消费逻辑编写.简单模式下,强调的一个队列queue只被一个消费者监听消费.
1.1 结构
data:image/s3,"s3://crabby-images/648af/648afc1a363c7c7f573e838bfd695065064e9202" alt=""
生产者:生成消息,发送到交换机
交换机:根据消息属性,将消息发送给队列
消费者:监听这个队列,发现消息后,获取消息执行消费逻辑
1.2应用场景
常见的应用场景就是一发,一接的结构
例如:
手机短信
邮件单发
2.争抢模式(Work模式)
强调的也是后端队列与消费者绑定的结构
2.1结构
data:image/s3,"s3://crabby-images/7e121/7e121840f476bca36ac432410986fd4fbe7500c4" alt=""
生产者:发送消息到交换机
交换机:根据消息属性将消息发送给队列
消费者:多个消费者,同时绑定监听一个队列,之间形成了争抢消息的效果
2.2应用场景
抢红包
资源分配系统
3.路由模式(Route模式 Direct定向)
从路由模式开始,关心的就是消息如何到达的队列,路由模式需要使用的交换机类型就是路由交换机(direct)
3.1 结构
data:image/s3,"s3://crabby-images/edbea/edbea2619583aa83d919d5a48255aa173eb5e9aa" alt=""
生产端:发送消息,在消息中处理消息内容,携带一个routingkey
交换机:接收消息,根据消息的routingkey去计算匹配后端队列的routingkey
队列:存储交换机发送的消息
消费端:简单模式 工作争抢
3.2应用场景
短信
聊天工具
邮箱。。
手机号/邮箱地址,都可以是路由key
4.发布订阅模式(Pulish/Subscribe模式 Fanout广播)
不计算路由的一种特殊交换机
4.1结构
data:image/s3,"s3://crabby-images/d77de/d77ded623d501606efeb93f6fc7a7f182ef51c39" alt=""
4.2应用场景
消息推送
广告
5.主题模式(Topics模式 Tpoic通配符)
路由key值是一种多级路径。中国.四川.成都.武侯区
5.1结构
data:image/s3,"s3://crabby-images/a594e/a594ee9fcc1996fcace6c2df4bc6ab1cca3a2653" alt=""
生产端:携带路由key,发送消息到交换机
队列:绑定交换机和路由不一样,不是一个具体的路由key,而可以使用*和#代替一个范围
| * | 字符串,只能表示一级 |
| --- | --- |
| # | 多级字符串 |
交换机:根据匹配规则,将路由key对应发送到队列
消息路由key:
北京市.朝阳区.酒仙桥
北京市.#: 匹配true
上海市.浦东区.*: 没匹配false
新疆.乌鲁木齐.#
5.2 应用场景
做物流分拣的多级传递.
6.完整结构
data:image/s3,"s3://crabby-images/51078/51078848b3ccd39515f17b4e70ee99bf440ab082" alt=""
二、代码实现
1.创建SpringBoot工程
1.1 工程基本信息
data:image/s3,"s3://crabby-images/5488f/5488fb79ce12b14c747981cf25272fe9051b5543" alt=""
1.2 依赖信息
data:image/s3,"s3://crabby-images/9bd88/9bd881a83a3f206c93f449a0a367c307e38d90c4" alt=""
1.3 配置文件applicasion.properties
# 应用名称
spring.application.name=springboot-demo
# Actuator Web 访问端口
management.server.port=8801
management.endpoints.jmx.exposure.include=*
management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always
# 应用服务 WEB 访问端口
server.port=8801
######################### RabbitMQ配置 ########################
# RabbitMQ主机
spring.rabbitmq.host=127.0.0.1
# RabbitMQ虚拟主机
spring.rabbitmq.virtual-host=demo
# RabbitMQ服务端口
spring.rabbitmq.port=5672
# RabbitMQ服务用户名
spring.rabbitmq.username=admin
# RabbitMQ服务密码
spring.rabbitmq.password=admin
# RabbitMQ服务发布确认属性配置
## NONE值是禁用发布确认模式,是默认值
## CORRELATED值是发布消息成功到交换器后会触发回调方法
## SIMPLE值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker;
spring.rabbitmq.publisher-confirm-type=simple
# RabbitMQ服务开启消息发送确认
spring.rabbitmq.publisher-returns=true
######################### simple模式配置 ########################
# RabbitMQ服务 消息接收确认模式
## NONE:不确认
## AUTO:自动确认
## MANUAL:手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 指定最小的消费者数量
spring.rabbitmq.listener.simple.concurrency=1
# 指定最大的消费者数量
spring.rabbitmq.listener.simple.max-concurrency=1
# 开启支持重试
spring.rabbitmq.listener.simple.retry.enabled=true
2.简单模式
2.1 创建SimpleQueueConfig 简单队列配置类
package com.gmtgo.demo.simple;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 大帅
*/
@Configuration
public class SimpleQueueConfig {
/**
* 定义简单队列名.
*/
private final String simpleQueue = "queue_simple";
@Bean
public Queue simpleQueue() {
return new Queue(simpleQueue);
}
}
2.2 编写生产者
package com.gmtgo.demo.simple;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author 大帅
*/
@Slf4j
@Component
public class SimpleProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage() {
for (int i = 0; i < 5; i++) {
String message = "简单消息" + i;
log.info("我是生产信息:{}", message);
rabbitTemplate.convertAndSend( "queue_simple", message);
}
}
}
2.3 编写消费者
package com.gmtgo.demo.simple;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author 大帅
*/
@Slf4j
@Component
public class SimpleConsumers {
@RabbitListener(queues = "queue_simple")
public void readMessage(Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("我是消费信息:{}", new String(message.getBody()));
}
}
2.4 编写访问类
package com.gmtgo.demo.simple;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author 大帅
*/
@RestController
@RequestMapping(value = "/rabbitMq")
public class SimpleRabbitMqController {
@Autowired
private SimpleProducer simpleProducer;
@RequestMapping(value = "/simpleQueueTest")
public String simpleQueueTest() {
simpleProducer.sendMessage();
return "success";
}
}
2.5 测试启动项目访问 simpleQueueTest
访问地址::8801/rabbitMq/simpleQueueTest
结果: