RocketMQ结合实际场景顺序消费,它是如何保证顺序消费的?

左手的ㄟ右手 2023-10-04 21:23 69阅读 0赞

format_png

前言

大家好,我是小郭,在业务研发的过程中,我们会涉及到非常多的业务场景与消息队列相关,通常我们会考虑利用消息队列来做异步解耦的工作,结合一些实际的场景我们考虑到消息的顺序性,如果没有严格按照顺序去处理消息,轻则给用户带来不好的体验,严重的话可能会导更多问题的产生,今天我们主要从实战、发送顺序消息流程到顺序消息的消费,以及如何保证顺序消费为重心进行一些扩展。

一、实战场景

用户更新钱包金额

->用户向钱包中转入100元,短信通知用户A目前剩余金额100元

->用户下单商品消费50元,短信通知用户A目前剩余金额50元。

普通消息:不顺序发送余额短信,则用户可能存在先收到余额50元,再收到余额100元的信息,带来不好的用户体验。

代码环节

为了更加体现消息的顺序性差异,我们在一次调用中循环发送10次

发送普通消息

  1. public Boolean updateUser(UserUpdateReqDTO userUpdateReqDTO) {
  2. String userTopic = rocketMqConfig.getSyncUserTopic();
  3. IntStream.range(0, 10).forEach(i ->{
  4. MessageWrapper messageSend = MessageWrapper.builder()
  5. .keys(userTopic).message("用户向钱包中转入100元,短信通知用户目前剩余金额100元:"+ i)
  6. .timestamp(System.currentTimeMillis()).build();
  7. MessageWrapper messageSend1 = MessageWrapper.builder()
  8. .keys(userTopic).message("用户下单商品消费50元,短信通知用户目前剩余金额50元:"+ i)
  9. .timestamp(System.currentTimeMillis()).build();
  10. rocketMQTemplate.syncSend(userTopic, messageSend);
  11. rocketMQTemplate.syncSend(userTopic, messageSend1);
  12. });
  13. return Boolean.TRUE;
  14. }
  15. 复制代码

发送顺序消息

  1. public Boolean updateUser(UserUpdateReqDTO userUpdateReqDTO) {
  2. String userTopic = rocketMqConfig.getSyncUserTopic();
  3. IntStream.range(0, 10).forEach(i ->{
  4. MessageWrapper messageSend = MessageWrapper.builder()
  5. .keys(userTopic).message("用户向钱包中转入100元,短信通知用户目前剩余金额100元:"+ i)
  6. .timestamp(System.currentTimeMillis()).build();
  7. MessageWrapper messageSend1 = MessageWrapper.builder()
  8. .keys(userTopic).message("用户下单商品消费50元,短信通知用户目前剩余金额50元:"+ i)
  9. .timestamp(System.currentTimeMillis()).build();
  10. rocketMQTemplate.syncSend(userTopic, messageSend);
  11. rocketMQTemplate.syncSend(userTopic, messageSend1);
  12. });
  13. return Boolean.TRUE;
  14. }
  15. 复制代码

消费者服务

  1. @Service
  2. @RocketMQMessageListener(topic = "${rocketmq.sync.user-topic}", consumerGroup = "user_consumer", selectorExpression = "*", consumeMode = ConsumeMode.ORDERLY)
  3. @Slf4j
  4. public class syncUserConsumer implements RocketMQListener<MessageWrapper> {
  5. @Override
  6. public void onMessage(MessageWrapper mes) {
  7. log.info("user consumer message : {}", JSON.toJSONString(mes));
  8. }
  9. }
  10. 复制代码

发送普通消息结果:

format_png 1

二、发送顺序消息流程

DefaultMQProducerDefaultMQProducerImplValidatorsMQClientInstanceMQAdminImplMessageAccessorClientConfigMessageQueueSelectorsend()1send() >> sendSelectImpl()2makeSureStateOK() >> 检查服务状态3checkMessage() >> 校验信息4校验结果5tryToFindTopicPublishInfo() >> 获取主题信息6getMQAdminImpl()7parsePublishMessageQueues()8返回消息队列列表9cloneMessage >> 复制一份消息10返回Message11getClientConfig12queueWithNamespace >> 获取队列13返回消息队列14select 根据传入的选择器规则获取队列15返回MessageQueue16sendKernelImpl() >> 向MessageQueue投递消息17返回SendResult结果18DefaultMQProducerDefaultMQProducerImplValidatorsMQClientInstanceMQAdminImplMessageAccessorClientConfigMessageQueueSelector

投递消息队列策略

Hash策略

在顺序消息中,我们使用Hash策略,将同一个HashKey分配到同一个队列中。

  1. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
  2. int value = arg.hashCode() % mqs.size();
  3. if (value < 0) {
  4. value = Math.abs(value);
  5. }
  6. return mqs.get(value);
  7. }
  8. 复制代码

获取消息消费队列

  1. // 查询主题下消息队列列表
  2. List<MessageQueue> messageQueueList = this.mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
  3. // 获取指定队列
  4. String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
  5. userMessage.setTopic(userTopic);
  6. mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
  7. 复制代码

