RocketMQ源码17-consumer 负载均衡(重平衡)

重平衡分析的前提是:集群消费模式。 重平衡要做的也很简单,就是给当前消费者分配属于它的逻辑消费队列


Rebalance(重平衡)机制指的是:将一个Topic下的多个队列,在同一个消费者组(consumer group)下的多个消费者实例(consumer instance)之间进行重新分配。





2. 消费者启动DefaultMQPushConsumerImpl


  1. 从NameServer更新当前消费者订阅主题的路由信息;
  2. 向Broker发送心跳,注册消费者;
  3. 唤醒负载均衡服务,触发一次负载均衡;

    public class DefaultMQPushConsumerImpl implements MQConsumerInner {

    1. public synchronized void start() throws MQClientException {
    2. // ...
    3. // 更新当前消费者订阅主题的路由信息
    4. this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    5. this.mQClientFactory.checkClientInBroker();
    6. // 向Broker发送心跳
    7. this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    8. // 唤醒负载均衡服务
    9. this.mQClientFactory.rebalanceImmediately();
    10. }


2.1 更新主题路由信息


  1. public class DefaultMQPushConsumerImpl implements MQConsumerInner {
  2. private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
  3. // 获取当前消费者订阅的主题信息
  4. Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
  5. if (subTable != null) {
  6. // 遍历订阅的主题信息
  7. for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
  8. final String topic = entry.getKey();
  9. // 从NameServer更新主题的路由信息
  10. this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
  11. }
  12. }
  13. }
  14. }
2.2 注册消费者

2.2.1 发送心跳


  1. public class MQClientInstance {
  2. public void sendHeartbeatToAllBrokerWithLock() {
  3. if (this.lockHeartbeat.tryLock()) {
  4. try {
  5. // todo 调用sendHeartbeatToAllBroker向Broker发送心跳
  6. this.sendHeartbeatToAllBroker();
  7. this.uploadFilterClassSource();
  8. } catch (final Exception e) {
  9. log.error("sendHeartbeatToAllBroker exception", e);
  10. } finally {
  11. this.lockHeartbeat.unlock();
  12. }
  13. } else {
  14. log.warn("lock heartBeat, but failed. [{}]", this.clientId);
  15. }
  16. }
  17. }
  1. public class MQClientInstance {
  2. // Broker路由表
  3. private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
  4. new ConcurrentHashMap<String, HashMap<Long, String>>();
  5. // 发送心跳
  6. private void sendHeartbeatToAllBroker() {
  7. final HeartbeatData heartbeatData = this.prepareHeartbeatData();
  8. // ...
  9. if (!this.brokerAddrTable.isEmpty()) {
  10. long times = this.sendHeartbeatTimesTotal.getAndIncrement();
  11. // 获取所有的Broker进行遍历, key为 Broker Name, value为同一个name下的所有Broker实例(主从模式下Broker的name一致)
  12. Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
  13. while (it.hasNext()) {
  14. Entry<String, HashMap<Long, String>> entry =;
  15. String brokerName = entry.getKey(); // broker name
  16. // 获取同一个Broker Name下的所有Broker实例
  17. HashMap<Long, String> oneTable = entry.getValue();
  18. if (oneTable != null) {
  19. // 遍历所有的实例
  20. for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
  21. Long id = entry1.getKey();
  22. String addr = entry1.getValue();
  23. if (addr != null) { // 如果地址不为空
  24. // ...
  25. try {
  26. // 发送心跳
  27. int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout());
  28. // ...
  29. } catch (Exception e) {
  30. // ...
  31. }
  32. }
  33. }
  34. }
  35. }
  36. }
  37. }
  38. }
  1. public class MQClientAPIImpl {
  2. public int sendHearbeat(final String addr, final HeartbeatData heartbeatData, final long timeoutMillis
  3. ) throws RemotingException, MQBrokerException, InterruptedException {
  4. // 创建HEART_BEAT请求
  5. RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
  6. request.setLanguage(clientConfig.getLanguage());
  7. request.setBody(heartbeatData.encode());
  8. // 发送请求
  9. RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
  10. // ...
  11. }
  12. }
