eventbus源码解读(发布和订阅)

川长思鸟来 2023-06-26 05:49 46阅读 0赞

根据订阅类型分组注册

  1. void register(Object listener) {
  2. //获取订阅的所有对象和方法
  3. Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
  4. for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
  5. //事件类型,事件对象中的注解方法的第一个参数
  6. Class<?> eventType = entry.getKey();
  7. //订阅者集合
  8. Collection<Subscriber> eventMethodsInListener = entry.getValue();
  9. CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
  10. if (eventSubscribers == null) {
  11. CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>();
  12. eventSubscribers = MoreObjects.firstNonNull(
  13. subscribers.putIfAbsent(eventType, newSet), newSet);
  14. }
  15. eventSubscribers.addAll(eventMethodsInListener);
  16. }
  17. }

获取事件的所有注解方法

  1. private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
  2. Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
  3. //获取事件对象的类
  4. Class<?> clazz = listener.getClass();
  5. //获取注解方法
  6. for (Method method : getAnnotatedMethods(clazz)) {
  7. //获取参数
  8. Class<?>[] parameterTypes = method.getParameterTypes();
  9. //获取参数的第一个作为类型
  10. Class<?> eventType = parameterTypes[0];
  11. //添加到map中
  12. methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
  13. }
  14. return methodsInListener;
  15. }

发布指定类型事件

  1. public void post(Object event) {
  2. //根据事件类型获取订阅者集合
  3. Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
  4. if (eventSubscribers.hasNext()) {
  5. dispatcher.dispatch(event, eventSubscribers);
  6. } else if (!(event instanceof DeadEvent)) {
  7. // the event had no subscribers and was not itself a DeadEvent
  8. post(new DeadEvent(this, event));
  9. }
  10. }

LegacyAsyncDispatcher.dispatch

private final ConcurrentLinkedQueue queue =
Queues.newConcurrentLinkedQueue();

  1. void dispatch(Object event, Iterator<Subscriber> subscribers) {
  2. checkNotNull(event);
  3. while (subscribers.hasNext()) {
  4. //把事件放入队列
  5. queue.add(new EventWithSubscriber(event, subscribers.next()));
  6. }
  7. EventWithSubscriber e;
  8. while ((e = queue.poll()) != null) {
  9. //从队列里面获取并执行事件
  10. e.subscriber.dispatchEvent(e.event);
  11. }
  12. }

调用订阅者的方法

  1. /** * Dispatches {@code event} to this subscriber using the proper executor. */
  2. final void dispatchEvent(final Object event) {
  3. executor.execute(new Runnable() {
  4. @Override
  5. public void run() {
  6. try {
  7. //调用订阅者的方法
  8. invokeSubscriberMethod(event);
  9. } catch (InvocationTargetException e) {
  10. bus.handleSubscriberException(e.getCause(), context(event));
  11. }
  12. }
  13. });
  14. }
  15. /** * Invokes the subscriber method. This method can be overridden to make the invocation * synchronized. */
  16. @VisibleForTesting
  17. void invokeSubscriberMethod(Object event) throws InvocationTargetException {
  18. try {
  19. method.invoke(target, checkNotNull(event));
  20. } catch (IllegalArgumentException e) {
  21. throw new Error("Method rejected target/argument: " + event, e);
  22. } catch (IllegalAccessException e) {
  23. throw new Error("Method became inaccessible: " + event, e);
  24. } catch (InvocationTargetException e) {
  25. if (e.getCause() instanceof Error) {
  26. throw (Error) e.getCause();
  27. }
  28. throw e;
  29. }
  30. }

实际用例

eventBus功用类

  1. @Component
  2. @Slf4j
  3. public class CommonEventBus {
  4. private static ThreadPoolExecutor executors = new ThreadPoolExecutor(8,
  5. 16,
  6. 1000,
  7. TimeUnit.MILLISECONDS,
  8. new LinkedBlockingDeque<>(128),
  9. new ThreadPoolExecutor.CallerRunsPolicy());
  10. private static AsyncEventBus eventBus = new AsyncEventBus(executors);
  11. private ConcurrentHashMap<Object,String> eventMap = new ConcurrentHashMap<>();
  12. public void registerEvent(Object event,String desc){
  13. eventBus.register(event);
  14. eventMap.put(event,desc);
  15. }
  16. public void unregister(Object event){
  17. eventBus.unregister(event);
  18. }
  19. public void post(Object event){
  20. eventBus.post(event);
  21. log.info("EVENT.CONTENT:"+ JSON.toJSONString(event));
  22. }
  23. }

事件注册中心

  1. /** * 事件注册中心 */
  2. @Component
  3. public class EventRegisterCenter{
  4. @Autowired
  5. private CommonEventBus commonEventBus;
  6. @Autowired
  7. private CheckLogEvent checkLogEvent;
  8. @PostConstruct
  9. public void init(){
  10. commonEventBus.registerEvent(checkLogEvent,"审核日志事件");
  11. }
  12. }

审核日志事件

  1. @Component
  2. @Slf4j
  3. public class CheckLogEvent {
  4. @Subscribe
  5. public void HandlerCheckLog(VerifyInfoRequest verifyInfoRequest){
  6. log.info("日志审核消息:"+ JSON.toJSONString(verifyInfoRequest));
  7. }
  8. }

发布事件

  1. commonEventBus.post(object);

总结

1.guava EventBus事件框架,首先要注册事件类,事件类里面要有@Subscribe方法,因为注册的时候,要用这个方法的第一个参数,作为事件的类型。
2.一个事件类型,可以包含多个订阅者
3.发布事件的时候,调用post方法,其实是按照事件类型发布事件的
4.eventBus根据事件类型,查找订阅者
5.执行订阅方法

发表评论

表情:
评论列表 (有 0 条评论,46人围观)

还没有评论,来说两句吧...

相关阅读

    相关 EventBus分析

    前言 EventBus是一种用于Android的发布/订阅事件总线。它有很多优点:简化应用组件间的通信;解耦事件的发送者和接收者;避免复杂和容易出错的依赖和生命周期的问题