MQ 入门实践

Message Queue,消息队列,FIFO 结构。

MQ 入门实践

例如电商平台,在用户支付订单后执行对应的操作;

MQ 入门实践

优点:

异步

削峰

解耦

缺点

增加系统复杂性

数据一致性

可用性

JMS

Java Message Service,Java消息服务,类似 JDBC 提供了访问数据库的标准,JMS 也制定了一套系统间消息通信的规范;

区别于 JDBC,JDK 原生包中并未定义 JMS 相关接口。

ConnectionFactory

Connection

Destination

Session

MessageConsumer

MessageProducer

Message

协作方式图示为;

MQ 入门实践

业界产品 ActiveMQ RabbitMQ RocketMQ kafka
单机吞吐量   万级   万级   10 万级   10 万级  
可用性       非常高   非常高  
可靠性   较低概率丢失消息   基本不丢   可以做到 0 丢失   可以做到 0 丢失  
功能支持   较为完善   基于 erlang,并发强,性能好,延时低   分布式,拓展性好,支持分布式事务   较为简单,主要应用与大数据实时计算,日志采集等  
社区活跃度          
ActiveMQ

作为 Apache 下的开源项目,完全支持 JMS 规范。并且 Spring Boot 内置了 ActiveMQ 的自动化配置,作为入门再适合不过。

快速开始

添加依赖;

<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> </dependency>

消息发送;

// 1. 创建连接工厂 ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 2. 工厂创建连接 Connection connection = factory.createConnection(); // 3. 启动连接 connection.start(); // 4. 创建连接会话session,第一个参数为是否在事务中处理,第二个参数为应答模式 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5. 根据session创建消息队列目的地 Destination queue = session.createQueue("test-queue"); // 6. 根据session和目的地queue创建生产者 MessageProducer producer = session.createProducer(queue); // 7. 根据session创建消息实体 Message message = session.createTextMessage("hello world!"); // 8. 通过生产者producer发送消息实体 producer.send(message); // 9. 关闭连接 connection.close(); Spring Boot 集成

自动注入参考:org.springframework.boot.autoconfigure.jms.activemq.ActiveMQConnectionFactoryConfiguration.SimpleConnectionFactoryConfiguration

添加依赖;

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency>

添加 yaml 配置;

spring: activemq: broker-url: tcp://localhost:61616 jms: #消息模式 true:广播(Topic),false:队列(Queue),默认时false pub-sub-domain: true

收发消息;

@Autowired private JmsTemplate jmsTemplate; // 接收消息 @JmsListener(destination = "test") public void receiveMsg(String msg) { System.out.println(msg); } // 发送消息 public void sendMsg(String destination, String msg) { jmsTemplate.convertAndSend(destination, msg); } 高可用

基于 zookeeper 实现主从架构,修改 activemq.xml 节点 persistenceAdapter 配置;

<persistenceAdapter> <replicatedLevelDB directory="${activemq.data}/levelDB" replicas="3" bind="tcp://0.0.0.0:0" zkAddress="172.17.0.4:2181,172.17.0.4:2182,172.17.0.4:2183" zkPath="/activemq/leveldb-stores" hostname="localhost" /> </persistenceAdapter>

broker 地址为:failover:(tcp://192.168.4.19:61616,tcp://192.168.4.19:61617,tcp://192.168.4.19:61618)?randomize=false

负载均衡

在高可用集群节点 activemq.xml 添加节点 networkConnectors;

<networkConnectors> <networkConnector uri="static:(tcp://192.168.0.103:61616,tcp://192.168.0.103:61617,tcp://192.168.0.103:61618)" duplex="false"/> </networkConnectors>

更多详细信息可参考:https://blog.csdn.net/haoyuyang/article/details/53931710

集群消费

由于发布订阅模式,所有订阅者都会接收到消息,在生产环境,消费者集群会产生消息重复消费问题。

ActiveMQ 提供 VirtualTopic 功能,解决多消费端接收同一条消息的问题。于生产者而言,VirtualTopic 就是一个 topic,对消费而言则是 queue。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwdgsw.html