RabbitMQ客户端开发向导 (3)

​ 一般情况下我们将其设置为false,然后在回调函数中消息处理完毕之后手动进行回应。

void basicAck( long deliveryTag, // 指定推送标识编号 boolean multiple // 是否批量回应 ) throws IOException

注意:推送标识

​ deliveryTag表示的是推送编号,是一个单调递增正整数编号,它用来标识channel中一次消息推送,与消息绑在一起。

​ 当一个消费者向服务器注册basicConsume之后,服务器就会使用basic.delivery给消费者推送消息,deliveryTag就用来标识这样一个推送。但要注意,它只在当前信道channel内有效。

​ 消费者受到消息的时候同时会收到这个推送标识,当要回应、拒绝消息的时候就需要带着这个标识。

注意:批量回应

​ multiple参数为boolean值,用来表示是否进行批量回应,当值为true时表示进行批量回应,它会对推送编号小于给定编号的所有消息进行回应。false表示只回应指定编号的消息。

知识点:推送范围

​ noLocal这个参数表示是否可以将消息推送给与消息发布者同一连接内的消费者,如果noLocal=false表示可以推送,noLocal=true则表示不能推送,那么就只能推送到其他连接中的消费者。

知识点:排他

知识点:回调函数

​ 回调函数主要用于定义针对消息的处理逻辑,一般采用如下方式定义:

Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery( String consumerTag, // 消费者标签,可用于消费者验证发送目标的正确性 Envelope envelope, // AMQP操作参数,可以获取其中参数进行消息处理 Amqp.BasicProperties properties, // 消息的基本属性 Byte[] body // 消息体,用于存放推送的消息 ){ // do something to handle deleveried message } }

知识点:AMQP操作参数

​ Envelope是RabbitMQ定义的用于封装AMQP操作参数的类,里面主要封装了四个参数:

private final long _deliveryTag;// 推送编号 private final boolean _redeliver; // 是否重发标签 private final String _exchange; // 当前操作对应的交换器 private final String _routingKey; // 关联的路由Key

​ 其中_redeliber=true表示这是一个失败的basicAck之后的消息的重新推送。这里面的这四个参数我们都可以在消费者客户端处理消息的时候使用。

​ 其实basicConsume是消费者订阅方法,目的在于订阅某个队列,是消息推送的前提,真正的消息推送并没有在Channel类中实现,因为推送操作是由RabbitMQ服务器自动发起的,不需要生产者或者消费者手动触发,所以也就没用提供接口。

​ 真正的消息推送是由RabbitMQ服务器的Basic.delivery方法实现的。

2、拉取消息basicGet GetResponse basicGet( String queue, // 指定队列名称 boolean autoAck // 是否自动回应 ) throws IOException;

​ 拉取消息就是指消费者主动从服务器获取消息,每次只能获取一条消息。推送消息是被动的获取。这里的autoAck和之前推送的设置一样,一般设置为false,表示不主动回应,采用手动回应(高可用要点之三)

【十】拒绝消息basicReject、basicNack 1、拒绝一个消息basicReject void basicReject( long deliveryTag, // 消息推送编号 boolean requeue // 是否重新入队 ) throws IOException; 2、拒绝多个消息basicNack void basicNack( long deliveryTag, // 消息推送编号 boolean multiple, // 是否批量拒绝 boolean requeue // 是否重新入队 )throws IOException;

知识点:重新入队

​ 当一个消息推送到某一个消费者,这个消费者无法处理时它进行了拒绝操作,如果指定requeue值为true,表示被拒绝的消息还可以重新发送到队列,可被继续推送到其他消费者,如果设置为false,那么这条消息会被立刻从队列删除。

​ 如果将这两个方法的requeue参数设置为false,那么可以启用死信队列功能,因为这样的话,返回的消息会变成死信,如果服务器中设置有死信交换器DLX,并且已关联到该队列,那么这个消息就会被发送到死信交换器,从而被路由到绑定的死信队列中得以保留,我们可以通过排查这些消息来进行服务器优化。

知识点:批量拒绝

​ 消息的单个拒绝与批量拒绝使用的不是同一个方法,批量拒绝的方法basicNack中有个决定是否批量拒绝的参数mutiple,如果设置为false,表示不执行批量拒绝,那么它的效果等同于basicReject方法,如果设置为true,表示拒绝小于指定推送编号的所有未被当前消费者消费的消息。

【十一】取消消费者basicCancel void basicCancel(String consumerTag // 指定要取消的消费者标签 ) throws IOException; 【十二】关闭连接 channel.close();// 关闭信道 connection.close();// 关闭TCP连接

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zywzds.html