Java code:
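Subscribing to a queue and consuming messages:

/**
 * Start a non-nolocal, non-exclusive consumer, with
 * a server-generated consumerTag.
 * @param queue the name of the queue
 *   [author's note: the queue to subscribe to and consume from]
 * @param autoAck true if the server should consider messages
 * acknowledged once delivered; false if the server should expect
 * explicit acknowledgements
 *   [author's note: whether messages are acknowledged automatically. It feels a bit
 *   like TCP's ACK/SYN, that scary 7-layer model!!! Passing true is usually enough
 *   for simple cases, but with true a message is lost if the consumer fails before
 *   it finishes processing it.]
 * @param callback an interface to the consumer object
 *   [author's note: the callback; its code follows right after this]
 * @return the consumerTag generated by the server
 * @throws java.io.IOException if an error is encountered
 * @see com.rabbitmq.client.AMQP.Basic.Consume
 * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
 * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
 */
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

As soon as a message is received, a callback is invoked. Here is the callback interface: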
/**
 * Interface for application callback objects to receive notifications and messages from
 * a queue by subscription.
 * Most implementations will subclass {@link DefaultConsumer}.
 * <p/>
 * The methods of this interface are invoked in a dispatch
 * thread which is separate from the {@link Connection}'s thread. This
 * allows {@link Consumer}s to call {@link Channel} or {@link
 * Connection} methods without causing a deadlock.
 * <p/>
 * The {@link Consumer}s on a particular {@link Channel} are invoked serially on one or more
 * dispatch threads. {@link Consumer}s should avoid executing long-running code
 * because this will delay dispatch of messages to other {@link Consumer}s on the same
 * {@link Channel}.
 *
 * @see Channel#basicConsume(String, boolean, String, boolean, boolean, java.util.Map, Consumer)
 * @see Channel#basicCancel
 */
public interface Consumer {
    /**
     * Called when the consumer is registered by a call to any of the
     * {@link Channel#basicConsume} methods.
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
     */
    void handleConsumeOk(String consumerTag);

    /**
     * Called when the consumer is cancelled by a call to {@link Channel#basicCancel}.
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
     */
    void handleCancelOk(String consumerTag);

    /**
     * Called when the consumer is cancelled for reasons <i>other than</i> by a call to
     * {@link Channel#basicCancel}. For example, the queue has been deleted.
     * See {@link #handleCancelOk} for notification of consumer
     * cancellation due to {@link Channel#basicCancel}.
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
     * @throws IOException
     */
    void handleCancel(String consumerTag) throws IOException;

    /**
     * Called when either the channel or the underlying connection has been shut down.
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
     * @param sig a {@link ShutdownSignalException} indicating the reason for the shut down
     */
    void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);

    /**
     * Called when a <code><b>basic.recover-ok</b></code> is received
     * in reply to a <code><b>basic.recover</b></code>. All messages
     * received before this is invoked that haven't been <i>ack</i>'ed will be
     * re-delivered. All messages received afterwards won't be.
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
     */
    void handleRecoverOk(String consumerTag);

    /**
     * Called when a <code><b>basic.deliver</b></code> is received for this consumer.
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
     * @param envelope packaging data for the message
     * @param properties content header data for the message
     * @param body the message body (opaque, client-specific byte array)
     * @throws IOException if the consumer encounters an I/O error while processing the message
     * @see Envelope
     *
     * [Author's note: I'm getting tired, so here is the key point. This is the method
     * that is called back when a message arrives. If you want to dig into the other
     * methods, read the English comments above carefully.]
     */
    void handleDelivery(String consumerTag,
                        Envelope envelope,
                        AMQP.BasicProperties properties,
                        byte[] body)
        throws IOException;
}
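To make the callback flow a little more concrete, here is a small self-contained sketch that runs without a broker. It is only a simulation under my own assumptions: `MiniChannel` and `DeliveryCallback` are hypothetical stand-ins that I made up for illustration, not classes from the RabbitMQ client, and the real `Consumer` interface has more methods than the single one modelled here. The sketch mirrors two things from the comments above: callbacks run serially on a dispatch thread separate from the caller's thread, and with `autoAck` set to false a delivery stays unacknowledged until the consumer acknowledges it explicitly.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class ConsumerSketch {

    /** Hypothetical, simplified callback; NOT com.rabbitmq.client.Consumer. */
    interface DeliveryCallback {
        void handleDelivery(String consumerTag, long deliveryTag, byte[] body);
    }

    /** Hypothetical in-memory stand-in for a channel; it never talks to a broker. */
    static final class MiniChannel {
        // One dispatch thread: callbacks run serially and off the caller's thread.
        private final ExecutorService dispatcher = Executors.newSingleThreadExecutor();
        private final Set<Long> unacked = ConcurrentHashMap.newKeySet();
        private final AtomicLong nextDeliveryTag = new AtomicLong(1);
        private volatile DeliveryCallback callback;
        private volatile boolean autoAck;
        private volatile String consumerTag;

        /** Registers the callback and returns a generated consumer tag. */
        String basicConsume(String queue, boolean autoAck, DeliveryCallback callback) {
            this.autoAck = autoAck;
            this.callback = callback;
            this.consumerTag = "ctag-" + queue;
            return consumerTag;
        }

        /** Simulates the broker pushing one message to the registered consumer. */
        void deliver(String text) {
            long tag = nextDeliveryTag.getAndIncrement();
            if (!autoAck) {
                unacked.add(tag); // stays pending until basicAck is called
            }
            byte[] body = text.getBytes(StandardCharsets.UTF_8);
            dispatcher.execute(() -> callback.handleDelivery(consumerTag, tag, body));
        }

        /** Explicit acknowledgement; only meaningful when autoAck is false. */
        void basicAck(long deliveryTag) {
            unacked.remove(deliveryTag);
        }

        /** Number of deliveries that have not been acknowledged yet. */
        int unackedCount() {
            return unacked.size();
        }

        /** Stops the dispatch thread after the queued deliveries have been handled. */
        void close() {
            dispatcher.shutdown();
            try {
                dispatcher.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Consumes two messages with manual acks and returns the bodies in arrival order. */
    static List<String> runDemo() {
        MiniChannel channel = new MiniChannel();
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        channel.basicConsume("demo-queue", false, (consumerTag, deliveryTag, body) -> {
            received.add(new String(body, StandardCharsets.UTF_8));
            channel.basicAck(deliveryTag); // acknowledge only after processing
        });
        channel.deliver("hello");
        channel.deliver("world");
        channel.close();
        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

With the real client, the usual route (as the Javadoc above says) is to subclass `DefaultConsumer` and override only `handleDelivery`. The sketch keeps the callback short on purpose: everything runs on one dispatch thread, so a slow callback would hold up every delivery queued behind it.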