RocketMq4.7源码解析之八(消息过滤)
消息过滤
RocketMQ 支持
- 表达式过滤
TAG:消息定义标签,根据消息属性tag 进行匹配。
SQL92:消息属性过滤上下文, 实现SQL 条件过滤表达式 - 类过滤
消费者自定义消息过滤实现类并将其代码上传到FilterServer 上,消息消费者向FilterServer 拉取消息, FilterServer 将消息消费者的拉取命令转发到Broker ,然后对返回的消息执行消息过滤逻辑,最终将消息返回给消费端,
TAG过滤
- 消息发送者在消息发送时如果设置了消息的tags 属性,存储在消息属性中,先存储在CommitLog文件中,然后转发到消息消费队列,消息消费队列会用8 个字节存储消息tag的hashcode ,之所以不直接存储tag 字符串,是因为将ConumeQueue 设计为定长结构,加快消息消费的加载性能。
- 在Broker 端拉取消息时,遍历ConsumeQueue, 对比消息tag 的hashcode,如果匹配则返回,否则忽略该消息。
- Consume 在收到消息后,同样需要先对消息进行过滤,比较消息tag 的值.
注册主题到rebalance
DefaultMQPushConsumerImpl
根据订阅消息构建消息拉取标记,设置subExpression 、classFilter 消息过滤信息。
DefaultMQPushConsumerImpl#pullMessage
broker端会根据解析标志
PullMessageProcessor#processRequest
然后根据消息过滤机制是否为表达式进行相应的处理
紧接着构建消息过滤对象,随后查找消息
根据ConsumeQueue 条目进行消息过滤
DefaultMessageStore#getMessage
ExpressionMessageFilter
public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
//如果订阅消息为空,
if (null == subscriptionData) {
//表示不过滤
return true;
}
//如果是类过滤模式
if (subscriptionData.isClassFilterMode()) {
return true;
}
// by tags code.
//tag过滤模式处理
if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
//如果tagsCode为空或tagsCode小于0 ,返回true
//说明消息在发送时没有设置tag
if (tagsCode == null) {
return true;
}
//全匹配,直接true
if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
return true;
}
//订阅消息的TAG hashcodes 集合中包含消息的tagsCode,返回true
return subscriptionData.getCodeSet().contains(tagsCode.intValue());
} else {
// no expression or no bloom
if (consumerFilterData == null || consumerFilterData.getExpression() == null
|| consumerFilterData.getCompiledExpression() == null || consumerFilterData.getBloomFilterData() == null) {
return true;
}
// message is before consumer
if (cqExtUnit == null || !consumerFilterData.isMsgInLive(cqExtUnit.getMsgStoreTime())) {
log.debug("Pull matched because not in live: {}, {}", consumerFilterData, cqExtUnit);
return true;
}
byte[] filterBitMap = cqExtUnit.getFilterBitMap();
BloomFilter bloomFilter = this.consumerFilterManager.getBloomFilter();
if (filterBitMap == null || !this.bloomDataValid
|| filterBitMap.length * Byte.SIZE != consumerFilterData.getBloomFilterData().getBitNum()) {
return true;
}
BitsArray bitsArray = null;
try {
bitsArray = BitsArray.create(filterBitMap);
boolean ret = bloomFilter.isHit(consumerFilterData.getBloomFilterData(), bitsArray);
log.debug("Pull {} by bit map:{}, {}, {}", ret, consumerFilterData, bitsArray, cqExtUnit);
return ret;
} catch (Throwable e) {
log.error("bloom filter error, sub=" + subscriptionData
+ ", filter=" + consumerFilterData + ", bitMap=" + bitsArray, e);
}
}
return true;
}
这里说明broker比较的是tagCode,所以在client还需要比较tag
如果消息根据ConsumeQueue 条目通过过滤,则需要从CommitLog 文件中加载整个消息体,然后根据属性进行过滤
ExpressionMessageFilter
@Override
public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
//订阅信息为空,返回true
if (subscriptionData == null) {
return true;
}
//如果是类过滤模式,返回true
if (subscriptionData.isClassFilterMode()) {
return true;
}
//如果是TAG 模式,返回true
if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
return true;
}
ConsumerFilterData realFilterData = this.consumerFilterData;
Map<String, String> tempProperties = properties;
// no expression
if (realFilterData == null || realFilterData.getExpression() == null
|| realFilterData.getCompiledExpression() == null) {
return true;
}
if (tempProperties == null && msgBuffer != null) {
tempProperties = MessageDecoder.decodeProperties(msgBuffer);
}
Object ret = null;
try {
MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);
//该方法主要是为表达式模式SQL92 服务的,根据消息属性实现类似于数据库SQLwhere 条件过滤方式
ret = realFilterData.getCompiledExpression().evaluate(context);
} catch (Throwable e) {
log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);
}
log.debug("Pull eval result: {}, {}, {}", ret, realFilterData, tempProperties);
if (ret == null || !(ret instanceof Boolean)) {
return false;
}
return (Boolean) ret;
}
消费者收到broker返回消息后
将消息字节数组解码成消息列表填充msgFoundList ,井对消息进行消息过滤( TAG )模式
DefaultMQPushConsumerImpl#pullMessage
比较TAG字符串是否包含,包含的话,就记录消息.
还没有评论,来说两句吧...