ActiveMQ应用入门

た 入场券 2021-11-17 10:31 392阅读 0赞

ActiveMQ应用入门

  • ActiveMQ中常用API介绍
  • JMS-Hello
    • 导入依赖
    • 创建ActiveMQ中的生产者
    • 创建ActiveMQ中的消费者
    • 测试

ActiveMQ中常用API介绍

下述API都是接口类型,由定义在javax.jms包中.是JMS标准接口定义
在这里插入图片描述

JMS-Hello

导入依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.apache.activemq</groupId>
  4. <artifactId>activemq-all</artifactId>
  5. <version>5.12.0</version>
  6. </dependency>
  7. </dependencies>

创建ActiveMQ中的生产者

  1. package com.yjn.jms;
  2. import org.apache.activemq.ActiveMQConnectionFactory;
  3. import javax.jms.*;
  4. /** * ActiveMQ中的生产者(Producer) * @author JustinNeil * */
  5. public class HelloProducer {
  6. public void sendhello2ActiveMq(String messageText) {
  7. // 连接工厂,用于创建Connection对象
  8. ConnectionFactory factory = null;
  9. // activeMQ 连接对象
  10. Connection conn = null;
  11. // 一次和ActiveMQ的持久会话对象
  12. Session session = null;
  13. // 目的地
  14. Destination destination = null;
  15. // 消息发送者
  16. MessageProducer producer = null;
  17. // 封装消息的对象
  18. Message message = null;
  19. try {
  20. /* * 创建链接工厂 ActiveMQConnectionFactory -由ActiveMQ实现的ConnectionFactory接口实现类. * 构造方法: public ActiveMQConnectionFactory(String userName, String password, * String brokerURL) * userName - 访问ActiveMQ服务的用户名,用户名可以通过jetty-realm.properties配置文件配置. * password - 访问ActiveMQ服务的密码,密码可以通过jetty-realm.properties配置文件配置. * brokerURL -访问ActiveMQ服务的路径地址. 路径结构为 - 协议名://主机地址:端口号 此链接基于TCP/IP协议. */
  21. factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.192.171:61616");
  22. // 创建链接对象
  23. conn = factory.createConnection();
  24. // 启动连接对象
  25. conn.start();
  26. /* * 创建会话对象 * 方法 - connection.createSession(boolean transacted, int acknowledgeMode); * transacted - 是否使用事务, * 可选值为true|false * true - 使用事务, 当设置此变量值, 则acknowledgeMode参数无效, * 建议传递的acknowledgeMode参数值为 Session.SESSION_TRANSACTED * false - 不使用事务, 设置此变量值,则acknowledgeMode参数必须设置. * acknowledgeMode - 消息确认机制, 可选值为: * Session.AUTO_ACKNOWLEDGE - 自动确认消息机制 * Session.CLIENT_ACKNOWLEDGE -客户端确认消息机制 * Session.DUPS_OK_ACKNOWLEDGE - 有副本的客户端确认消息机制 */
  27. session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
  28. // 创建目的地,目的地的命名既是队列的命令
  29. destination = session.createQueue("MQ-Hello");
  30. // 创建消息生成者, 创建的消息生成者与某目的地对应, 即方法参数目的地.
  31. producer = session.createProducer(destination);
  32. // 创建消息对象,创建一个文本消息对象。此消息对象中保存要传递的文本数据.
  33. message = session.createTextMessage(messageText);
  34. // 发送消息
  35. producer.send(message);
  36. } catch (Exception e) {
  37. e.printStackTrace();
  38. System.out.println("访问ActiveMQ服务发生错误!!");
  39. } finally {
  40. try {
  41. // 回收消息发送者资源
  42. if (null != producer)
  43. producer.close();
  44. } catch (JMSException e) {
  45. e.printStackTrace();
  46. }
  47. try {
  48. // 回收会话资源
  49. if (null != session)
  50. session.close();
  51. } catch (JMSException e) {
  52. e.printStackTrace();
  53. }
  54. try {
  55. // 回收链接资源
  56. if (null != conn)
  57. conn.close();
  58. } catch (JMSException e) {
  59. e.printStackTrace();
  60. }
  61. }
  62. }
  63. }

