浅谈ActiveMQ与使用

Bertha 。 2023-08-17 17:50 212阅读 0赞

一、什么是消息中间件

消息中间件顾名思义实现的就是在两个系统或两个客户端之间进行消息传送

1468250-20190720125917002-1679869251.png

二、什么是ActiveMQ

ActiveMQ是一种开源的基于JMS(Java Message Servie)规范的一种消息中间件的实现,ActiveMQ的设计目标是提供标准的,面向消息的,能够跨越多语言和多系统的应用集成消息通信中间件。

三、什么时候需要用ActiveMQ

ActiveMQ常被应用与系统业务的解耦,异步消息的推送,增加系统并发量,提高用户体验。例如以我在工作中的使用,在比较耗时且异步的远程开锁操作时

1468250-20190720131946328-164805159.png

四、如何使用ActiveMQ

1.AcitveMQ的数据传送流程

1468250-20190720132250768-792851447.png

2.ActiveMQ的两种消息传递类型

(1)点对点传输,即一个生产者对应一个消费者,生产者向broke推送数据,数据存储在broke的一个队列中,当消费者接受该条队列里的数据。

(2)基于发布/订阅模式的传输,即根据订阅话题来接收相应数据,一个生产者可向多个消费者推送数据,与MQTT协议的实现是类似的,对MQTT协议有兴趣的可跳转到https://www.cnblogs.com/xiguadadage/p/11216463.html

两种消息传递类型的不同,点对点传输消费者可以接收到在连接之前生产者所推送的数据,而基于发布/订阅模式的传输方式消费者只能接收到连接之后生产者推送的数据。

3.ActiveMQ的安装与启动

(1)官网下载对应服务器版本

1468250-20190720133435448-1493417535.png

(2)解压后进入apache-activemq-5.15.9/bin目录

(3)执行./activemq start启动ActiveMQ

1468250-20190720133622101-407557464.png

(4)浏览器输入ActiveMQ启动的服务器ip:8161便可进入web界面,点击Manage ActiveMQ broker可以查看消息推送的状态,默认账号密码为admin,admin

1468250-20190720135038845-65269295.png

(5)启动错误分析

进入/root/apache-activemq-5.15.9/data目录查看activemq.log文件,根据错误提示信息修改,例如端口号被占用等。

4.ActiveMQ的代码测试

(1)构建maven项目,引入依赖

  1. <dependency>
  2. <groupId>org.apache.activemq</groupId>
  3. <artifactId>activemq-all</artifactId>
  4. <version>5.9.0</version>
  5. </dependency>

(2)生产者类

  1. /**
  2. * @Description 生产者
  3. * @Date 2019/7/20
  4. * @Created by yqh
  5. */
  6. public class MyProducer {
  7. private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";
  8. public static void main(String[] args) throws JMSException {
  9. // 创建连接工厂
  10. ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
  11. // 创建连接
  12. Connection connection = activeMQConnectionFactory.createConnection();
  13. // 打开连接
  14. connection.start();
  15. // 创建会话
  16. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  17. // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据
  18. Destination destination = session.createQueue("myQueue");
  19. // 创建一个生产者
  20. MessageProducer producer = session.createProducer(destination);
  21. // 向队列推送10个文本消息数据
  22. for (int i = 1 ; i <= 10 ; i++){
  23. // 创建文本消息
  24. TextMessage message = session.createTextMessage("第" + i + "个文本消息");
  25. //发送消息
  26. producer.send(message);
  27. //在本地打印消息
  28. System.out.println("已发送的消息:" + message.getText());
  29. }
  30. //关闭连接
  31. connection.close();
  32. }
  33. }

运行结果:

  1. 已发送的消息:第1个文本消息
  2. 已发送的消息:第2个文本消息
  3. 已发送的消息:第3个文本消息
  4. 已发送的消息:第4个文本消息
  5. 已发送的消息:第5个文本消息
  6. 已发送的消息:第6个文本消息
  7. 已发送的消息:第7个文本消息
  8. 已发送的消息:第8个文本消息
  9. 已发送的消息:第9个文本消息
  10. 已发送的消息:第10个文本消息

测试查看web后台显示,有10条消息在队列中等待消费

1468250-20190720143748647-1928502093.png

(3)消费者类

  1. /**
  2. * @Description 消费者类
  3. * @Date 2019/7/20 0020
  4. * @Created by yqh
  5. */
  6. public class MyConsumer {
  7. private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";
  8. public static void main(String[] args) throws JMSException {
  9. // 创建连接工厂
  10. ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
  11. // 创建连接
  12. Connection connection = activeMQConnectionFactory.createConnection();
  13. // 打开连接
  14. connection.start();
  15. // 创建会话
  16. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  17. // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据
  18. Destination destination = session.createQueue("myQueue");
  19. // 创建消费者
  20. MessageConsumer consumer = session.createConsumer(destination);
  21. // 创建消费的监听
  22. consumer.setMessageListener(new MessageListener() {
  23. public void onMessage(Message message) {
  24. TextMessage textMessage = (TextMessage) message;
  25. try {
  26. System.out.println("消费的消息:" + textMessage.getText());
  27. } catch (JMSException e) {
  28. e.printStackTrace();
  29. }
  30. }
  31. });
  32. }
  33. }

