ActiceMQ详解 (12)

Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端,中间件,不同产品,不同开发语言等条件限制。

STOMP协议

STOP,Streaming Text Orientation Message Protocol,是流文本定向消息协议,是一种为MOM(Message Oriented Middleware,面向消息中间件)设计的简单文本协议。

MQTT协议

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当作传感器和致动器(比如通过Twitter让房屋联网)的通信协议。

GitLub查看MQTT示例代码:https://github.com/fusesource/mqtt-client

7.3 NIO协议案例

ActiveMQ这些协议传输的底层默认都是使用BIO网络的IO模型。只有当我们指定使用nio才使用NIO的IO模型。

NIO网络IO模型简单配置

修改配置文件activemq.xml

如果你 不特别指定ActiveMQ的网络监听端口,那么这些端口都将使用BIO网络IO模型,所以为了首先提高单节点的网络吞吐性能,我们需要明确指定ActiveMQ网络IO模型。如下所示:URI格式头以“nio”开头,表示这个端口使用以TCP协议为基础的NIO网络IO模型。

<transportConnectors> <!-- 新增NIO协议 --> <transportConnector uri="nio://0.0.0.0:61618?trace=true" /></transportConnectors>

SpringBoot修改端口即可

server: port: 6666 spring: activemq: broker-url: nio://mpolaris.top:61618 user: admin password: admin jms: pub-sub-domain: true mytopic: boot-activemq-topic

NIO增强

image-20210117230310659

修改activemq.xml配置文件(其实只要auto+nio一条都行了)

auto: 针对所有的协议,他会识别我们是什么协议。

nio:使用NIO网络IO模型

<transportConnectors> <transportConnector uri="tcp://0.0.0.0:61626?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> <transportConnector uri="amqp://0.0.0.0:5682?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> <transportConnector uri="stomp://0.0.0.0:61623?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> <transportConnector uri="mqtt://0.0.0.0:1893?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> <transportConnector uri="ws://0.0.0.0:61624?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> <transportConnector uri="nio://0.0.0.0:61618?trace=true" /> <transportConnector uri="auto+nio://0.0.0.0:61608?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600&amp;org.apache.activemq.transport.nio.SelectorManager.corePoolSize=20&amp;org.apache.activemq.transport.nio.Se1ectorManager.maximumPoo1Size=50"/> </transportConnectors>

修改端口号为61608即可

server: port: 6666 spring: activemq: # broker-url: tcp://mpolaris.top:61608 适配多种协议(注意有些协议代码不一样) broker-url: nio://mpolaris.top:61608 user: admin password: admin jms: pub-sub-domain: true mytopic: boot-activemq-topic 8. ActiveMQ的消息存储和持久化 8.1 理解

此处持久化和之前持久性的区别

MQ高可用:事务、持久性、签收,是属于MQ自身特性,自带的。这里的持久化是外力,是外部插件。之前讲的持久性是MQ的外在表现,现在讲的的持久是是底层实现。

image-20210117233600172

8.2 持久化是什么

官网文档:

持久化是什么?一句话就是:ActiveMQ宕机了消息不会丢失的机制。

说明:为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一般都会采用持久化机制。ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的。就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等。再试图将消息发给接收者,成功则将消息从存储中删除,失败则继续尝试发送。消息中心启动以后,要先检查指定的存储位置是否有未成功发送的消息,如果有则会先把存储位置中的消息发出去。

8.3 MQ持久化机制有哪些

AMQ Message Store

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyyysx.html