2.2.2 broker心跳请求处理


  1. public class BrokerController {
  2. public void registerProcessor() {
  3. ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
  4. // 注册HEART_BEAT请求的处理器ClientManageProcessor
  5. this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
  6. }
  7. }
  1. public class ClientManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
  2. @Override
  3. public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
  4. throws RemotingCommandException {
  5. switch (request.getCode()) {
  6. case RequestCode.HEART_BEAT: // 处理心跳请求
  7. return this.heartBeat(ctx, request);
  8. case RequestCode.UNREGISTER_CLIENT: // 取消注册请求
  9. return this.unregisterClient(ctx, request);
  10. case RequestCode.CHECK_CLIENT_CONFIG:
  11. return this.checkClientConfig(ctx, request);
  12. default:
  13. break;
  14. }
  15. return null;
  16. }
  17. }
  1. public class ClientManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
  2. public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
  3. // ...
  4. for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
  5. // ...
  6. // 注册Consumer
  7. boolean changed = this.brokerController.getConsumerManager().registerConsumer(
  8. data.getGroupName(), clientChannelInfo, data.getConsumeType(), data.getMessageModel(), data.getConsumeFromWhere(),
  9. data.getSubscriptionDataSet(), isNotifyConsumerIdsChangedEnable);
  10. // ...
  11. }
  12. // ...
  13. return response;
  14. }
  15. }
  1. 根据组名称获取该消费者组的信息ConsumerGroupInfo对象。如果获取为空,会创建一个ConsumerGroupInfo,记录了消费者组的相关信息;
  2. 判断消费者是否发生了变更,如果发生了变化,会触发CHANGE变更事件(这个稍后再看);
  3. 触发REGISTER注册事件;

    public class ConsumerManager {

    1. public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
    2. ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
    3. final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
    4. // 根据组名称获取消费者组信息
    5. ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
    6. if (null == consumerGroupInfo) { // 如果为空新增ConsumerGroupInfo对象
    7. ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
    8. ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
    9. consumerGroupInfo = prev != null ? prev : tmp;
    10. }
    11. boolean r1 =
    12. consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
    13. consumeFromWhere);
    14. boolean r2 = consumerGroupInfo.updateSubscription(subList);
    15. // 如果有变更
    16. if (r1 || r2) {
    17. if (isNotifyConsumerIdsChangedEnable) {
    18. // 通知变更
    19. this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
    20. }
    21. }
    22. // 注册Consumer
    23. this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
    24. return r1 || r2;
    25. }



  1. public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener {
  2. @Override
  3. public void handle(ConsumerGroupEvent event, String group, Object... args) {
  4. if (event == null) {
  5. return;
  6. }
  7. switch (event) {
  8. case CHANGE:// 如果是消费者变更事件
  9. // ...
  10. break;
  11. case UNREGISTER: // 如果是取消注册事件
  12. this.brokerController.getConsumerFilterManager().unRegister(group);
  13. break;
  14. case REGISTER: // 如果是注册事件
  15. if (args == null || args.length < 1) {
  16. return;
  17. }
  18. Collection<SubscriptionData> subscriptionDataList = (Collection<SubscriptionData>) args[0];
  19. // 进行注册
  20. this.brokerController.getConsumerFilterManager().register(group, subscriptionDataList);
  21. break;
  22. default:
  23. throw new RuntimeException("Unknown event " + event);
  24. }
  25. }
  26. }
2.3 立即重平衡


  1. public void rebalanceImmediately() {
  2. this.rebalanceService.wakeup();
  3. }
这里会唤醒org.apache.rocketmq.client.impl.consumer.RebalanceService的run方法中的 this.mqClientFactory.doRebalance();:

  1. @Override
  2. public void run() {
  3. + " service started");
  4. while (!this.isStopped()) {
  5. // todo 等待20s执行一次 内部使用了juc的CountDownLatch, 使得这里启动后仍然是阻塞的
  6. this.waitForRunning(waitInterval);
  7. // todo
  8. this.mqClientFactory.doRebalance();
  9. }
  10. + " service end");
  11. }
接下来我们重点看一下 重平衡



  1. public class RebalanceService extends ServiceThread {
  2. private static long waitInterval = Long.parseLong(System.getProperty(
  3. "rocketmq.client.rebalance.waitInterval", "20000"));
  4. //TODO:...略.....
  5. @Override
  6. public void run() {
  7. + " service started");
  8. while (!this.isStopped()) {
  9. this.waitForRunning(waitInterval);
  10. this.mqClientFactory.doRebalance();
  11. }
  12. + " service end");
  13. }
  14. }
  15. 复制代码




  1. public void doRebalance(final boolean isOrder) {
  2. Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
  3. if (subTable != null) {
  4. for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
  5. final String topic = entry.getKey();
  6. try {
  7. // todo 客户端负载均衡:根据主题来处理负载均衡
  8. this.rebalanceByTopic(topic, isOrder);
  9. } catch (Throwable e) {
  10. if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
  11. log.warn("rebalanceByTopic Exception", e);
  12. }
  13. }
  14. }
  15. }
  16. this.truncateMessageQueueNotMyTopic();
  17. }
