ActiveMQ入门

梦里梦外; 2022-04-25 04:48 315阅读 0赞

ActiveMQ介绍

  1. MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQRabbitMQkafkaActiveMQApache下的开源项目,完全支持JMS1.1J2EE1.4规范的JMS Provider实现。

特点:
1、支持多种语言编写客户端
2、对spring的支持,很容易和spring整合
3、支持多种传输协议:TCP,SSL,NIO,UDP等
4、支持AJAX
消息形式:
1、点对点(queue)
2、一对多(topic)

ActiveMQ安装

这里写图片描述
我这里提供一个安装好的虚拟机:http://download.csdn.net/download/liuyuanq123/10217892
服务器运行后,我们可以直接访问到activeMQ的界面:
这里写图片描述
然后点击queues可以看到现在没有一条消息:
这里写图片描述

ActiveMQ测试

编写一个测试类对ActiveMQ进行测试,首先得向pom文件中添加ActiveMQ相关的jar包:

  1. <dependency>
  2. <groupId>org.apache.activemq</groupId>
  3. <artifactId>activemq-all</artifactId>
  4. </dependency>

queue的发送代码如下:

  1. public void testMQProducerQueue() throws Exception{
  2. //1、创建工厂连接对象,需要制定ip和端口号
  3. ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
  4. //2、使用连接工厂创建一个连接对象
  5. Connection connection = connectionFactory.createConnection();
  6. //3、开启连接
  7. connection.start();
  8. //4、使用连接对象创建会话(session)对象
  9. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  10. //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
  11. Queue queue = session.createQueue("test-queue");
  12. //6、使用会话对象创建生产者对象
  13. MessageProducer producer = session.createProducer(queue);
  14. //7、使用会话对象创建一个消息对象
  15. TextMessage textMessage = session.createTextMessage("hello!test-queue");
  16. //8、发送消息
  17. producer.send(textMessage);
  18. //9、关闭资源
  19. producer.close();
  20. session.close();
  21. connection.close();
  22. }

接收代码

  1. public void TestMQConsumerQueue() throws Exception{
  2. //1、创建工厂连接对象,需要制定ip和端口号
  3. ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
  4. //2、使用连接工厂创建一个连接对象
  5. Connection connection = connectionFactory.createConnection();
  6. //3、开启连接
  7. connection.start();
  8. //4、使用连接对象创建会话(session)对象
  9. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  10. //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
  11. Queue queue = session.createQueue("test-queue");
  12. //6、使用会话对象创建生产者对象
  13. MessageConsumer consumer = session.createConsumer(queue);
  14. //7、向consumer对象中设置一个messageListener对象,用来接收消息
  15. consumer.setMessageListener(new MessageListener() {
  16. @Override
  17. public void onMessage(Message message) {
  18. // TODO Auto-generated method stub
  19. if(message instanceof TextMessage){
  20. TextMessage textMessage = (TextMessage)message;
  21. try {
  22. System.out.println(textMessage.getText());
  23. } catch (JMSException e) {
  24. // TODO Auto-generated catch block
  25. e.printStackTrace();
  26. }
  27. }
  28. }
  29. });
  30. //8、程序等待接收用户消息
  31. System.in.read();
  32. //9、关闭资源
  33. consumer.close();
  34. session.close();
  35. connection.close();
  36. }

然后当我们运行queue发送的时候可以看到队列里已经有一条消息了,但没有发送出去:
这里写图片描述
然后在运行queue 的接收端,可以看到消息已经发出了:
这里写图片描述
这里写图片描述
接着对topic进行测试,发送代码如下:

  1. public void TestTopicProducer() throws Exception{
  2. //1、创建工厂连接对象,需要制定ip和端口号
  3. ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
  4. //2、使用连接工厂创建一个连接对象
  5. Connection connection = connectionFactory.createConnection();
  6. //3、开启连接
  7. connection.start();
  8. //4、使用连接对象创建会话(session)对象
  9. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  10. //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
  11. Topic topic = session.createTopic("test-topic");
  12. //6、使用会话对象创建生产者对象
  13. MessageProducer producer = session.createProducer(topic);
  14. //7、使用会话对象创建一个消息对象
  15. TextMessage textMessage = session.createTextMessage("hello!test-topic");
  16. //8、发送消息
  17. producer.send(textMessage);
  18. //9、关闭资源
  19. producer.close();
  20. session.close();
  21. connection.close();
  22. }

接收代码:

  1. public void TestTopicConsumer() throws Exception{
  2. //1、创建工厂连接对象,需要制定ip和端口号
  3. ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
  4. //2、使用连接工厂创建一个连接对象
  5. Connection connection = connectionFactory.createConnection();
  6. //3、开启连接
  7. connection.start();
  8. //4、使用连接对象创建会话(session)对象
  9. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  10. //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
  11. Topic topic = session.createTopic("test-topic");
  12. //6、使用会话对象创建生产者对象
  13. MessageConsumer consumer = session.createConsumer(topic);
  14. //7、向consumer对象中设置一个messageListener对象,用来接收消息
  15. consumer.setMessageListener(new MessageListener() {
  16. @Override
  17. public void onMessage(Message message) {
  18. // TODO Auto-generated method stub
  19. if(message instanceof TextMessage){
  20. TextMessage textMessage = (TextMessage)message;
  21. try {
  22. System.out.println(textMessage.getText());
  23. } catch (JMSException e) {
  24. // TODO Auto-generated catch block
  25. e.printStackTrace();
  26. }
  27. }
  28. }
  29. });
  30. //8、程序等待接收用户消息
  31. System.in.read();
  32. //9、关闭资源
  33. consumer.close();
  34. session.close();
  35. connection.close();
  36. }

然后运行topic发送:
这里写图片描述
可以看到消息已经发送出去。再运行topic接收:
这里写图片描述
可以看到有了一个消费者,但是没有接收的消息,这是因为正常情况下我们的topic消息不会再服务器持久化,所以要先打开消费者,再打开生产者,这个时候我们再运行生产者发送一条消息看到消息已经接收到了:
这里写图片描述
这里写图片描述

原文:https://blog.csdn.net/qq_33404395/article/details/80590113

发表评论

表情:
评论列表 (有 0 条评论,315人围观)

还没有评论,来说两句吧...

相关阅读

    相关 ActiveMQ入门

    前言:最近在公司用到了activeMq接收消息,从网上查了些资料,形成了一个大概的理解,所以总结一下,因为知识都是网上学习的难免有疏漏之处,望理解。 参考: [https:

    相关 activeMQ入门

    目录 一、下载activeMQ包 二、准备Linux环境 三、所遇问题 -------------------- 一、下载activeMQ包         可

    相关 ActiveMQ入门

    ActiveMQ介绍      MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