ActiveMQ之P2P点对点通信方式
上一篇的代码是发布/订阅(PUS/SUB)的消息通信方式,发布者发布的一个主题,可以由多个消费者来订阅.
这一篇我们来介绍点对点(P2P)模型.P2P就好比两个人打电话,这两个人是独享这一条通信链路的,一方发送消息一方接收.
在p2p的场景里,相互通信的双方是通过一个类似于队列的方式来进行交流。和前面pub-sub的区别在于一个topic有一个发送者和多个接收者,而在p2p里一个queue只有一个发送者和一个接收者。
package activemqDemo02;
import java.io.IOException;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 消息消费者(consumer):它用于接收发送到目的地的消息
* @author licheng
*
*/
public class Consumer {
public static final String url = "tcp://localhost:61616"; // 缺省端口,如果要改,可在apache-activemq-5.13.3\conf中的activemq.xml中更改端口号
ConnectionFactory factory;
Connection connection;
Session session;
MessageConsumer[] consumers;
ComunicateMode comunicateMode = ComunicateMode.pubsub;
enum ComunicateMode {
p2p, pubsub
}
public Consumer(ComunicateMode mode, String[] destinationNames) throws JMSException{
this.comunicateMode = mode;
factory = new ActiveMQConnectionFactory(url); // 这里的url也可以不指定,java代码将默认将端口赋值为61616
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumers = new MessageConsumer[destinationNames.length];
for (int i = 0; i < destinationNames.length; i++) {
Destination destination = comunicateMode == ComunicateMode.pubsub ? session.createTopic("Topic." + destinationNames[i]) : session.createQueue("Queue." + destinationNames[i]);
consumers[i] = session.createConsumer(destination);
consumers[i].setMessageListener(new MessageListener(){
public void onMessage(Message message) {
try {
System.out.println(String.format("收到消息[%s]", ((TextMessage) message).getText()));
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
public void close() throws JMSException {
if(connection != null) {
connection.close();
}
}
public static void main(String[] args) throws JMSException, IOException {
Consumer consumer = new Consumer(ComunicateMode.pubsub, new String[] {"2"}); // 这里可以修改消息传输方式为pubsub
System.in.read();
consumer.close();
}
}
package activemqDemo02;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 消息生产者(produceer):用于把消息发送到一个目的地
*
* @author licheng
*
*/
public class Publisher {
public static final String url = "tcp://localhost:61616"; // 缺省端口,如果要改,可在apache-activemq-5.13.3\conf中的activemq.xml中更改端口号
ConnectionFactory factory;
Connection connection;
Session session;
MessageProducer producer;
Destination[] destinations;
ComunicateMode comunicateMode = ComunicateMode.pubsub;
enum ComunicateMode {
p2p, pubsub
}
public Publisher(ComunicateMode mode) throws JMSException {
this.comunicateMode = mode;
factory = new ActiveMQConnectionFactory(url); // 这里的url也可以不指定,java代码将默认将端口赋值为61616
connection = factory.createConnection();
try {
connection.start();
} catch (JMSException e) {
connection.close();
throw e;
}
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(null);
}
protected void setDestinations(String[] stocks) throws JMSException {
destinations = new Destination[stocks.length];
for(int i = 0; i < stocks.length; i++) {
destinations[i] = comunicateMode == ComunicateMode.pubsub ? session.createTopic("Topic." + stocks[i]) : session.createQueue("Queue." + stocks[i]);
}
}
protected void sendMessage(String msg) throws JMSException {
for (Destination item : destinations) {
TextMessage msgMessage = session.createTextMessage(msg);
producer.send(item, msgMessage);
System.out.println(String.format("成功向Topic为[%s]发送消息[%s]", item.toString(), msgMessage.getText()));
}
}
protected void close() throws JMSException {
if(connection != null) {
connection.close();
}
}
public static void main(String[] args) throws JMSException, InterruptedException, IOException {
Publisher publisher = new Publisher(ComunicateMode.pubsub); // 这里可以修改消息传输方式为pubsub
publisher.setDestinations(new String[] {"1","2","3"});
BufferedReader reader = null;
String contentString = "";
do {
System.out.println("请输入要发送的内容(exit退出)");
reader = new BufferedReader(new InputStreamReader(System.in));
contentString = reader.readLine();
if(contentString.equals("exit")) {
break;
}
publisher.sendMessage(contentString);
} while (!contentString.equals("exit"));
reader.close();
publisher.close();
}
}
先开启activemq.bat,然后运行Consumer,再运行Publisher就可以了.队列和主题在代码里体现的区别在于,一个是创建消息队列,一个是创建消息主题.好困啊~
还没有评论,来说两句吧...