jms基础入门(activemq)

秒速五厘米 2022-06-03 09:00 290阅读 0赞

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

1.ActiveMQ安装与使用

点我下载

下载后,进入目录apache-activemq-5.14.0\bin\win32|64
这里写图片描述

当然,还可以通过命令开启,

  • 普通启动 ./activemq start
  • 启动并指定日志文件 ./activemq start >tmp/smlog
  • 后台启动方式nohup ./activemq start >/tmp/smlog

运行成功后,打开网页,访问http://localhost:8161/admin/
第一次登陆,会要求输入密码 , 默认账号admin 默认密码admin

这里写图片描述
以上是登陆成功的页面,不同版本可能页面不同,但功能能用就证明登陆成功,

java程序演示

创建一个java项目,导入jar包,
这里写图片描述

  1. 创建一个会话创建者,
  2. package cn.itcast.demo;
  3. import javax.jms.Connection;
  4. import javax.jms.ConnectionFactory;
  5. import javax.jms.Destination;
  6. import javax.jms.JMSException;
  7. import javax.jms.MessageProducer;
  8. import javax.jms.Session;
  9. import javax.jms.TextMessage;
  10. import org.apache.activemq.ActiveMQConnection;
  11. import org.apache.activemq.ActiveMQConnectionFactory;
  12. public class JMSProducer {
  13. // 默认连接用户名
  14. private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
  15. // 默认连接密码
  16. private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
  17. // 默认连接地址
  18. private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
  19. // 发送的消息数量
  20. private static final int SENDNUM = 10;
  21. public static void main(String[] args) {
  22. // 连接工厂
  23. ConnectionFactory connectionFactory;
  24. // 连接
  25. Connection connection = null;
  26. // 会话 接受或者发送消息的线程
  27. Session session;
  28. // 消息的目的地
  29. Destination destination;
  30. // 消息生产者
  31. MessageProducer messageProducer;
  32. // 实例化连接工厂
  33. connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD,
  34. JMSProducer.BROKEURL);
  35. try {
  36. // 通过连接工厂获取连接
  37. connection = connectionFactory.createConnection();
  38. // 启动连接
  39. connection.start();
  40. // 创建session
  41. session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
  42. // 创建一个名称为HelloWorld的消息队列
  43. destination = session.createQueue("HelloWorld");
  44. // 创建消息生产者
  45. messageProducer = session.createProducer(destination);
  46. // 发送消息
  47. sendMessage(session, messageProducer);
  48. session.commit();
  49. } catch (Exception e) {
  50. e.printStackTrace();
  51. } finally {
  52. if (connection != null) {
  53. try {
  54. connection.close();
  55. } catch (JMSException e) {
  56. e.printStackTrace();
  57. }
  58. }
  59. }
  60. }
  61. /** * 发送消息 * * @param session * @param messageProducer * 消息生产者 * @throws Exception */
  62. public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
  63. for (int i = 0; i < JMSProducer.SENDNUM; i++) {
  64. // 创建一条文本消息
  65. TextMessage message = session.createTextMessage("ActiveMQ 发送消息" + i);
  66. System.out.println("发送消息:Activemq 发送消息" + i);
  67. // 通过消息生产者发出消息
  68. messageProducer.send(message);
  69. }
  70. }
  71. }

运行如上代码测试
这里写图片描述

然后我们进入管理页面看看
这里写图片描述