我们继续往下走: org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic:

  1. private void rebalanceByTopic(final String topic, final boolean isOrder) {
  2. switch (messageModel) {
  3. // 广播模式:不需要处理负载均衡,每个消费者都要消费,只需要更新负载信息
  4. case BROADCASTING: {
  5. // 更新负载均衡信息,这里传入的参数是mqSet,即所有队列
  6. Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
  7. if (mqSet != null) {
  8. boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
  9. if (changed) {
  10. this.messageQueueChanged(topic, mqSet, mqSet);
  11."messageQueueChanged {} {} {} {}",
  12. consumerGroup,
  13. topic,
  14. mqSet,
  15. mqSet);
  16. }
  17. } else {
  18. log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
  19. }
  20. break;
  21. }
  22. // todo 集群模式
  23. case CLUSTERING: {
  24. // 从主题订阅信息缓存表中获取主题的队列信息, 获取这个topic下的所有队列(默认是4个)
  25. Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
  26. //发送请求从Broker中获取该消费组内当前所有的消费者客户端ID,主题的队
  27. //列可能分布在多个Broker上,那么请求该发往哪个Broker呢?
  28. //RocketeMQ从主题的路由信息表中随机选择一个Broker
  29. List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
  30. if (null == mqSet) {
  31. if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
  32. log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
  33. }
  34. }
  35. if (null == cidAll) {
  36. log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
  37. }
  38. // 如果mqSet、cidAll任意一个为空,则忽略本次消息队列负载
  39. if (mqSet != null && cidAll != null) {
  40. List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
  41. mqAll.addAll(mqSet);
  42. // 对cidAll、mqAll进行排序
  43. // 这一步很重要,同一个消费组内看到的视图应保持一致,确保同一个消费队列不会被多个消费者分配
  44. Collections.sort(mqAll);
  45. Collections.sort(cidAll);
  46. // 默认是平均分配策略
  47. AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
  48. List<MessageQueue> allocateResult = null;
  49. try {
  50. // todo 分配算法
  51. allocateResult = strategy.allocate(
  52. this.consumerGroup,
  53. this.mQClientFactory.getClientId(),
  54. mqAll,
  55. cidAll);
  56. } catch (Throwable e) {
  57. log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
  58. e);
  59. return;
  60. }
  61. //TODO:保存了当前消费者需要消费的队列
  62. Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
  63. if (allocateResult != null) {
  64. allocateResultSet.addAll(allocateResult);
  65. }
  66. // todo 对比消息队列是否发生变化 更新负载均衡信息,传入参数是 allocateResultSet,即当前consumer分配到的队列
  67. boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
  68. if (changed) {
  70. "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
  71. strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
  72. allocateResultSet.size(), allocateResultSet);
  73. this.messageQueueChanged(topic, mqSet, allocateResultSet);
  74. }
  75. }
  76. break;
  77. }
  78. default:
  79. break;
  80. }
  81. }
  1. 广播模式:消费者要消费所有队列的数据,所以如果queue变动了,就更新消费者的消费队列,总之就是消费者要消费所有队列。
  2. 集群模式:相同Consumer Group的每个Consumer实例平均分摊同一个Topic的消息。即每条消息只会被发送到Consumer Group中的某个Consumer



  1. 平均分配策略(默认)


举例:如果是4个queue,3个消费者; consumer1分配queueid=0,1;consumer2分配queueid=2,consumer3分配queueid=3。

  1. 环形平均策略


举例:如果是4个queue,3个消费者; consumer1分配queueid=0,3;consumer2分配queueid=1,consumer3分配queueid=2。它和平均分配不同的是,它先给consumer1分配queueid=0,consumer2分配queueid=1,consumer3分配queueid=2,然后再将多余的queueid=3分配给consumer1。

  1. 一致性hash策略


  1. 同机房策略


  1. 就近机房策略



  1. /**
  2. * queue分配策略
  3. */
  4. public interface AllocateMessageQueueStrategy {
  5. /**
  6. * Allocating by consumer id
  7. *
  8. * @param consumerGroup:消费者组
  9. * @param currentCID:当前的消费者id
  10. * @param mqAll:topic下的所有队列
  11. * @param cidAl:消费者组下的所有消费者id
  12. * @return :返回当前消费者需要消费的队列
  13. */
  14. List<MessageQueue> allocate(
  15. final String consumerGroup,
  16. final String currentCID,
  17. final List<MessageQueue> mqAll,
  18. final List<String> cidAll
  19. );
  20. }
