源码:EventBus源码原理分析
一直对EventBus的实现原理比较好奇,最近看了一下源码,版本是19.0,在这里做一个记录,首先看一下EventBus的其中几个关键变量的类图:
老规矩直接在代码上通过注解分析,这里的核心只是EventBus,不涉及到发布订阅,这个一会再讲
@Beta
public class EventBus {
private static final Logger logger = Logger.getLogger(com.google.common.eventbus.EventBus.class.getName());
private final String identifier;
/**
* 提供线程池的服务
*/
private final Executor executor;
/**
* 异常处理类,在这里LoggingHandler实现了这个接口,用户日志的输出
*/
private final SubscriberExceptionHandler exceptionHandler;
/**
* 包含所有注册者的注册中心
* new一个新的EventBus的时候,就把一个新的SubscriberRegistry创建出来
* ,并把其自身设置到SubscriberRegistry中
*/
private final SubscriberRegistry subscribers = new SubscriberRegistry(this);
/**
* 分发处理,这块我们单独划分处理
*/
private final Dispatcher dispatcher;
/**
* 不指定名称的时候默认是default
* 这里的名字主要是日志打印的时候使用
* 所以最好使用不同的名字来指定
*/
public EventBus() {
this("default");
}
/**
* 指定米ing子
*/
public EventBus(String identifier) {
this(identifier, MoreExecutors.directExecutor(),
Dispatcher.perThreadDispatchQueue(), LoggingHandler.INSTANCE);
}
/**
* 指定异常处理类的时候默认default
*/
public EventBus(SubscriberExceptionHandler exceptionHandler) {
this("default",
MoreExecutors.directExecutor(), Dispatcher.perThreadDispatchQueue(), exceptionHandler);
}
/**
* 这是默认的构造函数,所有的
* 1、EventBus(SubscriberExceptionHandler exceptionHandler)
* 2、EventBus(String identifier)
* 3、EventBus()
* 最终都是指向这里,这个在设计上有很好的拓展性,只要保证这个核心不变,怎么拓展都可以
*/
EventBus(String identifier, Executor executor, Dispatcher dispatcher,
SubscriberExceptionHandler exceptionHandler) {
this.identifier = checkNotNull(identifier);
this.executor = checkNotNull(executor);
this.dispatcher = checkNotNull(dispatcher);
this.exceptionHandler = checkNotNull(exceptionHandler);
}
/**
* 返回EventBus的名称
*/
public final String identifier() {
return identifier;
}
/**
* executor用户给subscribers分发事件,这里有两种情况
* 1、默认不指定的时候是MoreExecutors.directExecutor()方式
* 2、我们自己指定executor,这个在AsyncEventBus中,异步的方式由多线程处理
* 这里我们重点看下MoreExecutors.directExecutor()这种方式,看一下源码的实现,
* 执行方法不会新开线程都在当前线程做,调用的是execute方法
* public static Executor directExecutor() {
* return DirectExecutor.INSTANCE;
* }
*
* 这里很有意思的一点是枚举是枚举是实现接口,这利用了枚举类型实例受限的原理
* 具体可以看这篇文章
* https://www.jianshu.com/p/e1619882682d
* https://blog.csdn.net/qq_21508059/article/details/78806610
* private enum DirectExecutor implements Executor {
* INSTANCE;
* @Override public void execute(Runnable command) {
* command.run();
* }
*
* @Override public String toString() {
* return "MoreExecutors.directExecutor()";
* }
* }
*/
final Executor executor() {
return executor;
}
/**
* 由任何一个subscriber抛出的异常都由这个方法处理
*/
void handleSubscriberException(Throwable e, SubscriberExceptionContext context) {
checkNotNull(e);
checkNotNull(context);
try {
exceptionHandler.handleException(e, context);
} catch (Throwable e2) {
// if the handler threw an exception... well, just log it
logger.log(Level.SEVERE,
String.format(Locale.ROOT, "Exception %s thrown while handling exception: %s", e2, e),
e2);
}
}
/**
* 注册一个订阅者,这里我们通常是业务中的Service
*/
public void register(Object object) {
subscribers.register(object);
}
/**
* 解绑
*/
public void unregister(Object object) {
subscribers.unregister(object);
}
/**
* 发送我们的事件给所有订阅者,有订阅者就会进行相应的分发
*/
public void post(Object event) {
Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
if (eventSubscribers.hasNext()) {
dispatcher.dispatch(event, eventSubscribers);
} else if (!(event instanceof DeadEvent)) {
// the event had no subscribers and was not itself a DeadEvent
post(new DeadEvent(this, event));
}
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.addValue(identifier)
.toString();
}
/**
* 这里定制了一个LoggingHandler的日志实现类
* 处理由一个subscriber抛出的异常,因为要继承上下文,所以要单独实现
* 但是为啥要放在EventBus里边呢,我想这是因为所有的subscriber都需要EventBus统一管理的原因,另外有两种方式
* 1、默认就是下边静态类的方式
* 2、各个业务也可以实现自己的异常处理方式
* 3、异常处理作为EventBus的成员变量可以有很好的拓展性
* 最近发现了很多静态内部类使用的情况,为什么这样呢,其实静态内部类可以提高其封装性,比如Person类中指定Home内部静态类
* home是独立存在的,不依赖Person的,具体可以看https://www.cnblogs.com/DreamDrive/p/5428729.html
*/
static final class LoggingHandler implements SubscriberExceptionHandler {
static final LoggingHandler INSTANCE = new LoggingHandler();
@Override
public void handleException(Throwable exception, SubscriberExceptionContext context) {
Logger logger = logger(context);
if (logger.isLoggable(Level.SEVERE)) {
logger.log(Level.SEVERE, message(context), exception);
}
}
private static Logger logger(SubscriberExceptionContext context) {
return Logger.getLogger(EventBus.class.getName() + "." + context.getEventBus().identifier());
}
private static String message(SubscriberExceptionContext context) {
Method method = context.getSubscriberMethod();
return "Exception thrown by subscriber method "
+ method.getName() + '(' + method.getParameterTypes()[0].getName() + ')'
+ " on subscriber " + context.getSubscriber()
+ " when dispatching event: " + context.getEvent();
}
}
}
下边我们来看一下注册和发布,这是EventBus的核心了,不过这块有个疑问,观察者模式和发布订阅模式有什么区别呢?通过这篇文章先来了解一下,我们来看下三个关键的类SubscriberRegistry、Subscriber和Dispatcher,先看一下类图
看一下SubscriberRegistry
/**
* 这是一个final类,一个final类是无法被任何人继承的,意味着设计的很完美
*/
final class SubscriberRegistry {
/**
* ConcurrentMap的key是post的对一个的类型,后者是所有的订阅者
* 某一个类型-某一个服务中所有的方法
*/
private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
Maps.newConcurrentMap();
/**
* 注册中心是属于那一个EventBus的
*/
@Weak private final EventBus bus;
SubscriberRegistry(EventBus bus) {
this.bus = checkNotNull(bus);
}
/**
* 注册所有的subscriber有Subscribe注解的方法
*/
void register(Object listener) {
/**
* 所有的listener(通常就是业务的service)的添加Subscribe注解的方法
*/
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
/**
* 获取一个类型对应的所有的Subscriber
*/
for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
/**
* 获取key和value
*/
Class<?> eventType = entry.getKey();
Collection<Subscriber> eventMethodsInListener = entry.getValue();
/**
* 获取这个类型对应的所有方法集合
*/
CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
/**
* 如果是空的则创建一个新的
*/
if (eventSubscribers == null) {
CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>();
eventSubscribers = MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
}
/**
* 不是空的就把所有的监听这个类型所有方法都添加进去
* 为什么有这样的判断呢,这是因为一个eventbus可能被多个服务注册
* 先进行的服务肯定是空的,后进行的直接加入进去就可以
*/
eventSubscribers.addAll(eventMethodsInListener);
}
}
/**
* 解绑,就是一个正常移除的裹层这里不做过去的讲解
*/
void unregister(Object listener) {
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
Class<?> eventType = entry.getKey();
Collection<Subscriber> listenerMethodsForType = entry.getValue();
CopyOnWriteArraySet<Subscriber> currentSubscribers = subscribers.get(eventType);
if (currentSubscribers == null || !currentSubscribers.removeAll(listenerMethodsForType)) {
throw new IllegalArgumentException(
"missing event subscriber for an annotated method. Is " + listener + " registered?");
}
}
}
/**
* 根据event获取所有的订阅者,这里的event就是指的我们post的数据,也就是根据我们post的数据类型找到所有的Subscribers
* 这里是通过缓存获取的数据类型,会节省大量的时间
*/
Iterator<Subscriber> getSubscribers(Object event) {
ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass());
List<Iterator<Subscriber>> subscriberIterators =
Lists.newArrayListWithCapacity(eventTypes.size());
/**
* 包括当前的类型以及父类或者接口,都会返回给这个数据的调用者
*/
for (Class<?> eventType : eventTypes) {
CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
if (eventSubscribers != null) {
// eager no-copy snapshot
subscriberIterators.add(eventSubscribers.iterator());
}
}
/**
* 返回的是一个迭代器,好让我们迭代调用
*/
return Iterators.concat(subscriberIterators.iterator());
}
/**
* 带Subscriber注解方法的缓存,在GC的时候由于是weakKeys会回收掉
*/
private static final LoadingCache<Class<?>, ImmutableList<Method>> subscriberMethodsCache =
CacheBuilder.newBuilder()
.weakKeys()
.build(new CacheLoader<Class<?>, ImmutableList<Method>>() {
@Override
public ImmutableList<Method> load(Class<?> concreteClass) throws Exception {
return getAnnotatedMethodsNotCached(concreteClass);
}
});
/**
* listener是我们的服务
* 返回的是type-{方法一的Subscriber、方法二的Subscriber等}
*/
private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
/**
* 这块为什么用Multimap呢,这是因为一个服务中的一个类型可能会对应不同的方法
*/
Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
/**
* 获取对应服务的字节码
*/
Class<?> clazz = listener.getClass();
/**
* 获取所有包含Subscribe注解方法,注意
* 1、这里加了一个全生命周期的缓存,可以有效减少通过反射获取方法的时间
* 2、判断注解的方法
*/
for (Method method : getAnnotatedMethods(clazz)) {
Class<?>[] parameterTypes = method.getParameterTypes();
/**
* 只是取了方法的第一个参数作为类型
* 存储的key-value可以理解为
* type-{方法一的Subscriber、方法二的Subscriber等}
*/
Class<?> eventType = parameterTypes[0];
methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
}
return methodsInListener;
}
private static ImmutableList<Method> getAnnotatedMethods(Class<?> clazz) {
return subscriberMethodsCache.getUnchecked(clazz);
}
/**
* 获取clazz中带有Subscribe注解的方法
*/
private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) {
Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes();
/**
* 重写了eqaul和hash重新定义了方法的唯一的标志,作为唯一的key
*/
Map<MethodIdentifier, Method> identifiers = Maps.newHashMap();
for (Class<?> supertype : supertypes) {
for (Method method : supertype.getDeclaredMethods()) {
if (method.isAnnotationPresent(SubscriberRegistryTest.class) && !method.isSynthetic()) {
Class<?>[] parameterTypes = method.getParameterTypes();
checkArgument(parameterTypes.length == 1,
"Method %s has @Subscribe annotation but has %s parameters."
+ "Subscriber methods must have exactly 1 parameter.",
method, parameterTypes.length);
MethodIdentifier ident = new MethodIdentifier(method);
if (!identifiers.containsKey(ident)) {
identifiers.put(ident, method);
}
}
}
}
return ImmutableList.copyOf(identifiers.values());
}
/**
* 获取raw types:一般类型,例如:String,Collections ,Math,Number的缓存
* https://blog.csdn.net/Fran_Lee/article/details/75331837
*/
private static final LoadingCache<Class<?>, ImmutableSet<Class<?>>> flattenHierarchyCache =
CacheBuilder.newBuilder()
.weakKeys()
.build(new CacheLoader<Class<?>, ImmutableSet<Class<?>>>() {
@SuppressWarnings("RedundantTypeArguments") // <Class<?>> is actually needed to compile
@Override
public ImmutableSet<Class<?>> load(Class<?> concreteClass) {
return ImmutableSet.<Class<?>>copyOf(
TypeToken.of(concreteClass).getTypes().rawTypes());
}
});
/**
* 获取的是当前的数据结构包括,这个数据结构对应的父类以及接口
*/
@VisibleForTesting
static ImmutableSet<Class<?>> flattenHierarchy(Class<?> concreteClass) {
try {
return flattenHierarchyCache.getUnchecked(concreteClass);
} catch (UncheckedExecutionException e) {
throw Throwables.propagate(e.getCause());
}
}
private static final class MethodIdentifier {
private final String name;
private final List<Class<?>> parameterTypes;
MethodIdentifier(Method method) {
this.name = method.getName();
this.parameterTypes = Arrays.asList(method.getParameterTypes());
}
@Override
public int hashCode() {
return Objects.hashCode(name, parameterTypes);
}
@Override
public boolean equals(@Nullable Object o) {
if (o instanceof MethodIdentifier) {
MethodIdentifier ident = (MethodIdentifier) o;
return name.equals(ident.name) && parameterTypes.equals(ident.parameterTypes);
}
return false;
}
}
}
再看一下Subscriber:
class Subscriber {
/**
* 根据是不是线程安全的来创建,这是我们在SubscriberRegistry中的create方法
*/
static Subscriber create(EventBus bus, Object listener, Method method) {
return isDeclaredThreadSafe(method)
? new Subscriber(bus, listener, method)
: new SynchronizedSubscriber(bus, listener, method);
}
/**
* Subscriber属于哪一个bus
*/
@Weak private EventBus bus;
/**
* Subscriber属于哪一个服务
*/
@VisibleForTesting
final Object target;
/**
* Subscribertarget对应的方法
*/
private final Method method;
/**
* 线程池,没有的话默认是当前线程执行,是bus中的executor
*/
private final Executor executor;
private Subscriber(EventBus bus, Object target, Method method) {
this.bus = bus;
this.target = checkNotNull(target);
this.method = method;
method.setAccessible(true);
this.executor = bus.executor();
}
/**
* 开启线程开始分发数据
*/
final void dispatchEvent(final Object event) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
invokeSubscriberMethod(event);
} catch (InvocationTargetException e) {
bus.handleSubscriberException(e.getCause(), context(event));
}
}
});
}
/**
* 调用method的invoke方法利用反射进行,event是我们要post的数据,
* 这是这个类里唯一的变量,其他的在初始化的时候就已经创建好了
*/
@VisibleForTesting
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
try {
method.invoke(target, checkNotNull(event));
} catch (IllegalArgumentException e) {
throw new Error("Method rejected target/argument: " + event, e);
} catch (IllegalAccessException e) {
throw new Error("Method became inaccessible: " + event, e);
} catch (InvocationTargetException e) {
if (e.getCause() instanceof Error) {
throw (Error) e.getCause();
}
throw e;
}
}
/**
* 有这个AllowConcurrentEvents注解的时候就表示线程不安全的,这个我们一般在异步eventbus中使用,
* 自己保证代码是线程安全的
*/
private static boolean isDeclaredThreadSafe(Method method) {
return method.getAnnotation(AllowConcurrentEvents.class) != null;
}
/**
* synchronized保证了线程安全,我们在使用普通的EventBus的时候都是加锁了的
*/
@VisibleForTesting
static final class SynchronizedSubscriber extends Subscriber {
private SynchronizedSubscriber(EventBus bus, Object target, Method method) {
super(bus, target, method);
}
@Override
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
synchronized (this) {
super.invokeSubscriberMethod(event);
}
}
}
}
最后看一下Dispatcher:
/**
* 抽象类
*/
abstract class Dispatcher {
/**
* 默认的方式,提供重入的方式确保所有的数据都能发送出去
*/
static Dispatcher perThreadDispatchQueue() {
return new PerThreadQueuedDispatcher();
}
static Dispatcher legacyAsync() {
return new LegacyAsyncDispatcher();
}
static Dispatcher immediate() {
return ImmediateDispatcher.INSTANCE;
}
/**
* 分发方法
*/
abstract void dispatch(Object event, Iterator<Subscriber> subscribers);
/**
* 一个实现类对subscribers分发event,ThreadLocal的方式
*/
private static final class PerThreadQueuedDispatcher extends Dispatcher {
// This dispatcher matches the original dispatch behavior of EventBus.
/**
* 每个线程都有一个独立的队列
*/
private final ThreadLocal<Queue<Event>> queue =
new ThreadLocal<Queue<Event>>() {
@Override
protected Queue<Event> initialValue() {
return Queues.newArrayDeque();
}
};
/**
* 每个线程都有一个独立的bool
*/
private final ThreadLocal<Boolean> dispatching =
new ThreadLocal<Boolean>() {
@Override
protected Boolean initialValue() {
return false;
}
};
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
checkNotNull(subscribers);
Queue<Event> queueForThread = queue.get();
queueForThread.offer(new Event(event, subscribers));
if (!dispatching.get()) {
dispatching.set(true);
try {
Event nextEvent;
while ((nextEvent = queueForThread.poll()) != null) {
while (nextEvent.subscribers.hasNext()) {
nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
}
}
} finally {
dispatching.remove();
queue.remove();
}
}
}
private static final class Event {
private final Object event;
private final Iterator<Subscriber> subscribers;
private Event(Object event, Iterator<Subscriber> subscribers) {
this.event = event;
this.subscribers = subscribers;
}
}
}
/**
* 一个实现类对subscribers分发event,通过队列的方式异步发送
*/
private static final class LegacyAsyncDispatcher extends Dispatcher {
/**
* Global event queue.
*/
private final ConcurrentLinkedQueue<EventWithSubscriber> queue =
Queues.newConcurrentLinkedQueue();
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
while (subscribers.hasNext()) {
queue.add(new EventWithSubscriber(event, subscribers.next()));
}
EventWithSubscriber e;
while ((e = queue.poll()) != null) {
e.subscriber.dispatchEvent(e.event);
}
}
private static final class EventWithSubscriber {
private final Object event;
private final Subscriber subscriber;
private EventWithSubscriber(Object event, Subscriber subscriber) {
this.event = event;
this.subscriber = subscriber;
}
}
}
/**
* 一个实现类对subscribers分发event,立即发送
*/
private static final class ImmediateDispatcher extends Dispatcher {
private static final ImmediateDispatcher INSTANCE = new ImmediateDispatcher();
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
while (subscribers.hasNext()) {
subscribers.next().dispatchEvent(event);
}
}
}
}
照例总结一下:
1、使用了发布订阅者模式,注意和观察者模式的区别,具体两者的区别可以看这篇文章https://www.jianshu.com/p/594f018b68e7
2、EventBus等使用了内部静态类提高代码的封装性,对应Person-Home的关系
3、枚举实现接口,利用枚举类型受限的原理,在枚举里边实现不同的类型,比如一个飞翔接口,就是三个种枚举分别是麻雀、喜鹊、布谷,限定这三种那就可以用这种方式
4、利用类的组件不断通过new 对象的方式将数据进行传递,区别于业务中service变量的方式进行
5、总体流程:
A、EventBus在创建的时候就指定好Dispatcher,专门用于帮助分发
B、指定好executor用于执行线程,默认是当前线程执行
C、SubscriberRegistry是发布订阅的核心,在创建EventBus的时候,就将EventBus作为变量传递给SubscriberRegistry
D、EventBus中的register方法实际是调用SubscriberRegistry的register方法进行注册,注册是key-value的形式:key是post的数据类型,包括父类和接口,value是多个值包括所有的这个类型的方法,通过注解查找到;value实际就是Subscriber,在注册的时候把eventbus、类型,方法等传递到Subscriber中去
E、post方法调用Dispatcher获取所有的Subscriber进行分发数据
6、由于TypeToken.of查找和getDeclaredMethods()比较频繁耗时,源码里边加上了缓存,由于启动之后就不会再变所以都是永久的
7、通过方法调用传递服务和通过注解的方式注入服务一样主要,一个是工具类使用,一个是业务方法使用
还没有评论,来说两句吧...