ActiveMQ——2.消息的接收方式以及选择器
消息的接收方式
概述
消息有两种接收方式:同步接收和异步接收。
同步接收:主线程阻塞式等待下一个消息的到来,可以设置timeout,超时则返回null。
异步接收:主线程设置MessageListener,然后继续做自己的事,子线程负责监听。
同步接收又称为阻塞式接收;异步接收又称为事件驱动的接收。
API
同步接收, 是在获取MessageConsumer实例之后,调用以下的API:
- receive():Message
获取下一个消息。这个调用将导致无限期的阻塞,直到有新的消息产生。 - receive(long timeout):Message
获取下一个消息。这个调用可能导致一段时间的阻塞,直到超时或者有新的消息产生。超时则返回null。 - receiveNoWait():Message
获取下一个消息。这个调用不会导致阻塞,如果没有下一个消息,直接返回null。
异步接收,是在获取MessageConsumer实例之后,调用下面的API:
- setMessageListener(MessageListener):void
设置消息监听器
MessageListener是一个接口,只定义了一个方法:
- onMessage(Message message):void
这是一个回调方法,当有新的消息产生,这个方法会被自动调用。
所以,为实现异步接收,只需要对MessageListener进行实现,然后设置为consumer实例的messageListener。
核心代码
同步接收
long timeout = 10 * 1000;
for (Message message = consumer.receive(timeout); message != null; message = consumer
.receive(timeout)) {
String text = ((TextMessage) message).getText();
System.out.println(String.format("receive a message:%s", text));
}
异步接收
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
try {
String text = ((TextMessage) message).getText();
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
消息选择器
概述
消息选择器使用类似于SQL语法,为Consumer指定基于Message属性的筛选条件。发送的时候,给消息添加一些属性;在接收的时候,根据属性进行过滤。
API
javax.jms.Message提供了一系列的方法,用于设置属性:
- setIntProperty(String name, int value):void
设置int类型的属性 - setStringProperty(String name, String value):void
设置字符串类型的属性 - setObjectProperty(String name, Object value):void
设置对象类型的属性
除此之外,还有设置boolean、byte、short、long、float、double类型的API,模式相同。
javax.jms.Session则提供了在创建MessageConsumer时指定选择器的方法:
- createConsumer(Destination destination, String messageSelector):MessageConsumer
为指定的destination创建一个consumer。只有当消息的属性满足选择器的表达式时,才会被接收;选择器为null或者空字符串时,表示没有选择器。
在发送的时候为消息指定属性,在接收的时候为Consumer指定选择器。
选择器语法
选择器是字符串类型,官方文档说,选择器的语法类似于SQL的语法。以下仅对int、String类型举例:
// 假如有int类型的属性order,而我们接收order>10的消息
String messageSelector = "order > 10";
// 假如有String类型的属性kind,而我们接收kind="06"的消息
String messageSelector = "kind='06'";
核心代码
final int NUM = 3;
List<Message> result = new ArrayList<Message>(NUM);
try {
for (int i = 0; i < NUM; i++) {
TextMessage message = session.createTextMessage();
message.setText(String.format("This is the %dth message.",
i + 1));
message.setIntProperty("order", i + 1);
result.add(message);
}
} catch (JMSException e) {
e.printStackTrace();
return null;
}
return result;
接收消息
String messageSelector = "order > 1";
consumer = session.createConsumer(destination, messageSelector);
还没有评论,来说两句吧...