ActiveMQ之消息选择器(Message Selectors)

绝地灬酷狼 2022-02-22 16:50 430阅读 0赞

JMS Selectors用在获取消息的时候,可以基于消息属性和Xpath语法对消息进行过滤。JMS Selectors由SQL92语义定义。以下是个Selectors的例子:

  1. consumer = session.createConsumer(destination, "JMSType = 'car' AND weight > 2500");

1:JMS Selectors表达式中,可以使用IN、NOT IN、LIKE等

2:需要注意的是,JMS Selectors表达式中的日期和时间需要使用标准的long型毫秒值

3:表达式中的属性不会自动进行类型转换,例如:

  1. myMessage.setStringProperty("NumberOfOrders", "2");

那么此时“NumberOfOrders > 1” 求值结果会是false

4:Message Groups虽然可以保证具有相同message group的消息被唯一的consumer顺序处理,但是却不能确定被哪个consumer处理。在某些情况下,Message Groups可以和JMS Selector一起工作,

例如:

设想有三个consumers分别是A、B和C。你可以在producer中为消息设置三个message groups分别是“A”、“B”和“C”。然后令consumer A使用“JMXGroupID = ‘A’”作为selector。B和C也同理。这样就可以保证message group A的消息只被consumer A处理。需要注意的是,这种做法有以下缺点:

(1)producer必须知道当前正在运行的consumers,也就是说producer和consumer被耦合到一起。

(2)如果某个consumer失效,那么应该被这个consumer消费的消息将会一直被积压在broker上。

下面我们使用JMS Selectors来实现消息分组功能

  • 消息生产者

    MessageProducer producer = session.createProducer(destination);
    for (int i=0;i<10;i++) {

    1. TextMessage msg = session.createTextMessage("hello " + i);
    2. if (i % 2 == 0)
    3. msg.setStringProperty("JMSXGroupID","groupA");
    4. else
    5. msg.setStringProperty("JMSXGroupID","groupB");
    6. producer.send(msg);

    }

  • 消费者1

    MessageConsumer consumer = session.createConsumer(destination,”JMSXGroupID=’groupA’”);
    consumer.setMessageListener(msg -> {

    1. try {
    2. System.out.println("Received msg:" + ((TextMessage)msg).getText());
    3. } catch (JMSException e) {
    4. e.printStackTrace();
    5. }

    });

  • 消费者2

    MessageConsumer consumer = session.createConsumer(destination,”JMSXGroupID=’groupB’”);
    consumer.setMessageListener(msg -> {

    1. try {
    2. System.out.println("Received msg:" + ((TextMessage)msg).getText());
    3. } catch (JMSException e) {
    4. e.printStackTrace();
    5. }

    });

消费者1控制台输出:

  1. Received msg:hello 0
  2. Received msg:hello 2
  3. Received msg:hello 4
  4. Received msg:hello 6
  5. Received msg:hello 8

消费者2控制台输出:

  1. Received msg:hello 1
  2. Received msg:hello 3
  3. Received msg:hello 5
  4. Received msg:hello 7
  5. Received msg:hello 9

参考:《ActiveMQ in Action》、ActiveMQ(23):Consumer高级特性

发表评论

表情:
评论列表 (有 0 条评论,430人围观)

还没有评论,来说两句吧...

相关阅读