ActiveMQ之P2P点对点通信方式

水深无声 2022-09-25 00:20 288阅读 0赞

上一篇的代码是发布/订阅(PUS/SUB)的消息通信方式,发布者发布的一个主题,可以由多个消费者来订阅.

这一篇我们来介绍点对点(P2P)模型.P2P就好比两个人打电话,这两个人是独享这一条通信链路的,一方发送消息一方接收.

在p2p的场景里,相互通信的双方是通过一个类似于队列的方式来进行交流。和前面pub-sub的区别在于一个topic有一个发送者和多个接收者,而在p2p里一个queue只有一个发送者和一个接收者。

  1. package activemqDemo02;
  2. import java.io.IOException;
  3. import javax.jms.Connection;
  4. import javax.jms.ConnectionFactory;
  5. import javax.jms.Destination;
  6. import javax.jms.JMSException;
  7. import javax.jms.Message;
  8. import javax.jms.MessageConsumer;
  9. import javax.jms.MessageListener;
  10. import javax.jms.Session;
  11. import javax.jms.TextMessage;
  12. import org.apache.activemq.ActiveMQConnectionFactory;
  13. /**
  14. * 消息消费者(consumer):它用于接收发送到目的地的消息
  15. * @author licheng
  16. *
  17. */
  18. public class Consumer {
  19. public static final String url = "tcp://localhost:61616"; // 缺省端口,如果要改,可在apache-activemq-5.13.3\conf中的activemq.xml中更改端口号
  20. ConnectionFactory factory;
  21. Connection connection;
  22. Session session;
  23. MessageConsumer[] consumers;
  24. ComunicateMode comunicateMode = ComunicateMode.pubsub;
  25. enum ComunicateMode {
  26. p2p, pubsub
  27. }
  28. public Consumer(ComunicateMode mode, String[] destinationNames) throws JMSException{
  29. this.comunicateMode = mode;
  30. factory = new ActiveMQConnectionFactory(url); // 这里的url也可以不指定,java代码将默认将端口赋值为61616
  31. connection = factory.createConnection();
  32. connection.start();
  33. session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  34. consumers = new MessageConsumer[destinationNames.length];
  35. for (int i = 0; i < destinationNames.length; i++) {
  36. Destination destination = comunicateMode == ComunicateMode.pubsub ? session.createTopic("Topic." + destinationNames[i]) : session.createQueue("Queue." + destinationNames[i]);
  37. consumers[i] = session.createConsumer(destination);
  38. consumers[i].setMessageListener(new MessageListener(){
  39. public void onMessage(Message message) {
  40. try {
  41. System.out.println(String.format("收到消息[%s]", ((TextMessage) message).getText()));
  42. } catch (JMSException e) {
  43. e.printStackTrace();
  44. }
  45. }
  46. });
  47. }
  48. }
  49. public void close() throws JMSException {
  50. if(connection != null) {
  51. connection.close();
  52. }
  53. }
  54. public static void main(String[] args) throws JMSException, IOException {
  55. Consumer consumer = new Consumer(ComunicateMode.pubsub, new String[] {"2"}); // 这里可以修改消息传输方式为pubsub
  56. System.in.read();
  57. consumer.close();
  58. }
  59. }
  60. package activemqDemo02;
  61. import java.io.BufferedReader;
  62. import java.io.IOException;
  63. import java.io.InputStreamReader;
  64. import javax.jms.Connection;
  65. import javax.jms.ConnectionFactory;
  66. import javax.jms.Destination;
  67. import javax.jms.JMSException;
  68. import javax.jms.MessageProducer;
  69. import javax.jms.Session;
  70. import javax.jms.TextMessage;
  71. import org.apache.activemq.ActiveMQConnectionFactory;
  72. /**
  73. * 消息生产者(produceer):用于把消息发送到一个目的地
  74. *
  75. * @author licheng
  76. *
  77. */
  78. public class Publisher {
  79. public static final String url = "tcp://localhost:61616"; // 缺省端口,如果要改,可在apache-activemq-5.13.3\conf中的activemq.xml中更改端口号
  80. ConnectionFactory factory;
  81. Connection connection;
  82. Session session;
  83. MessageProducer producer;
  84. Destination[] destinations;
  85. ComunicateMode comunicateMode = ComunicateMode.pubsub;
  86. enum ComunicateMode {
  87. p2p, pubsub
  88. }
  89. public Publisher(ComunicateMode mode) throws JMSException {
  90. this.comunicateMode = mode;
  91. factory = new ActiveMQConnectionFactory(url); // 这里的url也可以不指定,java代码将默认将端口赋值为61616
  92. connection = factory.createConnection();
  93. try {
  94. connection.start();
  95. } catch (JMSException e) {
  96. connection.close();
  97. throw e;
  98. }
  99. session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  100. producer = session.createProducer(null);
  101. }
  102. protected void setDestinations(String[] stocks) throws JMSException {
  103. destinations = new Destination[stocks.length];
  104. for(int i = 0; i < stocks.length; i++) {
  105. destinations[i] = comunicateMode == ComunicateMode.pubsub ? session.createTopic("Topic." + stocks[i]) : session.createQueue("Queue." + stocks[i]);
  106. }
  107. }
  108. protected void sendMessage(String msg) throws JMSException {
  109. for (Destination item : destinations) {
  110. TextMessage msgMessage = session.createTextMessage(msg);
  111. producer.send(item, msgMessage);
  112. System.out.println(String.format("成功向Topic为[%s]发送消息[%s]", item.toString(), msgMessage.getText()));
  113. }
  114. }
  115. protected void close() throws JMSException {
  116. if(connection != null) {
  117. connection.close();
  118. }
  119. }
  120. public static void main(String[] args) throws JMSException, InterruptedException, IOException {
  121. Publisher publisher = new Publisher(ComunicateMode.pubsub); // 这里可以修改消息传输方式为pubsub
  122. publisher.setDestinations(new String[] {"1","2","3"});
  123. BufferedReader reader = null;
  124. String contentString = "";
  125. do {
  126. System.out.println("请输入要发送的内容(exit退出)");
  127. reader = new BufferedReader(new InputStreamReader(System.in));
  128. contentString = reader.readLine();
  129. if(contentString.equals("exit")) {
  130. break;
  131. }
  132. publisher.sendMessage(contentString);
  133. } while (!contentString.equals("exit"));
  134. reader.close();
  135. publisher.close();
  136. }
  137. }

先开启activemq.bat,然后运行Consumer,再运行Publisher就可以了.队列和主题在代码里体现的区别在于,一个是创建消息队列,一个是创建消息主题.好困啊~

发表评论

表情:
评论列表 (有 0 条评论,288人围观)

还没有评论,来说两句吧...

相关阅读

    相关 P2P通信-Gossip传播

    日期:2017.8.15  Gossip是p2p通信时候,广泛使用的一种协议。 具有以下特点: 最终一致性。 明确的收敛速度O(n²)和时间复杂度O(logn) 较强

    相关 activeMQ 方式

    在点对点的传输方式中,消息数据被持久化,每条消息都能被消费,没有监听QUEUE地址也能被消费,数据不会丢失,一对一的发布接受策略,保证数据完整。 点对点的模式主要建立在一个队