默认使用的是平均分配策略 AllocateMessageQueueAveragely


  1. public List<MessageQueue> allocate(String consumerGroup, String currentCID,
  2. List<MessageQueue> mqAll, List<String> cidAll) {
  3. // 返回值
  4. List<MessageQueue> result = new ArrayList<MessageQueue>();
  5. // 省略一些判断操作
  6. ...
  7. int index = cidAll.indexOf(currentCID);
  8. int mod = mqAll.size() % cidAll.size();
  9. // 1. 消费者数量大于队列数量:averageSize = 1
  10. // 2. 消费者数量小于等于队列数量:averageSize = 队列数量 / 消费者数量,还要处理个+1的操作
  11. int averageSize = mqAll.size() <= cidAll.size()
  12. ? 1 : (mod > 0 && index < mod
  13. ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
  14. int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
  15. int range = Math.min(averageSize, mqAll.size() - startIndex);
  16. for (int i = 0; i < range; i++) {
  17. result.add(mqAll.get((startIndex + i) % mqAll.size()));
  18. }
  19. return result;
  20. }
  1. 以6个messageQueue、4个consumer为例,先计算取余:6 % 4 = 2,这表明messageQueue不能平均分配给每个consumer,接下来看看这个余数2是如何处理的;
  2. 计算每个consumer平均处理的messageQueue数量

    • 这里需要注意,如果consumer数量大于或等于messageQueue数量(即mqAll.size() <= cidAll.size()),averageSize固定为1,每个consumer最多只会分配到一个messageQueue;当consumer多于messageQueue时,多出来的consumer分配不到任何messageQueue。无论哪种情况,同一个messageQueue都不会同时被两个及以上的consumer消费
    • 这里的messageQueue数量为6,consumer为4,计算得到每个consumer处理的队列数最少为1,除此之外,为了实现“平均”,有2个consumer会需要多处理1个messageQueue,按“平均”的分配原则,如果index小于mod,则会分配多1个messageQueue,这里的mod为2,结果如下:

    消费者索引 0 1 2 3
    处理数量 2 2 1 1
  3. 分配完每个consumer处理的messageQueue数量后,这些messageQueue该如何分配呢?从代码来看,分配时会先分配完一个consumer,再分配下一个consumer,最终结果就是这样:

    队列 Q0 Q1 Q2 Q3 Q4 Q5
    消费者 C0 C0 C1 C1 C2 C3


其他分配策略 大家可自行阅读源码


  1. /**
  2. * 对比消息队列是否发生变化,主要思路是遍历当前负载队列集
  3. * 合,如果队列不在新分配队列的集合中,需要将该队列停止消费并保
  4. * 存消费进度;遍历已分配的队列,如果队列不在队列负载表中
  5. * (processQueueTable),则需要创建该队列拉取任务PullRequest,
  6. * 然后添加到PullMessageService线程的pullRequestQueue中,
  7. * PullMessageService才会继续拉取任务
  8. */
  9. private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
  10. final boolean isOrder) {
  11. boolean changed = false;
  12. /**
  13. * ConcurrentMap〈MessageQueue, ProcessQueue〉
  14. * processQueueTable是当前消费者负载的消息队列缓存表,如果缓存表
  15. * 中的MessageQueue不包含在mqSet中,说明经过本次消息队列负载后,
  16. * 该mq被分配给其他消费者,需要暂停该消息队列消息的消费。方法是
  17. * 将ProccessQueue的状态设置为droped=true,该ProcessQueue中的消
  18. * 息将不会再被消费,调用removeUnnecessaryMessageQueue方法判断是
  19. * 否将MessageQueue、ProccessQueue从缓存表中移除。
  20. * removeUnnecessaryMessageQueue在RebalanceImple中定义为抽象方
  21. * 法。removeUnnecessaryMessageQueue方法主要用于持久化待移除
  22. * MessageQueue的消息消费进度。在推模式下,如果是集群模式并且是
  23. * 顺序消息消费,还需要先解锁队列
  24. */
  25. Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
  26. while (it.hasNext()) {
  27. Entry<MessageQueue, ProcessQueue> next =;
  28. MessageQueue mq = next.getKey();
  29. ProcessQueue pq = next.getValue();
  30. if (mq.getTopic().equals(topic)) {
  31. if (!mqSet.contains(mq)) {
  32. pq.setDropped(true);
  33. if (this.removeUnnecessaryMessageQueue(mq, pq)) {
  34. it.remove();
  35. changed = true;
  36."doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
  37. }
  38. } else if (pq.isPullExpired()) {
  39. switch (this.consumeType()) {
  41. break;
  43. pq.setDropped(true);
  44. if (this.removeUnnecessaryMessageQueue(mq, pq)) {
  45. it.remove();
  46. changed = true;
  47. log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
  48. consumerGroup, mq);
  49. }
  50. break;
  51. default:
  52. break;
  53. }
  54. }
  55. }
  56. }
  57. /**
  58. * 遍历本次负载分配到的队列集合,如果
  59. * processQueueTable中没有包含该消息队列,表明这是本次新增加的消
  60. * 息队列,首先从内存中移除该消息队列的消费进度,然后从磁盘中读
  61. * 取该消息队列的消费进度,创建PullRequest对象。这里有一个关键,
  62. * 如果读取到的消费进度小于0,则需要校对消费进度。RocketMQ提供了
  64. * CONSUME_FROM_TIMESTAMP方式,在创建消费者时可以通过调用
  65. * DefaultMQPushConsumer#setConsumeFromWhere方法进行设置
  66. */
  67. List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
  68. for (MessageQueue mq : mqSet) {
  69. if (!this.processQueueTable.containsKey(mq)) {
  70. /**
  71. * 经过消息队列重新负载(分配)后,分配到新的消息队列时,首
  72. * 先需要尝试向Broker发起锁定该消息队列的请求,如果返回加锁成
  73. * 功,则创建该消息队列的拉取任务,否则跳过,等待其他消费者释放
  74. * 该消息队列的锁,然后在下一次队列重新负载时再尝试加锁
  75. */
  76. // 顺序消息
  77. if (isOrder && !this.lock(mq)) {
  78. log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
  79. continue;
  80. }
  81. this.removeDirtyOffset(mq);
  82. ProcessQueue pq = new ProcessQueue();
  83. // todo PullRequest的nextOffset计算逻辑位于RebalancePushImpl#computePullFromWhere
  84. long nextOffset = this.computePullFromWhere(mq);
  85. if (nextOffset >= 0) {
  86. ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
  87. if (pre != null) {
  88."doRebalance, {}, mq already exists, {}", consumerGroup, mq);
  89. } else {
  90."doRebalance, {}, add a new mq, {}", consumerGroup, mq);
  91. PullRequest pullRequest = new PullRequest();
  92. pullRequest.setConsumerGroup(consumerGroup);
  93. pullRequest.setNextOffset(nextOffset);
  94. pullRequest.setMessageQueue(mq);
  95. pullRequest.setProcessQueue(pq);
  96. pullRequestList.add(pullRequest);
  97. changed = true;
  98. }
  99. } else {
  100. log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
  101. }
  102. }
  103. }
  104. // todo 将PullRequest加入PullMessageService,以便唤醒PullMessageService线程
  105. this.dispatchPullRequest(pullRequestList);
  106. return changed;
  107. }
  1. 先将分配到的消息队列集合(mqSet)与processQueueTable做一个过滤比对

