学习ActiveMQ(二):点对点(队列)模式消息演示

落日映苍穹つ 2022-01-06 03:07 350阅读 0赞

  一:介绍

  点对点的消息发送方式主要建立在 消息(Message ),队列(Queue),发送者(Sender),消费者(receiver)上,Queue 存贮消息,Sender 发送消息,receive接收消息.具体点就是Sender Client通过Queue发送message ,而 receiver Client从Queue中接收消息。消息发送客户端与接收客户端没有时间上的依赖,发送客户端可以在任何时刻发送信息到Queue,而不需要知道接收客户端是不是在运行。
  二:通过jms编码接口之间的关系,流程如下:

  1.创建连接Connection
  2.创建会话Session
  3.通过Session来创建其它的(MessageProducer、MessageConsumer、Destination、TextMessage)
  4.将生产者 MessageProducer 和消费者 MessageConsumer 都会指向目标 Destination
  5.生产者向目标发送TextMessage消息send()
  6.消费者设置监听器,监听消息。

1663969-20190420113406123-1252960696.png

  三:创建实例

  1.打开IDEA,创建一个ActiveMQ的maven项目,如下图:

1663969-20190420114417696-84082933.png

  2.自己新创建两个java文件,appConsumer消费者类和appProducer生产者类,项目结构如下图:

1663969-20190420114557871-1618691385.png

  3.生产者代码如下:

  1. 1 package com.liu.jms;
  2. 2
  3. 3 import org.apache.activemq.ActiveMQConnectionFactory;
  4. 4
  5. 5 import javax.jms.*;
  6. 6
  7. 7 public class appProducer {
  8. 8
  9. 9 private static final String url = "tcp://127.0.0.1:61616";//actvemq的服务器tcp连接方式
  10. 10 private static final String queueName = "queue-test";//定义队列的名称
  11. 11
  12. 12 public static void main(String[] args) throws JMSException {
  13. 13 //1.创建connectionFactory
  14. 14 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
  15. 15 //2.创建connection
  16. 16 Connection connection = connectionFactory.createConnection();
  17. 17 //3.启动连接
  18. 18 connection.start();
  19. 19 //4.创建session
  20. 20 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  21. 21 //5.创建destination
  22. 22 Destination destination = session.createQueue(queueName);
  23. 23 //6.创建生产者
  24. 24 MessageProducer producer = session.createProducer(destination);
  25. 25
  26. 26 for (int i = 0; i < 100; i++) {
  27. 27
  28. 28 TextMessage textMessage = session.createTextMessage("test" + i);
  29. 29 //7.发送消息
  30. 30 producer.send(textMessage);
  31. 31
  32. 32 System.out.println("发送消息" + textMessage.getText());
  33. 33
  34. 34 }
  35. 35 //8.关闭连接
  36. 36 connection.close();
  37. 37 }
  38. 38 }

  如代码所示,通过tcp方式连接了服务端,(别忘了启动服务端的服务)。链接的具体参数可以参考http://activemq.apache.org/connection-configuration-uri.html。

然后创建了一个生产者,这个生产者绑定了一个以名为queueName的队列为目的源,代表着这个生产者的消息会发到这个消息队列上面去。然后通过for循环发送了一百个消息。

  4.消费者代码如下:

  1. package com.liu.jms;
  2. import org.apache.activemq.ActiveMQConnectionFactory;
  3. import javax.jms.*;
  4. public class appConsumer {
  5. private static final String url = "tcp://127.0.0.1:61616";
  6. private static final String queueName = "queue-test";
  7. public static void main(String[] args) throws JMSException {
  8. //1.创建connectionFactory
  9. ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
  10. //2.创建connection
  11. Connection connection = connectionFactory.createConnection();
  12. //3.启动连接
  13. connection.start();
  14. //4.创建session
  15. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  16. //5.创建destination
  17. Destination destination = session.createQueue(queueName);
  18. //6.创建消费者
  19. MessageConsumer consumer = session.createConsumer(destination);
  20. //7.创建一个监听器
  21. consumer.setMessageListener(new MessageListener() {
  22. @Override
  23. public void onMessage(Message message) {
  24. TextMessage textMessage = (TextMessage)message;
  25. try {
  26. System.out.println("接收到的消息:" + textMessage.getText());
  27. } catch (JMSException e) {
  28. e.printStackTrace();
  29. }
  30. }
  31. });
  32. //8.关闭连接(监听器是异步的还没有监听到消息的时候,就关闭连接了)
  33. //connection.close();
  34. }
  35. }

  如代码所示:消费者和生产者不同的是,消费需要建立一个监听器,来监听以名为queueName的队列上是否有了消息,有消息就会接受,然后通过onMessage方法对消息进行处理。

  5.测试

  首先启动消费者这个java类,观察控制台,如下图:

1663969-20190420115938987-766664311.png

  接着启动生产者的java类,观察控制台,如下图:生产了一百条消息。

1663969-20190420120045613-863093974.png

  此时切换至消费的控制台,观察控制台,如下图:已经打印出了一百条消息了,说明消费者已经接受到全部一百条消息。

1663969-20190420120200345-1121870112.png

  6.打开activemq的控制台查看Queues:(http://127.0.0.1:8161/admin/queues.jsp)如下图所示:队列有一个名字是我们设置的queue-test,消费者也有一个就是我们创建的那个消费者类,队列中有一百条消息,被移除了一百条,也就是上面所说的,消费者接收到了这100条全部的消息。

1663969-20190420120345367-1066396560.png

  7.那么我启动了两个相同目标队列的消费者呢?重新测试一下看看,为方便看清结果,重启一下服务。然后运行两遍消费者,idea控制台如下图:有两个消费者,且都没有收到消息。

1663969-20190420123650975-1865841097.png

  8.启动生产者,如下图:生产了一百条信息。

1663969-20190420123912031-80206536.png

  9.看看两个消费者的控制台,如下两张图:其中一个消费接收到的全是奇数的消息,而另一个接收到的都是偶数的消息。

  1663969-20190420123944527-229402338.png

1663969-20190420124005733-1703818548.png

  10.看看activemq控制台,如下图,消费者确实是两个。

1663969-20190420124635308-1801093382.png

  11.得出结论:当有点对点模式下,两个消费者消费的消息之和是生产者产生的消息总数,且每一个消息都只会被一个消费者接收,不会出现两个消费者接收同一消息的情况。

这一篇通过这个简单的小demo我们已经实现了点对点的通信方式,并了解它的特性。下一篇将会学习订阅发布模式。

转载于:https://www.cnblogs.com/liuyuan1227/p/10740053.html

发表评论

表情:
评论列表 (有 0 条评论,350人围观)

还没有评论,来说两句吧...

相关阅读

    相关 activeMQ 方式

    在点对点的传输方式中,消息数据被持久化,每条消息都能被消费,没有监听QUEUE地址也能被消费,数据不会丢失,一对一的发布接受策略,保证数据完整。 点对点的模式主要建立在一个队