Skip to the content.

阅读Spring IOC 源码时必定会遇到Spring事件监听相关的代码。并且Dubbo的Provider和Consumer的初始化都用到了Spring的事件监听,因此想了解一下Spring是怎么实现的。

想要使用事件监听,无非需要这样几个步骤。创建事件发布者,注册事件监听者,发布事件,接收事件。

Spring的版本是 5.2.6.RELEASE

spring-boot-starter-parent的版本是 2.3.0.RELEASE

组件介绍

ApplicationEventPublisher 是事件发布者,它只有一个功能就是发布事件给广播器

ApplicationEventMulticaster 是事件广播器,它接收来自发布者的事件,并为事件寻找对应的监听者,然后将事件发送给所有监听者。

ApplicationEvent 是事件,它有许多抽象类和实现类表示不同的事件。

ApplicationListener 事件的接收者,它只有一个功能就是实现消费事件的逻辑。

初始化

事件监听相关的初始化,也是和IOC初始化的流程嵌套在一起的。

// AbstractApplicationContext.java 584
// 这是IOC初始化refer方法内部调用的第一个方法
protected void prepareRefresh() {
    // Switch to active.
    this.startupDate = System.currentTimeMillis();
    this.closed.set(false);
    this.active.set(true);

    if (logger.isDebugEnabled()) {...}

    initPropertySources();

    getEnvironment().validateRequiredProperties();

    // 创建一个提前的监听者。如果没有就直接创建,如果已经存在就先清除然后创建。
    if (this.earlyApplicationListeners == null) {
        this.earlyApplicationListeners = new LinkedHashSet<>(this.applicationListeners);
    } else {
        this.applicationListeners.clear();
        this.applicationListeners.addAll(this.earlyApplicationListeners);
    }

    // 创建一个提前的事件存储,用于存储初始化之前的事件
    this.earlyApplicationEvents = new LinkedHashSet<>();
}

接下来就是初始化广播器

// AbstractApplicationContext.java 763
// 初始化事件广播器
protected void initApplicationEventMulticaster() {
    ConfigurableListableBeanFactory beanFactory = getBeanFactory();
    // 如果beanFactory中已经有那就用已经有的,否则使用默认的SimpleApplicationEventMulticaster
    //,也就是Spring事件监听的默认实现
    if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
        this.applicationEventMulticaster = beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
    }
    else {
        this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
        beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
    }
}

初始化广播器完成之后,就是注册监听者,并且销毁earlyApplicationEvents

// AbstractApplicationContext.java 823
// 前面初始化广播器成功了,现在则是往广播器注册监听者
protected void registerListeners() {
    // 向广播器注册监听者对象
    for (ApplicationListener<?> listener : getApplicationListeners()) {
        getApplicationEventMulticaster().addApplicationListener(listener);
    }

    // 注册使用注解的监听者的beanName。上面的监听者列表加上这个监听者beanName才是所有的监听者。
    String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
    for (String listenerBeanName : listenerBeanNames) {
        getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
    }

    // 将earlyEventsToProcess的事件全部发送出去,然后清空earlyEventsToProcess
    Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents;
    this.earlyApplicationEvents = null;
    if (earlyEventsToProcess != null) {
        for (ApplicationEvent earlyEvent : earlyEventsToProcess) {
            getApplicationEventMulticaster().multicastEvent(earlyEvent);
        }
    }
}

到了这一步,事件的初始化已经完成了。

总结一下,就是创建一个提前事件用于存储广播器创建之前的事件。然后创建默认的广播器,然后注册监听者,然后注册其余使用注解的监听者的beanName,然后将提前事件全部用广播器发送出去,接着清空提前事件。

广播事件

首先看一下广播器大致有什么方法。 大致有三类方法:添加监听者、移除监听者、广播事件

// ApplicationEventMulticaster.java
// 这是Spring 广播器的接口
public interface ApplicationEventMulticaster {

    // 向广播器添加监听者
    void addApplicationListener(ApplicationListener<?> listener);

    // 向广播器添加监听者beanName
    void addApplicationListenerBean(String listenerBeanName);

    // 从广播器移除监听者
    void removeApplicationListener(ApplicationListener<?> listener);

    // 从广播器移除监听者beanName
    void removeApplicationListenerBean(String listenerBeanName);

    // 移除广播器的所有监听者
    void removeAllListeners();

