RocketMQ源码解析十一(Consumer上报消费进度流程(集群模式))

怼烎@ 2022-09-12 07:47 65阅读 0赞

RocketMQ版本4.6.0,记录自己看源码的过程

Consumer

在消费者启动过程中,会启动MQClientInstance,而MQClientInstance中会启动多个定时任务,其中就包括定时上报消费进度:

  1. private void startScheduledTask() {
  2. // 省略其它定时任务。。。
  3. // 定时持久化消费进度,默认5s
  4. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  5. @Override
  6. public void run() {
  7. try {
  8. MQClientInstance.this.persistAllConsumerOffset();
  9. } catch (Exception e) {
  10. log.error("ScheduledTask persistAllConsumerOffset exception", e);
  11. }
  12. }
  13. }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
  14. // 省略其它定时任务。。。
  15. }
  16. /** * 持久化全部消费进度 */
  17. private void persistAllConsumerOffset() {
  18. // 获取该JVM上全部消费者实例
  19. Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
  20. while (it.hasNext()) {
  21. Entry<String, MQConsumerInner> entry = it.next();
  22. MQConsumerInner impl = entry.getValue();
  23. // 调用各个消费者的持久化接口
  24. impl.persistConsumerOffset();
  25. }
  26. }

依次调用各个消费者的上报接口,下面以推模式消费者的实现为例:
DefaultMQPushConsumerImpl

  1. /** * 持久化当前消费者消费进度 */
  2. @Override
  3. public void persistConsumerOffset() {
  4. try {
  5. this.makeSureStateOK();
  6. Set<MessageQueue> mqs = new HashSet<MessageQueue>();
  7. // 获取该消费者分配的消息队列
  8. Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
  9. mqs.addAll(allocateMq);
  10. // 使用消费进度组件持久化全部队列消费进度
  11. this.offsetStore.persistAll(mqs);
  12. } catch (Exception e) {
  13. log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
  14. }
  15. }

使用消费进度组件(集群模式下为RemoteBrokerOffsetStore)上报该消费者所有消息队列的消费进度

  1. /** * 持久化消费进度 */
  2. @Override
  3. public void persistAll(Set<MessageQueue> mqs) {
  4. if (null == mqs || mqs.isEmpty())
  5. return;
  6. final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
  7. // 遍历每个队列当前的消费进度
  8. for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
  9. MessageQueue mq = entry.getKey();
  10. // 消费进度
  11. AtomicLong offset = entry.getValue();
  12. if (offset != null) {
  13. if (mqs.contains(mq)) {
  14. try {
  15. // 更新消费进度到broker中
  16. this.updateConsumeOffsetToBroker(mq, offset.get());
  17. log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
  18. this.groupName,
  19. this.mQClientFactory.getClientId(),
  20. mq,
  21. offset.get());
  22. } catch (Exception e) {
  23. log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
  24. }
  25. } else {
  26. unusedMQ.add(mq);
  27. }
  28. }
  29. }
  30. if (!unusedMQ.isEmpty()) {
  31. for (MessageQueue mq : unusedMQ) {
  32. this.offsetTable.remove(mq);
  33. log.info("remove unused mq, {}, {}", mq, this.groupName);
  34. }
  35. }
  36. }
  37. /** * 以单向方式更新消费进度到broker */
  38. private void updateConsumeOffsetToBroker(MessageQueue mq, long offset) throws RemotingException,
  39. MQBrokerException, InterruptedException, MQClientException {
  40. updateConsumeOffsetToBroker(mq, offset, true);
  41. }
  42. /** * Update the Consumer Offset synchronously, once the Master is off, updated to Slave, here need to be optimized. */
  43. @Override
  44. public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
  45. MQBrokerException, InterruptedException, MQClientException {
  46. // 获得一个broker地址,正常情况下都是master broker
  47. FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
  48. if (null == findBrokerResult) {
  49. this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
  50. findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
  51. }
  52. if (findBrokerResult != null) {
  53. // 构建更新队列消费进度得请求数据
  54. UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
  55. requestHeader.setTopic(mq.getTopic());
  56. requestHeader.setConsumerGroup(this.groupName);
  57. requestHeader.setQueueId(mq.getQueueId());
  58. requestHeader.setCommitOffset(offset);
  59. if (isOneway) {
  60. // 单向发送更新进度请求
  61. this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
  62. findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
  63. } else {
  64. this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
  65. findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
  66. }
  67. } else {
  68. throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
  69. }
  70. }

以单向的方式发送请求,请求码为UPDATE_CONSUMER_OFFSET

  1. public void updateConsumerOffsetOneway(
  2. final String addr,
  3. final UpdateConsumerOffsetRequestHeader requestHeader,
  4. final long timeoutMillis
  5. ) throws RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException,
  6. InterruptedException {
  7. RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
  8. this.remotingClient.invokeOneway(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
  9. }

Broker

broker处理更新消费进度的处理器是ConsumerManageProcessor

  1. @Override
  2. public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
  3. throws RemotingCommandException {
  4. switch (request.getCode()) {
  5. case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
  6. return this.getConsumerListByGroup(ctx, request);
  7. // 处理consumer发过来的更新消费进度的请求
  8. case RequestCode.UPDATE_CONSUMER_OFFSET:
  9. return this.updateConsumerOffset(ctx, request);
  10. case RequestCode.QUERY_CONSUMER_OFFSET:
  11. return this.queryConsumerOffset(ctx, request);
  12. default:
  13. break;
  14. }
  15. return null;
  16. }
  17. /** * 更新消息队列消费进度 */
  18. private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
  19. throws RemotingCommandException {
  20. final RemotingCommand response =
  21. RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class);
  22. final UpdateConsumerOffsetRequestHeader requestHeader =
  23. (UpdateConsumerOffsetRequestHeader) request
  24. .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
  25. // 向消费进度管理组件提交消费进度
  26. this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getConsumerGroup(),
  27. requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
  28. response.setCode(ResponseCode.SUCCESS);
  29. response.setRemark(null);
  30. return response;
  31. }

向消费进度管理组件提交消费进度,消费进度统一由消费进度管理器管理

  1. /** * 用来管理消费者的消费进度 */
  2. public class ConsumerOffsetManager extends ConfigManager {
  3. /** * 消费进度表 */
  4. private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
  5. new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);
  6. /** * 上报消费进度 * @param clientHost 消费端地址 * @param group 消费组 * @param topic 主题 * @param queueId 队列id * @param offset 要更新的进度 */
  7. public void commitOffset(final String clientHost, final String group, final String topic, final int queueId,
  8. final long offset) {
  9. // topic@group
  10. String key = topic + TOPIC_GROUP_SEPARATOR + group;
  11. this.commitOffset(clientHost, key, queueId, offset);
  12. }
  13. private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
  14. ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
  15. // 第一次上报消费进度
  16. if (null == map) {
  17. map = new ConcurrentHashMap<Integer, Long>(32);
  18. map.put(queueId, offset);
  19. this.offsetTable.put(key, map);
  20. } else {
  21. // 更新队列的消费进度
  22. Long storeOffset = map.put(queueId, offset);
  23. if (storeOffset != null && offset < storeOffset) {
  24. log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
  25. }
  26. }
  27. }
  28. }

这部分逻辑还是很清晰的。
在这里插入图片描述

参考资料

《儒猿技术窝——从 0 开始带你成为消息中间件实战高手》

发表评论

表情:
评论列表 (有 0 条评论,65人围观)

还没有评论,来说两句吧...

相关阅读