  1. /** * 重平衡服务,随着consumer启动而启动 */
  2. public class RebalanceService extends ServiceThread {
  3. private static long waitInterval =
  4. Long.parseLong(System.getProperty(
  5. "rocketmq.client.rebalance.waitInterval", "20000"));
  6. private final InternalLogger log = ClientLogger.getLog();
  7. private final MQClientInstance mqClientFactory;
  8. public RebalanceService(MQClientInstance mqClientFactory) {
  9. this.mqClientFactory = mqClientFactory;
  10. }
  11. @Override
  12. public void run() {
  13. + " service started");
  14. while (!this.isStopped()) {
  15. // 每20s执行一次
  16. this.waitForRunning(waitInterval);
  17. this.mqClientFactory.doRebalance();
  18. }
  19. + " service end");
  20. }
  21. @Override
  22. public String getServiceName() {
  23. return RebalanceService.class.getSimpleName();
  24. }
  25. }


  1. /** * 重平衡 */
  2. public void doRebalance() {
  3. // 遍历每个消费组的消费者实例
  4. for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
  5. MQConsumerInner impl = entry.getValue();
  6. if (impl != null) {
  7. try {
  8. impl.doRebalance();
  9. } catch (Throwable e) {
  10. log.error("doRebalance exception", e);
  11. }
  12. }
  13. }
  14. }


  1. @Override
  2. public void doRebalance() {
  3. if (!this.pause) {
  4. // 调用重平衡组件
  5. this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
  6. }
  7. }


  1. public abstract class RebalanceImpl {
  2. protected static final InternalLogger log = ClientLogger.getLog();
  3. // 当前消费者分配到的队列
  4. protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
  5. // 保存所订阅的每个主题中的消息队列列表
  6. protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable =
  7. new ConcurrentHashMap<String, Set<MessageQueue>>();
  8. /** * 订阅数据,由于一个消费者可以订阅多个topic,所以这里的订阅数据用map类型 * key是订阅的topic名称,value其实也只是topic名称外加其它一些东西 */
  9. protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
  10. new ConcurrentHashMap<String, SubscriptionData>();
  11. protected String consumerGroup;
  12. /** * 消费模式 */
  13. protected MessageModel messageModel;
  14. /** * 负载策略,默认是平均算法 */
  15. protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
  16. protected MQClientInstance mQClientFactory;
  17. /** * 开始执行重平衡逻辑 * * @param isOrder 是否是顺序消费 */
  18. public void doRebalance(final boolean isOrder) {
  19. Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
  20. if (subTable != null) {
  21. // 遍历订阅的每个topic,对每个topic中的队列都重新负载均衡
  22. for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
  23. final String topic = entry.getKey();
  24. try {
  25. this.rebalanceByTopic(topic, isOrder);
  26. } catch (Throwable e) {
  27. if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
  28. log.warn("rebalanceByTopic Exception", e);
  29. }
  30. }
  31. }
  32. }
  33. this.truncateMessageQueueNotMyTopic();
  34. }
  35. /** * 根据topic对其中的队列进行重平衡 */
  36. private void rebalanceByTopic(final String topic, final boolean isOrder) {
  37. switch (messageModel) {
  38. case BROADCASTING: {
  39. Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
  40. if (mqSet != null) {
  41. boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
  42. if (changed) {
  43. this.messageQueueChanged(topic, mqSet, mqSet);
  44."messageQueueChanged {} {} {} {}",
  45. consumerGroup,
  46. topic,
  47. mqSet,
  48. mqSet);
  49. }
  50. } else {
  51. log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
  52. }
  53. break;
  54. }
  55. // 集群模式
  56. case CLUSTERING: {
  57. // 该topic对应的消息队列
  58. Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
  59. // 从随机的一个broker中获取该消费组中所有的消费者客户端ID
  60. List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
  61. if (null == mqSet) {
  62. if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
  63. log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
  64. }
  65. }
  66. if (null == cidAll) {
  67. log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
  68. }
  69. // 如果有一个为空,则放弃本次负载均衡
  70. if (mqSet != null && cidAll != null) {
  71. List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
  72. // 将消息队列从Set中复制到List中
  73. mqAll.addAll(mqSet);
  74. // 对消息队列和消费者客户端id排序,这样可以保证每个消费者重平衡时队列顺序一样,就不会有重复分配问题
  75. Collections.sort(mqAll);
  76. Collections.sort(cidAll);
  77. AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
  78. // 分配给该消费者的队列
  79. List<MessageQueue> allocateResult = null;
  80. try {
  81. // 根据负载均衡策略重平衡
  82. allocateResult = strategy.allocate(
  83. this.consumerGroup,
  84. this.mQClientFactory.getClientId(),
  85. mqAll,
  86. cidAll);
  87. } catch (Throwable e) {
  88. log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
  89. e);
  90. return;
  91. }
  92. Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
  93. if (allocateResult != null) {
  94. allocateResultSet.addAll(allocateResult);
  95. }
  96. // 判断该消费者的订阅的队列是否有变
  97. // 如果是顺序消费,还会对消费队列进行加解锁
  98. boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
  99. if (changed) {
  101. "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
  102. strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
  103. allocateResultSet.size(), allocateResultSet);
  104. // 更改后需要立即发送心跳通知所有的broker
  105. this.messageQueueChanged(topic, mqSet, allocateResultSet);
  106. }
  107. }
  108. break;
  109. }
  110. default:
  111. break;
  112. }
  113. }
  114. /** * 判断订阅的队列是否有变 * */
  115. private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
  116. final boolean isOrder) {
  117. boolean changed = false;
  118. // 当前消费者已经分配到的消息队列
  119. Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
  120. while (it.hasNext()) {
  121. Entry<MessageQueue, ProcessQueue> next =;
  122. MessageQueue mq = next.getKey();
  123. ProcessQueue pq = next.getValue();
  124. // 不属于该topic的队列跳过
  125. if (mq.getTopic().equals(topic)) {
  126. // 如果重新分配后的队列不包含旧的队列mq,说明经过重平衡后,该队列被分配给别的消费者了,
  127. // 所以需要暂停当前消费者对该队列的消费,将该ProcessQueue设置dropped=true,并从本地缓存中移除
  128. if (!mqSet.contains(mq)) {
  129. pq.setDropped(true);
  130. if (this.removeUnnecessaryMessageQueue(mq, pq)) {
  131. // 从订阅缓存中移除该队列
  132. it.remove();
  133. changed = true;
  134."doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
  135. }
  136. } else if (pq.isPullExpired()) {
  137. switch (this.consumeType()) {
  139. break;
  141. pq.setDropped(true);
  142. if (this.removeUnnecessaryMessageQueue(mq, pq)) {
  143. it.remove();
  144. changed = true;
  145. log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
  146. consumerGroup, mq);
  147. }
  148. break;
  149. default:
  150. break;
  151. }
  152. }
  153. }
  154. }
  155. List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
  156. for (MessageQueue mq : mqSet) {
  157. // 如果旧的缓存订阅的队列中没有新的队列,说明经过重平衡后,该消费者分配到了新的队列
  158. // 则需要创建PullRequest拉取消息任务去拉取消息
  159. if (!this.processQueueTable.containsKey(mq)) {
  160. // 这里在顺序消费那里详细讲解
  161. if (isOrder && !this.lock(mq)) {
  162. log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
  163. continue;
  164. }
  165. // 从内存中删除该消息队列的消费进度
  166. this.removeDirtyOffset(mq);
  167. // 创建一个新的ProcessQueue与该消息队列对应
  168. ProcessQueue pq = new ProcessQueue();
  169. // 计算从哪开始拉取
  170. long nextOffset = this.computePullFromWhere(mq);
  171. if (nextOffset >= 0) {
  172. ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
  173. if (pre != null) {
  174."doRebalance, {}, mq already exists, {}", consumerGroup, mq);
  175. } else {
  176."doRebalance, {}, add a new mq, {}", consumerGroup, mq);
  177. PullRequest pullRequest = new PullRequest();
  178. pullRequest.setConsumerGroup(consumerGroup);
  179. pullRequest.setNextOffset(nextOffset);
  180. pullRequest.setMessageQueue(mq);
  181. pullRequest.setProcessQueue(pq);
  182. pullRequestList.add(pullRequest);
  183. changed = true;
  184. }
  185. } else {
  186. log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
  187. }
  188. }
  189. }
  190. // 立即去执行拉取任务,其实就是将拉取请求任务放到拉取请求队列中
  191. this.dispatchPullRequest(pullRequestList);
  192. return changed;
  193. }
  194. }


