ActiveMQ之spring集成Topic

太过爱你忘了你带给我的痛 2022-08-05 02:25 128阅读 0赞

通过Spring对ActiveMQ进行配置开发,发布订阅模式,支持消息的持久化。可以通过配置clientId实现持久化订阅。

**1.生产者**

spring配置:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:jee="http://www.springframework.org/schema/jee"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:util="http://www.springframework.org/schema/util"
       xmlns:lang="http://www.springframework.org/schema/lang"
       xsi:schemaLocation="
           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
           http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
           http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
           http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd
           http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-3.0.xsd">

    <!-- JMS connection factory -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- Number of sessions to cache -->
        <property name="sessionCacheSize" value="10" />
        <property name="targetConnectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <!-- Broker address -->
                <property name="brokerURL" value="tcp://192.168.18.43:61616" />
                <!-- Whether to send asynchronously -->
                <property name="useAsyncSend" value="false" />
            </bean>
        </property>
    </bean>

    <!-- Destination that messages are sent to (a topic) -->
    <bean id="testDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <!-- Name of the topic -->
        <constructor-arg index="0" value="spring.Topic" />
    </bean>

    <!-- jmsTemplate, used to send messages to any destination -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="defaultDestination" ref="testDestination" />
        <!-- Publish/subscribe mode -->
        <property name="pubSubDomain" value="true" />
        <!-- receiveTimeout is the timeout used when receiving a message -->
        <property name="receiveTimeout" value="30000" />
    </bean>
</beans>
```

发送消息:

```java
package cn.slimsmart.activemq.demo.spring.topic;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

/**
 * Publishes a single text message to the default destination of the
 * {@link JmsTemplate} defined in {@code topic/producer.xml}.
 */
public class ProducerMain {

    /**
     * Loads the producer context, sends one message and closes the context.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext("classpath:topic/producer.xml");
        try {
            JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);
            jmsTemplate.send(new MessageCreator() {
                @Override
                public Message createMessage(Session session) throws JMSException {
                    TextMessage msg = session.createTextMessage();
                    // Set a message property
                    msg.setStringProperty("property", "属性");
                    // Set the message body
                    msg.setText("生产者发送队列消息");
                    return msg;
                }
            });
        } finally {
            // Close the context so the cached JMS connection and sessions are
            // released instead of being leaked when main returns.
            context.close();
        }
    }
}
```

**2.消费者**

spring配置,实现了持久化订阅:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:jee="http://www.springframework.org/schema/jee"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:util="http://www.springframework.org/schema/util"
       xmlns:lang="http://www.springframework.org/schema/lang"
       xsi:schemaLocation="
           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
           http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
           http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
           http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd
           http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-3.0.xsd">

    <!-- JMS connection factory -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- Number of sessions to cache -->
        <property name="sessionCacheSize" value="10" />
        <!-- Client id of the receiver, used for the durable subscription -->
        <property name="clientId" value="client_test" />
        <property name="targetConnectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <!-- Broker address -->
                <property name="brokerURL" value="tcp://192.168.18.43:61616" />
            </bean>
        </property>
    </bean>

    <!-- Destination that messages are received from (a topic) -->
    <bean id="testDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <!-- Name of the topic -->
        <constructor-arg index="0" value="spring.Topic" />
    </bean>

    <!-- Container that receives topic messages asynchronously.
         The bean id still says "queue" for backward compatibility; the destination is a topic. -->
    <bean id="queueContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="testDestination" />
        <property name="messageListener" ref="notifyMessageListener" />
        <!-- Publish/subscribe mode. Set explicitly: a durable subscription only makes sense on a
             topic, and older Spring versions do not switch this flag on automatically when
             subscriptionDurable is set. -->
        <property name="pubSubDomain" value="true" />
        <property name="receiveTimeout" value="30000" />
        <!-- Durable subscription settings -->
        <property name="subscriptionDurable" value="true" />
        <property name="clientId" value="client_test" />
        <property name="durableSubscriptionName" value="client_test" />
    </bean>

    <bean id="notifyMessageListener" class="cn.slimsmart.activemq.demo.spring.NotifyMessageListener"></bean>
</beans>
```

消费者启动类:

```java
package cn.slimsmart.activemq.demo.spring.topic;

import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Starts the consumer by loading {@code topic/consumer.xml}, which defines the
 * message listener container.
 */
public class ConsumerMain {

    /**
     * Loads the consumer context and leaves it open so the listener keeps receiving.
     *
     * @param args unused
     */
    @SuppressWarnings("resource")
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext("classpath:topic/consumer.xml");
        // The context is intentionally not closed here; register a shutdown hook so
        // it is closed (releasing the JMS resources) when the JVM exits.
        context.registerShutdownHook();
    }
}
```
还没有评论,来说两句吧...