RocketMQ源码解析六(Consumer重平衡流程)
RocketMQ版本4.6.0,记录自己看源码的过程
重平衡服务是在Consumer启动的时候启动的一个后台线程,每次重平衡后过20s后再发起一次重平衡
/** * 重平衡服务,随着consumer启动而启动 */
public class RebalanceService extends ServiceThread {
private static long waitInterval =
Long.parseLong(System.getProperty(
"rocketmq.client.rebalance.waitInterval", "20000"));
private final InternalLogger log = ClientLogger.getLog();
private final MQClientInstance mqClientFactory;
public RebalanceService(MQClientInstance mqClientFactory) {
this.mqClientFactory = mqClientFactory;
}
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
// 每20s执行一次
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
@Override
public String getServiceName() {
return RebalanceService.class.getSimpleName();
}
}
MQClientInstance
/** * 重平衡 */
public void doRebalance() {
// 遍历每个消费组的消费者实例
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
try {
impl.doRebalance();
} catch (Throwable e) {
log.error("doRebalance exception", e);
}
}
}
}
DefaultMQPushConsumerImpl
@Override
public void doRebalance() {
if (!this.pause) {
// 调用重平衡组件
this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
}
}
调用重平衡组件RebalanceImpl进行重平衡逻辑
public abstract class RebalanceImpl {
protected static final InternalLogger log = ClientLogger.getLog();
// 当前消费者分配到的队列
protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
// 保存所订阅的每个主题中的消息队列列表
protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable =
new ConcurrentHashMap<String, Set<MessageQueue>>();
/** * 订阅数据,由于一个消费者可以订阅多个topic,所以这里的订阅数据用map类型 * key是订阅的topic名称,value其实也只是topic名称外加其它一些东西 */
protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
new ConcurrentHashMap<String, SubscriptionData>();
protected String consumerGroup;
/** * 消费模式 */
protected MessageModel messageModel;
/** * 负载策略,默认是平均算法 */
protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
protected MQClientInstance mQClientFactory;
/** * 开始执行重平衡逻辑 * * @param isOrder 是否是顺序消费 */
public void doRebalance(final boolean isOrder) {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
// 遍历订阅的每个topic,对每个topic中的队列都重新负载均衡
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic();
}
/** * 根据topic对其中的队列进行重平衡 */
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}",
consumerGroup,
topic,
mqSet,
mqSet);
}
} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
break;
}
// 集群模式
case CLUSTERING: {
// 该topic对应的消息队列
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// 从随机的一个broker中获取该消费组中所有的消费者客户端ID
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}
if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
}
// 如果有一个为空,则放弃本次负载均衡
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
// 将消息队列从Set中复制到List中
mqAll.addAll(mqSet);
// 对消息队列和消费者客户端id排序,这样可以保证每个消费者重平衡时队列顺序一样,就不会有重复分配问题
Collections.sort(mqAll);
Collections.sort(cidAll);
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
// 分配给该消费者的队列
List<MessageQueue> allocateResult = null;
try {
// 根据负载均衡策略重平衡
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
// 判断该消费者的订阅的队列是否有变
// 如果是顺序消费,还会对消费队列进行加解锁
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info(
"rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
allocateResultSet.size(), allocateResultSet);
// 更改后需要立即发送心跳通知所有的broker
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}
/** * 判断订阅的队列是否有变 * */
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
boolean changed = false;
// 当前消费者已经分配到的消息队列
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();
// 不属于该topic的队列跳过
if (mq.getTopic().equals(topic)) {
// 如果重新分配后的队列不包含旧的队列mq,说明经过重平衡后,该队列被分配给别的消费者了,
// 所以需要暂停当前消费者对该队列的消费,将该ProcessQueue设置dropped=true,并从本地缓存中移除
if (!mqSet.contains(mq)) {
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
// 从订阅缓存中移除该队列
it.remove();
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
}
} else if (pq.isPullExpired()) {
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
consumerGroup, mq);
}
break;
default:
break;
}
}
}
}
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
// 如果旧的缓存订阅的队列中没有新的队列,说明经过重平衡后,该消费者分配到了新的队列
// 则需要创建PullRequest拉取消息任务去拉取消息
if (!this.processQueueTable.containsKey(mq)) {
// 这里在顺序消费那里详细讲解
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
// 从内存中删除该消息队列的消费进度
this.removeDirtyOffset(mq);
// 创建一个新的ProcessQueue与该消息队列对应
ProcessQueue pq = new ProcessQueue();
// 计算从哪开始拉取
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
// 立即去执行拉取任务,其实就是将拉取请求任务放到拉取请求队列中
this.dispatchPullRequest(pullRequestList);
return changed;
}
}
重平衡详细流程都写在注释里了,参考注释。
最终是为所分配到新的消费队列创建一个拉取请求任务放到拉取任务阻塞队列中,这样消费者就可以开始拉取该消息队列的消息了。拉取任务只在这一处创建。
参考资料
《儒猿技术窝——从 0 开始带你成为消息中间件实战高手》
还没有评论,来说两句吧...