RocketMQ源码解析之延迟消息实现原理

Love The Way You Lie 2022-12-27 07:48 251阅读 0赞

原创不易,转载请注明出处

文章目录

      • 前言
      • 1.延时消息的demo
      • 2.实现的原理

前言

今天要谈论的话题其实非常轻松,但是我们有些业务场景是离不开它的,其实说到延迟消息,不知道大家有没有想到它的业务场景,我这里说几个,比如我们购物下单,然后多长时间没有付款就会取消,或者我们下单打车,多长时间没有调度到车订单取消,不知道大家怎样实现这些业务场景,比如说我一个线程,然后一直扫订单表,扫出过期的订单啥的,但是这种方式会有一个问题,那就是订单量大的时候会给数据库带来很大的压力,而且很low,这种场景我们完全可以交给RocketMQ的延迟消息来处理。本文将从RocketMQ的延迟消息demo讲起,然后解释下其实现原理。

1.延时消息的demo

这里我们直接写一个RocketMQ消息生产者发送延时消息的一个demo

  1. public class ScheduledMessageProducer {
  2. public static void main(String[] args) throws Exception {
  3. // 实例化一个生产者来产生延时消息
  4. DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
  5. // 启动生产者
  6. producer.start();
  7. int totalMessagesToSend = 100;
  8. for (int i = 0; i < totalMessagesToSend; i++) {
  9. Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
  10. // 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
  11. message.setDelayTimeLevel(3);
  12. // 发送消息
  13. producer.send(message);
  14. }
  15. // 关闭生产者
  16. producer.shutdown();
  17. }
  18. }

其实这个demo非常简单,与发送普通消息区别不大,唯一区别的就是需要你在Message里面设置延迟等级,默认的延迟等级与时间对应关系如下

  1. "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

也就是延迟等级1级对应的是1s,2级是5s,以此类推。如果你想改变这个等级的话,你可以设置broker的messageDelayLevel参数,s代表秒,m是分钟,h是小时,这里有点要注意的是,你消息消费失败重试也是要走延迟消息的,这块你需要注意下,然后别改变里延迟消息规则,然后造成失败重试有问题,默认失败重试规则就是3+失败重试次数的延时等级

2.实现的原理

