ActiveMQ——2.消息的接收方式以及选择器

刺骨的言语ヽ痛彻心扉 2021-09-26 09:22 471阅读 0赞

消息的接收方式

概述

消息有两种接收方式:同步接收和异步接收。

同步接收:主线程阻塞式等待下一个消息的到来,可以设置timeout,超时则返回null。

异步接收:主线程设置MessageListener,然后继续做自己的事,子线程负责监听。

同步接收又称为阻塞式接收;异步接收又称为事件驱动的接收。

API

同步接收, 是在获取MessageConsumer实例之后,调用以下的API:

  • receive():Message
    获取下一个消息。这个调用将导致无限期的阻塞,直到有新的消息产生。
  • receive(long timeout):Message
    获取下一个消息。这个调用可能导致一段时间的阻塞,直到超时或者有新的消息产生。超时则返回null。
  • receiveNoWait():Message
    获取下一个消息。这个调用不会导致阻塞,如果没有下一个消息,直接返回null。

异步接收,是在获取MessageConsumer实例之后,调用下面的API:

  • setMessageListener(MessageListener):void
    设置消息监听器

MessageListener是一个接口,只定义了一个方法:

  • onMessage(Message message):void
    这是一个回调方法,当有新的消息产生,这个方法会被自动调用。

所以,为实现异步接收,只需要对MessageListener进行实现,然后设置为consumer实例的messageListener。

核心代码

同步接收

  1. long timeout = 10 * 1000;
  2. for (Message message = consumer.receive(timeout); message != null; message = consumer
  3. .receive(timeout)) {
  4. String text = ((TextMessage) message).getText();
  5. System.out.println(String.format("receive a message:%s", text));
  6. }

异步接收

  1. consumer.setMessageListener(new MessageListener() {
  2. public void onMessage(Message message) {
  3. try {
  4. String text = ((TextMessage) message).getText();
  5. System.out.println(text);
  6. } catch (JMSException e) {
  7. e.printStackTrace();
  8. }
  9. }
  10. });

消息选择器

概述

消息选择器使用类似于SQL语法,为Consumer指定基于Message属性的筛选条件。发送的时候,给消息添加一些属性;在接收的时候,根据属性进行过滤。

API

javax.jms.Message提供了一系列的方法,用于设置属性:

  • setIntProperty(String name, int value):void
    设置int类型的属性
  • setStringProperty(String name, String value):void
    设置字符串类型的属性
  • setObjectProperty(String name, Object value):void
    设置对象类型的属性

除此之外,还有设置boolean、byte、short、long、float、double类型的API,模式相同。

javax.jms.Session则提供了在创建MessageConsumer时指定选择器的方法:

  • createConsumer(Destination destination, String messageSelector):MessageConsumer
    为指定的destination创建一个consumer。只有当消息的属性满足选择器的表达式时,才会被接收;选择器为null或者空字符串时,表示没有选择器。

在发送的时候为消息指定属性,在接收的时候为Consumer指定选择器。

选择器语法

选择器是字符串类型,官方文档说,选择器的语法类似于SQL的语法。以下仅对int、String类型举例:

  1. // 假如有int类型的属性order,而我们接收order>10的消息
  2. String messageSelector = "order > 10";
  3. // 假如有String类型的属性kind,而我们接收kind="06"的消息
  4. String messageSelector = "kind='06'";

核心代码

  1. final int NUM = 3;
  2. List<Message> result = new ArrayList<Message>(NUM);
  3. try {
  4. for (int i = 0; i < NUM; i++) {
  5. TextMessage message = session.createTextMessage();
  6. message.setText(String.format("This is the %dth message.",
  7. i + 1));
  8. message.setIntProperty("order", i + 1);
  9. result.add(message);
  10. }
  11. } catch (JMSException e) {
  12. e.printStackTrace();
  13. return null;
  14. }
  15. return result;

接收消息

  1. String messageSelector = "order > 1";
  2. consumer = session.createConsumer(destination, messageSelector);

发表评论

表情:
评论列表 (有 0 条评论,471人围观)

还没有评论,来说两句吧...

相关阅读