handleRequestMessage的操作就是用之前我们handler中的委托类MessagingMethodInvokerHelper去反射运行对应的端点方法,然后把执行结果发送outputChannel。最后我们直接定位到具体的发送操作:
@Override protected boolean doSend(Message<?> message, long timeout) { Assert.notNull(message, "'message' must not be null"); try { if (this.queue instanceof BlockingQueue) { BlockingQueue<Message<?>> blockingQueue = (BlockingQueue<Message<?>>) this.queue; if (timeout > 0) { return blockingQueue.offer(message, timeout, TimeUnit.MILLISECONDS); } if (timeout == 0) { return blockingQueue.offer(message); } blockingQueue.put(message); return true; } else { try { return this.queue.offer(message); } finally { this.queueSemaphore.release(); } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); return false; } }看到这,我们就明白了数据的去向,存储在队列里了,生产者产生的数据就已经生成了,所以发送的操作基本上就告一段落了。
五,接收信息数据已经生成,后面就是看如何消费操作了,下面分析下 outputChannel.receive(0).getPayload()操作:
/** * 从该通道接收第一条可用消息。 如果通道不包含任何消息,则此方法将阻塞,直到分配的超时时间过去。 如果指定的超时时间为 0,则该方法将立即返回。 如果小于零,它将无限期阻塞(参见receive() )。 * 参数: * timeout - 以毫秒为单位的超时时间 * 返回: * 如果在分配的时间内没有可用的消息或接收线程被中断,则为第一个可用消息或null 。 */ @Override // NOSONAR complexity @Nullable public Message<?> receive(long timeout) { ... try { //接受前拦截器操作 if (interceptorList.getSize() > 0) { interceptorStack = new ArrayDeque<>(); //一旦调用接收并在实际检索消息之前调用 if (!interceptorList.preReceive(this, interceptorStack)) { return null; } } //接收操作 Message<?> message = doReceive(timeout); ... //在检索到 Message 之后但在将其返回给调用者之前立即调用。 必要时可以修改消息 if (interceptorStack != null && message != null) { message = interceptorList.postReceive(message, this); } //在接收完成后调用,而不管已引发的任何异常,从而允许适当的资源清理 interceptorList.afterReceiveCompletion(message, this, null, interceptorStack); return message; } catch (RuntimeException ex) { ... } }最后的doReceive操作,其实大家都心知肚明了,就是从上面的队列中直接读取数据,代码比较简单,就不注释了:
@Override @Nullable protected Message<?> doReceive(long timeout) { try { if (timeout > 0) { if (this.queue instanceof BlockingQueue) { return ((BlockingQueue<Message<?>>) this.queue).poll(timeout, TimeUnit.MILLISECONDS); } else { return pollNonBlockingQueue(timeout); } } if (timeout == 0) { return this.queue.poll(); } if (this.queue instanceof BlockingQueue) { return ((BlockingQueue<Message<?>>) this.queue).take(); } else { Message<?> message = this.queue.poll(); while (message == null) { this.queueSemaphore.tryAcquire(50, TimeUnit.MILLISECONDS); // NOSONAR ok to ignore result message = this.queue.poll(); } return message; } } catch (InterruptedException e) { Thread.currentThread().interrupt(); return null; } } 六,结语能坚持看到这的,基本上都是勇士了。这一系列的执行过程其实还是比较绕的,我估计有些人看得也是云里雾里。其实我已经尽量精简了许多,Spring-Integration其实涉及到的应用分支更多,我这也只是十分基础的东西,我只能把我自己知道的先记录下来。如果让你对Spring-Integration产生了兴趣,那本文的目的就达到了。这需要你自己去实地操作研究下,总是有收获的。O(∩_∩)O谢谢