RocketMq4.7源码解析之八(消息过滤)

落日映苍穹つ 2022-12-04 07:57 264阅读 0赞

消息过滤

RocketMQ 支持

  1. 表达式过滤
    TAG:消息定义标签,根据消息属性tag 进行匹配。
    SQL92:消息属性过滤上下文, 实现SQL 条件过滤表达式
  2. 类过滤
    消费者自定义消息过滤实现类并将其代码上传到FilterServer 上,消息消费者向FilterServer 拉取消息, FilterServer 将消息消费者的拉取命令转发到Broker ,然后对返回的消息执行消息过滤逻辑,最终将消息返回给消费端,

TAG过滤

  • 消息发送者在消息发送时如果设置了消息的tags 属性,存储在消息属性中,先存储在CommitLog文件中,然后转发到消息消费队列,消息消费队列会用8 个字节存储消息tag的hashcode ,之所以不直接存储tag 字符串,是因为将ConumeQueue 设计为定长结构,加快消息消费的加载性能。
  • 在Broker 端拉取消息时,遍历ConsumeQueue, 对比消息tag 的hashcode,如果匹配则返回,否则忽略该消息。
  • Consume 在收到消息后,同样需要先对消息进行过滤,比较消息tag 的值.

注册主题到rebalance
DefaultMQPushConsumerImpl
在这里插入图片描述
根据订阅消息构建消息拉取标记,设置subExpression 、classFilter 消息过滤信息。
DefaultMQPushConsumerImpl#pullMessage
在这里插入图片描述
broker端会根据解析标志
PullMessageProcessor#processRequest在这里插入图片描述
然后根据消息过滤机制是否为表达式进行相应的处理
在这里插入图片描述
紧接着构建消息过滤对象,随后查找消息
在这里插入图片描述
根据ConsumeQueue 条目进行消息过滤
DefaultMessageStore#getMessage
在这里插入图片描述
ExpressionMessageFilter

  1. public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
  2. //如果订阅消息为空,
  3. if (null == subscriptionData) {
  4. //表示不过滤
  5. return true;
  6. }
  7. //如果是类过滤模式
  8. if (subscriptionData.isClassFilterMode()) {
  9. return true;
  10. }
  11. // by tags code.
  12. //tag过滤模式处理
  13. if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
  14. //如果tagsCode为空或tagsCode小于0 ,返回true
  15. //说明消息在发送时没有设置tag
  16. if (tagsCode == null) {
  17. return true;
  18. }
  19. //全匹配,直接true
  20. if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
  21. return true;
  22. }
  23. //订阅消息的TAG hashcodes 集合中包含消息的tagsCode,返回true
  24. return subscriptionData.getCodeSet().contains(tagsCode.intValue());
  25. } else {
  26. // no expression or no bloom
  27. if (consumerFilterData == null || consumerFilterData.getExpression() == null
  28. || consumerFilterData.getCompiledExpression() == null || consumerFilterData.getBloomFilterData() == null) {
  29. return true;
  30. }
  31. // message is before consumer
  32. if (cqExtUnit == null || !consumerFilterData.isMsgInLive(cqExtUnit.getMsgStoreTime())) {
  33. log.debug("Pull matched because not in live: {}, {}", consumerFilterData, cqExtUnit);
  34. return true;
  35. }
  36. byte[] filterBitMap = cqExtUnit.getFilterBitMap();
  37. BloomFilter bloomFilter = this.consumerFilterManager.getBloomFilter();
  38. if (filterBitMap == null || !this.bloomDataValid
  39. || filterBitMap.length * Byte.SIZE != consumerFilterData.getBloomFilterData().getBitNum()) {
  40. return true;
  41. }
  42. BitsArray bitsArray = null;
  43. try {
  44. bitsArray = BitsArray.create(filterBitMap);
  45. boolean ret = bloomFilter.isHit(consumerFilterData.getBloomFilterData(), bitsArray);
  46. log.debug("Pull {} by bit map:{}, {}, {}", ret, consumerFilterData, bitsArray, cqExtUnit);
  47. return ret;
  48. } catch (Throwable e) {
  49. log.error("bloom filter error, sub=" + subscriptionData
  50. + ", filter=" + consumerFilterData + ", bitMap=" + bitsArray, e);
  51. }
  52. }
  53. return true;
  54. }

这里说明broker比较的是tagCode,所以在client还需要比较tag
在这里插入图片描述
如果消息根据ConsumeQueue 条目通过过滤,则需要从CommitLog 文件中加载整个消息体,然后根据属性进行过滤
在这里插入图片描述
ExpressionMessageFilter

  1. @Override
  2. public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
  3. //订阅信息为空,返回true
  4. if (subscriptionData == null) {
  5. return true;
  6. }
  7. //如果是类过滤模式,返回true
  8. if (subscriptionData.isClassFilterMode()) {
  9. return true;
  10. }
  11. //如果是TAG 模式,返回true
  12. if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
  13. return true;
  14. }
  15. ConsumerFilterData realFilterData = this.consumerFilterData;
  16. Map<String, String> tempProperties = properties;
  17. // no expression
  18. if (realFilterData == null || realFilterData.getExpression() == null
  19. || realFilterData.getCompiledExpression() == null) {
  20. return true;
  21. }
  22. if (tempProperties == null && msgBuffer != null) {
  23. tempProperties = MessageDecoder.decodeProperties(msgBuffer);
  24. }
  25. Object ret = null;
  26. try {
  27. MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);
  28. //该方法主要是为表达式模式SQL92 服务的,根据消息属性实现类似于数据库SQLwhere 条件过滤方式
  29. ret = realFilterData.getCompiledExpression().evaluate(context);
  30. } catch (Throwable e) {
  31. log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);
  32. }
  33. log.debug("Pull eval result: {}, {}, {}", ret, realFilterData, tempProperties);
  34. if (ret == null || !(ret instanceof Boolean)) {
  35. return false;
  36. }
  37. return (Boolean) ret;
  38. }

消费者收到broker返回消息后
将消息字节数组解码成消息列表填充msgFoundList ,井对消息进行消息过滤( TAG )模式
DefaultMQPushConsumerImpl#pullMessage
在这里插入图片描述
比较TAG字符串是否包含,包含的话,就记录消息.
在这里插入图片描述

发表评论

表情:
评论列表 (有 0 条评论,264人围观)

还没有评论,来说两句吧...

相关阅读