RocketMQ生产者消息发送源码解析总结

左手的ㄟ右手 2022-05-30 10:30 345阅读 0赞

RocketMQ在发送消息时,内部使用DefaultMQProducerImpl#sendDefaultImpl(…)来做消息的发送工作。看其代码:

  1. /** * 发送消息。 * 1. 获取消息路由信息 * 2. 选择要发送到的消息队列 * 3. 执行消息发送核心方法 * 4. 对发送结果进行封装返回 * * @param msg 消息 * @param communicationMode 通信模式 * @param sendCallback 发送回调 * @param timeout 发送消息请求超时时间 * @return 发送结果 * @throws MQClientException 当Producer发生异常 * @throws RemotingException 当远程请求发生异常 * @throws MQBrokerException 当Broker发生异常 * @throws InterruptedException 当线程被打断 */
  2. private SendResult sendDefaultImpl(Message msg,
  3. final CommunicationMode communicationMode,
  4. final SendCallback sendCallback,
  5. final long timeout
  6. ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  7. // 校验 Producer 处于运行状态
  8. this.makeSureStateOK();
  9. // 校验消息格式
  10. Validators.checkMessage(msg, this.defaultMQProducer);
  11. //
  12. final long invokeID = random.nextLong(); // 调用编号;用于下面打印日志,标记为同一次发送消息
  13. long beginTimestampFirst = System.currentTimeMillis();
  14. long beginTimestampPrev = beginTimestampFirst;
  15. @SuppressWarnings("UnusedAssignment")
  16. long endTimestamp = beginTimestampFirst;
  17. // 获取 Topic路由信息
  18. TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); //#35
  19. if (topicPublishInfo != null && topicPublishInfo.ok()) {
  20. MessageQueue mq = null; // 最后选择消息要发送到的队列
  21. Exception exception = null;
  22. SendResult sendResult = null; // 最后一次发送结果
  23. int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; // 同步3次调用
  24. int times = 0; // 第几次发送
  25. String[] brokersSent = new String[timesTotal]; // 存储每次发送消息选择的broker名
  26. // 循环调用发送消息,直到成功
  27. for (; times < timesTotal; times++) {
  28. String lastBrokerName = null == mq ? null : mq.getBrokerName();
  29. @SuppressWarnings("SpellCheckingInspection") //#47
  30. MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); // 选择消息要发送到的队列,默认策略下,下次发送选择其他的Broker
  31. if (tmpmq != null) {
  32. mq = tmpmq;
  33. brokersSent[times] = mq.getBrokerName();
  34. try {
  35. beginTimestampPrev = System.currentTimeMillis();
  36. // 调用发送消息核心方法
  37. sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
  38. endTimestamp = System.currentTimeMillis();
  39. // 更新Broker可用性信息,发送时间超过550ms后会有不可用时长,至少30S
  40. this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
  41. switch (communicationMode) {
  42. case ASYNC:
  43. return null;
  44. case ONEWAY:
  45. return null;
  46. case SYNC:
  47. if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
  48. if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { // 同步发送成功但存储有问题时 && 配置存储异常时重新发送开关 时,进行重试
  49. continue;
  50. }
  51. }
  52. return sendResult;
  53. default:
  54. break;
  55. }
  56. } catch (RemotingException e) { // 打印异常,更新Broker可用性信息,停用10M,更新继续循环
  57. endTimestamp = System.currentTimeMillis();
  58. this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
  59. log.warn(String
  60. .format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev,
  61. mq), e);
  62. log.warn(msg.toString());
  63. exception = e;
  64. continue;
  65. } catch (MQClientException e) { // 打印异常,更新Broker可用性信息,停用10M,继续循环
  66. endTimestamp = System.currentTimeMillis();
  67. this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
  68. log.warn(String
  69. .format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev,
  70. mq), e);
  71. log.warn(msg.toString());
  72. exception = e;
  73. continue;
  74. } catch (MQBrokerException e) { // 打印异常,更新Broker可用性信息,部分情况下的异常,直接返回,结束循环
  75. endTimestamp = System.currentTimeMillis();
  76. this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
  77. log.warn(String
  78. .format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev,
  79. mq), e);
  80. log.warn(msg.toString());
  81. exception = e;
  82. switch (e.getResponseCode()) {
  83. // 如下异常continue,进行发送消息重试
  84. case ResponseCode.TOPIC_NOT_EXIST:
  85. case ResponseCode.SERVICE_NOT_AVAILABLE:
  86. case ResponseCode.SYSTEM_ERROR:
  87. case ResponseCode.NO_PERMISSION:
  88. case ResponseCode.NO_BUYER_ID:
  89. case ResponseCode.NOT_IN_CURRENT_UNIT:
  90. continue;
  91. // 如果有发送结果,进行返回,否则,抛出异常;
  92. default:
  93. if (sendResult != null) {
  94. return sendResult;
  95. }
  96. throw e;
  97. }
  98. } catch (InterruptedException e) {
  99. endTimestamp = System.currentTimeMillis();
  100. this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
  101. log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID,
  102. endTimestamp - beginTimestampPrev, mq), e);
  103. log.warn(msg.toString());
  104. throw e;
  105. }
  106. } else {
  107. break;
  108. }
  109. }
  110. // 返回发送结果
  111. if (sendResult != null) {
  112. return sendResult;
  113. }
  114. ......
  115. }

