RocketMQ消费者核心配置和核心知识讲解

雨点打透心脏的1/2处 2023-07-05 11:40 67阅读 0赞

1. RocketMQ消费者核心配置讲解

  • consumeFromWhere配置

    1. CONSUME_FROM_FIRST_OFFSET:初次从消息队列头部开始消费,即历史消息(还存在broker的),全部消费一遍,后续再启动接着上次消费的进度开始消费
    2. CONSUME_FROM_LAST_OFFSET:默认策略,初次从该队列最尾开始消费,即跳过历史消息,后续再启动接着上次消费的进度开始消费
    3. CONSUME_FROM_TIMESTAMP:从某个时间点开始消费,默认是半个小时以前,后续再启动着上次消费的进度开始消费

    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

  • allocateMessageQueueStrategy配置(不用单独配置)
    负载均衡策略算法,即消费者分配到queue的算法,默认值是AllocateMessageQueueAveragely即取模平均分配

  • offsetStore配置:消息消费进度存储器offsetStore有两个配置:(不用单独配置)
    LocalFileOffsetStore和RemoteBrokerOffsetStor
    广播模式默认使用LocalFileOffsetStore
    集群式默认使用RemoteBrokerOffsetStore
  • consumeThreadMin:最小消费线程池数量
  • consumeThreadMax:最大消费线程池数量
  • pullBatchSize:消费者去broker拉取消息时,一次拉取多少条。可选配置
  • consumeMessageBatchMaxSize:单次消费时一次性消费多少条消息,批量消费接口才有用,可选配置
  • messageModel:消费者消费模式
    CLUSTERING——默认是集群模式CLUSTERING
    BROADCASTING——广播模式

2. 集群和广播模式下RocketMQ消费端处理

  • Topic下队列的奇偶数会影响Consumer个数里面的消费数量
    1. 如果是4个队列,8个消息,4个节点则会各消费2条,如果不对等,则负载均衡会分配不均。
    2. 如果Consumer实例的数量比message queue的总数量还多的话,多出来的consumer实例将无法分到queue,也就无法消费到消息,也就无法起到分摊负载的作用,所以需要控制让queue的总数量大于等于consumer的数量
  • 集群模式(默认):
    Consumer实例平均分摊消息生产者发送的消息
  • 广播模式:
    广播模式下消费消息:投递到broker的消息会被每个Consumer进行消费,一条消息被多个Consumer消费,广播消费中ConsumerGroup暂时无用
  • 怎么切换:通过consumer.setMessageModel

    consumer.setMessageModel(MessageModel.BROADCASTING); # 广播模式
    consumer.setMessageModel(MessageModel.CLUSTERING); # 集群模式

3. RocketMQ里面的标签Tag实战和消息过滤原理

  • 一个Message只有一个Topic,tag是二级分类
    比如商品订单里有数码类订单,食品类订单等,订单的topic都是一样的topic,订单类型可以使用topic来辨别
  • 过滤分为Broker端和Consumer端过滤
    Broker端过滤,减少了无用的消息的进行网络传输,增加了broker的负担
    Consumer端过滤,完全可以根据业务需要进行筛选,但是增加了很多无用的消息传输
  • 一般是监听*, 或者指定tag,||运算,SQL92,FilterServer等
    1. * 代表监听所有
      consumer.subscribe(JmsConfig.TOPIC, “*”);
    2. tag性能高,逻辑简单, 只消费tag名abc的消息
      consumer.subscribe(JmsConfig.TOPIC, “abc”);
    3. ||运算, 消费tag为abc或者def的消息
      consumer.subscribe(JmsConfig.TOPIC, “abc || def”);
    4. SQL92性能差点,支持复杂逻辑运算(只支持PushConsumer中使用)
      MessageSelector.bySql
      语法:>,<,=,IS NULL, AND,OR,NOT等, sql语法中where后续的语法大部分都可以使用
      在这里插入图片描述
      在这里插入图片描述
  • 注意:消费者订阅关系要一致,不然会消费混乱,甚至消息丢失
    订阅关系一致:订阅关系有topic和tag组成,同一个group name,订阅的topic和tag必须是一样的
  • 在Broker端进行MessageTag过滤,遍历message queue存储的message tag和订阅传递的tag的hashcode不一样则跳过,符合的则传输给Consumer,在consumer queue存储的是对应的hashcode,对比也是通过hashcode对比;Consumer收到过滤消息后也会进行匹配操作,但是是对比真实的message tag而不是hashcode
    broker过滤tag的优点:
    1. consumer queue存储使用hashcode定长,节约空间
    2. 过滤中不访问commit log,可以搞笑过滤
    3. 如果存在hash冲突,Consumer端可以进行再次确认
  • 如果想使用多个Tag,可以使用sql表达式,但是不建议,单一职责,多个队列。

