(1)ActiveMQ笔记:ActiveMQ与Spring整合&使用例子

﹏ヽ暗。殇╰゛Y 2022-06-01 01:23 338阅读 0赞

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。
我所在的项目中,ActiveMQ被用来实现大批量邮件发送功能(项目需求中会出现一次性发送N多封邮件的情况,所以需要将邮件请求放入一个队列中,然后专门处理邮件的服务器就可以从队列中取出请求并发送邮件,原系统就可以做其他事而不被大批量邮件发送功能阻塞,并能实现与原有系统的功能隔离,降低系统的耦合)
在项目实现中,原有系统将大量的邮件请求发送给ActiveMQ服务器,形成消息队列,然后邮件服务器从ActiveMQ消息队列中取出请求,发送邮件。
首先看下最简单的整合例子吧。
示例项目源代码下载:https://github.com/SecondMagic/ActiveMQ_demo
1.下载ActiveMQ:
http://activemq.apache.org/download-archives.html
选择一个版本下载,本文中所用的是ActiveMQ 5.11.1 Release
下载完解压后的样子:
activemq-all-5.11.1.jar文件为之后整合时所需的jar包
点击或者命令行启动bin\activemq.bat 后activemq服务就在本地启动了,默认端口号61616
这里写图片描述

可以通过浏览器查看管理界面:
http://127.0.0.1:8161/admin/ 账号:admin 密码:admin
点击queues可以查看消息队列,其中第一列为消息目的地名,第二列为等待下发/消费的信息数量,第三列为消费者数量(接收消息方),第四列为进入队列的信息数量,第五列为出列的信息数量(已经被接收走)。
这里写图片描述

停止服务器,只需要按着Ctrl+Shift+C,之后输入y即可,不行的话,可以到服务中手动暂停服务
这里写图片描述

2.现在ActiveMQ服务已经启动完毕,接下来开始整合项目
首先创建一个web项目,配置spring,这个这里就不赘述了,可以到源代码中查看相应部分,主要讲下activeMQ与spring的整合。
项目结构如下:
这里写图片描述
需要的是上面提到的activemq-all-5.11.1.jar,放入项目lib中
dispatcherServlet-servlet.xml,applicationContext.xml都是spring,spring mvc相关配置

applicationContext-activeMQ.xml文件

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">
  3. <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
  4. <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  5. <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
  6. <property name="connectionFactory" ref="connectionFactory"/>
  7. </bean>
  8. <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
  9. <!--<bean id="targetConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">-->
  10. <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  11. <property name="brokerURL" value="tcp://xx.xx.xx.xx:61616"/>
  12. <property name="userName" value="admin" />
  13. <property name="password" value="admin" />
  14. <property name="useAsyncSend" value="true"/>
  15. </bean>
  16. <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
  17. <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
  18. <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
  19. <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
  20. </bean>
  21. <!--这个是队列目的地,即1中管理界面queues中的第一列信息-->
  22. <bean id="subscribeQueue" class="org.apache.activemq.command.ActiveMQQueue">
  23. <constructor-arg>
  24. <value>MailQueue</value>
  25. </constructor-arg>
  26. </bean>
  27. <!-- 消息监听器,用来监听是否有信息接收 -->
  28. <bean id="subscribeReceiver" class="com.activityMQ.reciveService.reciveService"/>
  29. <!-- 消息监听容器 -->
  30. <bean id="subscribeContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  31. <property name="connectionFactory" ref="connectionFactory"/>
  32. <property name="destination" ref="subscribeQueue"/>
  33. <property name="messageListener" ref="subscribeReceiver"/>
  34. <!-- 消息监听器消费者并发处理数量设置 -->
  35. <property name="concurrentConsumers" value="2"/>
  36. </bean>
  37. </beans>

首先有一点,就是ConnectionFactory会有两个,一个是真正的用来产生Connection 的ConnectionFactory,由对应的JMS供应商提供,另一个是spring用来管理真正的ConnectionFactory的ConnectionFactory。

配置完后记得在web.xml中加载该xml

建立sendMessage和reciveMessage文件
sendService.java文件
queueName为发送信息的目的地,同上面所说一样
message为要发送的信息

  1. package com.activeMQ.sendService;
  2. import javax.jms.JMSException;
  3. import javax.jms.Message;
  4. import javax.jms.Session;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.jms.core.JmsTemplate;
  7. import org.springframework.jms.core.MessageCreator;
  8. import org.springframework.stereotype.Component;
  9. import org.springframework.stereotype.Service;
  10. @Component
  11. @Service
  12. public class sendService {
  13. @Autowired
  14. private JmsTemplate jmsTemplate;
  15. public void sendMessage(String queueName, final String message){
  16. jmsTemplate.send(queueName, new MessageCreator() {
  17. public Message createMessage(Session session) throws JMSException {
  18. System.out.println("========:"+session.createTextMessage(message).toString());
  19. return session.createTextMessage(message);
  20. }
  21. });
  22. }
  23. }

reciveService文件
通过配置文件配置,将会不停监听是否有信息需要接收

  1. package com.activityMQ.reciveService;
  2. import javax.jms.JMSException;
  3. import javax.jms.Message;
  4. import javax.jms.MessageListener;
  5. import org.apache.activemq.command.ActiveMQTextMessage;
  6. import org.springframework.stereotype.Component;
  7. @Component
  8. public class reciveService implements MessageListener{
  9. @Override
  10. public void onMessage(Message message) {
  11. // TODO Auto-generated method stub
  12. try {
  13. String xmlMessage = (String)((ActiveMQTextMessage) message).getText();
  14. System.out.println(xmlMessage);
  15. } catch (JMSException e) {
  16. // TODO Auto-generated catch block
  17. e.printStackTrace();
  18. }
  19. }
  20. }

3.启动运行
这里写图片描述

可以看到确实多了一条记录,下面一行中的为2条还未被接收的信息,因为他们的目的地名为testSendInfo,而我们这里只配置了MailQueue,所以无法接收。修改配置即可接收。
这里写图片描述
控制台中也输出了相应的信息:
这里写图片描述

实际使用时一般都会将接收端单独部署成一个服务,还有负载均衡,集群等等。

发表评论

表情:
评论列表 (有 0 条评论,338人围观)

还没有评论,来说两句吧...

相关阅读

    相关 spring整合使用activemq

    activemq简单的demo这里就不做演示了,今天介绍一下如何利用spring整合activemq,也是实际工作中涉及到的,希望对各位伙伴有所帮助, 1、安装activem