RocketMQ源码:延迟消息的实现原理

刺骨的言语ヽ痛彻心扉 2024-03-23 17:04 147阅读 0赞

1.发送延迟消息

  1. public class Producer {
  2. public static void main(String[] args) throws Exception {
  3. // 实例化一个生产者来产生延时消息
  4. DefaultMQProducer producer = new DefaultMQProducer("DELAY_P_G");
  5. producer.setNamesrvAddr("127.0.0.1:9876");
  6. // 启动生产者
  7. producer.start();
  8. for (int i = 0; i < 1; i++) {
  9. Message message = new Message(MQConstant.DELAY_TOPIC, ("Hello scheduled message " + i).getBytes());
  10. /**
  11. * MessageStoreConfig
  12. * messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
  13. *
  14. * 共18个等级,依次是从1-18
  15. * 比如,level=3, 表示延迟10s 消费
  16. */
  17. message.setDelayTimeLevel(4);
  18. // 发送消息
  19. SendResult send = producer.send(message);
  20. System.out.println("send = " + send);
  21. }
  22. // 关闭生产者
  23. producer.shutdown();
  24. }
  25. }
  26. 复制代码

延迟消息的标志就是在发送时,通过消息对象MessagesetDelayTimeLevel(int level)方法设置一个延迟等级,这样该条消息就是一个延迟消息了。那么延迟等级与延迟时间是如何对应的呢?

format_png

2.存储延迟消息

其实延迟消息和普通消息并没有多大的差异,只不过broker在存储消息时,会判断消息的延迟属性是否为空,如果不为空,则判断是延迟消息,进而会做一些额外的处理,那么我们就看下broker存储时判断是否为延迟消息的逻辑:

CommitLog#asyncPutMessage(..)

  1. public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
  2. // Set the storage time
  3. msg.setStoreTimestamp(System.currentTimeMillis());
  4. // Set the message body BODY CRC (consider the most appropriate setting
  5. // on the client)
  6. msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
  7. // Back to Results
  8. AppendMessageResult result = null;
  9. StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
  10. String topic = msg.getTopic();
  11. int queueId = msg.getQueueId();
  12. final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
  13. if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
  14. || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
  15. //TODO:延迟消息的判定
  16. if (msg.getDelayTimeLevel() > 0) {
  17. if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
  18. msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
  19. }
  20. //TODO:将延迟消息的topic替换为broker固定的topic: SCHEDULE_TOPIC_XXXX
  21. topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
  22. //TODO: 将queueid替换为(延迟级别-1)
  23. queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
  24. //TODO:备份原始的topic/queueid, 留着后面解析
  25. MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
  26. MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQu

发表评论

表情:
评论列表 (有 0 条评论,147人围观)

还没有评论,来说两句吧...

相关阅读