RocketMQ-Push模式下并发消费和顺序消费的区别
- 注册的消息监听不同
并发消费:consumer.registerMessageListener(new MessageListenerConcurrently() {}
顺序消费:consumer.registerMessageListener(new MessageListenerOrderly() {}
返回状态码不同
并发消费:
public enum ConsumeConcurrentlyStatus {
// 消费成功
CONSUME_SUCCESS,
// 消费失败,稍后再从Broker拉取消息重新消费(重新拉取的消息是由Broker复制原消息的新消息)
RECONSUME_LATER;
}
顺序消费:
public enum ConsumeOrderlyStatus {
// 消费成功
SUCCESS,
/**
* Rollback consumption(only for binlog consumption)
*/
@Deprecated
ROLLBACK,
/**
* Commit offset(only for binlog consumption)
*/
@Deprecated
COMMIT,
// 消费失败,挂起当前队列,挂起期间,当前消息重试消费,直到消息进入死信队列
SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
消息重新消费的逻辑不同
并发消费(重新消费的消息由
Broker
复制原消息,并丢入重试队列):
消费者返回ConsumeConcurrentlyStatus.RECONSUME_LATER
时,Broker
会创建一条与原先消息属性相同的消息,并分配新的唯一的msgId
,另外存储原消息的msgId
,新消息会存入到commitLog
文件中,并进入重试队列,拥有一个全新的队列偏移量,延迟5s
后重新消费。如果消费者仍然返回RECONSUME_LATER
,那么会重复上面的操作,直到重新消费maxReconsumerTimes
次,当重新消费次数超过最大次数时,进入死信队列,消息消费成功。
顺序消费(重新消费不涉及Broker
):
消费者返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT
时,当前队列会挂起(此消息后面的消息停止消费,直到此消息完成消息重新消费的整个过程),然后此消息会在消费者的线程池中重新消费,即不需要Broker
重新创建新的消息(不涉及重试队列),如果消息重新消费超过maxReconsumerTimes
最大次数时,进入死信队列。当消息放入死信队列时,Broker
服务器认为消息时消费成功的,继续消费该队列后续消息。顺序消费设置自动提交
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
// 省略...
}
}
- 涉及的主题不同
RocketMQ
有三种主题:NORMAL、RETRY、DLQ
并发消费:NORMAL、RETRY、DLQ
顺序消费:NORMAL、DLQ
- 顺序消费在拉取任务时需要在Broker服务器上锁定该消息队列
还没有评论,来说两句吧...