我们创建一个消费者,

  1. package cn.itcast.demo;
  2. import javax.jms.Connection;
  3. import javax.jms.ConnectionFactory;
  4. import javax.jms.Destination;
  5. import javax.jms.JMSException;
  6. import javax.jms.MessageConsumer;
  7. import javax.jms.Session;
  8. import javax.jms.TextMessage;
  9. import org.apache.activemq.ActiveMQConnection;
  10. import org.apache.activemq.ActiveMQConnectionFactory;
  11. public class JMSConsumer {
  12. private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;// 默认连接用户名
  13. private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;// 默认连接密码
  14. private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;// 默认连接地址
  15. public static void main(String[] args) {
  16. ConnectionFactory connectionFactory;// 连接工厂
  17. Connection connection = null;// 连接
  18. Session session;// 会话 接受或者发送消息的线程
  19. Destination destination;// 消息的目的地
  20. MessageConsumer messageConsumer;// 消息的消费者
  21. // 实例化连接工厂
  22. connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD,
  23. JMSConsumer.BROKEURL);
  24. try {
  25. // 通过连接工厂获取连接
  26. connection = connectionFactory.createConnection();
  27. // 启动连接
  28. connection.start();
  29. // 创建session
  30. session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  31. // 创建一个连接HelloWorld的消息队列
  32. destination = session.createQueue("HelloWorld");
  33. // 创建消息消费者
  34. messageConsumer = session.createConsumer(destination);
  35. while (true) {
  36. TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
  37. if (textMessage != null) {
  38. System.out.println("收到的消息:" + textMessage.getText());
  39. } else {
  40. break;
  41. }
  42. }
  43. } catch (JMSException e) {
  44. e.printStackTrace();
  45. }
  46. }
  47. }

运行结果如下
这里写图片描述

然后我们再进入到管理页面
这里写图片描述

其实这些代码没有什么技术量,,复制粘贴就行,,唯一需要改的就是消息队列名了,注意一下就行,

maven整合spring+jms(activemq)

applicationContext.xml(模板如下)

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context" xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:jpa="http://www.springframework.org/schema/data/jpa" xmlns:task="http://www.springframework.org/schema/task" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-4.1.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd http://www.springframework.org/schema/data/jpa http://www.springframework.org/schema/data/jpa/spring-jpa.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd ">
  3. <!-- 扫盲包 -->
  4. <context:component-scan base-package="cn.itcast.activemq" />
  5. <!-- ActiveMQ 连接工厂 -->
  6. <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
  7. <!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码-->
  8. <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://localhost:61616" userName="admin" password="admin" />
  9. <!-- Spring Caching连接工厂 -->
  10. <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
  11. <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
  12. <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
  13. <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
  14. <!-- 同上,同理 -->
  15. <!-- <constructor-arg ref="amqConnectionFactory" /> -->
  16. <!-- Session缓存数量 -->
  17. <property name="sessionCacheSize" value="100" />
  18. </bean>
  19. <!-- Spring JmsTemplate 的消息生产者 start-->
  20. <!-- 定义JmsTemplate的Queue类型 -->
  21. <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
  22. <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
  23. <constructor-arg ref="connectionFactory" />
  24. <!-- 非pub/sub模型(发布/订阅),即队列模式 -->
  25. <property name="pubSubDomain" value="false" />
  26. </bean>
  27. 这也是二选一
  28. <!-- 定义JmsTemplate的Topic类型 -->
  29. <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
  30. <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
  31. <constructor-arg ref="connectionFactory" />
  32. <!-- pub/sub模型(发布/订阅) -->
  33. <property name="pubSubDomain" value="true" />
  34. </bean>
  35. <!--Spring JmsTemplate 的消息生产者 end-->
  36. <!-- 消息消费者 start-->
  37. <!-- 定义Queue监听器 -->
  38. <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
  39. <!--消息队列名称 消费者类名,首字母小写,需要加上注解才行 -->
  40. <jms:listener destination="test.queue" ref="queueReceiver1"/>
  41. </jms:listener-container>
  42. 俩种监听方式,一个是一对一,一个是一对多,区别我就不多说了,不会自行百度,通常开发都是二选一,自己选择合适的
  43. <!-- 定义Topic监听器 -->
  44. <jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
  45. <jms:listener destination="test.topic" ref="topicReceiver1"/>
  46. <jms:listener destination="test.topic" ref="topicReceiver2"/>
  47. </jms:listener-container>
  48. <!-- 消息消费者 end -->
  49. </beans>

pom依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>junit</groupId>
  4. <artifactId>junit</artifactId>
  5. <version>4.12</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.springframework</groupId>
  9. <artifactId>spring-core</artifactId>
  10. <version>4.1.7.RELEASE</version>
  11. </dependency>
  12. <dependency>
  13. <groupId>org.springframework</groupId>
  14. <artifactId>spring-jms</artifactId>
  15. <version>4.1.7.RELEASE</version>
  16. </dependency>
  17. <dependency>
  18. <groupId>org.apache.activemq</groupId>
  19. <artifactId>activemq-all</artifactId>
  20. <version>5.2.0</version>
  21. </dependency>
  22. <dependency>
  23. <groupId>org.springframework</groupId>
  24. <artifactId>spring-test</artifactId>
  25. <version>4.1.7.RELEASE</version>
  26. </dependency>
  27. <!-- <dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-spring</artifactId> <version>4.2</version> </dependency> -->
  28. </dependencies>

