JMS操作ActiveMQ

叁歲伎倆 2022-02-04 11:49 484阅读 0赞

一般情况下我们都是通过jms操作mq,如果使用原生的jms代码比较繁琐,还好spring有集成的jms,通过jms模板类,能够适应各种mq的操作,其就像jdbc模板一样将具体的创建连接、连接的管理、消息的发送和接收等操作进行了封装,我们只需对mq连接池和jmsTemplate进行配置就可以了。

要使用mq进行消息发送和接收的缓存区,mq相当于是服务端,生产者和消费者都是客户端,生产者将消息发送到mq服务端,服务端再通知消费者消费,或是由消费者轮询mq服务端的队列进行消费,因此首先需要启动mq这个服务端。

一、启动ActiveMQ

这里以ActiveMQ为例,下一代ActiveMQ,Artemis的配置也类似。官网的操作指导地址:https://activemq.apache.org/getting-started

1、先从官网下载安装包:https://activemq.apache.org/index.html,这里以Windows为例进行安装,Linux操作相似。官网对于mq的安装环境有一定要求,比如jre版本。

2、解压缩安装包

3、进入命令目录bin

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2Rvbmd5dXh1MzQyNzE5_size_16_color_FFFFFF_t_70

4、启动mq

不能直接双击activemq.bat,否则命令行窗口会闪退,可以通过cmd在命令行输入activemq start命令启动

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2Rvbmd5dXh1MzQyNzE5_size_16_color_FFFFFF_t_70 1

使用netstat -an|find “61616”查看端口监听情况

20190507172600300.png

如果想要修改监听端口可以修改conf下的activemq.xml文件

20190507172600334.png

5、使用管理接口

打开地址 http://127.0.0.1:8161/admin/,登录用户和密码是admin,点击Queues菜单,查看注册到服务端的队列,且可以手动创建队列。

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2Rvbmd5dXh1MzQyNzE5_size_16_color_FFFFFF_t_70 2

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2Rvbmd5dXh1MzQyNzE5_size_16_color_FFFFFF_t_70 3

二、spring集成jms操作mq

引入spring集成jms的包和activemq的包,其他spring包不特别列出,包括junit。

  1. <!-- https://mvnrepository.com/artifact/org.springframework/spring-jms -->
  2. <dependency>
  3. <groupId>org.springframework</groupId>
  4. <artifactId>spring-jms</artifactId>
  5. <version>4.3.20.RELEASE</version>
  6. </dependency>
  7. <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
  8. <dependency>
  9. <groupId>org.apache.activemq</groupId>
  10. <artifactId>activemq-all</artifactId>
  11. <version>5.15.9</version>
  12. </dependency>

1、首先定义连接工厂连接的服务端地址

  1. <amq:connectionFactory id="connectionFactory" brokerURL="tcp://localhost:61616" >
  2. </amq:connectionFactory>

通过引入activemq和jms的xml模式,可以通过标签元素简写bean的定义

  1. xmlns:jms="http://www.springframework.org/schema/jms"
  2. xmlns:amq=http://activemq.apache.org/schema/core

2、定义mq缓存消息的队列

  1. <amq:queue id="goodsQueue" physicalName="teriste.goodsQueue"></amq:queue>

3、定义操作连接工厂和队列的jmsTemplate类

  1. <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  2. <constructor-arg name="connectionFactory" ref="connectionFactory" ></constructor-arg>
  3. <property name="defaultDestinationName" value="teriste.goodsQueue" ></property>
  4. </bean>

4、引入消息生产者,其需要注入jmsTemplate进行消息发送操作,我们再代码中使用的是JmsOperations接口,其实现类就是JmsTemplate

  1. <bean id="queryGoodsService" class="com.teriste.api.service.impl.QueryGoodsServiceImpl">
  2. <constructor-arg name="jmsOperations" ref="jmsTemplate"></constructor-arg>
  3. </bean>

20190507172802786.png

5、然后是定义监听器容器,其中定义了监听队列的所有消费者

  1. <jms:listener-container>
  2. <jms:listener destination="teriste.goodsQueue" ref="consumer"
  3. method="consumerMessage"/>
  4. </jms:listener-container>

6、引入消费者

  1. <bean id="consumer" class="com.teriste.api.controller.MessageConsumerBusiness"/>
  2. <?xml version="1.0" encoding="UTF-8"?>
  3. <beans xmlns="http://www.springframework.org/schema/beans"
  4. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  5. xmlns:jms="http://www.springframework.org/schema/jms"
  6. xmlns:amq="http://activemq.apache.org/schema/core"
  7. xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.2.xsd
  8. http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
  9. http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
  10. <!-- 创建连接池连接amq,url是再activemq.xml中配置的地址 -->
  11. <!-- 有了amq:connectionFactory注解就不需要显示定义class属性
  12. org.apache.activemq.spring.ActiveMQConnectionFactory-->
  13. <amq:connectionFactory id="connectionFactory" brokerURL="tcp://localhost:61616" trustAllPackages="true">
  14. </amq:connectionFactory>
  15. <!-- 配置监听队列监听生产者发送的消息 -->
  16. <amq:queue id="goodsQueue" physicalName="teriste.goodsQueue"></amq:queue>
  17. <!-- jms模板,用于建立连接和操作队列 -->
  18. <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  19. <constructor-arg name="connectionFactory" ref="connectionFactory" ></constructor-arg>
  20. <property name="defaultDestinationName" value="teriste.goodsQueue" ></property>
  21. </bean>
  22. <!-- 生产者通过jmsOperations向队列中发送消息,其实现类就是 jmsTemplate-->
  23. <bean id="queryGoodsService" class="com.teriste.api.service.impl.QueryGoodsServiceImpl">
  24. <constructor-arg name="jmsOperations" ref="jmsTemplate"></constructor-arg>
  25. </bean>
  26. <!-- jms监听器,即消费者监听队列产生的消息进行消费 -->
  27. <jms:listener-container>
  28. <jms:listener destination="teriste.goodsQueue" ref="consumer"
  29. method="consumerMessage"/>
  30. </jms:listener-container>
  31. <!-- 消费者 -->
  32. <bean id="consumer" class="com.teriste.api.service.impl.controller.MessageConsumerBusiness"/>
  33. </beans>

