RocketMQ源码解析八(Consumer并发消费消息)

快来打我* 2022-09-12 07:47 90阅读 0赞

RocketMQ版本4.6.0,记录自己看源码的过程

回顾一下消息拉取,PullMessageService 负责对消息队列进行消息拉取,从远端服务器拉取消息后将消息存人ProcessQueue消息队列处理队列中,然后调用ConsumeMessageService#submitConsumeRequest方法进行消息消费,使用线程池来消费消息,确保了消息拉取与消息消费的解耦。RocketMQ使用ConsumeMessageService来实现消息消费的处理逻辑。RocketMQ支持顺序消费与并发消费,这里将重点关注并发消费的消费流程,顺序消费将在之后的文档分析。

消息拉取一文中提到会将消息任务提交给ConsumeMessageConcurrentlyService中的线程池执行,这里再重新看下

  1. /** * 提交消费消息请求到线程池中消费 */
  2. @Override
  3. public void submitConsumeRequest(
  4. final List<MessageExt> msgs,
  5. final ProcessQueue processQueue,
  6. final MessageQueue messageQueue,
  7. final boolean dispatchToConsume) {
  8. // 一次消费几条消息,默认为1条
  9. final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
  10. // 如果一次拉取的消息数量<=并发消费数量,则一次性将这一批消息都放到consumeRequest中提交给消费线程池消费
  11. if (msgs.size() <= consumeBatchSize) {
  12. ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
  13. try {
  14. // 将消费任务提交给消费线程池消费
  15. this.consumeExecutor.submit(consumeRequest);
  16. } catch (RejectedExecutionException e) {
  17. this.submitConsumeRequestLater(consumeRequest);
  18. }
  19. } else {
  20. // 分批次提交,默认每次提交1条
  21. for (int total = 0; total < msgs.size(); ) {
  22. List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
  23. for (int i = 0; i < consumeBatchSize; i++, total++) {
  24. if (total < msgs.size()) {
  25. msgThis.add(msgs.get(total));
  26. } else {
  27. break;
  28. }
  29. }
  30. ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
  31. try {
  32. this.consumeExecutor.submit(consumeRequest);
  33. } catch (RejectedExecutionException e) {
  34. for (; total < msgs.size(); total++) {
  35. msgThis.add(msgs.get(total));
  36. }
  37. this.submitConsumeRequestLater(consumeRequest);
  38. }
  39. }
  40. }
  41. }

