activemq 消息选择器Selector

拼搏现实的明天。 2022-06-13 03:16 284阅读 0赞

一、序言

  1. 消息大多数情况都是发送到broker 的,在知道Destination 的情况下,都可以消费,因此有些情况下需要我们将消息分组、隔离,或则指定A消息,只能有A消费者消费等等情况,这里做个大概的介绍和实例。

二、实例场景

  1. 我们通过有时候我们需要一个queue/topic 通道,然后发送消息,但是我们要对不同消费者接受的消息进行限制,或者说过滤,就可以使用这种情况。
  2. 我这里建立一个queue,分别发送 300A,B消息,分别有消费者A,B接收

三、代码实例

  1. 基本连接代码这里就就不贴了,建议前的
  2. 发送者代码:

Java代码 收藏代码

  1. Destination send_destination = session.createQueue(“order_queue”);
  2. MessageProducer producer = session.createProducer(send_destination);
  3. for(int i =0;i<300;i++){
  4. // 创建一个文本消息
  5. TextMessage message = session.createTextMessage(“A-张三-“+i);
  6. // 这里我们分别设置对应的消息信息,当成是一组消息
  7. message.setStringProperty(“JMSXGroupID”,”A”);
  8. producer.send(message);
  9. TextMessage message1 = session.createTextMessage(“B-李四-“+i);
  10. message1.setStringProperty(“JMSXGroupID”,”B”);
  11. producer.send(message1);
  12. }

消费者代码:

Java代码 收藏代码

  1. Destination destination = session.createQueue(“order_queue”);
  2. // 创建消费者
  3. MessageConsumer consumer = session.createConsumer(destination,”JMSXGroupID=’A’”);
  4. consumer.setMessageListener(new MessageListener() {
  5. @Override
  6. public void onMessage(Message message) {
  7. TextMessage textMessage = (TextMessage) message;
  8. try {
  9. System.out.println(“A:”+textMessage.getText());
  10. } catch (JMSException e) {
  11. e.printStackTrace();
  12. }
  13. }
  14. });

    消费者B

Java代码 收藏代码

  1. // 指定接收消息的地方
  2. Destination destination = session.createQueue(“order_queue”);
  3. // 创建消费者
  4. MessageConsumer consumer = session.createConsumer(destination,”JMSXGroupID=’B’”);
  5. consumer.setMessageListener(new MessageListener() {
  6. @Override
  7. public void onMessage(Message message) {
  8. TextMessage textMessage = (TextMessage) message;
  9. try {
  10. System.out.println(“B:”+textMessage.getText());
  11. } catch (JMSException e) {
  12. e.printStackTrace();
  13. }
  14. }
  15. });
  1. 然后开启A,B消费者监听,启动发送者,那么就能看到消息分别消费了
  2. 同时Selector 支持一些表达式的过滤,比如可以写成:JMSXGroupID = 'A' or JMSXGroupID = 'B'

更多可参考:http://activemq.apache.org/features.html

四、其他情况分析

  1. 1.如果A挂掉,那么B收消息会收到影响,因为在同一个队列,A 积压的消息量越多,那么B收到最后消息的时间越长。道理很简单,A挂了,B前面把大部分消息收到了,后面只有少量B的消息以及积压的A消息,分配几率一定的情况下,B发送的时间就拖久了。
  2. 2.如果消费者不指定"JMSXGroupID='A'" 这个,那么会默认收取未收到的所有消息,他会默认当成一个分组了。
  3. 3.这种模式和应答模式结合的时候,有朋友[测试][Link 1]会卡主,但是我一直没重现~。~ 希望多测测

五、小结

  1. 1.AMQ 提供的这个东西还是有一定用处,虽然可以减少了我们的通道的数量,同样的会照成通道压力过大,小范围的消息是可以的。
  2. 2.有问题请指出,有其他的场景欢迎分享,或者需要满足其他场景的可以一起讨论。

发表评论

表情:
评论列表 (有 0 条评论,284人围观)

还没有评论,来说两句吧...

相关阅读