Write Code 4 ActiveMQ
来一个HelloWorld级别的例子,来感受下ActiveMQ。具体来说,我这边会写一个生产者用于发送消息,一个消费者用于接收消息。实际上,JMS是有“套路”的,下面我将以生产者为例详细说明。
第一步:创建ConnectionFactory连接工厂
ConnectionFactory
实际上,这里是存在安全隐患的,也就是任何人一旦知道MQ的地址,就可以连接访问了,我们可以在activemq.xml中配置指定的用户、密码才能访问ActiveMQ。
关于broker_bind_url,默认就是tcp://localhost:61616,说明是采用TCP协议,61616端口。其实对于ActiveMQ不仅仅支持TCP协议,还有其他协议,开启了多个端口。
第二步:创建Connection
Connection
Connection就代表了应用程序和消息服务器之间的通信链路。获得了连接工厂后,就可以创建Connection。
事实上,ConnectionFactory存在重载方法:
Connection createConnection(String username,String password)
也就是说我们也可以在这里指定用户名、密码进行验证
第三步:创建Session
Session
Session,用于发送和接受消息,而且是单线程的,支持事务的。如果Session开启事务支持,那么Session将保存一组信息,要么commit到MQ,要么回滚这些消息。Session可以创建MessageProducer/MessageConsumer。
第四步:创建Destination
Destination
所谓消息目标,就是消息发送和接受的地点,要么queue,要么topic。
第五步:创建MessageProducer
MessageProducer
第六步:设置持久化方式
持久化方式设置
第七步:定义消息对象,并发送
Message
生产者和消费者之间传递的对象,由3个主要部分构成:
消息头(路由)+消息属性(消息选择器,以后介绍)+消息体(JMS规范的5种类型消息)
消息类型
第八步:释放连接
release resource
必须close connection,只有这样ActiveMQ才会释放资源!
消费者的代码和上面非常类似,只不过就是创建MessageConsumer进行receive而已,注意receive()/receive(long)/receiveNoWait(),这些说明消费者可以采用阻塞模式、非阻塞模式接受消息。
程序运行后,我们来看一下管控台:
ActiveMQ Web Info
Messages Enqueued:表示生产了多少条消息,记做P
Messages Dequeued:表示消费了多少条消息,记做C
Number Of Consumers:表示在该队列上还有多少消费者在等待接受消息
Number Of Pending Messages:表示还有多少条消息没有被消费,实际上是表示消息的积压程度,就是P-C
在说说Session
在通过Connection创建Session的时候,需要设置2个参数,一个是否支持事务,另一个是签收的模式。我们重点说一下签收模式:
签收模式
什么是签收?通俗点说,就是消费者接受到消息后,需要告诉消息服务器,我收到消息了。当消息服务器收到回执后,本条消息将失效。因此签收将对PTP模式产生很大影响。如果消费者收到消息后,并不签收,那么本条消息继续有效,很可能会被其他消费者消费掉!
AUTO_ACKNOWLEDGE:表示在消费者receive消息的时候自动的签收
CLIENT_ACKNOWLEDGE:表示消费者receive消息后必须手动的调用acknowledge()方法进行签收
DUPS_OK_ACKNOWLEDGE:签不签收无所谓了,只要消费者能够容忍重复的消息接受,当然这样会降低Session的开销