RocketMQ源码解析之延迟消息实现原理
原创不易,转载请注明出处
文章目录
- 前言
- 1.延时消息的demo
- 2.实现的原理
前言
今天要谈论的话题其实非常轻松,但是我们有些业务场景是离不开它的,其实说到延迟消息,不知道大家有没有想到它的业务场景,我这里说几个,比如我们购物下单,然后多长时间没有付款就会取消,或者我们下单打车,多长时间没有调度到车订单取消,不知道大家怎样实现这些业务场景,比如说我一个线程,然后一直扫订单表,扫出过期的订单啥的,但是这种方式会有一个问题,那就是订单量大的时候会给数据库带来很大的压力,而且很low,这种场景我们完全可以交给RocketMQ的延迟消息来处理。本文将从RocketMQ的延迟消息demo讲起,然后解释下其实现原理。
1.延时消息的demo
这里我们直接写一个RocketMQ消息生产者发送延时消息的一个demo
public class ScheduledMessageProducer {
public static void main(String[] args) throws Exception {
// 实例化一个生产者来产生延时消息
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// 启动生产者
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
message.setDelayTimeLevel(3);
// 发送消息
producer.send(message);
}
// 关闭生产者
producer.shutdown();
}
}
其实这个demo非常简单,与发送普通消息区别不大,唯一区别的就是需要你在Message里面设置延迟等级,默认的延迟等级与时间对应关系如下
"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
也就是延迟等级1级对应的是1s,2级是5s,以此类推。如果你想改变这个等级的话,你可以设置broker的messageDelayLevel
参数,s代表秒,m是分钟,h是小时,这里有点要注意的是,你消息消费失败重试也是要走延迟消息的,这块你需要注意下,然后别改变里延迟消息规则,然后造成失败重试有问题,默认失败重试规则就是3+失败重试次数的延时等级
2.实现的原理
实现原理其实挺简单的,在消息生产者端,你设置这延迟消息等级,其实就是往消息的property设置一个DELAY的key与value值,key就是DELAY,然后value 就是你延迟的等级,这就是消息生产者端要做的事情,接下来介绍下broker端的实现原理。
在broker 端,收到一个消息,在往commitlog进行putMessage的时候,也就是追加写入消息的时候,会检查这个消息的延迟等级,也就是下面这端代码
会将你的topic设置成SCHEDULE_TOPIC_XXXX
,然后queueId设置成你延时等级-1 ,其实你有没有发现,他的实现与事务消息的差不多,都是修改topic 与queueId。
同时,在broker启动的时候,它会启动一个定时任务,定义一个timer,设置几个定时任务(定时任务的数量与延迟等级是有关系的,一个任务对应一个定时任务)不断的扫SCHEDULE_TOPIC_XXXX
这个topic的消息。
接着我们看下这scheduleMessageService 的start方法的实现
其实这个start方法就是遍历这延时等级表,然后往timer里面添加延时任务,最后创建一个定时任务,每10s执行一次持久化任务。这个持久化任务其实不用担心,就是往磁盘里面写下,延时等级对应的offset。
接着我们看下DeliverDelayedMessageTimerTask
这个任务的 实现
可以看到run方法里面执行了executeOnTimeup
这个方法
/// 取出consumeQueue
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
long failScheduleOffset = offset;
if (cq != null) {
/// 取出
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ != null) {
try {
long nextOffset = offset;
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
///遍历取
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferCQ.getByteBuffer().getLong();
int sizePy = bufferCQ.getByteBuffer().getInt();
long tagsCode = bufferCQ.getByteBuffer().getLong();
if (cq.isExtAddr(tagsCode)) {
if (cq.getExt(tagsCode, cqExtUnit)) {
tagsCode = cqExtUnit.getTagsCode();
} else {
//can't find ext content.So re compute tags code.
log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
tagsCode, offsetPy, sizePy);
// 消息存入时间
long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
// 计算交付时间
tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
}
}
long now = System.currentTimeMillis();
// 计算正确的交付时间戳
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long countdown = deliverTimestamp - now;
if (countdown <= 0) {
// 拿出消息
MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
offsetPy, sizePy);
if (msgExt != null) {
try {
///转成MessageExtBrokerInner
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
/// 写到 commitlog中
PutMessageResult putMessageResult =
ScheduleMessageService.this.writeMessageStore
.putMessage(msgInner);
if (putMessageResult != null
&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
continue;
} else {
// XXX: warn and notify me 失败
log.error(
"ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
msgExt.getTopic(), msgExt.getMsgId());
// 这么做主要是为了防止os cache 繁忙
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel,
/// 延迟一段时间 10000L = 10s
nextOffset), DELAY_FOR_A_PERIOD);
/// 更新 这个等级的offset
ScheduleMessageService.this.updateOffset(this.delayLevel,
nextOffset);
return;
}
} catch (Exception e) {
/* * XXX: warn and notify me */
log.error(
"ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
+ msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
+ offsetPy + ",sizePy=" + sizePy, e);
}
}
} else {
// 重新添加任务
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
countdown);
// 更新offset
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
} // end of for
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
//100ms
this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
// 更新
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
} finally {
bufferCQ.release();
}
} // end of if (bufferCQ != null)
else {
// 找出最小的
long cqMinOffset = cq.getMinOffsetInQueue();
if (offset < cqMinOffset) {
failScheduleOffset = cqMinOffset;
log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
+ cqMinOffset + ", queueId=" + cq.getQueueId());
}
}
} // end of if (cq != null)
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE);
这个方法首先是获取对应延迟等级的consumeQueue这个队列,取出offset往后的消息,进行遍历,找出每个unit对应的commitlog真实offset,然后通过commitlog offset 从commitlog获取到真实的那个消息,根据它的存储实现与延迟时间,算出与真实交付时间的差值,如果是小于等于0的话,说明延迟时间到了,这个时候就要暴露给消息消费者了,它就会将topic与queueId转成之前的那个topic queueId,然后重新扔到commitlog中,这个时候通过reput线程的dispatch处理,消息消费者就能发现这个消息并消费,如果这个差值还不够的话,重新创建调度任务,然后延迟执行时间是这个差值,扔到timer中。
还没有评论,来说两句吧...