[图1:processQueueTable与分配到的mqSet比对示意图]

  • 上图中processQueueTable标注的红色部分,表示与分配到的消息队列集合mqSet互不包含。将这些队列设置dropped属性为true,然后查看这些队列是否可以移除出processQueueTable缓存变量,这里具体执行removeUnnecessaryMessageQueue()方法。
  • 上图中processQueueTable的绿色部分,表示与分配到的消息队列集合mqSet的交集。对这部分需要判断其ProcessQueue是否已经拉取过期(isPullExpired):如果是Pull模式(CONSUME_ACTIVELY)则不做处理;如果是Push模式(CONSUME_PASSIVELY),则将dropped属性设置为true,并调用removeUnnecessaryMessageQueue()方法,像上面一样尝试移除该Entry。


  1. 为分配到的消息队列集合(mqSet)中尚未存在于processQueueTable的每个MessageQueue创建一个ProcessQueue对象,并存入RebalanceImpl的processQueueTable缓存表中(其中调用RebalanceImpl实例的computePullFromWhere(MessageQueue mq)方法获取该MessageQueue的下一个消费进度offset,随后填充至接下来要创建的pullRequest对象属性中)。同时创建拉取请求对象pullRequest,并添加到拉取列表pullRequestList中。最后执行dispatchPullRequest()方法,将拉取消息的请求对象PullRequest依次放入PullMessageService服务线程的阻塞队列pullRequestQueue中,待该服务线程取出后向Broker端发起拉取消息的请求。


关于上面的核心逻辑方法我在举个栗子:假设有两个消费者c1,c2; 队列默认是4个。

  1. 消费者c1先启动,由于只有一个消费者,所以分配给c1的队列是4个(mqSet.size()=4);
  2. 然后开始处理重平衡逻辑,第一次进来 processQueueTable肯定是empty的,所以先跳过遍历逻辑。
  3. 遍历分配到的mqSet集合(遍历4次)

    • 创建ProcessQueue对象
    • 读取当前MessageQueue的偏移量,去broker中读取
    • 将当前MessageQueue和ProcessQueue放入processQueueTable
    • 创建PullRequest对象,设置队列MessageQueue,缓存消息队列ProcessQueue,以及当前MessageQueue的偏移量,然后放入pullRequestList集合中
  4. 然后分发pullRequestList,将拉取消息的请求对象PullRequest依次放入PullMessageService服务线程的阻塞队列pullRequestQueue中,待该服务线程取出后向Broker端发起拉取消息的请求。


  1. 消费者c2启动,先不关注它重平衡的逻辑,就假设它分配了2个队列


  1. 消费者c1再次重平衡,经过平均分配算法后,c1也分配了2个queue(因为c2分配了2个,他们平均分),所以mqSet.size()=2
  2. 然后开始处理重平衡逻辑,由于消费者c1第一次重平衡时,processQueueTable已经放入了4个queue(MessageQueue),所以这里开始遍历。如果mqSet(这里是2个)不包含当前entry的key(MessageQueue),则:

    • 将当前entry中的ProcessQueue的属性 dropped 设置为true.
    • 从RemoteBrokerOffsetStore#offsetTable属性中移除当前queue(MessageQueue)(移除前会先持久化该队列的消费进度),因为这个队列已经不属于c1了。
    • 将当前entry从processQueueTable中移除
  3. 这样遍历完,processQueueTable 还剩下两个entry(这两个entry的key是在mqSet集合中的)

  4. 继续遍历mqSet(2个queue),判断!processQueueTable.containsKey(mq)。这里结果肯定是false:前面分析过,processQueueTable中剩下的正是这2个MessageQueue,所以会跳过if,最终存放PullRequest的pullRequestList是空的。不难发现,c1在第2次重平衡后并没有构建新的PullRequest,dispatchPullRequest(pullRequestList)方法内部不会做任何处理。这样看来c1仍然持有原来的4个PullRequest,那么第2次重平衡是不是就没有作用了?

    • 不是的,这就是前面提到的一个非常重要的属性设置dropped=true
    • 在拉取消息时,首先就会判断这个属性

      public void pullMessage(final PullRequest pullRequest) {

      1. final ProcessQueue processQueue = pullRequest.getProcessQueue();
      2. //TODO:如果是dropped=true,则本次拉取请求直接丢弃,不会在将PullRequest放回阻塞队列中
      3. if (processQueue.isDropped()) {
      4."the pull request[{}] is dropped.", pullRequest.toString());
      5. return;
      6. }
      7. //TODO:.......






  1. public void run() {
  2. /**
  3. * 先检查processQueue的dropped,如果设置为true,则停止该队列的消费。在进行消息重新负
  4. * 载时,如果该消息队列被分配给消费组内的其他消费者,需要将
  5. * droped设置为true,阻止消费者继续消费不属于自己的消息队列
  6. */
  7. if (this.processQueue.isDropped()) {
  8."the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
  9. return;
  10. }
  11. // 取出消息监听器 获取消息监听器MessageListenerConcurrently
  12. MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
  13. ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
  14. ConsumeConcurrentlyStatus status = null;
  15. /**
  16. * 恢复重试消息主题名。这是为什么呢?这是由消息重试
  17. * 机制决定的,RocketMQ将消息存入CommitLog文件时,如果发现消息的
  18. * 延时级别delayTimeLevel大于0,会先将重试主题存入消息的属性,然
  19. * 后将主题名称设置为SCHEDULE_TOPIC_XXXX,以便之后重新参与消息消
  20. * 费
  21. */
  22. defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
  23. ...
  24. try {
  25. ...
  26. // todo 执行具体的消息消费,调用应用程序消息监听器的
  27. //consumeMessage方法,进入具体的消息消费业务逻辑,返回该批消息
  28. //的消费结果,即CONSUME_SUCCESS(消费成功)或
  29. //RECONSUME_LATER(需要重新消费)
  30. status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
  31. }
  32. ...
  33. ...
  34. /**
  35. * 执行业务消息消费后,在处理结果前再次验证一次
  36. * ProcessQueue的isDropped状态值。如果状态值为true,将不对结果进
  37. * 行任何处理。也就是说,在消息消费进入第四步时,如果因新的消费
  38. * 者加入或原先的消费者出现宕机,导致原先分配给消费者的队列在负
  39. * 载之后分配给了别的消费者,那么消息会被重复消费
  40. */
  41. if (!processQueue.isDropped()) {
  42. // todo
  43. ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
  44. } else {
  45. log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
  46. }
  47. }
  // NOTE: this may lead to duplicate consumption. If the queue was dropped while the batch was
  // being consumed, the result is not processed (presumably leaving the consumption offset
  // unadvanced), so the queue's new owner may consume the same messages again.
  if (!processQueue.isDropped()) {
      ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
  } else {
      log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
  }