#35 : 在发送消息之前,需要获取到当前Topic的路由信息,也就是BrokerData,QueueData
tryToFindTopicPublishInfo(final String topic):

  1. /** * 获取 Topic发布信息 * 如果获取不到,或者状态不正确,则从 Namesrv获取一次 * * @param topic Topic * @return topic 信息 */
  2. private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
  3. // 缓存中获取 Topic发布信息
  4. TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
  5. // 当无可用的 Topic发布信息时,从Namesrv获取一次
  6. if (null == topicPublishInfo || !topicPublishInfo.ok()) {
  7. this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
  8. this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
  9. topicPublishInfo = this.topicPublishInfoTable.get(topic);
  10. }
  11. // 若获取的 Topic发布信息时候可用,则返回
  12. if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
  13. return topicPublishInfo;
  14. } else { // 使用 {@link DefaultMQProducer#createTopicKey} 对应的 Topic发布信息。用于 Topic发布信息不存在 && Broker支持自动创建Topic
  15. this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
  16. topicPublishInfo = this.topicPublishInfoTable.get(topic);
  17. return topicPublishInfo;
  18. }
  19. }

Producer每隔30S从Namesrv处获取当前最新的TopicRouteData,其内包含了topic的所有BrokerData,QueueData。Producer从TopicRouteData中提取可写入的(Master)的Queue新封装成MessageQueue,然后放入TopicPublishInfo对象中。一个Topic可能含有多个Broker上的多个可写的MessageQueue。

Producer在发送消息前,提取相应Topic的TopicPublishInfo ,如果发送的是当前Topic的第一条消息,那么Broker上还没有当前Topic的数据,因此从Broker上获取默认的CreateTopic的TopicPublishInfo ,这个TopicPublishInfo相当于Topic的模板,当发送给Broker后,其发现消息内含有此模板信息时,会按模板创建一个TopicRouteData,然后赋值给消息内的实际Topic。

