ActiveMQ之消息选择器(Message Selectors)
JMS Selectors用在获取消息的时候,可以基于消息属性和Xpath语法对消息进行过滤。JMS Selectors由SQL92语义定义。以下是个Selectors的例子:
consumer = session.createConsumer(destination, "JMSType = 'car' AND weight > 2500");
1:JMS Selectors表达式中,可以使用IN、NOT IN、LIKE等
2:需要注意的是,JMS Selectors表达式中的日期和时间需要使用标准的long型毫秒值
3:表达式中的属性不会自动进行类型转换,例如:
myMessage.setStringProperty("NumberOfOrders", "2");
那么此时“NumberOfOrders > 1” 求值结果会是false
4:Message Groups虽然可以保证具有相同message group的消息被唯一的consumer顺序处理,但是却不能确定被哪个consumer处理。在某些情况下,Message Groups可以和JMS Selector一起工作,
例如:
设想有三个consumers分别是A、B和C。你可以在producer中为消息设置三个message groups分别是“A”、“B”和“C”。然后令consumer A使用“JMXGroupID = ‘A’”作为selector。B和C也同理。这样就可以保证message group A的消息只被consumer A处理。需要注意的是,这种做法有以下缺点:
(1)producer必须知道当前正在运行的consumers,也就是说producer和consumer被耦合到一起。
(2)如果某个consumer失效,那么应该被这个consumer消费的消息将会一直被积压在broker上。
下面我们使用JMS Selectors来实现消息分组功能
消息生产者
MessageProducer producer = session.createProducer(destination);
for (int i=0;i<10;i++) {TextMessage msg = session.createTextMessage("hello " + i);
if (i % 2 == 0)
msg.setStringProperty("JMSXGroupID","groupA");
else
msg.setStringProperty("JMSXGroupID","groupB");
producer.send(msg);
}
消费者1
MessageConsumer consumer = session.createConsumer(destination,”JMSXGroupID=’groupA’”);
consumer.setMessageListener(msg -> {try {
System.out.println("Received msg:" + ((TextMessage)msg).getText());
} catch (JMSException e) {
e.printStackTrace();
}
});
消费者2
MessageConsumer consumer = session.createConsumer(destination,”JMSXGroupID=’groupB’”);
consumer.setMessageListener(msg -> {try {
System.out.println("Received msg:" + ((TextMessage)msg).getText());
} catch (JMSException e) {
e.printStackTrace();
}
});
消费者1控制台输出:
Received msg:hello 0
Received msg:hello 2
Received msg:hello 4
Received msg:hello 6
Received msg:hello 8
消费者2控制台输出:
Received msg:hello 1
Received msg:hello 3
Received msg:hello 5
Received msg:hello 7
Received msg:hello 9
参考:《ActiveMQ in Action》、ActiveMQ(23):Consumer高级特性
还没有评论,来说两句吧...