Using Guava's EventBus and How It Is Implemented
Healthy Mind Lv3

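To decouple modules in a project, we can use the publish-subscribe (observer, or event) pattern. You can either use an existing implementation or write your own. Commonly used existing options include:

  • Spring's event/listener mechanism
  • The JDK's built-in Observer and Observable
  • EventBus in Guava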

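Choose among them according to the project's actual environment. This article focuses on Guava's EventBus.

Guava also provides its own implementation of the observer pattern, which additionally:

  • offers asynchronous dispatch
  • supports method-level subscription through @Subscribe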

Guava EventBus example

EventListener: the listener, equivalent to an Observer

In Guava, a method annotated with @Subscribe acts as an event listener, that is, an observer.

package com.trs.guava.enventBus;

import com.google.common.eventbus.Subscribe;

/**
 * @Description
 * @DATE 2021.5.11 13:36
 **/
public class EventListener {

    private String name;

    public EventListener(String name) {
        this.name = name;
    }

    // EventBus ignores the return value of a @Subscribe method.
    @Subscribe
    public String createOrder(CreateOrderEvent event) {
        Object source = event.getSource();
        System.out.println(this.name + "createOrder = " + source);
        return source.toString();
    }

    // PlayOrderEvent is not shown in this article; it is assumed to be
    // defined in the same way as CreateOrderEvent.
    @Subscribe
    public String playOrder(PlayOrderEvent event) {
        Object source = event.getSource();
        System.out.println("playOrder = " + source);
        return source.toString();
    }
}

CreateOrderEvent: the event class. In Guava, an event can be an object of any type.

package com.trs.guava.enventBus;

import java.util.EventObject;

/**
 * @Description
 * @DATE 2021.5.11 13:37
 **/
public class CreateOrderEvent extends EventObject {

    /**
     * Constructs a prototypical Event.
     *
     * @param source The object on which the Event initially occurred.
     * @throws IllegalArgumentException if source is null.
     */
    public CreateOrderEvent(Object source) {
        super(source);
    }
}

BusStart: registers the listeners. In a real project, registration can be driven by a configuration file.

package com.trs.guava.enventBus;

import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.concurrent.Executor;

/**
 * @Description
 * @DATE 2021.5.11 13:43
 **/
public class BusStart {

    public static void main(String[] args) {
        // directExecutor() runs each task on the calling thread, so delivery here is
        // effectively synchronous; pass a thread-pool Executor for real asynchrony.
        Executor executor = MoreExecutors.directExecutor();
        AsyncEventBus asyncEventBus = new AsyncEventBus(executor);
        // Register the listeners
        asyncEventBus.register(new EventListener("Listener_1"));
        asyncEventBus.register(new EventListener("Listener_2"));
        // "创建订单" means "create order"
        asyncEventBus.post(new CreateOrderEvent("创建订单"));
        // The synchronous EventBus is used in the same way:
        // EventBus eventBus = new EventBus();
        // eventBus.register(new EventListener("Listener_1"));
        // eventBus.register(new EventListener("Listener_2"));
        // eventBus.post(new CreateOrderEvent("创建订单"));
    }
}

Output:

Listener_1createOrder = 创建订单
Listener_2createOrder = 创建订单
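
One detail worth noting: MoreExecutors.directExecutor() runs every task on the thread that submits it, so the AsyncEventBus in the example above delivers events on the posting thread. The JDK-only sketch below illustrates the difference between a direct executor and a thread pool. The class name ExecutorContrast and the helper threadUsedBy are made up for this illustration, and no Guava class is involved.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class: it only illustrates Executor behaviour, not Guava itself.
class ExecutorContrast {

    // Runs a task on the given executor and reports which thread executed it.
    static String threadUsedBy(Executor executor) {
        CompletableFuture<String> name = new CompletableFuture<>();
        executor.execute(() -> name.complete(Thread.currentThread().getName()));
        return name.join();
    }

    public static void main(String[] args) {
        Executor direct = Runnable::run; // same idea as a direct executor: run on the caller
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            System.out.println("caller = " + Thread.currentThread().getName());
            System.out.println("direct = " + threadUsedBy(direct));
            System.out.println("pool   = " + threadUsedBy(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

Presumably, passing a pool-backed Executor such as the one above to the AsyncEventBus constructor is what makes subscriber calls run off the posting thread.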

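Guava EventBus source code walkthrough

The Guava eventbus package is built around three main classes: EventBus, SubscriberRegistry, and Subscriber.

The class structure is shown in the figure: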

Registering listeners

asyncEventBus.register(new EventListener("Listener_2"));

Stepping into AsyncEventBus.register() (inherited from EventBus):

public void register(Object object) {
  subscribers.register(object);
}

Next, step into the SubscriberRegistry.register method:

/** Registers all subscriber methods on the given listener object. */
void register(Object listener) {
  // Find the methods annotated with @Subscribe on the given listener object
  Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);

  for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
    Class<?> eventType = entry.getKey();
    Collection<Subscriber> eventMethodsInListener = entry.getValue();

    CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);

    if (eventSubscribers == null) {
      CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
      eventSubscribers =
          MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
    }

    eventSubscribers.addAll(eventMethodsInListener);
  }
}
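
The get / putIfAbsent / firstNonNull sequence above is a common way to create a per-key collection lazily without locking. Below is a minimal JDK-only sketch of the same idiom. MiniRegistry is a hypothetical class, and plain strings stand in for Subscriber objects.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical simplified registry: handler names stand in for Subscriber objects.
class MiniRegistry {
    private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<String>> subscribers =
            new ConcurrentHashMap<>();

    void register(Class<?> eventType, List<String> handlers) {
        CopyOnWriteArraySet<String> set = subscribers.get(eventType);
        if (set == null) {
            CopyOnWriteArraySet<String> newSet = new CopyOnWriteArraySet<>();
            // putIfAbsent returns the existing set if another thread installed one first,
            // or null if newSet was stored; either way exactly one set survives per key.
            CopyOnWriteArraySet<String> existing = subscribers.putIfAbsent(eventType, newSet);
            set = (existing != null) ? existing : newSet;
        }
        set.addAll(handlers);
    }

    Set<String> handlersFor(Class<?> eventType) {
        Set<String> set = subscribers.get(eventType);
        return set == null ? Collections.<String>emptySet() : set;
    }

    public static void main(String[] args) {
        MiniRegistry registry = new MiniRegistry();
        registry.register(String.class, Arrays.asList("Listener_1.createOrder"));
        registry.register(String.class, Arrays.asList("Listener_2.createOrder"));
        System.out.println(registry.handlersFor(String.class));
    }
}
```

A copy-on-write set suits this use because reads (every post) are expected to far outnumber writes (register and unregister).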

The SubscriberRegistry.findAllSubscribers method:

private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
  Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
  Class<?> clazz = listener.getClass();
  // Get the annotated methods
  for (Method method : getAnnotatedMethods(clazz)) {
    Class<?>[] parameterTypes = method.getParameterTypes();
    Class<?> eventType = parameterTypes[0];
    // Create a Subscriber for each resolved method. As the Subscriber's fields suggest,
    // when an event is posted, the call is driven by the event type and the parameter
    // type of the Subscriber's method.
    methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
  }
  return methodsInListener;
}
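
To make the grouping step concrete, here is a small JDK-only sketch that scans a listener for annotated single-argument methods and groups them by parameter type. The @Handles annotation, the OrderListener class, and the scan helper are all hypothetical stand-ins; this is not Guava's code.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SubscriberScan {

    // Hypothetical stand-in for Guava's @Subscribe.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Handles {}

    static class OrderListener {
        @Handles public void onText(String event) {}
        @Handles public void onNumber(Integer event) {}
        public void notAHandler(String event) {}
    }

    // Groups annotated single-argument methods by the type of their only parameter.
    static Map<Class<?>, List<Method>> scan(Object listener) {
        Map<Class<?>, List<Method>> byEventType = new HashMap<>();
        for (Method method : listener.getClass().getDeclaredMethods()) {
            if (method.isAnnotationPresent(Handles.class) && method.getParameterCount() == 1) {
                Class<?> eventType = method.getParameterTypes()[0];
                byEventType.computeIfAbsent(eventType, k -> new ArrayList<>()).add(method);
            }
        }
        return byEventType;
    }

    public static void main(String[] args) {
        scan(new OrderListener()).forEach((type, methods) ->
                System.out.println(type.getSimpleName() + " -> " + methods.size() + " method(s)"));
    }
}
```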

The SubscriberRegistry.getAnnotatedMethods method:

private static ImmutableList<Method> getAnnotatedMethods(Class<?> clazz) {
  try {
    return subscriberMethodsCache.getUnchecked(clazz);
  } catch (UncheckedExecutionException e) {
    throwIfUnchecked(e.getCause());
    throw e;
  }
}

subscriberMethodsCache is a Guava cache. Its source:

private static final LoadingCache<Class<?>, ImmutableList<Method>> subscriberMethodsCache =
    CacheBuilder.newBuilder()
        .weakKeys()
        .build(
            new CacheLoader<Class<?>, ImmutableList<Method>>() {
              @Override
              public ImmutableList<Method> load(Class<?> concreteClass) throws Exception {
                return getAnnotatedMethodsNotCached(concreteClass);
              }
            });
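
The cache is keyed by Class and uses weakKeys(), so caching the reflective scan does not by itself keep a listener class from being garbage collected. For readers who want to try the load-once-per-class idea without Guava, the sketch below uses the JDK's ClassValue as a rough analogue. MethodCache, loads, and methodNames are hypothetical names, and this is not how Guava implements its cache.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical JDK-only analogue of a per-class method cache; not Guava's implementation.
class MethodCache {
    // Counts how often the reflective scan actually runs.
    static final AtomicInteger loads = new AtomicInteger();

    private static final ClassValue<List<String>> DECLARED_METHOD_NAMES =
            new ClassValue<List<String>>() {
                @Override
                protected List<String> computeValue(Class<?> type) {
                    loads.incrementAndGet();
                    List<String> names = new ArrayList<>();
                    for (Method method : type.getDeclaredMethods()) {
                        names.add(method.getName());
                    }
                    return Collections.unmodifiableList(names);
                }
            };

    // Returns the cached list, computing it on the first request for this class.
    static List<String> methodNames(Class<?> type) {
        return DECLARED_METHOD_NAMES.get(type);
    }

    public static void main(String[] args) {
        methodNames(String.class);
        methodNames(String.class); // second call is served from the cache
        System.out.println("scans performed: " + loads.get());
    }
}
```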

When the cache has no entry for the listener's class, getAnnotatedMethodsNotCached is called:

private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) {
  Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes();
  Map<MethodIdentifier, Method> identifiers = Maps.newHashMap();
  for (Class<?> supertype : supertypes) {
    for (Method method : supertype.getDeclaredMethods()) {
      // This is where the annotation is actually resolved
      if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) {
        // TODO(cgdecker): Should check for a generic parameter type and error out
        Class<?>[] parameterTypes = method.getParameterTypes();
        checkArgument(
            parameterTypes.length == 1,
            "Method %s has @Subscribe annotation but has %s parameters. "
                + "Subscriber methods must have exactly 1 parameter.",
            method,
            parameterTypes.length);

        checkArgument(
            !parameterTypes[0].isPrimitive(),
            "@Subscribe method %s's parameter is %s. "
                + "Subscriber methods cannot accept primitives. "
                + "Consider changing the parameter to %s.",
            method,
            parameterTypes[0].getName(),
            Primitives.wrap(parameterTypes[0]).getSimpleName());

        MethodIdentifier ident = new MethodIdentifier(method);
        if (!identifiers.containsKey(ident)) {
          identifiers.put(ident, method);
        }
      }
    }
  }
  return ImmutableList.copyOf(identifiers.values());
}
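
The identifiers map is what keeps an overridden subscriber method from being registered twice: the scan visits the class and its supertypes, and only the first method seen for a given name and parameter list is kept. The JDK-only sketch below shows that deduplication under simplifying assumptions: it walks superclasses only (the Guava code above also covers interfaces through TypeToken), and @Handles, BaseListener, and ChildListener are hypothetical.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class HierarchyScan {

    // Hypothetical stand-in for Guava's @Subscribe.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Handles {}

    static class BaseListener {
        @Handles public void onOrder(String event) {}
        @Handles public void onAudit(Integer event) {}
    }

    static class ChildListener extends BaseListener {
        @Override @Handles public void onOrder(String event) {}
    }

    // Walks from the concrete class up to Object, keeping one method per signature.
    static List<Method> annotatedMethods(Class<?> clazz) {
        // Key = method name + parameter types; the most specific class is visited first.
        Map<String, Method> bySignature = new LinkedHashMap<>();
        for (Class<?> type = clazz; type != null; type = type.getSuperclass()) {
            for (Method method : type.getDeclaredMethods()) {
                if (method.isAnnotationPresent(Handles.class) && !method.isSynthetic()) {
                    String key = method.getName() + Arrays.toString(method.getParameterTypes());
                    bySignature.putIfAbsent(key, method);
                }
            }
        }
        return new ArrayList<>(bySignature.values());
    }

    public static void main(String[] args) {
        for (Method method : annotatedMethods(ChildListener.class)) {
            System.out.println(method.getDeclaringClass().getSimpleName() + "." + method.getName());
        }
    }
}
```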

Event dispatch

Events are dispatched through the EventBus.post method:

public void post(Object event) {
  // Look up the Subscribers whose @Subscribe method accepts this event type
  Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
  if (eventSubscribers.hasNext()) {
    // Dispatching starts here; every kind of dispatcher ultimately calls
    // the dispatchEvent method of the Subscriber class
    dispatcher.dispatch(event, eventSubscribers);
  } else if (!(event instanceof DeadEvent)) {
    // the event had no subscribers and was not itself a DeadEvent
    post(new DeadEvent(this, event));
  }
}
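
Two behaviours are visible in post: an event reaches every subscriber whose parameter type it is assignable to, and an event with no subscriber at all is re-posted once, wrapped in a DeadEvent. The JDK-only sketch below imitates both with a tiny bus. MiniBus, Unhandled (playing the role of DeadEvent), and on are hypothetical names, and delivery here is always synchronous.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical miniature bus; a sketch of the routing idea, not Guava's implementation.
class MiniBus {

    // Wrapper playing the role of Guava's DeadEvent.
    static final class Unhandled {
        final Object event;
        Unhandled(Object event) { this.event = event; }
    }

    private final Map<Class<?>, List<Consumer<Object>>> handlers = new ConcurrentHashMap<>();

    // Registers a handler for events assignable to the given type.
    <T> void on(Class<T> type, Consumer<? super T> handler) {
        handlers.computeIfAbsent(type, k -> new CopyOnWriteArrayList<>())
                .add(event -> handler.accept(type.cast(event)));
    }

    void post(Object event) {
        List<Consumer<Object>> matched = new ArrayList<>();
        for (Map.Entry<Class<?>, List<Consumer<Object>>> entry : handlers.entrySet()) {
            if (entry.getKey().isInstance(event)) { // supertypes of the event match too
                matched.addAll(entry.getValue());
            }
        }
        if (!matched.isEmpty()) {
            for (Consumer<Object> handler : matched) {
                handler.accept(event);
            }
        } else if (!(event instanceof Unhandled)) {
            post(new Unhandled(event)); // wrap once; an unhandled wrapper is dropped
        }
    }

    public static void main(String[] args) {
        MiniBus bus = new MiniBus();
        bus.on(CharSequence.class, text -> System.out.println("text event: " + text));
        bus.on(Unhandled.class, dead -> System.out.println("no subscriber for: " + dead.event));
        bus.post("create order"); // a String is a CharSequence
        bus.post(42);             // nobody listens for Integer
    }
}
```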

Now let's look at the dispatchEvent method of the Subscriber class:

/** Dispatches {@code event} to this subscriber using the proper executor. */
final void dispatchEvent(final Object event) {
  executor.execute(
      new Runnable() {
        @Override
        public void run() {
          try {
            // Calls the method below
            invokeSubscriberMethod(event);
          } catch (InvocationTargetException e) {
            bus.handleSubscriberException(e.getCause(), context(event));
          }
        }
      });
}

/**
 * Invokes the subscriber method. This method can be overridden to make the invocation
 * synchronized.
 */
@VisibleForTesting
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
  try {
    method.invoke(target, checkNotNull(event));
  } catch (IllegalArgumentException e) {
    throw new Error("Method rejected target/argument: " + event, e);
  } catch (IllegalAccessException e) {
    throw new Error("Method became inaccessible: " + event, e);
  } catch (InvocationTargetException e) {
    if (e.getCause() instanceof Error) {
      throw (Error) e.getCause();
    }
    throw e;
  }
}

As you can see, invokeSubscriberMethod is really just a reflective call: method.invoke(target, event).
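
For readers less familiar with reflection, the sketch below shows the same pattern in isolation: Method.invoke wraps any exception thrown by the target method in an InvocationTargetException, and the real failure is recovered through getCause(). ReflectiveCall, Greeter, and invoke are hypothetical names chosen for this illustration.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Hypothetical demo of reflective invocation and exception unwrapping.
class ReflectiveCall {

    static class Greeter {
        public void greet(String name) {
            if (name.isEmpty()) {
                throw new IllegalStateException("empty name");
            }
            System.out.println("hello " + name);
        }
    }

    // Calls target.greet(arg) reflectively and returns whatever the method threw, or null.
    static Throwable invoke(Object target, String arg) {
        try {
            Method method = target.getClass().getMethod("greet", String.class);
            method.invoke(target, arg);
            return null;
        } catch (InvocationTargetException e) {
            return e.getCause(); // the exception raised inside greet itself
        } catch (NoSuchMethodException | IllegalAccessException e) {
            throw new IllegalStateException("cannot call greet reflectively", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("thrown: " + invoke(new Greeter(), "Guava"));
        System.out.println("thrown: " + invoke(new Greeter(), ""));
    }
}
```

This unwrapping is the reason dispatchEvent passes e.getCause(), rather than e, to the bus's exception handler.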