阅读Spring IOC 源码时必定会遇到Spring事件监听相关的代码。并且Dubbo的Provider和Consumer的初始化都用到了Spring的事件监听,因此想了解一下Spring是怎么实现的。
想要使用事件监听,无非需要这样几个步骤。创建事件发布者,注册事件监听者,发布事件,接收事件。
Spring的版本是 5.2.6.RELEASE
spring-boot-starter-parent的版本是 2.3.0.RELEASE
组件介绍
ApplicationEventPublisher 是事件发布者,它只有一个功能就是发布事件给广播器
ApplicationEventMulticaster 是事件广播器,它接收来自发布者的事件,并为事件寻找对应的监听者,然后将事件发送给所有监听者。
ApplicationEvent 是事件,它有许多抽象类和实现类表示不同的事件。
ApplicationListener 事件的接收者,它只有一个功能就是实现消费事件的逻辑。
初始化
事件监听相关的初始化,也是和IOC初始化的流程嵌套在一起的。
// AbstractApplicationContext.java 584
// 这是IOC初始化refer方法内部调用的第一个方法
protected void prepareRefresh() {
// Switch to active.
this.startupDate = System.currentTimeMillis();
this.closed.set(false);
this.active.set(true);
if (logger.isDebugEnabled()) {...}
initPropertySources();
getEnvironment().validateRequiredProperties();
// 创建一个提前的监听者。如果没有就直接创建,如果已经存在就先清除然后创建。
if (this.earlyApplicationListeners == null) {
this.earlyApplicationListeners = new LinkedHashSet<>(this.applicationListeners);
} else {
this.applicationListeners.clear();
this.applicationListeners.addAll(this.earlyApplicationListeners);
}
// 创建一个提前的事件存储,用于存储初始化之前的事件
this.earlyApplicationEvents = new LinkedHashSet<>();
}
接下来就是初始化广播器
// AbstractApplicationContext.java 763
// 初始化事件广播器
protected void initApplicationEventMulticaster() {
ConfigurableListableBeanFactory beanFactory = getBeanFactory();
// 如果beanFactory中已经有那就用已经有的,否则使用默认的SimpleApplicationEventMulticaster
//,也就是Spring事件监听的默认实现
if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
this.applicationEventMulticaster = beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
}
else {
this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
}
}
初始化广播器完成之后,就是注册监听者,并且销毁earlyApplicationEvents
// AbstractApplicationContext.java 823
// 前面初始化广播器成功了,现在则是往广播器注册监听者
protected void registerListeners() {
// 向广播器注册监听者对象
for (ApplicationListener<?> listener : getApplicationListeners()) {
getApplicationEventMulticaster().addApplicationListener(listener);
}
// 注册使用注解的监听者的beanName。上面的监听者列表加上这个监听者beanName才是所有的监听者。
String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
for (String listenerBeanName : listenerBeanNames) {
getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
}
// 将earlyEventsToProcess的事件全部发送出去,然后清空earlyEventsToProcess
Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents;
this.earlyApplicationEvents = null;
if (earlyEventsToProcess != null) {
for (ApplicationEvent earlyEvent : earlyEventsToProcess) {
getApplicationEventMulticaster().multicastEvent(earlyEvent);
}
}
}
到了这一步,事件的初始化已经完成了。
总结一下,就是创建一个提前事件用于存储广播器创建之前的事件。然后创建默认的广播器,然后注册监听者,然后注册其余使用注解的监听者的beanName,然后将提前事件全部用广播器发送出去,接着清空提前事件。
广播事件
首先看一下广播器大致有什么方法。 大致有三类方法:添加监听者、移除监听者、广播事件
// ApplicationEventMulticaster.java
// 这是Spring 广播器的接口
public interface ApplicationEventMulticaster {
// 向广播器添加监听者
void addApplicationListener(ApplicationListener<?> listener);
// 向广播器添加监听者beanName
void addApplicationListenerBean(String listenerBeanName);
// 从广播器移除监听者
void removeApplicationListener(ApplicationListener<?> listener);
// 从广播器移除监听者beanName
void removeApplicationListenerBean(String listenerBeanName);
// 移除广播器的所有监听者
void removeAllListeners();
// 广播事件
void multicastEvent(ApplicationEvent event);
// 广播事件并指定事件的类型
void multicastEvent(ApplicationEvent event, @Nullable ResolvableType eventType);
}
广播的类型是ApplicationEvent,这是一个抽象类,继承自Java的EventObject。ApplicationEvent被很多的事件类继承了,每个类表示一个事件。
广播一个事件从发布者发布事件开始
广播一个事件
public interface ApplicationEventPublisher {
default void publishEvent(ApplicationEvent event) {
publishEvent((Object) event);
}
// 发布事件
void publishEvent(Object event);
}
// AbstractApplicationContext.java 372行
public void publishEvent(Object event) {
publishEvent(event, null);
}
// AbstractApplicationContext.java 383行
protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
Assert.notNull(event, "Event must not be null");
// event可以转成ApplicationEvent就转成这个。否则用PayloadApplicationEvent来包装。
// event的source属性为当前的ApplicationContext
ApplicationEvent applicationEvent;
if (event instanceof ApplicationEvent) {
applicationEvent = (ApplicationEvent) event;
}
else {
applicationEvent = new PayloadApplicationEvent<>(this, event);
if (eventType == null) {
eventType = ((PayloadApplicationEvent<?>) applicationEvent).getResolvableType();
}
}
// Spring允许在事件广播器初始化之前就发布事件。因为这时候还没有事件广播器
// ,所以这时候发布的事件得先暂时存储到earlyApplicationEvents中
// ,待广播器初始化完成之后再发送。
// 广播器初始化之后的发送逻辑见:AbstractApplicationContext.java 837行
if (this.earlyApplicationEvents != null) {
this.earlyApplicationEvents.add(applicationEvent);
}
else {
// 事件广播器已经初始化完成了,直接发送
getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
}
if (this.parent != null) {
if (this.parent instanceof AbstractApplicationContext) {
((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
}
else {
this.parent.publishEvent(event);
}
}
}
首先进行类型转化,然后调用事件广播器广播方法将事件广播出去。
// ApplicationEvent.java
public abstract class ApplicationEvent extends EventObject {
private static final long serialVersionUID = 7099057708183571937L;
private final long timestamp;
// source字段是EventObject的一个字段,表示发布这个事件的对象。
// 例如A对象调用事件发布方法发布了一个参与事件,那么参与事件中的source就是A。
public ApplicationEvent(Object source) {
super(source);
this.timestamp = System.currentTimeMillis();
}
public final long getTimestamp() {
return this.timestamp;
}
}
回到AbstractApplicationContext.java的841行,追踪一下发布事件的流程。
// ApplicationEventMulticaster.java
public interface ApplicationEventMulticaster {
// 到这里了,然后看它的实现
void multicastEvent(ApplicationEvent event);
}
public class SimpleApplicationEventMulticaster extends AbstractApplicationEventMulticaster {
// 从上面跳转到了它的实现类 SimpleApplicationEventMulticaster.java 126行
// 虽然上面调用的是只有一个参数的方法,其实它是先手动解析到类型,然后再去调用两个参数的方法
@Override
public void multicastEvent(ApplicationEvent event) {
multicastEvent(event, resolveDefaultEventType(event));
}
@Override
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
// 如果eventType不为空就用eventType,否则的话解析一个eventType出来
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
// 拿到了线程池说明是异步的模式,同步的模式是没有线程池的
Executor executor = getTaskExecutor();
for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
// 异步广播事件
if (executor != null) {
executor.execute(() -> invokeListener(listener, event));
} else {
// 同步广播事件
invokeListener(listener, event);
}
}
}
}
上面的multicastEvent就是核心逻辑了,首先拿到需要发送事件的监听者,然后向每个监听者发送事件。
我们看看上个方法里面的 invokeListener
// SimpleApplicationEventMulticaster.java 154
protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
ErrorHandler errorHandler = getErrorHandler();
if (errorHandler != null) {
try {
doInvokeListener(listener, event);
}
catch (Throwable err) {
errorHandler.handleError(err);
}
}
else {
doInvokeListener(listener, event);
}
}
// SimpleApplicationEventMulticaster.java 170
private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
try {
// 回调继承的方法
listener.onApplicationEvent(event);
}
catch (ClassCastException ex) {...}
}
到这里整个事件的广播就已经完成了。监听者已经收到了消息
接下来看看getApplicationListeners方法是如何通过eventType和sourceType获取到事件的监听者列表的。
getApplicationListeners
// AbstractApplicationEventMulticaster.java 173
public interface AbstractApplicationEventMulticaster {
protected Collection<ApplicationListener<?>> getApplicationListeners(
ApplicationEvent event, ResolvableType eventType) {
// source 表示事件的发布者。例如A对象调用事件发布方法发布了一个参与事件,那么参与事件中的source就是A。
Object source = event.getSource();
Class<?> sourceType = (source != null ? source.getClass() : null);
ListenerCacheKey cacheKey = new ListenerCacheKey(eventType, sourceType);
// 尝试去缓存中获取
ListenerRetriever retriever = this.retrieverCache.get(cacheKey);
if (retriever != null) {
return retriever.getApplicationListeners();
}
if (this.beanClassLoader == null
|| (ClassUtils.isCacheSafe(event.getClass(), this.beanClassLoader)
&& (sourceType == null || ClassUtils.isCacheSafe(sourceType, this.beanClassLoader)))) {
// 加锁,然后double check,再创建一个ListenerRetriever并放到缓存中
synchronized (this.retrievalMutex) {
retriever = this.retrieverCache.get(cacheKey);
if (retriever != null) {
return retriever.getApplicationListeners();
}
retriever = new ListenerRetriever(true);
// 有缓存的情况,初始化ListenerRetriever之后加入到缓存中
Collection<ApplicationListener<?>> listeners =
retrieveApplicationListeners(eventType, sourceType, retriever);
this.retrieverCache.put(cacheKey, retriever);
return listeners;
}
}
else {
// 不能用缓存,每次都实时生成。这个方法入参的retriever为null,也就是不使用缓存
return retrieveApplicationListeners(eventType, sourceType, null);
}
}
}
ListenerRetriever存储的是eventType和sourceType对应的监听者列表。
而retrieverCache这个ConcurrentHashMap的键为eventType和sourceType,值则是对应的监听者列表
分析一下retriever.getApplicationListeners();
retriever.getApplicationListeners()
// AbstractApplicationEventMulticaster.java 418
private class ListenerRetriever {
public final Set<ApplicationListener<?>> applicationListeners = new LinkedHashSet<>();
public final Set<String> applicationListenerBeans = new LinkedHashSet<>();
private final boolean preFiltered;
public ListenerRetriever(boolean preFiltered) {
this.preFiltered = preFiltered;
}
public Collection<ApplicationListener<?>> getApplicationListeners() {
// 构建一个列表用于存储 listener 和 listenerBean
List<ApplicationListener<?>> allListeners = new ArrayList<>(
this.applicationListeners.size() + this.applicationListenerBeans.size());
// listener可以直接加到返回列表中
allListeners.addAll(this.applicationListeners);
// listenerBean需要实时去beanFactory中通过beanName获取。
if (!this.applicationListenerBeans.isEmpty()) {
BeanFactory beanFactory = getBeanFactory();
for (String listenerBeanName : this.applicationListenerBeans) {
try {
ApplicationListener<?> listener = beanFactory.getBean(listenerBeanName, ApplicationListener.class);
if (this.preFiltered || !allListeners.contains(listener)) {
allListeners.add(listener);
}
}
catch (NoSuchBeanDefinitionException ex) {
// Singleton listener instance (without backing bean definition) disappeared -
// probably in the middle of the destruction phase
}
}
}
// 按照广播的优先级排序再返回
if (!this.preFiltered || !this.applicationListenerBeans.isEmpty()) {
AnnotationAwareOrderComparator.sort(allListeners);
}
return allListeners;
}
}
retriever.getApplicationListeners(); 这个方法就是返回eventType和sourceType对应的监听者。
存储监听者的数据结构有两个:
- applicationListeners 存储初始化之后就不会变化的监听者,所以可以直接存储对象。
- applicationListenerBeans 存储的事监听者的beanName,每次返回的时候都需要去beanFactory中实时getBean。例如 property 类型的Bean的监听者就需要放到这个结构中。
初始化retriever
// AbstractApplicationEventMulticaster.java 215
// 这个方法的作用是初始化retrieval
private Collection<ApplicationListener<?>> retrieveApplicationListeners(
ResolvableType eventType, @Nullable Class<?> sourceType, @Nullable ListenerRetriever retriever) {
// allListeners用于存储需要返回的所有监听者,包括已经getBean的listenerBean
List<ApplicationListener<?>> allListeners = new ArrayList<>();
// 定义并初始化
Set<ApplicationListener<?>> listeners;
Set<String> listenerBeans;
synchronized (this.retrievalMutex) {
listeners = new LinkedHashSet<>(this.defaultRetriever.applicationListeners);
listenerBeans = new LinkedHashSet<>(this.defaultRetriever.applicationListenerBeans);
}
// 遍历所有监听者,如果监听者支持eventType和sourceType事件,那就加入到返回结果中
for (ApplicationListener<?> listener : listeners) {
if (supportsEvent(listener, eventType, sourceType)) {
// 同时将eventType和sourceType对应的监听者加到对应的retriever中。
if (retriever != null) {
retriever.applicationListeners.add(listener);
}
allListeners.add(listener);
}
}
// 上面处理listener,这里处理listenersBeans
if (!listenerBeans.isEmpty()) {
ConfigurableBeanFactory beanFactory = getBeanFactory();
// 遍历所有listenerBeanName
for (String listenerBeanName : listenerBeans) {
try {
// 如果listenerBeanName支持eventType那就加入到返回结果中
if (supportsEvent(beanFactory, listenerBeanName, eventType)) {
ApplicationListener<?> listener = beanFactory.getBean(listenerBeanName, ApplicationListener.class);
if (!allListeners.contains(listener) && supportsEvent(listener, eventType, sourceType)) {
if (retriever != null) {
// 如果是单例bean不会再变化了,那就加到listeners中
if (beanFactory.isSingleton(listenerBeanName)) {
retriever.applicationListeners.add(listener);
}
else {
// 其它类型的bean后续可能变化,就不能直接存储bean对象
// ,我们得存储beanName,然后每次返回的时候实时去beanFactory中获取最新的bean对象。
retriever.applicationListenerBeans.add(listenerBeanName);
}
}
allListeners.add(listener);
}
}
else {
// 如果listenerBeanName不支持eventType那就从返回结果中移除,并且从retriever中移除
Object listener = beanFactory.getSingleton(listenerBeanName);
if (retriever != null) {
retriever.applicationListeners.remove(listener);
}
allListeners.remove(listener);
}
}
catch (NoSuchBeanDefinitionException ex) {
}
}
}
AnnotationAwareOrderComparator.sort(allListeners);
if (retriever != null && retriever.applicationListenerBeans.isEmpty()) {
retriever.applicationListeners.clear();
retriever.applicationListeners.addAll(allListeners);
}
return allListeners;
}
问题
IOC初始化流程中,提前的事件存储是用来存储什么事件的,在这个类中没有看见过它被添加元素?
AbstractApplicationContext.java的399行。 Spring允许在事件广播器初始化之前就发布事件。因为这时候还没有事件广播器,所以这时候的发布的事件得先暂时存储到earlyApplicationEvents中,待广播器初始化完成之后再发送。广播器初始化之后的发送逻辑见:AbstractApplicationContext.java 837行
registerListeners方法里面,已经注册了监听者,为什么还要注册监听者的beanName?有什么用?
applicationListeners 存储初始化之后就不会变化的监听者,所以可以直接存储对象。 applicationListenerBeans 存储的事监听者的beanName,每次返回的时候都需要去beanFactory中实时getBean。例如 property 类型的Bean的监听者就需要放到这个结构中。
ApplicationEvent类中的source表示什么?
表示发布这个事件的对象。例如A对象调用事件发布方法发布了一个参与事件,那么参与事件中的source就是A。
ListenerRetriever和retrieverCache的关系是怎么样的?有什么用?
retrieverCache是一个ConcurrentHashMap类,用于缓存eventType和soureType对应的监听者列表。 retrieverCache的key是ListenerCacheKey类,它有两个对象eventType和sourceType。 retrieverCache的value是ListenerRetriever类,它存储监听者列表。通过getApplicationListeners返回它存储的列表。