RocketMQ源码:consumer 负载均衡(重平衡)

今天药忘吃喽~ 2024-03-24 17:58 134阅读 0赞

前面我们有提到一个非常重要的概念,就是重平衡,只有在经过重平衡后,消息的拉取对象PullMessageService才可以去Broker拉取消息,那么这篇文章就单独分析下什么是重平衡?

重平衡分析的前提是:集群消费模式。 重平衡要做的也很简单,就是给当前消费者分配属于它的逻辑消费队列

1.什么是重平衡?

Rebalance(重平衡)机制指的是:将一个Topic下的多个队列,在同一个消费者组(consumer group)下的多个消费者实例(consumer instance)之间进行重新分配。

Rebalance机制的本意是为了提升消息的并行消费能力。例如,⼀个Topic下5个队列,在只有1个消费者的情况下,这个消费者将负责消费这5个队列的消息。如果此时我们增加⼀个消费者,那么就可以给其中⼀个消费者分配2个队列,给另⼀个分配3个队列,从而提升消息的并行消费能力。

由于⼀个队列最多分配给消费者组下的⼀个消费者,因此当某个消费者组下的消费者实例数量大于队列的数量时,多余的消费者实例将分配不到任何队列。

format_png

接下来以集群模式下的消息推模式DefaultMQPushConsumerImpl为例,看一下负载均衡的过程。

2. 消费者启动DefaultMQPushConsumerImpl

首先,消费者在启动时会做如下操作:

  1. 从NameServer更新当前消费者订阅主题的路由信息;
  2. 向Broker发送心跳,注册消费者;
  3. 唤醒负载均衡服务,触发一次负载均衡;

    public class DefaultMQPushConsumerImpl implements MQConsumerInner {

    1. public synchronized void start() throws MQClientException {
    2. // ...
    3. // 更新当前消费者订阅主题的路由信息
    4. this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    5. this.mQClientFactory.checkClientInBroker();
    6. // 向Broker发送心跳
    7. this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    8. // 唤醒负载均衡服务
    9. this.mQClientFactory.rebalanceImmediately();
    10. }

    }
    复制代码

2.1 更新主题路由信息

为了保证消费者拿到的主题路由信息是最新的(topic下有几个消息队列、消息队列的分布信息等),在进行负载均衡之前首先要更新主题的路由信息,在updateTopicSubscribeInfoWhenSubscriptionChanged方法中可以看到,首先获取了当前消费者订阅的所有主题信息(一个消费者可以订阅多个主题),然后进行遍历,向NameServer发送请求,更新每一个主题的路由信息,保证路由信息是最新的:

  1. public class DefaultMQPushConsumerImpl implements MQConsumerInner {
  2. private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
  3. // 获取当前消费者订阅的主题信息
  4. Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
  5. if (subTable != null) {
  6. // 遍历订阅的主题信息
  7. for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
  8. final String topic = entry.getKey();
  9. // 从NameServer更新主题的路由信息
  10. this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
  11. }
  12. }
  13. }
  14. }
  15. 复制代码

2.2 注册消费者

2.2.1 发送心跳

由于Broker需要感知消费者数量的增减,所以每个消费者在启动的时候,会调用sendHeartbeatToAllBrokerWithLock向Broker发送心跳包,进行消费者注册:

  1. public class MQClientInstance {
  2. public void sendHeartbeatToAllBrokerWithLock() {
  3. if (this.lockHeartbeat.tryLock()) {
  4. try {
  5. // todo 调用sendHeartbeatToAllBroker向Broker发送心跳
  6. this.sendHeartbeatToAllBroker();
  7. this.uploadFilterClassSource();
  8. } catch (final Exception e) {
  9. log.error("sendHeartbeatToAllBroker exception", e);
  10. } finally {
  11. this.lockHeartbeat.unlock();
  12. }
  13. } else {
  14. log.warn("lock heartBeat, but failed. [{}]", this.clientId);
  15. }
  16. }
  17. }
  18. 复制代码

sendHeartbeatToAllBroker方法中,可以看到从brokerAddrTable中获取了所有的Broker进行遍历(主从模式下也会向从节点发送请求注册),调用MQClientAPIImplsendHearbeat方法向每一个Broker发送心跳请求进行注册:

  1. public class MQClientInstance {
  2. // Broker路由表
  3. private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
  4. new ConcurrentHashMap<String, HashMap<Long, String>>();
  5. // 发送心跳
  6. private void sendHeartbeatToAllBroker() {
  7. final HeartbeatData heartbeatData = this.prepareHeartbeatData();
  8. // ...
  9. if (!this.brokerAddrTable.isEmpty()) {
  10. long times = this.sendHeartbeatTimesTotal.getAndIncrement();
  11. // 获取所有的Broker进行遍历, key为 Broker Name, value为同一个name下的所有Broker实例(主从模式下Broker的name一致)
  12. Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
  13. while (it.hasNext()) {
  14. Entry<String, HashMap<Long, String>> entry = it.next();
  15. String brokerName = entry.getKey(); // broker name
  16. // 获取同一个Broker Name下的所有Broker实例
  17. HashMap<Long, String> oneTable = entry.getValue();
  18. if (oneTable != null) {
  19. // 遍历所有的实例
  20. for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
  21. Long id = entry1.getKey();
  22. String addr = entry1.getValue();
  23. if (addr != null) { // 如果地址不为空
  24. // ...
  25. try {
  26. // 发送心跳
  27. int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout());
  28. // ...
  29. } catch (Exception e) {
  30. // ...
  31. }
  32. }
  33. }
  34. }
  35. }
  36. }
  37. }
  38. }
  39. 复制代码

