RocketMQ源码:consumer 提交消费偏移量
1.什么是消费偏移量offset?
我们先看一幅图
消费偏移量offset
就是记录消费者的消费进度的。也是rocketmq保证消息不会重复消费的核心
(当然,极端情况下还是可能会导致重复消费)。
consumequeue
中一个消息的索引单元就是一个offset
值。
在分析rocketmq的消费者是如何利用这个offset完成消息消费的之前,我们先看下broker端是如何管理这些offset值的。
2. 服务端管理offset
这里的服务端就是broker
broker在初始化(
initialize()
)时会通过消费者offset管理类ConsumerOffsetManager
来加载配置文件中的offset值,然后设置到offsetTable
属性中。public class ConsumerOffsetManager extends ConfigManager {
//TODO: key = topic@group
//TODO: value = Map[key = queueId, value = offset]
private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);
//TOOD:...other
}
复制代码
offset文件默认路径:
$user.home/store/config/consumerOffset.json
文件内容例子:
{
"offsetTable":{
//topic@consumerGroupName
"batchTopic@test_cg_batch":{0:0,1:0,2:0,3:1
},
//key是queueid,value是offset值,就是我们今天讨论的主角
"ordered_topic@CG":{0:0,1:15,2:0,3:35
},
"qiuguan_topic@qiuguan_group_1":{0:2533,1:2534,2:2531,3:2531
},
"hacker_topic@fuyuanhui_group_2":{0:64035,1:64034,2:64034,3:64034
},
"qiuguan_topic_2@qiuguan_group":{0:2,1:1,2:7,3:6
},
"qiuguan_topic@qiuguan_group":{0:2533,1:2534,2:2531,3:2531
}
}
}
复制代码
- 消费者消费后,会将offset发送到broker,这里会先写入到上面的消费者offset管理类
ConsumerOffsetManager
的offsetTable
中,然后通过定时任务将其刷盘到文件
中。属性中。然后通过3将数据持久化到文件中
稍后我们分析消费者时还会看到
broker在初始化(
initialize()
)时,会启动定时任务,每隔5秒执行一次初始化,将ConsumerOffsetManager
的offsetTable
属性中的值持久化到文件中。this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 10, 1000 5, TimeUnit.MILLISECONDS);
复制代码
那么服务端对于offset值的管理大致就这些,那么我们来看下消费者是如何利用offset
来进行消息消费的。
总的来说就是,消费者定时的将消费过的
offset
值上传到broker的内存offsetTable
中,然后通过定时任务将其刷盘到文件中。
那么接下来就看看消费者是如何使用这个offset值的。
3.消费者使用offset
3.1 消费者初始化offset
它会启动一个消息拉取服务PullMessageService
对象,还有一个是在拉取消息之前要完成的重平衡RebalanceService
对象。offset初始化就和重平衡息息相关,那么我们就看下重平衡是如何完成offset初始化的。
我们这里还是只讨论集群消费模式。它和广播模式的区别就是,广播模式每个消费者都要消费topic下的所有队列,集群模式通过分配算法(默认是平均)来将topic下的所有队列分配给消费者。既然这里我们主要讨论的是offset,那么就以集群模式进行分析即可。
这里我们就只看和offset初始化相关的部分
RebalanceService#run()
一步一步来到初始化offset的地方
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
boolean changed = false;
...
/**
* 遍历本次负载分配到的队列集合,如果
* processQueueTable中没有包含该消息队列,表明这是本次新增加的消
* 息队列,首先从内存中移除该消息队列的消费进度,然后从磁盘中读
* 取该消息队列的消费进度,创建PullRequest对象。这里有一个关键,
* 如果读取到的消费进度小于0,则需要校对消费进度。RocketMQ提供了
* CONSUME_FROM_LAST_OFFSET、CONSUME_FROM_FIRST_OFFSET、
* CONSUME_FROM_TIMESTAMP方式,在创建消费者时可以通过调用
* DefaultMQPushConsumer#setConsumeFromWhere方法进行设置
*/
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
/**
* 经过消息队列重新负载(分配)后,分配到新的消息队列时,首
* 先需要尝试向Broker发起锁定该消息队列的请求,如果返回加锁成
* 功,则创建该消息队列的拉取任务,否则跳过,等待其他消费者释放
* 该消息队列的锁,然后在下一次队列重新负载时再尝试加锁
*/
// 顺序消息
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
// todo PullRequest的nextOffset计算逻辑位于RebalancePushImpl#computePullFromWhere
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq
还没有评论,来说两句吧...