源码简析Spring-Integration执行过程 (3)

上面的代码中,我们就能清楚的看到为什么我们在demo中没有注册输入通道也能正常应用的原因了,从而回答之前的疑问。

protected AbstractEndpoint doCreateEndpoint(MessageHandler handler, MessageChannel inputChannel, List<Annotation> annotations) { .... else if (inputChannel instanceof SubscribableChannel) { //生成SubscribableChannel类型对应的执行端点 return new EventDrivenConsumer((SubscribableChannel) inputChannel, handler); } else if (inputChannel instanceof PollableChannel) { return pollingConsumer(inputChannel, handler, pollers); } else { throw new IllegalArgumentException("Unsupported 'inputChannel' type: '" + inputChannel.getClass().getName() + "'. " + "Must be one of 'SubscribableChannel', 'PollableChannel' or 'ReactiveStreamsSubscribableChannel'"); } }

通道类型一共有两种,一种是发布订阅,一种是可轮询的,我们是默认是走的第一种,因为DirectChannel默认就是个SubscribableChannel。所以最终我们生成了对应的信息端点类EventDrivenConsumer。

我们先看下EventDrivenConsumer整体结构:

源码简析Spring-Integration执行过程

EventDrivenConsumer上面有一个抽象类AbstractEndpoint,最上面实现了Lifecycle接口,所以生命周期跟着容器走,我们直接跳到star方法看:

@Override protected void doStart() { this.logComponentSubscriptionEvent(true); //把handler和inputChannel进行绑定 this.inputChannel.subscribe(this.handler); if (this.handler instanceof Lifecycle) { ((Lifecycle) this.handler).start(); } } @Override public synchronized boolean addHandler(MessageHandler handler) { Assert.notNull(handler, "handler must not be null"); Assert.isTrue(this.handlers.size() < this.maxSubscribers, "Maximum subscribers exceeded"); boolean added = this.handlers.add(handler); if (this.handlers.size() == 1) { this.theOneHandler = handler; } else { this.theOneHandler = null; } return added; }

上面的代码主要就是把handler注册到inputChannel中,这样只要inputChannel通道一收到信息,就会通知他注册的handlers进行处理。代码中比较清楚的记录了一切的操作,就不多解释了。

四,发送信息

执行完上面一系列的注册,已经把这一些的通道打通了,剩下的就是真正的发送操作了。下面分析下inputChannel.send(new GenericMessage<String>("World"));看看send操作:

/** * 在此频道上发送消息。 如果通道已满,则此方法将阻塞,直到发生超时或发送线程中断。 如果指定的超时时间为 0,则该方法将立即返回。 如果小于零,它将无限期阻塞(请参阅send(Message) )。 * 参数: * messageArg – 要发送的消息 * timeout - 以毫秒为单位的超时时间 * 返回: * true如果消息发送成功, false如果消息无法在规定时间内发送或发送线程被中断 */ @Override public boolean send(Message<?> messageArg, long timeout) { ... try { //message是否需要转换 message = convertPayloadIfNecessary(message); //发送前拦截器 if (interceptorList.getSize() > 0) { interceptorStack = new ArrayDeque<>(); message = interceptorList.preSend(message, this, interceptorStack); if (message == null) { return false; } } if (this.metricsCaptor != null) { sample = this.metricsCaptor.start(); } //发送操作 sent = doSend(message, timeout); if (sample != null) { sample.stop(sendTimer(sent)); } metricsProcessed = true; if (debugEnabled) { logger.debug("postSend (sent=" + sent + ") on channel '" + this + "', message: " + message); } //发送后拦截器 if (interceptorStack != null) { interceptorList.postSend(message, this, sent); interceptorList.afterSendCompletion(message, this, sent, null, interceptorStack); } return sent; } catch (Exception ex) { ... } }

真正的send操作跟下去,会发现层次极深,碍于篇幅,我们直接跟到重点代码:

@Override protected final void handleMessageInternal(Message<?> message) { Object result; if (this.advisedRequestHandler == null) { //反射执行对应的端点方法 result = handleRequestMessage(message); } else { result = doInvokeAdvisedRequestHandler(message); } if (result != null) { //往outputChannel发送执行结果 sendOutputs(result, message); } ... }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyjwzz.html