测试结果:

  1. 消费的消息:第1个文本消息
  2. 消费的消息:第2个文本消息
  3. 消费的消息:第3个文本消息
  4. 消费的消息:第4个文本消息
  5. 消费的消息:第5个文本消息
  6. 消费的消息:第6个文本消息
  7. 消费的消息:第7个文本消息
  8. 消费的消息:第8个文本消息
  9. 消费的消息:第9个文本消息
  10. 消费的消息:第10个文本消息

web后台显示有一个消费者处于连接状态,且已消费了10个message,而该条队列已没有message待消费了

1468250-20190720144840403-1467992562.png

(4)当我们运行两个消费者类,消息又是怎么被消费的呢?是两个消费者都能收到生产者生产的message,还是只有其中一个消费者能消费呢?

我们先运行两个消费者,在运行一个生产者对目标队列生产10个message,会发现有以下情况

  1. // Consumer1控制台
  2. 消费的消息:第1个文本消息
  3. 消费的消息:第3个文本消息
  4. 消费的消息:第5个文本消息
  5. 消费的消息:第7个文本消息
  6. 消费的消息:第9个文本消息
  7. // Consumer2控制台
  8. 消费的消息:第2个文本消息
  9. 消费的消息:第4个文本消息
  10. 消费的消息:第6个文本消息
  11. 消费的消息:第8个文本消息
  12. 消费的消息:第10个文本消息

即队列中的数据会平均的分给每一个消费者消费,且每一条数据只能被消费一次

(5)以上是基于队列点对点的传输类型,以下是基于发布/订阅模式传输的类型测试

  1. /**
  2. * @Description 基于发布/订阅模式传输类型的生产者测试
  3. * @Date 2019/7/20 0020
  4. * @Created by yqh
  5. */
  6. public class MyProducerForTopic {
  7. private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";
  8. public static void main(String[] args) throws JMSException {
  9. // 创建连接工厂
  10. ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
  11. // 创建连接
  12. Connection connection = activeMQConnectionFactory.createConnection();
  13. // 打开连接
  14. connection.start();
  15. // 创建会话
  16. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  17. // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据
  18. Destination destination = session.createTopic("topicTest");
  19. // 创建一个生产者
  20. MessageProducer producer = session.createProducer(destination);
  21. // 向队列推送10个文本消息数据
  22. for (int i = 1 ; i <= 10 ; i++){
  23. // 创建文本消息
  24. TextMessage message = session.createTextMessage("第" + i + "个文本消息");
  25. //发送消息
  26. producer.send(message);
  27. //在本地打印消息
  28. System.out.println("已发送的消息:" + message.getText());
  29. }
  30. //关闭连接
  31. connection.close();
  32. }
  33. }
  34. /**
  35. * @Description 基于发布/订阅模式传输类型的消费者测试
  36. * @Date 2019/7/20 0020
  37. * @Created by yqh
  38. */
  39. public class MyConsumerForTopic {
  40. private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";
  41. public static void main(String[] args) throws JMSException {
  42. // 创建连接工厂
  43. ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
  44. // 创建连接
  45. Connection connection = activeMQConnectionFactory.createConnection();
  46. // 打开连接
  47. connection.start();
  48. // 创建会话
  49. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  50. // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据
  51. Destination destination = session.createTopic("topicTest");
  52. // 创建消费者
  53. MessageConsumer consumer = session.createConsumer(destination);
  54. // 创建消费的监听
  55. consumer.setMessageListener(new MessageListener() {
  56. public void onMessage(Message message) {
  57. TextMessage textMessage = (TextMessage) message;
  58. try {
  59. System.out.println("消费的消息:" + textMessage.getText());
  60. } catch (JMSException e) {
  61. e.printStackTrace();
  62. }
  63. }
  64. });
  65. }
  66. }

现在如果我们先启动生产者,再启动消费者,会发现消费者是无法接收到之前生产者之前所生产的数据,只有消费者先启动,再让生产者消费才可以正常接收数据,这也是发布/订阅的主题模式与点对点的队列模式的一个明显区别。

而如果启动两个消费者,那么每一个消费者都能完整的接收到生产者生产的数据,即每一条数据都被消费了两次,这是发布/订阅的主题模式与点对点的队列模式的另一个明显区别。

#

转载于:https://www.cnblogs.com/xiguadadage/p/11217604.html

发表评论

表情:
评论列表 (有 0 条评论,212人围观)

还没有评论,来说两句吧...

相关阅读

    相关 索引搜索

    简介 介绍了索引尤其是倒排索引,再通过索引进行搜索。使用倒排索引和调整过滤顺序等手段,优化检索逻辑,避免每次搜索都要遍历所有数据。 文章供6000余字,全文脉络如下:

    相关 ActiveMQ使用

    一、什么是消息中间件 消息中间件顾名思义实现的就是在两个系统或两个客户端之间进行消息传送 ![1468250-20190720125917002-1679869251.pn

    相关 GITSVN

     浅谈GIT与SVN 1.GIT是分布式的,SVN是集中式的 GIT也拥有跟SVN一样的集中式版本库或服务器,但GIT更侧重于分布式,这种模式可以让你在一个