如果拉取的消息条数大于consumeMessageBatchMaxSize,则对拉取消息进行分页,每页consumeMessageBatchMaxSize条消息,创建多个ConsumeRequest任务并提交到消费线程池。

  1. /** * 消费线程 */
  2. class ConsumeRequest implements Runnable {
  3. // 默认是1条
  4. private final List<MessageExt> msgs;
  5. private final ProcessQueue processQueue;
  6. private final MessageQueue messageQueue;
  7. public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) {
  8. this.msgs = msgs;
  9. this.processQueue = processQueue;
  10. this.messageQueue = messageQueue;
  11. }
  12. public List<MessageExt> getMsgs() {
  13. return msgs;
  14. }
  15. public ProcessQueue getProcessQueue() {
  16. return processQueue;
  17. }
  18. @Override
  19. public void run() {
  20. // 先判断processQueue的dropped,如果为true,则说明可能在重平衡时该消息队列被分配给
  21. // 别的消费者了,应该停止对该消息队列的消费
  22. if (this.processQueue.isDropped()) {
  23. log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
  24. return;
  25. }
  26. // 在启动consumer时设置的监听
  27. MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
  28. ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
  29. ConsumeConcurrentlyStatus status = null;
  30. // 如果此消息的主题为(%RETRY%+消费组的名称),那么将会将此消息的topic重置为原始消息的topic。
  31. // 即此消息的真实topic会存储在properties当中,键为RETRY_TOPIC,值为真实topic,将真实topic取出赋予此消息
  32. defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
  33. ConsumeMessageContext consumeMessageContext = null;
  34. if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
  35. consumeMessageContext = new ConsumeMessageContext();
  36. consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
  37. consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
  38. consumeMessageContext.setProps(new HashMap<String, String>());
  39. consumeMessageContext.setMq(messageQueue);
  40. consumeMessageContext.setMsgList(msgs);
  41. consumeMessageContext.setSuccess(false);
  42. ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
  43. }
  44. long beginTimestamp = System.currentTimeMillis();
  45. boolean hasException = false;
  46. ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
  47. try {
  48. if (msgs != null && !msgs.isEmpty()) {
  49. for (MessageExt msg : msgs) {
  50. MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
  51. }
  52. }
  53. // 调用消息监听器
  54. status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
  55. } catch (Throwable e) {
  56. log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
  57. RemotingHelper.exceptionSimpleDesc(e),
  58. ConsumeMessageConcurrentlyService.this.consumerGroup,
  59. msgs,
  60. messageQueue);
  61. hasException = true;
  62. }
  63. long consumeRT = System.currentTimeMillis() - beginTimestamp;
  64. if (null == status) {
  65. // 表示消费出现异常了并且没有进行处理
  66. if (hasException) {
  67. returnType = ConsumeReturnType.EXCEPTION;
  68. } else {
  69. // 返回null
  70. returnType = ConsumeReturnType.RETURNNULL;
  71. }
  72. } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) { // 超时
  73. returnType = ConsumeReturnType.TIME_OUT;
  74. } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) { // 消费失败
  75. returnType = ConsumeReturnType.FAILED;
  76. } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) { // 消费成功
  77. returnType = ConsumeReturnType.SUCCESS;
  78. }
  79. if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
  80. consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
  81. }
  82. // 不管是异常还是返回null,都需要重试消息
  83. if (null == status) {
  84. log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
  85. ConsumeMessageConcurrentlyService.this.consumerGroup,
  86. msgs,
  87. messageQueue);
  88. status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
  89. }
  90. if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
  91. consumeMessageContext.setStatus(status.toString());
  92. consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
  93. ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
  94. }
  95. ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
  96. .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
  97. if (!processQueue.isDropped()) {
  98. // 处理消费结果
  99. ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
  100. } else {
  101. log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
  102. }
  103. }
  104. public MessageQueue getMessageQueue() {
  105. return messageQueue;
  106. }
  107. }

Step1:先判断processQueue的dropped,如果为true,则说明可能在重平衡时该消息队列被分配给别的消费者了,应该停止对该消息队列的消费。
Step2:如果消息是重试消息,则会将消息的topic和queueId还原成原本的topic和queueId。
Step3:构建ConsumeMessageContext对象。
Step4:调用消息监听器。
Step5:如果processQueue没被停止则处理消费结果。

看看如何处理消费结果

  1. /** * 处理消费结果 */
  2. public void processConsumeResult(
  3. final ConsumeConcurrentlyStatus status,
  4. final ConsumeConcurrentlyContext context,
  5. final ConsumeRequest consumeRequest
  6. ) {
  7. // 设置ackIndex,这个值在创建ConsumeConcurrentlyContext时默认为Integer.MAX_VALUE
  8. int ackIndex = context.getAckIndex();
  9. if (consumeRequest.getMsgs().isEmpty())
  10. return;
  11. switch (status) {
  12. // 消费成功
  13. case CONSUME_SUCCESS:
  14. // 设置ackIndex值为 消息数-1,默认情况下这个值是0
  15. if (ackIndex >= consumeRequest.getMsgs().size()) {
  16. ackIndex = consumeRequest.getMsgs().size() - 1;
  17. }
  18. int ok = ackIndex + 1;
  19. int failed = consumeRequest.getMsgs().size() - ok;
  20. this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
  21. this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
  22. break;
  23. // 需要重试消息
  24. case RECONSUME_LATER:
  25. // 设置为-1用于下面重发消息
  26. ackIndex = -1;
  27. this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
  28. consumeRequest.getMsgs().size());
  29. break;
  30. default:
  31. break;
  32. }
  33. switch (this.defaultMQPushConsumer.getMessageModel()) {
  34. // 广播模式不会重试消息,消费失败就丢弃了,也相当于消费成功了
  35. case BROADCASTING:
  36. for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
  37. MessageExt msg = consumeRequest.getMsgs().get(i);
  38. log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
  39. }
  40. break;
  41. case CLUSTERING:
  42. List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
  43. // 重发这批并发消息,默认只有1条(会重新生成新的消息,这批消息默认消费成功,会更新消费进度,不会卡在这)
  44. for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
  45. MessageExt msg = consumeRequest.getMsgs().get(i);
  46. // 重新将消息发到broker
  47. boolean result = this.sendMessageBack(msg, context);
  48. if (!result) {
  49. msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
  50. msgBackFailed.add(msg);
  51. }
  52. }
  53. // 如果重发消息失败,则会将这些发送失败的消息重新包装起来5S后转发给消费线程池继续消费
  54. if (!msgBackFailed.isEmpty()) {
  55. consumeRequest.getMsgs().removeAll(msgBackFailed);
  56. this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
  57. }
  58. break;
  59. default:
  60. break;
  61. }
  62. // 不管这条消息消费成功与否,都会更新消费进度,表示该条消息消费成功
  63. // 将这次消费的消息从ProcessQueue中移除
  64. // 这里的返回值分两种情况:
  65. // 1、如果删除该消息列表后processQueue为空了,则offset为processQueue中最大的偏移量+1,消费进度推进到最后面
  66. // 2、如果processQueue不为空,则offset为processQueue中剩余的第一条消息,消费进度只能推进到这
  67. long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
  68. if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
  69. // 更新本地消费进度,后台定时任务会定时将该消费进度同步到broker中
  70. this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
  71. }
  72. }