6. Rebalance的触发

6.1 消费者启动时触发


6.2 消费者变更时触发


  • 当前注册的消费者对应的Channel对象之前不存在;
  • 当前注册的消费者订阅的主题信息发生了变化,也就是消费者订阅的主题有新增或者删除

    public class ConsumerManager {

    1. /**
    2. * 注册消费者
    3. * @param group 消费者组名称
    4. * @param clientChannelInfo 注册的消费者对应的Channel信息
    5. * @param consumeType 消费类型
    6. * @param messageModel
    7. * @param consumeFromWhere 消费消息的位置
    8. * @param subList 消费者订阅的主题信息
    9. * @param isNotifyConsumerIdsChangedEnable 是否通知变更
    10. * @return
    11. */
    12. public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
    13. ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
    14. final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
    15. // 根据组名称获取消费者组信息
    16. ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
    17. if (null == consumerGroupInfo) { // 如果为空新增
    18. ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
    19. ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
    20. consumerGroupInfo = prev != null ? prev : tmp;
    21. }
    22. // 更新Channel
    23. boolean r1 =
    24. consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
    25. consumeFromWhere);
    26. // 更新订阅信息
    27. boolean r2 = consumerGroupInfo.updateSubscription(subList);
    28. // 如果有变更
    29. if (r1 || r2) {
    30. if (isNotifyConsumerIdsChangedEnable) {
    31. // 通知变更,consumerGroupInfo中存储了该消费者组下的所有消费者的channel
    32. this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
    33. }
    34. }
    35. // 注册Consumer
    36. this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
    37. return r1 || r2;
    38. }


6.2.1 Channel变更


  • 如果ClientChannelInfo对象获取为空,表示之前不存在该消费者的channel信息,将其加入到路由表中,变更状态置为true,表示消费者有变化;
  • 如果获取不为空,判断clientid是否一致,如果不一致更新为最新的channel信息,但是变更状态updated不发生变化;


  1. // key为消费者对应的channle,value为chanel信息
  2. private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
  3. new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
  4. public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType,
  5. MessageModel messageModel, ConsumeFromWhere consumeFromWhere) {
  6. boolean updated = false; // 变更状态初始化为false
  7. this.consumeType = consumeType;
  8. this.messageModel = messageModel;
  9. this.consumeFromWhere = consumeFromWhere;
  10. // 从channelInfoTable中获取对应的Channel信息,
  11. ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getChannel());
  12. if (null == infoOld) { // 如果为空
  13. // 新增
  14. ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew);
  15. if (null == prev) { // 如果之前不存在
  16."new consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType,
  17. messageModel, infoNew.toString());
  18. // 变更状态置为true
  19. updated = true;
  20. }
  21. infoOld = infoNew;
  22. } else {
  23. // 如果之前存在,判断clientid是否一致,如果不一致更新为最新的channel
  24. if (!infoOld.getClientId().equals(infoNew.getClientId())) {
  25. log.error("[BUG] consumer channel exist in broker, but clientId not equal. GROUP: {} OLD: {} NEW: {} ", this.groupName, infoOld.toString(), infoNew.toString());
  26. this.channelInfoTable.put(infoNew.getChannel(), infoNew);
  27. }
  28. }
  29. this.lastUpdateTimestamp = System.currentTimeMillis();
  30. infoOld.setLastUpdateTimestamp(this.lastUpdateTimestamp);
  31. return updated;
  32. }