备注:使用sql过滤时需要配置enablePropertyFilter

  1. 错误: The broker does not support consumer to filter message by SQL92
  2. 解决办法:broker.conf里面配置如下
  3. enablePropertyFilter=true
  4. 修改之后需要重启broker
  5. master节点配置:vim conf/2m-2s-async/broker-a.properties
  6. slave节点配置:vim conf/2m-2s-async/broker-a-s.properties

4. PushConsumer和PullConsumer消费模式分析

  • PushConsumer和PullConsume优缺点分析:

    1. Push:
      实时性高;但增加服务端负载,消费端能力不同,如果push推送过快,消费端会出现很多问题
    2. Pull:
      消费者从Server端拉取消息,主动权在消费端,可控性好;但间隔时间不好设置,间隔太短,则空请求,浪费资源;间隔时间太长,则消息不能及时处理
    3. 长轮询:
      Client请求Server端也就是Broker的时候,Broker会保持当前连接一段时间默认是15s,如果这段时间内有消息到达,则立刻返回给Consumer,没消息的话超过15s,则返回空,再进行重新请求;主动权在Consumer中,Broker即使有大量的消息也不会主动推送Consumer,缺点:服务端需要保持Consumer的请求,会占用资源,需要客户端连接数可控否则会有一堆连接
  • PushConsumer本质是长轮询

    1. 系统收到消息后自动处理offset,如果有新的Consumer加入会自动做负载均衡
    2. 在broker端可以通过longPollingEnable=true来开启长轮询
    3. 消费端代码:DefaultMQPushConsumerImpl->pullMessage->PullCallback
    4. 服务端代码:broker.longpolling
    5. 虽然是Push,但是代码里面大量使用了pull,是因为使用长轮询方式达到push效果,既有pull的特性,又有push的实时性
    6. 优雅关闭:主要是释放资源和保持Offset,调用Shutdown即可,参考@PostConstruct、@PreDestroy

    package com.springboot.rocketmq.producer;

    import com.alibaba.fastjson.JSON;
    import com.springboot.rocketmq.content.UserContent;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    import org.springframework.util.StopWatch;

    import javax.annotation.PostConstruct;

    /* @Author 18011618 @Date 10:31 2018/7/18 @Function 模拟用户消息发送 */
    @Component
    public class UserProducer {

    1. /** * 生产者的组名 */
    2. @Value("${suning.rocketmq.producerGroup}")
    3. private String producerGroup;
    4. /** * NameServer 地址 */
    5. @Value("${suning.rocketmq.namesrvaddr}")
    6. private String namesrvAddr;
    7. @PostConstruct
    8. public void produder() {
    9. DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
    10. producer.setNamesrvAddr(namesrvAddr);
    11. try {
    12. producer.start();
    13. for (int i = 0; i < 100; i++) {
    14. UserContent userContent = new UserContent(String.valueOf(i),"abc"+i);
    15. String jsonstr = JSON.toJSONString(userContent);
    16. System.out.println("发送消息:"+jsonstr);
    17. Message message = new Message("user-topic", "user-tag", jsonstr.getBytes(RemotingHelper.DEFAULT_CHARSET));
    18. SendResult result = producer.send(message);
    19. System.err.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());
    20. }
    21. } catch (Exception e) {
    22. e.printStackTrace();
    23. } finally {
    24. producer.shutdown();
    25. }
    26. }

    }

  • PullConsumer需要自己维护Offset(参考官方例子)

    1. 官方例子路径:org.apache.rocketmq.example.simple.PullConsumer
    2. 获取MessageQueue遍历
    3. 客户维护Offset,需要用户本地存储Offset,存储内存、磁盘、数据库等
    4. 处理不同状态的消息FOUND、NO_NEW_MSG̵OFFSET_ILLRGL̵
      NO_MATCHED_MSG、4种状态
    5. 灵活性高可控性强、但是编码复杂度会高
    6. 优雅关闭:主要是释放资源和保存Offset,需用程序自己保存好Offset,特别是异常处理的时候

发表评论

表情:
评论列表 (有 0 条评论,67人围观)

还没有评论,来说两句吧...

相关阅读

    相关 RocketMQ核心机制

    RocketMQ基于“顺序写”“随机读”的原则来设计,利用“零拷贝”技术(nio) 高可用机制 RocketMQ分布式集群是通过Master和Slave的配合达到高可用