测试类
@SpringBootTest(classes = Application.class) @RunWith(SpringJUnit4ClassRunner.class) @WebAppConfiguration public class TestActiveMQ { @Resource //这个是java 的注解,而Autowried是spring 的 private QueueProduce produce; @Test public void testSend() { produce.produceMsg(); } } 6.2 队列案例 - 生产者间隔定投QueueProduce新增定时投递方法
/** * 间隔3秒定时投送 */ @Scheduled(fixedDelay = 3000) public void produceMsgScheduled() { jmsMessagingTemplate.convertAndSend(queue,"定时投送 => " + UUID.randomUUID().toString().substring(0,6)); }主启动类添加一个注解
@SpringBootApplication @EnableScheduling //允许开启定时投送功能 public class Application { public static void main(String[] args) { SpringApplication.run(Application.class,args); } }直接开启主启动类,间隔投递消息
6.3 队列案例 - 消费者监听 @Component public class QueueCustomer { @JmsListener(destination = "${myqueue}") public void receive(TextMessage message) throws JMSException { System.out.println("消费者收到消息 => " + message.getText()); } } 6.4 主题基本案例application.yml配置文件
server: port: 6666 spring: activemq: broker-url: tcp://mpolaris.top:61616 user: admin password: admin jms: # 目的地是queue还是topic, false(默认)=queue true=topic pub-sub-domain: true mytopic: boot-activemq-topicActiveMQ配置文件
@Configuration @EnableJms //开启Jms适配的注解 public class ConfigBean { @Value("${mytopic}") private String myTopic; @Bean public Topic topic() { return new ActiveMQTopic(myTopic); } }主题生产者
@Component public class TopicProduce { @Autowired private JmsMessagingTemplate jmsMessagingTemplate; @Autowired private Topic topic; public void produceMsg() { jmsMessagingTemplate.convertAndSend(topic,"===> SpringBoot + ActiveMQ消息"); } @Scheduled(fixedDelay = 3000) public void produceMsgScheduled() { jmsMessagingTemplate.convertAndSend(topic,"定时投送 => " + UUID.randomUUID().toString().substring(0,6)); System.out.println("定时投送"); } }主题消费者
@Component public class QueueCustomer { @JmsListener(destination = "${mytopic}") public void receive(TextMessage message) throws JMSException { System.out.println("消费者收到消息 => " + message.getText()); } } 7. ActiveMQ传输协议 7.1 简介ActiveMQ支持的client-broker通讯协议有:TCP、NIO、UDP、SSL、Http(s)、VM等。其中配置Transport Connector的文件在ActiveMQ安装目录的conf/activemq.xml中的标签之内。
activemq传输协议的官方文档:
除了tcp和nio协议其他的了解就行。各种协议有各自擅长该协议的中间件,工作中一般不会使用activemq去实现这些协议。如: mqtt是物联网专用协议,采用的中间件一般是mosquito。ws是websocket的协议,是和前端对接常用的,一般在java代码中内嵌一个基站(中间件)。stomp好像是邮箱使用的协议的,各大邮箱公司都有基站(中间件)。
注意:协议不同,我们的代码都会不同。
7.2 各协议理解TCP协议
Transmission Control Protocol(TCP)是默认的,TCP的Client监听端口61616
在网络传输数据前必须要先序列化数据,消息是通过一个叫wire protocol的来序列化成字节流。默认情况下ActiveMQ把wrie protocol 叫做 OpenWire,它的目的就是促使网络上的效率更高和数据快速交换。
TCP连接的URI形式如:tcp://HostName:port?key=value&key=value,后面的参数是可选的。
TCP传输的的优点:
TCP协议传输可靠性高,稳定性强
高效率:字节流方式传递,效率很高
有效性、可用性:应用广泛,支持任何平台
关于Transport协议的可选配置参数可以参考官网
NIO协议
New I/O API Protocol(NIO)。NIO协议和TCP协议类似,但NIO更侧重于底层的访问操作。它允许开发人员对同一资源可有更多的client调用和服务器端有更多的负载。
适合使用NIO协议的场景:
可能有大量的Client去连接到Broker上,一般情况下大量的Client去连接Broker是被操作系统的线程所限制的。因此NIO的实现比TCP需要更少的线程去运行,所以建议使用NIO协议。
可能对于Broker有一个很迟钝的网络传输,NIO比TCP提供更好的性能。
NIO连接的URI形式:nio://hostname:port?key=value&key=value
关于Transport协议的可选配置参数可以参考官网
AMQP协议