RocketMQ源码解析八(Consumer并发消费消息)
RocketMQ版本4.6.0,记录自己看源码的过程
回顾一下消息拉取,PullMessageService 负责对消息队列进行消息拉取,从远端服务器拉取消息后将消息存人ProcessQueue消息队列处理队列中,然后调用ConsumeMessageService#submitConsumeRequest方法进行消息消费,使用线程池来消费消息,确保了消息拉取与消息消费的解耦。RocketMQ使用ConsumeMessageService来实现消息消费的处理逻辑。RocketMQ支持顺序消费与并发消费,这里将重点关注并发消费的消费流程,顺序消费将在之后的文档分析。
消息拉取一文中提到会将消息任务提交给ConsumeMessageConcurrentlyService中的线程池执行,这里再重新看下
/** * 提交消费消息请求到线程池中消费 */
@Override
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispatchToConsume) {
// 一次消费几条消息,默认为1条
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
// 如果一次拉取的消息数量<=并发消费数量,则一次性将这一批消息都放到consumeRequest中提交给消费线程池消费
if (msgs.size() <= consumeBatchSize) {
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
try {
// 将消费任务提交给消费线程池消费
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
this.submitConsumeRequestLater(consumeRequest);
}
} else {
// 分批次提交,默认每次提交1条
for (int total = 0; total < msgs.size(); ) {
List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
for (int i = 0; i < consumeBatchSize; i++, total++) {
if (total < msgs.size()) {
msgThis.add(msgs.get(total));
} else {
break;
}
}
ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
for (; total < msgs.size(); total++) {
msgThis.add(msgs.get(total));
}
this.submitConsumeRequestLater(consumeRequest);
}
}
}
}
如果拉取的消息条数大于consumeMessageBatchMaxSize,则对拉取消息进行分页,每页consumeMessageBatchMaxSize条消息,创建多个ConsumeRequest任务并提交到消费线程池。
/** * 消费线程 */
class ConsumeRequest implements Runnable {
// 默认是1条
private final List<MessageExt> msgs;
private final ProcessQueue processQueue;
private final MessageQueue messageQueue;
public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) {
this.msgs = msgs;
this.processQueue = processQueue;
this.messageQueue = messageQueue;
}
public List<MessageExt> getMsgs() {
return msgs;
}
public ProcessQueue getProcessQueue() {
return processQueue;
}
@Override
public void run() {
// 先判断processQueue的dropped,如果为true,则说明可能在重平衡时该消息队列被分配给
// 别的消费者了,应该停止对该消息队列的消费
if (this.processQueue.isDropped()) {
log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
return;
}
// 在启动consumer时设置的监听
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;
// 如果此消息的主题为(%RETRY%+消费组的名称),那么将会将此消息的topic重置为原始消息的topic。
// 即此消息的真实topic会存储在properties当中,键为RETRY_TOPIC,值为真实topic,将真实topic取出赋予此消息
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setProps(new HashMap<String, String>());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}
long beginTimestamp = System.currentTimeMillis();
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
}
}
// 调用消息监听器
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
hasException = true;
}
long consumeRT = System.currentTimeMillis() - beginTimestamp;
if (null == status) {
// 表示消费出现异常了并且没有进行处理
if (hasException) {
returnType = ConsumeReturnType.EXCEPTION;
} else {
// 返回null
returnType = ConsumeReturnType.RETURNNULL;
}
} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) { // 超时
returnType = ConsumeReturnType.TIME_OUT;
} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) { // 消费失败
returnType = ConsumeReturnType.FAILED;
} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) { // 消费成功
returnType = ConsumeReturnType.SUCCESS;
}
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
}
// 不管是异常还是返回null,都需要重试消息
if (null == status) {
log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}
ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
.incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
if (!processQueue.isDropped()) {
// 处理消费结果
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}
}
public MessageQueue getMessageQueue() {
return messageQueue;
}
}
Step1:先判断processQueue的dropped,如果为true,则说明可能在重平衡时该消息队列被分配给别的消费者了,应该停止对该消息队列的消费。
Step2:如果消息是重试消息,则会将消息的topic和queueId还原成原本的topic和queueId。
Step3:构建ConsumeMessageContext对象。
Step4:调用消息监听器。
Step5:如果processQueue没被停止则处理消费结果。
看看如何处理消费结果
/** * 处理消费结果 */
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
// 设置ackIndex,这个值在创建ConsumeConcurrentlyContext时默认为Integer.MAX_VALUE
int ackIndex = context.getAckIndex();
if (consumeRequest.getMsgs().isEmpty())
return;
switch (status) {
// 消费成功
case CONSUME_SUCCESS:
// 设置ackIndex值为 消息数-1,默认情况下这个值是0
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
// 需要重试消息
case RECONSUME_LATER:
// 设置为-1用于下面重发消息
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
// 广播模式不会重试消息,消费失败就丢弃了,也相当于消费成功了
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
// 重发这批并发消息,默认只有1条(会重新生成新的消息,这批消息默认消费成功,会更新消费进度,不会卡在这)
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
// 重新将消息发到broker
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
// 如果重发消息失败,则会将这些发送失败的消息重新包装起来5S后转发给消费线程池继续消费
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
// 不管这条消息消费成功与否,都会更新消费进度,表示该条消息消费成功
// 将这次消费的消息从ProcessQueue中移除
// 这里的返回值分两种情况:
// 1、如果删除该消息列表后processQueue为空了,则offset为processQueue中最大的偏移量+1,消费进度推进到最后面
// 2、如果processQueue不为空,则offset为processQueue中剩余的第一条消息,消费进度只能推进到这
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
// 更新本地消费进度,后台定时任务会定时将该消费进度同步到broker中
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
消息消费状态有两种,CONSUME_SUCCESS表示消费成功,RECONSUME_LATER表示需要重试消息。当重试时,这些并发消息都会进行重试,rocketmq会为每条消息重新创建一条与原先消息属性相同的消息,拥有一个唯一的新msgId,并存储原消息ID,该消息会存入到commitLog文件中,与原先的消息没有任何关联,该消息也会进入到ConsumerQueue队列中,将拥有一个全新的队列偏移量。重试消息详细流程在后面文档再分析。
当然,不管消费成功与否,都会将这次消费的消息从ProcessQueue中删除
public long removeMessage(final List<MessageExt> msgs) {
long result = -1;
final long now = System.currentTimeMillis();
try {
this.lockTreeMap.writeLock().lockInterruptibly();
this.lastConsumeTimestamp = now;
try {
if (!msgTreeMap.isEmpty()) {
result = this.queueOffsetMax + 1;
int removedCnt = 0;
// 将这次批量消费的消息都从msgTreeMap中删除
for (MessageExt msg : msgs) {
MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
if (prev != null) {
removedCnt--;
msgSize.addAndGet(0 - msg.getBody().length);
}
}
msgCount.addAndGet(removedCnt);
// 如果删除完后processQueue已经没消息了,表示消息都消费成功了,则消费进度更新到
// 队列中最大的偏移量+1处,如果map不为空,则表示还没消费完,不能将消费进度更新到
// 最大偏移量处,需设置在最小处,等最前面的消费成功消费后消费进度才能向后推移更新
if (!msgTreeMap.isEmpty()) {
result = msgTreeMap.firstKey();
}
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (Throwable t) {
log.error("removeMessage exception", t);
}
return result;
}
并根据返回的offset更新本地消费进度,由后台定时任务定时同步到broker
public class RemoteBrokerOffsetStore implements OffsetStore {
/** * 保存一个消费组中每个消息队列的消费进度,会定时同步到broke中 */
private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
new ConcurrentHashMap<MessageQueue, AtomicLong>();
/** * 更新本地消费进度,消费者在后台有一个定时任务,有定时将该消费进度同步到broker */
@Override
public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
if (mq != null) {
AtomicLong offsetOld = this.offsetTable.get(mq);
if (null == offsetOld) {
offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
}
if (null != offsetOld) {
if (increaseOnly) {
MixAll.compareAndIncreaseOnly(offsetOld, offset);
} else {
offsetOld.set(offset);
}
}
}
}
}
参考资料
《儒猿技术窝——从 0 开始带你成为消息中间件实战高手》
还没有评论,来说两句吧...