好了,我们开始撸代码(做一个注册发短信提示激活功能)

首先写消费者

0.创建一个maven工程

1.复制原先准备好的pom文件

2.编写web.xml

  1. <web-app xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" version="2.5">
  2. <listener>
  3. <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
  4. </listener>
  5. <!-- 指定spring配置文件的位置,文件放在resources下 -->
  6. <context-param>
  7. <param-name>contextConfigLocation</param-name>
  8. <param-value>classpath:applicationContext.xml</param-value>
  9. </context-param>
  10. </web-app>

3.配置applicationContext.xml文件(放在resources目录下)

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context" xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:jpa="http://www.springframework.org/schema/data/jpa" xmlns:task="http://www.springframework.org/schema/task" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-4.1.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd http://www.springframework.org/schema/data/jpa http://www.springframework.org/schema/data/jpa/spring-jpa.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd ">
  3. <!-- 扫盲包 -->
  4. <context:component-scan base-package="cn.itcast.activemq" />
  5. <!-- ActiveMQ 连接工厂 -->
  6. <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
  7. <!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码-->
  8. <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://localhost:61616" userName="admin" password="admin" />
  9. <!-- Spring Caching连接工厂 -->
  10. <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
  11. <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
  12. <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
  13. <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
  14. <!-- 同上,同理 -->
  15. <!-- <constructor-arg ref="amqConnectionFactory" /> -->
  16. <!-- Session缓存数量 -->
  17. <property name="sessionCacheSize" value="100" />
  18. </bean>
  19. <!-- 消息消费者 start-->
  20. <!-- 定义Queue监听器 -->
  21. <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
  22. <jms:listener destination="sms.queue" ref="registerQueue"/>
  23. </jms:listener-container>
  24. <!-- 消息消费者 end -->
  25. </beans>

编写消费者类

需要给applicationContext的包扫描扫到,

  1. package cn.itcast.activemq;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import org.springframework.stereotype.Component;
  5. import javax.jms.JMSException;
  6. import javax.jms.MapMessage;
  7. import javax.jms.Message;
  8. import javax.jms.MessageListener;
  9. /** * 发送短信系统, */
  10. @Component
  11. public class RegisterQueue implements MessageListener {
  12. private Logger log = LoggerFactory.getLogger(RegisterQueue.class);
  13. public void onMessage(Message message) {
  14. //接收map类型数据
  15. MapMessage map = (MapMessage)message;
  16. try {
  17. String telephone = map.getString("telephone"); //接收电话
  18. String content = map.getString("content") ; //这是生产者存入的字段名
  19. log.info("telephone:"+telephone);
  20. log.info("content:"+content);
  21. log.info("模拟发送短信");
  22. } catch (JMSException e) {
  23. e.printStackTrace();
  24. }
  25. }
  26. }

最后我们需要注意的是,因为是一个单独的系统,所以我们需要配置一个tomcat,在pom文件里面

  1. <plugins>
  2. <plugin>
  3. <groupId>org.apache.tomcat.maven</groupId>
  4. <artifactId>tomcat7-maven-plugin</artifactId>
  5. <version>2.2</version>
  6. <configuration>
  7. <uriEncoding>UTF-8</uriEncoding>
  8. <path>/bos_sms</path>
  9. <port>8892</port>
  10. </configuration>
  11. </plugin>
  12. </plugins>

消费者已经写完了,然后我们开始写生产者(客户端),

由于客户端涉及相关业务,不能发完整代码,体谅一下

1.创建一个maven工程,