三、保证顺序消费的机制

  1. 根据不同的消息监听器初始化消费消息线程池、定时线程池、扫描过期消息清除线程池。

    if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {

    1. this.consumeOrderly = true;
    2. // 顺序消息模式,不初始化扫描过期消息清除线程池
    3. this.consumeMessageService =
    4. new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());

    }
    复制代码

  2. 启动顺序消息消费者服务。

    this.consumeMessageService.start();
    复制代码

  3. 默认每隔20s执行一次锁定分配给自己的消息消费队列。

    public void start() {

    1. if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
    2. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    3. @Override
    4. public void run() {
    5. try {
    6. ConsumeMessageOrderlyService.this.lockMQPeriodically();
    7. } catch (Throwable e) {
    8. log.error("scheduleAtFixedRate lockMQPeriodically exception", e);
    9. }
    10. }
    11. }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
    12. }

    }

    public final static long REBALANCE_LOCK_INTERVAL = Long.parseLong(System.getProperty(“rocketmq.client.rebalance.lockInterval”, “20000”));
    复制代码

  4. 消息队列负载

集群模式下,同一个主题内的消费者组内,消费者们共同承担订阅消息队列的消费。

为了保证消息的顺序性,我们必须保证同一个消息队列在同一时刻只能被消费者组内一个消费者消费。

获取到消息队列之后向Broker发起锁定该消息队列的请求。

DefaultMQPushConsumerImplMQClientInstanceRebalanceServiceAllocateMessageQueueStrategystart() >> 启动实例1start()2run()3doRebalance() >> 做负载均衡4rebalanceByTopic5allocate6返回结果7updateProcessQueueTableInRebalance >> 重新负载8返回结果9返回结果10DefaultMQPushConsumerImplMQClientInstanceRebalanceServiceAllocateMessageQueueStrategy

updateProcessQueueTableInRebalance逻辑

主要目的是为了将消费消息队列上锁,并且创建该消息队列的拉取任务。

  1. 向Broker发起锁定该消息队列的请求。

    if (isOrder && !this.lock(mq)) {

    1. log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
    2. continue;

    }
    复制代码

  2. 拉取消费位置。

    long nextOffset = -1L;
    try {

    1. nextOffset = this.computePullFromWhereWithException(mq);

    } catch (Exception e) {

    1. log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
    2. continue;

    }
    复制代码

  3. 加锁成功则创建该消息队列的拉取任务,否则等待其他消费者释放该消息队列的锁。

    log.info(“doRebalance, {}, add a new mq, {}”, consumerGroup, mq);
    PullRequest pullRequest = new PullRequest();
    pullRequest.setConsumerGroup(consumerGroup);
    pullRequest.setNextOffset(nextOffset);
    pullRequest.setMessageQueue(mq);
    pullRequest.setProcessQueue(pq);
    pullRequestList.add(pullRequest);
    changed = true;
    复制代码

  4. 消息拉取

PullMessageServiceDefaultMQPushConsumerImplPullRequestRebalanceImplpullMessage() >>拉取消息1getProcessQueue() >>获取消费队列快照2返回消费队列快照3makeSureStateOK() >>检验状态4executePullRequestLater() >>提交拉取请求延后,放入其他线程5opt[已暂停]computePullFromWhereWithException >>获取偏移位置6返回偏移位置7executePullRequestLater() >>提交拉取请求延后,放入其他线程8alt[消息队列快照已上锁][未上锁]PullMessageServiceDefaultMQPushConsumerImplPullRequestRebalanceImpl

如果消息处理队列没有被上锁,则延后一会儿延迟3s将pullRequest对象放入拉取拉取任务中。

消息消费

  1. 提交消费请求,消息提交到内部的线程池。

    // 提交消费请求,消息提交到内部的线程池
    DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(

    1. pullResult.getMsgFoundList(),
    2. processQueue,
    3. pullRequest.getMessageQueue(),
    4. dispatchToConsume);

    复制代码

  2. ConsumeMessageOrderlyService#submitConsumeRequest() 执行方法。

    public void submitConsumeRequest(

    1. final List<MessageExt> msgs,
    2. final ProcessQueue processQueue,
    3. final MessageQueue messageQueue,
    4. final boolean dispathToConsume) {
    5. if (dispathToConsume) {
    6. ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
    7. this.consumeExecutor.submit(consumeRequest);
    8. }

    }
    复制代码

  3. 提交消费任务核心逻辑

入口:ConsumeMessageService#ConsumeRequest#run()

第一步,如果消息队列已经下线,则跳出本次消费。

  1. if (this.processQueue.isDropped()) {
  2. log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
  3. return;
  4. }
  5. 复制代码

第二步,根据前面得到的消费队列,获取对象并且申请一个锁

目的是保证同一时刻,消费队列只会被一个线程池中的一个线程消费。

  1. final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
  2. synchronized (objLock) {
  3. //...
  4. }
  5. 复制代码

第三步,进入核心逻辑处理