实现原理其实挺简单的,在消息生产者端,你设置这延迟消息等级,其实就是往消息的property设置一个DELAY的key与value值,key就是DELAY,然后value 就是你延迟的等级,这就是消息生产者端要做的事情,接下来介绍下broker端的实现原理。
在broker 端,收到一个消息,在往commitlog进行putMessage的时候,也就是追加写入消息的时候,会检查这个消息的延迟等级,也就是下面这端代码
在这里插入图片描述
会将你的topic设置成SCHEDULE_TOPIC_XXXX,然后queueId设置成你延时等级-1 ,其实你有没有发现,他的实现与事务消息的差不多,都是修改topic 与queueId。
同时,在broker启动的时候,它会启动一个定时任务,定义一个timer,设置几个定时任务(定时任务的数量与延迟等级是有关系的,一个任务对应一个定时任务)不断的扫SCHEDULE_TOPIC_XXXX这个topic的消息。
在这里插入图片描述
接着我们看下这scheduleMessageService 的start方法的实现
在这里插入图片描述
其实这个start方法就是遍历这延时等级表,然后往timer里面添加延时任务,最后创建一个定时任务,每10s执行一次持久化任务。这个持久化任务其实不用担心,就是往磁盘里面写下,延时等级对应的offset。
接着我们看下DeliverDelayedMessageTimerTask这个任务的 实现
在这里插入图片描述
可以看到run方法里面执行了executeOnTimeup这个方法

  1. /// 取出consumeQueue
  2. ConsumeQueue cq =
  3. ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
  4. delayLevel2QueueId(delayLevel));
  5. long failScheduleOffset = offset;
  6. if (cq != null) {
  7. /// 取出
  8. SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
  9. if (bufferCQ != null) {
  10. try {
  11. long nextOffset = offset;
  12. int i = 0;
  13. ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
  14. ///遍历取
  15. for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
  16. long offsetPy = bufferCQ.getByteBuffer().getLong();
  17. int sizePy = bufferCQ.getByteBuffer().getInt();
  18. long tagsCode = bufferCQ.getByteBuffer().getLong();
  19. if (cq.isExtAddr(tagsCode)) {
  20. if (cq.getExt(tagsCode, cqExtUnit)) {
  21. tagsCode = cqExtUnit.getTagsCode();
  22. } else {
  23. //can't find ext content.So re compute tags code.
  24. log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
  25. tagsCode, offsetPy, sizePy);
  26. // 消息存入时间
  27. long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
  28. // 计算交付时间
  29. tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
  30. }
  31. }
  32. long now = System.currentTimeMillis();
  33. // 计算正确的交付时间戳
  34. long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
  35. nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
  36. long countdown = deliverTimestamp - now;
  37. if (countdown <= 0) {
  38. // 拿出消息
  39. MessageExt msgExt =
  40. ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
  41. offsetPy, sizePy);
  42. if (msgExt != null) {
  43. try {
  44. ///转成MessageExtBrokerInner
  45. MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
  46. /// 写到 commitlog中
  47. PutMessageResult putMessageResult =
  48. ScheduleMessageService.this.writeMessageStore
  49. .putMessage(msgInner);
  50. if (putMessageResult != null
  51. && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
  52. continue;
  53. } else {
  54. // XXX: warn and notify me 失败
  55. log.error(
  56. "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
  57. msgExt.getTopic(), msgExt.getMsgId());
  58. // 这么做主要是为了防止os cache 繁忙
  59. ScheduleMessageService.this.timer.schedule(
  60. new DeliverDelayedMessageTimerTask(this.delayLevel,
  61. /// 延迟一段时间 10000L = 10s
  62. nextOffset), DELAY_FOR_A_PERIOD);
  63. /// 更新 这个等级的offset
  64. ScheduleMessageService.this.updateOffset(this.delayLevel,
  65. nextOffset);
  66. return;
  67. }
  68. } catch (Exception e) {
  69. /* * XXX: warn and notify me */
  70. log.error(
  71. "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
  72. + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
  73. + offsetPy + ",sizePy=" + sizePy, e);
  74. }
  75. }
  76. } else {
  77. // 重新添加任务
  78. ScheduleMessageService.this.timer.schedule(
  79. new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
  80. countdown);
  81. // 更新offset
  82. ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
  83. return;
  84. }
  85. } // end of for
  86. nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
  87. ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
  88. //100ms
  89. this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
  90. // 更新
  91. ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
  92. return;
  93. } finally {
  94. bufferCQ.release();
  95. }
  96. } // end of if (bufferCQ != null)
  97. else {
  98. // 找出最小的
  99. long cqMinOffset = cq.getMinOffsetInQueue();
  100. if (offset < cqMinOffset) {
  101. failScheduleOffset = cqMinOffset;
  102. log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
  103. + cqMinOffset + ", queueId=" + cq.getQueueId());
  104. }
  105. }
  106. } // end of if (cq != null)
  107. ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
  108. failScheduleOffset), DELAY_FOR_A_WHILE);

这个方法首先是获取对应延迟等级的consumeQueue这个队列,取出offset往后的消息,进行遍历,找出每个unit对应的commitlog真实offset,然后通过commitlog offset 从commitlog获取到真实的那个消息,根据它的存储实现与延迟时间,算出与真实交付时间的差值,如果是小于等于0的话,说明延迟时间到了,这个时候就要暴露给消息消费者了,它就会将topic与queueId转成之前的那个topic queueId,然后重新扔到commitlog中,这个时候通过reput线程的dispatch处理,消息消费者就能发现这个消息并消费,如果这个差值还不够的话,重新创建调度任务,然后延迟执行时间是这个差值,扔到timer中。

发表评论

表情:
评论列表 (有 0 条评论,251人围观)

还没有评论,来说两句吧...

相关阅读