eventbus源码解读(发布和订阅)
根据订阅类型分组注册
void register(Object listener) {
//获取订阅的所有对象和方法
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
//事件类型,事件对象中的注解方法的第一个参数
Class<?> eventType = entry.getKey();
//订阅者集合
Collection<Subscriber> eventMethodsInListener = entry.getValue();
CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
if (eventSubscribers == null) {
CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>();
eventSubscribers = MoreObjects.firstNonNull(
subscribers.putIfAbsent(eventType, newSet), newSet);
}
eventSubscribers.addAll(eventMethodsInListener);
}
}
获取事件的所有注解方法
private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
//获取事件对象的类
Class<?> clazz = listener.getClass();
//获取注解方法
for (Method method : getAnnotatedMethods(clazz)) {
//获取参数
Class<?>[] parameterTypes = method.getParameterTypes();
//获取参数的第一个作为类型
Class<?> eventType = parameterTypes[0];
//添加到map中
methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
}
return methodsInListener;
}
发布指定类型事件
public void post(Object event) {
//根据事件类型获取订阅者集合
Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
if (eventSubscribers.hasNext()) {
dispatcher.dispatch(event, eventSubscribers);
} else if (!(event instanceof DeadEvent)) {
// the event had no subscribers and was not itself a DeadEvent
post(new DeadEvent(this, event));
}
}
LegacyAsyncDispatcher.dispatch
private final ConcurrentLinkedQueue queue =
Queues.newConcurrentLinkedQueue();
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
while (subscribers.hasNext()) {
//把事件放入队列
queue.add(new EventWithSubscriber(event, subscribers.next()));
}
EventWithSubscriber e;
while ((e = queue.poll()) != null) {
//从队列里面获取并执行事件
e.subscriber.dispatchEvent(e.event);
}
}
调用订阅者的方法
/** * Dispatches {@code event} to this subscriber using the proper executor. */
final void dispatchEvent(final Object event) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
//调用订阅者的方法
invokeSubscriberMethod(event);
} catch (InvocationTargetException e) {
bus.handleSubscriberException(e.getCause(), context(event));
}
}
});
}
/** * Invokes the subscriber method. This method can be overridden to make the invocation * synchronized. */
@VisibleForTesting
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
try {
method.invoke(target, checkNotNull(event));
} catch (IllegalArgumentException e) {
throw new Error("Method rejected target/argument: " + event, e);
} catch (IllegalAccessException e) {
throw new Error("Method became inaccessible: " + event, e);
} catch (InvocationTargetException e) {
if (e.getCause() instanceof Error) {
throw (Error) e.getCause();
}
throw e;
}
}
实际用例
eventBus功用类
@Component
@Slf4j
public class CommonEventBus {
private static ThreadPoolExecutor executors = new ThreadPoolExecutor(8,
16,
1000,
TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<>(128),
new ThreadPoolExecutor.CallerRunsPolicy());
private static AsyncEventBus eventBus = new AsyncEventBus(executors);
private ConcurrentHashMap<Object,String> eventMap = new ConcurrentHashMap<>();
public void registerEvent(Object event,String desc){
eventBus.register(event);
eventMap.put(event,desc);
}
public void unregister(Object event){
eventBus.unregister(event);
}
public void post(Object event){
eventBus.post(event);
log.info("EVENT.CONTENT:"+ JSON.toJSONString(event));
}
}
事件注册中心
/** * 事件注册中心 */
@Component
public class EventRegisterCenter{
@Autowired
private CommonEventBus commonEventBus;
@Autowired
private CheckLogEvent checkLogEvent;
@PostConstruct
public void init(){
commonEventBus.registerEvent(checkLogEvent,"审核日志事件");
}
}
审核日志事件
@Component
@Slf4j
public class CheckLogEvent {
@Subscribe
public void HandlerCheckLog(VerifyInfoRequest verifyInfoRequest){
log.info("日志审核消息:"+ JSON.toJSONString(verifyInfoRequest));
}
}
发布事件
commonEventBus.post(object);
总结
1.guava EventBus事件框架,首先要注册事件类,事件类里面要有@Subscribe方法,因为注册的时候,要用这个方法的第一个参数,作为事件的类型。
2.一个事件类型,可以包含多个订阅者
3.发布事件的时候,调用post方法,其实是按照事件类型发布事件的
4.eventBus根据事件类型,查找订阅者
5.执行订阅方法
还没有评论,来说两句吧...