Spring-Integration基于Spring,在应用程序中启用了轻量级消息传递,并支持通过声明式适配器与外部系统集成。这一段官网的介绍,概况了整个Integration的用途。个人感觉消息传递是真正的重点。
如上图所示,典型的生产者-消费者模式,中间通过一个特定的通道进行数据传输,说到这,是不是隐隐感觉到queue的存在。确实事实上这个所谓的通道默认就是用的 blockingqueue。
Spring-Integration网上的资料是真少,再加上源码分析的是更少。关于Spring-Integration的基本介绍直接去官网上看更加的直观,这边就不累述了。
今天主要是看个简单的hello word进来分析下整个执行过程。
先看下代码:
<?xml version="1.0" encoding="UTF-8"?> <beans:beans xmlns="http://www.springframework.org/schema/integration" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:beans="http://www.springframework.org/schema/beans" xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd https://www.springframework.org/schema/integration/spring-integration.xsd"> <annotation-config/> <channel > <queue/> </channel> <beans:bean/> </beans:beans> @Configuration public class Beans { @ServiceActivator(inputChannel = "ic", outputChannel = "oc") public String sayHello(String name) { return "Hello " + name; } } public class HelloWorldDemo { @Test public void testDemo() throws Exception { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("/demo.xml", HelloWorldDemo.class); DirectChannel inputChannel = context.getBean("ic", DirectChannel.class); PollableChannel outputChannel = context.getBean("oc", PollableChannel.class); inputChannel.send(new GenericMessage<String>("World")); System.out.println("==> HelloWorldDemo: " + outputChannel.receive(0).getPayload()); context.close(); } } out: ==> HelloWorldDemo: Hello World 二,ServiceActivator上面的代码演示了调用方法的入站通道适配器和标准的出站通道适配器, 它们之间是一个带注解的ServiceActivator。关于这个ServiceActivator就是一个消息端点。
消息端点的主要作用是以非侵入性方式将应用程序代码连接到消息传递框架。换句话说,理想情况下,应用程序代码应该不知道消息对象或消息管道。这类似于 MVC 范式中controller 的作用。正如controller 处理 HTTP 请求一样,消息端点处理消息。以及controller 映射到 URL 模式一样,消息端点映射到消息通道。这两种情况的目标是相同的。
ServiceActivator是用于将服务实例连接到消息传递系统的通用端点。必须配置输入消息通道,如果要调用的服务方法能够返回值,还可以提供输出消息通道。
具体流程如下图:
上面的代码比较简单,但是或许会发现我们只定义了输出通道oc,输入通道ic竟然没有定义也能正常应用,是不是很奇怪?带着疑问我们先看下ServiceActivator的源码:
注释上写的很清楚,如果输入通道不存在,将在应用程序上下文中注册具有此名称的DirectChannel 。具体在哪定义,我们后面会看到,现在不急,先一步步来看他的执行过程。
我们全局查找ServiceActivator,看他是哪边进行处理的,最后发现了MessagingAnnotationPostProcessor类,用来处理方法级消息注解的BeanPostProcessor实现。
@Override public void afterPropertiesSet() { Assert.notNull(this.beanFactory, "BeanFactory must not be null"); ((BeanDefinitionRegistry) this.beanFactory).registerBeanDefinition( IntegrationContextUtils.DISPOSABLES_BEAN_NAME, BeanDefinitionBuilder.genericBeanDefinition(Disposables.class, Disposables::new) .getRawBeanDefinition()); this.postProcessors.put(Filter.class, new FilterAnnotationPostProcessor(this.beanFactory)); this.postProcessors.put(Router.class, new RouterAnnotationPostProcessor(this.beanFactory)); this.postProcessors.put(Transformer.class, new TransformerAnnotationPostProcessor(this.beanFactory)); this.postProcessors.put(ServiceActivator.class, new ServiceActivatorAnnotationPostProcessor(this.beanFactory)); this.postProcessors.put(Splitter.class, new SplitterAnnotationPostProcessor(this.beanFactory)); this.postProcessors.put(Aggregator.class, new AggregatorAnnotationPostProcessor(this.beanFactory)); this.postProcessors.put(InboundChannelAdapter.class, new InboundChannelAdapterAnnotationPostProcessor(this.beanFactory)); this.postProcessors.put(BridgeFrom.class, new BridgeFromAnnotationPostProcessor(this.beanFactory)); this.postProcessors.put(BridgeTo.class, new BridgeToAnnotationPostProcessor(this.beanFactory)); Map<Class<? extends Annotation>, MethodAnnotationPostProcessor<?>> customPostProcessors = setupCustomPostProcessors(); if (!CollectionUtils.isEmpty(customPostProcessors)) { this.postProcessors.putAll(customPostProcessors); } }