    // 广播事件
    void multicastEvent(ApplicationEvent event);

    // 广播事件并指定事件的类型
    void multicastEvent(ApplicationEvent event, @Nullable ResolvableType eventType);

}

广播的类型是ApplicationEvent,这是一个抽象类,继承自Java的EventObject。ApplicationEvent被很多的事件类继承了,每个类表示一个事件。

广播一个事件从发布者发布事件开始

广播一个事件

public interface ApplicationEventPublisher {

    default void publishEvent(ApplicationEvent event) {
        publishEvent((Object) event);
    }
    // 发布事件
    void publishEvent(Object event);
}

// AbstractApplicationContext.java 372行
public void publishEvent(Object event) {
    publishEvent(event, null);
}

// AbstractApplicationContext.java 383行
protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
    Assert.notNull(event, "Event must not be null");

    // event可以转成ApplicationEvent就转成这个。否则用PayloadApplicationEvent来包装。
    // event的source属性为当前的ApplicationContext
    ApplicationEvent applicationEvent;
    if (event instanceof ApplicationEvent) {
        applicationEvent = (ApplicationEvent) event;
    }
    else {
        applicationEvent = new PayloadApplicationEvent<>(this, event);
        if (eventType == null) {
            eventType = ((PayloadApplicationEvent<?>) applicationEvent).getResolvableType();
        }
    }

    // Spring允许在事件广播器初始化之前就发布事件。因为这时候还没有事件广播器
    // ,所以这时候发布的事件得先暂时存储到earlyApplicationEvents中
    // ,待广播器初始化完成之后再发送。
    // 广播器初始化之后的发送逻辑见:AbstractApplicationContext.java 837行
    if (this.earlyApplicationEvents != null) {
        this.earlyApplicationEvents.add(applicationEvent);
    }
    else {
        // 事件广播器已经初始化完成了,直接发送
        getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
    }

    if (this.parent != null) {
        if (this.parent instanceof AbstractApplicationContext) {
            ((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
        }
        else {
            this.parent.publishEvent(event);
        }
    }
}

首先进行类型转化,然后调用事件广播器广播方法将事件广播出去。

// ApplicationEvent.java
public abstract class ApplicationEvent extends EventObject {

    private static final long serialVersionUID = 7099057708183571937L;

    private final long timestamp;
    
    // source字段是EventObject的一个字段,表示发布这个事件的对象。
    // 例如A对象调用事件发布方法发布了一个参与事件,那么参与事件中的source就是A。
    public ApplicationEvent(Object source) {
        super(source);
        this.timestamp = System.currentTimeMillis();
    }

    public final long getTimestamp() {
        return this.timestamp;
    }

}

回到AbstractApplicationContext.java的841行,追踪一下发布事件的流程。

// ApplicationEventMulticaster.java
public interface ApplicationEventMulticaster {

    // 到这里了,然后看它的实现
    void multicastEvent(ApplicationEvent event);
}


public class SimpleApplicationEventMulticaster extends AbstractApplicationEventMulticaster {
    // 从上面跳转到了它的实现类 SimpleApplicationEventMulticaster.java 126行
    // 虽然上面调用的是只有一个参数的方法,其实它是先手动解析到类型,然后再去调用两个参数的方法
    @Override
    public void multicastEvent(ApplicationEvent event) {
        multicastEvent(event, resolveDefaultEventType(event));
    }

    @Override
    public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
        // 如果eventType不为空就用eventType,否则的话解析一个eventType出来
        ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
        
        // 拿到了线程池说明是异步的模式,同步的模式是没有线程池的
        Executor executor = getTaskExecutor();
        for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
            
            // 异步广播事件
            if (executor != null) {
                executor.execute(() -> invokeListener(listener, event));
            } else {
                // 同步广播事件
                invokeListener(listener, event);
            }
        }
    }
}

上面的multicastEvent就是核心逻辑了,首先拿到需要发送事件的监听者,然后向每个监听者发送事件。

我们看看上个方法里面的 invokeListener

// SimpleApplicationEventMulticaster.java 154
protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
    ErrorHandler errorHandler = getErrorHandler();
    if (errorHandler != null) {
        try {
            doInvokeListener(listener, event);
        }
        catch (Throwable err) {
            errorHandler.handleError(err);
        }
    }
    else {
        doInvokeListener(listener, event);
    }
}

