JMS操作ActiveMQ
一般情况下我们都是通过jms操作mq,如果使用原生的jms代码比较繁琐,还好spring有集成的jms,通过jms模板类,能够适应各种mq的操作,其就像jdbc模板一样将具体的创建连接、连接的管理、消息的发送和接收等操作进行了封装,我们只需对mq连接池和jmsTemplate进行配置就可以了。
要使用mq进行消息发送和接收的缓存区,mq相当于是服务端,生产者和消费者都是客户端,生产者将消息发送到mq服务端,服务端再通知消费者消费,或是由消费者轮询mq服务端的队列进行消费,因此首先需要启动mq这个服务端。
一、启动ActiveMQ
这里以ActiveMQ为例,下一代ActiveMQ,Artemis的配置也类似。官网的操作指导地址:https://activemq.apache.org/getting-started
1、先从官网下载安装包:https://activemq.apache.org/index.html,这里以Windows为例进行安装,Linux操作相似。官网对于mq的安装环境有一定要求,比如jre版本。
2、解压缩安装包
3、进入命令目录bin
4、启动mq
不能直接双击activemq.bat,否则命令行窗口会闪退,可以通过cmd在命令行输入activemq start命令启动
使用netstat -an|find “61616”查看端口监听情况
如果想要修改监听端口可以修改conf下的activemq.xml文件
5、使用管理接口
打开地址 http://127.0.0.1:8161/admin/,登录用户和密码是admin,点击Queues菜单,查看注册到服务端的队列,且可以手动创建队列。
二、spring集成jms操作mq
引入spring集成jms的包和activemq的包,其他spring包不特别列出,包括junit。
<!-- https://mvnrepository.com/artifact/org.springframework/spring-jms -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.3.20.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version>
</dependency>
1、首先定义连接工厂连接的服务端地址
<amq:connectionFactory id="connectionFactory" brokerURL="tcp://localhost:61616" >
</amq:connectionFactory>
通过引入activemq和jms的xml模式,可以通过标签元素简写bean的定义
xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:amq=http://activemq.apache.org/schema/core
2、定义mq缓存消息的队列
<amq:queue id="goodsQueue" physicalName="teriste.goodsQueue"></amq:queue>
3、定义操作连接工厂和队列的jmsTemplate类
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg name="connectionFactory" ref="connectionFactory" ></constructor-arg>
<property name="defaultDestinationName" value="teriste.goodsQueue" ></property>
</bean>
4、引入消息生产者,其需要注入jmsTemplate进行消息发送操作,我们再代码中使用的是JmsOperations接口,其实现类就是JmsTemplate
<bean id="queryGoodsService" class="com.teriste.api.service.impl.QueryGoodsServiceImpl">
<constructor-arg name="jmsOperations" ref="jmsTemplate"></constructor-arg>
</bean>
5、然后是定义监听器容器,其中定义了监听队列的所有消费者
<jms:listener-container>
<jms:listener destination="teriste.goodsQueue" ref="consumer"
method="consumerMessage"/>
</jms:listener-container>
6、引入消费者
<bean id="consumer" class="com.teriste.api.controller.MessageConsumerBusiness"/>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:amq="http://activemq.apache.org/schema/core"
xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.2.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- 创建连接池连接amq,url是再activemq.xml中配置的地址 -->
<!-- 有了amq:connectionFactory注解就不需要显示定义class属性
org.apache.activemq.spring.ActiveMQConnectionFactory-->
<amq:connectionFactory id="connectionFactory" brokerURL="tcp://localhost:61616" trustAllPackages="true">
</amq:connectionFactory>
<!-- 配置监听队列监听生产者发送的消息 -->
<amq:queue id="goodsQueue" physicalName="teriste.goodsQueue"></amq:queue>
<!-- jms模板,用于建立连接和操作队列 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg name="connectionFactory" ref="connectionFactory" ></constructor-arg>
<property name="defaultDestinationName" value="teriste.goodsQueue" ></property>
</bean>
<!-- 生产者通过jmsOperations向队列中发送消息,其实现类就是 jmsTemplate-->
<bean id="queryGoodsService" class="com.teriste.api.service.impl.QueryGoodsServiceImpl">
<constructor-arg name="jmsOperations" ref="jmsTemplate"></constructor-arg>
</bean>
<!-- jms监听器,即消费者监听队列产生的消息进行消费 -->
<jms:listener-container>
<jms:listener destination="teriste.goodsQueue" ref="consumer"
method="consumerMessage"/>
</jms:listener-container>
<!-- 消费者 -->
<bean id="consumer" class="com.teriste.api.service.impl.controller.MessageConsumerBusiness"/>
</beans>
7、定义消息对象
public class ReqMessageDto implements Serializable {
private static final long serialVersionUID = 388556503381616296L;
private String name;
private String storeCode;
private String goodsCode;
/**
* @return the name
*/
public String getName() {
return name;
}
/**
* @param name the name to set
*/
public void setName(String name) {
this.name = name;
}
/**
* @return the storeCode
*/
public String getStoreCode() {
return storeCode;
}
/**
* @param storeCode the storeCode to set
*/
public void setStoreCode(String storeCode) {
this.storeCode = storeCode;
}
/**
* @return the goodsCode
*/
public String getGoodsCode() {
return goodsCode;
}
/**
* @param goodsCode the goodsCode to set
*/
public void setGoodsCode(String goodsCode) {
this.goodsCode = goodsCode;
}
/**
* @return the serialversionuid
*/
public static long getSerialversionuid() {
return serialVersionUID;
}
@Override
public String toString() {
return "ReqMessageDto [name=" + name + ", storeCode=" + storeCode + ", goodsCode=" + goodsCode + "]";
}
}
8、实现生产者
import org.springframework.jms.core.JmsOperations;
import com.teriste.api.service.QueryGoodsService;
import com.teriste.api.service.dto.request.ReqMessageDto;
public class QueryGoodsServiceImpl implements QueryGoodsService {
JmsOperations jmsOperations;
public QueryGoodsServiceImpl(JmsOperations jmsOperations) {
this.jmsOperations = jmsOperations;
}
@Override
public void queryGoodsList() {
ReqMessageDto reqMessageDto = new ReqMessageDto();
reqMessageDto.setName("test");
reqMessageDto.setGoodsCode("001001");
reqMessageDto.setStoreCode("10002");
jmsOperations.convertAndSend(reqMessageDto);
}
}
9、实现消费者
import com.teriste.api.service.dto.request.ReqMessageDto;
public class MessageConsumerBusiness {
public void consumerMessage(Object reqMessage) {
if (reqMessage instanceof ReqMessageDto) {
System.out.println((ReqMessageDto) reqMessage);
}
}
}
10、测试类
import org.junit.Test;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import com.teriste.api.service.QueryGoodsService;
public class TestAmq {
@Test
public void sendMessage() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("conf/spring/spring-amq.xml");
QueryGoodsService queryGoodsService = context.getBean(QueryGoodsService.class);
queryGoodsService.queryGoodsList();
}
}
11、启动测试类
报如下错误,消息对象是不安全的
根据提示,打开地址http://activemq.apache.org/objectmessage.html
这里为了方便就设置所有包都是可信任的
还没有评论,来说两句吧...