7、定义消息对象

  1. public class ReqMessageDto implements Serializable {
  2. private static final long serialVersionUID = 388556503381616296L;
  3. private String name;
  4. private String storeCode;
  5. private String goodsCode;
  6. /**
  7. * @return the name
  8. */
  9. public String getName() {
  10. return name;
  11. }
  12. /**
  13. * @param name the name to set
  14. */
  15. public void setName(String name) {
  16. this.name = name;
  17. }
  18. /**
  19. * @return the storeCode
  20. */
  21. public String getStoreCode() {
  22. return storeCode;
  23. }
  24. /**
  25. * @param storeCode the storeCode to set
  26. */
  27. public void setStoreCode(String storeCode) {
  28. this.storeCode = storeCode;
  29. }
  30. /**
  31. * @return the goodsCode
  32. */
  33. public String getGoodsCode() {
  34. return goodsCode;
  35. }
  36. /**
  37. * @param goodsCode the goodsCode to set
  38. */
  39. public void setGoodsCode(String goodsCode) {
  40. this.goodsCode = goodsCode;
  41. }
  42. /**
  43. * @return the serialversionuid
  44. */
  45. public static long getSerialversionuid() {
  46. return serialVersionUID;
  47. }
  48. @Override
  49. public String toString() {
  50. return "ReqMessageDto [name=" + name + ", storeCode=" + storeCode + ", goodsCode=" + goodsCode + "]";
  51. }
  52. }

8、实现生产者

  1. import org.springframework.jms.core.JmsOperations;
  2. import com.teriste.api.service.QueryGoodsService;
  3. import com.teriste.api.service.dto.request.ReqMessageDto;
  4. public class QueryGoodsServiceImpl implements QueryGoodsService {
  5. JmsOperations jmsOperations;
  6. public QueryGoodsServiceImpl(JmsOperations jmsOperations) {
  7. this.jmsOperations = jmsOperations;
  8. }
  9. @Override
  10. public void queryGoodsList() {
  11. ReqMessageDto reqMessageDto = new ReqMessageDto();
  12. reqMessageDto.setName("test");
  13. reqMessageDto.setGoodsCode("001001");
  14. reqMessageDto.setStoreCode("10002");
  15. jmsOperations.convertAndSend(reqMessageDto);
  16. }
  17. }

9、实现消费者

  1. import com.teriste.api.service.dto.request.ReqMessageDto;
  2. public class MessageConsumerBusiness {
  3. public void consumerMessage(Object reqMessage) {
  4. if (reqMessage instanceof ReqMessageDto) {
  5. System.out.println((ReqMessageDto) reqMessage);
  6. }
  7. }
  8. }

10、测试类

  1. import org.junit.Test;
  2. import org.springframework.context.support.ClassPathXmlApplicationContext;
  3. import com.teriste.api.service.QueryGoodsService;
  4. public class TestAmq {
  5. @Test
  6. public void sendMessage() {
  7. ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("conf/spring/spring-amq.xml");
  8. QueryGoodsService queryGoodsService = context.getBean(QueryGoodsService.class);
  9. queryGoodsService.queryGoodsList();
  10. }
  11. }

11、启动测试类

报如下错误,消息对象是不安全的

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2Rvbmd5dXh1MzQyNzE5_size_16_color_FFFFFF_t_70 4

根据提示,打开地址http://activemq.apache.org/objectmessage.html

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2Rvbmd5dXh1MzQyNzE5_size_16_color_FFFFFF_t_70 5

这里为了方便就设置所有包都是可信任的

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2Rvbmd5dXh1MzQyNzE5_size_16_color_FFFFFF_t_70 6

2019050717315162.png

发表评论

表情:
评论列表 (有 0 条评论,484人围观)

还没有评论,来说两句吧...

相关阅读

    相关 JMS-ActiveMQ系列

    个人总结:消息队列,ActiveMQ现在的企业用得很广泛,分点对点(生产消费),发布/订阅两种。现开发的爬虫系统,多个worker抓取到的信息发到mq,mq监听器把信息异步存到

    相关 activeMQ Jms Demo

    概述 ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provide

    相关 JMS操作ActiveMQ

    一般情况下我们都是通过jms操作mq,如果使用原生的jms代码比较繁琐,还好spring有集成的jms,通过jms模板类,能够适应各种mq的操作,其就像jdbc模板一样将具体的