@Override
public void multicastEvent(final ApplicationEvent event, ResolvableType eventType) {
//获取事件类型
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
//遍历所有和事件匹配的事件监听器
for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) {
//获取事件发布器内的任务执行器,默认该方法返回null
Executor executor = getTaskExecutor();
if (executor != null) {
//异步回调监听方法
executor.execute(new Runnable() {
@Override
public void run() {
invokeListener(listener, event);
}
});
}
else {
//同步回调监听方法
invokeListener(listener, event);
}
}
}
首先通过传入的参数或者通过调用resolveDefaultEventType(event)方法获取事件的事件类型信息,之后会通过
getApplicationListeners(event, type)方法得到所有和该事件类型匹配的事件监听器,其实现逻辑后面会细说,这里先跳过。对于匹配的每一个监听器,视事件发布器内是否设置了
任务执行器实例Executor,决定以何种方式对监听器的监听方法进行回调。
若执行器实例Executor未设置,则进行同步回调,即在当前线程执行监听器的回调方法
若用户设置了Executor实例(通常而言是线程池),则会进行异步回调,监听器的监听方法会交由线程池中的线程去执行。
默认情况下容器不会为用户创建执行器实例,因而对监听器的回调是同步进行的,即所有监听器的监听方法都在推送事件的线程中被执行,通常这也是处理业务逻辑的线程,若其中一个监听器回调执行
阻塞,则会阻塞整个业务处理的线程,造成严重的后果。而异步回调的方式,虽然不会导致业务处理线程被阻塞,但是不能共享一些业务线程的上下文资源,比如类加载器,事务等等。因而究竟选择哪种回调
方式,要视具体业务场景而定。
好了,现在可以来探究下困扰我们很久的一个问题了,那就是:如何根据事件类型找到匹配的所有事件监听器?这部分逻辑在getApplicationListeners方法中
protected Collection<ApplicationListener<?>> getApplicationListeners(
ApplicationEvent event, ResolvableType eventType) {
//获取事件中的事件源对象
Object source = event.getSource();
//获取事件源类型
Class<?> sourceType = (source != null ? source.getClass() : null);
//以事件类型和事件源类型为参数构建一个cacheKey,用于从缓存map中获取与之匹配的监听器列表
ListenerCacheKey cacheKey = new ListenerCacheKey(eventType, sourceType);
// Quick check for existing entry on ConcurrentHashMap...
ListenerRetriever retriever = this.retrieverCache.get(cacheKey);
if (retriever != null) {
//从缓存中获取监听器列表
return retriever.getApplicationListeners();
}
if (this.beanClassLoader == null ||
(ClassUtils.isCacheSafe(event.getClass(), this.beanClassLoader) &&
(sourceType == null || ClassUtils.isCacheSafe(sourceType, this.beanClassLoader)))) {
// Fully synchronized building and caching of a ListenerRetriever
synchronized (this.retrievalMutex) {
retriever = this.retrieverCache.get(cacheKey);
if (retriever != null) {
return retriever.getApplicationListeners();
}
retriever = new ListenerRetriever(true);
//查找所有与发布事件匹配的监听器列表
Collection<ApplicationListener<?>> listeners =
retrieveApplicationListeners(eventType, sourceType, retriever);
//将匹配结果缓存到map中
this.retrieverCache.put(cacheKey, retriever);
return listeners;
}
}
else {
// No ListenerRetriever caching -> no synchronization necessary
return retrieveApplicationListeners(eventType, sourceType, null);
}
}
整个流程可以概括为:
1.首先从缓存map中查找,这个map定义在事件发布器的抽象类中
final Map<ListenerCacheKey, ListenerRetriever> retrieverCache =
new ConcurrentHashMap<ListenerCacheKey, ListenerRetriever>(64);