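To decouple modules in a project, we can use the publish-subscribe pattern (also known as the observer or event pattern). You can write your own implementation or use an existing one. The most common existing options are:
Spring's event/listener mechanism
The JDK's built-in Observer and Observable (deprecated since Java 9)
Guava's EventBus
Choose whichever fits your project's environment. This article focuses on Guava's EventBus.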
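Guava also ships its own implementation of the observer pattern:
It provides asynchronous dispatch (AsyncEventBus)
It supports method-level subscription through the @Subscribe annotation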
Guava EventBus example
EventListener
The listener, which plays the role of the Observer.
In Guava, a method annotated with @Subscribe acts as an event listener, that is, an observer.
```java
package com.trs.guava.enventBus;

import com.google.common.eventbus.Subscribe;

public class EventListener {

    private String name;

    public EventListener(String name) {
        this.name = name;
    }

    // Invoked for every CreateOrderEvent posted to a bus this listener is registered on.
    // EventBus does not use the return value.
    @Subscribe
    public String createOrder(CreateOrderEvent event) {
        Object source = event.getSource();
        System.out.println(this.name + "createOrder = " + source);
        return source.toString();
    }

    // PlayOrderEvent is not shown in this article; it is assumed to be
    // defined the same way as CreateOrderEvent.
    @Subscribe
    public String playOrder(PlayOrderEvent event) {
        Object source = event.getSource();
        System.out.println("playOrder = " + source);
        return source.toString();
    }
}
```
CreateOrderEvent
The event class. In Guava, an event can be an object of any type.
```java
package com.trs.guava.enventBus;

import java.util.EventObject;

public class CreateOrderEvent extends EventObject {

    public CreateOrderEvent(Object source) {
        super(source);
    }
}
```
BusStart
This class registers the listeners. In a real project, registration could instead be driven by a configuration file.
```java
package com.trs.guava.enventBus;

import com.google.common.eventbus.AsyncEventBus;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.concurrent.Executor;

public class BusStart {

    public static void main(String[] args) {
        // directExecutor() runs each task on the thread that submits it, so in this
        // demo the subscribers are effectively called synchronously.
        Executor executor = MoreExecutors.directExecutor();
        AsyncEventBus asyncEventBus = new AsyncEventBus(executor);

        asyncEventBus.register(new EventListener("Listener_1"));
        asyncEventBus.register(new EventListener("Listener_2"));

        // "创建订单" means "create order".
        asyncEventBus.post(new CreateOrderEvent(new String("创建订单")));
    }
}
```
Output:
Listener_1createOrder = 创建订单
Listener_2createOrder = 创建订单
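The demo passes MoreExecutors.directExecutor() to AsyncEventBus, so the choice of executor decides which thread the subscribers run on. The JDK-only sketch below illustrates that difference. `Runnable::run` is used as a stand-in for a direct executor, and the class name `ExecutorChoiceDemo`, the method `threadUsedBy`, and the thread name `event-worker` are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorChoiceDemo {

    // Runs one task on the given executor and reports which thread executed it.
    static String threadUsedBy(Executor executor) {
        CompletableFuture<String> name = new CompletableFuture<>();
        executor.execute(() -> name.complete(Thread.currentThread().getName()));
        return name.join(); // blocks until the task has run
    }

    public static void main(String[] args) {
        // Stand-in for a direct executor: the task runs on the calling thread.
        Executor direct = Runnable::run;
        // A dedicated worker thread: the task runs off the calling thread.
        ExecutorService pool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "event-worker"));
        try {
            System.out.println("caller : " + Thread.currentThread().getName());
            System.out.println("direct : " + threadUsedBy(direct));
            System.out.println("pooled : " + threadUsedBy(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

If truly asynchronous delivery is wanted, a thread pool such as the one above could presumably be passed to the AsyncEventBus constructor instead of a direct executor, with the pool shut down when the application stops.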
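Guava EventBus source code walkthrough
Guava's eventbus package is built mainly around three classes: EventBus, SubscriberRegistry, and Subscriber.
The class structure is shown in the figure: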
Listener registration
```java
asyncEventBus.register(new EventListener("Listener_2"));
```
Step into AsyncEventBus.register(). The method is inherited from EventBus:
```java
public void register(Object object) {
    subscribers.register(object);
}
```
Next, follow the call into SubscriberRegistry.register:
```java
void register(Object listener) {
    // All @Subscribe methods of the listener, grouped by the event type they accept.
    Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);

    for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
        Class<?> eventType = entry.getKey();
        Collection<Subscriber> eventMethodsInListener = entry.getValue();

        CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);

        if (eventSubscribers == null) {
            // First subscriber for this event type: create the set, or reuse the one
            // another thread stored in the meantime.
            CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
            eventSubscribers =
                MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
        }

        eventSubscribers.addAll(eventMethodsInListener);
    }
}
```
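The get / putIfAbsent / addAll sequence above is a common lock-free way to build a map of sets. The following JDK-only sketch reproduces the idea under simplifying assumptions: `MiniRegistry` and `handlersFor` are hypothetical names, and a plain String stands in for Guava's Subscriber.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;

public class MiniRegistry {

    // event type -> handlers; a String stands in for Guava's Subscriber here.
    private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<String>> subscribers =
            new ConcurrentHashMap<>();

    void register(Class<?> eventType, Collection<String> handlers) {
        CopyOnWriteArraySet<String> set = subscribers.get(eventType);
        if (set == null) {
            CopyOnWriteArraySet<String> newSet = new CopyOnWriteArraySet<>();
            // putIfAbsent returns the set stored by another thread if we lost
            // the race, or null if newSet was stored.
            CopyOnWriteArraySet<String> existing = subscribers.putIfAbsent(eventType, newSet);
            set = (existing != null) ? existing : newSet;
        }
        set.addAll(handlers); // duplicates are ignored because this is a set
    }

    Set<String> handlersFor(Class<?> eventType) {
        CopyOnWriteArraySet<String> set = subscribers.get(eventType);
        if (set == null) {
            return Collections.emptySet();
        }
        return set;
    }

    public static void main(String[] args) {
        MiniRegistry registry = new MiniRegistry();
        registry.register(String.class, List.of("Listener_1#createOrder"));
        registry.register(String.class,
                List.of("Listener_2#createOrder", "Listener_1#createOrder"));
        System.out.println(registry.handlersFor(String.class));
    }
}
```

A copy-on-write set suits this use because registrations are usually rare while iteration during dispatch is frequent.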
The SubscriberRegistry.findAllSubscribers method:
```java
private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
    Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
    Class<?> clazz = listener.getClass();
    for (Method method : getAnnotatedMethods(clazz)) {
        Class<?>[] parameterTypes = method.getParameterTypes();
        Class<?> eventType = parameterTypes[0];
        methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
    }
    return methodsInListener;
}
```
The SubscriberRegistry.getAnnotatedMethods method:
```java
private static ImmutableList<Method> getAnnotatedMethods(Class<?> clazz) {
    try {
        return subscriberMethodsCache.getUnchecked(clazz);
    } catch (UncheckedExecutionException e) {
        throwIfUnchecked(e.getCause());
        throw e;
    }
}
```
subscriberMethodsCache is a Guava LoadingCache. Its definition:
```java
private static final LoadingCache<Class<?>, ImmutableList<Method>> subscriberMethodsCache =
    CacheBuilder.newBuilder()
        .weakKeys()
        .build(
            new CacheLoader<Class<?>, ImmutableList<Method>>() {
                @Override
                public ImmutableList<Method> load(Class<?> concreteClass) throws Exception {
                    return getAnnotatedMethodsNotCached(concreteClass);
                }
            });
```
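The point of the cache is that the reflective scan runs once per listener class, not once per registered instance. The sketch below shows that compute-once behavior with the JDK's ClassValue. This is a different mechanism from Guava's weak-keyed LoadingCache and is used here only because it needs no extra dependency; `MethodCacheDemo`, `methodNames`, and the `scans` counter are hypothetical names.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class MethodCacheDemo {

    // Counts how often the reflective scan actually runs.
    static final AtomicInteger scans = new AtomicInteger();

    // ClassValue computes a value lazily per class and then remembers it.
    private static final ClassValue<List<String>> DECLARED_METHOD_NAMES =
            new ClassValue<List<String>>() {
                @Override
                protected List<String> computeValue(Class<?> type) {
                    scans.incrementAndGet();
                    List<String> names = new ArrayList<>();
                    for (Method m : type.getDeclaredMethods()) {
                        names.add(m.getName());
                    }
                    Collections.sort(names);
                    return Collections.unmodifiableList(names);
                }
            };

    static List<String> methodNames(Class<?> clazz) {
        return DECLARED_METHOD_NAMES.get(clazz);
    }

    public static void main(String[] args) {
        methodNames(Runnable.class);
        methodNames(Runnable.class); // served from the cache, no second scan
        System.out.println(methodNames(Runnable.class) + ", scans = " + scans.get());
    }
}
```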
When the cache has no entry for the listener class, the loader calls getAnnotatedMethodsNotCached:
```java
private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) {
    // The listener class together with its superclasses and interfaces.
    Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes();
    Map<MethodIdentifier, Method> identifiers = Maps.newHashMap();
    for (Class<?> supertype : supertypes) {
        for (Method method : supertype.getDeclaredMethods()) {
            if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) {
                Class<?>[] parameterTypes = method.getParameterTypes();
                checkArgument(
                    parameterTypes.length == 1,
                    "Method %s has @Subscribe annotation but has %s parameters. "
                        + "Subscriber methods must have exactly 1 parameter.",
                    method,
                    parameterTypes.length);

                checkArgument(
                    !parameterTypes[0].isPrimitive(),
                    "@Subscribe method %s's parameter is %s. "
                        + "Subscriber methods cannot accept primitives. "
                        + "Consider changing the parameter to %s.",
                    method,
                    parameterTypes[0].getName(),
                    Primitives.wrap(parameterTypes[0]).getSimpleName());

                // Keep only the first method seen for each name/parameter signature,
                // so an overridden method is not registered twice.
                MethodIdentifier ident = new MethodIdentifier(method);
                if (!identifiers.containsKey(ident)) {
                    identifiers.put(ident, method);
                }
            }
        }
    }
    return ImmutableList.copyOf(identifiers.values());
}
```
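To make the scanning step concrete, here is a reduced JDK-only sketch of the same idea. It is not Guava's code: `@Handles` is a hypothetical stand-in for @Subscribe, only superclasses are walked (Guava's TypeToken also covers interfaces), and a string of method name plus parameter type plays the role of MethodIdentifier.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

public class SubscriberScanDemo {

    // Hypothetical stand-in for Guava's @Subscribe.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Handles {}

    static class BaseListener {
        @Handles public void onText(String event) {}
    }

    static class OrderListener extends BaseListener {
        @Handles public void onNumber(Integer event) {}
        public void notAHandler(String event) {}
    }

    // Walks the class and its superclasses and collects annotated one-argument methods.
    static List<Method> scan(Class<?> listenerClass) {
        Map<String, Method> byIdentifier = new LinkedHashMap<>();
        for (Class<?> c = listenerClass; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Method m : c.getDeclaredMethods()) {
                if (!m.isAnnotationPresent(Handles.class) || m.isSynthetic()) {
                    continue;
                }
                if (m.getParameterCount() != 1) {
                    throw new IllegalArgumentException(m + " must have exactly 1 parameter");
                }
                String identifier = m.getName() + "(" + m.getParameterTypes()[0].getName() + ")";
                // The subclass is visited first, so its version wins over an overridden one.
                byIdentifier.putIfAbsent(identifier, m);
            }
        }
        return new ArrayList<>(byIdentifier.values());
    }

    public static void main(String[] args) {
        TreeSet<String> names = new TreeSet<>();
        for (Method m : scan(OrderListener.class)) {
            names.add(m.getName());
        }
        System.out.println(names);
    }
}
```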
Event dispatch
Events are dispatched through the post method of EventBus:
```java
public void post(Object event) {
    Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
    if (eventSubscribers.hasNext()) {
        dispatcher.dispatch(event, eventSubscribers);
    } else if (!(event instanceof DeadEvent)) {
        // The event has no subscribers and is not itself a DeadEvent: re-post it wrapped.
        post(new DeadEvent(this, event));
    }
}
```
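The else branch is worth a closer look: an event without subscribers is wrapped and posted again, and the instanceof check keeps that from recursing forever. The JDK-only sketch below mimics this control flow. It is a simplification, not Guava's implementation: `DeadEventDemo` and `Unhandled` are hypothetical names, handlers are plain Consumer objects, and events are matched by exact class only.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public class DeadEventDemo {

    // Hypothetical stand-in for Guava's DeadEvent: wraps an event nobody handled.
    static final class Unhandled {
        final Object event;

        Unhandled(Object event) {
            this.event = event;
        }
    }

    private final Map<Class<?>, List<Consumer<Object>>> handlers = new ConcurrentHashMap<>();

    void subscribe(Class<?> eventType, Consumer<Object> handler) {
        handlers.computeIfAbsent(eventType, k -> new CopyOnWriteArrayList<>()).add(handler);
    }

    void post(Object event) {
        List<Consumer<Object>> found = handlers.get(event.getClass());
        if (found != null && !found.isEmpty()) {
            for (Consumer<Object> handler : found) {
                handler.accept(event);
            }
        } else if (!(event instanceof Unhandled)) {
            // Re-post wrapped. The instanceof check stops the recursion when
            // nobody listens for Unhandled either.
            post(new Unhandled(event));
        }
    }

    public static void main(String[] args) {
        DeadEventDemo bus = new DeadEventDemo();
        bus.subscribe(String.class, e -> System.out.println("handled: " + e));
        bus.subscribe(Unhandled.class,
                e -> System.out.println("no subscriber for: " + ((Unhandled) e).event));
        bus.post("order created");
        bus.post(42);
    }
}
```

Subscribing to the wrapper type, as the second subscribe call does, is a simple way to log events that would otherwise vanish silently.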
The dispatcher in turn calls dispatchEvent on each Subscriber, so let's look at that method:
```java
final void dispatchEvent(final Object event) {
    executor.execute(
        new Runnable() {
            @Override
            public void run() {
                try {
                    invokeSubscriberMethod(event);
                } catch (InvocationTargetException e) {
                    bus.handleSubscriberException(e.getCause(), context(event));
                }
            }
        });
}

@VisibleForTesting
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
    try {
        method.invoke(target, checkNotNull(event));
    } catch (IllegalArgumentException e) {
        throw new Error("Method rejected target/argument: " + event, e);
    } catch (IllegalAccessException e) {
        throw new Error("Method became inaccessible: " + event, e);
    } catch (InvocationTargetException e) {
        if (e.getCause() instanceof Error) {
            throw (Error) e.getCause();
        }
        throw e;
    }
}
```
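The two methods above combine three things: submitting work to an executor, calling the subscriber method reflectively, and unwrapping InvocationTargetException so that a failing subscriber is reported without breaking the bus. A minimal JDK-only sketch of that combination follows. All names in it (`ReflectiveDispatchDemo`, `Listener`, `dispatch`, `dispatchAll`) are hypothetical, and a plain list of error strings stands in for the bus's exception handler.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

public class ReflectiveDispatchDemo {

    public static class Listener {
        public final List<String> received = new ArrayList<>();

        public void onOrder(String event) {
            if (event.isEmpty()) {
                throw new IllegalStateException("empty order");
            }
            received.add(event);
        }
    }

    // Calls target.method(event) on the executor; subscriber failures go to the errors list.
    static void dispatch(Executor executor, Object target, Method method,
                         Object event, List<String> errors) {
        executor.execute(() -> {
            try {
                method.invoke(target, event);
            } catch (InvocationTargetException e) {
                // The subscriber itself threw: record the real cause and carry on.
                errors.add(String.valueOf(e.getCause()));
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Method became inaccessible: " + method, e);
            }
        });
    }

    // Dispatches each event to listener.onOrder and returns the collected errors.
    static List<String> dispatchAll(Listener listener, String... events) {
        Method onOrder;
        try {
            onOrder = Listener.class.getMethod("onOrder", String.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
        List<String> errors = new ArrayList<>();
        Executor direct = Runnable::run; // runs each task on the calling thread
        for (String event : events) {
            dispatch(direct, listener, onOrder, event, errors);
        }
        return errors;
    }

    public static void main(String[] args) {
        Listener listener = new Listener();
        List<String> errors = dispatchAll(listener, "order-1", "");
        System.out.println("received = " + listener.received);
        System.out.println("errors   = " + errors);
    }
}
```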
In invokeSubscriberMethod we can see that it is really just a