消息消费状态有两种,CONSUME_SUCCESS表示消费成功,RECONSUME_LATER表示需要重试消息。当重试时,这些并发消息都会进行重试,rocketmq会为每条消息重新创建一条与原先消息属性相同的消息,拥有一个唯一的新msgId,并存储原消息ID,该消息会存入到commitLog文件中,与原先的消息没有任何关联,该消息也会进入到ConsumerQueue队列中,将拥有一个全新的队列偏移量。重试消息详细流程在后面文档再分析。

当然,不管消费成功与否,都会将这次消费的消息从ProcessQueue中删除

  1. public long removeMessage(final List<MessageExt> msgs) {
  2. long result = -1;
  3. final long now = System.currentTimeMillis();
  4. try {
  5. this.lockTreeMap.writeLock().lockInterruptibly();
  6. this.lastConsumeTimestamp = now;
  7. try {
  8. if (!msgTreeMap.isEmpty()) {
  9. result = this.queueOffsetMax + 1;
  10. int removedCnt = 0;
  11. // 将这次批量消费的消息都从msgTreeMap中删除
  12. for (MessageExt msg : msgs) {
  13. MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
  14. if (prev != null) {
  15. removedCnt--;
  16. msgSize.addAndGet(0 - msg.getBody().length);
  17. }
  18. }
  19. msgCount.addAndGet(removedCnt);
  20. // 如果删除完后processQueue已经没消息了,表示消息都消费成功了,则消费进度更新到
  21. // 队列中最大的偏移量+1处,如果map不为空,则表示还没消费完,不能将消费进度更新到
  22. // 最大偏移量处,需设置在最小处,等最前面的消费成功消费后消费进度才能向后推移更新
  23. if (!msgTreeMap.isEmpty()) {
  24. result = msgTreeMap.firstKey();
  25. }
  26. }
  27. } finally {
  28. this.lockTreeMap.writeLock().unlock();
  29. }
  30. } catch (Throwable t) {
  31. log.error("removeMessage exception", t);
  32. }
  33. return result;
  34. }

并根据返回的offset更新本地消费进度,由后台定时任务定时同步到broker

  1. public class RemoteBrokerOffsetStore implements OffsetStore {
  2. /** * 保存一个消费组中每个消息队列的消费进度,会定时同步到broke中 */
  3. private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
  4. new ConcurrentHashMap<MessageQueue, AtomicLong>();
  5. /** * 更新本地消费进度,消费者在后台有一个定时任务,有定时将该消费进度同步到broker */
  6. @Override
  7. public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
  8. if (mq != null) {
  9. AtomicLong offsetOld = this.offsetTable.get(mq);
  10. if (null == offsetOld) {
  11. offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
  12. }
  13. if (null != offsetOld) {
  14. if (increaseOnly) {
  15. MixAll.compareAndIncreaseOnly(offsetOld, offset);
  16. } else {
  17. offsetOld.set(offset);
  18. }
  19. }
  20. }
  21. }
  22. }

在这里插入图片描述

参考资料
《儒猿技术窝——从 0 开始带你成为消息中间件实战高手》

发表评论

表情:
评论列表 (有 0 条评论,90人围观)

还没有评论,来说两句吧...

相关阅读