集群模式:前提条件是消息队列上锁成功且锁未过期。

  1. (this.processQueue.isLocked() && !this.processQueue.isLockExpired())
  2. 复制代码

当消费市场大于MAX_TIME_CONSUME_CONTINUOUSLY设置值,则跳出本次任务,交给线程池其他线程处理。

  1. long interval = System.currentTimeMillis() - beginTime;
  2. if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
  3. ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
  4. break;
  5. }
  6. 复制代码

获取消息默认每次拉取一条信息,在之前我们已经循环读取消息list,存入msgTreeMap。

现在从msgTreeMap中获取数据,如果数据为空则continueConsume设为false,跳出当前任务。

  1. final int consumeBatchSize =
  2. ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
  3. List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
  4. 复制代码

第四步,向ConsumeMessageContext对象填充数据,执行消费的钩子函数。

  1. ConsumeMessageContext consumeMessageContext = null;
  2. if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
  3. consumeMessageContext = new ConsumeMessageContext();
  4. consumeMessageContext
  5. .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
  6. consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
  7. consumeMessageContext.setMq(messageQueue);
  8. consumeMessageContext.setMsgList(msgs);
  9. consumeMessageContext.setSuccess(false);
  10. // init the consume context type
  11. consumeMessageContext.setProps(new HashMap<String, String>());
  12. ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
  13. }
  14. 复制代码

第五步,申请消费锁

  1. this.processQueue.getConsumeLock().lock();
  2. 复制代码

第六步,执行消费注册的消息消费监听器业务逻辑,返回 ConsumeOrderlyStatus 结果。

  1. status = messageListener.consumeMessage(
  2. Collections.unmodifiableList(msgs), context);
  3. 复制代码

第七步,如果一切正常则返回 ConsumeOrderlyStatus.SUCCESS 值

  1. continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
  2. // 执行commit提交消息消费进度
  3. case SUCCESS:commitOffset = consumeRequest.getProcessQueue().commit();
  4. // 读取旧消息进度,并更新返回
  5. Long offset = this.consumingMsgOrderlyTreeMap.lastKey();
  6. msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size());
  7. for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) {
  8. msgSize.addAndGet(0 - msg.getBody().length);
  9. }
  10. this.consumingMsgOrderlyTreeMap.clear();
  11. if (offset != null) {
  12. return offset + 1;
  13. }
  14. 复制代码

最后,如果消息进度偏移量大于0且消费队列没有停止,则更新消息消费进度。

  1. if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
  2. this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
  3. }
  4. 复制代码

消息队列重试失败:如果重试达到最大次数重试次数并且向Broker服务器发送ACK消息返回成功,将消息存入DLQ队列,被认定消息消费成功,继续执行后面的消息。

总结:

为了保证消息的顺序性,我们必须保证同一个消息队列在同一时刻只能被消费者组内一个消费者消费,

从负载均衡方面,向Broker发起锁定该消息队列的请求,上锁成功则新建一个拉取任务PullRequest,

从消息消费方面,批量拉取消息成功后,进行提交消费请求,消息提交到内部的线程池,为了保证消息的顺序性,

我们必须为消费队列上锁,来保证同一时刻消费队列只会被线程池中的一个线程消费。

四、消息消费时保持顺序性

上面的通过源码的阅读,我们知道消费失败是有重试机制,默认重试 16 次,重试的次数达到最大之后,将消息存入DLQ队列,即被认定消息消费成功,这里就会中断重试消息与下一跳消息的顺序性。

例:发送消息顺序为 消息A -> 消息B ->消息C

A

B

C

因为消息B进行最大次数的重试后依然没有成功,消息存入了DLQ队列中,

最终我们的消息顺序变成了 消息A ->消息B,破坏了我们的顺序性。

A

C

解决方案:在消费消息前,增加一些前置条件,查询同一个订单号下,上一个消息是否被成功消费或者存入DLQ队列中,可以引入消息辅助表,来进行记录。

五、如何提高顺序消费的消费速度?

根据上面的源码,我们了解到为了满足顺序消费,所以对消费队列进行了加锁,

所以消费端的并发度并不取决消费端线程池的大小,而是取决于分给给消费者的队列数量。

解决方案:提高消费者的队列数量。

六、扩容需要注意什么?

顺序消息在消费消息时会锁定消息消费队列,在分配到消息队列时,能从该队列拉取消息还需要在 Broker 端申请该消费队列的锁。

在进行横向扩容的时候会进行重新负载,为了保证消息能够进入同一个队列,就需要保证在扩容的时候队列中没有滞留的消息。

format_png 2

发表评论

表情:
评论列表 (有 0 条评论,69人围观)

还没有评论,来说两句吧...

相关阅读

    相关 rocketMq 顺序消费

    什么是顺序消费? 消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了 3 条消息,分别是订单创建、订单付款、订单完成。消费时,要按照这个顺序消费才

    相关 RocketMQ 顺序消费机制

    顺序消息是指对于一个指定的 Topic ,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。 顺序消息分为分区顺序消息和全局