#47从多个MessageQueue选择一个来发送消息
selectOneMessageQueue(topicPublishInfo, lastBrokerName)

  1. /** * 根据 Topic发布信息 选择一个消息队列 * 会尽量选择上次尝试的Broker * 默认情形下向所有Broker的MessageQueue按顺序轮流发送 * * @param tpInfo Topic发布信息 * @param lastBrokerName * @return 消息队列 */
  2. public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
  3. if (this.sendLatencyFaultEnable) { //若开启了延迟容错机制,默认未开启
  4. try {
  5. //循环所有MessageQueue
  6. // 当 lastBrokerName == null 时,获取第一个可用的MessageQueue
  7. // 当 lastBrokerName != null 时, 获取 brokerName=lastBrokerName && 可用的MessageQueue
  8. int index = tpInfo.getSendWhichQueue().getAndIncrement();
  9. for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
  10. int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
  11. if (pos < 0) {
  12. pos = 0;
  13. }
  14. MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
  15. if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
  16. if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) {
  17. return mq;
  18. }
  19. }
  20. }
  21. // 选择一个相对好的broker,并获得其对应的一个消息队列,按 可用性 > 延迟 > 开始可用时间 选择
  22. final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
  23. int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
  24. if (writeQueueNums > 0) {
  25. final MessageQueue mq = tpInfo.selectOneMessageQueue();
  26. if (notBestBroker != null) {
  27. mq.setBrokerName(notBestBroker);
  28. mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
  29. }
  30. return mq;
  31. } else {
  32. latencyFaultTolerance.remove(notBestBroker);
  33. }
  34. } catch (Exception e) {
  35. log.error("Error occurred when selecting message queue", e);
  36. }
  37. // 选择一个消息队列,不考虑队列的可用性
  38. return tpInfo.selectOneMessageQueue();
  39. }
  40. // 默认情形下的MessageQueue获取,按顺序轮流获取所有的Broker的MessageQueue
  41. return tpInfo.selectOneMessageQueue(lastBrokerName);
  42. }
  43. /** * 默认策略下的MessageQueue选择 * * @param lastBrokerName * @return */
  44. public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
  45. if (lastBrokerName == null) {
  46. return selectOneMessageQueue();
  47. } else {
  48. int index = this.sendWhichQueue.getAndIncrement();
  49. for (int i = 0; i < this.messageQueueList.size(); i++) {
  50. int pos = Math.abs(index++) % this.messageQueueList.size();
  51. if (pos < 0)
  52. pos = 0;
  53. MessageQueue mq = this.messageQueueList.get(pos);
  54. if (!mq.getBrokerName().equals(lastBrokerName)) {
  55. return mq;
  56. }
  57. }
  58. return selectOneMessageQueue();
  59. }
  60. }
  61. /** * 直接选择上次发送队列的下一位 * * @return */
  62. public MessageQueue selectOneMessageQueue() {
  63. int index = this.sendWhichQueue.getAndIncrement();
  64. int pos = Math.abs(index) % this.messageQueueList.size();
  65. if (pos < 0) { pos = 0; }
  66. return this.messageQueueList.get(pos);
  67. }

Producer在选择MessageQueue时,会尽量选择同一个Broker上的。同步发送下,最多发送3次,发送一次后,第二次发送会尽量选择上次发送的Broker,前提是上次发送的Broker未被隔离。

Producer在发送失败,出现异常时,大部分情况下会隔离Broker,也就是在内部标识此Broker在一段时间内不可用,也就是尽量不选择他们最为发送目标,隔离时间10分钟。当向一个Broker发送失败,隔离此Broker。

若开启了MessageQueue延迟容错机制时,选择其他未被隔离的Broker作为发送目标,如果所有的Broker都被隔离了,则按 可用性 > 延迟 > 开始可用时间 的对比性选择。

若未开启延迟容错机制(默认):第一次发送,会按顺序获取所有MessageQueue里上次发送队列的的下一位;若第一次发送失败了,按顺序隔位筛选,直到找到一个不是上个Broker的MessageQueue,也就是不要再向发送失败的Broker发送消息了。

当Producer发送成功,但耗时较长,超过550ms时,会触发隔离,隔离时间按发送耗时阶梯增长,至少30S,隔离仅在延迟容错机制开启下才能发生作用。

发表评论

表情:
评论列表 (有 0 条评论,345人围观)

还没有评论,来说两句吧...

相关阅读