RocketMQ源码:consumer 提交消费偏移量

浅浅的花香味﹌ 2024-03-23 11:41 168阅读 0赞

1.什么是消费偏移量offset?

我们先看一幅图

format_png

消费偏移量offset就是记录消费者的消费进度的。也是rocketmq保证消息不会重复消费的核心(当然,极端情况下还是可能会导致重复消费)。

consumequeue中一个消息的索引单元就是一个offset值。

在分析rocketmq的消费者是如何利用这个offset完成消息消费的之前,我们先看下broker端是如何管理这些offset值的。

2. 服务端管理offset

这里的服务端就是broker

  1. broker在初始化(initialize())时会通过消费者offset管理类ConsumerOffsetManager来加载配置文件中的offset值,然后设置到offsetTable属性中。

    public class ConsumerOffsetManager extends ConfigManager {

  1. //TODO: key = topic@group
  2. //TODO: value = Map[key = queueId, value = offset]
  3. private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
  4. new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);
  5. //TOOD:...other
  6. }
  7. 复制代码

offset文件默认路径:$user.home/store/config/consumerOffset.json 文件内容例子:

  1. {
  2. "offsetTable":{
  3. //topic@consumerGroupName
  4. "batchTopic@test_cg_batch":{0:0,1:0,2:0,3:1
  5. },
  6. //key是queueid,value是offset值,就是我们今天讨论的主角
  7. "ordered_topic@CG":{0:0,1:15,2:0,3:35
  8. },
  9. "qiuguan_topic@qiuguan_group_1":{0:2533,1:2534,2:2531,3:2531
  10. },
  11. "hacker_topic@fuyuanhui_group_2":{0:64035,1:64034,2:64034,3:64034
  12. },
  13. "qiuguan_topic_2@qiuguan_group":{0:2,1:1,2:7,3:6
  14. },
  15. "qiuguan_topic@qiuguan_group":{0:2533,1:2534,2:2531,3:2531
  16. }
  17. }
  18. }
  19. 复制代码
  1. 消费者消费后,会将offset发送到broker,这里会先写入到上面的消费者offset管理类ConsumerOffsetManageroffsetTable中,然后通过定时任务将其刷盘到文件中。属性中。然后通过3将数据持久化到文件中

稍后我们分析消费者时还会看到

  1. broker在初始化(initialize())时,会启动定时任务,每隔5秒执行一次初始化,将ConsumerOffsetManageroffsetTable属性中的值持久化到文件中。

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    1. @Override
    2. public void run() {
    3. try {
    4. BrokerController.this.consumerOffsetManager.persist();
    5. } catch (Throwable e) {
    6. log.error("schedule persist consumerOffset error.", e);
    7. }
    8. }

    }, 1000 10, 1000 5, TimeUnit.MILLISECONDS);
    复制代码

那么服务端对于offset值的管理大致就这些,那么我们来看下消费者是如何利用offset来进行消息消费的。

总的来说就是,消费者定时的将消费过的offset值上传到broker的内存offsetTable中,然后通过定时任务将其刷盘到文件中。

那么接下来就看看消费者是如何使用这个offset值的。

3.消费者使用offset

3.1 消费者初始化offset

它会启动一个消息拉取服务PullMessageService对象,还有一个是在拉取消息之前要完成的重平衡RebalanceService对象。offset初始化就和重平衡息息相关,那么我们就看下重平衡是如何完成offset初始化的。

我们这里还是只讨论集群消费模式。它和广播模式的区别就是,广播模式每个消费者都要消费topic下的所有队列,集群模式通过分配算法(默认是平均)来将topic下的所有队列分配给消费者。既然这里我们主要讨论的是offset,那么就以集群模式进行分析即可。

这里我们就只看和offset初始化相关的部分

RebalanceService#run()一步一步来到初始化offset的地方

  1. private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
  2. final boolean isOrder) {
  3. boolean changed = false;
  4. ...
  5. /**
  6. * 遍历本次负载分配到的队列集合,如果
  7. * processQueueTable中没有包含该消息队列,表明这是本次新增加的消
  8. * 息队列,首先从内存中移除该消息队列的消费进度,然后从磁盘中读
  9. * 取该消息队列的消费进度,创建PullRequest对象。这里有一个关键,
  10. * 如果读取到的消费进度小于0,则需要校对消费进度。RocketMQ提供了
  11. * CONSUME_FROM_LAST_OFFSET、CONSUME_FROM_FIRST_OFFSET、
  12. * CONSUME_FROM_TIMESTAMP方式,在创建消费者时可以通过调用
  13. * DefaultMQPushConsumer#setConsumeFromWhere方法进行设置
  14. */
  15. List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
  16. for (MessageQueue mq : mqSet) {
  17. if (!this.processQueueTable.containsKey(mq)) {
  18. /**
  19. * 经过消息队列重新负载(分配)后,分配到新的消息队列时,首
  20. * 先需要尝试向Broker发起锁定该消息队列的请求,如果返回加锁成
  21. * 功,则创建该消息队列的拉取任务,否则跳过,等待其他消费者释放
  22. * 该消息队列的锁,然后在下一次队列重新负载时再尝试加锁
  23. */
  24. // 顺序消息
  25. if (isOrder && !this.lock(mq)) {
  26. log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
  27. continue;
  28. }
  29. this.removeDirtyOffset(mq);
  30. ProcessQueue pq = new ProcessQueue();
  31. // todo PullRequest的nextOffset计算逻辑位于RebalancePushImpl#computePullFromWhere
  32. long nextOffset = this.computePullFromWhere(mq);
  33. if (nextOffset >= 0) {
  34. ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
  35. if (pre != null) {
  36. log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
  37. } else {
  38. log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
  39. PullRequest pullRequest = new PullRequest();
  40. pullRequest.setConsumerGroup(consumerGroup);
  41. pullRequest.setNextOffset(nextOffset);
  42. pullRequest.setMessageQueue(mq

发表评论

表情:
评论列表 (有 0 条评论,168人围观)

还没有评论,来说两句吧...

相关阅读