RocketMQ消费者核心配置和核心知识讲解
1. RocketMQ消费者核心配置讲解
consumeFromWhere配置
- CONSUME_FROM_FIRST_OFFSET:初次从消息队列头部开始消费,即历史消息(还存在broker的),全部消费一遍,后续再启动接着上次消费的进度开始消费
- CONSUME_FROM_LAST_OFFSET:默认策略,初次从该队列最尾开始消费,即跳过历史消息,后续再启动接着上次消费的进度开始消费
- CONSUME_FROM_TIMESTAMP:从某个时间点开始消费,默认是半个小时以前,后续再启动着上次消费的进度开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
allocateMessageQueueStrategy配置(不用单独配置)
负载均衡策略算法,即消费者分配到queue的算法,默认值是AllocateMessageQueueAveragely即取模平均分配- offsetStore配置:消息消费进度存储器offsetStore有两个配置:(不用单独配置)
LocalFileOffsetStore和RemoteBrokerOffsetStor
广播模式默认使用LocalFileOffsetStore
集群式默认使用RemoteBrokerOffsetStore - consumeThreadMin:最小消费线程池数量
- consumeThreadMax:最大消费线程池数量
- pullBatchSize:消费者去broker拉取消息时,一次拉取多少条。可选配置
- consumeMessageBatchMaxSize:单次消费时一次性消费多少条消息,批量消费接口才有用,可选配置
- messageModel:消费者消费模式
CLUSTERING——默认是集群模式CLUSTERING
BROADCASTING——广播模式
2. 集群和广播模式下RocketMQ消费端处理
- Topic下队列的奇偶数会影响Consumer个数里面的消费数量
- 如果是4个队列,8个消息,4个节点则会各消费2条,如果不对等,则负载均衡会分配不均。
- 如果Consumer实例的数量比message queue的总数量还多的话,多出来的consumer实例将无法分到queue,也就无法消费到消息,也就无法起到分摊负载的作用,所以需要控制让queue的总数量大于等于consumer的数量
- 集群模式(默认):
Consumer实例平均分摊消息生产者发送的消息 - 广播模式:
广播模式下消费消息:投递到broker的消息会被每个Consumer进行消费,一条消息被多个Consumer消费,广播消费中ConsumerGroup暂时无用 怎么切换:通过consumer.setMessageModel
consumer.setMessageModel(MessageModel.BROADCASTING); # 广播模式
consumer.setMessageModel(MessageModel.CLUSTERING); # 集群模式
3. RocketMQ里面的标签Tag实战和消息过滤原理
- 一个Message只有一个Topic,tag是二级分类
比如商品订单里有数码类订单,食品类订单等,订单的topic都是一样的topic,订单类型可以使用topic来辨别 - 过滤分为Broker端和Consumer端过滤
Broker端过滤,减少了无用的消息的进行网络传输,增加了broker的负担
Consumer端过滤,完全可以根据业务需要进行筛选,但是增加了很多无用的消息传输 - 一般是监听*, 或者指定tag,||运算,SQL92,FilterServer等
- * 代表监听所有
consumer.subscribe(JmsConfig.TOPIC, “*”); - tag性能高,逻辑简单, 只消费tag名abc的消息
consumer.subscribe(JmsConfig.TOPIC, “abc”); - ||运算, 消费tag为abc或者def的消息
consumer.subscribe(JmsConfig.TOPIC, “abc || def”); - SQL92性能差点,支持复杂逻辑运算(只支持PushConsumer中使用)
MessageSelector.bySql
语法:>,<,=,IS NULL, AND,OR,NOT等, sql语法中where后续的语法大部分都可以使用
- * 代表监听所有
- 注意:消费者订阅关系要一致,不然会消费混乱,甚至消息丢失
订阅关系一致:订阅关系有topic和tag组成,同一个group name,订阅的topic和tag必须是一样的 - 在Broker端进行MessageTag过滤,遍历message queue存储的message tag和订阅传递的tag的hashcode不一样则跳过,符合的则传输给Consumer,在consumer queue存储的是对应的hashcode,对比也是通过hashcode对比;Consumer收到过滤消息后也会进行匹配操作,但是是对比真实的message tag而不是hashcode
broker过滤tag的优点:- consumer queue存储使用hashcode定长,节约空间
- 过滤中不访问commit log,可以搞笑过滤
- 如果存在hash冲突,Consumer端可以进行再次确认
- 如果想使用多个Tag,可以使用sql表达式,但是不建议,单一职责,多个队列。
备注:使用sql过滤时需要配置enablePropertyFilter
错误: The broker does not support consumer to filter message by SQL92
解决办法:broker.conf里面配置如下
enablePropertyFilter=true
修改之后需要重启broker
master节点配置:vim conf/2m-2s-async/broker-a.properties
slave节点配置:vim conf/2m-2s-async/broker-a-s.properties
4. PushConsumer和PullConsumer消费模式分析
PushConsumer和PullConsume优缺点分析:
- Push:
实时性高;但增加服务端负载,消费端能力不同,如果push推送过快,消费端会出现很多问题 - Pull:
消费者从Server端拉取消息,主动权在消费端,可控性好;但间隔时间不好设置,间隔太短,则空请求,浪费资源;间隔时间太长,则消息不能及时处理 - 长轮询:
Client请求Server端也就是Broker的时候,Broker会保持当前连接一段时间默认是15s,如果这段时间内有消息到达,则立刻返回给Consumer,没消息的话超过15s,则返回空,再进行重新请求;主动权在Consumer中,Broker即使有大量的消息也不会主动推送Consumer,缺点:服务端需要保持Consumer的请求,会占用资源,需要客户端连接数可控否则会有一堆连接
- Push:
PushConsumer本质是长轮询
- 系统收到消息后自动处理offset,如果有新的Consumer加入会自动做负载均衡
- 在broker端可以通过longPollingEnable=true来开启长轮询
- 消费端代码:DefaultMQPushConsumerImpl->pullMessage->PullCallback
- 服务端代码:broker.longpolling
- 虽然是Push,但是代码里面大量使用了pull,是因为使用长轮询方式达到push效果,既有pull的特性,又有push的实时性
- 优雅关闭:主要是释放资源和保持Offset,调用Shutdown即可,参考@PostConstruct、@PreDestroy
package com.springboot.rocketmq.producer;
import com.alibaba.fastjson.JSON;
import com.springboot.rocketmq.content.UserContent;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.StopWatch;import javax.annotation.PostConstruct;
/* @Author 18011618 @Date 10:31 2018/7/18 @Function 模拟用户消息发送 */
@Component
public class UserProducer {/** * 生产者的组名 */
@Value("${suning.rocketmq.producerGroup}")
private String producerGroup;
/** * NameServer 地址 */
@Value("${suning.rocketmq.namesrvaddr}")
private String namesrvAddr;
@PostConstruct
public void produder() {
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
producer.setNamesrvAddr(namesrvAddr);
try {
producer.start();
for (int i = 0; i < 100; i++) {
UserContent userContent = new UserContent(String.valueOf(i),"abc"+i);
String jsonstr = JSON.toJSONString(userContent);
System.out.println("发送消息:"+jsonstr);
Message message = new Message("user-topic", "user-tag", jsonstr.getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult result = producer.send(message);
System.err.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.shutdown();
}
}
}
PullConsumer需要自己维护Offset(参考官方例子)
- 官方例子路径:org.apache.rocketmq.example.simple.PullConsumer
- 获取MessageQueue遍历
- 客户维护Offset,需要用户本地存储Offset,存储内存、磁盘、数据库等
- 处理不同状态的消息FOUND、NO_NEW_MSG̵OFFSET_ILLRGL̵
NO_MATCHED_MSG、4种状态 - 灵活性高可控性强、但是编码复杂度会高
- 优雅关闭:主要是释放资源和保存Offset,需用程序自己保存好Offset,特别是异常处理的时候
还没有评论,来说两句吧...