6.2.2 主题信息订阅变更


  1. 判断是否有新增的主题订阅信息,主要是通过subscriptionTable是否存在某个主题进行判断的:

    • 如果不存在,表示之前没有订阅过某个主题的信息,将其加入到subscriptionTable中,并将变更状态置为true,表示主题订阅信息有变化;
    • 如果subscriptionTable中存在某个主题的订阅信息,表示之前就已订阅,将其更新为最新的,但是变更状态不发生变化;
  2. 判断是否有删除的主题,主要是通过subscriptionTable和subList的对比进行判断的,如果有删除的主题,将变更状态置为true;


  1. public class ConsumerGroupInfo {
  2. // 记录了订阅的主题信息,key为topic,value为订阅信息
  3. private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
  4. new ConcurrentHashMap<String, SubscriptionData>();
  5. public boolean updateSubscription(final Set<SubscriptionData> subList) {
  6. boolean updated = false;
  7. // 遍历订阅的主题信息
  8. for (SubscriptionData sub : subList) {
  9. //根据主题获取订阅信息
  10. SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
  11. // 如果获取为空
  12. if (old == null) {
  13. // 加入到subscriptionTable
  14. SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
  15. if (null == prev) {
  16. updated = true; // 变更状态置为true
  17."subscription changed, add new topic, group: {} {}", this.groupName, sub.toString());
  18. }
  19. } else if (sub.getSubVersion() > old.getSubVersion()) { // 如果版本发生了变化
  20. if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
  21."subscription changed, group: {} OLD: {} NEW: {}", this.groupName, old.toString(), sub.toString());
  22. }
  23. // 更新为最新的订阅信息
  24. this.subscriptionTable.put(sub.getTopic(), sub);
  25. }
  26. }
  27. Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();
  28. // 进行遍历,这一步主要是判断有没有取消订阅的主题
  29. while (it.hasNext()) {
  30. Entry<String, SubscriptionData> next =;
  31. String oldTopic = next.getKey();
  32. boolean exist = false;
  33. // 遍历最新的订阅信息
  34. for (SubscriptionData sub : subList) {
  35. // 如果在旧的订阅信息中存在就终止,继续判断下一个主题
  36. if (sub.getTopic().equals(oldTopic)) {
  37. exist = true;
  38. break;
  39. }
  40. }
  41. // 走到这里,表示有取消订阅的主题
  42. if (!exist) {
  43. log.warn("subscription changed, group: {} remove topic {} {}",this.groupName, oldTopic, next.getValue().toString());
  44. // 进行删除
  45. it.remove();
  46. // 变更状态置为true
  47. updated = true;
  48. }
  49. }
  50. this.lastUpdateTimestamp = System.currentTimeMillis();
  51. return updated;
  52. }
  53. }
6.2.3 变更请求发送



  1. public class ConsumerManager {
  2. public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
  3. ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
  4. final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
  5. // ...
  6. // 更新Channel
  7. boolean r1 =
  8. consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
  9. consumeFromWhere);
  10. // 更新订阅信息
  11. boolean r2 = consumerGroupInfo.updateSubscription(subList);
  12. // 如果有变更
  13. if (r1 || r2) {
  14. if (isNotifyConsumerIdsChangedEnable) {
  15. // 触发变更事件,consumerGroupInfo中存储了该消费者组下的所有消费者的channel
  16. this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
  17. }
  18. }
  19. // ...
  20. }
  21. }
  22. // DefaultConsumerIdsChangeListener
  23. public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener {
  24. @Override
  25. public void handle(ConsumerGroupEvent event, String group, Object... args) {
  26. if (event == null) {
  27. return;
  28. }
  29. switch (event) {
  30. case CHANGE:// 如果是消费者变更事件
  31. case CHANGE:
  32. if (args == null || args.length < 1) {
  33. return;
  34. }
  35. // 获取所有的消费者对应的channel
  36. List<Channel> channels = (List<Channel>) args[0];
  37. if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {
  38. for (Channel chl : channels) {
  39. // 向每一个消费者发送变更请求
  40. this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group);
  41. }
  42. }
  43. break;
  44. // ...
  45. }
  46. }
  47. }
  48. 复制代码


  1. public class Broker2Client {
  2. public void notifyConsumerIdsChanged(
  3. final Channel channel,
  4. final String consumerGroup) {
  5. if (null == consumerGroup) {
  6. log.error("notifyConsumerIdsChanged consumerGroup is null");
  7. return;
  8. }
  9. NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();
  10. requestHeader.setConsumerGroup(consumerGroup);
  11. // 创建变更通知请求
  12. RemotingCommand request =
  13. RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);
  14. try {
  15. // 发送请求
  16. this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
  17. } catch (Exception e) {
  18. log.error("notifyConsumerIdsChanged exception. group={}, error={}", consumerGroup, e.toString());
  19. }
  20. }
  21. }
6.2.4 变更通知请求处理


  1. public class ClientRemotingProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
  2. @Override
  3. public RemotingCommand processRequest(ChannelHandlerContext ctx,
  4. RemotingCommand request) throws RemotingCommandException {
  5. switch (request.getCode()) {
  6. case RequestCode.CHECK_TRANSACTION_STATE:
  7. return this.checkTransactionState(ctx, request);
  9. // 处理变更请求
  10. return this.notifyConsumerIdsChanged(ctx, request);
  11. // ...
  12. default:
  13. break;
  14. }
  15. return null;
  16. }
  17. public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx,
  18. RemotingCommand request) throws RemotingCommandException {
  19. try {
  20. // ...
  21. // 触发负载均衡
  22. this.mqClientFactory.rebalanceImmediately();
  23. } catch (Exception e) {
  24. log.error("notifyConsumerIdsChanged exception", RemotingHelper.exceptionSimpleDesc(e));
  25. }
  26. return null;
  27. }
  28. }