2.配置web.xml文件,

  1. <-- 配置web.xml -->
  2. <!--struts2拦截器-->
  3. <filter>
  4. <filter-name>struts2</filter-name>
  5. <filter-class>org.apache.struts2.dispatcher.ng.filter.StrutsPrepareAndExecuteFilter</filter-class>
  6. </filter>
  7. <filter-mapping>
  8. <filter-name>struts2</filter-name>
  9. <url-pattern>*</url-pattern>
  10. </filter-mapping>
  11. <!--spring初始化配置-->
  12. <context-param>
  13. <param-name>contextConfigLocation</param-name>
  14. <param-value>classpath:applicationContext.xml</param-value>
  15. </context-param>
  16. <!-- 配置spring框架的监听器 -->
  17. <listener>
  18. <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
  19. </listener>

配置applicationContext.xml

  1. <!--把这下面的东西拷贝进去修改修改就行了,-->
  2. <!-- 扫盲包 -->
  3. <context:component-scan base-package="cn.itcast.activemq" />
  4. <!-- ActiveMQ 连接工厂 -->
  5. <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
  6. <!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码-->
  7. <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://localhost:61616" userName="admin" password="admin" />
  8. <!-- Spring Caching连接工厂 -->
  9. <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
  10. <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
  11. <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
  12. <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
  13. <!-- 同上,同理 -->
  14. <!-- <constructor-arg ref="amqConnectionFactory" /> -->
  15. <!-- Session缓存数量 -->
  16. <property name="sessionCacheSize" value="100" />
  17. </bean>
  18. <!-- Spring JmsTemplate 的消息生产者 start-->
  19. <!-- 定义JmsTemplate的Queue类型 -->
  20. <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
  21. <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
  22. <constructor-arg ref="connectionFactory" />
  23. <!-- 非pub/sub模型(发布/订阅),即队列模式 -->
  24. <property name="pubSubDomain" value="false" />
  25. </bean>

创建一个生产者

  1. @Resource
  2. private JmsTemplate template;
  3. @Action(value = "customerAction_regist",results = {
  4. @Result(name = "fail",location = "/signup-fail.html",type = "redirect"),
  5. @Result(name = "success",location = "/signup-success.html",type = "redirect")})
  6. public String regist(){
  7. String sms_name = "sms.queue";//消息队列明,需要跟我们先前设置消费者的一样
  8. final String content = "恭喜你,你的账号已注册,请登陆邮箱激活!";//消息内容
  9. //创建一个消息匿名类
  10. template.send(sms_name, new MessageCreator() {
  11. public Message createMessage(Session session) throws JMSException {
  12. MapMessage mapMessage = session.createMapMessage();//获取map的消息对象
  13. mapMessage.setString("telephone",customer.getTelephone());
  14. mapMessage.setString("content",content);
  15. return mapMessage;
  16. }
  17. });

最后还需要配置一个tomcat ,

  1. <plugins>
  2. <plugin>
  3. <groupId>org.apache.tomcat.maven</groupId>
  4. <artifactId>tomcat7-maven-plugin</artifactId>
  5. <version>2.2</version>
  6. <configuration>
  7. <uriEncoding>UTF-8</uriEncoding>
  8. <path>/bos_before</path>
  9. <port>8890</port>
  10. </configuration>
  11. </plugin>
  12. </plugins>

最后启动运行试试,需要注意的是,俩个项目都要启动,
这里写图片描述

sms服务启动成功,接下来我们启动另外一个服务,
这里写图片描述

也启动成功,我们开始代码测试

测试抛出异常

  1. Exception occurred during processing request: null
  2. java.lang.NullPointerException

根据提示信息发现 , JmsTemplate template;注入失败,

然后一顿乱找,发现是切面的影响的,,这个有点坑

把这行代码注释掉就没问题了

  1. <!--使用切面注解扫描-->
  2. <aop:aspectj-autoproxy proxy-target-class="true"/>

原因是代理类不能使用JmsTemplate,

最后运行是没问题的,

这里写图片描述

jms就讲到这里,提前祝大家圣诞节快乐

发表评论

表情:
评论列表 (有 0 条评论,290人围观)

还没有评论,来说两句吧...

相关阅读

    相关 activeMQ Jms Demo

    概述 ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provide

    相关 JMS操作ActiveMQ

    一般情况下我们都是通过jms操作mq,如果使用原生的jms代码比较繁琐,还好spring有集成的jms,通过jms模板类,能够适应各种mq的操作,其就像jdbc模板一样将具体的