MQClientAPIImplsendHearbeat方法中,可以看到构建了HEART_BEAT请求,然后向Broker发送:

  1. public class MQClientAPIImpl {
  2. public int sendHearbeat(final String addr, final HeartbeatData heartbeatData, final long timeoutMillis
  3. ) throws RemotingException, MQBrokerException, InterruptedException {
  4. // 创建HEART_BEAT请求
  5. RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
  6. request.setLanguage(clientConfig.getLanguage());
  7. request.setBody(heartbeatData.encode());
  8. // 发送请求
  9. RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
  10. // ...
  11. }
  12. }
  13. 复制代码

2.2.2 broker心跳请求处理

Broker在启动时注册了HEART_BEAT请求的处理器,可以看到请求处理器是ClientManageProcessor

  1. public class BrokerController {
  2. public void registerProcessor() {
  3. ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
  4. // 注册HEART_BEAT请求的处理器ClientManageProcessor
  5. this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
  6. }
  7. }
  8. 复制代码

进入到ClientManageProcessorprocessRequest方法,如果请求是HEART_BEAT类型会调用heartBeat方法进行处理,这里也能看还有UNREGISTER_CLIENT类型的请求,从名字上可以看出是与取消注册有关的(这个稍后再说):

  1. public class ClientManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
  2. @Override
  3. public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
  4. throws RemotingCommandException {
  5. switch (request.getCode()) {
  6. case RequestCode.HEART_BEAT: // 处理心跳请求
  7. return this.heartBeat(ctx, request);
  8. case RequestCode.UNREGISTER_CLIENT: // 取消注册请求
  9. return this.unregisterClient(ctx, request);
  10. case RequestCode.CHECK_CLIENT_CONFIG:
  11. return this.checkClientConfig(ctx, request);
  12. default:
  13. break;
  14. }
  15. return null;
  16. }
  17. }
  18. 复制代码

进入到heartBeat方法,可以看到,调用了ConsumerManagerregisterConsumer注册消费者:

  1. public class ClientManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
  2. public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
  3. // ...
  4. for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
  5. // ...
  6. // 注册Consumer
  7. boolean changed = this.brokerController.getConsumerManager().registerConsumer(
  8. data.getGroupName(), clientChannelInfo, data.getConsumeType(), data.getMessageModel(), data.getConsumeFromWhere(),
  9. data.getSubscriptionDataSet(), isNotifyConsumerIdsChangedEnable);
  10. // ...
  11. }
  12. // ...
  13. return response;
  14. }
  15. }
  16. 复制代码

进行注册

ConsumerManagerregisterConsumer方法的理逻辑如下:

  1. 根据组名称获取该消费者组的信息ConsumerGroupInfo对象。如果获取为空,会创建一个ConsumerGroupInfo,记录了消费者组的相关信息;
  2. 判断消费者是否发生了变更,如果如果发生了变化,会触发CHANGE变更事件(这个稍后再看);
  3. 触发REGISTER注册事件;

    public class ConsumerManager {

    1. public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
    2. ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
    3. final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
    4. // 根据组名称获取消费者组信息
    5. ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
    6. if (null == consumerGroupInfo) { // 如果为空新增ConsumerGroupInfo对象
    7. ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
    8. ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
    9. consumerGroupInfo = prev != null ? prev : tmp;
    10. }
    11. boolean r1 =
    12. consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
    13. consumeFromWhere);
    14. boolean r2 = consumerGroupInfo.updateSubscription(subList);
    15. // 如果有变更
    16. if (r1 || r2) {
    17. if (isNotifyConsumerIdsChangedEnable) {
    18. // 通知变更
    19. this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
    20. }
    21. }
    22. // 注册Consumer
    23. this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
    24. return r1 || r2;
    25. }

    }
    复制代码

进入到DefaultConsumerIdsChangeListenerhandle方法中,可以看到如果是REGISTER事件,会通过ConsumerFilterManagerregister方法进行注册,注册的详细过程这里先不展开讲解:

  1. public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener {
  2. @Override
  3. public void handle(ConsumerGroupEvent event, String group, Object... args) {
  4. if (event == null) {
  5. return;
  6. }
  7. switch (event) {
  8. case CHANGE:// 如果是消费者变更事件
  9. // ...
  10. break;
  11. case UNREGISTER: //

发表评论

表情:
评论列表 (有 0 条评论,134人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Dubbo - 负载均衡

    前言 负载均衡,英文名称为Load Balance,其含义就是指将负载(工作任务)进行平衡、分摊到多个操作单元上进行运行。 例如:在Dubbo中,同一个服务有多个服务提