RocketMQ源码:延迟消息的实现原理
1.发送延迟消息
public class Producer {
public static void main(String[] args) throws Exception {
// 实例化一个生产者来产生延时消息
DefaultMQProducer producer = new DefaultMQProducer("DELAY_P_G");
producer.setNamesrvAddr("127.0.0.1:9876");
// 启动生产者
producer.start();
for (int i = 0; i < 1; i++) {
Message message = new Message(MQConstant.DELAY_TOPIC, ("Hello scheduled message " + i).getBytes());
/**
* MessageStoreConfig
* messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
*
* 共18个等级,依次是从1-18
* 比如,level=3, 表示延迟10s 消费
*/
message.setDelayTimeLevel(4);
// 发送消息
SendResult send = producer.send(message);
System.out.println("send = " + send);
}
// 关闭生产者
producer.shutdown();
}
}
复制代码
延迟消息的标志就是在发送时,通过消息对象Message
的setDelayTimeLevel(int level)
方法设置一个延迟等级,这样该条消息就是一个延迟消息了。那么延迟等级与延迟时间是如何对应的呢?
2.存储延迟消息
其实延迟消息和普通消息并没有多大的差异,只不过broker在存储消息时,会判断消息的延迟属性是否为空,如果不为空,则判断是延迟消息,进而会做一些额外的处理,那么我们就看下broker存储时判断是否为延迟消息的逻辑:
CommitLog#asyncPutMessage(..)
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
// on the client)
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// Back to Results
AppendMessageResult result = null;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
String topic = msg.getTopic();
int queueId = msg.getQueueId();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
//TODO:延迟消息的判定
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
//TODO:将延迟消息的topic替换为broker固定的topic: SCHEDULE_TOPIC_XXXX
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
//TODO: 将queueid替换为(延迟级别-1)
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
//TODO:备份原始的topic/queueid, 留着后面解析
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQu
还没有评论,来说两句吧...