用了这么久的@Scheduled,你知道它的实现原理吗?

末蓝、 2022-12-19 06:20 334阅读 0赞

这两天使用Scheduled注解来解决定时问题的时候,发现不能正常使用。所以就有了这一篇博客

  1. @Scheduled(initialDelay = 2000,fixedDelay = 1000)
  2. private void test(){
  3. System.out.println(Math.random());
  4. }

单从源码的doc文件中可以看到这么一段

  1. You can add the `@Scheduled` annotation to a method, along with trigger metadata. For
  2. example, the following method is invoked every five seconds with a fixed delay,
  3. meaning that the period is measured from the completion time of each preceding
  4. invocation:
  5. [source,java,indent=0]
  6. [subs="verbatim,quotes"]
  7. ----
  8. @Scheduled(fixedDelay=5000)
  9. public void doSomething() {
  10. // something that should run periodically
  11. }
  12. ----

将源码放进我的环境运行,发现并不能生效。那就只能先看看源码来看看它究竟是怎么生效的

注解Scheduled的源码

  1. @Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
  2. @Retention(RetentionPolicy.RUNTIME)
  3. @Documented
  4. @Repeatable(Schedules.class)
  5. public @interface Scheduled {
  6. String CRON_DISABLED = ScheduledTaskRegistrar.CRON_DISABLED;
  7. String cron() default "";
  8. String zone() default "";
  9. long fixedDelay() default -1;
  10. String fixedDelayString() default "";
  11. long fixedRate() default -1;
  12. String fixedRateString() default "";
  13. long initialDelay() default -1;
  14. String initialDelayString() default "";
  15. }

然后,动态加载的代码在 ScheduledAnnotationBeanPostProcessor 的 postProcessAfterInitialization 方法中

  1. @Override
  2. public Object postProcessAfterInitialization(Object bean, String beanName) {
  3. if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
  4. bean instanceof ScheduledExecutorService) {
  5. // Ignore AOP infrastructure such as scoped proxies.
  6. return bean;
  7. }
  8. Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
  9. if (!this.nonAnnotatedClasses.contains(targetClass) &&
  10. AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
  11. Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
  12. (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
  13. Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
  14. method, Scheduled.class, Schedules.class);
  15. return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
  16. });
  17. if (annotatedMethods.isEmpty()) {
  18. this.nonAnnotatedClasses.add(targetClass);
  19. if (logger.isTraceEnabled()) {
  20. logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
  21. }
  22. }
  23. else {
  24. // Non-empty set of methods
  25. annotatedMethods.forEach((method, scheduledMethods) ->
  26. scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
  27. if (logger.isTraceEnabled()) {
  28. logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
  29. "': " + annotatedMethods);
  30. }
  31. }
  32. }
  33. return bean;
  34. }

首先,通过AopProxyUtils.ultimateTargetClass获取传入的Bean的最终类(是哪个类),然后判断当前类有没有在this.nonAnnotatedClasses中,如果没有在,则继续使用AnnotationUtils.isCandidateClass判断当前类是不是一个非抽象类或者接口,如果都满足,则调用

  1. Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
  2. (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
  3. Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
  4. method, Scheduled.class, Schedules.class);
  5. return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
  6. });

获取到某个方法的所有Scheduled。也就是说,一个方法,是可以同时被多次定义周期化的。也就是这样

  1. @Scheduled(fixedDelay = 5000)
  2. @Schedules({@Scheduled(fixedDelay = 5000),@Scheduled(fixedDelay = 3000)})
  3. public void test(){
  4. logger.info("123");
  5. }

继续分析源码,我们可以发现,在得到targetClass(目标类)的所有带有@Scheduled或者@Schedules注解的方法并放到annotatedMethods中后,如果annotatedMethods的大小为0,则将当前目标targetClass放到this.nonAnnotatedClasses中,标记这个类中没有被相关注解修饰,方便新的调用方进行判断。如果annotatedMethods的大小不为空,则

  1. nnotatedMethods.forEach((method, scheduledMethods) ->
  2. scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));