6.3 消费者停止时触发


  1. public class DefaultMQPushConsumerImpl implements MQConsumerInner {
  2. public synchronized void shutdown(long awaitTerminateMillis) {
  3. switch (this.serviceState) {
  4. case CREATE_JUST:
  5. break;
  6. case RUNNING:
  7. this.consumeMessageService.shutdown(awaitTerminateMillis);
  8. this.persistConsumerOffset();
  9. // 取消注册
  10. this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());
  11. this.mQClientFactory.shutdown();
  12. // ...
  13. break;
  15. break;
  16. default:
  17. break;
  18. }
  19. }
  20. }
  // Excerpt of MQClientInstance: removes a consumer group locally, then unregisters it on every known broker address.
  1. public class MQClientInstance {
  // Drops the group from consumerTable, then calls unregisterClient with a null producer group.
  2. public synchronized void unregisterConsumer(final String group) {
  3. this.consumerTable.remove(group);
  4. // Unregister the consumer group from the brokers
  5. this.unregisterClient(null, group);
  6. }
  // Calls mQClientAPIImpl.unregisterClient for every non-null address in brokerAddrTable (parts of the body are elided below).
  7. private void unregisterClient(final String producerGroup, final String consumerGroup) {
  8. // Iterator over all brokers in brokerAddrTable (broker name -> {brokerId -> address})
  9. Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
  10. // Walk every broker entry
  11. while (it.hasNext()) {
  12. // ... (elided in this excerpt; presumably takes it.next() and its brokerId -> address map as oneTable — TODO confirm against upstream)
  13. if (oneTable != null) {
  14. for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
  15. String addr = entry1.getValue();
  16. if (addr != null) {
  17. try {
  18. // Send the unregister request to this broker address
  19. this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, clientConfig.getMqClientApiTimeout());
  20. // ... (elided in this excerpt)
  21. } // ... (exception handling elided in this excerpt)
  22. }
  23. }
  24. }
  25. }
  26. }
  27. }
  // Excerpt of MQClientAPIImpl: sends an UNREGISTER_CLIENT request to one broker address.
  1. public class MQClientAPIImpl {
  2. public void unregisterClient(final String addr, final String clientID, final String producerGroup, final String consumerGroup, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
  3. // ... (elided in this excerpt; presumably creates requestHeader and sets the other fields — TODO confirm)
  4. requestHeader.setConsumerGroup(consumerGroup);
  // NOTE(review): line 5 is missing from this listing.
  6. RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, requestHeader);
  7. // Send the request with invokeSync, passing timeoutMillis
  8. RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
  9. // ... (elided in this excerpt; handling of response)
  10. }
  11. }
  // Excerpt of the broker-side ClientManageProcessor: handles heartbeat, unregister and client-config requests.
  1. public class ClientManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
  // Dispatches by request code; codes not listed fall through to return null.
  2. @Override
  3. public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
  4. throws RemotingCommandException {
  5. switch (request.getCode()) {
  6. case RequestCode.HEART_BEAT: // heartbeat request
  7. return this.heartBeat(ctx, request);
  8. case RequestCode.UNREGISTER_CLIENT: // client unregister request
  9. return this.unregisterClient(ctx, request);
  10. case RequestCode.CHECK_CLIENT_CONFIG:
  11. return this.checkClientConfig(ctx, request);
  12. default:
  13. break;
  14. }
  15. return null;
  16. }
  // Handles UNREGISTER_CLIENT: for a non-null consumer group in the header, removes the client's channel via ConsumerManager, then replies SUCCESS.
  17. public RemotingCommand unregisterClient(ChannelHandlerContext ctx, RemotingCommand request)
  18. throws RemotingCommandException {
  19. // ... (elided in this excerpt; presumably decodes requestHeader and prepares response and clientChannelInfo — TODO confirm)
  20. { // consumer-group part; the braces only scope `group`
  21. final String group = requestHeader.getConsumerGroup();
  22. if (group != null) {
  23. // ... (elided in this excerpt; presumably computes isNotifyConsumerIdsChangedEnable — TODO confirm)
  24. // Unregister the consumer's channel from its group
  25. this.brokerController.getConsumerManager().unregisterConsumer(group, clientChannelInfo, isNotifyConsumerIdsChangedEnable);
  26. }
  27. }
  28. response.setCode(ResponseCode.SUCCESS);
  29. response.setRemark(null);
  30. return response;
  31. }
  32. }
  1. public class ConsumerManager {
  2. public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo,
  3. boolean isNotifyConsumerIdsChangedEnable) {
  4. ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
  5. if (null != consumerGroupInfo) {
  6. consumerGroupInfo.unregisterChannel(clientChannelInfo);
  7. if (consumerGroupInfo.getChannelInfoTable().isEmpty()) {
  8. ConsumerGroupInfo remove = this.consumerTable.remove(group);
  9. if (remove != null) {
  10. // 触发取消注册事件
  11. this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, group);
  12. }
  13. }
  14. // 触发消费者变更事件
  15. if (isNotifyConsumerIdsChangedEnable) {
  16. this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
  17. }
  18. }
  19. }
  20. }
6.4 消费者定时触发


  1. public class RebalanceService extends ServiceThread {
  2. private static long waitInterval = Long.parseLong(System.getProperty("rocketmq.client.rebalance.waitInterval", "20000"));
  3. @Override
  4. public void run() {
  5. + " service started");
  6. while (!this.isStopped()) {
  7. this.waitForRunning(waitInterval); // 等待
  8. // 负载均衡
  9. this.mqClientFactory.doRebalance();
  10. }
  11. + " service end");
  12. }
  13. }
  1. queue的分配算法
  2. 重平衡的代码分析
  3. 重复消费产生的原因
  4. Rebalance的触发时机