创建ActiveMQ中的消费者

  1. package com.yjn.jms;
  2. import org.apache.activemq.ActiveMQConnectionFactory;
  3. import javax.jms.*;
  4. /** * ActiveMQ中的消费者(Consumer) * @author JustinNeil * */
  5. public class HelloConsumer {
  6. public void reciveHelloFormActiveMq() {
  7. // 连接工厂,用于创建Connection对象
  8. ConnectionFactory factory = null;
  9. // activeMQ 连接对象
  10. Connection conn = null;
  11. // 一次和ActiveMQ的持久会话对象
  12. Session session = null;
  13. // 目的地
  14. Destination destination = null;
  15. // 消息消费者
  16. MessageConsumer consumer = null;
  17. // 封装消息的对象
  18. Message message = null;
  19. try {
  20. /* * 创建链接工厂 ActiveMQConnectionFactory -由ActiveMQ实现的ConnectionFactory接口实现类. * 构造方法: public ActiveMQConnectionFactory(String userName, String password, * String brokerURL) * userName - 访问ActiveMQ服务的用户名,用户名可以通过jetty-realm.properties配置文件配置. * password - 访问ActiveMQ服务的密码,密码可以通过jetty-realm.properties配置文件配置. * brokerURL -访问ActiveMQ服务的路径地址. 路径结构为 - 协议名://主机地址:端口号 此链接基于TCP/IP协议. */
  21. factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.192.171:61616");
  22. // 创建链接对象
  23. conn = factory.createConnection();
  24. // 启动连接对象
  25. conn.start();
  26. /* * 创建会话对象 * 方法 - connection.createSession(boolean transacted, int acknowledgeMode); * transacted - 是否使用事务, * 可选值为true|false * true - 使用事务, 当设置此变量值, 则acknowledgeMode参数无效, * 建议传递的acknowledgeMode参数值为 Session.SESSION_TRANSACTED * false - 不使用事务, 设置此变量值,则acknowledgeMode参数必须设置. * acknowledgeMode - 消息确认机制, 可选值为: * Session.AUTO_ACKNOWLEDGE - 自动确认消息机制 * Session.CLIENT_ACKNOWLEDGE -客户端确认消息机制 * Session.DUPS_OK_ACKNOWLEDGE - 有副本的客户端确认消息机制 */
  27. session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
  28. // 创建目的地,目的地的命名既是队列的命令
  29. destination = session.createQueue("MQ-Hello");
  30. // 创建消息消费者, 创建的消息消费者与某目的地对应, 即方法参数目的地.
  31. consumer = session.createConsumer(destination);
  32. // 从ActiveMQ中获取消息
  33. message = consumer.receive();
  34. String textMessage = ((TextMessage)message).getText();
  35. System.out.println("ActiveMQ获取的消息是:"+textMessage);
  36. } catch (Exception e) {
  37. e.printStackTrace();
  38. System.out.println("访问ActiveMQ服务发生错误!!");
  39. } finally {
  40. try {
  41. // 回收消息发送者资源
  42. if (null != consumer)
  43. consumer.close();
  44. } catch (JMSException e) {
  45. e.printStackTrace();
  46. }
  47. try {
  48. // 回收会话资源
  49. if (null != session)
  50. session.close();
  51. } catch (JMSException e) {
  52. e.printStackTrace();
  53. }
  54. try {
  55. // 回收链接资源
  56. if (null != conn)
  57. conn.close();
  58. } catch (JMSException e) {
  59. e.printStackTrace();
  60. }
  61. }
  62. }
  63. }

测试

生产者

  1. public static void main(String[] args) {
  2. HelloProducer helloProducer = new HelloProducer();
  3. helloProducer.sendhello2ActiveMq("你好。。。");
  4. }

消费者

  1. public static void main(String[] args) {
  2. HelloConsumer helloConsumer = new HelloConsumer();
  3. helloConsumer.reciveHelloFormActiveMq();
  4. }

先启动消费者,再启动生产者,查看效果
消费者启动后阻塞状态,等待消息

发表评论

表情:
评论列表 (有 0 条评论,392人围观)

还没有评论,来说两句吧...

相关阅读

    相关 ActiveMQ入门

    前言:最近在公司用到了activeMq接收消息,从网上查了些资料,形成了一个大概的理解,所以总结一下,因为知识都是网上学习的难免有疏漏之处,望理解。 参考: [https:

    相关 activeMQ入门

    目录 一、下载activeMQ包 二、准备Linux环境 三、所遇问题 -------------------- 一、下载activeMQ包         可

    相关 ActiveMQ入门

    ActiveMQ介绍      MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ

    相关 ActiveMQ应用

    1. PTP处理模式(Queue) > 消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。 > 消息被消费以后,queue中不再有存储