将单独处理每个周期性任务。下面来看看究竟是怎么处理的

  1. protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
  2. // 可以看见,scheduled method的运行必须在Bean环境中,所以用@Schedules或者@Scheduled的方法必须在一个bean类里面
  3. try {
  4. Runnable runnable = createRunnable(bean, method);
  5. boolean processedSchedule = false;
  6. String errorMessage =
  7. "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
  8. Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
  9. // Determine initial delay 定义开始时间
  10. long initialDelay = scheduled.initialDelay();
  11. String initialDelayString = scheduled.initialDelayString();
  12. // initialDelay 和 initialDelayString 只能同时定义一个
  13. if (StringUtils.hasText(initialDelayString)) {
  14. Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
  15. if (this.embeddedValueResolver != null) {
  16. initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
  17. }
  18. if (StringUtils.hasLength(initialDelayString)) {
  19. try {
  20. initialDelay = parseDelayAsLong(initialDelayString);
  21. }
  22. catch (RuntimeException ex) {
  23. throw new IllegalArgumentException(
  24. "Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
  25. }
  26. }
  27. }
  28. // Check cron expression
  29. String cron = scheduled.cron();
  30. if (StringUtils.hasText(cron)) {
  31. String zone = scheduled.zone();
  32. if (this.embeddedValueResolver != null) {
  33. // 调用 this.embeddedValueResolver.resolveStringValue 解析cron
  34. cron = this.embeddedValueResolver.resolveStringValue(cron);
  35. zone = this.embeddedValueResolver.resolveStringValue(zone);
  36. }
  37. if (StringUtils.hasLength(cron)) {
  38. // 如果在initialDelay定义的情况下,cron是不生效的
  39. Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
  40. processedSchedule = true;
  41. // String CRON_DISABLED = ScheduledTaskRegistrar.CRON_DISABLED;
  42. // public static final String CRON_DISABLED = "-";
  43. // 如果cron不等于 '-'
  44. if (!Scheduled.CRON_DISABLED.equals(cron)) {
  45. TimeZone timeZone;
  46. // 解析timeZone
  47. if (StringUtils.hasText(zone)) {
  48. timeZone = StringUtils.parseTimeZoneString(zone);
  49. }
  50. else {
  51. timeZone = TimeZone.getDefault();
  52. }
  53. // 使用 new CronTrigger(cron, timeZone) 创建定时触发器
  54. // 使用 new CronTask(runnable, new CronTrigger(cron, timeZone)) 创建一个定时任务,定时触发器会触发runnable
  55. // 调用 this.registrar.scheduleCronTask 注册任务到当前环境中
  56. // tasks是一个集合,避免重复注册相同的任务
  57. tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
  58. }
  59. }
  60. }
  61. // At this point we don't need to differentiate between initial delay set or not anymore
  62. if (initialDelay < 0) {
  63. initialDelay = 0;
  64. }
  65. // Check fixed delay
  66. long fixedDelay = scheduled.fixedDelay();
  67. if (fixedDelay >= 0) {
  68. // 如果当前任务没有被加入到tasks
  69. Assert.isTrue(!processedSchedule, errorMessage);
  70. processedSchedule = true;
  71. // 使用 new FixedDelayTask(runnable, fixedDelay, initialDelay) 来注册延迟任务
  72. tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
  73. }
  74. String fixedDelayString = scheduled.fixedDelayString();
  75. if (StringUtils.hasText(fixedDelayString)) {
  76. // 如果没有传fixedDelay,但是传了fixedDelayString,可以使用它的值
  77. if (this.embeddedValueResolver != null) {
  78. fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
  79. }
  80. if (StringUtils.hasLength(fixedDelayString)) {
  81. Assert.isTrue(!processedSchedule, errorMessage);
  82. processedSchedule = true;
  83. try {
  84. fixedDelay = parseDelayAsLong(fixedDelayString);
  85. }
  86. catch (RuntimeException ex) {
  87. throw new IllegalArgumentException(
  88. "Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
  89. }
  90. tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
  91. }
  92. }
  93. // Check fixed rate
  94. // 如果上面的都没满足,则判断fixedDate和fixedDateString的值
  95. long fixedRate = scheduled.fixedRate();
  96. if (fixedRate >= 0) {
  97. Assert.isTrue(!processedSchedule, errorMessage);
  98. processedSchedule = true;
  99. tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
  100. }
  101. String fixedRateString = scheduled.fixedRateString();
  102. if (StringUtils.hasText(fixedRateString)) {
  103. if (this.embeddedValueResolver != null) {
  104. fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
  105. }
  106. if (StringUtils.hasLength(fixedRateString)) {
  107. Assert.isTrue(!processedSchedule, errorMessage);
  108. processedSchedule = true;
  109. try {
  110. fixedRate = parseDelayAsLong(fixedRateString);
  111. }
  112. catch (RuntimeException ex) {
  113. throw new IllegalArgumentException(
  114. "Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
  115. }
  116. tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
  117. }
  118. }
  119. // Check whether we had any attribute set
  120. Assert.isTrue(processedSchedule, errorMessage);
  121. // Finally register the scheduled tasks
  122. // 同步的注册任务 加入缓存,方便使用
  123. synchronized (this.scheduledTasks) {
  124. Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
  125. regTasks.addAll(tasks);
  126. }
  127. }
  128. catch (IllegalArgumentException ex) {
  129. throw new IllegalStateException(
  130. "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
  131. }
  132. }

下面为某个类里面的方法是创建Runnabled的方法,传入方法和目标类就可以得到

  1. protected Runnable createRunnable(Object target, Method method) {
  2. Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled");
  3. Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass());
  4. return new ScheduledMethodRunnable(target, invocableMethod);
  5. }

下面是scheduleCronTask方法的定义,可以看见这里会对task去重

  1. @Nullable
  2. public ScheduledTask scheduleCronTask(CronTask task) {
  3. ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
  4. boolean newTask = false;
  5. if (scheduledTask == null) {
  6. scheduledTask = new ScheduledTask(task);
  7. newTask = true;
  8. }
  9. if (this.taskScheduler != null) {
  10. scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
  11. }
  12. else {
  13. addCronTask(task);
  14. this.unresolvedTasks.put(task, scheduledTask);
  15. }
  16. return (newTask ? scheduledTask : null);
  17. }

通过processScheduled方法会将某个被@Scheduled或者@Schedules注解修饰的方法注册进全局的scheduledTask环境中。

也就是说,方法postProcessAfterInitialization会将整个bean中的所有被@Scheduled或者@Schedules注解修饰的方法都注册进全局定时执行环境。

哪些地方调用了postProcessAfterInitialization

视线移到抽象类AbstractAutowireCapableBeanFactory的applyBeanPostProcessorsAfterInitialization方法中,这里是源码中唯一调用postProcessAfterInitialization的地方,也就是说,所有的周期任务都是在这里被注入到环境中的(其实不只是被@Scheduled或者@Schedules修饰的周期性任务)

  1. @Override
  2. public Object applyBeanPostProcessorsAfterInitialization(Object existingBean, String beanName)
  3. throws BeansException {
  4. Object result = existingBean;
  5. for (BeanPostProcessor processor : getBeanPostProcessors()) {
  6. Object current = processor.postProcessAfterInitialization(result, beanName);
  7. if (current == null) {
  8. return result;
  9. }
  10. result = current;
  11. }
  12. return result;
  13. }

那么既然来了,我们还是分析一下,这里究竟做了什么。

首先,通过调用getBeanPostProcessors()获取到了所有的BeanPostProcessor,这个类可以理解为是各种Bean的加载器。而我们的ScheduledAnnotationBeanPostProcessor就是其中之一。根据调用栈可以发现,最终追溯到了AbstractBeanFactory.beanPostProcessors(List类型),下面两个方法是添加函数

  1. @Override
  2. public void addBeanPostProcessor(BeanPostProcessor beanPostProcessor) {
  3. Assert.notNull(beanPostProcessor, "BeanPostProcessor must not be null");
  4. // Remove from old position, if any
  5. this.beanPostProcessors.remove(beanPostProcessor);
  6. // Add to end of list
  7. this.beanPostProcessors.add(beanPostProcessor);
  8. }
  9. public void addBeanPostProcessors(Collection<? extends BeanPostProcessor> beanPostProcessors) {
  10. this.beanPostProcessors.removeAll(beanPostProcessors);
  11. this.beanPostProcessors.addAll(beanPostProcessors);
  12. }

可以看出来,beanPostProcessors中没有重复的Processors。我们把上面那个函数定义为方法一,后面那个方法定义为方法二,方法二在下面这个方法中被调用了,整个调用栈如下:

  1. private static void registerBeanPostProcessors(
  2. ConfigurableListableBeanFactory beanFactory, List<BeanPostProcessor> postProcessors) {
  3. if (beanFactory instanceof AbstractBeanFactory) {
  4. // Bulk addition is more efficient against our CopyOnWriteArrayList there
  5. ((AbstractBeanFactory) beanFactory).addBeanPostProcessors(postProcessors);
  6. }
  7. else {
  8. for (BeanPostProcessor postProcessor : postProcessors) {
  9. beanFactory.addBeanPostProcessor(postProcessor);
  10. }
  11. }
  12. }

file

这四次调用都出现在同一个函数registerBeanPostProcessors中,那么我们可以假设,这里的调用顺序,就是Bean加载的先后顺序(做java开发的应该都知道,如果代码写得不当,定义了错误的Bean加载顺序回导致注入无法完成,从而造成代码无法运行的问题)。那么,Bean的注册顺序就是

priorityOrderedPostProcessors > orderedPostProcessors > nonOrderedPostProcessors > internalPostProcessors

registerBeanPostProcessors的源码分析与标题没有什么关系,这里就不做分析了。留着下次分析Bean的时候再仔细分析。

从调用方分析的路走不通,我们可以尝试从最远头出发

我们都知道,使用@Scheduled或者@Schedules之前,必须要在全局加上@EnableScheduling的注解。那么我们就可以从这个注解入手进行分析。

  1. @Target(ElementType.TYPE)
  2. @Retention(RetentionPolicy.RUNTIME)
  3. @Import(SchedulingConfiguration.class)
  4. @Documented
  5. public @interface EnableScheduling {
  6. }

可惜的是,源码中并没有对注解@EnableScheduling进行解析的代码。可是这是为什么呢?我们注意到,修饰这个注解的有一个我们从来没有见过的注解@Import,会不会是@Import

其中,@Import的源码如下

  1. @Target(ElementType.TYPE)
  2. @Retention(RetentionPolicy.RUNTIME)
  3. @Documented
  4. public @interface Import {
  5. Class<?>[] value();
  6. }

其中,定义@Import注解行为的源码在类ConfigurationClassParser的collectImports方法中,来看看吧

  1. private void collectImports(SourceClass sourceClass, Set<SourceClass> imports, Set<SourceClass> visited)
  2. throws IOException {
  3. if (visited.add(sourceClass)) {
  4. for (SourceClass annotation : sourceClass.getAnnotations()) {
  5. String annName = annotation.getMetadata().getClassName();
  6. if (!annName.equals(Import.class.getName())) {
  7. collectImports(annotation, imports, visited);
  8. }
  9. }
  10. imports.addAll(sourceClass.getAnnotationAttributes(Import.class.getName(), "value"));
  11. }
  12. }

这个函数是一个递归函数,会不断地查找某个注解以及修饰它的注解所有被Import注解导入的配置文件。这个函数的调用栈如下

  1. private Set<SourceClass> getImports(SourceClass sourceClass) throws IOException {
  2. Set<SourceClass> imports = new LinkedHashSet<>();
  3. Set<SourceClass> visited = new LinkedHashSet<>();
  4. collectImports(sourceClass, imports, visited);
  5. return imports;
  6. }
  7. // getImports在ConfigurationClassParser的doProcessConfigurationClass方法中被调用
  8. processImports(configClass, sourceClass, getImports(sourceClass), filter, true);
  9. // doProcessConfigurationClass在ConfigurationClassParser的processConfigurationClass方法中被调用
  10. do {
  11. sourceClass = doProcessConfigurationClass(configClass, sourceClass, filter);
  12. }
  13. while (sourceClass != null);
  14. this.configurationClasses.put(configClass, configClass);

由于调用栈实在是太深,最后会到FrameworkServlet的refresh()方法上,暂时我只能下一个结论就是,在Application的主类上面修饰的注解并不会单独写反射方法来实现,而是会通过spring提供的统一处理方式进行处理。因为在整个spring框架源码中都没有找到对该注解进行反射操作的内容。

file

总结

通过这一篇文章,我们从源码中学习了@Scheduled 和 @Schedules 这两个注解的,他们是如何解析参数,如何加入时间触发器,不过目前还欠缺时间触发器究竟是如何工作的这部分的内容,后续我会补上。

另外,我们也初次了解了,这种注解是如何被spring框架调用到的,知道了BeanFactory,也知道了ConfigurationClassParser,这给我们接下来全面研究Spring容器这一块提供了契机。

最后的最后,我代码里面的问题就是没有在主类里面加@EnableScheduling注解

炒鸡辣鸡原创文章,转载请注明来源

发表评论

表情:
评论列表 (有 0 条评论,334人围观)

还没有评论,来说两句吧...

相关阅读