// SimpleApplicationEventMulticaster.java 170
private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
    try {
        // 回调继承的方法
        listener.onApplicationEvent(event);
    }
    catch (ClassCastException ex) {...}
}

到这里整个事件的广播就已经完成了。监听者已经收到了消息

接下来看看getApplicationListeners方法是如何通过eventType和sourceType获取到事件的监听者列表的。

getApplicationListeners

// AbstractApplicationEventMulticaster.java 173
public interface AbstractApplicationEventMulticaster {
    protected Collection<ApplicationListener<?>> getApplicationListeners(
            ApplicationEvent event, ResolvableType eventType) {

        // source 表示事件的发布者。例如A对象调用事件发布方法发布了一个参与事件,那么参与事件中的source就是A。
        Object source = event.getSource();
        Class<?> sourceType = (source != null ? source.getClass() : null);
        ListenerCacheKey cacheKey = new ListenerCacheKey(eventType, sourceType);

        // 尝试去缓存中获取
        ListenerRetriever retriever = this.retrieverCache.get(cacheKey);
        if (retriever != null) {
            return retriever.getApplicationListeners();
        }

        if (this.beanClassLoader == null 
        || (ClassUtils.isCacheSafe(event.getClass(), this.beanClassLoader) 
            && (sourceType == null || ClassUtils.isCacheSafe(sourceType, this.beanClassLoader)))) {
            // 加锁,然后double check,再创建一个ListenerRetriever并放到缓存中
            synchronized (this.retrievalMutex) {
                retriever = this.retrieverCache.get(cacheKey);
                if (retriever != null) {
                    return retriever.getApplicationListeners();
                }
                retriever = new ListenerRetriever(true);
                
                // 有缓存的情况,初始化ListenerRetriever之后加入到缓存中
                Collection<ApplicationListener<?>> listeners =
                        retrieveApplicationListeners(eventType, sourceType, retriever);
                this.retrieverCache.put(cacheKey, retriever);
                return listeners;
            }
        }
        else {
            // 不能用缓存,每次都实时生成。这个方法入参的retriever为null,也就是不使用缓存
            return retrieveApplicationListeners(eventType, sourceType, null);
        }
    }
}

ListenerRetriever存储的是eventType和sourceType对应的监听者列表。

而retrieverCache这个ConcurrentHashMap的键为eventType和sourceType,值则是对应的监听者列表

分析一下retriever.getApplicationListeners();

retriever.getApplicationListeners()

// AbstractApplicationEventMulticaster.java 418
private class ListenerRetriever {

    public final Set<ApplicationListener<?>> applicationListeners = new LinkedHashSet<>();

    public final Set<String> applicationListenerBeans = new LinkedHashSet<>();

    private final boolean preFiltered;

    public ListenerRetriever(boolean preFiltered) {
        this.preFiltered = preFiltered;
    }

    public Collection<ApplicationListener<?>> getApplicationListeners() {
        // 构建一个列表用于存储 listener 和 listenerBean
        List<ApplicationListener<?>> allListeners = new ArrayList<>(
                this.applicationListeners.size() + this.applicationListenerBeans.size());
        // listener可以直接加到返回列表中
        allListeners.addAll(this.applicationListeners);
        // listenerBean需要实时去beanFactory中通过beanName获取。
        if (!this.applicationListenerBeans.isEmpty()) {
            BeanFactory beanFactory = getBeanFactory();
            for (String listenerBeanName : this.applicationListenerBeans) {
                try {
                    ApplicationListener<?> listener = beanFactory.getBean(listenerBeanName, ApplicationListener.class);
                    if (this.preFiltered || !allListeners.contains(listener)) {
                        allListeners.add(listener);
                    }
                }
                catch (NoSuchBeanDefinitionException ex) {
                    // Singleton listener instance (without backing bean definition) disappeared -
                    // probably in the middle of the destruction phase
                }
            }
        }
        // 按照广播的优先级排序再返回
        if (!this.preFiltered || !this.applicationListenerBeans.isEmpty()) {
            AnnotationAwareOrderComparator.sort(allListeners);
        }
        return allListeners;
    }
}

retriever.getApplicationListeners(); 这个方法就是返回eventType和sourceType对应的监听者。

存储监听者的数据结构有两个:

初始化retriever

