ActiveMQ_topic使用
注意:消息发布后若没有调用者不保存在服务器。有相应持久化方法
一、发送:
@Test
public void addTopic() throws Exception{
//创建 一个连接工厂对象,需要指定服务的IP和端口号(默认:61616)
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.134:61616");
//使用工厂对象创建一个connection对象
Connection connection = connectionFactory.createConnection();
//开启连接,调用connection对象的start方法
connection.start();
//创建session对象,其中有两个参数:第一个参数:是否开启事务,如果为true,开启,第二个参数无意义;
//false,关闭,一般关闭;第二个参数,应答模式:手动应答还是自动应答,一般自动应答
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//使用session对象创建一个Destination对象,两种形式:queue和topic
Topic topic = session.createTopic("test_topic");
//使用session对象创建一个producer对象
MessageProducer createProducer = session.createProducer(topic);
//创建一个message对象,key使用textMessage;两种创建方式
/*TextMessage message = new ActiveMQTextMessage();
message.setStringProperty("test", "hello");*/
TextMessage message = session.createTextMessage("hello");
//发送消息
createProducer.send(message);
//关闭资源
createProducer.close();
session.close();
connection.close();
}
二、接收:
@Test
public void consumerTopic() throws Exception{
// 创建 一个连接工厂对象,需要指定服务的IP和端口号(默认:61616)
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.134:61616");
// 使用工厂对象创建一个connection对象
Connection connection = connectionFactory.createConnection();
// 开启连接,调用connection对象的start方法
connection.start();
// 创建session对象,其中有两个参数:第一个参数:是否开启事务,如果为true,开启,第二个参数无意义;
// false,关闭,一般关闭;第二个参数,应答模式:手动应答还是自动应答,一般自动应答
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 使用session对象创建一个Destination对象,两种形式:queue和topic
Topic topic = session.createTopic("test_topic");
// 使用session对象创建一个consumer对象
MessageConsumer consumer = session.createConsumer(topic);
//接收
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
// TODO Auto-generated method stub
TextMessage textMessage = (TextMessage)message;
try {
String text = textMessage.getText();
System.out.println(text);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
//等待接收
System.in.read();
//关闭资源
consumer.close();
session.close();
connection.close();
}
还没有评论,来说两句吧...