// AbstractApplicationEventMulticaster.java 215
// 这个方法的作用是初始化retrieval
private Collection<ApplicationListener<?>> retrieveApplicationListeners(
        ResolvableType eventType, @Nullable Class<?> sourceType, @Nullable ListenerRetriever retriever) {
    
    // allListeners用于存储需要返回的所有监听者,包括已经getBean的listenerBean
    List<ApplicationListener<?>> allListeners = new ArrayList<>();
    
    // 定义并初始化
    Set<ApplicationListener<?>> listeners;
    Set<String> listenerBeans;
    synchronized (this.retrievalMutex) {
        listeners = new LinkedHashSet<>(this.defaultRetriever.applicationListeners);
        listenerBeans = new LinkedHashSet<>(this.defaultRetriever.applicationListenerBeans);
    }

    // 遍历所有监听者,如果监听者支持eventType和sourceType事件,那就加入到返回结果中
    for (ApplicationListener<?> listener : listeners) {
        if (supportsEvent(listener, eventType, sourceType)) {
            // 同时将eventType和sourceType对应的监听者加到对应的retriever中。
            if (retriever != null) {
                retriever.applicationListeners.add(listener);
            }
            allListeners.add(listener);
        }
    }

    // 上面处理listener,这里处理listenersBeans
    if (!listenerBeans.isEmpty()) {
        ConfigurableBeanFactory beanFactory = getBeanFactory();
        // 遍历所有listenerBeanName
        for (String listenerBeanName : listenerBeans) {
            try {
                // 如果listenerBeanName支持eventType那就加入到返回结果中
                if (supportsEvent(beanFactory, listenerBeanName, eventType)) {
                    ApplicationListener<?> listener = beanFactory.getBean(listenerBeanName, ApplicationListener.class);
                    if (!allListeners.contains(listener) && supportsEvent(listener, eventType, sourceType)) {
                        if (retriever != null) {
                            // 如果是单例bean不会再变化了,那就加到listeners中
                            if (beanFactory.isSingleton(listenerBeanName)) {
                                retriever.applicationListeners.add(listener);
                            }
                            else {
                                // 其它类型的bean后续可能变化,就不能直接存储bean对象
                                // ,我们得存储beanName,然后每次返回的时候实时去beanFactory中获取最新的bean对象。
                                retriever.applicationListenerBeans.add(listenerBeanName);
                            }
                        }
                        allListeners.add(listener);
                    }
                }
                else {
                    // 如果listenerBeanName不支持eventType那就从返回结果中移除,并且从retriever中移除
                    Object listener = beanFactory.getSingleton(listenerBeanName);
                    if (retriever != null) {
                        retriever.applicationListeners.remove(listener);
                    }
                    allListeners.remove(listener);
                }
            }
            catch (NoSuchBeanDefinitionException ex) {

            }
        }
    }

    AnnotationAwareOrderComparator.sort(allListeners);
    if (retriever != null && retriever.applicationListenerBeans.isEmpty()) {
        retriever.applicationListeners.clear();
        retriever.applicationListeners.addAll(allListeners);
    }
    return allListeners;
}

问题

IOC初始化流程中,提前的事件存储是用来存储什么事件的,在这个类中没有看见过它被添加元素?

AbstractApplicationContext.java的399行。 Spring允许在事件广播器初始化之前就发布事件。因为这时候还没有事件广播器,所以这时候的发布的事件得先暂时存储到earlyApplicationEvents中,待广播器初始化完成之后再发送。广播器初始化之后的发送逻辑见:AbstractApplicationContext.java 837行

registerListeners方法里面,已经注册了监听者,为什么还要注册监听者的beanName?有什么用?

applicationListeners 存储初始化之后就不会变化的监听者,所以可以直接存储对象。 applicationListenerBeans 存储的事监听者的beanName,每次返回的时候都需要去beanFactory中实时getBean。例如 property 类型的Bean的监听者就需要放到这个结构中。

ApplicationEvent类中的source表示什么?

表示发布这个事件的对象。例如A对象调用事件发布方法发布了一个参与事件,那么参与事件中的source就是A。

ListenerRetriever和retrieverCache的关系是怎么样的?有什么用?

retrieverCache是一个ConcurrentHashMap类,用于缓存eventType和soureType对应的监听者列表。 retrieverCache的key是ListenerCacheKey类,它有两个对象eventType和sourceType。 retrieverCache的value是ListenerRetriever类,它存储监听者列表。通过getApplicationListeners返回它存储的列表。